LangGraph Agent Documentation
This document provides in-depth documentation of the LangGraph-based conversational agent that powers the Chatbot Service.
Overview
The chatbot uses LangGraph to orchestrate conversations as a state machine. This enables:
- Tool calling: Automatic selection and execution of tools
- State persistence: Conversation continuity across requests
- Human-in-the-loop: Interactive test sessions with interrupts
- Complex flows: Multi-step reasoning and branching logic
Architecture
flowchart TD
subgraph MainGraph["GraphAgent (Main Graph)"]
Entry([Entry Point])
Think[think]
RAGNode[rag_search]
GuiaNode[get_guia]
TestNode[test_session]
End([END])
Entry --> Think
Think -->|tool: rag_search| RAGNode
Think -->|tool: get_guia| GuiaNode
Think -->|tool: generate_test| TestNode
Think -->|no tool call| End
RAGNode --> Think
GuiaNode --> Think
TestNode --> End
end
State Management
SubjectState
The main state object that flows through the graph:
class SubjectState(MessagesState):
"""State schema for the conversational agent graph."""
# Inherited from MessagesState
messages: list[BaseMessage]
# Subject/course context
asignatura: str | None
# RAG results
context: list[dict[str, Any]] | None
# Chain-of-Thought reasoning (advanced queries)
thinking: str | None
# Difficulty classification
query_difficulty: str | None
# Test session fields (shared with test subgraph)
topic: str | None
num_questions: int | None
difficulty: str | None
questions: list[MultipleChoiceTest] | None
current_question_index: int | None
user_answers: list[str] | None
feedback_history: list[str] | None
scores: list[bool] | None
pending_feedback: str | None
State Field Descriptions
| Field | Type | Description |
|---|---|---|
messages | list[BaseMessage] | Full conversation history |
asignatura | str | Current subject (e.g., “iv”, “DS”) |
context | list[dict] | Retrieved document chunks from RAG |
thinking | str | CoT reasoning (advanced queries only) |
query_difficulty | str | Classified level: basic/intermediate/advanced |
topic | str | Test session topic |
num_questions | int | Total questions in test |
questions | list | Generated test questions |
current_question_index | int | Current question position (0-indexed) |
user_answers | list[str] | User’s answers to test questions |
scores | list[bool] | Correct/incorrect for each answer |
GraphAgent Class
Initialization
class GraphAgent:
def __init__(
self,
*,
llm_provider: Literal["vllm", "gemini", "mistral"] = "vllm",
vllm_port: str | None = None,
model_dir: str | None = None,
openai_api_key: str = "EMPTY",
gemini_api_key: str | None = None,
gemini_model: str | None = None,
mistral_api_key: str | None = None,
mistral_model: str | None = None,
temperature: float = 0.1,
):
"""Initialize GraphAgent with configurable LLM provider."""
Design Decision: A single GraphAgent instance is created at application startup and reused for all requests. This ensures:
- Consistent checkpoint access
- Efficient resource usage
- Proper conversation state restoration
LLM Provider Selection
flowchart TD
Init[GraphAgent.__init__]
Init --> Provider{llm_provider}
Provider -->|gemini| Gemini[ChatGoogleGenerativeAI]
Provider -->|mistral| Mistral[ChatMistralAI]
Provider -->|vllm| VLLM[ChatOpenAI]
Gemini --> BindTools[bind_tools]
Mistral --> BindTools
VLLM --> BindTools
BindTools --> Agent[Ready for inference]
def _get_llm(self, temperature: float | None = None):
"""Get configured LLM instance based on provider."""
if self.llm_provider == "gemini":
return ChatGoogleGenerativeAI(
model=self.gemini_model,
google_api_key=self.gemini_api_key,
temperature=temp,
)
elif self.llm_provider == "mistral":
return ChatMistralAI(
model=self.mistral_model,
mistral_api_key=self.mistral_api_key,
temperature=temp,
)
else: # vllm
return ChatOpenAI(
model=self.model_name,
openai_api_key=self.openai_api_key,
openai_api_base=self.vllm_url,
temperature=temp,
)
Graph Nodes
Think Node
The main reasoning node that processes user messages and decides actions.
def think(self, state: SubjectState):
"""Main reasoning node with adaptive Chain-of-Thought."""
Responsibilities:
- Classify query difficulty
- Select appropriate system prompt
- Invoke LLM with tools bound
- Parse CoT response (if advanced query)
flowchart TD
Input[State with messages]
Input --> Classify[Classify Difficulty]
Classify --> Select{Select Prompt}
Select -->|basic| Basic[SYSTEM_PROMPT_BASIC]
Select -->|intermediate| Inter[SYSTEM_PROMPT_INTERMEDIATE]
Select -->|advanced| Adv[SYSTEM_PROMPT_COT]
Basic --> Invoke[LLM.invoke with tools]
Inter --> Invoke
Adv --> Invoke
Invoke --> Parse{Parse Response}
Parse -->|CoT| Extract[Extract thinking/answer]
Parse -->|Normal| Return[Return messages]
Extract --> Return
Adaptive Prompting:
# Classify difficulty
difficulty_result = classify_difficulty_detailed(last_user_message)
query_difficulty = difficulty_result.level.value
use_cot = difficulty_result.level == DifficultyLevel.ADVANCED
# Select prompt
if use_cot:
system_prompt = SYSTEM_PROMPT_COT.format(asignatura=asignatura)
elif query_difficulty:
system_prompt = get_adaptive_prompt(query_difficulty, asignatura)
else:
system_prompt = get_adaptive_prompt("intermediate", asignatura)
CoT Parsing (for advanced queries):
if use_cot:
# Extract <thinking>...</thinking>
thinking_match = re.search(r"<thinking>(.*?)</thinking>", content, re.DOTALL)
if thinking_match:
result["thinking"] = thinking_match.group(1).strip()
# Extract <answer>...</answer>
answer_match = re.search(r"<answer>(.*?)</answer>", content, re.DOTALL)
if answer_match:
clean_answer = answer_match.group(1).strip()
result["messages"] = [AIMessage(content=clean_answer)]
RAG Search Node
Executes semantic search against the RAG service.
def rag_search(self, state: SubjectState):
"""RAG search node - semantic search over document database."""
Flow:
sequenceDiagram
participant Graph
participant Node as rag_search node
participant Tool as rag_search tool
participant RAG as RAG Service
Graph->>Node: State with tool_calls
Node->>Node: Extract rag_search call
Node->>Tool: invoke(query, asignatura, ...)
Tool->>RAG: POST /search
RAG-->>Tool: Document chunks
Tool-->>Node: Results dict
Node->>Node: Format as ToolMessage
Node-->>Graph: Updated state with context
def rag_search(self, state: SubjectState):
tools = get_tools()
rag_tool = next((tool for tool in tools if tool.name == "rag_search"), None)
# Extract tool calls from last message
tool_calls = getattr(last_message, "tool_calls", [])
tool_messages = []
for tc in tool_calls:
if tc["name"] == "rag_search":
rag_result = rag_tool.invoke(tc["args"])
results = rag_result.get("results", [])
# Format context
content = "This is chunks of context:\n"
for result in results:
content += f"\n- {result['content']}\n"
state["context"].append({
"content": result["content"],
"metadata": result["metadata"]
})
tool_messages.append(ToolMessage(content=content, tool_call_id=tc["id"]))
return {"messages": tool_messages}
Get Guia Node
Retrieves teaching guide information from MongoDB.
def get_guia(self, state: SubjectState):
"""Teaching guide retrieval node."""
Important: The asignatura is injected from state into tool args:
for tc in tool_calls:
if tc["name"] == "get_guia":
args = tc["args"]
args["asignatura"] = state.get("asignatura") # Inject from state
content = guia_tool.invoke(args)
Conditional Routing
def should_continue(self, state: SubjectState):
"""Decide if agent should continue to tools or end."""
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return last_message.tool_calls[0]["name"] # Route to tool node
else:
return END # Conversation complete
Routing Map:
graph_builder.add_conditional_edges(
"agent",
self.should_continue,
{
"rag_search": "rag_search",
"get_guia": "get_guia",
"generate_test": "test_session",
END: END,
},
)
Graph Construction
def build_graph(self):
"""Build and compile the agent graph."""
from chatbot.logic.testGraph import create_test_subgraph
graph_builder = StateGraph(SubjectState)
# Build test subgraph
test_subgraph = create_test_subgraph(
llm_provider=self.llm_provider,
# ... pass LLM config
)
# Add nodes
graph_builder.add_node("agent", self.think)
graph_builder.add_node("rag_search", self.rag_search)
graph_builder.add_node("get_guia", self.get_guia)
graph_builder.add_node("test_session", test_subgraph)
# Set entry point
graph_builder.set_entry_point("agent")
# Add edges
graph_builder.add_conditional_edges("agent", self.should_continue, {...})
graph_builder.add_edge("rag_search", "agent")
graph_builder.add_edge("get_guia", "agent")
graph_builder.add_edge("test_session", END)
# Setup checkpointer
conn = sqlite3.connect(checkpoint_path, check_same_thread=False)
memory = SqliteSaver(conn)
self._graph = graph_builder.compile(checkpointer=memory)
Checkpoint Persistence
SQLite Storage
Conversation state is persisted in SQLite:
chatbot/storage/checkpoints.db
Per-thread storage:
- Each conversation has a unique
thread_id - Full message history is preserved
- Tool call/response pairs are tracked
- Test session progress is maintained
State Restoration
def call_agent(self, query: str, id: str, asignatura: str | None = None):
"""Call agent - state is automatically restored from checkpoint."""
state = {
"messages": [HumanMessage(content=query)],
"asignatura": asignatura,
"context": [],
}
config = {"configurable": {"thread_id": id}}
# LangGraph automatically restores previous state for this thread_id
return self._graph.invoke(state, config=config)
Test Session Subgraph
The test session uses a separate subgraph with human-in-the-loop interrupts.
TestSessionState
class TestSessionState(MessagesState):
"""State shared with parent graph."""
topic: str
num_questions: int
difficulty: str | None
asignatura: str | None
questions: list[MultipleChoiceTest]
current_question_index: int
user_answers: list[str]
feedback_history: list[str]
scores: list[bool]
pending_feedback: str | None
Subgraph Structure
flowchart TD
subgraph TestSubgraph["Test Session Subgraph"]
Init[initialize_test]
GenQ[generate_queries_node]
Retrieve[retrieve_context_node]
Gen[generate_questions_node]
Present[present_question]
Answer[answer_question]
Router{test_router}
Final[finalize_test]
Init --> GenQ
GenQ --> Retrieve
Retrieve --> Gen
Gen --> Present
Present --> Answer
Answer --> Router
Router -->|continue| Present
Router -->|finalize| Final
end
Interrupt Mechanism
def answer_question(self, state: TestSessionState):
"""Wait for user answer - INTERRUPT happens here."""
# Build interrupt payload
interrupt_payload = {
"action": "answer_question",
"question_num": idx + 1,
"total_questions": state.get("num_questions"),
"question_text": question_text,
}
# INTERRUPT: Execution pauses here
user_answer = interrupt(interrupt_payload)
# When resumed with user's answer:
feedback, is_correct = self.evaluate_answer_with_llm(...)
return {
"user_answers": updated_answers,
"scores": updated_scores,
"current_question_index": idx + 1,
# ...
}
Resume Flow
def call_agent_resume(self, id: str, resume_value: str):
"""Resume an interrupted execution."""
from langgraph.types import Command
config = {"configurable": {"thread_id": id}}
# Resume with user's answer
return self._graph.invoke(Command(resume=resume_value), config=config)
Proactive RAG for Tests
The test subgraph proactively retrieves context before generating questions:
flowchart LR
Topic[Test Topic] --> GenQueries[Generate Search Queries]
GenQueries --> RAG[RAG Search]
RAG --> Context[Retrieved Context]
Context --> GenQuestions[Generate Questions]
GenQuestions --> Questions[Test Questions]
def generate_queries_node(self, state: TestSessionState):
"""Generate search queries for RAG."""
prompt = TEST_QUERY_GENERATION_PROMPT.format(topic=topic, num_queries=3)
response = llm.invoke(prompt)
queries = json.loads(response_text) # ["query1", "query2", "query3"]
return {"queries": queries}
def retrieve_context_node(self, state: TestSessionState):
"""Execute RAG searches."""
for query in queries:
result = rag_search.func(query=query, asignatura=asignatura, top_k=3)
all_context.extend(result.get("results", []))
return {"context": all_context}
Answer Evaluation
Test answers are evaluated using the LLM:
def evaluate_answer_with_llm(self, question, user_answer, state):
"""Evaluate user's answer using LLM."""
# Format RAG context
rag_context = state.get("context", [])
formatted_context = self._format_rag_context(rag_context)
# Build evaluation prompt
evaluation_prompt = TEST_EVALUATION_PROMPT.format(
topic=state["topic"],
question_text=question_text,
user_answer=user_answer,
correct_answer_hint=correct_answer_hint,
rag_context=formatted_context,
)
response = self.llm.invoke(evaluation_prompt)
# Parse: Is answer correct? What's the feedback?
# ...
return feedback, is_correct
API Integration
Main Chat Endpoint
# api.py
agente = GraphAgent(llm_provider=settings.llm_provider)
@app.post("/chat")
async def chat(chat_request: ChatRequest):
respuesta = agente.call_agent(
query=chat_request.query,
id=chat_request.id,
asignatura=chat_request.asignatura,
)
# Check for interrupt
if "__interrupt__" in respuesta and respuesta["__interrupt__"]:
interrupt_data = respuesta["__interrupt__"][0].value
return ChatResponse(
message=last_ai_message,
interrupted=True,
interrupt_info=InterruptInfo(**interrupt_data),
)
return ChatResponse(message=last_ai_message, interrupted=False)
Resume Endpoint
@app.post("/resume_chat")
async def resume_chat(resume_request: ResumeRequest):
respuesta = agente.call_agent_resume(
id=resume_request.id,
resume_value=resume_request.user_response,
)
# ... handle response
Error Handling
Tool Node Errors
def rag_search(self, state: SubjectState):
try:
rag_result = rag_tool.invoke(args)
except requests.exceptions.Timeout:
return {"context": [], "messages": [
ToolMessage(content="RAG service timeout", tool_call_id=tc["id"])
]}
Missing State
def present_question(self, state: TestSessionState):
if not questions:
print(f"ERROR: No questions in state!")
return {} # Let router handle
if idx >= len(questions):
print(f"ERROR: Index {idx} out of range")
return {}
Adding New Tools
- Create tool with @tool decorator:
# chatbot/logic/tools/new_tool.py
from langchain.tools import tool
@tool
def my_new_tool(arg1: str, arg2: int = 5) -> str:
"""Tool description for the LLM."""
# Implementation
return result
- Add to AVAILABLE_TOOLS:
# chatbot/logic/tools/tools.py
from chatbot.logic.tools.new_tool import my_new_tool
AVAILABLE_TOOLS = [
get_guia,
rag_search,
generate_test,
my_new_tool, # Add here
]
- Add node in graph builder:
# chatbot/logic/graph.py
def build_graph(self):
graph_builder.add_node("my_new_tool", self.my_new_tool_node)
- Add edge mapping:
graph_builder.add_conditional_edges(
"agent",
self.should_continue,
{
"rag_search": "rag_search",
"get_guia": "get_guia",
"generate_test": "test_session",
"my_new_tool": "my_new_tool", # Add mapping
END: END,
},
)
graph_builder.add_edge("my_new_tool", "agent") # Return to think
Related Documentation
- Architecture - System overview
- Tools - Tool implementations
- API Endpoints - API reference