Introduction

FutureSDR is a software-defined radio (SDR) runtime written in Rust with a focus on portability, performance, and developer ergonomics.

Main Features

  • Platform support: FutureSDR runs on Linux, Windows, macOS, Android, and on the web. Support for both native and browser targets allows you to reuse the same signal-processing code across desktop, embedded, and WebAssembly deployments.
  • Accelerators: FutureSDR integrates with accelerators through custom buffers that provide direct access to accelerator memory (e.g., DMA buffers, GPU staging buffers, machine-learning tensors). Developers can implement their own buffers or reuse existing ones for Xilinx Zynq DMA, Vulkan GPU, and Burn, a Rust machine-learning framework.
  • Custom Schedulers: FutureSDR uses an async runtime that schedules data-processing workloads as user-space tasks. This architecture lets you plug in different scheduling strategies to match your latency and throughput goals.

Core Concepts

While FutureSDR’s implementation differs from other SDR frameworks, the core abstractions remain familiar. It supports Blocks that implement stream-based or message-based data processing. These blocks can be combined into a Flowgraph and launched on a Runtime that is driven by a Scheduler.

Documentation Structure

User Documentation explains how to:

  • Use an existing FutureSDR application (an example or a third-party implementation).
  • Interface a FutureSDR application through the built-in or a custom web interface.
  • Interface a FutureSDR application through the REST API (e.g., curl or a custom Python script).

Application Development explains how to:

  • Create FutureSDR applications using existing blocks.
  • Interact with running flowgraphs through Rust code.
  • Integrate FutureSDR in a broader application, potentially with custom GUIs.

SDR Development explains how to:

  • Implement custom blocks for specific technologies or custom integrations.
  • Extend FutureSDR with custom buffers or custom schedulers.

Installation

Compiling and running FutureSDR applications requires at least a Rust toolchain. The sections below walk you through setting up Rust and the additional tooling needed for building native binaries and the web user interface.

Install Rust

To install Rust, follow the official instructions.

FutureSDR requires the nightly Rust toolchain. The root crate uses nightly-only Rust features, and the Leptos-based web UI crates also enable Leptos’ nightly syntax feature. The rust-version in Cargo.toml is only the minimum compiler version; the channel must still be nightly.

Install nightly with the standard development components:

rustup toolchain install nightly --component rustfmt clippy

For FutureSDR applications, either make nightly your default toolchain:

rustup default nightly

or set it per project:

cd <into your project or FutureSDR>
rustup override set nightly

The FutureSDR repository contains a rust-toolchain.toml, so cargo automatically selects nightly when run inside the checkout.

Web GUI and Web SDR Applications

FutureSDR ships with pre-compiled web UIs, so you can use them without extra tooling. If you want to extend or adapt the web UIs, install the wasm32-unknown-unknown target:

rustup target add wasm32-unknown-unknown --toolchain nightly

Install Trunk, a build and packaging tool for Rust WebAssembly projects, with Cargo or one of the other options listed in their documentation:

cargo install --locked trunk

Linux (Ubuntu)

  • Clone the FutureSDR repository
    git clone https://github.com/FutureSDR/FutureSDR.git
  • Optionally, install SoapySDR
    sudo apt install -y libsoapysdr-dev soapysdr-module-all soapysdr-tools
  • Check if your setup is working by running cargo build in the FutureSDR directory.

macOS

These instructions assume that you use Homebrew as your package manager.

  • Clone the FutureSDR repository
    git clone https://github.com/FutureSDR/FutureSDR.git
  • Optionally, install SoapySDR
    brew install soapysdr
  • Additional drivers are available in the Pothos Homebrew tap.
  • Check if your setup is working by running cargo build in the FutureSDR directory.

Windows

  • Clone the FutureSDR repository
    git clone https://github.com/FutureSDR/FutureSDR.git

  • Install Visual Studio C++ Community Edition (required components: Win10 SDK and VC++).

    Visual Studio does not add its binaries and libraries to the PATH. Instead, it offers various terminal environments, configured for a given toolchain. Please use the native toolchain for your system to build FutureSDR, e.g., x64 Native Tools Command Prompt for VS 2022.

For SoapySDR hardware drivers:

  • Install Miniconda for pre-built SDR drivers. The installer offers to add the binaries to your PATH. Do not check this option.

  • After installation, open the Anaconda Prompt application.

  • Create an environment and activate it:
    conda create -n sdr_env && conda activate sdr_env

  • Install SoapySDR:
    conda install -c conda-forge soapysdr

  • Install necessary drivers (e.g. for USRP):
    conda install -c conda-forge soapysdr-module-uhd
    Note: Download FPGA images if using USRP:
    uhd_images_downloader

  • Add the following to your User Environment Variables:

    Variable                Value
    SOAPY_SDR_ROOT          C:\Users\<User>\miniconda3\envs\sdr_env\Library
    SOAPY_SDR_PLUGIN_PATH   C:\Users\<User>\miniconda3\envs\sdr_env\Library\lib\SoapySDR\modules0.8
    LIB                     C:\Users\<User>\miniconda3\envs\sdr_env\Library\lib
    PATH (Append this one)  C:\Users\<User>\miniconda3\envs\sdr_env\Library\bin

  • For verification, open a new terminal and run SoapySDRUtil --info. Check if your hardware (e.g., uhd) is listed under Available factories.

  • Check if your setup is working by running cargo build in the FutureSDR directory.
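Before running cargo build, the environment variables from the table above can be checked with a short script. This is only a sketch: the function name missing_soapy_settings is made up for illustration, and the script assumes that the PATH entry is the bin directory below SOAPY_SDR_ROOT, as in the table.

```python
import os

# Variables named in the table above. PATH is checked separately because it
# is appended to rather than set.
REQUIRED_VARS = ("SOAPY_SDR_ROOT", "SOAPY_SDR_PLUGIN_PATH", "LIB")


def missing_soapy_settings(env):
    """Return a list of settings that look missing in the given mapping."""
    problems = [name for name in REQUIRED_VARS if not env.get(name)]

    # Assumption: the PATH entry is <SOAPY_SDR_ROOT>\bin, as in the table.
    root = env.get("SOAPY_SDR_ROOT", "")
    bin_dir = (root.rstrip("\\/") + "\\bin").lower()
    path_entries = [p.rstrip("\\/").lower() for p in env.get("PATH", "").split(";")]
    if root and bin_dir not in path_entries:
        problems.append("PATH (missing " + bin_dir + ")")
    return problems


if __name__ == "__main__":
    for problem in missing_soapy_settings(os.environ):
        print("not configured:", problem)
```

Run it from the terminal that you also use for building. An empty output means that all four settings were found.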

Windows Subsystem for Linux (WSL)

Alternatively, FutureSDR can be run on Windows using WSL (these steps were verified with Ubuntu 24.04).

  • Open PowerShell as Administrator and run:
    wsl --install -d Ubuntu-24.04

  • After installation, restart your PC.

  • Open a Linux terminal, set up your username/password, and run:
    sudo apt update && sudo apt upgrade

  • Install the core tools required for compiling Rust and C++ projects:
    sudo apt install git build-essential cmake libfontconfig1-dev clang libclang-dev usbutils

  • Install Rust and make nightly the default toolchain:

    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
    source $HOME/.cargo/env
    rustup toolchain install nightly --component rustfmt clippy
    rustup default nightly
    
  • Depending on your SDR device, install the necessary drivers and firmware (e.g. for USRP devices):

    sudo apt install libuhd-dev uhd-host
    sudo uhd_images_downloader
    
  • Install the bridge between SDR hardware and software, along with the audio bridge:

    sudo apt install libsoapysdr-dev soapysdr-tools soapysdr-module-uhd
    sudo apt install pulseaudio libasound2-dev libasound2-plugins
    

    Note: After installing pulseaudio, run wsl --shutdown in PowerShell and restart your terminal to activate the audio bridge. Audio support in the examples requires /mnt/wslg/PulseServer to exist; check with ls -l /mnt/wslg/PulseServer.

  • To prevent a Segmentation Fault during device discovery, disable the default Soapy audio module:
    sudo mv /usr/lib/x86_64-linux-gnu/SoapySDR/modules0.8/libaudioSupport.so /usr/lib/x86_64-linux-gnu/SoapySDR/modules0.8/libaudioSupport.so.bak

  • To share your USB device with Linux, install usbipd on Windows:

    • Open PowerShell as Administrator and run:
      winget install usbipd

    • Plug in your SDR and identify the VID:PID:
      usbipd list

    • Bind and Attach (use Hardware-ID for port independence):

      usbipd bind --hardware-id <VID:PID>
      usbipd attach --hardware-id <VID:PID> --auto-attach
      
  • In your Linux terminal, verify if the hardware is visible:

    lsusb
    sudo uhd_find_devices
    

    Note: When you run uhd_find_devices or an SDR-based example for the first time, the USRP will download firmware and reset. This breaks the connection to WSL. Return to Windows PowerShell and run the attach command again:
    usbipd attach --hardware-id <VID:PID> --auto-attach
    Once re-attached, the connection remains stable until the device is physically disconnected.

  • Finally, install FutureSDR and check if your setup is working by running cargo build in the FutureSDR directory:

    git clone https://github.com/FutureSDR/FutureSDR.git
    cd FutureSDR
    cargo build --release
    

Running FutureSDR Applications

FutureSDR examples and applications can be run like normal Rust applications:

cargo run --release --bin rx

Or, for examples:

cargo run --release --example minimal

Note

If --release is not specified, Rust will by default build the binary in debug mode, which is much slower and often unusable for real-time signal processing.

Configuration

FutureSDR offers runtime options that can be configured through a config.toml or environment variables.

It will search for a global user config at ~/.config/futuresdr/config.toml, a project config.toml in the current directory, and environment variables. The user config has the lowest precedence, while environment variables have the highest precedence.

The available options are:

  • queue_size: number of messages that fit into a block’s inbox
  • buffer_size: default minimum size of a stream buffer in bytes
  • stack_size: stack size (in bytes) for all threads
  • slab_reserved: number of items a Slab buffer copies into the next buffer
  • log_level: one of off, error, warn, info, debug, or trace
  • ctrlport_enable: whether control port should be enabled (true or false)
  • ctrlport_bind: endpoint that the control-port web server should bind to (e.g., 127.0.0.1:1337)
  • frontend_path: path to a web UI that is served as the root URL of the control-port server

An example config.toml:

log_level = "debug"
buffer_size = 32768
queue_size = 8192
ctrlport_enable = true
ctrlport_bind = "127.0.0.1:1337"

Alternatively, pass these options through environment variables. Each key uses the prefix FUTURESDR_ and is uppercased:

export FUTURESDR_CTRLPORT_ENABLE="true"
export FUTURESDR_CTRLPORT_BIND="0.0.0.0:1337"
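The precedence rules can be illustrated with a small Python sketch. It is not FutureSDR's implementation. The function names are invented for this example, the two config files are assumed to be already parsed into dictionaries, and values from environment variables are kept as plain strings without any type conversion.

```python
PREFIX = "FUTURESDR_"

# Option names as listed above.
KNOWN_OPTIONS = {
    "queue_size", "buffer_size", "stack_size", "slab_reserved",
    "log_level", "ctrlport_enable", "ctrlport_bind", "frontend_path",
}


def env_overrides(environ):
    """Map FUTURESDR_<OPTION> variables to lowercase option names."""
    overrides = {}
    for key, value in environ.items():
        if key.startswith(PREFIX):
            option = key[len(PREFIX):].lower()
            if option in KNOWN_OPTIONS:
                overrides[option] = value  # stays a string in this sketch
    return overrides


def resolve_options(user_config, project_config, environ):
    """Merge the three sources; later sources override earlier ones."""
    merged = {}
    merged.update(user_config)             # lowest precedence
    merged.update(project_config)          # overrides the user config
    merged.update(env_overrides(environ))  # highest precedence
    return merged


options = resolve_options(
    {"log_level": "info", "buffer_size": 32768},
    {"log_level": "debug"},
    {"FUTURESDR_CTRLPORT_BIND": "0.0.0.0:1337"},
)
print(options)
```

In this example, log_level ends up as debug because the project config overrides the user config, and ctrlport_bind comes from the environment.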

Rust Features

Some examples use Cargo features to selectively enable functionality such as SDR drivers or GPU backends. Check the [features] section in an example’s Cargo.toml for the full list of supported flags.

[features]
default = ["soapy"]
aaronia_http = ["futuresdr/aaronia_http"]
soapy = ["futuresdr/soapy"]

In this example soapy is enabled by default, and the Aaronia HTTP driver can be enabled by adding the corresponding feature.

cargo run --release --bin rx --features=aaronia_http

Disable default features with:

cargo run --release --bin rx --no-default-features

Log and Debug Messages

FutureSDR uses the tracing library for log and debug messages. Applications can set their own handler for log messages. If an application uses a custom handler, the logging-related configuration of FutureSDR is not considered, and you have to check the documentation of the application for information about logging.

If no log handler is set when a flowgraph is launched on a runtime, FutureSDR sets up a default handler based on EnvFilter. It offers extensive options to configure logging per module through environment variables. Please see the documentation.

Some examples:

# set log level to warn
FUTURESDR_LOG=warn cargo run --bin rx

# disable log messages from lora::frame_sync module
FUTURESDR_LOG=lora::frame_sync=off cargo run --bin rx

# set default log level to info but disable messages from lora::decoder
FUTURESDR_LOG=info,lora::decoder=off cargo run --release --bin rx
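When an application is launched from a script, the filter string can be assembled from its parts. The helper below is a hypothetical convenience function, not part of FutureSDR. It only joins an optional default level and per-module levels in the comma-separated form used in the examples above.

```python
def log_filter(default_level=None, module_levels=None):
    """Build a filter string such as 'info,lora::decoder=off'."""
    parts = []
    if default_level:
        parts.append(default_level)
    for module, level in (module_levels or {}).items():
        parts.append(f"{module}={level}")
    return ",".join(parts)


print(log_filter("info", {"lora::decoder": "off"}))
```

The result can be placed in the FUTURESDR_LOG entry of the environment that is passed to the child process, for example through the env argument of subprocess.run.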

Warning

By default, FutureSDR sets feature flags that apply compile-time tracing filters: trace messages are disabled in debug builds, and messages more detailed than info are disabled in release builds.

Also, these flags are transitive! If you want more detailed logs in your application, disable default features for the FutureSDR dependency.

[dependencies]
futuresdr = { version = "...", default-features = false, features = ["audio", "seify"] }

Command Line Arguments

Most examples allow passing command line arguments. When running the application with cargo, use -- to separate Cargo’s arguments from the application’s arguments.

To check which arguments are available, pass the -h/--help flag.

$ cargo run --release -- -h
Usage: fm-receiver [OPTIONS]

Options:
  -g, --gain <GAIN>              Gain to apply to the seify source [default: 30]
  -f, --frequency <FREQUENCY>    Center frequency [default: 100000000]
  -r, --rate <RATE>              Sample rate [default: 1000000]
  -a, --args <ARGS>              Seify args [default: ]
      --audio-mult <AUDIO_MULT>  Multiplier for intermedia sample rate
      --audio-rate <AUDIO_RATE>  Audio Rate
  -h, --help                     Print help

Important

When running applications with cargo, use -- to separate command line parameters of cargo and the application.

cargo run --release --bin foo -- --sample_rate 3e6

SDR Device Selection and Configuration

Most example applications support an -a/--args command line option that is passed to the SDR hardware drivers. The argument can be used to pass additional options, select the hardware driver, or specify the SDR if more than one is connected.

Driver selection can be necessary in more cases than one might expect. FutureSDR uses Seify as its SDR hardware abstraction layer, which usually defaults to using Soapy drivers under the hood. Many distributions ship a bundle of Soapy drivers that includes an audio driver, which enumerates your sound card as an SDR. You can run SoapySDRUtil --probe to see what is detected.

If Seify selects the wrong device, specify the device argument to select the correct one by defining the driver (e.g., -a soapy_driver=rtlsdr) and optionally the device index (e.g., -a soapy_driver=rtlsdr,index=1) or any other identifier supported by the driver (e.g., serial number, IP address, or USB device ID). See the driver documentation for information about what is supported.

A complete command could be

cargo run --release --bin receiver -- -a soapy_driver=rtlsdr

Important

Seify forwards all arguments to Soapy. Only the driver argument has to be written as soapy_driver, to differentiate it from Seify’s own driver selection.

Important

Soapy might select the wrong device even if only one SDR is plugged into your PC. Use the -a/--args option to select the Soapy driver, e.g., -a soapy_driver=rtlsdr.
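If applications are started from a launcher script, the device argument and the -- separator can be assembled programmatically. The following sketch uses hypothetical helper names (seify_args, cargo_command) and assumes a binary called receiver, as in the command above.

```python
import shlex


def seify_args(soapy_driver=None, **options):
    """Join key=value pairs with commas, e.g. soapy_driver=rtlsdr,index=1."""
    pairs = []
    if soapy_driver is not None:
        pairs.append(("soapy_driver", soapy_driver))
    pairs.extend(options.items())
    return ",".join(f"{key}={value}" for key, value in pairs)


def cargo_command(binary, app_args):
    """Everything after `--` is passed to the application, not to Cargo."""
    return ["cargo", "run", "--release", "--bin", binary, "--", *app_args]


command = cargo_command("receiver", ["-a", seify_args("rtlsdr", index=1)])
print(shlex.join(command))
```

The list form can be handed to subprocess.run directly, which avoids shell quoting issues with more complex device arguments.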

Remote Interaction

It is possible to interact with a running flowgraph through the control port REST API, which can be used as the base for web UIs or any other tool supporting REST (e.g., curl or the Python requests library).

REST API

Control port provides a REST API to expose the flowgraph structure and enable remote interaction. It is enabled by default, but you can configure it explicitly through the configuration, for example:

ctrlport_enable = true
ctrlport_bind = "127.0.0.1:1337"

To allow remote hosts to access control port, bind it to a public interface or an unrestricted address:

ctrlport_enable = true
ctrlport_bind = "0.0.0.0:1337"

Alternatively, configure control port through environment variables, which always take precedence:

export FUTURESDR_CTRLPORT_ENABLE="true"
export FUTURESDR_CTRLPORT_BIND="0.0.0.0:1337"

Control port can be accessed with a browser or programmatically (e.g., using curl, the Python requests library, etc.). FutureSDR also provides a support library to ease remote interaction from Rust.

To get a JSON description of the first flowgraph executed on a runtime, open 127.0.0.1:1337/api/fg/0/ in your browser or use curl:

curl http://127.0.0.1:1337/api/fg/0/ | jq
{
  "blocks": [
    {
      "id": 0,
      "type_name": "Encoder",
      "instance_name": "Encoder-0",
      "stream_inputs": [],
      "stream_outputs": [
        "output"
      ],
      "message_inputs": [
        "tx"
      ],
      "message_outputs": [],
      "blocking": false
    },
    {
      "id": 1,
      "type_name": "Mac",
      "instance_name": "Mac-1",
      "stream_inputs": [],
      "stream_outputs": [],
      "message_inputs": [
        "tx"
      ],
      "message_outputs": [
        "tx"
      ],
      "blocking": false
    }
  ],
  "stream_edges": [
    [
      0,
      "output",
      2,
      "input"
    ]
  ],
  "message_edges": [
    [
      1,
      "tx",
      0,
      "tx"
    ]
  ]
}
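The description can be processed with any JSON library. The Python sketch below prints the connections in a readable form. It works on a shortened copy of the output above instead of querying a live flowgraph, and it assumes that each edge is laid out as source block ID, source port, destination block ID, destination port, which is inferred from the sample and not from a specification.

```python
import json

# Shortened copy of the description shown above.
DESCRIPTION = json.loads("""
{
  "blocks": [
    {"id": 0, "instance_name": "Encoder-0"},
    {"id": 1, "instance_name": "Mac-1"}
  ],
  "stream_edges": [[0, "output", 2, "input"]],
  "message_edges": [[1, "tx", 0, "tx"]]
}
""")


def format_edges(description):
    """Return one line per stream (>) and message (|) connection."""
    names = {b["id"]: b["instance_name"] for b in description["blocks"]}

    def port(block_id, port_name):
        # Fall back to the numeric ID for blocks missing from the list.
        return f"{names.get(block_id, f'block {block_id}')}.{port_name}"

    lines = []
    for kind, arrow in (("stream_edges", ">"), ("message_edges", "|")):
        for src, src_port, dst, dst_port in description[kind]:
            lines.append(f"{port(src, src_port)} {arrow} {port(dst, dst_port)}")
    return lines


for line in format_edges(DESCRIPTION):
    print(line)
```

For the shortened sample, this prints Encoder-0.output > block 2.input and Mac-1.tx | Encoder-0.tx. Block 2 is shown by its ID because the sample lists only the first two blocks.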

It is also possible to get information about a particular block.

curl http://127.0.0.1:1337/api/fg/0/block/0/ | jq
{
  "id": 0,
  "type_name": "Encoder",
  "instance_name": "Encoder-0",
  "stream_inputs": [],
  "stream_outputs": [
    "output"
  ],
  "message_inputs": [
    "tx"
  ],
  "message_outputs": [],
  "blocking": false
}

All message handlers of a block are exposed automatically through the REST API. Assuming block 0 is the SDR source or sink, you can set the frequency by posting a JSON-serialized PMT to the corresponding message handler:

curl -X POST -H "Content-Type: application/json" -d '{ "U32": 123 }' http://127.0.0.1:1337/api/fg/0/block/0/call/freq/

Here are some more examples of serialized PMTs:

{ "U32": 123 }
{ "U64": 5}
{ "F32": 123 }
{ "Bool": true }
{ "VecU64": [ 1, 2, 3] }
"Ok"
"Null"
{ "String": "foo" }
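In scripts, a small helper can keep the PMT encoding in one place. The sketch below covers only the variants that appear in this section (F64 is used in the frequency hopping example). The function name pmt is hypothetical, and other PMT variants are deliberately rejected instead of guessed.

```python
import json

# Variants written as {"Name": value} in the examples above.
TAGGED = {"U32", "U64", "F32", "F64", "Bool", "VecU64", "String"}
# Variants without a payload, written as a bare JSON string.
UNIT = {"Ok", "Null"}


def pmt(variant, value=None):
    """Return a Python object that json.dumps turns into a serialized PMT."""
    if variant in UNIT:
        return variant
    if variant in TAGGED:
        return {variant: value}
    raise ValueError(f"unsupported PMT variant in this sketch: {variant}")


print(json.dumps(pmt("U32", 123)))
print(json.dumps(pmt("VecU64", [1, 2, 3])))
print(json.dumps(pmt("Null")))
```

The returned objects can be passed as the json argument of requests.post or serialized with json.dumps for use with curl.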

Endpoints

The control port interface is associated with the runtime. By default, it comes with the following endpoints:

  • GET /api/fg/: Array of flowgraph IDs of the flowgraphs spawned on the runtime.
  • GET /api/fg/0/: JSON description of flowgraph with ID 0.
  • GET /api/fg/0/block/0/: JSON description of the block with ID 0.
  • GET /api/fg/0/block/0/call/freq: Call message handler freq of the block with Pmt::Null as argument.
  • POST /api/fg/0/block/0/call/freq: Call message handler freq with JSON-serialized Pmt as input.
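The endpoint layout above can be wrapped in a few lines of Python using only the standard library. This is a sketch, not an official client. The class name ControlPort and its methods are invented for illustration, URLs use the trailing slash from the curl examples, and the response body is returned as text because its format is not specified here.

```python
import json
import urllib.request


class ControlPort:
    """Builds control port URLs and calls message handlers."""

    def __init__(self, base_url="http://127.0.0.1:1337"):
        self.base_url = base_url.rstrip("/")

    def flowgraphs_url(self):
        return f"{self.base_url}/api/fg/"

    def flowgraph_url(self, fg):
        return f"{self.base_url}/api/fg/{fg}/"

    def block_url(self, fg, block):
        return f"{self.flowgraph_url(fg)}block/{block}/"

    def call_url(self, fg, block, handler):
        return f"{self.block_url(fg, block)}call/{handler}/"

    def call(self, fg, block, handler, pmt=None, timeout=5.0):
        """GET calls the handler with Pmt::Null; POST sends a JSON PMT."""
        url = self.call_url(fg, block, handler)
        if pmt is None:
            request = urllib.request.Request(url)
        else:
            request = urllib.request.Request(
                url,
                data=json.dumps(pmt).encode("utf-8"),
                headers={"Content-Type": "application/json"},
                method="POST",
            )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.read().decode("utf-8")
```

With a flowgraph running and control port enabled, ControlPort().call(0, 0, "freq", {"F64": 100e6}) would post the frequency to the freq handler of block 0.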

Example: Frequency Hopping

The following Python script uses the REST API to find the first block in flowgraph 0 whose type or instance name contains seify, then repeatedly calls its freq message handler with a list of frequencies.

It requires the Python requests package:

#!/usr/bin/env python3

import itertools
import requests
import time


BASE_URL = "http://127.0.0.1:1337"
FREQUENCIES = [
    100.0e6,
    101.0e6,
    102.0e6,
]
DWELL_TIME = 1.0


description = requests.get(f"{BASE_URL}/api/fg/0/").json()

seify_block = next(
    block
    for block in description["blocks"]
    if "seify" in block["type_name"].lower()
    or "seify" in block["instance_name"].lower()
)

freq_url = f"{BASE_URL}/api/fg/0/block/{seify_block['id']}/call/freq/"

for freq in itertools.cycle(FREQUENCIES):
    # Pmt::F64(freq), serialized as JSON.
    requests.post(freq_url, json={"F64": freq})
    print(f"set frequency to {freq / 1e6:.3f} MHz")
    time.sleep(DWELL_TIME)

Web UI

FutureSDR comes with a minimal, work-in-progress web UI, implemented in the prophecy crate. It comes pre-compiled at crates/prophecy/dist. When FutureSDR is started with control port enabled, you can specify the frontend_path configuration option to serve a custom frontend at the root path of the control-port URL (e.g., 127.0.0.1:1337).

Using the REST API, it is straightforward to build custom UIs, for example:

  • A web UI served by an independent server
  • A web UI served through FutureSDR control port (see the WLAN and ADS-B examples)
  • A UI using arbitrary technology (GTK, Qt, etc.) running as a separate process (see the Egui example)

Project Creation

To create a Rust crate that uses FutureSDR, initialize the crate and add FutureSDR as a dependency. FutureSDR requires nightly Rust, so configure the project to use the nightly toolchain.

cargo init my_project
cd my_project
rustup override set nightly

Edit the Cargo.toml to add the dependency. There are several options:

Use a specific version (stable, but code might be outdated due to irregular release cycles)

[dependencies]
futuresdr = { version = "0.0.40" }

Track the main branch (unstable but always up-to-date)

[dependencies]
futuresdr = { git = "https://github.com/FutureSDR/FutureSDR.git", branch = "main" }

Use a specific commit (potentially best of both worlds)

[dependencies]
futuresdr = { git = "https://github.com/FutureSDR/FutureSDR.git", rev = "7afd76c6d768ebc6432e705efe13e73543d33668" }

Use a local working tree (if you work on FutureSDR in parallel)

[dependencies]
futuresdr = { path = "../FutureSDR" }

Features

FutureSDR supports several features that you may want to enable.

  • default: by default tracing_max_level_debug and tracing_release_max_level_info are enabled
  • aaronia_http: drivers for Aaronia HTTP servers, usable through Seify
  • audio: read/write audio files and interface speakers/mic
  • burn: buffers using Burn tensors
  • flow_scheduler: enable the Flow Scheduler
  • hackrf: enable Rust HackRF driver for Seify (unstable, not recommended)
  • rtlsdr: enable Rust RTL SDR driver for Seify (unstable, not recommended)
  • seify: enable Seify SDR hardware abstraction
  • seify_dummy: enable dummy driver for Seify for use in unit tests
  • soapy: enable SoapySDR driver for Seify
  • tracing_max_level_debug: compile out trace messages in debug mode
  • tracing_release_max_level_info: compile out messages more detailed than info in release mode
  • vulkan: enable Vulkan buffers and blocks
  • wgpu: enable WGPU buffers and blocks
  • zeromq: enable ZeroMQ source and sink
  • zynq: enable Xilinx Zynq DMA buffers

For example:

[dependencies]
futuresdr = { version = "0.0.40", default-features = false, features = ["audio", "seify"] }

Minimal Example

To test if everything is working, paste the following minimal example into src/main.rs and execute it with cargo run.

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;

fn main() -> Result<()> {
    let mut fg = Flowgraph::new();

    let src = NullSource::<u8>::new();
    let head = Head::<u8>::new(123);
    let snk = NullSink::<u8>::new();

    connect!(fg, src > head > snk);

    Runtime::new().run(fg)?;

    Ok(())
}

Runtime

A FutureSDR Runtime owns a Scheduler and starts one or more Flowgraphs. On native targets, the runtime can start an integrated web server to serve a web UI and expose the control port interface for runtime and flowgraph interaction.

Running a Flowgraph

The simplest way to execute a flowgraph is to construct a runtime, pass the flowgraph to run(), and block until it terminates.

let mut fg = Flowgraph::new();
// set up the flowgraph

let fg = Runtime::new().run(fg)?;

The run() method is a blocking call that takes ownership of the flowgraph and returns the finished flowgraph after all blocks have terminated. This is useful when you need to inspect blocks after execution, for example to read data or statistics.

In async code, use run_async() instead:

let mut fg = Flowgraph::new();
// set up the flowgraph

let fg = Runtime::new().run_async(fg).await?;

Starting a Flowgraph

Use start_async() when the application should keep doing other work while the flowgraph is running. It returns once all blocks have initialized.

let mut fg = Flowgraph::new();
// set up the flowgraph

let rt = Runtime::new();
let running = rt.start_async(fg).await?;

On native targets, start() provides the same behavior from synchronous code:

let mut fg = Flowgraph::new();
// set up the flowgraph

let rt = Runtime::new();
let running = rt.start(fg)?;

Both methods return a RunningFlowgraph. It combines the completion task with a FlowgraphHandle:

let running = rt.start(fg)?;
let handle = running.handle();

Runtime::block_on(async move { handle.post(block_id, "handler_name", Pmt::U32(42)).await })?;

let fg = running.wait()?;

Use running.post() and running.call() to interact with blocks, running.wait() to block until termination on native targets, and running.wait_async().await in async code. running.stop_and_wait().await requests shutdown and then recovers the finished flowgraph. Use running.handle() when you need to keep a clonable control handle. If you need to pass the two parts around separately, running.split() returns the FlowgraphTask and FlowgraphHandle.

Selecting a Scheduler

To use a different scheduler or change its configuration, you can specify it when constructing the runtime.

let mut fg = Flowgraph::new();
// set up the flowgraph

let rt = Runtime::with_scheduler(FlowScheduler::new());
rt.run(fg)?;

Runtime Handle

A RuntimeHandle is a clonable control handle for the runtime. It is useful when other tasks, threads, web handlers, or callbacks need to start flowgraphs or query the flowgraphs registered with the runtime control plane.

let rt = Runtime::new();
let runtime_handle = rt.handle();

Runtime::block_on(async move {
    let mut fg = Flowgraph::new();
    // set up the flowgraph

    let running = runtime_handle.start(fg).await?;
    let flowgraph_handle = running.handle();
    let description = flowgraph_handle.describe().await?;

    Ok::<_, futuresdr::runtime::Error>(())
})?;

RuntimeHandle::start() returns a RunningFlowgraph. It also registers the flowgraph with the runtime control plane, so it remains available through get_flowgraph() and the control port.

Initialization and Logging

Constructing a runtime calls futuresdr::runtime::init() automatically. At the moment, initialization installs FutureSDR’s default tracing subscriber when no subscriber is already installed. Call it manually only when application code wants to emit FutureSDR logs before constructing the runtime:

use futuresdr::prelude::*;

fn main() -> Result<()> {
    futuresdr::runtime::init();
    info!("parsing configuration before the runtime exists");

    let rt = Runtime::new();
    // build and run flowgraphs

    Ok(())
}

If an application installs its own tracing subscriber first, FutureSDR leaves it in place. See Logging for runtime and compile-time filtering details.

Scheduler

Schedulers are responsible for executing the blocks in a Flowgraph and potentially other async tasks. A scheduler decides where block tasks run, how general async tasks are spawned, and how blocking work is handled.

Most applications should use the default scheduler through Runtime::new(). Select a scheduler explicitly only when you need to configure the native executor or when benchmarking shows that a different scheduler improves a specific flowgraph.

Smol

SmolScheduler is the default scheduler on native targets and is the recommended scheduler for general use. It is based on the smol async runtime and runs block tasks on a pool of executor threads.

The default runtime uses SmolScheduler::default(), which creates one worker per detected CPU core and does not pin workers to cores:

use futuresdr::prelude::*;

let mut fg = Flowgraph::new();
// set up the flowgraph

let fg = Runtime::new().run(fg)?;

Instantiate it explicitly when you want to configure the number of workers or CPU pinning:

use futuresdr::prelude::*;
use futuresdr::runtime::scheduler::SmolScheduler;

let mut fg = Flowgraph::new();
// set up the flowgraph

let scheduler = SmolScheduler::new(2, false);
let fg = Runtime::with_scheduler(scheduler).run(fg)?;

The first argument is the number of executor threads. The second argument enables CPU pinning. When pinning is enabled, workers are pinned to the detected CPU cores in order:

use futuresdr::prelude::*;
use futuresdr::runtime::scheduler::SmolScheduler;

let scheduler = SmolScheduler::new(4, true);
let rt = Runtime::with_scheduler(scheduler);

Flow

FlowScheduler is a custom native scheduler for more controlled execution. It is available with the flow_scheduler feature:

cargo run --features=flow_scheduler --example minimal

Use FlowScheduler::new() to let the scheduler assign all blocks to worker-local queues with its default deterministic mapper:

use futuresdr::prelude::*;
use futuresdr::runtime::scheduler::FlowScheduler;

let mut fg = Flowgraph::new();
// set up the flowgraph

let scheduler = FlowScheduler::new();
let fg = Runtime::with_scheduler(scheduler).run(fg)?;

With FlowScheduler::new(), blocks are not placed in the global queue. Each block is mapped to one worker queue based on its block ID, the number of blocks, and the number of workers. Each worker calls its local blocks round-robin. General async tasks spawned on the scheduler, and local tasks that overflow a worker queue, use a global queue that workers poll when their local work is idle.

For explicit control, use FlowScheduler::with_pinned_blocks() to assign selected blocks to fixed workers. The outer vector index is the worker index, and each inner vector lists the block IDs assigned to that worker in initial queue order. Blocks that are not listed still use the default deterministic mapper:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;
use futuresdr::runtime::scheduler::FlowScheduler;

let mut fg = Flowgraph::new();

let src = NullSource::<f32>::new();
let head = Head::<f32>::new(1_000_000);
let snk = NullSink::<f32>::new();

connect!(fg, src > head > snk);

let scheduler = FlowScheduler::with_pinned_blocks(vec![
    vec![src.id(), head.id()],
    vec![snk.id()],
]);

Runtime::with_scheduler(scheduler).run(fg)?;
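The nested-vector layout can be easier to reason about when it is inverted into a block-to-worker table. The Python sketch below only models the data layout described above, with plain integers standing in for block IDs. The duplicate check is a sanity check of this sketch and not a documented behavior of FlowScheduler.

```python
def worker_for_blocks(pinned):
    """Invert a worker -> [block IDs] layout into block ID -> worker index."""
    assignment = {}
    for worker, block_ids in enumerate(pinned):
        for block_id in block_ids:
            if block_id in assignment:
                raise ValueError(f"block {block_id} is pinned to more than one worker")
            assignment[block_id] = worker
    return assignment


# Same shape as the example above: src and head on worker 0, snk on worker 1.
layout = [[0, 1], [2]]
print(worker_for_blocks(layout))
```

Blocks that do not appear in the resulting table are the ones that fall back to the default deterministic mapper.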

Benchmark before switching to the Flow Scheduler. Its deterministic mapping can help with some pipelines, but it is not guaranteed to outperform the default scheduler.

WebAssembly

WasmScheduler is selected by Runtime::new() automatically when compiling for wasm32 and starts one web worker by default. Use WasmScheduler::new(n) to run normal blocks on a worker pool.

Some browser APIs, including CPAL’s WebAudio output backend, must be created and owned on the browser main thread. For those cases, use Runtime::with_scheduler(futuresdr::runtime::scheduler::wasm::WasmMainScheduler::new()). This keeps the flowgraph in FutureSDR but runs normal blocks on the UI thread, so it should only be used for light flowgraphs or main-thread-only browser APIs.

Local domains are available on WASM as well. A local domain creates a dedicated web worker and receives the closure that instantiates each local block, mirroring the native local-domain model. FutureSDR uses one worker script path for both scheduler workers and local-domain workers. The default is ./futuresdr-wasm-scheduler-worker.js; configure a different path with futuresdr::runtime::scheduler::wasm::set_worker_script(path) before creating schedulers or local domains.

With Trunk, the simplest setup is to give the Rust output a fixed target name and copy the stock worker template. For a cdylib app:

<link data-trunk rel="rust" data-target-name="futuresdr_app" data-weak-refs data-reference-types />
<link data-trunk rel="copy-file" href="assets/futuresdr-wasm-scheduler-worker.js" />

For a bin target, add a [[bin]] alias named futuresdr_app in Cargo.toml and select it from Trunk:

[[bin]]
name = "futuresdr_app"
path = "src/bin/app.rs"

<link data-trunk rel="rust" data-bin="futuresdr_app" data-weak-refs data-reference-types />
<link data-trunk rel="copy-file" href="assets/futuresdr-wasm-scheduler-worker.js" />

The worker template imports ./futuresdr_app.js, initializes the module/memory from the init message, and dispatches both futuresdr-wasm-scheduler-init and futuresdr-wasm-local-domain-init messages. See examples/wasm-threaded/assets/futuresdr-wasm-scheduler-worker.js for the template. If a flowgraph is started from another web worker, give that worker target the futuresdr_app name/alias and use the same template there as well.

use futuresdr::prelude::*;

let mut fg = Flowgraph::new();
// set up the flowgraph

let fg = Runtime::new().run_async(fg).await?;

Flowgraph

A Flowgraph is a directed graph of blocks and connections. Blocks do the actual work; the flowgraph describes which stream ports and message ports are connected.

Stream connections carry sample streams between blocks. They must form a directed acyclic graph. Message connections carry PMTs between message handlers and can use arbitrary topologies.
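
The acyclicity requirement for stream connections can be checked with a topological sort. The following is a minimal sketch using Kahn's algorithm on plain edge pairs. The function name and the `usize` block ids are hypothetical and are not part of the FutureSDR API; the sketch only illustrates what "directed acyclic graph" means for stream edges.

```rust
use std::collections::{HashMap, VecDeque};

/// Returns true if the directed stream edges contain no cycle (Kahn's algorithm).
/// `edges` holds (source block, destination block) pairs; the `usize` ids are
/// hypothetical stand-ins, not FutureSDR block identifiers.
fn stream_edges_are_acyclic(edges: &[(usize, usize)]) -> bool {
    // Count incoming edges per block and collect outgoing neighbours.
    let mut indegree: HashMap<usize, usize> = HashMap::new();
    let mut successors: HashMap<usize, Vec<usize>> = HashMap::new();
    for &(src, dst) in edges {
        indegree.entry(src).or_insert(0);
        *indegree.entry(dst).or_insert(0) += 1;
        successors.entry(src).or_default().push(dst);
    }

    // Start with every block that has no upstream stream connection.
    let mut ready: VecDeque<usize> = indegree
        .iter()
        .filter(|(_, d)| **d == 0)
        .map(|(b, _)| *b)
        .collect();

    let mut visited = 0;
    while let Some(block) = ready.pop_front() {
        visited += 1;
        if let Some(next) = successors.get(&block) {
            for &n in next {
                let d = indegree.get_mut(&n).expect("every block has an entry");
                *d -= 1;
                if *d == 0 {
                    ready.push_back(n);
                }
            }
        }
    }

    // Blocks that are part of a cycle never reach indegree zero.
    visited == indegree.len()
}
```

A chain such as `[(0, 1), (1, 2)]` passes the check, while adding the edge `(2, 0)` makes it fail. Message connections are exempt from this rule, so a check like this would only ever look at stream edges.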

Constructing Flowgraphs

Create an empty flowgraph with Flowgraph::new(), add blocks, and connect them. The usual way to build a flowgraph is the connect! macro. It adds blocks to the flowgraph if needed and wires their ports.

The simplest stream connection uses the default stream output and input port names:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = NullSource::<u8>::new();
let head = Head::<u8>::new(1024);
let snk = NullSink::<u8>::new();

connect!(fg, src > head > snk);

Named stream ports can be selected explicitly. Output ports are written after the source block, and input ports are written before the destination block:

connect!(fg, src.output > input.head > snk);

Message connections use | instead of >. This example connects the out message output of msg_source to the in message input of msg_copy, then forwards messages to msg_sink:

use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();

let msg_source = MessageSourceBuilder::new(Pmt::String("foo".to_string()), Duration::from_millis(100))
    .n_messages(20)
    .build();
let msg_copy = MessageCopy::new();
let msg_sink = MessageSink::new();

connect!(fg, msg_source | msg_copy | msg_sink);

Message ports can also be named explicitly:

connect!(fg, msg_source.out | r#in.msg_copy);

The r#in spelling is Rust’s raw-identifier syntax for a port named in.

Stream and message connections can be mixed in one macro invocation. Separate independent connections with semicolons:

connect!(fg,
    src > head > snk;
    msg_source | msg_copy | msg_sink;
);

Blocks can also be added and connected manually. This is what the macro does in the common cases: it stores blocks in the flowgraph, gets their port endpoints, and records stream or message edges.

use futuresdr::blocks::MessageCopy;
use futuresdr::blocks::MessageSink;
use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();

let src = fg.add(VectorSource::<u32>::new(vec![1, 2, 3, 4]));
let snk = fg.add(VectorSink::<u32>::new(4));
fg.stream_dyn(src, "output", snk, "input")?;

let msg_source = fg.add(
    MessageSourceBuilder::new(Pmt::String("foo".to_string()),
            Duration::from_millis(100))
        .n_messages(20)
        .build(),
);
let msg_copy = fg.add(MessageCopy::new());
let msg_sink = fg.add(MessageSink::new());

fg.message(msg_source, "out", msg_copy, "in")?;
fg.message(msg_copy, "out", msg_sink, "in")?;

let fg = Runtime::new().run(fg)?;

Use connect! for normal application code. The explicit form is useful when block types are selected dynamically or when it helps to understand the lower-level API.

Local Domains

Normal blocks and buffers must be send-capable because the scheduler may move block tasks between workers. A local domain gives you a single-thread execution island for blocks or buffers that are not Send, or for integrations that must stay on one thread. On WASM, local domains are backed by web workers.
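
The idea of a single-thread execution island can be illustrated with plain threads. In the sketch below, a `Send` closure is moved to a dedicated thread and builds non-`Send` state there, so the state never crosses a thread boundary. All names (`run_on_local_thread`, `demo`) are hypothetical; this is not how FutureSDR implements local domains, only a model of why a constructor closure is passed instead of a finished value.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::thread;

/// Runs `make` on a dedicated thread, so the state it builds never crosses threads.
/// The closure must be `Send`; the state created inside it need not be.
fn run_on_local_thread<F, S>(make: F) -> usize
where
    F: FnOnce() -> S + Send + 'static,
    S: FnMut() -> usize + 'static, // may capture non-`Send` state
{
    thread::spawn(move || {
        let mut step = make(); // constructed on the local thread
        step();
        step()
    })
    .join()
    .expect("local thread panicked")
}

fn demo() -> usize {
    run_on_local_thread(|| {
        // `Rc<RefCell<_>>` is not `Send`, but it is created and used on one thread only.
        let counter = Rc::new(RefCell::new(0usize));
        let shared = Rc::clone(&counter);
        move || {
            *shared.borrow_mut() += 1;
            *counter.borrow()
        }
    })
}
```

Trying to build the `Rc` first and move it into `thread::spawn` would be rejected by the compiler, which is the same constraint that keeps non-`Send` blocks out of the normal scheduler.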

Create a local domain, add blocks with add_local(), and connect local-only stream buffers with ~> in connect! or stream_local() manually:

use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;
use futuresdr::runtime::buffer::LocalCpuReader;
use futuresdr::runtime::buffer::LocalCpuWriter;

let mut fg = Flowgraph::new();
let local = fg.local_domain()?;

let src = fg.add_local(local, || {
    NullSource::<f32, LocalCpuWriter<f32>>::new()
});
let snk = fg.add_local(local, || {
    NullSink::<f32, LocalCpuReader<f32>>::new()
});

fg.stream_local(&src, |b| b.output(), &snk, |b| b.input())?;

The ~> macro operator is the equivalent typed local-stream connection:

connect!(fg, src ~> snk);

Local-only stream connections must stay inside one local domain. Send-capable stream buffers can still connect normal blocks and local-domain blocks. Message connections are not restricted by local domains.

When a block’s state must be created on the local-domain thread itself, build that part of the graph with domain_run() (or domain_run_async() in async code). The closure receives a LocalDomainContext; add local blocks through ctx.add(...) and use the same connect! syntax with ctx as the graph argument:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;
use futuresdr::runtime::buffer::LocalCpuReader;
use futuresdr::runtime::buffer::LocalCpuWriter;

let mut fg = Flowgraph::new();
let local = fg.local_domain()?;

let snk = fg.domain_run(local, |ctx| {
    let src = ctx.add(NullSource::<u8, LocalCpuWriter<u8>>::new());
    let head = ctx.add(Head::<u8, LocalCpuReader<u8>, LocalCpuWriter<u8>>::new(10));
    let snk = ctx.add(NullSink::<u8, LocalCpuReader<u8>>::new());

    connect!(ctx, src ~> head ~> snk);

    Ok(snk)
})?;

On wasm32, use the async forms (add_local_async, domain_run_async, and connect_async!) while constructing the graph.

Accessing Blocks

When a block is added to a flowgraph, FutureSDR returns a BlockRef<T>. A block reference is a lightweight typed identifier. It is copyable, can be converted to a BlockId, and can be used to access the block while the flowgraph owns it.
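
The "lightweight typed identifier" pattern can be sketched in a few lines. The types below (`RawId`, `TypedRef`, `Graph`) are hypothetical stand-ins and not FutureSDR's `BlockId`, `BlockRef`, or `Flowgraph`; the sketch assumes a graph that stores only one block type to stay short.

```rust
use std::marker::PhantomData;

/// Untyped identifier (hypothetical stand-in for an untyped block id).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct RawId(usize);

/// Typed identifier: carries the block type only at compile time.
struct TypedRef<T> {
    id: RawId,
    _marker: PhantomData<fn() -> T>,
}

// Manual impls so that `TypedRef<T>` is `Copy` even when `T` is not.
impl<T> Clone for TypedRef<T> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<T> Copy for TypedRef<T> {}

impl<T> From<TypedRef<T>> for RawId {
    fn from(r: TypedRef<T>) -> RawId {
        r.id
    }
}

/// Minimal owner that hands out typed references to the values it stores.
struct Graph<T> {
    blocks: Vec<T>,
}

impl<T> Graph<T> {
    fn add(&mut self, block: T) -> TypedRef<T> {
        self.blocks.push(block);
        TypedRef { id: RawId(self.blocks.len() - 1), _marker: PhantomData }
    }

    fn block(&self, r: TypedRef<T>) -> Option<&T> {
        self.blocks.get(r.id.0)
    }
}
```

Because the reference is only an index plus a zero-sized type marker, copying it is free, and the owner can return a correctly typed `&T` without any downcasting at the call site.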

The connect! macro also leaves you with block references for the blocks it added. After a blocking Runtime::run(), the finished Flowgraph is returned, so the same BlockRef can be used to inspect block state:

use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);

connect!(fg, src > snk);

let fg = Runtime::new().run(fg)?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1, 2, 3, 4]);

Similarly, block_mut() can be used to update block metadata or block state:

let mut fg = Flowgraph::new();
let snk = fg.add(VectorSink::<u32>::new(4));

fg.block_mut(&snk)?.set_instance_name("samples");

Use BlockRef::id() or convert a BlockRef into BlockId when a runtime interaction API needs an untyped block identifier.

Flowgraph Interactions

Runtime::run() is the simplest way to execute a flowgraph when you only need the result after it finishes. To interact with a flowgraph while it is running, start it with Runtime::start() on native targets or Runtime::start_async() in async code. Both return a RunningFlowgraph.

RunningFlowgraph can post messages, call message handlers, describe the running graph, stop it, and wait for completion.

The following example starts a flowgraph and continuously hops through a list of frequencies by posting Pmt::F64 values to a block’s freq message handler:

use futuresdr::prelude::*;
use std::time::Duration;

let mut fg = Flowgraph::new();
// set up the flowgraph

// `my_seify_source` is a source or sink block with a `freq` message input.
let radio = fg.add(my_seify_source);

let rt = Runtime::new();
let running = rt.start(fg)?;

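Runtime::block_on(async move {
    let frequencies = [100.0e6, 101.0e6, 102.0e6];

    // `cycle()` repeats the list forever, so this loop only ends on an error.
    for freq in frequencies.iter().cycle() {
        running.post(radio, "freq", Pmt::F64(*freq)).await?;
        Timer::after(Duration::from_secs(1)).await;
    }

    // Gives the async block a concrete `Result` type so that `?` compiles.
    Ok::<(), Error>(())
})?;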

Waiting for completion is a separate operation. Use it when the flowgraph is expected to finish on its own, for example when a finite source reaches the end of its input:

use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<u32>::new(vec![1, 2, 3, 4]);
let snk = VectorSink::<u32>::new(4);

connect!(fg, src > snk);

let rt = Runtime::new();
let running = rt.start(fg)?;

let fg = running.wait()?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1, 2, 3, 4]);

For flowgraphs that do not finish on their own, request shutdown before waiting:

Runtime::block_on(async move {
    running.stop().await?;
    let fg = running.wait_async().await?;
    Ok::<_, Error>(fg)
})?;

If multiple tasks need access to the same running flowgraph, keep a clonable handle:

let handle = running.handle();
Runtime::block_on(async move {
    handle.post(radio_id, "freq", Pmt::F64(100.0e6)).await
})?;

Block

This page is for application developers who want to instantiate existing blocks and wire them into a Flowgraph. It does not cover implementing custom blocks.

A block is a processing element with stream ports, message ports, or both. In application code, using a block usually means:

  1. construct the block,
  2. add it to a flowgraph, and
  3. connect it to other blocks.

use futuresdr::blocks::Head;
use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<f32>::new(vec![1.0, 2.0, 3.0, 4.0]);
let head = Head::<f32>::new(2);
let snk = VectorSink::<f32>::new(2);

connect!(fg, src > head > snk);

let fg = Runtime::new().run(fg)?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1.0, 2.0]);

Type Parameters

Many blocks are generic over the sample type they process. For example, VectorSource::<f32> produces f32 samples, while VectorSource::<u8> produces bytes.

Some blocks are also generic over their buffer implementation. This can make their full Rust type look large, but most application code should ignore the buffer type. In-tree blocks use default CPU buffers that are usually the right choice:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;

let src = NullSource::<f32>::new();
let head = Head::<f32>::new(1024);
let snk = NullSink::<f32>::new();

Here, Head::<f32> is enough even though there are two more generic parameters to specify input and output buffer types. These types are filled in by the block’s defaults.

When Type Inference Needs Help

Rust can infer a block’s sample type when constructor arguments carry enough type information. For example, the vector below contains u32 values, so the source item type is known:

use futuresdr::blocks::VectorSource;

let src = VectorSource::new(vec![1_u32, 2, 3]);

Other constructors do not mention the sample type in their arguments. Head::new(1024) only says how many items to pass through; it does not say what item type the block should process. In those cases, provide the sample type explicitly:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;

let head = Head::<f32>::new(1024);
let snk = NullSink::<f32>::new();

This is also why examples sometimes repeat the sample type even though the remaining generic parameters have defaults. Rust allows default generic parameters, but it cannot always infer the earlier parameters that those defaults depend on.
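
The interaction between explicit and defaulted generic parameters can be reproduced without FutureSDR. The sketch below uses a hypothetical `PassThrough` type with a sample type `T` and a defaulted buffer type `B`; it only mirrors the shape of a block like `Head`, not its implementation.

```rust
use std::marker::PhantomData;

/// Hypothetical block with a sample type `T` and a defaulted buffer type `B`.
struct PassThrough<T, B = Vec<T>> {
    n_items: usize,
    _types: PhantomData<(T, B)>,
}

impl<T, B> PassThrough<T, B> {
    fn new(n_items: usize) -> Self {
        Self { n_items, _types: PhantomData }
    }
}

fn build() -> usize {
    // `T` is given explicitly; `B` falls back to its default `Vec<f32>`.
    let head = PassThrough::<f32>::new(1024);

    // The following line would not compile: nothing tells Rust what `T` is,
    // and the default for `B` depends on `T`.
    // let head = PassThrough::new(1024);

    head.n_items
}
```

Naming the first parameter is enough because Rust fills in omitted trailing parameters from their defaults once any generic argument is written explicitly.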

Closure-based blocks usually infer their types from the closure argument and return type. Add an argument annotation when needed:

use futuresdr::blocks::Apply;

let scale = Apply::new(|x: &f32| x * 0.5);
let to_u32 = Apply::new(|x: &f32| *x as u32);

Builders

Some blocks have a simple new(...) constructor; others use a builder. Typically, a builder is provided when a block has optional parameters that would otherwise clutter new() with many rarely used arguments.

use futuresdr::blocks::MessageSourceBuilder;
use futuresdr::prelude::*;
use std::time::Duration;

let src = MessageSourceBuilder::new(Pmt::U32(42), Duration::from_millis(100))
    .n_messages(10)
    .build();

Inspecting Blocks After Run

When Runtime::run() returns, the finished flowgraph is returned too. Keep the BlockRef from connect! or fg.add() if you want to inspect block state afterwards:

use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = VectorSource::<u8>::new(vec![1, 2, 3]);
let snk = VectorSink::<u8>::new(3);

connect!(fg, src > snk);

let fg = Runtime::new().run(fg)?;
let snk = fg.block(&snk)?;

assert_eq!(snk.items(), &vec![1, 2, 3]);

For running flowgraphs, use the RunningFlowgraph or FlowgraphHandle APIs described in the Flowgraph and Runtime chapters.

Stream Buffers

Stream buffers move samples between connected stream ports. A source block writes into the writer side of a buffer, and the downstream block reads from the reader side.

FutureSDR can be extended with arbitrary buffer implementations. At the lowest level, every buffer provides a writer and reader pair implementing BufferWriter and BufferReader. The SendBufferWriter and SendBufferReader marker traits are implemented automatically when the buffer type and its notification futures are send-capable.

Buffer implementations can expose their own higher-level API. A CPU buffer exposes slices. A GPU buffer can expose GPU resources. A DMA buffer can expose hardware-owned memory. FutureSDR therefore provides specialized traits for the common buffer families instead of forcing every buffer into one sample-slice API.

The main stream buffer trait families are:

  • BufferWriter / BufferReader: minimal base traits that every buffer implements.
  • SendBufferWriter / SendBufferReader: marker traits for send-capable buffers.
  • CpuBufferWriter / CpuBufferReader: out-of-place CPU buffer API.
  • SendCpuBufferWriter / SendCpuBufferReader: marker traits for send-capable CPU buffers.
  • InplaceWriter / InplaceReader / InplaceBuffer: in-place CPU buffer API, with SendInplaceWriter / SendInplaceReader markers for send-capable variants.
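
The relationship between a base trait and its send-capable marker can be sketched with a blanket implementation. The trait and type names below are simplified, hypothetical stand-ins; the real FutureSDR traits have more methods and also take the buffer's notification futures into account.

```rust
use std::rc::Rc;

/// Simplified stand-in for a base buffer trait.
trait Writer {
    fn name(&self) -> &'static str;
}

/// Marker trait: no methods, only records that the writer is also `Send`.
trait SendWriter: Writer + Send {}

// Blanket impl: every `Writer` that is `Send` gets the marker automatically.
impl<T: Writer + Send> SendWriter for T {}

struct HeapWriter(Vec<f32>);
struct ThreadBoundWriter(Rc<Vec<f32>>); // `Rc` is not `Send`

impl Writer for HeapWriter {
    fn name(&self) -> &'static str {
        "heap"
    }
}

impl Writer for ThreadBoundWriter {
    fn name(&self) -> &'static str {
        "thread-bound"
    }
}

/// Only accepts writers that may move between worker threads.
fn schedule_anywhere<W: SendWriter>(w: &W) -> &'static str {
    w.name()
}

/// Accepts any writer, for example inside a single-thread local domain.
fn schedule_local<W: Writer>(w: &W) -> &'static str {
    w.name()
}
```

Passing a `ThreadBoundWriter` to `schedule_anywhere` is rejected at compile time, while `schedule_local` accepts both writers. Buffer authors never implement the marker by hand in this scheme; it follows from the type's own `Send` status.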

Most application code should use the default buffers through existing blocks. You only need to name buffer types when you want a non-default transport, such as in-place, GPU, or DMA buffers.

Normal Buffers

Normal CPU stream buffers are the default for most blocks. They expose readable and writable slices:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;

let mut fg = Flowgraph::new();

let src = NullSource::<f32>::new();
let head = Head::<f32>::new(1024);
let snk = NullSink::<f32>::new();

connect!(fg, src > head > snk);

On native targets, DefaultCpuReader<T> and DefaultCpuWriter<T> are double-mapped circular buffers. They avoid wrapping logic in the hot path while still behaving like a ring buffer.
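
To see what "wrapping logic" means, consider a plain ring buffer without double mapping. The sketch below is a hypothetical `Ring` type, not FutureSDR code. When the unread data crosses the end of the allocation, the reader gets two separate slices and has to handle the split itself. A double-mapped buffer maps the same memory twice, back to back, so the same data appears as one contiguous slice.

```rust
/// Plain ring buffer over a `Vec`, used only to illustrate wrap-around.
struct Ring {
    data: Vec<u8>,
    read: usize, // index of the oldest unread item
    len: usize,  // number of unread items
}

impl Ring {
    /// Readable region as up to two slices: without double mapping, data that
    /// crosses the end of the allocation is not contiguous.
    fn readable(&self) -> (&[u8], &[u8]) {
        let cap = self.data.len();
        let first_len = self.len.min(cap - self.read);
        let first = &self.data[self.read..self.read + first_len];
        let second = &self.data[..self.len - first_len];
        (first, second)
    }
}

fn wrapped_example() -> (Vec<u8>, Vec<u8>) {
    // Capacity 8, six unread items starting at index 5: 5, 6, 7, then 0, 1, 2.
    let ring = Ring { data: (0..8).collect(), read: 5, len: 6 };
    let (a, b) = ring.readable();
    (a.to_vec(), b.to_vec())
}
```

Every block reading from such a ring would need to process two slices or copy them together, which is the per-call overhead the double-mapped layout removes.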

On WebAssembly, the default CPU buffer is the Slab implementation. It uses ordinary allocated slabs because double-mapped virtual memory is not available in the browser environment.

The default buffer size is controlled by the runtime config key buffer_size; see Running Applications. Some blocks also configure minimum item counts internally. For example, an FFT block needs enough samples for one transform.

You can select another CPU buffer by naming the buffer generic parameters:

use futuresdr::blocks::Head;
use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;
use futuresdr::runtime::buffer::slab;

let mut fg = Flowgraph::new();

let src = NullSource::<f32, slab::Writer<f32>>::new();
let head = Head::<f32, slab::Reader<f32>, slab::Writer<f32>>::new(1024);
let snk = NullSink::<f32, slab::Reader<f32>>::new();

connect!(fg, src > head > snk);

This is rarely needed in normal applications, but it is useful for benchmarks or platform-specific experiments.

In-Place Buffers

Normal stream buffers copy data from an input slice to an output slice when a block transforms samples. In-place buffers move owned buffer chunks through the flowgraph instead. A block can mutate the chunk and pass the same allocation downstream.

This can help for simple transformations, such as adding a constant to every sample, where copying between input and output buffers would dominate the work.

In-place buffers have a different API from normal CPU buffers:

  • SendInplaceReader::get_full_buffer() receives a full reusable buffer chunk.
  • SendInplaceWriter::put_full_buffer() forwards the same chunk after processing.
  • InplaceBuffer::slice() gives mutable access to the chunk contents.

That means in-place processing usually needs blocks written for the in-place API. See the in-place example for complete source.

Reusable buffers need to return to the start of the pipeline. FutureSDR models this as a circuit. First connect the forward stream edges as usual, then close the circuit from the source to the final sink with <:

use futuresdr::prelude::*;
use futuresdr::runtime::buffer::circuit;
use inplace::Apply;
use inplace::VectorSink;
use inplace::VectorSource;

let mut fg = Flowgraph::new();

let mut src: VectorSource<i32> = VectorSource::new(vec![1, 2, 3, 4]);
src.output().inject_buffers(4);

let apply = Apply::new();
let snk = VectorSink::new(4);

connect!(fg, src > apply > snk);
connect!(fg, src < snk);

The < connection closes the return path for empty buffers. The source injects a fixed number of reusable buffers, processing blocks mutate and forward them, and the sink returns each consumed buffer to the source side. In this snippet, inplace::VectorSource, inplace::Apply, and inplace::VectorSink are the custom blocks from the in-place example.
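
The circuit idea can be modeled with standard channels. In the sketch below, channels stand in for stream edges, and a third channel plays the role of the return path. The function `run_circuit` and its parameters are hypothetical and unrelated to the FutureSDR circuit buffer implementation; the sketch assumes at least one injected buffer, since the source would otherwise wait forever.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Runs a three-stage pipeline over reusable buffers and returns the results.
/// Channels stand in for stream edges; nothing here is FutureSDR API.
fn run_circuit(chunks: Vec<Vec<i32>>, n_buffers: usize) -> Vec<i32> {
    let (to_apply, apply_in) = channel::<Vec<i32>>(); // source -> apply
    let (to_sink, sink_in) = channel::<Vec<i32>>(); // apply -> sink
    let (to_source, empty_in) = channel::<Vec<i32>>(); // sink -> source (return path)

    // Inject a fixed number of empty, reusable buffers into the circuit.
    for _ in 0..n_buffers {
        to_source.send(Vec::new()).unwrap();
    }

    let source = thread::spawn(move || {
        for chunk in chunks {
            // Wait for an empty buffer to come back before producing.
            let mut buf = empty_in.recv().unwrap();
            buf.clear();
            buf.extend_from_slice(&chunk);
            to_apply.send(buf).unwrap();
        }
        // Dropping `to_apply` here signals the end of the stream.
    });

    let apply = thread::spawn(move || {
        for mut buf in apply_in {
            buf.iter_mut().for_each(|x| *x += 1); // mutate in place
            to_sink.send(buf).unwrap(); // forward the same allocation
        }
    });

    let mut out = Vec::new();
    for buf in sink_in {
        out.extend_from_slice(&buf);
        // Return the consumed buffer; the source may already have finished.
        let _ = to_source.send(buf);
    }

    source.join().unwrap();
    apply.join().unwrap();
    out
}
```

The number of injected buffers bounds how many chunks can be in flight at once, so it acts as both the memory budget and the back-pressure mechanism of the pipeline.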

This concept is inspired by qsdr, which also explores in-place work APIs for SDR-style flowgraphs.

In-place buffers also implement the CPU buffer traits. This allows hybrid graphs where standard CPU blocks sit at the boundary and an in-place block processes the middle:

use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::prelude::*;
use futuresdr::runtime::buffer::circuit;
use inplace::Apply as InplaceApply;

let mut fg = Flowgraph::new();

let mut src = VectorSource::<i32, circuit::Writer<i32>>::new(vec![1, 2, 3, 4]);
src.output().inject_buffers(4);

let apply = InplaceApply::new();
let snk = VectorSink::new(4);

connect!(fg, src > apply > snk);
connect!(fg, src < snk);

Here the standard VectorSource writes into a circuit writer, the in-place Apply block mutates the buffer chunk, and the standard VectorSink reads from a circuit reader.

Accelerator Buffers

Accelerator buffers use the same connection model but expose APIs that match their hardware or framework:

  • Xilinx Zynq DMA buffers move chunks through AXI DMA-backed memory.
  • WGPU buffers use wgpu resources and can run in native or browser environments.
  • Vulkan buffers use Vulkan storage buffers.
  • Burn buffers use Burn tensors for machine-learning workloads.

These buffer APIs are intentionally not standardized beyond BufferWriter / BufferReader and their send-capable marker counterparts. A GPU block may need mapped buffers. A DMA block may need hardware buffer handles. A tensor buffer may need framework-specific tensor ownership.

Accelerator buffer implementations typically also implement CPU buffer traits at the host boundary:

  • Host-to-device writers implement CpuBufferWriter and, when send-capable, SendCpuBufferWriter, so a CPU source can write samples into an upload buffer.
  • Device-to-host readers implement CpuBufferReader and, when send-capable, SendCpuBufferReader, so a CPU sink can read processed samples after download.

For example, the WGPU example uses a CPU VectorSource with an H2DWriter, a GPU processing block, and a CPU VectorSink with a D2HReader:

use futuresdr::blocks::VectorSink;
use futuresdr::blocks::VectorSource;
use futuresdr::blocks::Wgpu;
use futuresdr::prelude::*;
use futuresdr::runtime::buffer::wgpu;
use futuresdr::runtime::buffer::wgpu::D2HReader;
use futuresdr::runtime::buffer::wgpu::H2DWriter;

let mut fg = Flowgraph::new();

let src = VectorSource::<f32, H2DWriter<f32>>::new(vec![1.0, 2.0, 3.0]);
let instance = wgpu::Instance::new().await;
let gpu = Wgpu::new(instance, 4096, 4, 4);
let snk = VectorSink::<f32, D2HReader<f32>>::new(3);

connect!(fg, src > gpu > snk);

See the complete accelerator examples and the API Docs for more details.

Logging

FutureSDR uses tracing for log and diagnostic messages. The common tracing macros are re-exported through the prelude, so application code can use them directly:

use futuresdr::prelude::*;

info!("starting application");
debug!("configured sample rate: {}", 1_000_000);
warn!("using fallback configuration");

The same macros are used internally by FutureSDR blocks, schedulers, and runtime code.

Default Logger

If no global tracing subscriber has been installed when a runtime is constructed, FutureSDR installs its default logger. The default logger writes compact formatted logs and uses a tracing_subscriber::EnvFilter.

use futuresdr::blocks::NullSink;
use futuresdr::blocks::NullSource;
use futuresdr::prelude::*;

fn main() -> Result<()> {
    let mut fg = Flowgraph::new();

    let src = NullSource::<u8>::new();
    let snk = NullSink::<u8>::new();

    connect!(fg, src > snk);

    // This is never logged: no tracing subscriber is installed yet.
    info!("starting flowgraph");

    // Default logger is created here
    Runtime::new().run(fg)?;

    // this is logged
    info!("flowgraph finished");

    Ok(())
}

Initialize Logging Early

If you want to use FutureSDR logging before constructing a runtime, call futuresdr::runtime::init() yourself:

use futuresdr::prelude::*;

fn main() -> Result<()> {
    futuresdr::runtime::init();

    info!("parsing arguments before runtime construction");

    let rt = Runtime::new();
    // build and run flowgraphs

    Ok(())
}

Calling init() more than once is harmless. If another subscriber is already installed, FutureSDR leaves it in place.

Log Level

FutureSDR’s default logger gets its log level from the runtime config key log_level. On native targets, config can come from the usual FutureSDR config files or environment variables described in Running Applications.

log_level = "debug"

For per-module filtering, set FUTURESDR_LOG. This uses EnvFilter syntax and overrides the default directive from log_level:

# set the default log level
FUTURESDR_LOG=warn cargo run

# disable logs from one module
FUTURESDR_LOG=lora::frame_sync=off cargo run --bin rx

# combine a default level with a module-specific rule
FUTURESDR_LOG=info,lora::decoder=off cargo run --release --bin rx

The accepted config values are tracing level filters such as off, error, warn, info, debug, and trace.
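
How a default level and per-module directives combine can be modeled with a small function. This is a deliberately simplified sketch and not the `tracing_subscriber::EnvFilter` implementation: it assumes directives are either a bare level or `module=level`, matches modules by prefix, and lets the longest matching prefix win. The names `Level`, `parse_level`, and `enabled` are hypothetical.

```rust
/// Severity levels, ordered from least to most verbose.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Off,
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

fn parse_level(s: &str) -> Option<Level> {
    match s.to_ascii_lowercase().as_str() {
        "off" => Some(Level::Off),
        "error" => Some(Level::Error),
        "warn" => Some(Level::Warn),
        "info" => Some(Level::Info),
        "debug" => Some(Level::Debug),
        "trace" => Some(Level::Trace),
        _ => None,
    }
}

/// Simplified model of a filter string such as "info,lora::decoder=off".
/// `fallback` plays the role of the configured default level.
/// Returns whether an event from `target` at `level` would pass.
fn enabled(fallback: Level, filter: &str, target: &str, level: Level) -> bool {
    let mut best = (0usize, fallback); // (matched prefix length, max level)
    for directive in filter.split(',').map(str::trim).filter(|d| !d.is_empty()) {
        let (prefix, max) = match directive.split_once('=') {
            Some((module, lvl)) => (module, parse_level(lvl)),
            None => ("", parse_level(directive)), // bare level: applies to all targets
        };
        let Some(max) = max else { continue }; // skip directives this sketch cannot parse
        // The most specific (longest) matching module prefix wins.
        if target.starts_with(prefix) && prefix.len() >= best.0 {
            best = (prefix.len(), max);
        }
    }
    level != Level::Off && level <= best.1
}
```

With a fallback of `Level::Info` and the filter `lora::frame_sync=off`, events from `lora::frame_sync` are dropped while other modules keep logging at `info`. A bare `warn` in the filter replaces the fallback for every target.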

Compile-Time Filters

Warning

By default, FutureSDR enables feature flags that apply compile-time tracing filters: tracing_max_level_debug and tracing_release_max_level_info.

These filters remove more verbose log statements at compile time. In debug builds, trace messages are disabled. In release builds, messages more detailed than info are disabled.

The filters are transitive. If your application needs more detailed logs, disable FutureSDR’s default features and enable the features you need explicitly:

[dependencies]
futuresdr = { version = "...", default-features = false, features = ["audio", "seify"] }

Runtime filters such as FUTURESDR_LOG=trace cannot show messages that were removed by compile-time filters.
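
The relationship between the two filter stages can be stated as a one-line rule: the most verbose level that can still appear is the stricter of the compile-time cap and the runtime filter. The sketch below uses a hypothetical `Level` enum and `effective_max` function to make that explicit.

```rust
/// Severity levels, ordered from least to most verbose.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

/// Most verbose level that can still be emitted: the compile-time cap always wins,
/// because statements above it are not part of the binary.
fn effective_max(compile_time_max: Level, runtime_filter: Level) -> Level {
    compile_time_max.min(runtime_filter)
}
```

For a release build with the default features, `effective_max(Level::Info, Level::Trace)` yields `Level::Info`, which is why setting FUTURESDR_LOG=trace alone does not bring back debug or trace output.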

Custom Subscriber

Applications can install their own tracing subscriber before constructing a runtime. In that case, FutureSDR does not replace it, and FutureSDR’s logging config is not applied by the default logger.

use futuresdr::prelude::*;
use futuresdr::tracing::level_filters::LevelFilter;
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;

fn main() -> Result<()> {
    let format = fmt::layer()
        .with_level(true)
        .with_target(true)
        .with_thread_ids(true)
        .with_thread_names(true)
        .compact();

    let filter = EnvFilter::from_env("MY_APP_LOG").add_directive(LevelFilter::INFO.into());

    tracing_subscriber::registry()
        .with(filter)
        .with(format)
        .init();

    info!("custom subscriber installed");

    let rt = Runtime::new();
    // build and run flowgraphs

    Ok(())
}

This is the right approach when an application already has its own logging policy, formatting, file logging, telemetry exporter, or framework integration.

Misc

Brief pointers to further information.

Android

See Android example.

WebAssembly

Use trunk serve --release to build and host the WebAssembly flowgraph. See the ZigBee example.

Web UI

FutureSDR’s reusable web UI components are implemented in the prophecy crate that is part of the FutureSDR repository. The default Prophecy GUI is served by the control port when a FutureSDR application is running, usually at http://127.0.0.1:1337/.

Prophecy is built with Leptos, a Rust web framework for reactive user interfaces. It is intended both as a small default UI and as a component library for application-specific control panels.

The crate provides:

  • RuntimeHandle and FlowgraphHandle: client-side handles for talking to the FutureSDR control-port API from the browser.
  • FlowgraphCanvas: graphical flowgraph view with blocks, stream edges, message edges, and clickable message inputs.
  • FlowgraphTable: table view of block IDs, instance names, stream ports, message ports, and blocking state.
  • Pmt, PmtInput, PmtInputList, and PmtEditor: components for displaying, entering, and submitting PMT values.
  • RadioSelector, ListSelector, and Slider: simple controls that post PMT values to a block message handler.
  • TimeSink: WebGL time-domain display that reads samples from a websocket or a Leptos signal.
  • Waterfall: WebGL waterfall display for streaming spectral data.
  • ConstellationSink and ConstellationSinkDensity: WebGL constellation displays for complex sample streams.
  • ArrayView: helper trait for exposing Rust numeric slices as JavaScript typed-array views for WebGL uploads.

For a custom web GUI that uses Prophecy components in an application-specific layout, see the WLAN example.

Mocker

The Mocker is a small harness for running one block directly, without building a full Flowgraph and without starting a Runtime.

This is useful for:

  • unit tests for a single block,
  • checking edge cases with carefully chosen input samples,
  • testing message handlers and message outputs,
  • microbenchmarks where the scheduler and graph setup would hide the cost of the block itself.

Mocker is available on native targets through futuresdr::runtime::mocker.

Stream Blocks

For stream blocks, instantiate the block with mocker::Reader<T> and mocker::Writer<T> buffer types. Then set the input samples, reserve output space, and run the block:

use futuresdr::blocks::Apply;
use futuresdr::runtime::mocker::Mocker;
use futuresdr::runtime::mocker::Reader;
use futuresdr::runtime::mocker::Writer;

let block: Apply<_, _, _, Reader<u32>, Writer<u32>> =
    Apply::with_buffers(|x: &u32| x + 1);

let mut mocker = Mocker::new(block);
mocker.input().set(vec![1, 2, 3]);
mocker.output().reserve(3);

mocker.run();

let (items, tags) = mocker.output().get();

assert_eq!(items, vec![2, 3, 4]);
assert!(tags.is_empty());

The mock reader exposes the input through the same CpuBufferReader API that a normal block sees at runtime. The mock writer stores produced samples in a vector that can be read with get() or drained with take().

Multiple Runs

run() calls the block’s work() method until the block stops requesting immediate re-entry through WorkIo::call_again. You can update the mocked input and run the same block again:

use futuresdr::blocks::Apply;
use futuresdr::runtime::mocker::Mocker;
use futuresdr::runtime::mocker::Reader;
use futuresdr::runtime::mocker::Writer;

let block: Apply<_, _, _, Reader<u32>, Writer<u32>> =
    Apply::with_buffers(|x: &u32| x + 1);

let mut mocker = Mocker::new(block);
mocker.output().reserve(6);

mocker.input().set(vec![1, 2, 3]);
mocker.run();

mocker.input().set(vec![4, 5, 6]);
mocker.run();

let (items, _) = mocker.output().get();

assert_eq!(items, vec![2, 3, 4, 5, 6, 7]);
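
The re-entry loop described above can be modeled in isolation. The sketch below uses hypothetical, heavily simplified stand-ins named `WorkIo` and `Kernel`; the real FutureSDR types carry more state, and this is not the Mocker implementation. It assumes a block that handles a fixed chunk per call and requests re-entry while input remains.

```rust
/// Hypothetical stand-in for the per-call I/O state; only the re-entry flag is modeled.
struct WorkIo {
    call_again: bool,
}

/// Hypothetical stand-in for a block kernel.
trait Kernel {
    fn work(&mut self, io: &mut WorkIo);
}

/// Calls `work()` until the block stops asking to be called again.
/// Returns how many times `work()` ran.
fn run<K: Kernel>(kernel: &mut K) -> usize {
    let mut calls = 0;
    loop {
        let mut io = WorkIo { call_again: false };
        kernel.work(&mut io);
        calls += 1;
        if !io.call_again {
            return calls;
        }
    }
}

/// Processes at most `chunk` items per call and requests re-entry while input remains.
struct Chunked {
    input: Vec<u32>,
    output: Vec<u32>,
    chunk: usize,
}

impl Kernel for Chunked {
    fn work(&mut self, io: &mut WorkIo) {
        let n = self.chunk.min(self.input.len());
        self.output.extend(self.input.drain(..n).map(|x| x + 1));
        io.call_again = !self.input.is_empty();
    }
}
```

With five input items and a chunk size of two, `run()` invokes `work()` three times before the flag stays false. A block that never sets the flag is called exactly once per `run()`.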

If the block relies on init() or deinit() state, call those explicitly:

mocker.init();
mocker.run();
mocker.deinit();

Tags

Mock inputs can include item tags:

use futuresdr::runtime::dev::ItemTag;
use futuresdr::runtime::dev::Tag;

mocker.input().set_with_tags(
    vec![0.0_f32; 1024],
    vec![ItemTag {
        index: 256,
        tag: Tag::Id(256),
    }],
);

The output writer returns both produced samples and output tags:

let (items, tags) = mocker.output().get();

Message Blocks

Mocker can also exercise message handlers. Use post() to call a message handler on the wrapped block:

use futuresdr::blocks::MessageCopy;
use futuresdr::prelude::*;
use futuresdr::runtime::mocker::Mocker;

let mut mocker = Mocker::new(MessageCopy);

mocker.init();

let ret = mocker.post("in", Pmt::Usize(123))?;
assert_eq!(ret, Pmt::Ok);

mocker.run();

let messages = mocker.take_messages();
assert_eq!(messages, vec![vec![Pmt::Usize(123)]]);

Message outputs are captured per output port. Use messages() to clone the currently captured PMTs, or take_messages() to drain them.

Benchmarks

Because Mocker runs a block without a scheduler, it is useful for measuring the cost of one block implementation. The repository’s apply benchmarks use Mocker to compare several ways to apply a simple operation to samples.

For benchmark code that needs to call Kernel::work() directly, parts_mut() returns mutable access to the wrapped kernel, MessageOutputs, and BlockMeta:

let (kernel, message_outputs, meta) = mocker.parts_mut();

Most tests should prefer mocker.run(), since it matches the normal block work loop more closely.

Performance Measurement

FutureSDR performance work usually happens at two levels:

  • use Mocker to benchmark one block implementation,
  • use the perf/ applications to benchmark complete flowgraphs and scheduler or buffer configurations.

Always measure release builds. Debug builds are useful while developing, but they do not represent runtime performance.

Block Microbenchmarks

For a single block, use Mocker. It runs the block directly, without a scheduler and without a full Flowgraph, so the benchmark mostly measures the block’s work() implementation and the buffer operations it performs.

This is the right tool for comparing implementation choices inside one block, checking how performance changes with input size, or writing a Criterion benchmark around a small processing kernel. The repository’s apply benchmark is a compact example:

cargo bench --bench apply

Mocker is not a replacement for full-flowgraph benchmarks. It intentionally removes scheduling, message routing between blocks, and end-to-end stream topology effects.

Parameter Sweeps

The perf/ directory contains standalone benchmark applications for measuring complete configurations. These examples are useful when the question is about scheduler choice, buffer behavior, number of stages, number of pipes, sample counts, or other flowgraph-level parameters.

Many of the directories contain a Makefile that iterates over a parameter grid, writes CSV files to perf-data/, and provides helper targets for selected configurations:

cd perf/null
make

Inspect the local Makefile before running a sweep. Some benchmarks run for a long time, and the parameter ranges are intentionally broad.

Profiling One Configuration

To understand where time is spent in one specific configuration, profile that configuration directly. Samply works well for this because it records a profile and opens an interactive view in the browser:

samply record -- cargo run --release

For an independent example workspace, run it from that directory or pass the manifest path:

samply record -- cargo run --release --manifest-path=perf/null/Cargo.toml -- --config=flow

Enable debug symbols for release builds so the profile contains useful function names and source locations:

[profile.release]
debug = true

Add this to the Cargo.toml of the workspace you are profiling. For the root crate, that is the repository root. For a benchmark under perf/, it is usually the Cargo.toml inside that benchmark directory.

The flame graph view is often the most useful starting point. Look for unexpectedly large functions, allocation-heavy paths, synchronization overhead, and time spent outside the block code when the goal is to tune scheduling or buffering.

Stable Measurements

For reproducible results, reduce system noise. One practical approach on a systemd-based Linux machine is to move normal system work onto a small CPU set and run the benchmark on the remaining CPUs.

First, restrict the normal system slices to the CPUs reserved for the operating system:

SYSTEM_CPUS=0,1

sudo systemctl set-property --runtime -- user.slice AllowedCPUs=${SYSTEM_CPUS}
sudo systemctl set-property --runtime -- system.slice AllowedCPUs=${SYSTEM_CPUS}
sudo systemctl set-property --runtime -- init.scope AllowedCPUs=${SYSTEM_CPUS}

Then start the benchmark in its own transient unit on the CPUs reserved for the measurement:

sudo systemd-run --uid=$(id -u) --slice=sdr --wait -P -p AllowedCPUs=2,3 -d -- cargo run --release

The perf/migrate-processes.sh and perf/revert-processes.sh scripts show the same pattern for the repository benchmarks. The settings are runtime-only and do not survive a reboot, but revert them (or reboot) after a measurement run before using the machine normally.

Custom Blocks

Custom blocks implement the processing logic that runs inside a flowgraph. A block usually consists of:

  • stream input/output fields marked with #[input] and #[output],
  • optional message inputs and outputs declared on the struct,
  • a Kernel implementation with init(), work(), and deinit() methods.

Blocks derive Block and implement Kernel. Blocks that need non-Send state, non-Send futures, or local-only buffers can run in a local domain.

Stream Block

This block multiplies f32 samples by a fixed gain:

use futuresdr::runtime::dev::prelude::*;

#[derive(Block)]
pub struct Scale {
    #[input]
    input: DefaultCpuReader<f32>,
    #[output]
    output: DefaultCpuWriter<f32>,
    gain: f32,
}

impl Scale {
    pub fn new(gain: f32) -> Self {
        Self {
            input: DefaultCpuReader::default(),
            output: DefaultCpuWriter::default(),
            gain,
        }
    }
}

impl Kernel for Scale {
    async fn work(
        &mut self,
        io: &mut WorkIo,
        _mo: &mut MessageOutputs,
        _meta: &mut BlockMeta,
    ) -> Result<()> {
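        // Borrow the currently readable and writable windows.
        let input = self.input.slice();
        let output = self.output.slice();
        let available = input.len();
        // Process only as many items as fit into both windows.
        let n = available.min(output.len());

        for i in 0..n {
            output[i] = input[i] * self.gain;
        }

        // Report how many items were actually read and written.
        self.input.consume(n);
        self.output.produce(n);

        // Finish only after all remaining input has been processed, so that
        // no samples are dropped when the output window was the limiting side.
        if self.input.finished() && n == available {
            io.finished = true;
        }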

        Ok(())
    }
}

slice() returns the currently available readable or writable window. After processing, call consume(n) and produce(n) with the number of items actually handled.

Set io.call_again = true when the block knows it can make more progress immediately. Set io.finished = true when the block is done and downstream ports should be notified.
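
The window contract can be modeled without the runtime. The sketch below is a plain-Rust approximation, not FutureSDR code: Step and scale_step are hypothetical names, and the max_chunk limit is an assumption added to create a situation in which the block knows it can make more progress immediately.

```rust
/// Outcome of one simulated work() call (hypothetical, mirrors WorkIo in spirit).
#[derive(Debug, Default, PartialEq)]
pub struct Step {
    /// Number of items to pass to consume() and produce().
    pub handled: usize,
    /// True when both windows still hold work that this call did not touch.
    pub call_again: bool,
}

/// Scale at most `max_chunk` items from the input window into the output window.
pub fn scale_step(input: &[f32], output: &mut [f32], gain: f32, max_chunk: usize) -> Step {
    // Items that could be processed given both windows.
    let available = input.len().min(output.len());
    // Items this call actually processes.
    let n = available.min(max_chunk);
    for (out, sample) in output[..n].iter_mut().zip(&input[..n]) {
        *out = *sample * gain;
    }
    Step {
        handled: n,
        call_again: available > n,
    }
}
```

If the output window is the limiting side, the model does not request another call: nothing can be done until the downstream block frees space, and that event wakes the block anyway.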

Message Ports

Message inputs are declared with #[message_inputs(...)]. Each listed handler is an async method on the block. Message outputs are declared with #[message_outputs(...)] and used through MessageOutputs.

use futuresdr::runtime::dev::prelude::*;

#[derive(Block)]
#[message_inputs(set_gain)]
#[message_outputs(changed)]
pub struct AdjustableScale {
    #[input]
    input: DefaultCpuReader<f32>,
    #[output]
    output: DefaultCpuWriter<f32>,
    gain: f32,
}

impl AdjustableScale {
    async fn set_gain(
        &mut self,
        _io: &mut WorkIo,
        mo: &mut MessageOutputs,
        _meta: &mut BlockMeta,
        p: Pmt,
    ) -> Result<Pmt> {
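        // Convert the PMT to a number; the `?` returns an error if the conversion fails.
        self.gain = f64::try_from(p)? as f32;
        // Announce the new gain on the `changed` message output.
        mo.post("changed", Pmt::F32(self.gain)).await?;
        // The returned PMT is the handler's reply to the caller.
        Ok(Pmt::Ok)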
    }
}

Use #[message_inputs(set_gain = "gain")] when the public port name should differ from the Rust method name. This is useful when the port name would otherwise require a raw identifier, or when the port must stay compatible with an existing control API.
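
To make the distinction between port name and method name concrete, here is a small stand-alone model. It is not how the derive macro is implemented: Value, Gain, ports, and call are hypothetical, and the handler table only mimics the effect of mapping the public name "gain" to the method set_gain.

```rust
use std::collections::HashMap;

/// Hypothetical, much reduced stand-in for a PMT value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    F32(f32),
    F64(f64),
    Ok,
    InvalidValue,
}

pub struct Gain {
    pub gain: f32,
    /// Messages "posted" to output ports, recorded as (port name, value).
    pub posted: Vec<(String, Value)>,
}

type Handler = fn(&mut Gain, Value) -> Value;

/// Handler method; its Rust name is independent of the public port name.
fn set_gain(block: &mut Gain, v: Value) -> Value {
    let g = match v {
        Value::F32(x) => x,
        Value::F64(x) => x as f32,
        _ => return Value::InvalidValue,
    };
    block.gain = g;
    block.posted.push(("changed".to_string(), Value::F32(g)));
    Value::Ok
}

/// Table from public port name to handler.
pub fn ports() -> HashMap<&'static str, Handler> {
    let mut m: HashMap<&'static str, Handler> = HashMap::new();
    m.insert("gain", set_gain);
    m
}

/// Deliver a message to a named port; None means the port does not exist.
pub fn call(block: &mut Gain, port: &str, v: Value) -> Option<Value> {
    ports().get(port).map(|handler| handler(block, v))
}
```

In this model, callers see only "gain". The method name set_gain is not a valid port name.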

Lifecycle Methods

Kernel methods are called in this order:

  • init(): once, after stream ports have been connected and validated.
  • work(): repeatedly, whenever data, messages, timers, or explicit wakeups make progress possible.
  • deinit(): once, during shutdown.

All methods receive MessageOutputs and BlockMeta. work() also receives WorkIo, which is the block’s way to communicate scheduling decisions back to the runtime.

Use io.block_on() when the block should sleep until the future returned by Kernel::block_on() completes. The block may still be called earlier if stream data or a message arrives.
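
The call order can be sketched with a synchronous toy driver. This is a simplified model under stated assumptions, not the runtime's implementation: Io, Lifecycle, Countdown, and drive are hypothetical names, the methods are not async, and the driver simply stops where a real runtime would park the block until the next event.

```rust
/// Hypothetical stand-in for the scheduling flags a block can set.
#[derive(Default)]
pub struct Io {
    pub call_again: bool,
    pub finished: bool,
}

pub trait Lifecycle {
    fn init(&mut self) {}
    fn work(&mut self, io: &mut Io);
    fn deinit(&mut self) {}
}

/// Toy block that needs `remaining` work() calls and logs every lifecycle call.
pub struct Countdown {
    pub remaining: u32,
    pub log: Vec<&'static str>,
}

impl Lifecycle for Countdown {
    fn init(&mut self) {
        self.log.push("init");
    }

    fn work(&mut self, io: &mut Io) {
        self.log.push("work");
        self.remaining = self.remaining.saturating_sub(1);
        if self.remaining == 0 {
            io.finished = true;
        } else {
            io.call_again = true;
        }
    }

    fn deinit(&mut self) {
        self.log.push("deinit");
    }
}

/// Call init() once, work() until the block finishes or stops asking, then deinit() once.
pub fn drive<B: Lifecycle>(block: &mut B) {
    block.init();
    loop {
        // The flags start cleared for every work() call in this model.
        let mut io = Io::default();
        block.work(&mut io);
        if io.finished || !io.call_again {
            // A real runtime would wait for data, messages, or wakeups
            // instead of stopping when call_again is not set.
            break;
        }
    }
    block.deinit();
}
```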

Local Blocks

Use a local domain for non-Send state or non-Send futures:

use futuresdr::runtime::dev::prelude::*;

#[derive(Block)]
pub struct UiBoundBlock {
    #[input]
    input: LocalCpuReader<f32>,
}

impl Kernel for UiBoundBlock {
    async fn work(
        &mut self,
        io: &mut WorkIo,
        _mo: &mut MessageOutputs,
        _meta: &mut BlockMeta,
    ) -> Result<()> {
        if self.input.finished() {
            io.finished = true;
        }
        Ok(())
    }
}

Add local blocks to a LocalDomain with Flowgraph::add_local(). The constructor closure is executed in the local domain, so non-Send resources can be created there:

use futuresdr::prelude::*;
use futuresdr::runtime::buffer::LocalCpuReader;

let mut fg = Flowgraph::new();
let local = fg.local_domain()?;

let block = fg.add_local(local, || UiBoundBlock {
    input: LocalCpuReader::<f32>::default(),
});

For a group of local blocks, or for async construction on the local thread, use Flowgraph::domain_run() or Flowgraph::domain_run_async() and add blocks through the provided LocalDomainContext. On native targets a local domain is backed by a dedicated thread; on WASM it is backed by a dedicated web worker.
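
The reason the constructor runs inside the local domain can be shown with threads from the standard library alone. The sketch below is an analogy, not FutureSDR code: LocalState and run_local are hypothetical. The closure is Send, so it may cross to the dedicated thread, while the Rc-based state it creates there never leaves that thread.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

/// State that is not Send because it holds an Rc.
pub struct LocalState {
    pub sum: Rc<RefCell<f32>>,
}

/// Build the state on a dedicated thread, feed it samples, and return the total.
pub fn run_local<F>(constructor: F, samples: Vec<f32>) -> f32
where
    F: FnOnce() -> LocalState + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<f32>();
    let handle = thread::spawn(move || {
        // The non-Send value is created on this thread and stays here.
        let state = constructor();
        for sample in rx {
            *state.sum.borrow_mut() += sample;
        }
        let total = *state.sum.borrow();
        // Only the plain f32 result crosses back to the caller.
        total
    });
    for sample in samples {
        tx.send(sample).unwrap();
    }
    // Closing the channel ends the loop on the local thread.
    drop(tx);
    handle.join().unwrap()
}
```

Moving an already constructed LocalState into thread::spawn() would not compile, which is the same restriction that makes the constructor closure necessary.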

Testing

Use the Mocker to test one block without a full runtime. For graph-level behavior, build a small flowgraph with finite sources and sinks and run it with Runtime::run().

Custom Schedulers

Schedulers execute normal flowgraph block tasks and async tasks spawned through the runtime. Most applications should use Runtime::new() and the default SmolScheduler; write a scheduler only when you are experimenting with placement, latency, or executor integration.

The scheduler trait is:

pub trait Scheduler: Clone + Send + 'static {
    #[cfg(not(target_arch = "wasm32"))]
    fn run_domain(
        &self,
        blocks: Vec<Box<dyn Block>>,
        main_channel: &Sender<FlowgraphMessage>,
    ) -> Vec<Task<(BlockId, Box<dyn Block>)>>;

    fn spawn<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T>;
}

run_domain() receives the normal send-capable blocks in a flowgraph. It must spawn each block, call block.run(main_channel).await, and return task handles that yield (BlockId, Box<dyn Block>). The runtime waits for those tasks and restores the finished block objects into the returned flowgraph.

spawn() runs general async tasks on the scheduler. Runtime::spawn(), Runtime::spawn_background(), and control-plane internals use this method.
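
The shape of that contract can be approximated with OS threads. The following sketch is an analogy under stated assumptions: ToyBlock, Counter, and ThreadPerBlock are hypothetical, JoinHandle stands in for Task, and closures stand in for futures. It shows only that every spawned block is handed back together with its id.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a send-capable block.
pub trait ToyBlock: Send {
    fn id(&self) -> usize;
    fn run(&mut self);
    fn calls(&self) -> u32;
}

pub struct Counter {
    pub id: usize,
    pub calls: u32,
}

impl ToyBlock for Counter {
    fn id(&self) -> usize {
        self.id
    }
    fn run(&mut self) {
        self.calls += 1;
    }
    fn calls(&self) -> u32 {
        self.calls
    }
}

/// Toy scheduler that runs each block on its own thread.
#[derive(Clone, Default)]
pub struct ThreadPerBlock;

impl ThreadPerBlock {
    /// Spawn every block and return handles that yield (id, block) on completion.
    pub fn run_domain(
        &self,
        blocks: Vec<Box<dyn ToyBlock>>,
    ) -> Vec<JoinHandle<(usize, Box<dyn ToyBlock>)>> {
        blocks
            .into_iter()
            .map(|mut block| {
                thread::spawn(move || {
                    block.run();
                    // Hand the block object back instead of dropping it.
                    (block.id(), block)
                })
            })
            .collect()
    }

    /// Run unrelated work on the same "executor".
    pub fn spawn<T: Send + 'static>(
        &self,
        work: impl FnOnce() -> T + Send + 'static,
    ) -> JoinHandle<T> {
        thread::spawn(work)
    }
}
```

A real scheduler spawns async tasks onto an executor rather than one thread per block, but the ownership flow is the same: blocks go in, and (id, block) pairs come out.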

Normal vs Local Work

Schedulers handle only the normal scheduling domain. The runtime manages local domains separately for:

  • blocks added through Flowgraph::add_local(),
  • blocks marked with #[blocking].

That separation lets scheduler implementations assume that run_domain() receives send-capable block tasks. Blocking or thread-affine work should be placed in a local domain instead of being hidden inside the normal scheduler.

Starting Point

Use the existing schedulers as templates:

  • SmolScheduler is a compact general-purpose scheduler backed by async_executor.
  • FlowScheduler shows deterministic block placement onto worker-local queues.

A minimal native scheduler usually needs:

  • a clonable handle to an executor,
  • worker thread lifecycle management,
  • an implementation of run_domain() that spawns every block and returns its task,
  • an implementation of spawn() for unrelated async tasks.

Selecting a Scheduler

Construct the runtime with your scheduler:

use futuresdr::prelude::*;

let scheduler = MyScheduler::new();
let rt = Runtime::with_scheduler(scheduler);

let fg = Flowgraph::new();
rt.run(fg)?;

Custom schedulers should preserve the runtime contract: every spawned block task must eventually return its block object, even if the block exits because the flowgraph was stopped. If a worker thread panics, treat it as a runtime failure rather than silently dropping block state.

Custom Buffers

Custom buffers define how stream samples move between blocks. A buffer implementation provides a writer type for the upstream block and a reader type for the downstream block. The basic connection contract is:

  • the writer implements BufferWriter,
  • the reader implements BufferReader,
  • BufferWriter::Reader names the matching reader type,
  • connect() stores the peer endpoints and any shared queues or resources,
  • validate() fails if the port was never connected or is otherwise unusable.
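
The connection contract listed above can be modeled with a channel from the standard library. This is a minimal sketch, not the FutureSDR traits: ToyWriter, ToyReader, ChanWriter, and ChanReader are hypothetical, and the real BufferWriter and BufferReader traits carry more methods and different signatures.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

pub trait ToyReader {
    fn validate(&self) -> Result<(), String>;
}

pub trait ToyWriter {
    /// Names the reader type that this writer can be connected to.
    type Reader: ToyReader;
    fn connect(&mut self, reader: &mut Self::Reader);
    fn validate(&self) -> Result<(), String>;
}

/// Both endpoints start disconnected.
#[derive(Default)]
pub struct ChanWriter {
    tx: Option<Sender<f32>>,
}

#[derive(Default)]
pub struct ChanReader {
    rx: Option<Receiver<f32>>,
}

impl ToyReader for ChanReader {
    fn validate(&self) -> Result<(), String> {
        match self.rx {
            Some(_) => Ok(()),
            None => Err("reader port was never connected".to_string()),
        }
    }
}

impl ToyWriter for ChanWriter {
    type Reader = ChanReader;

    fn connect(&mut self, reader: &mut ChanReader) {
        // Create the shared queue and store one end on each side.
        let (tx, rx) = mpsc::channel();
        self.tx = Some(tx);
        reader.rx = Some(rx);
    }

    fn validate(&self) -> Result<(), String> {
        match self.tx {
            Some(_) => Ok(()),
            None => Err("writer port was never connected".to_string()),
        }
    }
}
```

Because the associated type fixes the matching reader, an attempt to connect a ChanWriter to any other reader type is rejected at compile time, while a missing connection is caught by validate() at run time.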

Most custom buffers also implement one of the higher-level buffer families:

  • CpuBufferWriter / CpuBufferReader for slice-based CPU buffers,
  • InplaceWriter / InplaceReader / InplaceBuffer for reusable in-place chunks,
  • accelerator-specific APIs for GPU, tensor, or DMA resources.

Port State

Use PortCore and ConnectionState for the common lifecycle:

use futuresdr::runtime::buffer::*;
use futuresdr::runtime::dev::BlockInbox;
use futuresdr::runtime::{BlockId, Error, PortId};

pub struct MyWriter<T> {
    core: PortCore,
    peer: ConnectionState<PortEndpoint>,
    _type: std::marker::PhantomData<T>,
}

impl<T> Default for MyWriter<T> {
    fn default() -> Self {
        Self {
            core: PortCore::new_disconnected(),
            peer: ConnectionState::disconnected(),
            _type: std::marker::PhantomData,
        }
    }
}

init() binds a port to its owning block id, port id, and inbox. The derive macro calls it during flowgraph construction. connect() receives the matching peer during Flowgraph::stream() or the connect! macro expansion.

Send-Capable Buffers

Normal native flowgraphs require send-capable buffers. The marker traits are implemented automatically when the concrete type and relevant futures satisfy the bounds:

  • SendBufferReader
  • SendBufferWriter
  • SendCpuBufferReader
  • SendCpuBufferWriter
  • SendInplaceReader
  • SendInplaceWriter

If a buffer is not Send, use a local domain and connect it with stream_local() or the ~> operator in connect!.
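
Marker traits of this kind are typically provided through a blanket implementation. The sketch below shows the general Rust pattern with hypothetical names (ToyReader, SendToyReader, VecReader, RcReader); it is an assumption about the approach, not a copy of the FutureSDR definitions.

```rust
use std::rc::Rc;
use std::thread;

pub trait ToyReader {
    fn available(&self) -> usize;
}

/// Marker trait: never implemented by hand.
pub trait SendToyReader: ToyReader + Send {}

/// Every reader that is also Send gets the marker automatically.
impl<T: ToyReader + Send> SendToyReader for T {}

/// Send-capable reader.
pub struct VecReader(pub Vec<f32>);

impl ToyReader for VecReader {
    fn available(&self) -> usize {
        self.0.len()
    }
}

/// Reader that is not Send because of the Rc.
pub struct RcReader(pub Rc<Vec<f32>>);

impl ToyReader for RcReader {
    fn available(&self) -> usize {
        self.0.len()
    }
}

/// Stand-in for the normal scheduler: the reader must be able to cross threads.
pub fn run_on_scheduler<R: SendToyReader + 'static>(reader: R) -> usize {
    thread::spawn(move || reader.available()).join().unwrap()
}

/// Stand-in for a local domain: no Send bound is required.
pub fn run_locally<R: ToyReader>(reader: R) -> usize {
    reader.available()
}
```

Passing an RcReader to run_on_scheduler() fails to compile, which mirrors why a buffer that is not Send has to live in a local domain.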

CPU Buffer Expectations

For CPU buffers, slice_with_tags() returns the currently readable or writable window. Tags use indices relative to that window. A block calls:

  • consume(n) after reading n input items,
  • produce(n) after writing n output items,
  • set_min_items(n) to request a minimum number of items before work,
  • set_min_buffer_size_in_items(n) to request a minimum backing capacity.

A buffer implementation must not advance its read or write position when slice() is called. Only consume() and produce() move the positions, so the block stays in control of advancement.
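
Both rules, window-relative tag indices and no advancement on slice access, can be seen in a small model. WindowReader is a hypothetical type over a plain Vec, and its tags are simple (index, label) pairs rather than FutureSDR tags.

```rust
/// Toy reader over a fixed backing store (hypothetical, not a FutureSDR type).
pub struct WindowReader {
    data: Vec<f32>,
    /// Tags stored with absolute item indices.
    tags: Vec<(usize, &'static str)>,
    read_pos: usize,
}

impl WindowReader {
    pub fn new(data: Vec<f32>, tags: Vec<(usize, &'static str)>) -> Self {
        Self { data, tags, read_pos: 0 }
    }

    /// Return the readable window and its tags, indexed relative to the window start.
    /// This takes &self: looking at the window never moves the read position.
    pub fn slice_with_tags(&self) -> (&[f32], Vec<(usize, &'static str)>) {
        let window = &self.data[self.read_pos..];
        let tags = self
            .tags
            .iter()
            .filter(|(abs, _)| *abs >= self.read_pos)
            .map(|(abs, label)| (abs - self.read_pos, *label))
            .collect();
        (window, tags)
    }

    /// Advance the read position by the number of items the block handled.
    pub fn consume(&mut self, n: usize) {
        self.read_pos = (self.read_pos + n).min(self.data.len());
    }
}
```

After consume(5), a tag stored at absolute index 7 is reported at index 2 of the new window, and tags in front of the read position are no longer reported.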

Finish Notifications

Buffers propagate shutdown across stream edges:

  • a reader’s notify_finished() tells upstream writers that the downstream block is done,
  • a writer’s notify_finished() tells downstream readers that the upstream block is done,
  • finish() marks the local side as finished,
  • finished() lets the block observe that the peer is done.

Implementations normally use the peer BlockInbox or BlockNotifier stored during connection setup to wake the adjacent block.
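
The direction of each notification is easy to mix up, so here is a toy model of one stream edge. It assumes that each port stores a sender to the inbox of the adjacent block; Note, Port, Edge, and edge are hypothetical names, and the message variants are illustrative only.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Messages that arrive in a block's inbox (hypothetical variants).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Note {
    /// Sent downstream: the block feeding your input is done.
    InputDone,
    /// Sent upstream: the block reading your output is done.
    OutputDone,
}

/// One side of a stream edge.
pub struct Port {
    peer_inbox: Sender<Note>,
    note: Note,
    finished: bool,
}

impl Port {
    /// Wake the adjacent block and tell it that this side is done.
    pub fn notify_finished(&mut self) {
        // The peer may already be gone; that is not an error in this model.
        let _ = self.peer_inbox.send(self.note);
    }

    /// Mark this port as finished; called when the peer's notification is handled.
    pub fn finish(&mut self) {
        self.finished = true;
    }

    /// Lets the owning block observe that the peer is done.
    pub fn finished(&self) -> bool {
        self.finished
    }
}

pub struct Edge {
    pub writer: Port,
    pub reader: Port,
    pub upstream_inbox: Receiver<Note>,
    pub downstream_inbox: Receiver<Note>,
}

/// Connect a writer and a reader, storing the peer inbox on each side.
pub fn edge() -> Edge {
    let (to_upstream, upstream_inbox) = channel();
    let (to_downstream, downstream_inbox) = channel();
    Edge {
        writer: Port { peer_inbox: to_downstream, note: Note::InputDone, finished: false },
        reader: Port { peer_inbox: to_upstream, note: Note::OutputDone, finished: false },
        upstream_inbox,
        downstream_inbox,
    }
}
```

When the upstream block ends, it calls notify_finished() on its writer. The downstream block then finds InputDone in its inbox, calls finish() on the reader, and sees finished() return true in its next work() call.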

In-Place Circuits

In-place buffers move owned chunks through the graph. The forward path is connected like a normal stream. A second circuit-closing connection wires the final consumer back to the source so empty buffers can be reused:

connect!(fg, src > apply > snk);
connect!(fg, src < snk);

Implement CircuitWriter when your writer can close that return path. The < operator calls Flowgraph::close_circuit(), which delegates to CircuitWriter::close_circuit().
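
The effect of closing the circuit can be simulated with two channels: one for the forward path and one for the return path. The sketch is a single-threaded model with hypothetical names (run_circuit and its parameters); it illustrates chunk reuse only and does not use the in-place buffer traits.

```rust
use std::sync::mpsc::channel;

/// Push `rounds` chunks through a source and a sink that share a closed circuit.
/// Returns (items seen by the sink, number of chunk allocations).
pub fn run_circuit(pool: usize, chunk_len: usize, rounds: usize) -> (usize, usize) {
    assert!(pool > 0, "the circuit needs at least one chunk");
    let (forward_tx, forward_rx) = channel::<Vec<f32>>();
    let (return_tx, return_rx) = channel::<Vec<f32>>();

    // Allocate the pool once and place the empty chunks on the return path.
    let mut allocations = 0;
    for _ in 0..pool {
        return_tx.send(Vec::with_capacity(chunk_len)).unwrap();
        allocations += 1;
    }

    let mut items_seen = 0;
    for round in 0..rounds {
        // Source: take an empty chunk, fill it, and send it forward.
        let mut chunk = return_rx.recv().unwrap();
        chunk.extend((0..chunk_len).map(|i| (round * chunk_len + i) as f32));
        forward_tx.send(chunk).unwrap();

        // Sink: read the chunk, empty it, and hand it back for reuse.
        let mut full = forward_rx.recv().unwrap();
        items_seen += full.len();
        full.clear(); // keeps the allocation, drops the contents
        return_tx.send(full).unwrap();
    }
    (items_seen, allocations)
}
```

Without the return path, the source would have to allocate a new chunk every round. With it, the number of allocations stays at the pool size no matter how many rounds run.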

Validation and Testing

Validation should catch unconnected ports, wrong peer state, and missing hardware resources before any block init() method runs. Prefer returning Error::ValidationError with the owning block and port context when possible.

Use Mocker to exercise a CPU buffer from within a single block. For a new buffer implementation, also add a small flowgraph test that connects a finite source to a finite sink and checks that finish notification, tags, and repeated scheduler calls behave correctly.