Subscribe to topics, publish events, and stream data in real time. The system is designed for fan-out scenarios where multiple consumers need the same data stream.
Every daemon runs an event stream broker on port 1002. Agents can subscribe to topics on any trusted peer and receive events in real time. Publishers send events to a topic, and the broker distributes them to all active subscribers.
For one-to-one messaging, use stream connections or data exchange.
Each broker is independent: it runs inside the daemon process and manages subscriptions for that node only. There is no central message server.
When an agent subscribes to topics on another agent, its daemon opens a connection to the remote agent's event stream port (1002). The remote broker registers the subscription and pushes matching events over that connection. When an agent publishes to another agent, its daemon sends the event to the remote broker, which fans it out to all active subscribers.
To collect a fixed number of events, use a bounded subscription. This returns a single JSON result rather than a stream.
pilotctl subscribe other-agent status --count 5 --timeout 60s
The result contains an `events` array, where each entry has `topic`, `data`, and `bytes`, plus a `timeout` boolean.
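As a rough illustration of consuming a bounded result, the sketch below parses a payload with the fields described above. The sample JSON is hypothetical: it is built only from the field names in this section (`events`, `topic`, `data`, `bytes`, `timeout`), and the real output may carry additional fields or wrapping. The helper name `summarize_bounded_result` is also an assumption made for this example.

```python
import json

# Hypothetical sample shaped after the fields named in this section;
# it is not captured output from pilotctl.
SAMPLE_RESULT = """
{"events": [
    {"topic": "status", "data": "online", "bytes": 6},
    {"topic": "status", "data": "busy", "bytes": 4}
 ],
 "timeout": true}
"""


def summarize_bounded_result(raw: str, wanted: int) -> dict:
    """Report how many events arrived and the value of the timeout flag."""
    result = json.loads(raw)
    events = result.get("events", [])
    return {
        "received": len(events),
        "complete": len(events) >= wanted,
        "timed_out": bool(result.get("timeout", False)),
        "payloads": [event["data"] for event in events],
    }


summary = summarize_bounded_result(SAMPLE_RESULT, wanted=5)
print(summary)
```

Checking both the event count and the `timeout` flag lets a caller tell a full batch apart from a partial one.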
To stream events indefinitely as NDJSON (one JSON object per line), use an unbounded subscription.
pilotctl subscribe other-agent status
Each line is a standalone JSON object, for example:
{"topic":"status","data":"online","bytes":6}
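Because each line is a complete JSON object, a consumer can decode the stream one line at a time. The following is a minimal sketch, assuming the event shape shown above; `iter_events` is a hypothetical helper, and an in-memory `StringIO` stands in for the subscriber's output.

```python
import io
import json
from typing import Iterator, TextIO


def iter_events(stream: TextIO) -> Iterator[dict]:
    """Yield one decoded event per non-empty line of an NDJSON stream."""
    for line in stream:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between events
        yield json.loads(line)


# Stand-in for the subscriber's stdout; in practice this could be sys.stdin
# when the script sits at the end of a pipe.
fake_stream = io.StringIO(
    '{"topic":"status","data":"online","bytes":6}\n'
    '\n'
    '{"topic":"status","data":"offline","bytes":7}\n'
)

events = list(iter_events(fake_stream))
for event in events:
    print(event["topic"], event["data"], event["bytes"])
```

Using a generator keeps memory use flat, which matters for a subscription that never ends.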
pilotctl publish other-agent status --data "processing complete"
pilotctl publish other-agent metrics --data '{"cpu":42,"mem":1024}'
Events are delivered to all active subscribers of the topic on the target node. The command returns the `target`, `topic`, and `bytes`.
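When publishing from a script, structured payloads need to be serialized and quoted correctly. The sketch below builds the argument list for the `publish` command shown above without executing anything. `build_publish_command` is a hypothetical helper; it uses only the command form and the `--data` flag that appear in this section.

```python
import json
import shlex
from typing import List


def build_publish_command(target: str, topic: str, payload) -> List[str]:
    """Return an argv list for `pilotctl publish`; nothing is executed here."""
    if isinstance(payload, str):
        data = payload
    else:
        # Compact separators reproduce the payload style used in the example.
        data = json.dumps(payload, separators=(",", ":"))
    return ["pilotctl", "publish", target, topic, "--data", data]


argv = build_publish_command("other-agent", "metrics", {"cpu": 42, "mem": 1024})
# shlex.join renders the list as a shell-safe command line for logging.
print(shlex.join(argv))
```

Passing the list form to something like `subprocess.run(argv)` avoids shell quoting problems, since the JSON payload travels as a single argument.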
Use `*` as the topic to subscribe to all topics on a broker.
pilotctl subscribe other-agent "*" --count 10
`*` is a full wildcard that matches every topic, and it is the only wildcard form available. It is not a prefix glob, so `events.*` is not valid syntax.
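The matching rule described above can be written as a small predicate. This is an illustrative sketch, not the broker's code: `topic_matches` is a hypothetical name, and how the real broker reacts to a subscription such as `events.*` (rejecting it or treating it as a literal topic name) is not stated here. The sketch simply treats it as a literal that matches nothing else.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """`*` matches every topic; any other subscription must match exactly."""
    if subscription == "*":
        return True
    return subscription == topic


print(topic_matches("*", "metrics.cpu"))          # True
print(topic_matches("status", "status"))          # True
print(topic_matches("events.*", "events.login"))  # False
```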
Without `--count`, subscriptions stream newline-delimited JSON (NDJSON) indefinitely, so the output can be piped directly into line-oriented tools such as `jq`.
# Pipe events to jq for processing
pilotctl subscribe other-agent status | jq '.data'
# Log events to a file
pilotctl subscribe other-agent "*" >> events.jsonl
# Monitor metrics in real time
pilotctl subscribe other-agent metrics | while read -r line; do
  printf '%s\n' "$line" | jq -r '"CPU: \(.data | fromjson | .cpu)%"'
done
Pub/sub is for real-time streaming where dropping an occasional event is acceptable. For guaranteed delivery, use data exchange or stream connections.
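The metrics loop above relies on `fromjson` because the `data` field carries the published payload as a string, so a JSON payload has to be decoded a second time. The sketch below shows the same idea in Python. `decode_payload` is a hypothetical helper, and the sample line is constructed for illustration rather than captured from a real subscription.

```python
import json


def decode_payload(event: dict):
    """Return `data` decoded as JSON, or the raw string if it is not JSON."""
    raw = event["data"]
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # Plain-text payloads such as "online" are passed through unchanged.
        return raw


# Outer JSON is the event envelope; the inner JSON is the string in `data`.
line = '{"topic":"metrics","data":"{\\"cpu\\":42,\\"mem\\":1024}","bytes":21}'
event = json.loads(line)
payload = decode_payload(event)
print(f"CPU: {payload['cpu']}%")
```

Falling back to the raw string keeps one consumer usable for both structured topics and plain-text topics such as `status`.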
Topic names are arbitrary strings. A convention is to use dot-separated namespaces.
Since `*` is the only wildcard, prefix-based filtering (like `metrics.*`) is not supported. Subscribe to a specific topic or use `*` and filter on the client side.
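Client-side filtering can be as simple as checking the `topic` field of each event from a `*` subscription. The following sketch assumes the NDJSON event shape shown earlier; `filter_by_prefix` and the sample topics are hypothetical.

```python
import json
from typing import Iterable, Iterator


def filter_by_prefix(lines: Iterable[str], prefix: str) -> Iterator[dict]:
    """Yield events from a `*` subscription whose topic starts with `prefix`."""
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        if event.get("topic", "").startswith(prefix):
            yield event


# Hypothetical events as they might arrive from a `*` subscription.
sample_lines = [
    '{"topic":"metrics.cpu","data":"42","bytes":2}',
    '{"topic":"status","data":"online","bytes":6}',
    '{"topic":"metrics.mem","data":"1024","bytes":4}',
]

matched = [event["topic"] for event in filter_by_prefix(sample_lines, "metrics.")]
print(matched)
```

In a pipeline, `sys.stdin` could be passed as `lines` so that the script reads directly from the subscriber's output.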
The event stream uses a simple wire protocol.
The broker is an in-memory fan-out system with no queues, disk I/O, or acknowledgments.
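To make the delivery model concrete, here is a toy in-memory fan-out broker. It is a sketch of the general technique under the properties stated above (no queues, no disk I/O, no acknowledgments) and is not the daemon's implementation. The class and method names are assumptions made for this example.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str, str], None]


class FanOutBroker:
    """Toy in-memory broker: no queue, no persistence, no acknowledgments."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def unsubscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].remove(handler)

    def publish(self, topic: str, data: str) -> int:
        """Deliver to current subscribers only; return how many were reached."""
        # Exact-topic subscribers plus `*` subscribers, which see every topic.
        targets = list(self._subscribers.get(topic, []))
        if topic != "*":
            targets += self._subscribers.get("*", [])
        for handler in targets:
            handler(topic, data)
        return len(targets)


broker = FanOutBroker()
seen_a, seen_b = [], []

# Nothing is buffered, so an event published before anyone subscribes is lost.
missed = broker.publish("status", "starting")

broker.subscribe("status", lambda t, d: seen_a.append((t, d)))
broker.subscribe("*", lambda t, d: seen_b.append((t, d)))
delivered = broker.publish("status", "online")

print(missed, delivered, seen_a, seen_b)
```

The first `publish` reaches no one and is never replayed, which is the trade-off that makes this model unsuitable when delivery must be guaranteed.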
For real-time monitoring, an agent publishes system metrics. A dashboard agent subscribes and renders them. The `publish` command always targets a peer, not the local agent.
# From any peer with metrics to share
pilotctl publish dashboard-agent metrics --data '{"cpu":42,"mem":1024,"disk":80}'
# On the dashboard agent
pilotctl subscribe monitored-agent metrics >> dashboard-data.jsonl
For coordination, a controller publishes tasks, and workers subscribe to pick them up.
# Workers subscribe
pilotctl subscribe controller-agent tasks --count 1
# Controller publishes work
pilotctl publish controller-agent tasks --data '{"job":"process-batch-42"}'
For event-driven workflows, actions can be triggered in response to events from other agents.
# React to completion events
pilotctl subscribe pipeline-agent task.completed | while read -r event; do
  echo "Task done, starting next stage..."
done
The daemon fires webhook events for pub/sub activity: `pubsub.subscribed`, `pubsub.unsubscribed`, and `pubsub.published`.