veecle_os_runtime/
actor.rs

1//! Smallest unit of work within a runtime instance.
2use core::convert::Infallible;
3use core::pin::Pin;
4
5#[doc(inline)]
6pub use veecle_os_runtime_macros::actor;
7
8use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
9use crate::datastore::{Slot, generational};
10
// Private module holding the marker supertrait of this file's sealed traits (`StoreRequest`, `IsActorResult`).
mod sealed {
    /// Marker trait; it is `pub` inside a private module, so only code that can name `sealed` can implement it.
    pub trait Sealed {}
}
14
15/// Actor interface.
16///
/// The [`Actor`] trait allows writing actors that communicate within a runtime.
/// It allows defining an initial context, which will be available for the whole lifetime of the actor;
/// a constructor method, with all the [`StoreRequest`] types it needs to communicate with other actors;
/// and also the [`Actor::run`] method.
21///
22/// # Usage
23///
24/// Add the `Actor` implementing types to the actor list in [`veecle_os::runtime::execute!`](crate::execute!) when
25/// constructing a runtime instance.
26///
27/// The [`Actor::run`] method implements the actor's event loop.
28/// To yield back to the executor, every event loop must contain at least one `await`.
29/// Otherwise, the endless loop of the actor will block the executor and other actors.
30///
31/// ## Macros
32///
33/// The [`actor`][macro@crate::actor::actor] attribute macro can be used to implement actors.
34/// The function the macro is applied to is converted into the event loop.
35/// See its documentation for more details.
36///
37/// ### Example
38///
39/// ```rust
40/// # use std::convert::Infallible;
41/// # use std::fmt::Debug;
42/// #
43/// # use veecle_os_runtime::{Storable, Reader, Writer};
44/// #
45/// # #[derive(Debug, Default, Storable)]
46/// # pub struct Foo;
47/// #
48/// # #[derive(Debug, Default, Storable)]
49/// # pub struct Bar;
50/// #
51/// # pub struct Ctx;
52///
53/// #[veecle_os_runtime::actor]
54/// async fn my_actor(
55///     reader: Reader<'_, Foo>,
56///     writer: Writer<'_, Bar>,
57///     #[init_context] ctx: Ctx,
58/// ) -> Infallible {
59///     loop {
60///         // Do something here.
61///     }
62/// }
63/// ```
64///
65/// This will create a new struct called `MyActor` which implements [`Actor`], letting you register it into a runtime.
66///
67/// ## Manual
68///
69/// For cases where the macro is not sufficient, the [`Actor`] trait can also be implemented manually:
70///
71/// ```rust
72/// # use std::convert::Infallible;
73/// # use std::fmt::Debug;
74/// #
75/// # use veecle_os_runtime::{Storable, Reader, Writer, Actor};
76/// #
77/// # #[derive(Debug, Default, Storable)]
78/// # pub struct Foo;
79/// #
80/// # #[derive(Debug, Default, Storable)]
81/// # pub struct Bar;
82/// #
83/// # pub struct Ctx;
84///
85/// struct MyActor<'a> {
86///     reader: Reader<'a, Foo>,
87///     writer: Writer<'a, Bar>,
88///     context: Ctx,
89/// }
90///
91/// impl<'a> Actor<'a> for MyActor<'a> {
92///     type StoreRequest = (Reader<'a, Foo>, Writer<'a, Bar>);
93///     type InitContext = Ctx;
94///     type Error = Infallible;
95///
96///     fn new((reader, writer): Self::StoreRequest, context: Self::InitContext) -> Self {
97///         Self {
98///             reader,
99///             writer,
100///             context,
101///         }
102///     }
103///
104///     async fn run(mut self) -> Result<Infallible, Self::Error> {
105///         loop {
106///             // Do something here.
107///         }
108///     }
109/// }
110/// ```
111pub trait Actor<'a> {
112    /// [`Reader`]s and [`Writer`]s this actor requires.
113    type StoreRequest: StoreRequest<'a>;
114
115    /// Context that needs to be passed to the actor at initialisation.
116    type InitContext;
117
118    /// Error that this actor might return while running.
119    ///
120    /// This error is treated as fatal, if any actor returns an error the whole runtime will shutdown.
121    type Error: core::error::Error;
122
123    /// Creates a new instance of the struct implementing [`Actor`].
124    ///
125    /// See the [crate documentation][crate] for examples.
126    fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
127
128    /// Runs the [`Actor`] event loop.
129    ///
130    /// See the [crate documentation][crate] for examples.
131    fn run(
132        self,
133    ) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
134}
135
/// Allows requesting a (nearly) arbitrary number of [`Reader`]s and [`Writer`]s in an [`Actor`].
///
/// The trait is sealed (it requires the private `sealed::Sealed` marker) and is not intended for direct usage by
/// users.
// Developer notes: This works by using type inference via `DatastoreExt::reader` etc. to request `Reader`s etc. from
// the `Datastore`.
pub trait StoreRequest<'a>: sealed::Sealed {
    /// Requests an instance of `Self` from the [`Datastore`].
    ///
    /// The handles in the returned value borrow from `datastore` for the lifetime `'a`.
    #[doc(hidden)]
    #[allow(async_fn_in_trait)] // It's actually private so it's fine.
    async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
}
147
// `()` carries the sealing marker because it implements `StoreRequest` (the no-op request of actors without handles).
impl sealed::Sealed for () {}
149
150/// Internal trait to abstract out type-erased and concrete data stores.
151pub trait Datastore {
152    /// Returns a generational source tracking the global datastore generation.
153    ///
154    /// This is used to ensure that every reader has had (or will have) a chance to read a value before a writer may
155    /// overwrite it.
156    fn source(self: Pin<&Self>) -> Pin<&generational::Source>;
157
158    #[expect(rustdoc::private_intra_doc_links)] // `rustdoc` is buggy with links from "pub" but unreachable types.
159    /// Returns a reference to the slot for a specific type.
160    ///
161    /// # Panics
162    ///
163    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
164    #[expect(private_interfaces)] // The methods are internal.
165    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
166    where
167        T: Storable + 'static;
168}
169
170impl<S> Datastore for Pin<&S>
171where
172    S: Datastore,
173{
174    fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
175        Pin::into_inner(self).source()
176    }
177
178    #[expect(private_interfaces)] // The methods are internal.
179    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
180    where
181        T: Storable + 'static,
182    {
183        Pin::into_inner(self).slot()
184    }
185}
186
187pub(crate) trait DatastoreExt<'a>: Copy {
188    #[cfg(test)]
189    /// Increments the global datastore generation.
190    ///
191    /// Asserts that every reader has had (or will have) a chance to read a value before a writer may overwrite it.
192    fn increment_generation(self);
193
194    /// Returns the [`Reader`] for a specific slot.
195    ///
196    /// # Panics
197    ///
198    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
199    fn reader<T>(self) -> Reader<'a, T>
200    where
201        T: Storable + 'static;
202
203    /// Returns the [`ExclusiveReader`] for a specific slot.
204    ///
205    /// Exclusivity of the reader is not guaranteed by this method and must be ensured via other means (e.g.
206    /// [`crate::execute::validate_actors`]).
207    ///
208    /// # Panics
209    ///
210    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
211    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
212    where
213        T: Storable + 'static;
214
215    /// Returns the [`Writer`] for a specific slot.
216    ///
217    /// # Panics
218    ///
219    /// * If the [`Writer`] for this slot has already been acquired.
220    ///
221    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
222    fn writer<T>(self) -> Writer<'a, T>
223    where
224        T: Storable + 'static;
225}
226
227impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
228where
229    S: Datastore,
230{
231    #[cfg(test)]
232    fn increment_generation(self) {
233        self.source().increment_generation()
234    }
235
236    fn reader<T>(self) -> Reader<'a, T>
237    where
238        T: Storable + 'static,
239    {
240        Reader::from_slot(self.slot::<T>())
241    }
242
243    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
244    where
245        T: Storable + 'static,
246    {
247        ExclusiveReader::from_slot(self.slot::<T>())
248    }
249
250    fn writer<T>(self) -> Writer<'a, T>
251    where
252        T: Storable + 'static,
253    {
254        Writer::new(self.source().waiter(), self.slot::<T>())
255    }
256}
257
258/// Implements a no-op for Actors that do not read or write any values.
259impl<'a> StoreRequest<'a> for () {
260    async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
261}
262
263impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
264
265impl<'a, T> StoreRequest<'a> for Reader<'a, T>
266where
267    T: Storable + 'static,
268{
269    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
270        datastore.reader()
271    }
272}
273
274impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
275
276impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
277where
278    T: Storable + 'static,
279{
280    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
281        datastore.exclusive_reader()
282    }
283}
284
285impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
286
287impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
288where
289    T: Storable + 'static,
290{
291    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
292        Reader::from_slot(datastore.slot()).wait_init().await
293    }
294}
295
296impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
297
298impl<'a, T> StoreRequest<'a> for Writer<'a, T>
299where
300    T: Storable + 'static,
301{
302    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
303        datastore.writer()
304    }
305}
306
/// Implements [`StoreRequest`] for provided types.
///
/// Called with a list of `N` identifiers, the macro emits the `sealed::Sealed` and [`StoreRequest`] implementations
/// for the `N`-tuple of those identifiers and then recurses on the list without its first identifier, until a single
/// identifier is left, which produces the implementations for the one-element tuple.
macro_rules! impl_request_helper {
    // Base case: exactly one identifier left, implement the traits for the one-element tuple.
    // These are the implementations marked `fake_variadic` when building with the `docsrs` cfg.
    ($t:ident) => {
        #[cfg_attr(docsrs, doc(fake_variadic))]
        /// This trait is implemented for tuples up to seven items long.
        impl<'a, $t> sealed::Sealed for ($t,) { }

        #[cfg_attr(docsrs, doc(fake_variadic))]
        /// This trait is implemented for tuples up to seven items long.
        impl<'a, $t> StoreRequest<'a> for ($t,)
        where
            $t: StoreRequest<'a>,
        {
            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
                (<$t as StoreRequest>::request(datastore).await,)
            }
        }
    };

    // Internal arm: implement the traits for the tuple made of exactly the given identifiers.
    // These implementations are hidden from the documentation when building with the `docsrs` cfg.
    (@impl $($t:ident)*) => {
        #[cfg_attr(docsrs, doc(hidden))]
        impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
        where
            $($t: sealed::Sealed),*
        { }

        #[cfg_attr(docsrs, doc(hidden))]
        impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
        where
            $($t: StoreRequest<'a>),*
        {
            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
                // join! is necessary here to avoid argument-order-dependence with the #[actor] macro.
                // This ensures that any `InitializedReaders` in self correctly track the generation at which they were
                // first ready, so that the first `wait_for_update` sees the value that caused them to become
                // initialized.
                // See `multi_request_order_independence` for the verification of this.
                futures::join!($( <$t as StoreRequest>::request(datastore), )*)
            }
        }
    };

    // Recursive case: two or more identifiers. Emit the tuple of the full list, then recurse on the tail.
    ($head:ident $($rest:ident)*) => {
        impl_request_helper!(@impl $head $($rest)*);
        impl_request_helper!($($rest)*);
    };
}

// Seven identifiers: generates the implementations for tuples of one up to seven elements.
impl_request_helper!(Z Y X W V U T);
356
357/// Macro helper to allow actors to return either a [`Result`] type or [`Infallible`] (and eventually [`!`]).
358#[diagnostic::on_unimplemented(
359    message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
360    label = "not a valid actor return type"
361)]
362pub trait IsActorResult: sealed::Sealed {
363    /// The error type this result converts into.
364    type Error;
365
366    /// Convert the result into an actual [`Result`] value.
367    fn into_result(self) -> Result<Infallible, Self::Error>;
368}
369
370impl<E> sealed::Sealed for Result<Infallible, E> {}
371
372impl<E> IsActorResult for Result<Infallible, E> {
373    type Error = E;
374
375    fn into_result(self) -> Result<Infallible, E> {
376        self
377    }
378}
379
380impl sealed::Sealed for Infallible {}
381
382impl IsActorResult for Infallible {
383    type Error = Infallible;
384
385    fn into_result(self) -> Result<Infallible, Self::Error> {
386        match self {}
387    }
388}
389
// Unit tests for the `StoreRequest` implementations.
#[cfg(test)]
mod tests {
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll};

    use futures::future::FutureExt;

    use crate::actor::{DatastoreExt, StoreRequest};
    use crate::cons::{Cons, Nil};
    use crate::datastore::{InitializedReader, Storable};

    /// Checks that a tuple request of two `InitializedReader`s behaves the same for both orders of the tuple
    /// elements: both requests only complete once the second value has been written, and every resulting reader
    /// still reports an update afterwards.
    #[test]
    fn multi_request_order_independence() {
        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct A;

        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct B;

        // A store with one slot for `A` and one slot for `B`.
        let datastore = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());

        let mut a_writer = datastore.as_ref().writer::<A>();
        let mut b_writer = datastore.as_ref().writer::<B>();

        // No matter the order these two request the readers, they should both resolve during the generation where the
        // later of the two is first written.
        let mut request_1 = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(
            datastore.as_ref()
        ));
        let mut request_2 = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(
            datastore.as_ref()
        ));

        // Each request gets its own counting waker so that wake-ups can be observed per request.
        let (request_1_waker, request_1_wake_count) = futures_test::task::new_count_waker();
        let (request_2_waker, request_2_wake_count) = futures_test::task::new_count_waker();

        let mut request_1_context = Context::from_waker(&request_1_waker);
        let mut request_2_context = Context::from_waker(&request_2_waker);

        // Nothing has been written yet, so neither request may complete on its first poll.
        assert!(matches!(
            request_1.as_mut().poll(&mut request_1_context),
            Poll::Pending
        ));
        assert!(matches!(
            request_2.as_mut().poll(&mut request_2_context),
            Poll::Pending
        ));

        let old_request_1_wake_count = request_1_wake_count.get();
        let old_request_2_wake_count = request_2_wake_count.get();

        datastore.as_ref().increment_generation();

        // The write is expected to complete immediately, otherwise `unwrap` panics.
        a_writer.write(A).now_or_never().unwrap();

        // When the first value is written, each future may or may not wake up, but if they do we need to poll them.
        if request_1_wake_count.get() > old_request_1_wake_count {
            assert!(matches!(
                request_1.as_mut().poll(&mut request_1_context),
                Poll::Pending
            ));
        }
        if request_2_wake_count.get() > old_request_2_wake_count {
            assert!(matches!(
                request_2.as_mut().poll(&mut request_2_context),
                Poll::Pending
            ));
        }

        let old_request_1_wake_count = request_1_wake_count.get();
        let old_request_2_wake_count = request_2_wake_count.get();

        datastore.as_ref().increment_generation();

        b_writer.write(B).now_or_never().unwrap();

        // When the second value is written, both futures _must_ wake up and complete.
        assert!(request_1_wake_count.get() > old_request_1_wake_count);
        assert!(request_2_wake_count.get() > old_request_2_wake_count);

        let Poll::Ready((mut request_1_a, mut request_1_b)) =
            request_1.as_mut().poll(&mut request_1_context)
        else {
            panic!("request 1 was not ready")
        };

        let Poll::Ready((mut request_2_a, mut request_2_b)) =
            request_2.as_mut().poll(&mut request_2_context)
        else {
            panic!("request 2 was not ready")
        };

        // All readers should see an update, since they've just been initialized but not `wait_for_update`d.
        assert!(request_1_a.wait_for_update().now_or_never().is_some());
        assert!(request_1_b.wait_for_update().now_or_never().is_some());

        assert!(request_2_a.wait_for_update().now_or_never().is_some());
        assert!(request_2_b.wait_for_update().now_or_never().is_some());
    }
}