
Run Multiple Streams in One Graph

Field                 Value
Difficulty            Advanced
Estimated Read Time   20-25 minutes
Labels                graph, multistream, scheduler, join

Concept

Run multiple logical streams through one public Graph and combine two named inputs into one deterministic bundle output. This is the pattern behind multi-camera or multi-source systems where related inputs must be aligned before downstream processing.

The graph here is created with:

graph = pyneat.graphs.combine(["left", "right"],
                              "combined",
                              pyneat.CombinePolicy.ByFrame)

Each pushed sample carries a stream_id and frame_id. CombinePolicy.ByFrame waits until both named inputs have produced the same frame_id, then emits one bundle from run.pull("combined").
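To make the ByFrame behaviour concrete, here is a minimal pure-Python model of a two-input join keyed on frame_id. It does not use pyneat and is not how the runtime is implemented: the class name ByFrameJoiner, its methods, and the dict-based samples are all hypothetical, and the sketch assumes each frame_id appears at most once per input.

```python
from collections import deque


class ByFrameJoiner:
    """Toy model of a frame_id-keyed join; NOT the pyneat implementation."""

    def __init__(self, inputs):
        self.inputs = list(inputs)
        # frame_id -> {input name: sample} for frames still missing an input.
        self.pending = {}
        # Completed bundles, in the order they became complete.
        self.ready = deque()

    def push(self, name, sample):
        if name not in self.inputs:
            raise KeyError(f"unknown input: {name}")
        slot = self.pending.setdefault(sample["frame_id"], {})
        slot[name] = sample
        if len(slot) == len(self.inputs):
            # Every named input has delivered this frame_id: emit one bundle.
            del self.pending[sample["frame_id"]]
            self.ready.append([slot[n] for n in self.inputs])

    def pull(self):
        # Return the oldest completed bundle, or None if nothing is ready.
        return self.ready.popleft() if self.ready else None


joiner = ByFrameJoiner(["left", "right"])
joiner.push("left", {"stream_id": "0", "frame_id": 7})
first_pull = joiner.pull()  # nothing ready: "right" has not delivered frame 7
joiner.push("right", {"stream_id": "0", "frame_id": 7})
bundle = joiner.pull()      # one bundle with two fields, left then right
print(first_pull, len(bundle))
```

In this model a frame that reaches only one input stays in pending indefinitely, which is why the tutorial pushes every frame_id to both inputs before pulling.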

APIs introduced

  • pyneat.graphs.combine(inputs, output, policy) — build a reusable public Graph fragment.
  • pyneat.CombinePolicy.ByFrame — combine samples whose Sample.frame_id values match.
  • pyneat.CombinePolicy.ByPts — combine samples whose Sample.pts_ns presentation timestamps match.
  • run.push("left", [sample]) / run.push("right", [sample]) — named multi-input push.
  • run.pull("combined") — named output pull.

When to use this

  • Multi-camera ingestion where each stream must make progress independently.
  • Parallel branch processing (e.g. two models running side-by-side) that must rejoin outputs correctly.
  • Diagnosing dropped or misaligned stream outputs under load.
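For the diagnostic case, one simple approach is to log the frame_id values pushed on each input and compare the two sets. The sketch below works on plain lists of ids; unmatched_frames is a hypothetical helper, not part of pyneat.

```python
def unmatched_frames(left_ids, right_ids):
    """Return frame_ids seen on only one input, sorted for stable output."""
    left, right = set(left_ids), set(right_ids)
    return {
        "left_only": sorted(left - right),
        "right_only": sorted(right - left),
    }


# Example: frame 2 never arrived on the right input, frame 5 never on the left.
report = unmatched_frames([0, 1, 2, 3], [0, 1, 3, 5])
print(report)
```

Any id listed in either field has no partner on the other input, so it is a candidate for a dropped or misaligned sample.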

Prerequisites

Chapter 012 (Graph basics). Chapter 009 (bundle samples) helps with join semantics.

References

Learning Process

  1. Generate deterministic per-stream/per-frame samples with explicit tags.
  2. Build a public combine Graph with named inputs and one named output.
  3. Push all expected inputs and pull joined outputs.
  4. Validate output count and bundle cardinality.
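The listing in this chapter gives every (frame, stream) pair its own frame_id, computed as frame * streams + sid, and pushes that id to both inputs. The following pure-Python sketch reproduces only that arithmetic and the expected counts checked in step 4; make_ids is a hypothetical name and no pyneat calls are involved.

```python
def make_ids(streams, frames):
    """Yield (stream_id, frame_id) pairs in the order of the nested push loops."""
    for frame in range(frames):
        for sid in range(streams):
            yield str(sid), frame * streams + sid


ids = list(make_ids(streams=8, frames=4))
frame_ids = [frame_id for _, frame_id in ids]

# Each (frame, stream) pair maps to a distinct frame_id, so one bundle per
# pair is expected: streams * frames bundles in total.
expected_bundles = 8 * 4
print(len(ids), len(set(frame_ids)), expected_bundles)
```

Because the ids are distinct, a received count lower than streams * frames points to a pair that never completed.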

Run

Python:

python3 share/sima-neat/tutorials/014_run_multiple_streams/run_multiple_streams.py \
    --streams 8 --frames 4

C++ (prebuilt):

./lib/sima-neat/tutorials/tutorial_014_run_multiple_streams \
    --streams 8 --frames 4

C++ (build from source):

./build.sh --target tutorial_014_run_multiple_streams
./build/tutorials-standalone/tutorial_014_run_multiple_streams \
    --streams 8 --frames 4

To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.

Code

tutorials/014_run_multiple_streams/run_multiple_streams.cpp
// Multistream public Graph: named inputs -> Combine(ByFrame) -> named output bundle.
//
// Usage:
// tutorial_014_run_multiple_streams [--streams 8] [--frames 4]

#include "neat.h"

#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace {

bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
  for (int i = 1; i + 1 < argc; ++i) {
    if (key == argv[i]) {
      out = argv[i + 1];
      return true;
    }
  }
  return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
  std::string value;
  if (!get_arg(argc, argv, key, value))
    return def;
  return std::stoi(value);
}

std::vector<int64_t> contiguous_strides_bytes(const std::vector<int64_t>& shape,
                                              int64_t elem_bytes) {
  std::vector<int64_t> strides(shape.size(), 0);
  int64_t stride = elem_bytes;
  for (int i = static_cast<int>(shape.size()) - 1; i >= 0; --i) {
    strides[static_cast<size_t>(i)] = stride;
    stride *= shape[static_cast<size_t>(i)];
  }
  return strides;
}

simaai::neat::Sample make_rgb_sample(const std::string& stream_id, int frame_id) {
  const int w = 8;
  const int h = 6;
  const int c = 3;
  const std::size_t bytes = static_cast<std::size_t>(w) * h * c;

  simaai::neat::Tensor t;
  t.device = {simaai::neat::DeviceType::CPU, 0};
  t.dtype = simaai::neat::TensorDType::UInt8;
  t.layout = simaai::neat::TensorLayout::HWC;
  t.shape = {h, w, c};
  t.semantic.image = simaai::neat::ImageSpec{simaai::neat::ImageSpec::PixelFormat::RGB, ""};
  t.storage = simaai::neat::make_cpu_owned_storage(bytes);
  t.strides_bytes = contiguous_strides_bytes(t.shape, 1);
  t.read_only = false;
  {
    auto map = t.map(simaai::neat::MapMode::Write);
    auto* p = static_cast<std::uint8_t*>(map.data);
    for (std::size_t i = 0; i < bytes; ++i)
      p[i] = static_cast<std::uint8_t>(i % 255);
  }
  t.read_only = true;

  simaai::neat::Sample sample;
  sample.kind = simaai::neat::SampleKind::Tensor;
  sample.tensor = std::move(t);
  sample.frame_id = frame_id;
  sample.stream_id = stream_id;
  return sample;
}

} // namespace

int main(int argc, char** argv) {
  try {
    const int streams = parse_int_arg(argc, argv, "--streams", 8);
    const int frames = parse_int_arg(argc, argv, "--frames", 4);

    // CORE LOGIC
    // `graphs::Combine` is a normal public Graph fragment. It declares two
    // named inputs ("left", "right") and one named output ("combined"). ByFrame
    // means the runtime emits one bundle only after both inputs have delivered
    // samples with the same Sample::frame_id.
    simaai::neat::Graph graph = simaai::neat::graphs::Combine({"left", "right"}, "combined",
                                                              simaai::neat::CombinePolicy::ByFrame);

    std::cout << graph.describe() << "\n";

    simaai::neat::Run run = graph.build();

    for (int frame = 0; frame < frames; ++frame) {
      for (int sid = 0; sid < streams; ++sid) {
        const int logical_frame = frame * streams + sid;
        if (!run.push("left", make_rgb_sample(std::to_string(sid), logical_frame))) {
          throw std::runtime_error("left push failed: " + run.last_error());
        }
        if (!run.push("right", make_rgb_sample(std::to_string(sid), logical_frame))) {
          throw std::runtime_error("right push failed: " + run.last_error());
        }
      }
    }

    const int expected = streams * frames;
    int received = 0;
    int first_fields = -1;
    for (int i = 0; i < expected; ++i) {
      auto maybe_bundle = run.pull("combined", /*timeout_ms=*/2000);
      if (!maybe_bundle.has_value()) {
        throw std::runtime_error("timed out waiting for combined output");
      }
      const auto& bundle = *maybe_bundle;
      if (first_fields < 0)
        first_fields = static_cast<int>(bundle.fields.size());
      ++received;
      if (i < 4) {
        std::cout << "bundle stream=" << bundle.stream_id << " fields=" << bundle.fields.size()
                  << "\n";
      }
    }

    run.close();

    if (received != expected)
      throw std::runtime_error("expected=" + std::to_string(expected) +
                               " received=" + std::to_string(received));
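    // Both inputs carry RGB image samples, so each bundle should hold exactly two fields.
    if (first_fields != 2)
      throw std::runtime_error("combined bundle should contain 2 fields (left + right)");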

    std::cout << "received=" << received << " fields=" << first_fields << "\n";
    std::cout << "[OK] 014_run_multiple_streams\n";
    return 0;
  } catch (const std::exception& e) {
    std::cerr << "[FAIL] " << e.what() << "\n";
    return 1;
  }
}
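
The contiguous_strides_bytes helper in the listing walks the shape from the innermost dimension outward, assigning the running stride and then multiplying it by that dimension's extent. As a worked example, here is a plain-Python transcription of the same loop applied to the tutorial's 6x8x3 UInt8 tensor; it is only an illustration of the arithmetic and does not touch the neat Tensor type.

```python
def contiguous_strides_bytes(shape, elem_bytes):
    """Byte strides for a densely packed, row-major tensor."""
    strides = [0] * len(shape)
    stride = elem_bytes
    for i in reversed(range(len(shape))):
        strides[i] = stride
        stride *= shape[i]
    return strides


# HWC image from the listing: h=6, w=8, c=3, one byte per element.
strides = contiguous_strides_bytes([6, 8, 3], 1)
print(strides)  # [24, 3, 1]
```

Stepping one row advances 24 bytes (8 pixels of 3 channels), one pixel advances 3 bytes, and one channel advances 1 byte, which matches the 144-byte buffer allocated in make_rgb_sample.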
