veecle_os_runtime/
actor.rs1use core::convert::Infallible;
3use core::pin::Pin;
4
5#[doc(inline)]
6pub use veecle_os_runtime_macros::actor;
7
8use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
9use crate::datastore::{Slot, generational};
10
11mod sealed {
12 pub trait Sealed {}
13}
14
15pub trait Actor<'a> {
112 type StoreRequest: StoreRequest<'a>;
114
115 type InitContext;
117
118 type Error: core::error::Error;
122
123 fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
127
128 fn run(
132 self,
133 ) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
134}
135
136pub trait StoreRequest<'a>: sealed::Sealed {
142 #[doc(hidden)]
144 #[allow(async_fn_in_trait)] async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
146}
147
148impl sealed::Sealed for () {}
149
150pub trait Datastore {
152 fn source(self: Pin<&Self>) -> Pin<&generational::Source>;
157
158 #[expect(rustdoc::private_intra_doc_links)] #[expect(private_interfaces)] fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
166 where
167 T: Storable + 'static;
168}
169
170impl<S> Datastore for Pin<&S>
171where
172 S: Datastore,
173{
174 fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
175 Pin::into_inner(self).source()
176 }
177
178 #[expect(private_interfaces)] fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
180 where
181 T: Storable + 'static,
182 {
183 Pin::into_inner(self).slot()
184 }
185}
186
187pub(crate) trait DatastoreExt<'a>: Copy {
188 #[cfg(test)]
189 fn increment_generation(self);
193
194 fn reader<T>(self) -> Reader<'a, T>
200 where
201 T: Storable + 'static;
202
203 fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
212 where
213 T: Storable + 'static;
214
215 fn writer<T>(self) -> Writer<'a, T>
223 where
224 T: Storable + 'static;
225}
226
227impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
228where
229 S: Datastore,
230{
231 #[cfg(test)]
232 fn increment_generation(self) {
233 self.source().increment_generation()
234 }
235
236 fn reader<T>(self) -> Reader<'a, T>
237 where
238 T: Storable + 'static,
239 {
240 Reader::from_slot(self.slot::<T>())
241 }
242
243 fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
244 where
245 T: Storable + 'static,
246 {
247 ExclusiveReader::from_slot(self.slot::<T>())
248 }
249
250 fn writer<T>(self) -> Writer<'a, T>
251 where
252 T: Storable + 'static,
253 {
254 Writer::new(self.source().waiter(), self.slot::<T>())
255 }
256}
257
258impl<'a> StoreRequest<'a> for () {
260 async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
261}
262
263impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
264
265impl<'a, T> StoreRequest<'a> for Reader<'a, T>
266where
267 T: Storable + 'static,
268{
269 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
270 datastore.reader()
271 }
272}
273
274impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
275
276impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
277where
278 T: Storable + 'static,
279{
280 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
281 datastore.exclusive_reader()
282 }
283}
284
285impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
286
287impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
288where
289 T: Storable + 'static,
290{
291 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
292 Reader::from_slot(datastore.slot()).wait_init().await
293 }
294}
295
296impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
297
298impl<'a, T> StoreRequest<'a> for Writer<'a, T>
299where
300 T: Storable + 'static,
301{
302 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
303 datastore.writer()
304 }
305}
306
307macro_rules! impl_request_helper {
309 ($t:ident) => {
310 #[cfg_attr(docsrs, doc(fake_variadic))]
311 impl<'a, $t> sealed::Sealed for ($t,) { }
313
314 #[cfg_attr(docsrs, doc(fake_variadic))]
315 impl<'a, $t> StoreRequest<'a> for ($t,)
317 where
318 $t: StoreRequest<'a>,
319 {
320 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
321 (<$t as StoreRequest>::request(datastore).await,)
322 }
323 }
324 };
325
326 (@impl $($t:ident)*) => {
327 #[cfg_attr(docsrs, doc(hidden))]
328 impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
329 where
330 $($t: sealed::Sealed),*
331 { }
332
333 #[cfg_attr(docsrs, doc(hidden))]
334 impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
335 where
336 $($t: StoreRequest<'a>),*
337 {
338 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
339 futures::join!($( <$t as StoreRequest>::request(datastore), )*)
345 }
346 }
347 };
348
349 ($head:ident $($rest:ident)*) => {
350 impl_request_helper!(@impl $head $($rest)*);
351 impl_request_helper!($($rest)*);
352 };
353}
354
355impl_request_helper!(Z Y X W V U T);
356
357#[diagnostic::on_unimplemented(
359 message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
360 label = "not a valid actor return type"
361)]
362pub trait IsActorResult: sealed::Sealed {
363 type Error;
365
366 fn into_result(self) -> Result<Infallible, Self::Error>;
368}
369
370impl<E> sealed::Sealed for Result<Infallible, E> {}
371
372impl<E> IsActorResult for Result<Infallible, E> {
373 type Error = E;
374
375 fn into_result(self) -> Result<Infallible, E> {
376 self
377 }
378}
379
380impl sealed::Sealed for Infallible {}
381
382impl IsActorResult for Infallible {
383 type Error = Infallible;
384
385 fn into_result(self) -> Result<Infallible, Self::Error> {
386 match self {}
387 }
388}
389
390#[cfg(test)]
391mod tests {
392 use core::future::Future;
393 use core::pin::pin;
394 use core::task::{Context, Poll};
395
396 use futures::future::FutureExt;
397
398 use crate::actor::{DatastoreExt, StoreRequest};
399 use crate::cons::{Cons, Nil};
400 use crate::datastore::{InitializedReader, Storable};
401
402 #[test]
403 fn multi_request_order_independence() {
404 #[derive(Debug, Storable)]
405 #[storable(crate = crate)]
406 struct A;
407
408 #[derive(Debug, Storable)]
409 #[storable(crate = crate)]
410 struct B;
411
412 let datastore = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());
413
414 let mut a_writer = datastore.as_ref().writer::<A>();
415 let mut b_writer = datastore.as_ref().writer::<B>();
416
417 let mut request_1 = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(
420 datastore.as_ref()
421 ));
422 let mut request_2 = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(
423 datastore.as_ref()
424 ));
425
426 let (request_1_waker, request_1_wake_count) = futures_test::task::new_count_waker();
427 let (request_2_waker, request_2_wake_count) = futures_test::task::new_count_waker();
428
429 let mut request_1_context = Context::from_waker(&request_1_waker);
430 let mut request_2_context = Context::from_waker(&request_2_waker);
431
432 assert!(matches!(
433 request_1.as_mut().poll(&mut request_1_context),
434 Poll::Pending
435 ));
436 assert!(matches!(
437 request_2.as_mut().poll(&mut request_2_context),
438 Poll::Pending
439 ));
440
441 let old_request_1_wake_count = request_1_wake_count.get();
442 let old_request_2_wake_count = request_2_wake_count.get();
443
444 datastore.as_ref().increment_generation();
445
446 a_writer.write(A).now_or_never().unwrap();
447
448 if request_1_wake_count.get() > old_request_1_wake_count {
450 assert!(matches!(
451 request_1.as_mut().poll(&mut request_1_context),
452 Poll::Pending
453 ));
454 }
455 if request_2_wake_count.get() > old_request_2_wake_count {
456 assert!(matches!(
457 request_2.as_mut().poll(&mut request_2_context),
458 Poll::Pending
459 ));
460 }
461
462 let old_request_1_wake_count = request_1_wake_count.get();
463 let old_request_2_wake_count = request_2_wake_count.get();
464
465 datastore.as_ref().increment_generation();
466
467 b_writer.write(B).now_or_never().unwrap();
468
469 assert!(request_1_wake_count.get() > old_request_1_wake_count);
471 assert!(request_2_wake_count.get() > old_request_2_wake_count);
472
473 let Poll::Ready((mut request_1_a, mut request_1_b)) =
474 request_1.as_mut().poll(&mut request_1_context)
475 else {
476 panic!("request 1 was not ready")
477 };
478
479 let Poll::Ready((mut request_2_a, mut request_2_b)) =
480 request_2.as_mut().poll(&mut request_2_context)
481 else {
482 panic!("request 2 was not ready")
483 };
484
485 assert!(request_1_a.wait_for_update().now_or_never().is_some());
487 assert!(request_1_b.wait_for_update().now_or_never().is_some());
488
489 assert!(request_2_a.wait_for_update().now_or_never().is_some());
490 assert!(request_2_b.wait_for_update().now_or_never().is_some());
491 }
492}