PRD-003 — LangGraph Orchestration & Human-in-the-Loop¶
| Field | Value |
|---|---|
| Document ID | PRD-003 |
| Version | 1.0 |
| Status | DRAFT |
| Date | March 2026 |
| Parent Doc | PRD-001 |
| Related Docs | PRD-002 (Frontend), PRD-004 (LangChain/LangServe) |
Overview¶
This document specifies the LangGraph orchestration layer of AgentOps Dashboard — the system's brain. It covers how the supervisor coordinates worker agents, how shared state flows through the graph, how human interrupts are handled, and how job state is persisted across sessions.
Detailed specs: Event Pipeline & Worker · Supervisor Prompt & Routing
LangGraph is used here in its intended role: low-level orchestration of long-running, stateful, multi-agent workflows. It is deliberately not used for simple linear chains (those live in LangChain/LCEL at the LangServe layer — see PRD-004).
LangGraph Fundamentals Used¶
| Concept | Usage in This Product |
|---|---|
| StateGraph | The main graph class; holds the shared BugTriageState (Pydantic BaseModel) across all nodes |
| Nodes | Each agent and decision point is a node: supervisor, investigator, codebase_search, web_search, critic, human_input, writer |
| Conditional edges | Supervisor's output determines which worker runs next via add_conditional_edges |
| interrupt() | Pauses graph execution mid-node to await human input; resumes via Command(resume=answer) |
| Checkpointer | SqliteSaver (dev) / PostgresSaver (prod) — persists full graph state so jobs survive restarts |
| Thread ID | Each triage job maps to a LangGraph thread; jobs are fully isolated |
| Streaming | graph.astream_events() runs inside the ARQ worker; the API server is a pure Redis Pub/Sub consumer |
| ARQ | Async Redis Queue — distributed job execution, cross-worker Job.abort(), built-in status tracking (QUEUED / IN_PROGRESS / COMPLETE / FAILED) |
| Redis Pub/Sub | Event fanout channel (jobs:{id}:events) — worker publishes events, SSE endpoint subscribes; supports multiple simultaneous subscribers |
Shared State Schema¶
The BugTriageState Pydantic model is the central data structure. Every node reads from and writes to this state. It is
persisted at every checkpoint.
from __future__ import annotations

from typing import Annotated

from pydantic import BaseModel, Field, model_validator

# CriticVerdict is defined in PRD-004-1 §7 (agent-chains) and imported here.
# It provides the binary APPROVED/REJECTED gate used by the supervisor for routing.
from agentops.agents.critic import CriticVerdict


class AgentFinding(BaseModel):
    agent_name: str
    summary: str
    details: str
    confidence: float  # 0.0 – 1.0
    relevant_files: list[str]


class HumanExchange(BaseModel):
    question: str
    context: str
    answer: str | None = None


class TriageReport(BaseModel):
    severity: str  # LOW / MEDIUM / HIGH / CRITICAL
    category: str
    root_cause: str
    relevant_files: list[str]
    similar_issues: list[str]
    confidence: float


_CURRENT_SCHEMA_VERSION = 1


class BugTriageState(BaseModel):
    """
    Central graph state. Persisted by LangGraph's checkpointer after every node execution.

    Schema evolution rules:
    - Adding a field: add it with a Field(default=...) default. Pydantic fills missing keys
      from old checkpoints automatically at deserialization — no migration code required.
    - Renaming or changing the type of a field: add a branch inside migrate_from_checkpoint()
      and bump _CURRENT_SCHEMA_VERSION.
    """

    # --- Issue metadata ---
    issue_url: str
    issue_title: str
    issue_body: str
    repository: str

    # --- Routing ---
    current_node: str = Field(default="")
    next_node: str | None = Field(default=None)
    supervisor_reasoning: str = Field(default="")
    supervisor_confidence: float = Field(default=0.0)
    iterations: int = Field(default=0)  # total worker-node executions; not reset on resume
    max_iterations: int = Field(default=12)
    paused: bool = Field(default=False)
    redirect_instructions: Annotated[list[str], lambda a, b: a + b] = Field(default_factory=list)

    # --- Agent outputs ---
    findings: Annotated[list[AgentFinding], lambda a, b: a + b] = Field(default_factory=list)
    critic_feedback: CriticVerdict | None = Field(default=None)
    # Set by critic node. Supervisor reads .verdict to gate writer routing:
    # APPROVED → route to writer; REJECTED → re-investigate per .gaps and .required_evidence.

    # --- Human-in-the-loop ---
    human_exchanges: Annotated[list[HumanExchange], lambda a, b: a + b] = Field(default_factory=list)
    pending_exchange: HumanExchange | None = Field(default=None)
    awaiting_human: bool = Field(default=False)

    # --- Collected context ---
    codebase_chunks: list[str] = Field(default_factory=list)
    similar_past_issues: list[str] = Field(default_factory=list)
    web_results: list[str] = Field(default_factory=list)

    # --- Outputs ---
    report: TriageReport | None = Field(default=None)
    github_comment_draft: str | None = Field(default=None)
    ticket_draft: dict[str, object] | None = Field(default=None)

    # --- Cost tracking ---
    running_cost_usd: float = Field(default=0.0)
    max_cost_usd: float = Field(default=5.0)
    cost_budget_exceeded: bool = Field(default=False)

    # --- Job metadata ---
    job_id: str
    owner_id: str
    langsmith_run_id: str | None = Field(default=None)
    status: str = Field(default="queued")

    # --- Schema versioning ---
    schema_version: int = Field(default=0)
    # 0 = pre-versioning checkpoint; always written as _CURRENT_SCHEMA_VERSION by
    # migrate_from_checkpoint() on the first supervisor execution after a schema bump.

    @model_validator(mode="before")
    @classmethod
    def migrate_from_checkpoint(cls, data: object) -> object:
        """
        Runs before Pydantic field validation on every deserialization.

        For additive schema changes this method does nothing — Pydantic fills Field
        defaults automatically. Add a branch here only when a field is renamed or its
        type changes in a breaking way, then bump _CURRENT_SCHEMA_VERSION.
        """
        if not isinstance(data, dict):
            return data
        # Example (not yet needed): if the field "web_results" was renamed from
        # "search_results" in schema v2, the branch would be:
        #
        #     if data.get("schema_version", 0) < 2 and "search_results" in data:
        #         data["web_results"] = data.pop("search_results")
        #
        data["schema_version"] = _CURRENT_SCHEMA_VERSION
        return data

    model_config = {"arbitrary_types_allowed": True}
Schema Versioning¶
BugTriageState uses Pydantic BaseModel rather than TypedDict specifically to leverage
Pydantic's deserialization lifecycle for checkpoint compatibility.
Additive changes (adding a new field): Supply a Field(default=...) or
Field(default_factory=...). LangGraph's checkpointer passes the stored JSON dict to Pydantic,
which fills any missing key with its declared default. No migration code is required.
Non-additive changes (renaming a field, incompatible type change): Add a conditional branch
inside migrate_from_checkpoint() guarded by data.get("schema_version", 0) < N, then bump
_CURRENT_SCHEMA_VERSION. The validator is a classmethod on BugTriageState — no module-level
functions or global registries.
schema_version starts at 0 in all pre-versioning checkpoints (the field is absent, so
Pydantic uses Field(default=0)). migrate_from_checkpoint() always writes
_CURRENT_SCHEMA_VERSION into the dict before field validation, so after the first supervisor
execution following a schema bump the checkpoint is updated automatically.
_CURRENT_SCHEMA_VERSION is a module-level constant defined immediately above BugTriageState
so it is always co-located with the class it describes.
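The non-additive path can be sketched without Pydantic at all. The stdlib-only example below mirrors the logic of migrate_from_checkpoint() on plain dicts, assuming the hypothetical v2 rename of search_results to web_results already used as the example in the docstring above (SKETCH_SCHEMA_VERSION and migrate are illustration names, not part of the spec):

```python
SKETCH_SCHEMA_VERSION = 2  # assume a hypothetical v2 rename has shipped


def migrate(data: dict) -> dict:
    """Upgrade a checkpointed state dict in place, mirroring migrate_from_checkpoint()."""
    # v2 branch: field "search_results" was renamed to "web_results".
    if data.get("schema_version", 0) < 2 and "search_results" in data:
        data["web_results"] = data.pop("search_results")
    # Always stamp the current version so the next checkpoint write is up to date.
    data["schema_version"] = SKETCH_SCHEMA_VERSION
    return data


# A pre-versioning checkpoint (no schema_version key) still using the old name:
old_checkpoint = {"issue_url": "https://example.com/1", "search_results": ["hit"]}
migrated = migrate(old_checkpoint)
assert migrated["web_results"] == ["hit"]
assert migrated["schema_version"] == 2
assert "search_results" not in migrated
```

An additive change never touches this function: a missing key is simply filled with its declared default at deserialization.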
Graph Structure¶
Node List¶
| Node | Type | Description |
|---|---|---|
| START | Built-in | Entry point |
| supervisor | LLM node | Reads current state; decides next step |
| investigator | Tool node | Calls LangServe /agents/investigator |
| codebase_search | Tool node | Calls LangServe /agents/codebase-search |
| web_search | Tool node | Calls LangServe /agents/web-search |
| critic | Tool node | Calls LangServe /agents/critic |
| human_input | Interrupt node | Fires interrupt(); blocks until user answers |
| writer | Tool node | Calls LangServe /agents/writer; produces final outputs |
| END | Built-in | Exit point |
Graph Definition (Pseudocode)¶
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver

builder = StateGraph(BugTriageState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("investigator", investigator_node)
builder.add_node("codebase_search", codebase_search_node)
builder.add_node("web_search", web_search_node)
builder.add_node("critic", critic_node)
builder.add_node("human_input", human_input_node)
builder.add_node("writer", writer_node)

builder.add_edge(START, "supervisor")
builder.add_conditional_edges(
    "supervisor",
    route_from_supervisor,
    {
        "investigator": "investigator",
        "codebase_search": "codebase_search",
        "web_search": "web_search",
        "critic": "critic",
        "human_input": "human_input",
        "writer": "writer",
        "end": END,
    },
)
for node in ["investigator", "codebase_search", "web_search", "critic"]:
    builder.add_edge(node, "supervisor")
builder.add_edge("human_input", "supervisor")
builder.add_edge("writer", END)

checkpointer = SqliteSaver.from_conn_string("jobs.db")
graph = builder.compile(checkpointer=checkpointer)
Routing Logic¶
def route_from_supervisor(state: BugTriageState) -> str:
    """
    Supervisor LLM outputs structured JSON with a 'next_node' field.
    This function extracts it and returns the node name.
    Guards: max_iterations check, investigator-first enforcement.
    """
    # BugTriageState is a Pydantic model, so fields are attributes, not dict keys.
    if state.iterations >= state.max_iterations:
        return "writer"  # force completion
    # Enforce design invariant: investigator must always run first.
    # The supervisor prompt instructs this, but LLM compliance is not guaranteed.
    # state.iterations == 0 means no worker node has executed yet.
    if state.iterations == 0 and state.next_node != "investigator":
        return "investigator"
    return state.next_node
iterations counts total worker-node executions across the full job lifetime, including any
cycles that occur after a human-interrupt resume. The value max_iterations is chosen to
accommodate a typical flow (investigation → optional human interrupt → continued investigation
→ write) without premature termination. It is not reset on resume.
Investigator-first guard: When iterations == 0, no worker node has yet executed. The
supervisor prompt instructs the LLM to always call investigator first, but prompt compliance
is not guaranteed — edge-case inputs or model drift could cause the LLM to skip straight to
codebase_search or writer. The guard in route_from_supervisor overrides any such decision
deterministically. Once investigator has run, iterations becomes 1 and the guard no longer
fires; subsequent routing is left entirely to the supervisor.
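The two deterministic guards can be exercised in isolation with a stub state (StubState and route below are illustration names standing in for BugTriageState and route_from_supervisor):

```python
from dataclasses import dataclass


@dataclass
class StubState:
    iterations: int
    max_iterations: int
    next_node: str


def route(state: StubState) -> str:
    if state.iterations >= state.max_iterations:
        return "writer"          # cap reached: force completion
    if state.iterations == 0 and state.next_node != "investigator":
        return "investigator"    # investigator-first invariant overrides the LLM
    return state.next_node       # otherwise routing is left to the supervisor


assert route(StubState(0, 12, "writer")) == "investigator"   # guard fires
assert route(StubState(3, 12, "critic")) == "critic"         # normal routing
assert route(StubState(12, 12, "web_search")) == "writer"    # iteration cap
```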
Supervisor Agent Design¶
The supervisor is the most critical node. It reads the full current state and decides:
- Which worker to call next (or whether to ask the human, or to finalize)
- Whether it has enough information to write the final report
- Whether it needs to ask the user a clarifying question
Supervisor System Prompt (Abbreviated)¶
You are the supervisor of a bug triage team. You coordinate specialized agents
to investigate a GitHub issue and produce a full triage report.
Your available agents:
- investigator: Reads and interprets the issue body. Always call this first.
- codebase_search: Searches the repository for relevant code. Call when you need
to locate the root cause in source code.
- web_search: Searches the web for error messages or stack traces. Call when the
issue contains cryptic errors or library bugs.
- critic: Reviews findings for correctness and gaps. Call when you have a
hypothesis but want a second opinion before finalizing.
- writer: Produces the final report. Call only when you have sufficient evidence.
Human input rules:
- Ask the user a question if and only if:
(a) Two or more equally plausible root causes exist and you cannot distinguish them
without user knowledge, OR
(b) Critical context is missing that no tool can retrieve (e.g. recent deploys,
env-specific config)
- Maximum 2 questions per job. Do not ask about things tools can discover.
- Frame questions concisely. Provide 2–3 specific options when possible.
Output JSON with fields: next_node, reasoning, question (if asking human), confidence.
Supervisor Output Schema¶
from typing import Literal

class SupervisorDecision(BaseModel):
    next_node: Literal[
        "investigator", "codebase_search", "web_search",
        "critic", "human_input", "writer", "end"
    ]
    reasoning: str
    question: str | None = None          # only populated when next_node == "human_input"
    question_context: str | None = None
    confidence: float                    # 0.0–1.0
When next_node == "human_input", the supervisor node returns
{"pending_exchange": HumanExchange(question=..., context=...), "awaiting_human": True}.
It does not append to human_exchanges directly — that field is written only by
human_input_node via the append reducer.
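The mapping from decision to state update can be sketched on plain dicts (decision_to_update is an illustration name; the real supervisor node works with the Pydantic models above):

```python
def decision_to_update(decision: dict) -> dict:
    """Turn a SupervisorDecision-shaped dict into the node's returned state update."""
    update = {
        "next_node": decision["next_node"],
        "supervisor_reasoning": decision["reasoning"],
        "supervisor_confidence": decision["confidence"],
    }
    if decision["next_node"] == "human_input":
        # Stage the question; human_input_node is the only writer of human_exchanges.
        update["pending_exchange"] = {
            "question": decision["question"],
            "context": decision.get("question_context") or "",
            "answer": None,
        }
        update["awaiting_human"] = True
    return update


update = decision_to_update({
    "next_node": "human_input",
    "reasoning": "two equally plausible root causes",
    "confidence": 0.4,
    "question": "Was there a deploy today?",
    "question_context": "Cannot distinguish config drift from the code change.",
})
assert update["awaiting_human"] is True
assert "human_exchanges" not in update   # only human_input_node appends to it
```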
Worker Agent Node Specs¶
Each worker node follows the same pattern: call the corresponding LangServe endpoint, get back an AgentFinding, append
it to state, return updated state to supervisor.
async def investigator_node(state: BugTriageState) -> dict:
    async with httpx.AsyncClient() as client:  # httpx.post() is sync; use the async client
        response = await client.post(
            "http://localhost:8001/agents/investigator/invoke",
            json={
                "input": {
                    "issue_title": state.issue_title,
                    "issue_body": state.issue_body,
                    # Pydantic models must be serialized before going over the wire
                    "prior_findings": [f.model_dump() for f in state.findings],
                }
            },
        )
    response.raise_for_status()
    finding = AgentFinding(**response.json()["output"])
    return {
        "findings": [finding],  # reducer appends this
        "current_node": "investigator",
        "iterations": state.iterations + 1,
    }
| Node | LangServe Endpoint | Key Inputs from State | Key Outputs to State |
|---|---|---|---|
| investigator | /agents/investigator | issue_title, issue_body | findings |
| codebase_search | /agents/codebase-search | issue_body, findings, repository | findings, codebase_chunks |
| web_search | /agents/web-search | issue_body, findings | findings, web_results |
| critic | /agents/critic | findings, codebase_chunks | findings (critique finding) |
| writer | /agents/writer | all findings, human exchanges | report, github_comment_draft, ticket_draft |
Human-in-the-Loop Implementation¶
Triggering an Interrupt¶
When the supervisor routes to human_input, the node fires interrupt():
from langgraph.types import interrupt

def human_input_node(state: BugTriageState) -> dict:
    exchange = state.pending_exchange  # Pydantic model: attribute access, not subscripting
    answer = interrupt({
        "question": exchange.question,
        "context": exchange.context,
    })
    return {
        "human_exchanges": [exchange.model_copy(update={"answer": answer})],  # reducer appends
        "pending_exchange": None,
        "awaiting_human": False,
    }
State design note: human_exchanges uses an append reducer
(Annotated[list[HumanExchange], lambda a, b: a + b]), so nodes may safely return
[new_item] without reading or copying the existing list. pending_exchange uses plain
overwrite semantics and holds at most one unanswered question at a time. The supervisor
writes to pending_exchange; human_input_node reads it, fills the answer, appends the
completed exchange to human_exchanges, and clears pending_exchange — no list mutation
anywhere.
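The reducer semantics are easy to see in isolation: LangGraph merges the checkpointed value with the node's returned value by calling the annotated function. A minimal stdlib sketch:

```python
def append_reducer(a: list, b: list) -> list:
    """The reducer from the Annotated[...] hint: concatenate, never overwrite."""
    return a + b


checkpointed = [{"question": "Q1", "answer": "A1"}]          # existing state
node_return = [{"question": "Q2", "answer": "A2"}]           # node returns only the new item
merged = append_reducer(checkpointed, node_return)           # what LangGraph stores

assert len(merged) == 2                  # nothing was overwritten
assert merged[0]["question"] == "Q1"     # history preserved, new item appended last
```

Fields without a reducer annotation (like pending_exchange) are simply replaced by whatever the node returns, which is exactly the overwrite semantics described above.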
Resuming from FastAPI¶
When the user submits an answer via POST /jobs/{id}/answer, the endpoint cancels the pending timeout ARQ job and
resumes the graph. Full implementation is in the Interrupt Timeout Mechanism section.
Command(resume=answer) does not restart the graph from the beginning. LangGraph restores the
full BugTriageState from the checkpoint saved at the interrupt and continues execution from
the human_input node forward. The iterations counter therefore reflects work done both
before and after the interrupt.
Before resuming, the endpoint reads the checkpointed state via graph.aget_state() and
checks awaiting_human. If the job is not currently paused at a human_input interrupt
(e.g., the job is still running, has already completed, or the timeout already fired and
auto-resumed), the endpoint returns HTTP 409 Conflict rather than calling
Command(resume=...) on a non-interrupted thread.
Interrupt Timeout Mechanism¶
interrupt() blocks the graph indefinitely — LangGraph has no built-in timeout. The 30-minute timeout is implemented
by enqueuing a deferred ARQ job (expire_human_input) with a 30-minute delay at the moment the interrupt fires. If the
user has not answered by then, the job resumes the graph with a best-effort signal via graph.ainvoke(Command(resume=...)).
If the user answers first, the answer endpoint cancels the deferred job via Job.abort() before resuming the graph.
# worker.py
TIMEOUT_ANSWER = "[no answer provided — proceeding with best-effort]"
HUMAN_INPUT_TIMEOUT_SECONDS = 1800  # 30 minutes

async def expire_human_input(ctx: dict, job_id: str):
    """
    Scheduled ARQ job: fires 30 min after a human_input interrupt if unanswered.
    Resumes the graph with a best-effort signal so the supervisor can proceed.
    """
    redis: Redis = ctx["redis"]
    config = {"configurable": {"thread_id": job_id}}
    # ARQ marks run_triage complete while the graph sits at the interrupt (see Kill),
    # so ARQ job status cannot distinguish "answered" from "still waiting".
    # Check the checkpointed graph state instead.
    snapshot = await graph.aget_state(config)
    if not snapshot.values.get("awaiting_human"):
        return  # answered, finished, or killed in the meantime
    await graph.ainvoke(Command(resume=TIMEOUT_ANSWER), config=config)
    channel = f"jobs:{job_id}:events"
    await redis.publish(channel, json.dumps({
        "type": "human_input.timeout",
        "message": "No answer received after 30 minutes. Proceeding with best-effort.",
    }))
The timeout job is enqueued at the moment the ARQ worker detects the graph has paused at a human_input node (i.e.,
astream_events yields an on_chain_end event whose metadata indicates an interrupt):
# worker.py — inside run_triage(), within the astream_events loop
from datetime import timedelta

async def run_triage(ctx: dict, job_id: str, initial_state: dict):
    redis: Redis = ctx["redis"]
    config = {"configurable": {"thread_id": job_id}}
    channel = f"jobs:{job_id}:events"
    async for event in graph.astream_events(initial_state, config=config, version="v2"):
        sse_event = transform_langgraph_event(event)
        if sse_event:
            await redis.publish(channel, json.dumps(sse_event))
        if _is_human_input_interrupt(event):
            # ctx["redis"] is an ArqRedis, so it can enqueue the deferred timeout job
            await redis.enqueue_job(
                "expire_human_input",
                job_id,
                _job_id=f"timeout:{job_id}",
                _defer_by=timedelta(seconds=HUMAN_INPUT_TIMEOUT_SECONDS),
            )
    await redis.publish(channel, json.dumps({"type": "job.done"}))
Answer endpoint cancels the timeout by aborting the scheduled ARQ job before resuming the graph:
@app.post("/jobs/{job_id}/answer")
async def submit_answer(job_id: str, body: AnswerRequest, redis: Redis = Depends(get_redis)):
    config = {"configurable": {"thread_id": job_id}}
    snapshot = await graph.aget_state(config)
    if not snapshot.values.get("awaiting_human"):
        raise HTTPException(status_code=409, detail="Job is not awaiting human input")
    timeout_job = Job(f"timeout:{job_id}", redis)
    await timeout_job.abort(timeout=2)  # no-op if already fired or not found
    await graph.ainvoke(Command(resume=body.answer), config=config)
Question Constraints¶
| Rule | Value | Rationale |
|---|---|---|
| Max questions per job | 2 | Prevent over-reliance on user; agents should be capable |
| Timeout (no answer) | 30 minutes — enforced via deferred ARQ job (expire_human_input) | Prevents indefinite graph hang; graph resumes with best-effort signal |
| Question format | Single question + 2–3 concrete options when applicable | Reduces cognitive load |
| Blocker scope | Entire graph pauses, not just one agent | Ensures answer is incorporated before any further work |
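The max-questions rule is enforced by the supervisor prompt; a deterministic route-level backstop is not specified in this document, but one could look like the following sketch (guard_question_budget, MAX_QUESTIONS_PER_JOB, and the critic fallback are assumptions for illustration):

```python
MAX_QUESTIONS_PER_JOB = 2  # mirrors the table above


def guard_question_budget(next_node: str, questions_asked: int) -> str:
    """Hypothetical backstop: never route to human_input past the question budget."""
    if next_node == "human_input" and questions_asked >= MAX_QUESTIONS_PER_JOB:
        return "critic"  # assumed fallback: get an automated second opinion instead
    return next_node


assert guard_question_budget("human_input", 2) == "critic"       # budget exhausted
assert guard_question_budget("human_input", 1) == "human_input"  # one question left
assert guard_question_budget("writer", 5) == "writer"            # unrelated routes pass through
```

questions_asked would come from len(state.human_exchanges) plus any pending exchange.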
Implementation Gotchas¶
Node re-execution on resume: When Command(resume=...) is called, LangGraph re-runs the
entire human_input_node function from the top. The current implementation is safe because no
side effects occur before interrupt(). Any future code added before the interrupt() call
must be idempotent — it will execute twice (once when the interrupt fires, once when the graph
resumes).
Bare except trap: interrupt() works by raising a special internal exception
(GraphInterrupt). Any bare except Exception: or except: block inside human_input_node
will silently swallow it, preventing the graph from pausing. Always catch specific exception
types in this node.
Multiple interrupt ordering: If multiple interrupt() calls exist in a node, LangGraph
matches resume values by index. Since human_input_node has exactly one interrupt call, this
constraint is satisfied automatically. Future multi-interrupt nodes must maintain deterministic
interrupt() call order to ensure correct resume value matching.
Pause, Redirect, and Kill¶
Pause¶
Triggered by user clicking "Pause" in the UI. Sends POST /jobs/{id}/pause.
Implementation: Sets paused: True in the checkpointed state. The supervisor checks this flag at
the start of each iteration and fires interrupt() with a "manual_pause" signal if set. User
can resume by clicking "Resume".
"At-next-iteration" behaviour: The pause takes effect at the next supervisor node boundary, not immediately. If the supervisor is already mid-LLM-call (e.g. deciding which worker to dispatch) when the API call arrives, execution continues until that LLM call completes and the supervisor node runs again. In practice this means up to one additional worker agent may be spawned before the graph actually pauses.
This is by design — LangGraph does not expose a mid-node cancellation hook for the flag-based
pattern. The alternative (a hard Job.abort() followed by re-enqueue) is lossy and not used here.
UI implication: The frontend must show a "Pause requested…" intermediate status from the
moment POST /jobs/{id}/pause returns 200 until a graph.paused SSE event is received. See
PRD-002 §Status Definitions.
Redirect¶
Triggered after a pause. User can type a new instruction into a text field in the UI: e.g., "Focus only on the database layer — ignore the auth middleware."
Implementation: Resumes the graph via Command(resume={"type": "redirect", "instruction": "..."}).
When the supervisor node receives this command, it:
- Reads the instruction string from the Command payload (the return value of interrupt()).
- Appends it to redirect_instructions in its returned state dict, so the instruction is written into the LangGraph checkpoint immediately.
- Prepends all accumulated redirect_instructions (not just the latest) to its system prompt for the current and all subsequent supervisor iterations.
Persistence guarantee: Because the instruction is stored in BugTriageState.redirect_instructions
before any further LLM work is done, it survives server restarts. If the worker process crashes and
the job is resumed from checkpoint, the full list of redirect instructions is restored and the
supervisor continues to honour them.
Multiple redirects: Each redirect appends to the list. The supervisor concatenates all entries in order, so earlier redirects remain in effect unless a later one explicitly overrides them.
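The prompt-assembly step can be sketched as plain string building (build_supervisor_prompt, BASE_PROMPT, and the header wording are illustration names, not part of the spec):

```python
BASE_PROMPT = "You are the supervisor of a bug triage team."


def build_supervisor_prompt(base: str, redirects: list[str]) -> str:
    """Prepend every accumulated redirect instruction, in order, to the system prompt."""
    if not redirects:
        return base
    bullets = "\n".join(f"- {r}" for r in redirects)  # earlier redirects stay in effect
    return f"USER REDIRECT INSTRUCTIONS (most recent last):\n{bullets}\n\n{base}"


prompt = build_supervisor_prompt(BASE_PROMPT, [
    "Focus only on the database layer — ignore the auth middleware.",
    "Also check the migration history.",
])
assert prompt.index("database layer") < prompt.index("migration history")  # order preserved
assert prompt.endswith(BASE_PROMPT)
```

Because the list is read from BugTriageState.redirect_instructions on every supervisor iteration, the same prompt prefix is rebuilt after a restart from checkpoint.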
Kill¶
Sends DELETE /jobs/{id}. Immediately terminates the LangGraph thread via ARQ's cross-process abort mechanism.
Final state is checkpointed with status: "killed". Any partial outputs already in state are preserved and shown in
the output panel.
Architecture: The API server never runs the graph. Jobs are enqueued to ARQ workers via Redis. Job.abort() sends
an abort signal through Redis; the worker process holding that job cancels its asyncio task — this works across any
number of workers and processes by design. No module-level running_tasks dict is needed.
flowchart LR
subgraph API["FastAPI API server (REST + SSE)"]
REST["REST endpoints\n/jobs/*"]
SSE["SSE subscriber\njobs:{id}:events"]
end
subgraph WORKER["ARQ Worker(s)"]
GRAPH["graph.ainvoke()"]
PUB["publishes events\nto Redis channel"]
end
REDIS[("Redis\nARQ queue + Pub/Sub\n+ job status")]
PG[("PostgreSQL\nLangGraph state")]
API -->|"enqueue_job()"| WORKER
WORKER -->|"Redis Pub/Sub events"| API
API -->|"Job.abort()"| REDIS
REDIS -->|"job queue"| WORKER
WORKER -->|"PostgresSaver"| PG
ARQ worker function — runs on a worker process, not the API server:
# worker.py
async def run_triage(ctx: dict, job_id: str, initial_state: dict):
    redis: Redis = ctx["redis"]
    config = {"configurable": {"thread_id": job_id}}
    channel = f"jobs:{job_id}:events"
    async for event in graph.astream_events(initial_state, config=config, version="v2"):
        sse_event = transform_langgraph_event(event)
        if sse_event:
            await redis.publish(channel, json.dumps(sse_event))
    await redis.publish(channel, json.dumps({"type": "job.done"}))

class WorkerSettings:
    functions = [run_triage]
    allow_abort_jobs = True  # enables Job.abort() cross-worker
    max_jobs = 10            # concurrent job cap per worker
    retry_jobs = False       # ARQ retries disabled; at-most-once deduplication is enforced
                             # at the API layer via a Redis idempotency key (see PRD-006)
Kill endpoint — no dict lookup, no task tracking:
@app.delete("/jobs/{job_id}")
async def kill_job(job_id: str, redis: Redis = Depends(get_redis)):
    job = Job(job_id, redis)
    # Job.status() (not Job.info(), which returns queue metadata) tells us whether
    # ARQ knows about this job at all.
    if await job.status() == JobStatus.not_found:
        raise HTTPException(status_code=404, detail="Job not found")
    aborted = await job.abort(timeout=5)
    if not aborted:
        # run_triage exits normally when interrupt() suspends the graph, so ARQ
        # marks the job complete while the LangGraph thread sits checkpointed.
        # The pending timeout job is present exactly in that window.
        timeout_job = Job(f"timeout:{job_id}", redis)
        timeout_status = await timeout_job.status()
        if timeout_status in (JobStatus.complete, JobStatus.not_found):
            raise HTTPException(status_code=409, detail="Job could not be aborted (may already be complete)")
        await timeout_job.abort(timeout=2)  # no-op if already fired
    config = {"configurable": {"thread_id": job_id}}
    await graph.aupdate_state(config, {"status": "killed"})
    return {"job_id": job_id, "status": "killed"}
Checkpointing and Persistence¶
Why Checkpointing Matters¶
LangGraph's checkpointer saves the full BugTriageState to a database after every node execution. This means:
- Jobs survive server restarts
- Users can close the browser and come back to a running job
- The full execution history is available for debugging
- Paused jobs (waiting for human input) persist indefinitely until answered
Configuration¶
| Environment | Checkpointer | Connection |
|---|---|---|
| Development | SqliteSaver | jobs.db (local file) |
| Production | PostgresSaver | DATABASE_URL env var |
Thread IDs¶
Each job is identified by a UUID that maps 1:1 to a LangGraph thread ID. This UUID is generated at POST /jobs and is
the same ID used in SSE stream URLs, answer endpoints, and LangSmith trace grouping.
Checkpointing Deployment Notes¶
setup() call: Must be called exactly once before first use to create the checkpoint
tables in PostgreSQL. Run it as a database migration step in CI/CD (e.g. via alembic or a
standalone management command), not during lifespan() startup. Repeated calls on an existing
schema are safe but noisy; insufficient-permission errors at startup are harder to recover from
than migration-step failures.
AsyncPostgresSaver connection requirements: The psycopg connection must be created with:
- autocommit=True — required for setup() to commit the table creation DDL
- row_factory=dict_row — required because the saver accesses rows as row["column"]; the
default tuple-based row factory causes TypeError at runtime
Connection pool sizing: The AsyncConnectionPool max_size should equal or exceed the
ARQ worker's max_jobs setting (currently 10). Each concurrent job holds one connection during
checkpoint reads and writes. Undersizing the pool causes jobs to queue on connection acquisition
and increases perceived latency.
Known PoolClosed race condition: When the PostgreSQL connection pool is recycled (e.g.
after a cloud database failover or idle timeout), agent instances holding stale pool references
receive PoolClosed errors. Pattern to avoid this: acquire a fresh connection per checkpoint
operation via the pool.connection() async context manager rather than holding a long-lived
connection reference across multiple node executions.
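The three deployment notes above combine into a short configuration sketch. Assumptions: psycopg_pool's AsyncConnectionPool, langgraph's AsyncPostgresSaver accepting a pool, and a DATABASE_URL defined elsewhere — a sketch under those assumptions, not the definitive production wiring:

```python
from psycopg.rows import dict_row
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

# max_size matches the ARQ worker's max_jobs (10): one connection per concurrent job.
# autocommit + dict_row per the connection requirements above.
pool = AsyncConnectionPool(
    conninfo=DATABASE_URL,  # assumed defined elsewhere
    max_size=10,
    open=False,  # open explicitly during worker startup, not at import time
    kwargs={"autocommit": True, "row_factory": dict_row},
)
checkpointer = AsyncPostgresSaver(pool)
# checkpointer.setup() runs as a CI/CD migration step, not here (see above).
```

Handing the saver the pool (rather than a single long-lived connection) means each checkpoint operation checks a connection out and returns it, which is the pattern recommended above for avoiding PoolClosed errors after pool recycling.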
Streaming to Frontend¶
Separation of concerns: graph.astream_events() runs inside the ARQ worker (see Kill). The worker publishes each
transformed event to a Redis Pub/Sub channel (jobs:{job_id}:events). The FastAPI SSE endpoint is a pure subscriber
— it never executes graph logic.
This means multiple browser tabs can subscribe to the same job stream independently (each gets a separate Redis subscriber), and a client disconnect drops only that subscriber — the running job in the ARQ worker is unaffected.
Disconnect handling: Starlette's StreamingResponse runs the generator inside an anyio cancel scope. When the
client disconnects, Starlette cancels the scope, which raises asyncio.CancelledError at whichever await the
generator is blocked on inside pubsub.listen(). The finally block fires unconditionally and cleans up the
subscriber. No polling, no request.is_disconnected() checks, no extra tasks.
@router.get("/{job_id}/stream")
async def stream_job(
    job_id: Annotated[str, Depends(get_job_and_verify_owner)],
    redis: Annotated[Redis, Depends(get_redis)],
) -> StreamingResponse:
    channel = f"jobs:{job_id}:events"

    async def event_generator():
        pubsub = redis.pubsub()
        await pubsub.subscribe(channel)
        seq = 0
        try:
            async for message in pubsub.listen():
                if message["type"] != "message":
                    continue
                yield f"id: {seq}\ndata: {message['data']}\n\n"
                seq += 1
                if json.loads(message["data"]).get("type") == "job.done":
                    break
        finally:
            await pubsub.unsubscribe(channel)
            await pubsub.close()

    return StreamingResponse(event_generator(), media_type="text/event-stream")
The transform_langgraph_event() function (called in the ARQ worker) maps LangGraph's internal event types (e.g.,
on_chat_model_stream, on_tool_start, on_chain_end) to the frontend event schema in
PRD-002. When a Writer RunnableParallel branch completes,
the corresponding on_chain_end event is translated to output.section_done { section } so the
frontend can enable per-section controls without waiting for job.done.
FastAPI lifespan — no graph tasks to drain on shutdown; only the Redis pool needs cleanup:
@asynccontextmanager
async def lifespan(app: FastAPI):
    # arq's create_pool() returns an ArqRedis (a Redis subclass), so a single
    # pool serves both Pub/Sub subscriptions and job enqueueing.
    app.state.redis = await create_pool(RedisSettings())
    app.state.arq_queue = app.state.redis
    yield
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)
Error Handling¶
| Error Scenario | Behavior |
|---|---|
| Worker agent LangServe endpoint is down | Supervisor retries once after 5s; on second failure, marks agent as errored and routes to next available agent or human_input |
| LLM rate limit (429) | Exponential backoff with max 3 retries; after that, job status set to FAILED |
| Supervisor outputs invalid JSON | Pydantic validation catches it; supervisor is re-invoked once with an error correction prompt |
| max_iterations reached | Graph routes directly to writer with whatever findings have been accumulated |
| Unhandled exception in any node | Job status set to FAILED; full traceback captured in LangSmith; error event sent to frontend |
| Checkpointer DB unavailable | Job continues in-memory; warning logged; user notified that job will not survive restart |
| POST /jobs/{id}/answer called while job is not awaiting input | Returns HTTP 409 Conflict; awaiting_human flag checked via graph.aget_state() before resuming |
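The 429 policy in the table can be made concrete as a delay schedule. The base delay and doubling factor below are assumptions for illustration; the spec fixes only "exponential backoff with max 3 retries":

```python
def backoff_schedule(base_seconds: float = 2.0, retries: int = 3) -> list[float]:
    """Delays between successive retry attempts: base, 2x, 4x, ..."""
    return [base_seconds * (2 ** attempt) for attempt in range(retries)]


assert backoff_schedule() == [2.0, 4.0, 8.0]
assert len(backoff_schedule()) == 3  # after the third failed retry, status = FAILED
```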