Flowgraph
A Flowgraph is a directed graph of blocks and connections. Blocks do the actual work; the flowgraph describes which stream ports and message ports are connected.
Stream connections carry sample streams between blocks. They must form a directed acyclic graph. Message connections carry PMTs between message handlers and can use arbitrary topologies.
Constructing Flowgraphs
Create an empty flowgraph with Flowgraph::new(), add blocks, and connect them. The usual way to build a flowgraph is the connect! macro. It adds blocks to the flowgraph if needed and wires their ports.
The simplest stream connection uses the default stream output and input port names:
use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;
let mut fg = Flowgraph::new();
let src = NullSource::<u8>::new();
let head = Head::<u8>::new(1024);
let snk = NullSink::<u8>::new();
connect!(fg, src > head > snk);
Named stream ports can be selected explicitly. Output ports are written after the source block, and input ports are written before the destination block:
connect!(fg, src.output > input.head > snk);
Message connections use | instead of >. This example connects the out message output of msg_source to the in message input of msg_copy, then forwards messages to msg_sink:
use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::prelude::*;
use std::time::Duration;
let mut fg = Flowgraph::new();
let msg_source = MessageSourceBuilder::new(Pmt::String("foo".to_string()), Duration::from_millis(100))
.n_messages(20)
.build();
let msg_copy = MessageCopy::new();
let msg_sink = MessageSink::new();
connect!(fg, msg_source | msg_copy | msg_sink);
Message ports can also be named explicitly:
connect!(fg, msg_source.out | r#in.msg_copy);
The r#in spelling is Rust’s raw-identifier syntax for a port named in.
Stream and message connections can be mixed in one macro invocation. Separate independent connections with semicolons:
connect!(fg,
src > head > snk;
msg_source | msg_copy | msg_sink;
);
Blocks can also be added and connected manually. This is what the macro is doing for the common cases: it stores blocks in the flowgraph, gets their port endpoints, and records stream or message edges.
use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;
use std::time::Duration;
let mut fg = Flowgraph::new();
let src = fg.add(VectorSource::<u32>::new(vec![1, 2, 3, 4]));
let snk = fg.add(VectorSink::<u32>::new(4));
fg.stream_dyn(src, "output", snk, "input")?;
let msg_source = fg.add(
MessageSourceBuilder::new(Pmt::String("foo".to_string()),
Duration::from_millis(100))
.n_messages(20)
.build(),
);
let msg_copy = fg.add(MessageCopy::new());
let msg_sink = fg.add(MessageSink::new());
fg.message(msg_source, "out", msg_copy, "in")?;
fg.message(msg_copy, "out", msg_sink, "in")?;
let fg = Runtime::new().run(fg)?;
Use connect! for normal application code. The explicit form is useful when block types are selected dynamically or when it helps to understand the lower-level API.
Accessing Blocks
When a block is added to a flowgraph, FutureSDR returns a BlockRef<T>. A block reference is a lightweight typed identifier. It is copyable, can be converted to a BlockId, and can be used to access the block while the flowgraph owns it.
The connect! macro also leaves you with block references for the blocks it added. After a blocking Runtime::run(), the finished Flowgraph is returned, so the same BlockRef can be used to inspect block state:
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;
let mut fg = Flowgraph::new();
let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);
connect!(fg, src > snk);
let fg = Runtime::new().run(fg)?;
let snk = fg.block(&snk)?;
assert_eq!(snk.items(), &vec![1, 2, 3, 4]);
Similarly, block_mut() can be used to update block metadata or block state:
let mut fg = Flowgraph::new();
let snk = fg.add(VectorSink::<u32>::new(4));
fg.block_mut(&snk)?.set_instance_name("samples");
Use BlockRef::id() or convert a BlockRef into BlockId when a runtime interaction API needs an untyped block identifier.
Flowgraph Interactions
Runtime::run() is the simplest way to execute a flowgraph when you only need the result after it finishes. To interact with a flowgraph while it is running, start it with Runtime::start() on native targets or Runtime::start_async() in async code. Both return a RunningFlowgraph.
RunningFlowgraph can post messages, call message handlers, describe the running graph, stop it, and wait for completion.
The following example starts a flowgraph and continuously hops through a list of frequencies by posting Pmt::F64 values to a block’s freq message handler:
use futuresdr::prelude::*;
use std::time::Duration;
let mut fg = Flowgraph::new();
// set up the flowgraph
// `my_seify_source` is a source or sink block with a `freq` message input.
let radio = fg.add(my_seify_source);
let rt = Runtime::new();
let running = rt.start(fg)?;
Runtime::block_on(async move {
let frequencies = [100.0e6, 101.0e6, 102.0e6];
for freq in frequencies.iter().cycle() {
running.post(radio, "freq", Pmt::F64(*freq)).await?;
Timer::after(Duration::from_secs(1)).await;
}
})?;
Waiting for completion is a separate operation. Use it when the flowgraph is expected to finish on its own, for example when a finite source reaches the end of its input:
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;
let mut fg = Flowgraph::new();
let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);
connect!(fg, src > snk);
let rt = Runtime::new();
let running = rt.start(fg)?;
let fg = running.wait()?;
let snk = fg.block(&snk)?;
assert_eq!(snk.items(), &vec![1, 2, 3, 4]);
For flowgraphs that do not finish on their own, request shutdown before waiting:
Runtime::block_on(async move {
running.stop().await?;
let fg = running.wait_async().await?;
Ok::<_, Error>(fg)
})?;
If multiple tasks need access to the same running flowgraph, keep a clonable handle:
let handle = running.handle();
Runtime::block_on(async move {
handle.post(radio_id, "freq", Pmt::F64(100.0e6)).await
})?;