Flowgraph

A Flowgraph is a directed graph of blocks and connections. Blocks do the actual work; the flowgraph describes which stream ports and message ports are connected.

Stream connections carry sample streams between blocks. They must form a directed acyclic graph. Message connections carry PMTs between message handlers and can use arbitrary topologies.
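To make the acyclicity requirement for stream connections concrete, here is a minimal sketch of a cycle check using Kahn's algorithm. It is plain Rust and independent of FutureSDR: the function name `is_acyclic` and the representation of blocks as indices with `(from, to)` edges are assumptions made for illustration, not the library's API or its actual validation code.

```rust
use std::collections::VecDeque;

/// Returns true if the directed graph with `n` nodes and the given
/// `(from, to)` edges contains no cycle (Kahn's algorithm).
/// Hypothetical helper: node indices stand in for blocks and must be < n.
pub fn is_acyclic(n: usize, edges: &[(usize, usize)]) -> bool {
    let mut indegree = vec![0usize; n];
    let mut successors = vec![Vec::new(); n];
    for &(from, to) in edges {
        successors[from].push(to);
        indegree[to] += 1;
    }

    // Start with every node that has no incoming stream edge.
    let mut ready: VecDeque<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    let mut visited = 0;

    while let Some(node) = ready.pop_front() {
        visited += 1;
        for &next in &successors[node] {
            indegree[next] -= 1;
            if indegree[next] == 0 {
                ready.push_back(next);
            }
        }
    }

    // Nodes on a cycle never reach in-degree zero, so they are never visited.
    visited == n
}
```

A chain such as source, head, sink passes this check, while adding an edge from the sink back to the source fails it. Message connections are not subject to this restriction.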

Constructing Flowgraphs

Create an empty flowgraph with Flowgraph::new(), add blocks, and connect them. The usual way to build a flowgraph is the connect! macro. It adds blocks to the flowgraph if needed and wires their ports.

The simplest stream connection uses the default stream output and input port names:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = NullSource::<u8>::new();
let head = Head::<u8>::new(1024);
let snk = NullSink::<u8>::new();

connect!(fg, src > head > snk);

Named stream ports can be selected explicitly. Output ports are written after the source block, and input ports are written before the destination block:

connect!(fg, src.output > input.head > snk);

Message connections use | instead of >. This example connects the out message output of msg_source to the in message input of msg_copy, then forwards messages to msg_sink:

use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();

let msg_source = MessageSourceBuilder::new(Pmt::String("foo".to_string()), Duration::from_millis(100))
    .n_messages(20)
    .build();
let msg_copy = MessageCopy::new();
let msg_sink = MessageSink::new();

connect!(fg, msg_source | msg_copy | msg_sink);

Message ports can also be named explicitly:

connect!(fg, msg_source.out | r#in.msg_copy);

The r#in spelling is Rust’s raw-identifier syntax for a port named in.
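Raw identifiers are a general Rust feature, not something specific to connect!. Because in is a reserved keyword, any item with that name has to be spelled r#in. The following std-only sketch shows the syntax on a struct field; `MessagePorts`, `default_ports`, and `input_name` are hypothetical names used only for illustration.

```rust
/// Hypothetical pair of message port names, used only to show the syntax.
pub struct MessagePorts {
    /// `in` is a reserved keyword, so the field is spelled `r#in`.
    pub r#in: &'static str,
    pub out: &'static str,
}

/// Builds the pair with the port names used in the text above.
pub fn default_ports() -> MessagePorts {
    MessagePorts { r#in: "in", out: "out" }
}

/// Field access needs the raw-identifier spelling as well.
pub fn input_name(ports: &MessagePorts) -> &'static str {
    ports.r#in
}
```

The r# prefix is not part of the name: the field above is called in, just as the port in the connect! example is.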

Stream and message connections can be mixed in one macro invocation. Separate independent connections with semicolons:

connect!(fg,
    src > head > snk;
    msg_source | msg_copy | msg_sink;
);

Blocks can also be added and connected manually. This is what the macro does in the common cases: it stores the blocks in the flowgraph, looks up their port endpoints, and records the stream or message edges.

use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();

let src = fg.add(VectorSource::<u32>::new(vec![1, 2, 3, 4]));
let snk = fg.add(VectorSink::<u32>::new(4));
fg.stream_dyn(src, "output", snk, "input")?;

let msg_source = fg.add(
    MessageSourceBuilder::new(Pmt::String("foo".to_string()),
            Duration::from_millis(100))
        .n_messages(20)
        .build(),
);
let msg_copy = fg.add(MessageCopy::new());
let msg_sink = fg.add(MessageSink::new());

fg.message(msg_source, "out", msg_copy, "in")?;
fg.message(msg_copy, "out", msg_sink, "in")?;

let fg = Runtime::new().run(fg)?;

Use connect! for normal application code. The explicit form is useful when block types are selected dynamically or when it helps to understand the lower-level API.

Accessing Blocks

When a block is added to a flowgraph, FutureSDR returns a BlockRef<T>. A block reference is a lightweight typed identifier. It is copyable, can be converted to a BlockId, and can be used to access the block while the flowgraph owns it.
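The idea of a lightweight typed identifier can be sketched in plain Rust. The types below (`TypedRef`, `Graph`, and this `BlockId`) are illustrative stand-ins and assumptions, not FutureSDR's actual definitions: the reference stores only an index plus a type marker, and the owner uses that type to hand back a correctly typed block.

```rust
use std::any::Any;
use std::marker::PhantomData;

/// Untyped identifier (stand-in for an id type such as `BlockId`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BlockId(pub usize);

/// Typed identifier: stores only the id. `PhantomData` records the block
/// type without owning or borrowing a block.
pub struct TypedRef<T> {
    id: BlockId,
    _block: PhantomData<fn() -> T>,
}

// Manual impls so that `TypedRef<T>` is `Copy` even when `T` is not.
impl<T> Clone for TypedRef<T> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<T> Copy for TypedRef<T> {}

/// Dropping the type information yields the untyped id.
impl<T> From<TypedRef<T>> for BlockId {
    fn from(r: TypedRef<T>) -> Self {
        r.id
    }
}

/// Toy owner of type-erased blocks (stand-in for a flowgraph).
#[derive(Default)]
pub struct Graph {
    blocks: Vec<Box<dyn Any>>,
}

impl Graph {
    /// Takes ownership of the block and returns a typed reference to it.
    pub fn add<T: Any>(&mut self, block: T) -> TypedRef<T> {
        self.blocks.push(Box::new(block));
        TypedRef {
            id: BlockId(self.blocks.len() - 1),
            _block: PhantomData,
        }
    }

    /// Looks the block up by id and downcasts it to the type the reference carries.
    pub fn block<T: Any>(&self, r: &TypedRef<T>) -> Option<&T> {
        self.blocks.get(r.id.0)?.downcast_ref::<T>()
    }
}
```

In this sketch the reference stays valid and copyable while the graph owns the block, which is the property the paragraph above describes.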

The connect! macro also leaves you with block references for the blocks it added. After a blocking Runtime::run(), the finished Flowgraph is returned, so the same BlockRef can be used to inspect block state:

use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);

connect!(fg, src > snk);

let fg = Runtime::new().run(fg)?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1, 2, 3, 4]);

Similarly, block_mut() can be used to update block metadata or block state:

let mut fg = Flowgraph::new();
let snk = fg.add(VectorSink::<u32>::new(4));

fg.block_mut(&snk)?.set_instance_name("samples");

Use BlockRef::id() or convert a BlockRef into BlockId when a runtime interaction API needs an untyped block identifier.

Flowgraph Interactions

Runtime::run() is the simplest way to execute a flowgraph when you only need the result after it finishes. To interact with a flowgraph while it is running, start it with Runtime::start() on native targets or Runtime::start_async() in async code. Both return a RunningFlowgraph.

RunningFlowgraph can post messages, call message handlers, describe the running graph, stop it, and wait for completion.

The following example starts a flowgraph and continuously hops through a list of frequencies by posting Pmt::F64 values to a block’s freq message handler:

use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();
// set up the flowgraph

// `my_seify_source` is a source or sink block with a `freq` message input.
let radio = fg.add(my_seify_source);

let rt = Runtime::new();
let running = rt.start(fg)?;

Runtime::block_on(async move {
    let frequencies = [100.0e6, 101.0e6, 102.0e6];

    for freq in frequencies.iter().cycle() {
        running.post(radio, "freq", Pmt::F64(*freq)).await?;
        Timer::after(Duration::from_secs(1)).await;
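    }

    // The loop above never ends on its own. This value only gives the async
    // block a Result output type, which the `?` operators require.
    Ok::<(), Error>(())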
})?;
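The hopping loop relies on the standard iterator adapter cycle(), which repeats a finite sequence forever. The following std-only sketch isolates that behavior by taking a bounded number of hops. The helper name `hop_sequence` is hypothetical and the code does not touch any FutureSDR API.

```rust
/// Returns the first `hops` frequencies of the endlessly repeating list.
/// Hypothetical helper for illustration; an empty list yields no hops.
pub fn hop_sequence(frequencies: &[f64], hops: usize) -> Vec<f64> {
    frequencies
        .iter()
        .copied()
        .cycle() // restart from the first frequency after the last one
        .take(hops) // bound the otherwise infinite sequence
        .collect()
}
```

Without a bound such as take(), a loop over cycle() only ends when the loop body returns or breaks, for example when posting a message fails.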

Waiting for completion is a separate operation. Use it when the flowgraph is expected to finish on its own, for example when a finite source reaches the end of its input:

use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);

connect!(fg, src > snk);

let rt = Runtime::new();
let running = rt.start(fg)?;

let fg = running.wait()?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1, 2, 3, 4]);

For flowgraphs that do not finish on their own, request shutdown before waiting:

Runtime::block_on(async move {
    running.stop().await?;
    let fg = running.wait_async().await?;
    Ok::<_, Error>(fg)
})?;

If multiple tasks need access to the same running flowgraph, keep a clonable handle:

// `radio_id` identifies the radio block, for example the result of `radio.id()`.
let handle = running.handle();
Runtime::block_on(async move {
    handle.post(radio_id, "freq", Pmt::F64(100.0e6)).await
})?;
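The pattern behind a cloneable handle can be sketched with the standard library alone. In the following toy model, every clone of `Handle` sends commands into the same queue, and a worker thread plays the role of the running flowgraph. All names here (`Command`, `Handle`, `start`) are assumptions made for illustration. FutureSDR's handle is async and its internals may differ.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical commands understood by the toy worker below.
pub enum Command {
    SetFreq(f64),
    Stop,
}

/// Cloneable handle: each clone sends into the same command queue.
#[derive(Clone)]
pub struct Handle {
    tx: mpsc::Sender<Command>,
}

impl Handle {
    /// Returns false if the worker has already shut down.
    pub fn post(&self, cmd: Command) -> bool {
        self.tx.send(cmd).is_ok()
    }
}

/// Starts the worker and returns a handle plus the thread's join handle.
/// Joining yields every frequency the worker received before `Stop`.
pub fn start() -> (Handle, thread::JoinHandle<Vec<f64>>) {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        let mut seen = Vec::new();
        for cmd in rx {
            match cmd {
                Command::SetFreq(freq) => seen.push(freq),
                Command::Stop => break,
            }
        }
        seen
    });
    (Handle { tx }, worker)
}
```

As in the text above, shutdown is requested first (`Command::Stop`) and completion is awaited afterwards (joining the worker), while any number of cloned handles can post in between.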