如何使用Agentic RAG和LangGraph构建智能FAQ聊天机器人

如何使用Agentic RAG和LangGraph构建智能FAQ聊天机器人

现在,人工智能代理已成为大小企业的一部分。从在医院填写表格、检查法律文件,到分析视频录像、处理客户支持,我们有人工智能代理来完成各种任务。公司通常会花费数十万美元聘请客户支持人员,他们可以了解客户的需求,并根据公司的指导方针解决客户的问题。如今,拥有一个智能聊天机器人来回答常见问题,可以有效改善客户服务。在本文中,我们将学习如何使用 RAG 代理(检索增强生成)、LangGraph 和 ChromaDB 构建一个能在数秒内解决客户疑问的常见问题聊天机器人。

RAG代理简介

RAG 是时下的热门话题。每个人都在谈论 RAG 并在其基础上构建应用程序。RAG 可帮助 LLM 获取实时数据,从而使 LLM 比以往任何时候都更加准确。 然而,传统的 RAG 系统在选择最佳检索方法、改变检索工作流程或提供多步骤推理时往往会失败。这就是代理 RAG 的用武之地。

代理 RAG 将人工智能代理的能力融入其中,从而增强了传统 RAG 的功能。有了这种超能力,RAG 可以根据查询的性质动态改变工作流程,进行多步推理和多步检索。我们甚至可以将工具集成到代理 RAG 系统中,它可以动态决定何时使用哪种工具。总之,它能提高准确性,使系统更高效、更可扩展。

下面是一个代理 RAG 工作流程示例。

代理 RAG 工作流程

上图表示代理 RAG 框架的架构。它展示了人工智能代理如何与 RAG 结合,在特定条件下做出决策。该图清楚地表明,如果存在一个条件节点,代理就会根据所提供的上下文决定选择哪条边。

常见问题AI聊天机器人的架构

现在,我们将深入探讨我们将要构建的聊天机器人的架构。我们将探索它是如何工作的,以及有哪些重要组件。

下图显示了我们系统的整体结构。我们将使用 LangChain 的开源 AI 代理框架 LangGraph 来实现这一功能。

常见问题AI聊天机器人的架构

我们系统的主要组成部分包括

  1. LangGraph:一个功能强大的开源人工智能代理框架,可高效创建复杂、多代理、基于循环图的代理。这些代理可以在整个工作流程中保持状态,并能高效处理复杂的查询。
  2. LLM:一种高效且功能强大的大型语言模型,可根据用户的指令,利用其所掌握的知识做出相应的回复。在这里,我们将使用 OpenAI 的 o4-mini,它是一种小型推理模型,专为速度、经济性和工具使用而设计。
  3. 向量数据库:向量数据库用于存储、管理和检索向量嵌入,而向量嵌入通常是数据的数字表示。这里我们使用的 ChromaDB 是一个开源的人工智能原生向量数据库。它旨在增强依赖于相似性搜索、语义搜索和其他涉及向量数据任务的系统的能力。

推荐阅读:如何构建客户支持语音代理

构建智能常见问题聊天机器人的实践实施

现在,我们将根据上述架构来实现聊天机器人的端到端工作流程。我们将通过详细的解释、代码和输出示例来逐步实现。那么,让我们开始吧。

第 1 步:安装依赖项

首先,我们将在 Jupyter 笔记本中安装所有必需的库。其中包括 langchain、langgraph、langchain-openai、langchain-community、chromadb、openai、python-dotenv、pydantic 和 pysqlite3 等库。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
!pip install -q langchain langgraph langchain-openai langchain-community chromadb openai python-dotenv pydantic pysqlite3
!pip install -q langchain langgraph langchain-openai langchain-community chromadb openai python-dotenv pydantic pysqlite3
!pip install -q langchain langgraph langchain-openai langchain-community chromadb openai python-dotenv pydantic pysqlite3

第 2 步:导入所需程序库

现在,我们准备导入本项目所需的所有剩余库。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import os
import json
from typing import List, TypedDict, Annotated, Dict
from dotenv import load_dotenv
# Langchain & LangGraph specific imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langgraph.graph import StateGraph, END
import os import json from typing import List, TypedDict, Annotated, Dict from dotenv import load_dotenv # Langchain & LangGraph specific imports from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from pydantic import BaseModel, Field from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from langchain_core.documents import Document from langchain_community.vectorstores import Chroma from langgraph.graph import StateGraph, END
import os
import json
from typing import List, TypedDict, Annotated, Dict
from dotenv import load_dotenv
# Langchain & LangGraph specific imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langgraph.graph import StateGraph, END

第 3 步:设置OpenAI API密钥

输入 OpenAI 密钥,将其设置为环境变量。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from getpass import getpass
OPENAI_API_KEY = getpass("OpenAI API Key:")
load_dotenv()
os.getenv("OPENAI_API_KEY")
from getpass import getpass OPENAI_API_KEY = getpass("OpenAI API Key:") load_dotenv() os.getenv("OPENAI_API_KEY")
from getpass import getpass
OPENAI_API_KEY = getpass("OpenAI API Key:")
load_dotenv()
os.getenv("OPENAI_API_KEY")

第 4 步:下载数据集

我们为不同部门制作了 json 格式的常见问题数据集示例。我们需要从硬盘下载并解压缩。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
!gdown 1j6pdIansfQzKOZSEUinnHd8w6GlkKE6w
!unzip -o /content/blog_faq_files.zip
!gdown 1j6pdIansfQzKOZSEUinnHd8w6GlkKE6w !unzip -o /content/blog_faq_files.zip
!gdown 1j6pdIansfQzKOZSEUinnHd8w6GlkKE6w
!unzip -o /content/blog_faq_files.zip

输出:

下载数据集

第 5 步:为映射定义部门名称

现在,让我们定义部门的映射,以便我们的代理系统了解哪个文件属于哪个部门。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Define Department Names (ensure these match metadata used during ingestion)
DEPARTMENTS = [
"Customer Support",
"Product Information",
"Loyalty Program / Rewards"
]
UNKNOWN_DEPARTMENT = "Unknown/Other"
FAQ_FILES = {
"Customer Support": "customer_support_faq.json",
"Product Information": "product_information_faq.json",
"Loyalty Program / Rewards": "loyalty_program_faq.json",
}
# Define Department Names (ensure these match metadata used during ingestion) DEPARTMENTS = [ "Customer Support", "Product Information", "Loyalty Program / Rewards" ] UNKNOWN_DEPARTMENT = "Unknown/Other" FAQ_FILES = { "Customer Support": "customer_support_faq.json", "Product Information": "product_information_faq.json", "Loyalty Program / Rewards": "loyalty_program_faq.json", }
# Define Department Names (ensure these match metadata used during ingestion)
DEPARTMENTS = [
"Customer Support",
"Product Information",
"Loyalty Program / Rewards"
]
UNKNOWN_DEPARTMENT = "Unknown/Other"
FAQ_FILES = {
"Customer Support": "customer_support_faq.json",
"Product Information": "product_information_faq.json",
"Loyalty Program / Rewards": "loyalty_program_faq.json",
}

第 6 步:定义辅助函数

我们将定义一些辅助函数,负责从 json 文件中加载常见问题并将其存储到 ChromaDB 中。

1. load_faqs(…): 这是一个辅助函数,用于从 json 文件中加载常见问题,并将其存储到名为 all_faqs 的列表中。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def load_faqs(file_paths: Dict[str, str]) -> Dict[str, List[Dict[str, str]]]:
"""Loads QA pairs from JSON files for each department."""
all_faqs = {}
print("Loading FAQs...")
for dept, file_path in file_paths.items():
try:
with open(file_path, 'r', encoding='utf-8') as f:
all_faqs[dept] = json.load(f)
print(f" - Loaded {len(all_faqs[dept])} FAQs for {dept}")
except FileNotFoundError:
print(f" - WARNING: FAQ file not found for {dept}: {file_path}. Skipping.")
except json.JSONDecodeError:
print(f" - ERROR: Could not decode JSON for {dept} from {file_path}. Skipping.")
return all_faqs
def load_faqs(file_paths: Dict[str, str]) -> Dict[str, List[Dict[str, str]]]: """Loads QA pairs from JSON files for each department.""" all_faqs = {} print("Loading FAQs...") for dept, file_path in file_paths.items(): try: with open(file_path, 'r', encoding='utf-8') as f: all_faqs[dept] = json.load(f) print(f" - Loaded {len(all_faqs[dept])} FAQs for {dept}") except FileNotFoundError: print(f" - WARNING: FAQ file not found for {dept}: {file_path}. Skipping.") except json.JSONDecodeError: print(f" - ERROR: Could not decode JSON for {dept} from {file_path}. Skipping.") return all_faqs
def load_faqs(file_paths: Dict[str, str]) -> Dict[str, List[Dict[str, str]]]:
"""Loads QA pairs from JSON files for each department."""
all_faqs = {}
print("Loading FAQs...")
for dept, file_path in file_paths.items():
try:
with open(file_path, 'r', encoding='utf-8') as f:
all_faqs[dept] = json.load(f)
print(f"  - Loaded {len(all_faqs[dept])} FAQs for {dept}")
except FileNotFoundError:
print(f"  - WARNING: FAQ file not found for {dept}: {file_path}. Skipping.")
except json.JSONDecodeError:
print(f"  - ERROR: Could not decode JSON for {dept} from {file_path}. Skipping.")
return all_faqs

2. setup_chroma_vector_store(…):该函数设置 ChromaDB 以存储向量嵌入。为此,我们将首先定义 Chroma 配置,即包含 Chroma 数据库文件的目录。然后,我们将常见问题转换为 LangChain 的文档。它将包含元数据和页面内容,这是精确 RAG 的预定义格式。我们可以将问题和答案结合起来,以便更好地进行上下文检索,也可以只嵌入答案。我们将在元数据中保留问题和部门名称。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# ChromaDB Configuration
CHROMA_PERSIST_DIRECTORY = "./chroma_db_store"
CHROMA_COLLECTION_NAME = "Chatbot_faqs"
def setup_chroma_vector_store(
all_faqs: Dict[str, List[Dict[str, str]]],
persist_directory: str,
collection_name: str,
embedding_model: OpenAIEmbeddings,
) -> Chroma:
"""Creates or loads a Chroma vector store with FAQ data and metadata."""
documents = []
print("\nPreparing documents for vector store...")
for department, faqs in all_faqs.items():
for faq in faqs:
# Combine Q&A for better contextual embedding, or just embed answers
# content = f"Question: {faq['question']}\nAnswer: {faq['answer']}"
content = faq['answer'] # Often embedding just the answer is effective for FAQ retrieval
doc = Document(
page_content=content,
metadata={
"department": department,
"question": faq['question'] # Keep question in metadata for potential display
}
)
documents.append(doc)
print(f"Total documents prepared: {len(documents)}")
if not documents:
raise ValueError("No documents found to add to the vector store. Check FAQ loading.")
print(f"Initializing ChromaDB vector store (Persistence: {persist_directory})...")
vector_store = Chroma(
collection_name=collection_name,
embedding_function=embedding_model,
persist_directory=persist_directory,
)
try:
vector_store = Chroma.from_documents(
documents=documents,
embedding=embedding_model,
persist_directory=persist_directory,
collection_name=collection_name
)
print(f"Created and populated ChromaDB with {len(documents)} documents.")
vector_store.persist() # Ensure persistence after creation
print("Vector store persisted.")
except Exception as create_e:
print(f"FATAL ERROR: Could not create Chroma vector store: {create_e}")
raise create_e
print("ChromaDB setup complete.")
return vector_store
# ChromaDB Configuration CHROMA_PERSIST_DIRECTORY = "./chroma_db_store" CHROMA_COLLECTION_NAME = "Chatbot_faqs" def setup_chroma_vector_store( all_faqs: Dict[str, List[Dict[str, str]]], persist_directory: str, collection_name: str, embedding_model: OpenAIEmbeddings, ) -> Chroma: """Creates or loads a Chroma vector store with FAQ data and metadata.""" documents = [] print("\nPreparing documents for vector store...") for department, faqs in all_faqs.items(): for faq in faqs: # Combine Q&A for better contextual embedding, or just embed answers # content = f"Question: {faq['question']}\nAnswer: {faq['answer']}" content = faq['answer'] # Often embedding just the answer is effective for FAQ retrieval doc = Document( page_content=content, metadata={ "department": department, "question": faq['question'] # Keep question in metadata for potential display } ) documents.append(doc) print(f"Total documents prepared: {len(documents)}") if not documents: raise ValueError("No documents found to add to the vector store. Check FAQ loading.") print(f"Initializing ChromaDB vector store (Persistence: {persist_directory})...") vector_store = Chroma( collection_name=collection_name, embedding_function=embedding_model, persist_directory=persist_directory, ) try: vector_store = Chroma.from_documents( documents=documents, embedding=embedding_model, persist_directory=persist_directory, collection_name=collection_name ) print(f"Created and populated ChromaDB with {len(documents)} documents.") vector_store.persist() # Ensure persistence after creation print("Vector store persisted.") except Exception as create_e: print(f"FATAL ERROR: Could not create Chroma vector store: {create_e}") raise create_e print("ChromaDB setup complete.") return vector_store
# ChromaDB Configuration
CHROMA_PERSIST_DIRECTORY = "./chroma_db_store"
CHROMA_COLLECTION_NAME = "Chatbot_faqs"
def setup_chroma_vector_store(
all_faqs: Dict[str, List[Dict[str, str]]],
persist_directory: str,
collection_name: str,
embedding_model: OpenAIEmbeddings,
) -> Chroma:
"""Creates or loads a Chroma vector store with FAQ data and metadata."""
documents = []
print("\nPreparing documents for vector store...")
for department, faqs in all_faqs.items():
for faq in faqs:
# Combine Q&A for better contextual embedding, or just embed answers
# content = f"Question: {faq['question']}\nAnswer: {faq['answer']}"
content = faq['answer'] # Often embedding just the answer is effective for FAQ retrieval
doc = Document(
page_content=content,
metadata={
"department": department,
"question": faq['question'] # Keep question in metadata for potential display
}
)
documents.append(doc)
print(f"Total documents prepared: {len(documents)}")
if not documents:
raise ValueError("No documents found to add to the vector store. Check FAQ loading.")
print(f"Initializing ChromaDB vector store (Persistence: {persist_directory})...")
vector_store = Chroma(
collection_name=collection_name,
embedding_function=embedding_model,
persist_directory=persist_directory,
)
try:
vector_store = Chroma.from_documents(
documents=documents,
embedding=embedding_model,
persist_directory=persist_directory,
collection_name=collection_name
)
print(f"Created and populated ChromaDB with {len(documents)} documents.")
vector_store.persist() # Ensure persistence after creation
print("Vector store persisted.")
except Exception as create_e:
print(f"FATAL ERROR: Could not create Chroma vector store: {create_e}")
raise create_e
print("ChromaDB setup complete.")
return vector_store

第 7 步:定义LangGraph代理组件

现在让我们定义人工智能代理组件,它是我们工作流程的主要组成部分。

1. 状态定义:这是一个 python 类,包含代理运行时的当前状态。它包含查询、情感、部门等变量。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
class AgentState(TypedDict):
query: str
sentiment: str
department: str
context: str # Retrieved context for RAG
response: str # Final response to the user
error: str | None # To capture potential errors
class AgentState(TypedDict): query: str sentiment: str department: str context: str # Retrieved context for RAG response: str # Final response to the user error: str | None # To capture potential errors
class AgentState(TypedDict):
query: str
sentiment: str
department: str
context: str # Retrieved context for RAG
response: str # Final response to the user
error: str | None # To capture potential errors

2. Pydantic 模型:我们在此定义了一个 Pydantic 模型,它将确保 LLM 输出的结构化。它包含一个情感值(有“积极”、“消极”和“中性 ”三个值)和一个部门名称(将由 LLM 预测)。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
class ClassificationResult(BaseModel):
"""Structured output for query classification."""
sentiment: str = Field(description="Sentiment of the query (positive, neutral, negative)")
department: str = Field(description=f"Most relevant department from the list: {DEPARTMENTS + [UNKNOWN_DEPARTMENT]}. Use '{UNKNOWN_DEPARTMENT}' if unsure or not applicable.")
class ClassificationResult(BaseModel): """Structured output for query classification.""" sentiment: str = Field(description="Sentiment of the query (positive, neutral, negative)") department: str = Field(description=f"Most relevant department from the list: {DEPARTMENTS + [UNKNOWN_DEPARTMENT]}. Use '{UNKNOWN_DEPARTMENT}' if unsure or not applicable.")
class ClassificationResult(BaseModel):
"""Structured output for query classification."""
sentiment: str = Field(description="Sentiment of the query (positive, neutral, negative)")
department: str = Field(description=f"Most relevant department from the list: {DEPARTMENTS + [UNKNOWN_DEPARTMENT]}. Use '{UNKNOWN_DEPARTMENT}' if unsure or not applicable.")

3. 节点:以下是逐一处理各项任务的节点函数。

  • Classify_query_node:它根据查询的性质,将传入的查询分为情感查询和目标部门名称查询。
  • retrieve_context_node:它对向量数据库执行 RAG,并根据部门名称过滤结果。
  • generate_response_node:它根据查询和从数据库中检索的上下文生成最终响应。
  • Human_escalation_node:如果情感是负面的或目标部门未知,它将把查询升级到人工用户。
  • route_query:它根据查询和分类节点的输出确定下一步。
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# 3. Nodes
def classify_query_node(state: AgentState) -> Dict[str, str]:
"""
Classifies the user query for sentiment and target department using an LLM.
"""
print("--- Classifying Query ---")
query = state["query"]
llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY) # Use a reliable, cheaper model
# Prepare prompt for classification
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=f"""You are an expert query classifier for ShopUNow, a retail company.
Analyze the user's query to determine its sentiment and the most relevant department.
The available departments are: {', '.join(DEPARTMENTS)}.
If the query doesn't clearly fit into one of these, or is ambiguous, classify the department as '{UNKNOWN_DEPARTMENT}'.
If the query expresses frustration, anger, dissatisfaction, or complains about a problem, classify sentiment as 'negative'.
If the query is asking a question, seeking information, or making a neutral statement, classify sentiment as 'neutral'.
If the query expresses satisfaction, praise, or positive feedback, classify sentiment as 'positive'.
Respond ONLY with the structured JSON output format."""
),
HumanMessage(content=f"User Query: {query}")
])
# LLM Chain with structured output
classifier_chain = prompt_template | llm.with_structured_output(ClassificationResult)
try:
result: ClassificationResult = classifier_chain.invoke({}) # Pass empty dict as input seems required now
print(f" Classification Result: Sentiment='{result.sentiment}', Department='{result.department}'")
return {
"sentiment": result.sentiment.lower(), # Normalize
"department": result.department
}
except Exception as e:
print(f" Error during classification: {e}")
return {
"sentiment": "neutral", # Default on error
"department": UNKNOWN_DEPARTMENT,
"error": f"Classification failed: {e}"
}
def retrieve_context_node(state: AgentState) -> Dict[str, str]:
"""
Retrieves relevant context from the vector store based on the query and department.
"""
print("--- Retrieving Context ---")
query = state["query"]
department = state["department"]
if not department or department == UNKNOWN_DEPARTMENT:
print(" Skipping retrieval: Department unknown or not applicable.")
return {"context": "", "error": "Cannot retrieve context without a valid department."}
# Initialize embedding model and vector store access
embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
vector_store = Chroma(
collection_name=CHROMA_COLLECTION_NAME,
embedding_function=embedding_model,
persist_directory=CHROMA_PERSIST_DIRECTORY,
)
retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={
'k': 3, # Retrieve top 3 relevant docs
'filter': {'department': department} # *** CRITICAL: Filter by department ***
}
)
try:
retrieved_docs = retriever.invoke(query)
if retrieved_docs:
context = "\n\n---\n\n".join([doc.page_content for doc in retrieved_docs])
print(f" Retrieved {len(retrieved_docs)} documents for department '{department}'.")
# print(f" Context Snippet: {context[:200]}...") # Optional: log snippet
return {"context": context, "error": None}
else:
print(" No relevant documents found in vector store for this department.")
return {"context": "", "error": "No relevant context found."}
except Exception as e:
print(f" Error during context retrieval: {e}")
return {"context": "", "error": f"Retrieval failed: {e}"}
def generate_response_node(state: AgentState) -> Dict[str, str]:
"""
Generates a response using RAG based on the query and retrieved context.
"""
print("--- Generating Response (RAG) ---")
query = state["query"]
context = state["context"]
llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY) # Can use a more capable model for generation
if not context:
print(" No context provided, generating generic response.")
# Fallback if retrieval failed but routing decided RAG path anyway
response_text = "I couldn't find specific information related to your query in our knowledge base. Could you please rephrase or provide more details?"
return {"response": response_text}
# RAG Prompt
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=f"""You are a helpful AI Chatbot for ShopUNow. Answer the user's query based *only* on the provided context.
Be concise and directly address the query. If the context doesn't contain the answer, state that clearly.
Do not make up information.
Context:
---
{context}
---"""
),
HumanMessage(content=f"User Query: {query}")
])
RAG_chain = prompt_template | llm
try:
response = RAG_chain.invoke({})
response_text = response.content
print(f" Generated RAG Response: {response_text[:200]}...")
return {"response": response_text}
except Exception as e:
print(f" Error during response generation: {e}")
return {"response": "Sorry, I encountered an error while generating the response.", "error": f"Generation failed: {e}"}
def human_escalation_node(state: AgentState) -> Dict[str, str]:
"""
Provides a message indicating the query will be escalated to a human.
"""
print("--- Escalating to Human Support ---")
reason = ""
if state.get("sentiment") == "negative":
reason = "Due to the nature of your query,"
elif state.get("department") == UNKNOWN_DEPARTMENT:
reason = "As your query requires specific attention,"
response_text = f"{reason} I need to escalate this to our human support team. They will review your request and get back to you shortly. Thank you for your patience."
print(f" Escalation Message: {response_text}")
return {"response": response_text}
# 4. Conditional Routing Logic
def route_query(state: AgentState) -> str:
"""Determines the next step based on classification results."""
print("--- Routing Decision ---")
sentiment = state.get("sentiment", "neutral")
department = state.get("department", UNKNOWN_DEPARTMENT)
if sentiment == "negative" or department == UNKNOWN_DEPARTMENT:
print(f" Routing to: human_escalation (Sentiment: {sentiment}, Department: {department})")
return "human_escalation"
else:
print(f" Routing to: retrieve_context (Sentiment: {sentiment}, Department: {department})")
return "retrieve_context"
# 3. Nodes def classify_query_node(state: AgentState) -> Dict[str, str]: """ Classifies the user query for sentiment and target department using an LLM. """ print("--- Classifying Query ---") query = state["query"] llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY) # Use a reliable, cheaper model # Prepare prompt for classification prompt_template = ChatPromptTemplate.from_messages([ SystemMessage( content=f"""You are an expert query classifier for ShopUNow, a retail company. Analyze the user's query to determine its sentiment and the most relevant department. The available departments are: {', '.join(DEPARTMENTS)}. If the query doesn't clearly fit into one of these, or is ambiguous, classify the department as '{UNKNOWN_DEPARTMENT}'. If the query expresses frustration, anger, dissatisfaction, or complains about a problem, classify sentiment as 'negative'. If the query is asking a question, seeking information, or making a neutral statement, classify sentiment as 'neutral'. If the query expresses satisfaction, praise, or positive feedback, classify sentiment as 'positive'. Respond ONLY with the structured JSON output format.""" ), HumanMessage(content=f"User Query: {query}") ]) # LLM Chain with structured output classifier_chain = prompt_template | llm.with_structured_output(ClassificationResult) try: result: ClassificationResult = classifier_chain.invoke({}) # Pass empty dict as input seems required now print(f" Classification Result: Sentiment='{result.sentiment}', Department='{result.department}'") return { "sentiment": result.sentiment.lower(), # Normalize "department": result.department } except Exception as e: print(f" Error during classification: {e}") return { "sentiment": "neutral", # Default on error "department": UNKNOWN_DEPARTMENT, "error": f"Classification failed: {e}" } def retrieve_context_node(state: AgentState) -> Dict[str, str]: """ Retrieves relevant context from the vector store based on the query and department. """ print("--- Retrieving Context ---") query = state["query"] department = state["department"] if not department or department == UNKNOWN_DEPARTMENT: print(" Skipping retrieval: Department unknown or not applicable.") return {"context": "", "error": "Cannot retrieve context without a valid department."} # Initialize embedding model and vector store access embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY) vector_store = Chroma( collection_name=CHROMA_COLLECTION_NAME, embedding_function=embedding_model, persist_directory=CHROMA_PERSIST_DIRECTORY, ) retriever = vector_store.as_retriever( search_type="similarity", search_kwargs={ 'k': 3, # Retrieve top 3 relevant docs 'filter': {'department': department} # *** CRITICAL: Filter by department *** } ) try: retrieved_docs = retriever.invoke(query) if retrieved_docs: context = "\n\n---\n\n".join([doc.page_content for doc in retrieved_docs]) print(f" Retrieved {len(retrieved_docs)} documents for department '{department}'.") # print(f" Context Snippet: {context[:200]}...") # Optional: log snippet return {"context": context, "error": None} else: print(" No relevant documents found in vector store for this department.") return {"context": "", "error": "No relevant context found."} except Exception as e: print(f" Error during context retrieval: {e}") return {"context": "", "error": f"Retrieval failed: {e}"} def generate_response_node(state: AgentState) -> Dict[str, str]: """ Generates a response using RAG based on the query and retrieved context. """ print("--- Generating Response (RAG) ---") query = state["query"] context = state["context"] llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY) # Can use a more capable model for generation if not context: print(" No context provided, generating generic response.") # Fallback if retrieval failed but routing decided RAG path anyway response_text = "I couldn't find specific information related to your query in our knowledge base. Could you please rephrase or provide more details?" return {"response": response_text} # RAG Prompt prompt_template = ChatPromptTemplate.from_messages([ SystemMessage( content=f"""You are a helpful AI Chatbot for ShopUNow. Answer the user's query based *only* on the provided context. Be concise and directly address the query. If the context doesn't contain the answer, state that clearly. Do not make up information. Context: --- {context} ---""" ), HumanMessage(content=f"User Query: {query}") ]) RAG_chain = prompt_template | llm try: response = RAG_chain.invoke({}) response_text = response.content print(f" Generated RAG Response: {response_text[:200]}...") return {"response": response_text} except Exception as e: print(f" Error during response generation: {e}") return {"response": "Sorry, I encountered an error while generating the response.", "error": f"Generation failed: {e}"} def human_escalation_node(state: AgentState) -> Dict[str, str]: """ Provides a message indicating the query will be escalated to a human. """ print("--- Escalating to Human Support ---") reason = "" if state.get("sentiment") == "negative": reason = "Due to the nature of your query," elif state.get("department") == UNKNOWN_DEPARTMENT: reason = "As your query requires specific attention," response_text = f"{reason} I need to escalate this to our human support team. They will review your request and get back to you shortly. Thank you for your patience." print(f" Escalation Message: {response_text}") return {"response": response_text} # 4. Conditional Routing Logic def route_query(state: AgentState) -> str: """Determines the next step based on classification results.""" print("--- Routing Decision ---") sentiment = state.get("sentiment", "neutral") department = state.get("department", UNKNOWN_DEPARTMENT) if sentiment == "negative" or department == UNKNOWN_DEPARTMENT: print(f" Routing to: human_escalation (Sentiment: {sentiment}, Department: {department})") return "human_escalation" else: print(f" Routing to: retrieve_context (Sentiment: {sentiment}, Department: {department})") return "retrieve_context"
# 3. Nodes
def classify_query_node(state: AgentState) -> Dict[str, str]:
"""
Classifies the user query for sentiment and target department using an LLM.
"""
print("--- Classifying Query ---")
query = state["query"]
llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY) # Use a reliable, cheaper model
# Prepare prompt for classification
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=f"""You are an expert query classifier for ShopUNow, a retail company.
Analyze the user's query to determine its sentiment and the most relevant department.
The available departments are: {', '.join(DEPARTMENTS)}.
If the query doesn't clearly fit into one of these, or is ambiguous, classify the department as '{UNKNOWN_DEPARTMENT}'.
If the query expresses frustration, anger, dissatisfaction, or complains about a problem, classify sentiment as 'negative'.
If the query is asking a question, seeking information, or making a neutral statement, classify sentiment as 'neutral'.
If the query expresses satisfaction, praise, or positive feedback, classify sentiment as 'positive'.
Respond ONLY with the structured JSON output format."""
),
HumanMessage(content=f"User Query: {query}")
])
# LLM Chain with structured output
classifier_chain = prompt_template | llm.with_structured_output(ClassificationResult)
try:
result: ClassificationResult = classifier_chain.invoke({}) # Pass empty dict as input seems required now
print(f"  Classification Result: Sentiment='{result.sentiment}', Department='{result.department}'")
return {
"sentiment": result.sentiment.lower(), # Normalize
"department": result.department
}
except Exception as e:
print(f"  Error during classification: {e}")
return {
"sentiment": "neutral", # Default on error
"department": UNKNOWN_DEPARTMENT,
"error": f"Classification failed: {e}"
}
def retrieve_context_node(state: AgentState) -> Dict[str, str]:
"""
Retrieves relevant context from the vector store based on the query and department.
"""
print("--- Retrieving Context ---")
query = state["query"]
department = state["department"]
if not department or department == UNKNOWN_DEPARTMENT:
print("  Skipping retrieval: Department unknown or not applicable.")
return {"context": "", "error": "Cannot retrieve context without a valid department."}
# Initialize embedding model and vector store access
embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
vector_store = Chroma(
collection_name=CHROMA_COLLECTION_NAME,
embedding_function=embedding_model,
persist_directory=CHROMA_PERSIST_DIRECTORY,
)
retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={
'k': 3, # Retrieve top 3 relevant docs
'filter': {'department': department} # *** CRITICAL: Filter by department ***
}
)
try:
retrieved_docs = retriever.invoke(query)
if retrieved_docs:
context = "\n\n---\n\n".join([doc.page_content for doc in retrieved_docs])
print(f"  Retrieved {len(retrieved_docs)} documents for department '{department}'.")
# print(f"  Context Snippet: {context[:200]}...") # Optional: log snippet
return {"context": context, "error": None}
else:
print("  No relevant documents found in vector store for this department.")
return {"context": "", "error": "No relevant context found."}
except Exception as e:
print(f"  Error during context retrieval: {e}")
return {"context": "", "error": f"Retrieval failed: {e}"}
def generate_response_node(state: AgentState) -> Dict[str, str]:
"""
Generates a response using RAG based on the query and retrieved context.
"""
print("--- Generating Response (RAG) ---")
query = state["query"]
context = state["context"]
llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY) # Can use a more capable model for generation
if not context:
print("  No context provided, generating generic response.")
# Fallback if retrieval failed but routing decided RAG path anyway
response_text = "I couldn't find specific information related to your query in our knowledge base. Could you please rephrase or provide more details?"
return {"response": response_text}
# RAG Prompt
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=f"""You are a helpful AI Chatbot for ShopUNow. Answer the user's query based *only* on the provided context.
Be concise and directly address the query. If the context doesn't contain the answer, state that clearly.
Do not make up information.
Context:
---
{context}
---"""
),
HumanMessage(content=f"User Query: {query}")
])
RAG_chain = prompt_template | llm
try:
response = RAG_chain.invoke({})
response_text = response.content
print(f"  Generated RAG Response: {response_text[:200]}...")
return {"response": response_text}
except Exception as e:
print(f"  Error during response generation: {e}")
return {"response": "Sorry, I encountered an error while generating the response.", "error": f"Generation failed: {e}"}
def human_escalation_node(state: AgentState) -> Dict[str, str]:
"""
Provides a message indicating the query will be escalated to a human.
"""
print("--- Escalating to Human Support ---")
reason = ""
if state.get("sentiment") == "negative":
reason = "Due to the nature of your query,"
elif state.get("department") == UNKNOWN_DEPARTMENT:
reason = "As your query requires specific attention,"
response_text = f"{reason} I need to escalate this to our human support team. They will review your request and get back to you shortly. Thank you for your patience."
print(f"  Escalation Message: {response_text}")
return {"response": response_text}
# 4. Conditional Routing Logic
def route_query(state: AgentState) -> str:
"""Determines the next step based on classification results."""
print("--- Routing Decision ---")
sentiment = state.get("sentiment", "neutral")
department = state.get("department", UNKNOWN_DEPARTMENT)
if sentiment == "negative" or department == UNKNOWN_DEPARTMENT:
print(f"  Routing to: human_escalation (Sentiment: {sentiment}, Department: {department})")
return "human_escalation"
else:
print(f"  Routing to: retrieve_context (Sentiment: {sentiment}, Department: {department})")
return "retrieve_context"

第 8 步:定义图形函数

让我们为图形创建函数,并为图形分配节点和边。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# --- Graph Definition ---
def build_agent_graph(vector_store: Chroma) -> StateGraph:
"""Builds the LangGraph agent."""
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("classify_query", classify_query_node)
graph.add_node("retrieve_context", retrieve_context_node)
graph.add_node("generate_response", generate_response_node)
graph.add_node("human_escalation", human_escalation_node)
# Set entry point
graph.set_entry_point("classify_query")
# Add edges
graph.add_conditional_edges(
"classify_query", # Source node
route_query, # Function to determine the route
{ # Mapping: output of route_query -> destination node
"retrieve_context": "retrieve_context",
"human_escalation": "human_escalation"
}
)
graph.add_edge("retrieve_context", "generate_response")
graph.add_edge("generate_response", END)
graph.add_edge("human_escalation", END)
# Compile the graph
# memory = SqliteSaver.from_conn_string(":memory:") # Example for in-memory persistence
app = graph.compile() # checkpointer=memory optional for stateful conversations
print("\nAgent graph compiled successfully.")
return app
# --- Graph Definition --- def build_agent_graph(vector_store: Chroma) -> StateGraph: """Builds the LangGraph agent.""" graph = StateGraph(AgentState) # Add nodes graph.add_node("classify_query", classify_query_node) graph.add_node("retrieve_context", retrieve_context_node) graph.add_node("generate_response", generate_response_node) graph.add_node("human_escalation", human_escalation_node) # Set entry point graph.set_entry_point("classify_query") # Add edges graph.add_conditional_edges( "classify_query", # Source node route_query, # Function to determine the route { # Mapping: output of route_query -> destination node "retrieve_context": "retrieve_context", "human_escalation": "human_escalation" } ) graph.add_edge("retrieve_context", "generate_response") graph.add_edge("generate_response", END) graph.add_edge("human_escalation", END) # Compile the graph # memory = SqliteSaver.from_conn_string(":memory:") # Example for in-memory persistence app = graph.compile() # checkpointer=memory optional for stateful conversations print("\nAgent graph compiled successfully.") return app
# --- Graph Definition ---
def build_agent_graph(vector_store: Chroma) -> StateGraph:
"""Builds the LangGraph agent."""
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("classify_query", classify_query_node)
graph.add_node("retrieve_context", retrieve_context_node)
graph.add_node("generate_response", generate_response_node)
graph.add_node("human_escalation", human_escalation_node)
# Set entry point
graph.set_entry_point("classify_query")
# Add edges
graph.add_conditional_edges(
"classify_query", # Source node
route_query,      # Function to determine the route
{                 # Mapping: output of route_query -> destination node
"retrieve_context": "retrieve_context",
"human_escalation": "human_escalation"
}
)
graph.add_edge("retrieve_context", "generate_response")
graph.add_edge("generate_response", END)
graph.add_edge("human_escalation", END)
# Compile the graph
# memory = SqliteSaver.from_conn_string(":memory:") # Example for in-memory persistence
app = graph.compile() # checkpointer=memory optional for stateful conversations
print("\nAgent graph compiled successfully.")
return app

第 9 步:启动代理执行

现在,我们将初始化代理并开始执行工作流程。

1. 首先加载常见问题。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# 1. Load FAQs
faqs_data = load_faqs(FAQ_FILES)
if not faqs_data:
print("ERROR: No FAQ data loaded. Exiting.")
exit()
# 1. Load FAQs faqs_data = load_faqs(FAQ_FILES) if not faqs_data: print("ERROR: No FAQ data loaded. Exiting.") exit()
# 1. Load FAQs
faqs_data = load_faqs(FAQ_FILES)
if not faqs_data:
print("ERROR: No FAQ data loaded. Exiting.")
exit()

输出:

加载常见问题

2. 设置嵌入模型。在此,我们将设置 OpenAI 嵌入模型,以加快检索速度。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# 2. Setup Vector Store
embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
vector_store = setup_chroma_vector_store(
faqs_data,
CHROMA_PERSIST_DIRECTORY,
CHROMA_COLLECTION_NAME,
embedding_model
)
# 2. Setup Vector Store embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY) vector_store = setup_chroma_vector_store( faqs_data, CHROMA_PERSIST_DIRECTORY, CHROMA_COLLECTION_NAME, embedding_model )
# 2. Setup Vector Store
embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
vector_store = setup_chroma_vector_store(
faqs_data,
CHROMA_PERSIST_DIRECTORY,
CHROMA_COLLECTION_NAME,
embedding_model
)

输出:

设置嵌入模型

推荐阅读:如何为您的 RAG 模型选择正确的嵌入方式?

3. 现在,使用预定义函数构建代理,使用美人鱼图将代理流程可视化。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# 3. Build the Agent Graph
agent_app = build_agent_graph(vector_store)
from IPython.display import display, Image, Markdown
display(Image(agent_app.get_graph().draw_mermaid_png()))
# 3. Build the Agent Graph agent_app = build_agent_graph(vector_store) from IPython.display import display, Image, Markdown display(Image(agent_app.get_graph().draw_mermaid_png()))
# 3. Build the Agent Graph
agent_app = build_agent_graph(vector_store)
from IPython.display import display, Image, Markdown
display(Image(agent_app.get_graph().draw_mermaid_png()))

输出:

使用美人鱼图将代理流程可视化

第 10 步:测试代理

我们已经到达工作流程的最后一部分。到目前为止,我们已经构建了几个节点和函数。现在是测试我们的代理并查看输出的时候了。

1. 首先,让我们定义测试查询。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Test the Agent
test_queries = [
"How do I track my order?",
"What is the return policy?",
"Tell me about the 'Urban Explorer' jacket materials.",
]
# Test the Agent test_queries = [ "How do I track my order?", "What is the return policy?", "Tell me about the 'Urban Explorer' jacket materials.", ]
# Test the Agent
test_queries = [
"How do I track my order?",
"What is the return policy?",
"Tell me about the 'Urban Explorer' jacket materials.",
]

2. 现在我们来测试一下代理。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
print("\n--- Testing Agent ---")
for query in test_queries:
print(f"\nInput Query: {query}")
# Define the input for the graph invocation
inputs = {"query": query}
# try:
# Invoke the graph
# The config argument is optional but useful for stateful execution if needed
# config = {"configurable": {"thread_id": "user_123"}} # Example config
final_state = agent_app.invoke(inputs) #, config=config)
print(f"Final State Department: {final_state.get('department')}")
print(f"Final State Sentiment: {final_state.get('sentiment')}")
print(f"Agent Response: {final_state.get('response')}")
if final_state.get('error'):
print(f"Error encountered: {final_state.get('error')}")
# except Exception as e:
# print(f"ERROR running agent graph for query '{query}': {e}")
# import traceback
# traceback.print_exc() # Print detailed traceback for debugging
print("\n--- Agent Testing Complete ---")
print("\n--- Testing Agent ---") for query in test_queries: print(f"\nInput Query: {query}") # Define the input for the graph invocation inputs = {"query": query} # try: # Invoke the graph # The config argument is optional but useful for stateful execution if needed # config = {"configurable": {"thread_id": "user_123"}} # Example config final_state = agent_app.invoke(inputs) #, config=config) print(f"Final State Department: {final_state.get('department')}") print(f"Final State Sentiment: {final_state.get('sentiment')}") print(f"Agent Response: {final_state.get('response')}") if final_state.get('error'): print(f"Error encountered: {final_state.get('error')}") # except Exception as e: # print(f"ERROR running agent graph for query '{query}': {e}") # import traceback # traceback.print_exc() # Print detailed traceback for debugging print("\n--- Agent Testing Complete ---")
print("\n--- Testing Agent ---")
for query in test_queries:
print(f"\nInput Query: {query}")
# Define the input for the graph invocation
inputs = {"query": query}
# try:
# Invoke the graph
# The config argument is optional but useful for stateful execution if needed
# config = {"configurable": {"thread_id": "user_123"}} # Example config
final_state = agent_app.invoke(inputs) #, config=config)
print(f"Final State Department: {final_state.get('department')}")
print(f"Final State Sentiment: {final_state.get('sentiment')}")
print(f"Agent Response: {final_state.get('response')}")
if final_state.get('error'):
print(f"Error encountered: {final_state.get('error')}")
# except Exception as e:
#     print(f"ERROR running agent graph for query '{query}': {e}")
#     import traceback
#     traceback.print_exc() # Print detailed traceback for debugging
print("\n--- Agent Testing Complete ---")

输出:

测试代理

我们可以从输出结果中看到,我们的代理表现良好。首先,它对查询进行了分类,然后将决定发送到检索节点或人工节点。然后,检索部分开始,它成功地从向量数据库中检索出上下文。最后,根据需要生成响应。这样,我们就制作出了智能常见问题聊天机器人。

你可以在这里访问包含所有代码的 Colab Notebook。

小结

如果您已经学习到这里,就意味着您已经学会了如何使用代理 RAG 和 LangGraph 构建智能常见问题聊天机器人。在这里,我们看到构建一个可以推理和决策的智能代理并不难。我们构建的代理聊天机器人成本低、速度快,而且能够完全理解问题或输入查询的上下文。我们在这里使用的架构是完全可定制的,这意味着人们可以根据自己的特定用例编辑代理的任何节点。有了代理 RAG、LangGraph 和 ChromaDB,制作代理从未如此简单。我相信,我们在本指南中介绍的内容已经让你掌握了使用这些工具构建更复杂系统的基础知识。

评论留言