Skip to main content

Tune Throughput and Queue Depth

FieldValue
DifficultyAdvanced
Estimated Read Time15-20 minutes
Labelsperformance, tuning, async, queues

Concept

Tune three knobs that control async pipeline behavior under load — queue depth, overflow policy, metrics — and read back what happened. Performance tuning only helps once the correctness baseline is stable; this chapter assumes it is.

The chapter exercises RunOptions at the level a production pipeline needs to control:

  • queue_depth: how many in-flight samples the runtime accepts.
  • overflow_policy: Block, KeepLatest, or DropIncoming when the queue is full.
  • enable_metrics: turn on per-run metric collection.

APIs introduced

  • pyneat.RunOptions() with .queue_depth, .overflow_policy, .output_memory, .enable_metrics.
  • pyneat.OverflowPolicy.{Block,KeepLatest,DropIncoming} — the policy values.
  • run.try_push(sample) — non-blocking push that returns whether the sample was accepted.
  • run.stats() — latency/enqueue/pull counters.
  • run.input_stats() — push-side counters (accepted, dropped, queue fullness).

When to use this

  • Throughput bottlenecks: increase queue depth and inspect drop/latency behavior.
  • Low-latency preference: favor KeepLatest in bursty streams.
  • Backpressure-sensitive ingestion: prefer Block for strict loss control.

Prerequisites Chapter 002 (async basics). Chapter 011 (diagnostics).

References

Learning Process

  1. Build an async run path with explicit queue and overflow settings.
  2. Push a deterministic workload and drain outputs to completion.
  3. Inspect metrics and input stats for latency/drop behavior.

Run

Python:

python3 share/sima-neat/tutorials/015_tune_throughput_and_queues/tune_throughput_and_queues.py \
--iters 32 --queue 4 --drop block

C++ (prebuilt):

./lib/sima-neat/tutorials/tutorial_015_tune_throughput_and_queues \
--iters 32 --queue 4 --drop block

C++ (build from source):

./build.sh --target tutorial_015_tune_throughput_and_queues
./build/tutorials-standalone/tutorial_015_tune_throughput_and_queues \
--iters 32 --queue 4 --drop block

To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.

In Practice

Practical guidance for queue sizing, drop policies, caps changes, and output-lifetime safety.

Queue sizing (queue_depth)

Heuristics:

  • Start with queue_depth = 4–16 for low‑latency pipelines.
  • Increase queues if your producer is bursty or if downstream elements have variable latency (decode/MLA/postproc).
  • Keep queues small if you need freshest frames (e.g., live camera preview).

Overflow policy (RunOptions::overflow_policy)

  • Block: safest for correctness; producer waits when queue is full.
  • DropIncoming: keep queued work, drop incoming samples when saturated.
  • KeepLatest: prefer freshest frames, drop the oldest queued samples.

For live feeds, KeepLatest usually yields the lowest end-to-end latency.

Presets and renegotiation

Use RunOptions::preset to control latency/safety tradeoffs:

  • Realtime: lowest latency, aggressive freshness behavior.
  • Balanced: starts zero-copy when possible, runs startup probe checks, and falls back to copy mode if reliability trips.
  • Reliable: conservative behavior and stable output ownership.

Input shape renegotiation is automatic for dynamic inputs.

Output lifetimes (output_memory)

  • output_memory = Owned: returned Tensor owns its data.
  • output_memory = ZeroCopy: tensor may reference runtime buffers reused after pull.
  • output_memory = Auto: runtime chooses zero-copy first and falls back to owned where reliability requires it.

If you need to keep tensor data beyond the current step, call clone() or cpu().contiguous().

Caps change decision flow

Use this guide for push pipelines (Input):

  1. Do you need fixed caps?
    • Yes → set InputOptions::caps_override. Renegotiation is disabled.
    • No → leave caps_override empty.
  2. Do you expect size changes?
    • Yes → no extra flag is required; dynamic dimensions are accepted by default.
    • No → set fixed dimensions in InputOptions and optionally use caps_override.

Buffer pool safety

  • RunAdvancedOptions::max_input_bytes sets a hard upper bound on input buffer allocation.
  • If a larger buffer is required, runtime fails fast with an explicit error.

Use these to protect long‑running processes from unbounded allocations when inputs change size.

Code

tutorials/015_tune_throughput_and_queues/tune_throughput_and_queues.cpp
// Tune async Graph throughput via RunOptions: queue_depth, overflow_policy, metrics.
//
// Usage:
// tutorial_015_tune_throughput_and_queues [--iters 32] [--queue 4] [--drop block|latest|incoming]

#include "neat.h"

#include <opencv2/core.hpp>

#include <iostream>
#include <stdexcept>
#include <string>

namespace {

bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
for (int i = 1; i + 1 < argc; ++i) {
if (key == argv[i]) {
out = argv[i + 1];
return true;
}
}
return false;
}

int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
std::string value;
if (!get_arg(argc, argv, key, value))
return def;
return std::stoi(value);
}

simaai::neat::OverflowPolicy parse_drop_policy(int argc, char** argv) {
std::string mode;
if (!get_arg(argc, argv, "--drop", mode))
return simaai::neat::OverflowPolicy::Block;
if (mode == "latest")
return simaai::neat::OverflowPolicy::KeepLatest;
if (mode == "incoming")
return simaai::neat::OverflowPolicy::DropIncoming;
return simaai::neat::OverflowPolicy::Block;
}

} // namespace

int main(int argc, char** argv) {
try {
const int iters = parse_int_arg(argc, argv, "--iters", 32);
const int queue_depth = parse_int_arg(argc, argv, "--queue", 4);

cv::Mat rgb(120, 160, CV_8UC3, cv::Scalar(70, 20, 200));
if (!rgb.isContinuous())
rgb = rgb.clone();

simaai::neat::Graph graph;
simaai::neat::InputOptions in;
in.format = "RGB";
in.width = rgb.cols;
in.height = rgb.rows;
in.depth = rgb.channels();
in.is_live = true;
graph.add(simaai::neat::nodes::Input(in));
graph.add(simaai::neat::nodes::Output());

// CORE LOGIC
// RunOptions controls how the async runner buffers and drops frames.
simaai::neat::RunOptions opt;
opt.queue_depth = queue_depth;
opt.overflow_policy = parse_drop_policy(argc, argv);
opt.output_memory = simaai::neat::OutputMemory::Owned;
opt.enable_metrics = true;

auto run = graph.build(std::vector<cv::Mat>{rgb}, simaai::neat::RunMode::Async, opt);

// try_push never blocks; pair it with close_input + drain pull loop.
for (int i = 0; i < iters; ++i)
(void)run.try_push(std::vector<cv::Mat>{rgb});
run.close_input();

int pulled = 0;
while (run.pull(/*timeout_ms=*/1000).has_value())
++pulled;

const auto stats = run.stats();
const auto input_stats = run.input_stats();

std::cout << "inputs_enqueued=" << stats.inputs_enqueued << "\n";
std::cout << "inputs_dropped=" << stats.inputs_dropped << "\n";
std::cout << "outputs_pulled=" << pulled << "\n";
std::cout << "avg_latency_ms=" << stats.avg_latency_ms << "\n";
std::cout << "avg_push_us=" << input_stats.avg_push_us << "\n";
std::cout << "renegotiations=" << input_stats.renegotiations << "\n";
std::cout << "[OK] 015_tune_throughput_and_queues\n";
return 0;
} catch (const std::exception& e) {
std::cerr << "[FAIL] " << e.what() << "\n";
return 1;
}
}

Source