Command Queue (2026-01-03)
We now serialize command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once, while allowing safe parallelism across sessions.
Why
- Some auto-reply commands are expensive (LLM calls) and can collide when multiple inbound messages arrive close together.
- Serializing avoids competing for terminal/stdin, keeps logs readable, and reduces the chance of rate limits from upstream tools.
How it works
- A lane-aware FIFO queue drains each lane synchronously.
- runEmbeddedPiAgent enqueues by session key (lane session:<key>) to guarantee only one active run per session.
- Each session run is then queued into a global lane (main by default), so overall parallelism is capped by agent.maxConcurrent.
- When verbose logging is enabled, queued commands emit a short notice if they waited more than ~2s before starting.
- Typing indicators (onReplyStart) still fire immediately on enqueue, so the user experience is unchanged while we wait our turn.
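The two-level lane scheme above can be sketched with plain promises. This is a minimal illustration, not the project's code: enqueueInLane, setLaneConcurrency, runForSession and the onLongWait callback are hypothetical names. Unconfigured lanes run one task at a time (FIFO). The session lane holds its slot while the run waits for the global lane, which is what guarantees one active run per session.

```typescript
// Sketch only: enqueueInLane, setLaneConcurrency and runForSession are
// hypothetical names, not the project's actual API.
type Task<T> = () => Promise<T>;

interface Lane {
  maxConcurrent: number;      // how many tasks may run at once in this lane
  active: number;             // tasks currently running
  pending: Array<() => void>; // FIFO of start callbacks waiting for a slot
}

const lanes = new Map<string, Lane>();

function getLane(name: string): Lane {
  let lane = lanes.get(name);
  if (!lane) {
    // Unconfigured lanes run one task at a time.
    lane = { maxConcurrent: 1, active: 0, pending: [] };
    lanes.set(name, lane);
  }
  return lane;
}

// e.g. setLaneConcurrency("main", agent.maxConcurrent)
function setLaneConcurrency(name: string, max: number): void {
  const lane = getLane(name);
  lane.maxConcurrent = Math.max(1, Math.floor(max));
  drain(lane);
}

// Start waiting tasks, oldest first, while the lane has free slots.
function drain(lane: Lane): void {
  while (lane.active < lane.maxConcurrent && lane.pending.length > 0) {
    const start = lane.pending.shift()!;
    lane.active++;
    start();
  }
}

function enqueueInLane<T>(
  name: string,
  task: Task<T>,
  onLongWait?: (waitedMs: number) => void, // hook for a verbose "waited ~2s+" notice
): Promise<T> {
  const lane = getLane(name);
  const enqueuedAt = Date.now();
  return new Promise<T>((resolve, reject) => {
    lane.pending.push(() => {
      const waitedMs = Date.now() - enqueuedAt;
      if (onLongWait && waitedMs > 2000) onLongWait(waitedMs);
      Promise.resolve()
        .then(task) // also turns a synchronous throw into a rejection
        .then(resolve, reject)
        .finally(() => {
          lane.active--;
          drain(lane); // hand the freed slot to the next queued task
        });
    });
    drain(lane);
  });
}

// One active run per session, then a global cap across all sessions.
function runForSession<T>(sessionKey: string, task: Task<T>, globalLane = "main"): Promise<T> {
  return enqueueInLane(`session:${sessionKey}`, () => enqueueInLane(globalLane, task));
}
```

In this sketch the caller would fire the typing indicator before calling runForSession, so it is not delayed by either lane.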
Queue modes (per provider)
Inbound messages can steer the current run, wait for a followup turn, or do both:
- steer: inject immediately into the current run (cancels pending tool calls after the next tool boundary). If not streaming, falls back to followup.
- followup: enqueue for the next agent turn after the current run ends.
- collect: coalesce all queued messages into a single followup turn (default).
- steer-backlog (aka steer+backlog): steer now and preserve the message for a followup turn.
- interrupt (legacy): abort the active run for that session, then run the newest message.
- queue (legacy alias): same as steer.
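Because steer-backlog keeps the message for a followup turn, it can produce a second reply after the steered run. Prefer collect or steer if you want one response per inbound message.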
Send /queue collect as a standalone command (per-session) or set routing.queue.byProvider.discord: "collect".
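The mode list can be read as a small decision table. The sketch below is an illustration under assumptions: normalizeMode, planInbound and the InboundPlan fields are hypothetical, steer-backlog is assumed to skip the steer half when the run is not streaming, and tool-boundary cancellation is not modelled.

```typescript
// Hypothetical sketch: normalize mode names (including legacy aliases) and
// decide what an inbound message does while a run is active for its session.
type QueueMode = "steer" | "followup" | "collect" | "steer-backlog" | "interrupt";

const MODE_ALIASES = new Map<string, QueueMode>([
  ["steer", "steer"],
  ["followup", "followup"],
  ["collect", "collect"],
  ["steer-backlog", "steer-backlog"],
  ["steer+backlog", "steer-backlog"], // alias
  ["interrupt", "interrupt"],         // legacy
  ["queue", "steer"],                 // legacy alias
]);

function normalizeMode(raw: string): QueueMode | undefined {
  return MODE_ALIASES.get(raw.trim().toLowerCase());
}

interface InboundPlan {
  abortActive: boolean;   // stop the current run first
  steerNow: boolean;      // inject into the current run
  queueFollowup: boolean; // keep the message for a later turn
}

function planInbound(mode: QueueMode, runIsStreaming: boolean): InboundPlan {
  switch (mode) {
    case "steer":
      // steer falls back to followup when the active run is not streaming
      return runIsStreaming
        ? { abortActive: false, steerNow: true, queueFollowup: false }
        : { abortActive: false, steerNow: false, queueFollowup: true };
    case "steer-backlog":
      // steer now (assumed: only if streaming) AND keep it for a followup turn
      return { abortActive: false, steerNow: runIsStreaming, queueFollowup: true };
    case "followup":
    case "collect": // collect differs only in how queued messages are merged later
      return { abortActive: false, steerNow: false, queueFollowup: true };
    case "interrupt":
      // abort the active run, then run the newest message
      return { abortActive: true, steerNow: false, queueFollowup: true };
  }
}
```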
Defaults (when unset in config):
- All surfaces → collect
Queue settings live under routing.queue in config.
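How a session override, a per-provider setting and the default might combine is sketched below. Only routing.queue.byProvider and the collect default come from this page; the global mode key, the Config type, resolveQueueMode and the precedence order (session override, then provider, then global, then collect) are assumptions for illustration.

```typescript
// Sketch: the "mode" key under routing.queue and the precedence order
// (session override > byProvider > global mode > collect) are assumptions.
type QueueMode = "steer" | "followup" | "collect" | "steer-backlog" | "interrupt";

interface QueueConfig {
  mode?: QueueMode;                       // assumed global setting
  byProvider?: Record<string, QueueMode>; // e.g. { discord: "collect" }
}

interface Config {
  routing?: { queue?: QueueConfig };
}

function resolveQueueMode(cfg: Config, provider: string, sessionOverride?: QueueMode): QueueMode {
  const queue = cfg.routing?.queue;
  const byProvider = queue?.byProvider;
  // Own-property check so keys like "toString" never resolve to inherited values.
  const providerMode =
    byProvider && Object.prototype.hasOwnProperty.call(byProvider, provider)
      ? byProvider[provider]
      : undefined;
  return sessionOverride ?? providerMode ?? queue?.mode ?? "collect";
}
```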
Queue options
Options apply to followup, collect, and steer-backlog (and to steer when it falls back to followup):
- debounceMs: wait for quiet before starting a followup turn (prevents “continue, continue”).
- cap: max queued messages per session.
- drop: overflow policy (old, new, summarize).
Defaults: debounceMs: 1000, cap: 20, drop: summarize.
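A minimal sketch of how the three options could act on one session's followup queue. The exact semantics of old, new and summarize are not spelled out here, so this assumes: old evicts the oldest queued message, new discards the incoming one, and summarize evicts the oldest but keeps a short preview. pushFollowup, collectPrompt and makeDebouncer are hypothetical helpers.

```typescript
// Sketch under assumptions: old = evict oldest, new = discard incoming,
// summarize = evict oldest but keep a short preview. Helper names are hypothetical.
type DropPolicy = "old" | "new" | "summarize";

interface QueueOptions {
  debounceMs: number;
  cap: number;
  drop: DropPolicy;
}

// Documented defaults.
const DEFAULT_OPTIONS: QueueOptions = { debounceMs: 1000, cap: 20, drop: "summarize" };

interface FollowupQueue {
  messages: string[];     // queued inbound texts, oldest first
  droppedCount: number;   // how many messages overflowed the cap
  droppedNotes: string[]; // short previews kept by the "summarize" policy
}

function pushFollowup(q: FollowupQueue, text: string, opts: QueueOptions = DEFAULT_OPTIONS): void {
  const cap = Math.max(1, opts.cap);
  if (q.messages.length < cap) {
    q.messages.push(text);
    return;
  }
  q.droppedCount++;
  if (opts.drop === "new") return; // keep the backlog, discard the incoming message
  const oldest = q.messages.shift()!; // "old" and "summarize" both evict the oldest
  if (opts.drop === "summarize") q.droppedNotes.push(oldest.slice(0, 40));
  q.messages.push(text);
}

// collect mode: coalesce everything queued into a single followup prompt.
function collectPrompt(q: FollowupQueue): string {
  const lines: string[] = [];
  if (q.droppedNotes.length > 0) {
    lines.push(`(${q.droppedNotes.length} older message(s) summarized: ${q.droppedNotes.join(" | ")})`);
  }
  q.messages.forEach((m, i) => lines.push(`${i + 1}. ${m}`));
  return lines.join("\n");
}

// debounceMs: restart the timer on every arrival; fire only after a quiet period.
function makeDebouncer(ms: number, fire: () => void): () => void {
  let timer: ReturnType<typeof setTimeout> | undefined;
  return () => {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(() => {
      timer = undefined;
      fire();
    }, ms);
  };
}
```

Debouncing is what stops three quick "continue" messages from starting three followup turns: the timer restarts on each arrival, so only one turn starts once the sender goes quiet.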
Per-session overrides
- Send /queue <mode> as a standalone command to store the mode for the current session.
- Options can be combined: /queue collect debounce:2s cap:25 drop:summarize
- /queue default or /queue reset clears the session override.
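The command syntax above is regular enough to parse token by token. This is a hypothetical parser, not the project's: parseQueueCommand and its result types are invented for illustration, and accepting ms/s/m units (with a bare number meaning milliseconds) is an assumption beyond the debounce:2s example.

```typescript
// Hypothetical /queue parser. Duration units ms/s/m and "bare number = ms"
// are assumptions; only the token shapes come from the examples above.
type QueueMode = "steer" | "followup" | "collect" | "steer-backlog" | "interrupt";
type DropPolicy = "old" | "new" | "summarize";

interface QueueOverride {
  mode?: QueueMode;
  debounceMs?: number;
  cap?: number;
  drop?: DropPolicy;
}

type QueueCommand =
  | { kind: "reset" }                        // /queue default | /queue reset
  | { kind: "set"; override: QueueOverride } // store for the current session
  | { kind: "invalid"; reason: string };

const MODES = new Set<string>(["steer", "followup", "collect", "steer-backlog", "interrupt"]);
const MODE_ALIASES = new Map<string, string>([
  ["queue", "steer"], // legacy alias
  ["steer+backlog", "steer-backlog"],
]);
const DROPS = new Set<string>(["old", "new", "summarize"]);

function parseDuration(raw: string): number | undefined {
  const m = /^(\d+(?:\.\d+)?)(ms|s|m)?$/.exec(raw);
  if (!m) return undefined;
  const n = Number(m[1]);
  const unit = m[2] ?? "ms";
  return Math.round(unit === "s" ? n * 1000 : unit === "m" ? n * 60000 : n);
}

// Returns undefined when the text is not a /queue command at all.
function parseQueueCommand(text: string): QueueCommand | undefined {
  const tokens = text.trim().split(/\s+/);
  if (tokens[0]?.toLowerCase() !== "/queue") return undefined;
  if (tokens.length === 1) return { kind: "invalid", reason: "missing mode or options" };

  const override: QueueOverride = {};
  for (const token of tokens.slice(1)) {
    const lower = token.toLowerCase();
    if (lower === "default" || lower === "reset") return { kind: "reset" };

    const mode = MODE_ALIASES.get(lower) ?? lower;
    if (MODES.has(mode)) {
      override.mode = mode as QueueMode;
      continue;
    }

    const [key, value] = lower.split(":", 2);
    if (!value) return { kind: "invalid", reason: `unknown token: ${token}` };
    if (key === "debounce") {
      const ms = parseDuration(value);
      if (ms === undefined) return { kind: "invalid", reason: `bad duration: ${value}` };
      override.debounceMs = ms;
    } else if (key === "cap") {
      const cap = Number(value);
      if (!Number.isInteger(cap) || cap < 1) return { kind: "invalid", reason: `bad cap: ${value}` };
      override.cap = cap;
    } else if (key === "drop" && DROPS.has(value)) {
      override.drop = value as DropPolicy;
    } else {
      return { kind: "invalid", reason: `unknown option: ${token}` };
    }
  }
  return { kind: "set", override };
}
```

Returning undefined for non-command text keeps ordinary messages flowing to the queue unchanged; only a standalone /queue message is treated as a setting.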
Scope and guarantees
- Applies only to config-driven command replies; plain text replies are unaffected.
- The default lane (main) is process-wide for inbound messages and main heartbeats; set agent.maxConcurrent to allow multiple sessions in parallel.
- Additional lanes may exist (e.g. cron) so background jobs can run in parallel without blocking inbound replies.
- Per-session lanes guarantee that only one agent run touches a given session at a time.
- No external dependencies or background worker threads; pure TypeScript + promises.
Troubleshooting
- If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining.
- If you need queue depth, enable verbose logs and watch for queue timing lines.