Tune Throughput and Queue Depth
| Field | Value |
|---|---|
| Difficulty | Advanced |
| Estimated Read Time | 15-20 minutes |
| Labels | performance, tuning, async, queues |
Concept
Tune three knobs that control async pipeline behavior under load (queue depth, overflow policy, and metrics), then read back what happened. Performance tuning only helps once the correctness baseline is stable; this chapter assumes it is.
The chapter exercises RunOptions at the level a production pipeline needs to control:
- queue_depth: how many in-flight samples the runtime accepts.
- overflow_policy: what happens when the queue is full (Block, KeepLatest, or DropIncoming).
- enable_metrics: turn on per-run metric collection.
APIs introduced
- pyneat.RunOptions() with .queue_depth, .overflow_policy, .output_memory, .enable_metrics.
- pyneat.OverflowPolicy.{Block,KeepLatest,DropIncoming}: the policy values.
- run.try_push(sample): non-blocking push that returns whether the sample was accepted.
- run.stats(): latency/enqueue/pull counters.
- run.input_stats(): push-side counters (accepted, dropped, queue fullness).
When to use this
- Throughput bottlenecks: increase queue depth and inspect drop/latency behavior.
- Low-latency preference: favor KeepLatest in bursty streams.
- Backpressure-sensitive ingestion: prefer Block for strict loss control.
Prerequisites
- Chapter 002 (async basics)
- Chapter 011 (diagnostics)
Learning Process
- Build an async run path with explicit queue and overflow settings.
- Push a deterministic workload and drain outputs to completion.
- Inspect metrics and input stats for latency/drop behavior.
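The push, close, and drain steps above follow a common bounded-queue pattern. The C++17 sketch below uses only the standard library. ClosableQueue and its methods are hypothetical. They only mimic the shape of run.try_push, run.close_input, and run.pull and are not NEAT's implementation.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical bounded queue with a non-blocking push, an explicit close,
// and a timed pull that lets a consumer drain everything left after close.
template <typename T>
class ClosableQueue {
public:
    explicit ClosableQueue(std::size_t capacity) : capacity_(capacity) {}

    // Never blocks: returns false if the queue is full or already closed.
    bool try_push(T item) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            if (closed_ || items_.size() >= capacity_) return false;
            items_.push_back(std::move(item));
        }
        cv_.notify_one();
        return true;
    }

    // After close, pushes are rejected; queued items can still be pulled.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Waits up to timeout_ms for an item. Returns nullopt when the queue is
    // closed and empty, or when the timeout expires with nothing queued.
    std::optional<T> pull(int timeout_ms) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                     [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return std::nullopt;
        T item = std::move(items_.front());
        items_.pop_front();
        return item;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<T> items_;
    std::size_t capacity_;
    bool closed_ = false;
};
```

The drain loop ends when pull returns an empty optional. This mirrors the `while (run.pull(...).has_value())` loop in the tutorial code.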
Run
Python:
```shell
python3 share/sima-neat/tutorials/015_tune_throughput_and_queues/tune_throughput_and_queues.py \
  --iters 32 --queue 4 --drop block
```
C++ (prebuilt):
```shell
./lib/sima-neat/tutorials/tutorial_015_tune_throughput_and_queues \
  --iters 32 --queue 4 --drop block
```
C++ (build from source):
```shell
./build.sh --target tutorial_015_tune_throughput_and_queues
./build/tutorials-standalone/tutorial_015_tune_throughput_and_queues \
  --iters 32 --queue 4 --drop block
```
To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.
In Practice
Practical guidance for queue sizing, drop policies, caps changes, and output-lifetime safety.
Queue sizing (queue_depth)
Heuristics:
- Start with queue_depth = 4–16 for low-latency pipelines.
- Increase queues if your producer is bursty or if downstream elements have variable latency (decode/MLA/postproc).
- Keep queues small if you need the freshest frames (e.g., live camera preview).
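The heuristics above can be turned into a rough starting number. This C++17 sketch uses a back-of-the-envelope burst model, which is an assumption of this example and not a NEAT rule. If a burst arrives faster than the consumer drains it, the leftover backlog is burst * (1 - consumer_fps / producer_fps), and the result is clamped to the 4–16 starting range. suggested_queue_depth is a hypothetical helper.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>

// Hypothetical helper (assumed burst model, not part of NEAT): estimate how
// many frames pile up while a burst of `burst_frames` arrives at
// `producer_fps` and the downstream stage drains at `consumer_fps`.
std::size_t suggested_queue_depth(double burst_frames, double producer_fps,
                                  double consumer_fps, std::size_t min_depth,
                                  std::size_t max_depth) {
    double backlog = 0.0;
    if (producer_fps > consumer_fps) {
        const double burst_seconds = burst_frames / producer_fps;
        // Frames that arrive minus frames the consumer absorbs meanwhile.
        backlog = burst_frames - consumer_fps * burst_seconds;
    }
    const auto depth = static_cast<std::size_t>(std::ceil(backlog));
    // Keep the suggestion inside the recommended starting range.
    return std::clamp(depth, min_depth, max_depth);
}
```

For example, a 30-frame burst arriving at 60 fps into a stage that drains 30 fps leaves a backlog of 15 frames, so the sketch suggests a depth of 15. For freshness-first pipelines, staying at the small end of the range usually beats sizing the queue to absorb the whole burst.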
Overflow policy (RunOptions::overflow_policy)
- Block: safest for correctness; the producer waits when the queue is full.
- DropIncoming: keeps queued work and drops incoming samples when saturated.
- KeepLatest: prefers the freshest frames and drops the oldest queued samples.
For live feeds, KeepLatest usually yields the lowest end-to-end latency.
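To see how the three policies diverge, here is a small single-threaded C++17 simulation built on the standard library. The Policy enum, SimResult, and simulate() are hypothetical stand-ins, not NEAT types. Block is modeled by letting the consumer free a slot before the producer continues, because a truly blocked producer would simply wait.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>
#include <vector>

// Hypothetical mirror of the three overflow policies (not the NEAT enum).
enum class Policy { Block, KeepLatest, DropIncoming };

struct SimResult {
    std::vector<int> delivered;  // frame ids the consumer received, in order
    int dropped = 0;             // frames lost to the overflow policy
};

// The producer emits frames 0..frames-1. The consumer pulls one frame after
// every `consume_every` pushes, then drains whatever remains at close.
SimResult simulate(Policy policy, std::size_t capacity, int frames, int consume_every) {
    if (capacity == 0 || consume_every <= 0)
        throw std::invalid_argument("capacity and consume_every must be positive");
    std::deque<int> queue;
    SimResult r;
    auto pull_one = [&] {
        if (!queue.empty()) {
            r.delivered.push_back(queue.front());
            queue.pop_front();
        }
    };
    for (int id = 0; id < frames; ++id) {
        bool accept = true;
        if (queue.size() == capacity) {
            if (policy == Policy::Block) {
                pull_one();         // producer waits until the consumer frees a slot
            } else if (policy == Policy::KeepLatest) {
                queue.pop_front();  // evict the oldest queued frame
                ++r.dropped;
            } else {                // DropIncoming
                accept = false;     // reject the new frame, keep queued work
                ++r.dropped;
            }
        }
        if (accept) queue.push_back(id);
        if ((id + 1) % consume_every == 0) pull_one();
    }
    while (!queue.empty()) pull_one();  // close input, then drain
    return r;
}
```

With a queue of 2, six frames, and one pull after every third push, Block delivers all six frames. DropIncoming delivers frames 0, 1, and 3, and KeepLatest delivers 1, 4, and 5. Both lossy policies drop three frames, but only KeepLatest ends on the freshest one.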
Presets and renegotiation
Use RunOptions::preset to control latency/safety tradeoffs:
- Realtime: lowest latency, aggressive freshness behavior.
- Balanced: starts zero-copy when possible, runs startup probe checks, and falls back to copy mode if reliability trips.
- Reliable: conservative behavior and stable output ownership.
Input shape renegotiation is automatic for dynamic inputs.
Output lifetimes (output_memory)
- output_memory = Owned: the returned Tensor owns its data.
- output_memory = ZeroCopy: the tensor may reference runtime buffers that are reused after pull.
- output_memory = Auto: the runtime chooses zero-copy first and falls back to owned where reliability requires it.
If you need to keep tensor data beyond the current step, for example across the next pull() under ZeroCopy or Auto, call clone() or cpu().contiguous().
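The aliasing hazard behind ZeroCopy can be shown without NEAT. In this C++17 sketch, RecyclingRuntime is a hypothetical stand-in for a runtime that overwrites one output buffer on every pull. A reference held across pulls silently changes, while a deep copy keeps the earlier frame. In NEAT, clone() or cpu().contiguous() plays the role of that copy.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical runtime that reuses a single output buffer across pulls,
// which is the situation output_memory = ZeroCopy (or Auto) can expose.
class RecyclingRuntime {
public:
    // Returns a view into runtime-owned storage. Its contents are only
    // meaningful until the next pull() overwrites the same buffer.
    const std::vector<std::uint8_t>& pull(std::uint8_t frame_value) {
        buffer_.assign(4, frame_value);
        return buffer_;
    }

private:
    std::vector<std::uint8_t> buffer_;
};
```

Holding `view` across a second pull shows the new frame's bytes. The copy taken before that pull still holds the first frame.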
Caps change decision flow
Use this guide for push pipelines (Input):
- Do you need fixed caps?
  - Yes → set InputOptions::caps_override. Renegotiation is disabled.
  - No → leave caps_override empty.
- Do you expect size changes?
  - Yes → no extra flag is required; dynamic dimensions are accepted by default.
  - No → set fixed dimensions in InputOptions and optionally use caps_override.
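The decision flow above reduces to two yes/no questions. The C++17 sketch below encodes them. CapsPlan and plan_caps are hypothetical names, and the fields describe what to do with InputOptions rather than real NEAT types. The `conflict` flag is an assumption of this sketch: it treats fixed caps plus expected size changes as incompatible, since caps_override disables renegotiation.

```cpp
#include <cassert>

// Hypothetical plan derived from the caps decision flow (not a NEAT type).
struct CapsPlan {
    bool set_caps_override = false;     // fill InputOptions::caps_override
    bool renegotiation_enabled = true;  // disabled once caps_override is set
    bool set_fixed_dimensions = false;  // fill width/height in InputOptions
    bool conflict = false;              // assumption: fixed caps cannot follow size changes
};

CapsPlan plan_caps(bool need_fixed_caps, bool expect_size_changes) {
    CapsPlan p;
    p.set_caps_override = need_fixed_caps;
    p.renegotiation_enabled = !need_fixed_caps;
    // Size changes need no extra flag; stable sizes get explicit dimensions.
    p.set_fixed_dimensions = !expect_size_changes;
    p.conflict = need_fixed_caps && expect_size_changes;
    return p;
}
```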
Buffer pool safety
- RunAdvancedOptions::max_input_bytes sets a hard upper bound on input buffer allocation.
- If a larger buffer is required, the runtime fails fast with an explicit error.

Use this limit to protect long-running processes from unbounded allocations when inputs change size.
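The fail-fast behavior can be pictured with a small allocation guard. In this C++17 sketch, frame_bytes and allocate_input are hypothetical helpers written in the spirit of max_input_bytes, not NEAT functions. The sketch sizes the cap from the tutorial's 160x120 RGB frame and throws instead of growing when a larger frame shows up.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Bytes for one interleaved 8-bit frame (RGB: channels = 3).
std::size_t frame_bytes(std::size_t width, std::size_t height, std::size_t channels) {
    return width * height * channels;
}

// Hypothetical guard: refuse to allocate past the cap with an explicit error
// instead of letting buffer growth go unbounded.
std::vector<std::uint8_t> allocate_input(std::size_t bytes, std::size_t max_input_bytes) {
    if (bytes > max_input_bytes) {
        throw std::length_error("input buffer of " + std::to_string(bytes) +
                                " bytes exceeds max_input_bytes=" +
                                std::to_string(max_input_bytes));
    }
    return std::vector<std::uint8_t>(bytes);
}
```

A 320x240 RGB frame needs 230400 bytes, four times the 57600-byte cap, so the guard throws rather than allocating.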
Code
```cpp
// Tune async Graph throughput via RunOptions: queue_depth, overflow_policy, metrics.
//
// Usage:
//   tutorial_015_tune_throughput_and_queues [--iters 32] [--queue 4] [--drop block|latest|incoming]
#include "neat.h"

#include <opencv2/core.hpp>

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

namespace {

// Copies the value that follows `key` on the command line into `out`.
bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
    for (int i = 1; i + 1 < argc; ++i) {
        if (key == argv[i]) {
            out = argv[i + 1];
            return true;
        }
    }
    return false;
}

// Parses an integer flag, falling back to `def` when the flag is absent.
// std::stoi throws on malformed input; main() reports that as a failure.
int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
    std::string value;
    if (!get_arg(argc, argv, key, value))
        return def;
    return std::stoi(value);
}

// Maps --drop to an overflow policy; a missing or unknown value means Block.
simaai::neat::OverflowPolicy parse_drop_policy(int argc, char** argv) {
    std::string mode;
    if (!get_arg(argc, argv, "--drop", mode))
        return simaai::neat::OverflowPolicy::Block;
    if (mode == "latest")
        return simaai::neat::OverflowPolicy::KeepLatest;
    if (mode == "incoming")
        return simaai::neat::OverflowPolicy::DropIncoming;
    return simaai::neat::OverflowPolicy::Block;
}

}  // namespace

int main(int argc, char** argv) {
    try {
        const int iters = parse_int_arg(argc, argv, "--iters", 32);
        const int queue_depth = parse_int_arg(argc, argv, "--queue", 4);

        // Deterministic test frame: 160x120 RGB, continuous in memory.
        cv::Mat rgb(120, 160, CV_8UC3, cv::Scalar(70, 20, 200));
        if (!rgb.isContinuous())
            rgb = rgb.clone();

        // Passthrough graph (Input -> Output) sized from the test frame.
        simaai::neat::Graph graph;
        simaai::neat::InputOptions in;
        in.format = "RGB";
        in.width = rgb.cols;
        in.height = rgb.rows;
        in.depth = rgb.channels();
        in.is_live = true;
        graph.add(simaai::neat::nodes::Input(in));
        graph.add(simaai::neat::nodes::Output());

        // CORE LOGIC
        // RunOptions controls how the async runner buffers and drops frames.
        simaai::neat::RunOptions opt;
        opt.queue_depth = queue_depth;
        opt.overflow_policy = parse_drop_policy(argc, argv);
        opt.output_memory = simaai::neat::OutputMemory::Owned;
        opt.enable_metrics = true;
        auto run = graph.build(std::vector<cv::Mat>{rgb}, simaai::neat::RunMode::Async, opt);

        // try_push never blocks; pair it with close_input + drain pull loop.
        for (int i = 0; i < iters; ++i)
            (void)run.try_push(std::vector<cv::Mat>{rgb});
        run.close_input();

        // Drain until the runtime reports no more outputs.
        int pulled = 0;
        while (run.pull(/*timeout_ms=*/1000).has_value())
            ++pulled;

        const auto stats = run.stats();
        const auto input_stats = run.input_stats();
        std::cout << "inputs_enqueued=" << stats.inputs_enqueued << "\n";
        std::cout << "inputs_dropped=" << stats.inputs_dropped << "\n";
        std::cout << "outputs_pulled=" << pulled << "\n";
        std::cout << "avg_latency_ms=" << stats.avg_latency_ms << "\n";
        std::cout << "avg_push_us=" << input_stats.avg_push_us << "\n";
        std::cout << "renegotiations=" << input_stats.renegotiations << "\n";
        std::cout << "[OK] 015_tune_throughput_and_queues\n";
        return 0;
    } catch (const std::exception& e) {
        std::cerr << "[FAIL] " << e.what() << "\n";
        return 1;
    }
}
```