Model

A model is a network of clients connected to each other by pairs of output/input. The actor/client interfaces specify the tasks that are going to be executed and the topology of the network defines the order in which the tasks are executed.

A pair of output/input defines a unique channel with a sender and a receiver. The sender is given to the output and the receiver to the input.

Each actor performs the following task sequentially in a never ending inner loop and asynchronously with respect to the other actors :

  1. if any inputs, receive inputs & invoke client gmt_dos_actors::io::Read trait implementation on each input,
  2. update client state with the client gmt_dos_actors::io::Update trait implementation,
  3. if any outputs, invoke the client gmt_dos_actors::io::Write trait implementation on each output & send outputs.

The default behavior of an actor is to pause the inner loop both until all the inputs have been received and until all the outputs have been received by other actors.

Model

In the model above, the network topology imposes the following sequence of events (from top to bottom):

ABCD
1Update---
2Write::<AB,ABC>---
3-Read::<AB,ABC>Read::<ABC>-
4Update-Update-
5Write::<AB,ABC>-Write::<CB,CD>-
6-Read::<CB>-Read::<CD>
7-UpdateRead::<ABC>-
8-Write::<BD1,BD2>Update-
9--Write::<CB,CD>Read::<BD1,BD2>
10-Read::<AB,ABC,CB>-Update
11UpdateUpdate-Read::<CD>
12...

Note that events in the same row may occur simultaneously thanks to the asynchronous nature of the actors.

Laying out the network consists in building actors outputs and relaying them to other actors inputs. A pair of output and input must meet the following requirements:

  • the client of the output actor must implement the trait gmt_dos_actors::io::Write<I>
  • the client of the input actor must implement the trait gmt_dos_actors::io::Read<O>
  • I and O must be of the same type i.e. I=O
  • the ouput rate NO must be equal to the input rate NI (NO=NI)

A model will not compile in any of the above requirements is not met.

For client A that gives:

A.add_output().build::<AB>().into_input(&mut B);
A.add_output().multiplex(2).build::<ABC>()
   .into_input(&mut B)
   .into_input(&mut C);

As illustrated above, an output can be multiplexed and sent to multiple inputs.

Then for B:

B.add_output().build::<BD1>().into_input(&mut D);
B.add_output().build::<BD2>().into_input(&mut D);

and C :

C.add_output().build::<CB>().into_input(&mut B);
C.add_output().build::<CD>().into_input(&mut D);

Per default, an actor inner loop is blocked until all its outputs have been read by the associated inputs as a guarantee that the data has been effectively received.

This is not always necessary, particulary for a client that acts as a data sink like D. The links to D can be rewritten:

B.add_output().unbounded().build::<BD1>().into_input(&mut D);
B.add_output().unbounded().build::<BD2>().into_input(&mut D);
C.add_output().unbounded().build::<CD>().into_input(&mut D);

and the outputs BD1, BD2 and CD won't block their actors inner loop anymore.

Once all the connections have been set-up, we can assemble the actors into a model:

let mut model = Model::new(vec![Box::new(A),Box::new(B),Box::new(C),Box::new(D)]);

check the model for errors, run it and wait for it to finish:

model.check()?.run().await?;

Note that the run method cannot be invoked on an unchecked model. The methods that can be called upon Model depend on the value of the Model State generic type parameter. Some method performs a state transition giving access to other methods.

The model State table shows from which state a Model method is called and in which state the model is transitioned to:

MethodFromTo
Model::newUnknownUnknown
Model::checkUnknownReady
Model::runReadyRunning
Model::awaitRunningCompleted
Model::nameUnknownUnknown
Model::addUnknownUnknown
Model::flowchartUnknownUnknown
Model::flowchartReadyReady

A flow chart of the model can be obtained with the flowchart method. Each client is identified by either its type or the actor's name if one was given. Combining all the Model methods in a single call gives:

Model::new(vec![Box::new(A),
               Box::new(B),
               Box::new(C),
               Box::new(D)])
   .flowchart()
   .check()?
   .run()
   .await?;

The flowchart is written to the file integrated_model.dot.svg.

The boilerplate code used for model declaration:

Model::new(vec![Box::new(...),vec![Box::new(...),...])

can be advantageously replaced with the Rust macro model!, e.g.

model!(A,B,C,D)
   .flowchart()
   .check()?
   .run()
   .await?;

When an output detects that the data the client has written to its buffer is None, it closes the channel it belongs to and return an error to the actor that forces the actor to shut down.

When an actor shuts down its inputs and outputs close the channels they are part of. A closed channel also generates an error that is caught by the actors at both end of the channel and forces these actors to also shut down and so, by a domino effect, all the actors are terminated and the model gracefully comes to an end.

Working example

Building upon the example in the previous section, lets add 2 more clients:

  • a random generator
struct RandGen {
    data: Vec<i32>,
}
impl RandGen {
    pub fn new(n_sample: usize) -> Self {
        let mut data = vec![0i32; n_sample];
        let mut rng = WyRand::new();
        rng.fill(&mut data);
        Self { data }
    }
}
impl Update for RandGen {}
impl Write<In> for RandGen {
    fn write(&mut self) -> Option<Data<In>> {
        self.data.pop().map(|val| Data::new(val))
    }
}
  • a data logger
#[derive(Default)]
struct DataLogger {
    data: Vec<f32>,
}
impl Update for DataLogger {}
impl Read<Out> for DataLogger {
    fn read(&mut self, data: Data<Out>) {
        self.data.push(*data);
    }
}

With all the clients defined with an actor/client interface, the actors are instanciated with

    let mut source = Initiator::<_>::from(RandGen::new(1_000_000));
    let mut filter = Actor::<_>::from(Client::default());
    let mut log = Terminator::<_>::from(DataLogger::default());

Each actor requires 3 generic type parameters: the client type and 2 constants: the inputs and outputs sampling rates. The inputs rate is zero if the actor has no inputs and the outputs rate is zero if the actor has no outputs. The default sampling rate for inputs and outputs is 1.

The next step is to build the network. The links between actors are established by successively creating channels between an actor output and the input of another actor, both the output and the input must have been given the same type and the same sampling rate.

    source.add_output().build::<In>().into_input(&mut filter)?;
    filter
        .add_output()
        .unbounded()
        .build::<Out>()
        .into_input(&mut log)?;

Now the model can be assembled, charted, checked for errors and run:

    model!(source, filter, log)
        .flowchart()
        .check()?
        .run()
        .await?;

Integrated Model

The model stop itself when the data from the RandGen client is exhausted.

By setting a logger at the begining of the main application, insight into the behavior of the model can be gathered. For example, setting the env_logger crate with

    env_logger::builder()
        .format_timestamp(None)
        .format_target(false)
        .init();

and running the application with the RUST_LOG environment variable:

RUST_LOG=warn cargo run ... 

outputs the following:

Warning

Setting RUST_LOG to info instead gives

RUST_LOG=info cargo run ... 

Warning

and with

RUST_LOG=debug cargo run ... 

Warning

Debug information is displayed only for application run in debug mode.