PRD-003-1: SSE Event Pipeline & ARQ Worker Specification
| Field | Value |
|---|---|
| Document ID | PRD-003-1 |
| Version | 1.0 |
| Status | DRAFT |
| Date | March 2026 |
| Parent Doc | PRD-003 |
| Related Docs | PRD-002 (Frontend SSE schema) |
1. Purpose & Scope
This document fully specifies the transform_langgraph_event() function and the surrounding
translation layer between LangGraph's internal event stream and the frontend SSE schema defined
in PRD-002.
PRD-003 establishes that the ARQ worker calls graph.astream_events() and publishes transformed
events to a Redis Pub/Sub channel. This document answers how that transformation works —
every LangGraph event type, every mapping rule, every edge case, and the complete annotated
run_triage() function.
Boundary with PRD-003: PRD-003 specifies the graph structure, state schema, HITL flow, and
checkpointing. This document covers only the event pipeline: what comes out of
astream_events() and how it becomes frontend SSE events.
2. astream_events v2 Reference
graph.astream_events(input, config=config, version="v2") is an async generator that yields
event dicts. Always pass version="v2" explicitly. The event schema described in this section is the v2 schema, including on_custom_event, which exists only in v2.
Full Event Type Table
| Event type | Fires when | event["data"] contents | Key event["metadata"] fields |
|---|---|---|---|
| on_chain_start | A LangGraph node, or any runnable nested inside it, begins executing | {"input": <node input dict>} | langgraph_node, langgraph_step, langgraph_checkpoint_ns, langgraph_triggers, langgraph_path |
| on_chain_end | A LangGraph node, or any runnable nested inside it, finishes executing | {"output": <node return dict>} | langgraph_node, langgraph_step, langgraph_checkpoint_ns |
| on_chain_stream | Intermediate output from a node mid-run | {"chunk": <partial output>} | langgraph_node, langgraph_step |
| on_chat_model_stream | An LLM token is generated inside a node | {"chunk": AIMessageChunk} | langgraph_node, langgraph_step, langgraph_checkpoint_ns |
| on_tool_start | A tool invocation begins inside a node | {"input": <tool input dict>} | langgraph_node, langgraph_step |
| on_tool_end | A tool invocation completes inside a node | {"output": <tool result>} | langgraph_node, langgraph_step |
| on_custom_event | Application code called adispatch_custom_event() / dispatch_custom_event() (v2 only) | The custom payload exactly as passed by the caller; the custom event name is in event["name"] | langgraph_node |
Metadata Field Glossary
| Field | Type | Description |
|---|---|---|
| langgraph_node | str | Name of the currently executing graph node (e.g. "investigator") |
| langgraph_step | int | Monotonically increasing step counter for the current graph execution |
| langgraph_checkpoint_ns | str | Namespace identifying the active checkpoint; format: "node_name:uuid" for parallel branches |
| langgraph_triggers | list[str] | Which edges triggered this node (usually ["supervisor"] for worker nodes) |
| langgraph_path | tuple[str, ...] | Full path of the current execution context (useful for nested graphs) |
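Runnables nested inside a node inherit the node's langgraph_node metadata. This includes prompt templates, parsers, RunnableParallel branches, and routing functions. As a result, that field alone cannot distinguish a node's own on_chain_start/on_chain_end from its children's.

The following is a minimal stdlib sketch built on one assumption: the node's own run is named after the node, while nested runs keep their own names. As we understand it, this is LangGraph's default naming for node runnables. The helper is_node_boundary is hypothetical, and the events are hand-written dicts.

```python
from typing import Any


def is_node_boundary(event: dict[str, Any]) -> bool:
    """True if a chain start/end event belongs to the graph node itself.

    Assumes the node's own run is named after the node, while runnables
    nested inside it inherit langgraph_node but keep their own names.
    """
    if event.get("event") not in ("on_chain_start", "on_chain_end"):
        return False
    node = event.get("metadata", {}).get("langgraph_node")
    return node is not None and event.get("name") == node


node_start = {"event": "on_chain_start", "name": "investigator",
              "metadata": {"langgraph_node": "investigator", "langgraph_step": 2}}
inner_start = {"event": "on_chain_start", "name": "ChatPromptTemplate",
               "metadata": {"langgraph_node": "investigator", "langgraph_step": 2}}
print(is_node_boundary(node_start), is_node_boundary(inner_start))  # True False
```

The root graph run carries no langgraph_node, so the helper also rejects it.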
Token Content Formats
on_chat_model_stream events contain event["data"]["chunk"], which is an AIMessageChunk.
Its .content field varies by provider:
- OpenAI / most providers: chunk.content is a str, the raw token text.
- Anthropic: chunk.content may be a list[dict], where each dict has {"type": "text", "text": "..."}.

The transformer uses chunk.text (a TextAccessor, i.e. a str subclass), which handles both forms internally, so no branching is needed in application code (see §6).
Note: on_llm_stream is emitted by completion-style (non-chat) LLM integrations rather than by chat models; some non-OpenAI providers and older LangChain integrations are still implemented that way. Its chunk is a GenerationChunk, which also exposes .text. The transformer handles both event names (see §11).
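To make the two .content shapes concrete, here is a stdlib-only sketch of the flattening a text accessor has to perform. It concatenates text blocks and skips non-text blocks such as tool-use.

content_to_text is hypothetical and only illustrates the behaviour. The transformer itself should keep relying on chunk.text.

```python
from __future__ import annotations

from typing import Any


def content_to_text(content: str | list[Any]) -> str:
    """Flatten chat-message content to plain text.

    str content (OpenAI style) is returned unchanged. For list content
    (Anthropic style), only blocks with type "text" contribute.
    """
    if isinstance(content, str):
        return content
    parts: list[str] = []
    for block in content:
        if isinstance(block, str):
            parts.append(block)
        elif isinstance(block, dict) and block.get("type") == "text":
            parts.append(block.get("text", ""))
    return "".join(parts)


print(content_to_text("Hel"))  # Hel
print(content_to_text([
    {"type": "text", "text": "lo"},
    {"type": "tool_use", "id": "t1", "name": "search", "input": {}},
]))  # lo
```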
3. LangGraph → Frontend SSE Mapping Table
This is the authoritative mapping between LangGraph events and the frontend SSE schema from PRD-002. Every SSE event type in PRD-002's event table has a row here.
| LangGraph event or trigger | Condition | Frontend SSE event | Payload mapping | Notes |
|---|---|---|---|---|
| on_chain_start | metadata["langgraph_node"] in WORKER_NODES and event["name"] == langgraph_node | agent.spawned | agent_id: new UUID stored in spawned_agents; agent_name: langgraph_node; node: langgraph_node | Every node-level on_chain_start generates a new agent card, including re-entries (node called twice); each gets its own UUID. Runnables nested inside the node also emit on_chain_start and are ignored |
| on_chat_model_stream | metadata["langgraph_node"] in WORKER_NODES and token non-empty | agent.token | agent_id: from spawned_agents[langgraph_node]; token: extracted string token | Drop events where the extracted token is "" |
| on_tool_start | metadata["langgraph_node"] in WORKER_NODES | agent.tool_call | agent_id: from spawned_agents; tool_name: event["name"]; input_preview: JSON of data["input"], truncated to 60 chars | Requires ToolNode for Tavily and any other tool; on_tool_start is not emitted by RunnableLambda (see PRD-004-1 §5) |
| on_tool_end | metadata["langgraph_node"] in WORKER_NODES | agent.tool_result | agent_id: from spawned_agents; tool_name: event["name"]; result_summary: str(data["output"])[:120] | Same ToolNode requirement as on_tool_start |
| on_chain_end | metadata["langgraph_node"] in WORKER_NODES and event["name"] == langgraph_node | agent.done | agent_id: from spawned_agents; node: langgraph_node; elapsed_ms: not directly available (omit, or compute from wall time) | n/a |
| Post-loop aget_state() | bool(state.tasks) is True after the loop exits | graph.interrupt | question: interrupt_payload.question; context: interrupt_payload.context (HumanExchange dot fields) | See §7 for interrupt detection details |
| Answer endpoint | After Command(resume=...) succeeds | graph.resumed | job_id | Emitted by the answer endpoint, not the worker |
| on_chain_end | metadata["langgraph_node"] in ALL_NODES (any node) and event["name"] == langgraph_node | graph.node_complete | node: langgraph_node; step: langgraph_step | Includes supervisor; useful for ExecutionTimeline |
| Pause endpoint | After interrupt fires for manual pause | graph.paused | job_id | Emitted by the pause endpoint, not the worker |
| on_chat_model_stream | metadata["langgraph_node"] == "writer" and token non-empty | output.token | token: extracted string; section: derived from langgraph_checkpoint_ns | Uses the checkpoint namespace to identify which RunnableParallel branch; takes precedence over agent.token for the writer node |
| on_chain_end | metadata["langgraph_node"] == "writer" and event["name"] in {"report", "comment_draft", "ticket_draft"} | output.section_done | section: event["name"] | See §9 for RunnableParallel details |
| astream_events loop completes without interrupt | n/a | job.done | n/a | Emitted by the worker after the loop exits cleanly |
| Exception in astream_events loop | n/a | job.failed | error: exception message | Emitted by the worker_error_handler decorator (§10) |
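The two truncation rules in the table can live in one place, so the worker and any tests agree on them. Here is a minimal sketch. It assumes json.dumps(..., default=str) is an acceptable way to render tool inputs that are not natively JSON-serialisable. The names input_preview and result_summary and the two limit constants are hypothetical.

```python
import json
from typing import Any

INPUT_PREVIEW_CHARS = 60    # agent.tool_call input_preview limit
RESULT_SUMMARY_CHARS = 120  # agent.tool_result result_summary limit


def input_preview(tool_input: Any) -> str:
    """JSON rendering of a tool input, truncated; non-JSON values fall back to str()."""
    return json.dumps(tool_input, default=str)[:INPUT_PREVIEW_CHARS]


def result_summary(tool_output: Any) -> str:
    """str() rendering of a tool result, truncated."""
    return str(tool_output)[:RESULT_SUMMARY_CHARS]


print(input_preview({"query": "checkout 500 errors after deploy"}))
print(len(result_summary("x" * 500)))  # 120
```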
ALL_NODES Constant
```python
ALL_NODES = {
    "supervisor", "investigator", "codebase_search",
    "web_search", "critic", "human_input", "writer",
}
```
4. WORKER_NODES Constant
WORKER_NODES is the set of node names that generate agent cards in the UI. The supervisor and
human_input nodes are excluded — they do not produce agent cards.
This constant is used in transform_langgraph_event() to gate which events produce
agent.* SSE events.
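This section states the rule but gives no literal for the constant. The sketch below derives it from ALL_NODES. ALL_NODES is repeated here as a frozenset so the snippet runs on its own. Deriving one set from the other means adding a node to the graph cannot leave the two sets out of sync. NON_AGENT_NODES is a hypothetical name.

```python
ALL_NODES = frozenset({
    "supervisor", "investigator", "codebase_search",
    "web_search", "critic", "human_input", "writer",
})

# Nodes that never produce agent cards in the UI.
NON_AGENT_NODES = frozenset({"supervisor", "human_input"})

# Every remaining node gets an agent card and agent.* SSE events.
WORKER_NODES = ALL_NODES - NON_AGENT_NODES

print(sorted(WORKER_NODES))
# ['codebase_search', 'critic', 'investigator', 'web_search', 'writer']
```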
5. SSE Event Type Definitions
All SSE event types produced by the transformer are declared as TypedDicts, so every payload it
returns is statically typed end-to-end: no dict literals, no cast(), no isinstance().
```python
from typing import Literal, TypeAlias, TypedDict

from langchain_core.runnables.schema import StandardStreamEvent


class LangGraphEventMetadata(TypedDict, total=False):
    langgraph_node: str
    langgraph_step: int
    langgraph_checkpoint_ns: str
    langgraph_triggers: list[str]
    langgraph_path: tuple[str, ...]


class AgentSpawnedEvent(TypedDict):
    type: Literal["agent.spawned"]
    agent_id: str
    agent_name: str
    node: str


class AgentTokenEvent(TypedDict):
    type: Literal["agent.token"]
    agent_id: str
    token: str


class OutputTokenEvent(TypedDict):
    type: Literal["output.token"]
    token: str
    section: str | None


class AgentToolCallEvent(TypedDict):
    type: Literal["agent.tool_call"]
    agent_id: str
    tool_name: str
    input_preview: str


class AgentToolResultEvent(TypedDict):
    type: Literal["agent.tool_result"]
    agent_id: str
    tool_name: str
    result_summary: str


class AgentDoneEvent(TypedDict):
    type: Literal["agent.done"]
    agent_id: str
    node: str


class OutputSectionDoneEvent(TypedDict):
    type: Literal["output.section_done"]
    section: str


class GraphNodeCompleteEvent(TypedDict):
    type: Literal["graph.node_complete"]
    node: str
    step: int | None


SseEvent: TypeAlias = (
    AgentSpawnedEvent | AgentTokenEvent | OutputTokenEvent |
    AgentToolCallEvent | AgentToolResultEvent | AgentDoneEvent |
    OutputSectionDoneEvent | GraphNodeCompleteEvent
)
```
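TypedDict instances are plain dicts at runtime, so a typed event can be passed straight to json.dumps() before publishing. The sketch below re-declares AgentTokenEvent so it runs standalone. It also shows how an API-side relay might wrap each Pub/Sub message as a text/event-stream frame.

to_sse_frame is hypothetical. It is also an assumption here whether PRD-002's client listens on named event: types or only on data: lines.

```python
from __future__ import annotations

import json
from typing import Literal, TypedDict


class AgentTokenEvent(TypedDict):
    type: Literal["agent.token"]
    agent_id: str
    token: str


def to_sse_frame(payload: str) -> str:
    """Wrap one published JSON payload as a text/event-stream frame."""
    event_type = json.loads(payload)["type"]
    # "event:" names the SSE event, "data:" carries the JSON, a blank line ends the frame.
    return f"event: {event_type}\ndata: {payload}\n\n"


sse = AgentTokenEvent(type="agent.token", agent_id="a1", token="Hi")
wire = json.dumps(sse)  # what the worker publishes to jobs:{job_id}:events
print(to_sse_frame(wire), end="")
```

json.dumps() escapes newlines inside token strings, so a token that contains a newline cannot split the frame across several lines.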
6. LangGraphEventTransformer Specification
The transformer replaces an if/elif chain with a dispatch table injected via DI.
Each handler is a plain typed function. The transformer class is stateless beyond its
_handlers dict; per-job state (spawned_agents) is owned by the caller and passed into every call.
Handler Type
```python
from collections.abc import Callable

from langchain_core.messages import AIMessageChunk

_EventHandler: TypeAlias = Callable[[StandardStreamEvent, dict[str, str]], list[SseEvent]]
```
Metadata Helper
```python
def _meta(event: StandardStreamEvent) -> LangGraphEventMetadata:
    return event.get("metadata", {})  # type: ignore[return-value]
```
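Individual Handlers
```python
import json
from uuid import uuid4
# ALL_NODES (§3) and WORKER_NODES (§4) are module-level constants.

def _handle_chain_start(event: StandardStreamEvent, spawned_agents: dict[str, str]) -> list[SseEvent]:
    node = _meta(event).get("langgraph_node", "")
    # Nested runnables inherit langgraph_node; only the node's own run
    # (event["name"] == node) marks the start of a new agent execution.
    if node not in WORKER_NODES or event["name"] != node:
        return []
    agent_id = str(uuid4())
    spawned_agents[node] = agent_id  # overwritten on re-entry by design (§11)
    return [AgentSpawnedEvent(type="agent.spawned", agent_id=agent_id, agent_name=node, node=node)]

def _handle_stream(event: StandardStreamEvent, spawned_agents: dict[str, str]) -> list[SseEvent]:
    meta = _meta(event)
    node = meta.get("langgraph_node", "")
    # AIMessageChunk (on_chat_model_stream) or GenerationChunk (on_llm_stream); both expose .text
    chunk: AIMessageChunk = event["data"]["chunk"]  # Any → typed (valid Pyright)
    token = chunk.text                                # TextAccessor, a str subclass
    if not token:
        return []                                     # role-only / empty chunks (§11)
    if node == "writer":                              # writer tokens feed the output panel (§9)
        section = _section_from_ns(meta.get("langgraph_checkpoint_ns", ""))
        return [OutputTokenEvent(type="output.token", token=token, section=section)]
    agent_id = spawned_agents.get(node, "")
    if not agent_id:
        return []                                     # supervisor and other non-agent nodes
    return [AgentTokenEvent(type="agent.token", agent_id=agent_id, token=token)]

def _handle_tool_start(event: StandardStreamEvent, spawned_agents: dict[str, str]) -> list[SseEvent]:
    agent_id = spawned_agents.get(_meta(event).get("langgraph_node", ""), "")
    if not agent_id:
        return []
    tool_name: str = event["name"]
    # Truncated JSON preview (60 chars, per §3); default=str covers non-JSON inputs.
    input_preview = json.dumps(event["data"].get("input", ""), default=str)[:60]
    return [AgentToolCallEvent(
        type="agent.tool_call", agent_id=agent_id,
        tool_name=tool_name, input_preview=input_preview,
    )]

def _handle_tool_end(event: StandardStreamEvent, spawned_agents: dict[str, str]) -> list[SseEvent]:
    agent_id = spawned_agents.get(_meta(event).get("langgraph_node", ""), "")
    if not agent_id:
        return []
    tool_name: str = event["name"]
    result_summary = str(event["data"].get("output", ""))[:120]
    return [AgentToolResultEvent(
        type="agent.tool_result", agent_id=agent_id,
        tool_name=tool_name, result_summary=result_summary,
    )]

def _handle_chain_end(event: StandardStreamEvent, spawned_agents: dict[str, str]) -> list[SseEvent]:
    meta = _meta(event)
    node = meta.get("langgraph_node", "")
    name: str = event["name"]
    if node == "writer" and name in {"report", "comment_draft", "ticket_draft"}:
        return [OutputSectionDoneEvent(type="output.section_done", section=name)]  # §9
    # Skip nested runnables and the root graph run (which has no langgraph_node).
    if node not in ALL_NODES or name != node:
        return []
    events: list[SseEvent] = []
    agent_id = spawned_agents.get(node, "")  # only WORKER_NODES are ever spawned
    if agent_id:
        events.append(AgentDoneEvent(type="agent.done", agent_id=agent_id, node=node))
    events.append(GraphNodeCompleteEvent(
        type="graph.node_complete", node=node, step=meta.get("langgraph_step"),
    ))
    return events
```
Transformer Class
```python
class LangGraphEventTransformer:
    """Injected via DI container (see PRD-012). Wire up in the dependency factory."""

    def __init__(self, handlers: dict[str, _EventHandler]) -> None:
        self._handlers = handlers

    def transform(
        self,
        event: StandardStreamEvent,
        spawned_agents: dict[str, str],
    ) -> list[SseEvent]:
        handler = self._handlers.get(event["event"])
        return handler(event, spawned_agents) if handler is not None else []
```
DI wire-up (in the container / factory, not in module scope):
```python
LangGraphEventTransformer(handlers={
    "on_chain_start": _handle_chain_start,  # agent.spawned; populates spawned_agents
    "on_chat_model_stream": _handle_stream,
    "on_llm_stream": _handle_stream,        # completion-style LLMs; same handler
    "on_tool_start": _handle_tool_start,
    "on_tool_end": _handle_tool_end,
    "on_chain_end": _handle_chain_end,      # agent.done, graph.node_complete, output.section_done
})
```
Dispatch needs no if/elif chain, no isinstance, and no reflection. Adding a new event type means adding one handler function and one entry in the DI wire-up.
BaseMessage.text: AIMessageChunk.text is a TextAccessor (a str subclass) that handles both str content (OpenAI) and list[dict] content (Anthropic) internally. Call .text directly; never inspect .content or .content_blocks.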
Section Helper for Writer Tokens
```python
def _section_from_ns(checkpoint_ns: str) -> str | None:
    """
    Derive the writer RunnableParallel section name from the checkpoint namespace.
    Format is typically "writer:uuid4|branch_name:uuid4".
    Returns the branch name if found, else None.
    """
    known_sections = {"report", "comment_draft", "ticket_draft"}
    for part in checkpoint_ns.split("|"):
        name = part.split(":")[0]
        if name in known_sections:
            return name
    return None
```
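7. Interrupt Detection: _check_for_interrupt()
Primary Method (Recommended)
After the astream_events loop exits, call graph.aget_state(config) and check whether
state.tasks is non-empty. A non-empty tasks list means the graph suspended at an
interrupt() call instead of running to completion. The interrupt is read from whichever pending task carries it.
```python
from langchain_core.runnables import RunnableConfig
from langgraph.graph.state import CompiledStateGraph

# HumanExchange is the interrupt payload model defined in PRD-003.


async def _check_for_interrupt(
    graph: CompiledStateGraph, config: RunnableConfig
) -> HumanExchange | None:
    """
    Returns the HumanExchange if the graph is suspended at a human_input interrupt,
    or None if the graph completed normally.
    Call this AFTER the astream_events loop exits.
    human_input_node calls interrupt(state.pending_exchange); the value is a HumanExchange.
    """
    state = await graph.aget_state(config)
    # Scan every pending task rather than assuming the interrupt is on tasks[0].
    for task in state.tasks:
        if task.interrupts:
            return task.interrupts[0].value
    return None
```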
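Indexing tasks[0] assumes the interrupting task is always listed first. The following is a stdlib illustration of scanning every pending task instead.

FakeInterrupt and FakeTask are hypothetical stand-ins. They mimic only the .value and .interrupts attributes used here, not LangGraph's real classes.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class FakeInterrupt:
    """Stand-in exposing only .value, like a LangGraph interrupt record."""
    value: Any


@dataclass
class FakeTask:
    """Stand-in exposing only .name and .interrupts, like a pending task."""
    name: str
    interrupts: tuple[FakeInterrupt, ...] = ()


def first_interrupt_value(tasks: tuple[FakeTask, ...]) -> Any | None:
    """Return the first interrupt value found on any pending task, else None."""
    for task in tasks:
        if task.interrupts:
            return task.interrupts[0].value
    return None


# A non-interrupting task listed before the human_input task.
pending = (
    FakeTask("critic"),
    FakeTask("human_input", (FakeInterrupt({"question": "Which environment?"}),)),
)
print(first_interrupt_value(pending))  # {'question': 'Which environment?'}
```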
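Alternative Method (Stream-Mode Updates)
If you also need to detect the interrupt during the stream (not just after), consume
graph.astream() with stream_mode="updates" and check each update chunk for the "__interrupt__"
key. This check does not work on astream_events() output. Those events are StandardStreamEvent
dicts (event, name, run_id, tags, metadata, data, parent_ids) and never carry an "__interrupt__"
key at the top level. Note also that "updates" mode yields node-level updates, not token events.
```python
async for chunk in graph.astream(initial_state, config=config, stream_mode="updates"):
    if "__interrupt__" in chunk:
        # Graph is about to suspend; the value is what human_input_node passed to interrupt()
        interrupt_payload: HumanExchange = chunk["__interrupt__"][0].value
```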
For the ARQ worker pattern in PRD-003, the primary method (post-loop aget_state()) is
preferred because it is simpler and the astream_events loop terminates naturally when
interrupt() fires (the graph suspends, the generator exhausts).
Interrupt Payload Structure
```python
# human_input_node calls: interrupt(state.pending_exchange)
# state.pending_exchange is a HumanExchange, accessed via dot fields throughout:
state.tasks[0].interrupts[0].value  # -> HumanExchange
# Access: interrupt_payload.question, interrupt_payload.context
```
8. Agent Identity Tracking
Problem
on_chat_model_stream and other events identify the currently-executing node via
metadata["langgraph_node"] (a string like "investigator"). The frontend, however, expects a
stable agent_id UUID per spawned agent instance — not a node name — so it can match tokens,
tool calls, and completion events to the same agent card.
Solution
The worker maintains spawned_agents: dict[str, str] initialized as {} before the
astream_events loop begins. This dict is passed to transform_langgraph_event() by reference
and mutated in place.
```python
spawned_agents: dict[str, str] = {}  # node_name -> agent_id UUID

async for event in graph.astream_events(initial_state, config=config, version="v2"):
    for sse in transform_langgraph_event(event, spawned_agents):
        await redis.publish(channel, json.dumps(sse))
```
Every on_chain_start for a worker node: A new UUID is generated and stored in
spawned_agents[node]. All subsequent events for that execution (tokens, tool calls, chain_end)
look up the current UUID via spawned_agents[node].
Node called a second time (e.g. investigator redirected back): Each on_chain_start
generates a fresh UUID. The old agent card in the UI remains (its agent_id is still valid);
a new card is created for the new run. This means the UI correctly shows two separate
investigator cards for two separate investigator runs.
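The re-entry behaviour can be exercised without LangGraph by replaying a hand-written event sequence in which investigator runs twice. The handle function below is a hypothetical, stripped-down stand-in for the spawn and token handlers. Its chunk is a plain string rather than an AIMessageChunk.

The stand-in keeps the same two rules:
- Only the node's own on_chain_start spawns an agent.
- spawned_agents is overwritten on re-entry.

```python
from __future__ import annotations

from typing import Any
from uuid import uuid4

WORKER_NODES = {"investigator", "codebase_search", "web_search", "critic", "writer"}


def handle(event: dict[str, Any], spawned_agents: dict[str, str]) -> list[dict[str, str]]:
    """Stripped-down stand-in for the agent.spawned / agent.token handlers."""
    node = event["metadata"].get("langgraph_node", "")
    if event["event"] == "on_chain_start" and node in WORKER_NODES and event["name"] == node:
        spawned_agents[node] = str(uuid4())  # overwritten on re-entry, by design
        return [{"type": "agent.spawned", "agent_id": spawned_agents[node], "node": node}]
    if event["event"] == "on_chat_model_stream" and node in spawned_agents:
        token = event["data"]["chunk"]  # plain str here; .text on a real chunk
        return [{"type": "agent.token", "agent_id": spawned_agents[node], "token": token}]
    return []


def node_run(node: str, token: str) -> list[dict[str, Any]]:
    """Events for one node execution: its own start, then one streamed token."""
    meta = {"langgraph_node": node}
    return [
        {"event": "on_chain_start", "name": node, "metadata": meta},
        {"event": "on_chat_model_stream", "name": "ChatModel", "metadata": meta,
         "data": {"chunk": token}},
    ]


spawned: dict[str, str] = {}
events = node_run("investigator", "first") + node_run("investigator", "second")
out = [sse for ev in events for sse in handle(ev, spawned)]
spawn_ids = [e["agent_id"] for e in out if e["type"] == "agent.spawned"]
print(len(spawn_ids), spawn_ids[0] != spawn_ids[1])  # 2 True
print(out[3]["agent_id"] == spawn_ids[1])            # True
```

The second run's tokens carry the new agent_id, while the first card keeps its own.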
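9. Writer RunnableParallel → output.section_done
The writer node uses RunnableParallel to produce three output sections concurrently. Each branch
is given a run_name matching its key, so the branch's on_chain_end event carries that name in
event["name"]. Without it, event["name"] is the branch runnable's own default name (e.g.
RunnableSequence), not the dict key.
```python
writer_chain = RunnableParallel(
    report=report_chain.with_config(run_name="report"),
    comment_draft=comment_chain.with_config(run_name="comment_draft"),
    ticket_draft=ticket_chain.with_config(run_name="ticket_draft"),
)
```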
Event Detection
Each branch of the RunnableParallel emits an on_chain_end event when it completes. The
distinguishing fields:
- event["event"] == "on_chain_end"
- event["metadata"]["langgraph_node"] == "writer"
- event["name"] in {"report", "comment_draft", "ticket_draft"}

This maps to output.section_done { section: event["name"] }.
Token Attribution
During writer execution, on_chat_model_stream events include langgraph_checkpoint_ns
identifying which parallel branch is running. The _section_from_ns() helper (§6) parses this
to set the section field on output.token events, so the frontend can route tokens to the
correct section panel.
Checkpoint namespace format for a parallel branch (as given in the _section_from_ns() docstring): writer:<uuid4>|<branch_name>:<uuid4>, for example writer:<uuid4>|report:<uuid4>.
The _section_from_ns() function splits on |, then splits each segment on : to get the
name prefix, and matches against the known section names.
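If the writer branches are plain runnables rather than subgraphs, they execute inside the writer task and may not get a namespace segment of their own. In that case _section_from_ns() returns None.

A hedged fallback is to tag each branch when the writer chain is built, for example report_chain.with_config(tags=["section:report"]). Tags supplied through RunnableConfig are inherited by nested runs, including the chat model, so the section can then be read from event["tags"]. The section: prefix and section_from_tags are hypothetical.

```python
from __future__ import annotations

from collections.abc import Iterable

WRITER_SECTIONS = frozenset({"report", "comment_draft", "ticket_draft"})
SECTION_TAG_PREFIX = "section:"  # hypothetical tag convention

# Assumed branch tagging when building the writer chain (not part of the spec above):
#   report=report_chain.with_config(run_name="report", tags=["section:report"]), etc.


def section_from_tags(tags: Iterable[str]) -> str | None:
    """Return the writer section named by a "section:<name>" tag, or None."""
    for tag in tags:
        if tag.startswith(SECTION_TAG_PREFIX):
            name = tag[len(SECTION_TAG_PREFIX):]
            if name in WRITER_SECTIONS:
                return name
    return None


print(section_from_tags(["seq:step:1", "section:comment_draft"]))  # comment_draft
print(section_from_tags(["seq:step:1"]))                            # None
```

Inside the stream handler, this would be called as section_from_tags(event.get("tags", [])).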
10. Full Annotated run_triage() Function
Service-Wide Worker Error Handler
Exception handling for cross-cutting concerns (publishing job.failed, logging) lives in one
place: a @worker_error_handler decorator applied to every ARQ worker function. No individual
worker function handles exceptions it cannot recover from. This is the project's
"service-wide middleware" for worker-layer errors (see PRD-007 §Architecture policy).
```python
# worker_middleware.py
import functools
import json
import logging
from collections.abc import Callable, Coroutine
from typing import Any

from redis.asyncio import Redis

logger = logging.getLogger(__name__)


def worker_error_handler(fn: Callable[..., Coroutine[Any, Any, None]]) -> Callable[..., Coroutine[Any, Any, None]]:
    """Service-wide ARQ worker error handler.
    Publishes job.failed to the Redis SSE channel on any unhandled exception,
    then re-raises so ARQ records the job as FAILED. Declared once; applied to
    all worker functions in WorkerSettings. Individual worker functions must not
    catch exceptions they cannot locally recover from.
    """
    @functools.wraps(fn)
    async def wrapper(ctx: dict, job_id: str, *args: object, **kwargs: object) -> None:
        try:
            return await fn(ctx, job_id, *args, **kwargs)
        except Exception as exc:
            redis: Redis = ctx["redis"]
            logger.exception("Worker function %s failed for job %s", fn.__name__, job_id)
            await redis.publish(
                f"jobs:{job_id}:events",
                json.dumps({"type": "job.failed", "error": str(exc)}),
            )
            raise
    return wrapper
```
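The sketch below unit-tests the decorator's contract: publish job.failed on the job channel, then re-raise. It uses an in-memory stand-in for Redis, so no server is needed. FakeRedis and failing_job are hypothetical. The decorator is repeated in condensed form, with logging omitted, so the snippet runs on its own.

```python
import asyncio
import functools
import json
from typing import Any


class FakeRedis:
    """Records publish() calls instead of talking to a Redis server."""

    def __init__(self) -> None:
        self.published: list[tuple[str, str]] = []

    async def publish(self, channel: str, message: str) -> int:
        self.published.append((channel, message))
        return 1  # subscriber count, as a real client would report


def worker_error_handler(fn):
    """Condensed copy of the decorator above (logging omitted)."""
    @functools.wraps(fn)
    async def wrapper(ctx: dict, job_id: str, *args: Any, **kwargs: Any) -> None:
        try:
            return await fn(ctx, job_id, *args, **kwargs)
        except Exception as exc:
            await ctx["redis"].publish(
                f"jobs:{job_id}:events",
                json.dumps({"type": "job.failed", "error": str(exc)}),
            )
            raise
    return wrapper


@worker_error_handler
async def failing_job(ctx: dict, job_id: str) -> None:
    raise RuntimeError("LangServe timeout")


async def main() -> FakeRedis:
    redis = FakeRedis()
    try:
        await failing_job({"redis": redis}, "job-1")
    except RuntimeError:
        pass  # re-raised by the decorator, so ARQ would record the job as failed
    else:
        raise AssertionError("decorator swallowed the exception")
    return redis


redis = asyncio.run(main())
print(redis.published)
# [('jobs:job-1:events', '{"type": "job.failed", "error": "LangServe timeout"}')]
```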
run_triage(): Clean, No Exception Handling
```python
# worker.py
import json
from datetime import timedelta

from arq import ArqRedis
from redis.asyncio import Redis

from .graph import graph
from .transform import transform_langgraph_event, _check_for_interrupt
from .worker_middleware import worker_error_handler

HUMAN_INPUT_TIMEOUT_SECONDS = 1800  # 30 minutes


@worker_error_handler
async def run_triage(ctx: dict, job_id: str, initial_state: dict) -> None:
    """Run the LangGraph triage graph and stream SSE events to Redis Pub/Sub.
    Streams graph execution events, translates them to frontend SSE format,
    and publishes each to the job's Redis channel. After the loop, detects
    whether the graph suspended at a human_input interrupt or completed normally
    and publishes the appropriate terminal event. Unhandled exceptions are caught
    and published by the worker_error_handler decorator.
    """
    redis: Redis = ctx["redis"]
    arq_queue: ArqRedis = ctx["arq_queue"]
    config = {"configurable": {"thread_id": job_id}}  # checkpointer thread = job
    channel = f"jobs:{job_id}:events"
    spawned_agents: dict[str, str] = {}  # node_name -> current agent_id (§8)

    # Each LangGraph event becomes zero or more SSE events.
    async for event in graph.astream_events(initial_state, config=config, version="v2"):
        for sse in transform_langgraph_event(event, spawned_agents):
            await redis.publish(channel, json.dumps(sse))

    # The generator is exhausted either because the graph finished or because it suspended.
    interrupt_payload = await _check_for_interrupt(graph, config)
    if interrupt_payload is not None:
        await redis.publish(channel, json.dumps({
            "type": "graph.interrupt",
            "question": interrupt_payload.question,
            "context": interrupt_payload.context,
        }))
        # Deferred timeout job; the fixed _job_id stops ARQ enqueueing a duplicate for this job.
        await arq_queue.enqueue_job(
            "expire_human_input",
            job_id,
            _job_id=f"timeout:{job_id}",
            _defer_by=timedelta(seconds=HUMAN_INPUT_TIMEOUT_SECONDS),
        )
    else:
        await redis.publish(channel, json.dumps({"type": "job.done"}))


class WorkerSettings:
    functions = [run_triage]  # expire_human_input (enqueued above) must also be registered on a worker
    allow_abort_jobs = True  # enables Job.abort() cross-worker
    max_jobs = 10  # concurrent job cap per worker
    retry_jobs = False  # at-most-once; deduplication at API layer (PRD-006)
```
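11. Known Issues & Gotchas
on_llm_stream vs on_chat_model_stream
Completion-style (non-chat) LLM integrations and some older LangChain integrations emit
on_llm_stream instead of on_chat_model_stream. The transformer handles both by mapping
"on_chat_model_stream" and "on_llm_stream" to the same _handle_stream handler in the DI wire-up (§6).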
Nested Runnable Streaming
If a worker node uses a sub-chain rather than calling the LLM directly, astream_events may
not propagate inner events (LangChain issue #6105). One known cause is an async node on
Python < 3.11 that invokes an inner runnable without passing its RunnableConfig through, so the
inner run is never attached to the node's callbacks. Worker agents must be called via httpx to
LangServe (as specified in PRD-003 §Worker Agent Node Specs), not as nested LangChain chains.
Nested chain calls inside a graph node will produce incomplete event streams.
Event Deduplication
astream_events can emit duplicate token events with different run_id but same content for
the same generation step. The primary deduplication filter is to process only events where
metadata["langgraph_node"] is set. Graph-level events always have it; inner chain events for
the same token may not.
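The filter can be expressed as a single predicate applied before dispatch. This is a minimal sketch over hand-written event dicts; has_node_metadata is a hypothetical name.

```python
from typing import Any


def has_node_metadata(event: dict[str, Any]) -> bool:
    """True for events whose metadata carries a non-empty langgraph_node."""
    return bool(event.get("metadata", {}).get("langgraph_node"))


stream = [
    {"event": "on_chat_model_stream", "run_id": "r1", "metadata": {"langgraph_node": "critic"}},
    {"event": "on_chat_model_stream", "run_id": "r2", "metadata": {}},
]
kept = [e for e in stream if has_node_metadata(e)]
print([e["run_id"] for e in kept])  # ['r1']
```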
Empty Content Chunks
Models emit on_chat_model_stream events with empty-string content at the start and end of
generation (role-only chunks). Drop these:
```python
chunk: AIMessageChunk = event["data"]["chunk"]
token = chunk.text  # TextAccessor (str subclass); empty string for role-only chunks
if not token:
    return []
```
Multiple on_chain_start for the Same Node
If the supervisor routes to the same worker twice (redirect flow), each on_chain_start fires
for that node. The current implementation correctly handles this by generating a new UUID each
time, creating a second agent card. The spawned_agents dict is intentionally overwritten (not
guarded with if node not in spawned_agents) for this reason.