
AI agents have become part of businesses large and small. From filling out forms in hospitals and reviewing legal documents to analyzing video footage and handling customer support, AI agents now take on all kinds of tasks. Companies often spend hundreds of thousands of dollars on customer support staff who can understand customer needs and resolve issues in line with company guidelines. Today, a smart chatbot that answers frequently asked questions can meaningfully improve customer service. In this article, we will learn how to build an FAQ chatbot that resolves customer queries in seconds, using agentic RAG (Retrieval-Augmented Generation), LangGraph, and ChromaDB.
An Introduction to Agentic RAG
RAG is a hot topic right now. Everyone is talking about it and building applications on top of it. RAG gives LLMs access to up-to-date data, making them more accurate than ever. However, traditional RAG systems tend to fall short when it comes to choosing the best retrieval method, adapting the retrieval workflow, or performing multi-step reasoning. This is where agentic RAG comes in.
Agentic RAG enhances traditional RAG by adding the capabilities of AI agents. With this extra power, the system can dynamically change its workflow based on the nature of the query and perform multi-step reasoning and multi-step retrieval. We can even integrate tools into an agentic RAG system, and it can decide dynamically which tool to use and when. Overall, this improves accuracy and makes the system more efficient and scalable.
Below is an example of an agentic RAG workflow.

The diagram above shows the architecture of an agentic RAG framework. It illustrates how an AI agent, combined with RAG, makes decisions under specific conditions. The figure makes clear that when a conditional node is present, the agent decides which edge to take based on the provided context.
Architecture of the FAQ AI Chatbot
Now let's dive into the architecture of the chatbot we are going to build. We will look at how it works and what its key components are.
The figure below shows the overall structure of our system. We will implement it with LangGraph, LangChain's open-source AI agent framework.

The main components of our system are:
- LangGraph: A powerful open-source AI agent framework for efficiently building complex, multi-agent, cyclic graph-based agents. These agents can maintain state across the entire workflow and handle complex queries efficiently.
- LLM: An efficient and capable large language model that follows the user's instructions and responds using the knowledge it has. Here we will use OpenAI's o4-mini, a small reasoning model designed for speed, affordability, and tool use.
- Vector database: A vector database stores, manages, and retrieves vector embeddings, which are numerical representations of data. Here we use ChromaDB, an open-source, AI-native vector database designed to power systems that rely on similarity search, semantic search, and other tasks involving vector data.
Recommended reading: How to build a customer support voice agent
Hands-On Implementation of the Smart FAQ Chatbot
Now we will implement the end-to-end workflow of the chatbot based on the architecture above, step by step, with detailed explanations, code, and sample outputs. Let's get started.
Step 1: Install the Dependencies
First, we install all the required libraries in a Jupyter notebook. These include langchain, langgraph, langchain-openai, langchain-community, chromadb, openai, python-dotenv, pydantic, and pysqlite3.
!pip install -q langchain langgraph langchain-openai langchain-community chromadb openai python-dotenv pydantic pysqlite3
Step 2: Import the Required Libraries
Next, we import all the remaining libraries needed for this project.
from typing import List, TypedDict, Annotated, Dict
from dotenv import load_dotenv
# Langchain & LangGraph specific imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langgraph.graph import StateGraph, END
import os
import json
Step 3: Set Up the OpenAI API Key
Enter your OpenAI API key and set it as an environment variable.
from getpass import getpass

OPENAI_API_KEY = getpass("OpenAI API Key:")
load_dotenv()
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY  # store the key as an environment variable for downstream clients
Step 4: Download the Dataset
We have prepared a sample FAQ dataset in JSON format for the different departments. We need to download the archive (via gdown) and unzip it.
!gdown 1j6pdIansfQzKOZSEUinnHd8w6GlkKE6w
!unzip -o /content/blog_faq_files.zip
Output:

Step 5: Define the Department Names for Mapping
Now, let's define the department mappings so that our agentic system knows which file belongs to which department.
# Define Department Names (ensure these match metadata used during ingestion)
DEPARTMENTS = [
    "Customer Support",
    "Product Information",
    "Loyalty Program / Rewards"
]
UNKNOWN_DEPARTMENT = "Unknown/Other"

FAQ_FILES = {
    "Customer Support": "customer_support_faq.json",
    "Product Information": "product_information_faq.json",
    "Loyalty Program / Rewards": "loyalty_program_faq.json",
}
Step 6: Define the Helper Functions
We will define some helper functions responsible for loading the FAQs from the JSON files and storing them in ChromaDB.
1. load_faqs(…): A helper function that loads the FAQs from the JSON files and stores them in a dictionary named all_faqs, keyed by department.
def load_faqs(file_paths: Dict[str, str]) -> Dict[str, List[Dict[str, str]]]:
    """Loads QA pairs from JSON files for each department."""
    all_faqs = {}
    print("Loading FAQs...")
    for dept, file_path in file_paths.items():
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                all_faqs[dept] = json.load(f)
            print(f" - Loaded {len(all_faqs[dept])} FAQs for {dept}")
        except FileNotFoundError:
            print(f" - WARNING: FAQ file not found for {dept}: {file_path}. Skipping.")
        except json.JSONDecodeError:
            print(f" - ERROR: Could not decode JSON for {dept} from {file_path}. Skipping.")
    return all_faqs
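The contents of the downloaded files aren't shown here, but load_faqs() and setup_chroma_vector_store() assume each department file is a JSON list of objects with "question" and "answer" keys. As a rough illustration (the file name matches FAQ_FILES; the entries themselves are made up), such a file could be created like this:
import json

# Illustrative only: a minimal FAQ file in the shape load_faqs() expects --
# a JSON list of {"question": ..., "answer": ...} objects per department.
sample_faqs = [
    {"question": "How do I track my order?",
     "answer": "Log in to your account and open the 'My Orders' page to see live tracking updates."},
    {"question": "What is the return policy?",
     "answer": "Most items can be returned within 30 days of delivery in their original condition."},
]

with open("customer_support_faq.json", "w", encoding="utf-8") as f:
    json.dump(sample_faqs, f, ensure_ascii=False, indent=2)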
2. setup_chroma_vector_store(…): This function sets up ChromaDB to store the vector embeddings. We first define the Chroma configuration, i.e., the directory that will hold the Chroma database files. We then convert the FAQs into LangChain Document objects, each holding page content and metadata — the format the retriever expects for accurate RAG. We could combine the question and answer for richer contextual embeddings, or embed only the answers; here we embed the answers and keep the question and department name in the metadata.
# ChromaDB Configuration
CHROMA_PERSIST_DIRECTORY = "./chroma_db_store"
CHROMA_COLLECTION_NAME = "Chatbot_faqs"

def setup_chroma_vector_store(
    all_faqs: Dict[str, List[Dict[str, str]]],
    persist_directory: str,
    collection_name: str,
    embedding_model: OpenAIEmbeddings,
) -> Chroma:
    """Creates or loads a Chroma vector store with FAQ data and metadata."""
    documents = []
    print("\nPreparing documents for vector store...")
    for department, faqs in all_faqs.items():
        for faq in faqs:
            # Combine Q&A for better contextual embedding, or just embed answers
            # content = f"Question: {faq['question']}\nAnswer: {faq['answer']}"
            content = faq['answer']  # Often embedding just the answer is effective for FAQ retrieval
            doc = Document(
                page_content=content,
                metadata={
                    "department": department,
                    "question": faq['question']  # Keep question in metadata for potential display
                }
            )
            documents.append(doc)
    print(f"Total documents prepared: {len(documents)}")
    if not documents:
        raise ValueError("No documents found to add to the vector store. Check FAQ loading.")
    print(f"Initializing ChromaDB vector store (Persistence: {persist_directory})...")
    vector_store = Chroma(
        collection_name=collection_name,
        embedding_function=embedding_model,
        persist_directory=persist_directory,
    )
    try:
        vector_store = Chroma.from_documents(
            documents=documents,
            embedding=embedding_model,
            persist_directory=persist_directory,
            collection_name=collection_name
        )
        print(f"Created and populated ChromaDB with {len(documents)} documents.")
        vector_store.persist()  # Ensure persistence after creation
        print("Vector store persisted.")
    except Exception as create_e:
        print(f"FATAL ERROR: Could not create Chroma vector store: {create_e}")
        raise create_e
    print("ChromaDB setup complete.")
    return vector_store
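Once this function has populated the store (we run it in Step 9), you can optionally sanity-check retrieval with a direct similarity search that applies the same department filter the agent will use later. This is an illustrative snippet; the query string and department are assumptions:
# Optional sanity check (illustrative): query the populated store directly,
# filtering on the department metadata the agent will rely on.
sample_docs = vector_store.similarity_search(
    "How do I return an item?",                 # hypothetical test question
    k=2,
    filter={"department": "Customer Support"},  # same metadata filter used by the agent
)
for doc in sample_docs:
    print(doc.metadata["question"], "->", doc.page_content[:80])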
Step 7: Define the LangGraph Agent Components
Now let's define the AI agent components, which are the core building blocks of our workflow.
1. State definition: A Python class that holds the agent's current state while it runs. It contains variables such as the query, sentiment, and department.
class AgentState(TypedDict):
    query: str
    sentiment: str
    department: str
    context: str  # Retrieved context for RAG
    response: str  # Final response to the user
    error: str | None  # To capture potential errors
2. Pydantic model: Here we define a Pydantic model that guarantees structured LLM output. It contains a sentiment value (one of "positive", "negative", or "neutral") and a department name, both of which the LLM will predict.
class ClassificationResult(BaseModel):
    """Structured output for query classification."""
    sentiment: str = Field(description="Sentiment of the query (positive, neutral, negative)")
    department: str = Field(description=f"Most relevant department from the list: {DEPARTMENTS + [UNKNOWN_DEPARTMENT]}. Use '{UNKNOWN_DEPARTMENT}' if unsure or not applicable.")
3. Nodes: Below are the node functions that handle each task, one by one.
- classify_query_node: Classifies the incoming query by sentiment and target department, based on its nature.
- retrieve_context_node: Performs RAG over the vector database and filters the results by department name.
- generate_response_node: Generates the final response from the query and the context retrieved from the database.
- human_escalation_node: Escalates the query to a human agent if the sentiment is negative or the target department is unknown.
- route_query: Determines the next step based on the query and the output of the classification node.
# 3. Nodes
def classify_query_node(state: AgentState) -> Dict[str, str]:
    """
    Classifies the user query for sentiment and target department using an LLM.
    """
    print("--- Classifying Query ---")
    query = state["query"]
    llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY)  # Use a reliable, cheaper model
    # Prepare prompt for classification
    prompt_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=f"""You are an expert query classifier for ShopUNow, a retail company.
Analyze the user's query to determine its sentiment and the most relevant department.
The available departments are: {', '.join(DEPARTMENTS)}.
If the query doesn't clearly fit into one of these, or is ambiguous, classify the department as '{UNKNOWN_DEPARTMENT}'.
If the query expresses frustration, anger, dissatisfaction, or complains about a problem, classify sentiment as 'negative'.
If the query is asking a question, seeking information, or making a neutral statement, classify sentiment as 'neutral'.
If the query expresses satisfaction, praise, or positive feedback, classify sentiment as 'positive'.
Respond ONLY with the structured JSON output format."""
        ),
        HumanMessage(content=f"User Query: {query}")
    ])
    # LLM Chain with structured output
    classifier_chain = prompt_template | llm.with_structured_output(ClassificationResult)
    try:
        result: ClassificationResult = classifier_chain.invoke({})  # Pass empty dict as input seems required now
        print(f" Classification Result: Sentiment='{result.sentiment}', Department='{result.department}'")
        return {
            "sentiment": result.sentiment.lower(),  # Normalize
            "department": result.department
        }
    except Exception as e:
        print(f" Error during classification: {e}")
        return {
            "sentiment": "neutral",  # Default on error
            "department": UNKNOWN_DEPARTMENT,
            "error": f"Classification failed: {e}"
        }

def retrieve_context_node(state: AgentState) -> Dict[str, str]:
    """
    Retrieves relevant context from the vector store based on the query and department.
    """
    print("--- Retrieving Context ---")
    query = state["query"]
    department = state["department"]
    if not department or department == UNKNOWN_DEPARTMENT:
        print(" Skipping retrieval: Department unknown or not applicable.")
        return {"context": "", "error": "Cannot retrieve context without a valid department."}
    # Initialize embedding model and vector store access
    embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
    vector_store = Chroma(
        collection_name=CHROMA_COLLECTION_NAME,
        embedding_function=embedding_model,
        persist_directory=CHROMA_PERSIST_DIRECTORY,
    )
    retriever = vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={
            'k': 3,  # Retrieve top 3 relevant docs
            'filter': {'department': department}  # *** CRITICAL: Filter by department ***
        }
    )
    try:
        retrieved_docs = retriever.invoke(query)
        if retrieved_docs:
            context = "\n\n---\n\n".join([doc.page_content for doc in retrieved_docs])
            print(f" Retrieved {len(retrieved_docs)} documents for department '{department}'.")
            # print(f" Context Snippet: {context[:200]}...")  # Optional: log snippet
            return {"context": context, "error": None}
        else:
            print(" No relevant documents found in vector store for this department.")
            return {"context": "", "error": "No relevant context found."}
    except Exception as e:
        print(f" Error during context retrieval: {e}")
        return {"context": "", "error": f"Retrieval failed: {e}"}

def generate_response_node(state: AgentState) -> Dict[str, str]:
    """
    Generates a response using RAG based on the query and retrieved context.
    """
    print("--- Generating Response (RAG) ---")
    query = state["query"]
    context = state["context"]
    llm = ChatOpenAI(model="o4-mini", api_key=OPENAI_API_KEY)  # Can use a more capable model for generation
    if not context:
        print(" No context provided, generating generic response.")
        # Fallback if retrieval failed but routing decided RAG path anyway
        response_text = "I couldn't find specific information related to your query in our knowledge base. Could you please rephrase or provide more details?"
        return {"response": response_text}
    # RAG Prompt
    prompt_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=f"""You are a helpful AI Chatbot for ShopUNow. Answer the user's query based *only* on the provided context.
Be concise and directly address the query. If the context doesn't contain the answer, state that clearly.
Do not make up information.
Context:
---
{context}
---"""
        ),
        HumanMessage(content=f"User Query: {query}")
    ])
    RAG_chain = prompt_template | llm
    try:
        response = RAG_chain.invoke({})
        response_text = response.content
        print(f" Generated RAG Response: {response_text[:200]}...")
        return {"response": response_text}
    except Exception as e:
        print(f" Error during response generation: {e}")
        return {"response": "Sorry, I encountered an error while generating the response.", "error": f"Generation failed: {e}"}

def human_escalation_node(state: AgentState) -> Dict[str, str]:
    """
    Provides a message indicating the query will be escalated to a human.
    """
    print("--- Escalating to Human Support ---")
    reason = ""
    if state.get("sentiment") == "negative":
        reason = "Due to the nature of your query,"
    elif state.get("department") == UNKNOWN_DEPARTMENT:
        reason = "As your query requires specific attention,"
    response_text = f"{reason} I need to escalate this to our human support team. They will review your request and get back to you shortly. Thank you for your patience."
    print(f" Escalation Message: {response_text}")
    return {"response": response_text}

# 4. Conditional Routing Logic
def route_query(state: AgentState) -> str:
    """Determines the next step based on classification results."""
    print("--- Routing Decision ---")
    sentiment = state.get("sentiment", "neutral")
    department = state.get("department", UNKNOWN_DEPARTMENT)
    if sentiment == "negative" or department == UNKNOWN_DEPARTMENT:
        print(f" Routing to: human_escalation (Sentiment: {sentiment}, Department: {department})")
        return "human_escalation"
    else:
        print(f" Routing to: retrieve_context (Sentiment: {sentiment}, Department: {department})")
        return "retrieve_context"
Step 8: Define the Graph Function
Let's create the function that builds the graph and assigns its nodes and edges.
# --- Graph Definition ---
def build_agent_graph(vector_store: Chroma) -> StateGraph:
    """Builds the LangGraph agent."""
    graph = StateGraph(AgentState)
    # Add nodes
    graph.add_node("classify_query", classify_query_node)
    graph.add_node("retrieve_context", retrieve_context_node)
    graph.add_node("generate_response", generate_response_node)
    graph.add_node("human_escalation", human_escalation_node)
    # Set entry point
    graph.set_entry_point("classify_query")
    # Add edges
    graph.add_conditional_edges(
        "classify_query",  # Source node
        route_query,       # Function to determine the route
        {                  # Mapping: output of route_query -> destination node
            "retrieve_context": "retrieve_context",
            "human_escalation": "human_escalation"
        }
    )
    graph.add_edge("retrieve_context", "generate_response")
    graph.add_edge("generate_response", END)
    graph.add_edge("human_escalation", END)
    # Compile the graph
    # memory = SqliteSaver.from_conn_string(":memory:")  # Example for in-memory persistence
    app = graph.compile()  # checkpointer=memory optional for stateful conversations
    print("\nAgent graph compiled successfully.")
    return app
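The commented-out SqliteSaver line hints at stateful, multi-turn use. If you want each conversation thread to keep its own state, a minimal sketch (assuming a LangGraph version that ships the in-memory MemorySaver checkpointer) is to pass a checkpointer at compile time:
# Sketch: compile with a checkpointer so each thread_id keeps its own conversation state.
from langgraph.checkpoint.memory import MemorySaver  # in-memory checkpointer

memory = MemorySaver()
# Inside build_agent_graph, replace `graph.compile()` with:
#     app = graph.compile(checkpointer=memory)
# and then invoke the graph with a thread id, e.g.:
#     app.invoke({"query": "How do I track my order?"},
#                config={"configurable": {"thread_id": "user_123"}})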
Step 9: Initialize and Run the Agent
Now we will initialize the agent and start running the workflow.
1. First, load the FAQs.
# 1. Load FAQs
faqs_data = load_faqs(FAQ_FILES)
if not faqs_data:
    print("ERROR: No FAQ data loaded. Exiting.")
    exit()
Output:

2. Set up the embedding model and the vector store. Here we use OpenAI's embedding model and pass it to setup_chroma_vector_store() to build the Chroma store for fast retrieval.
# 2. Setup Vector Store
embedding_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
vector_store = setup_chroma_vector_store(
    faqs_data,
    CHROMA_PERSIST_DIRECTORY,
    CHROMA_COLLECTION_NAME,
    embedding_model
)
Output:

Recommended reading: How to choose the right embeddings for your RAG model?
3. Now, build the agent with the function defined earlier and visualize the agent flow as a Mermaid diagram.
# 3. Build the Agent Graph
agent_app = build_agent_graph(vector_store)
from IPython.display import display, Image, Markdown
display(Image(agent_app.get_graph().draw_mermaid_png()))
Output:

Step 10: Test the Agent
We have reached the final part of the workflow. So far, we have built several nodes and functions. Now it's time to test our agent and look at the output.
1. First, let's define the test queries.
"How do I track my order?",
"What is the return policy?",
"Tell me about the 'Urban Explorer' jacket materials.",
# Test the Agent
test_queries = [
    "How do I track my order?",
    "What is the return policy?",
    "Tell me about the 'Urban Explorer' jacket materials.",
]
2. Now, let's test the agent.
print("\n--- Testing Agent ---")
for query in test_queries:
print(f"\nInput Query: {query}")
# Define the input for the graph invocation
inputs = {"query": query}
# The config argument is optional but useful for stateful execution if needed
# config = {"configurable": {"thread_id": "user_123"}} # Example config
final_state = agent_app.invoke(inputs) #, config=config)
print(f"Final State Department: {final_state.get('department')}")
print(f"Final State Sentiment: {final_state.get('sentiment')}")
print(f"Agent Response: {final_state.get('response')}")
if final_state.get('error'):
print(f"Error encountered: {final_state.get('error')}")
# print(f"ERROR running agent graph for query '{query}': {e}")
# traceback.print_exc() # Print detailed traceback for debugging
print("\n--- Agent Testing Complete ---")
print("\n--- Testing Agent ---")
for query in test_queries:
print(f"\nInput Query: {query}")
# Define the input for the graph invocation
inputs = {"query": query}
# try:
# Invoke the graph
# The config argument is optional but useful for stateful execution if needed
# config = {"configurable": {"thread_id": "user_123"}} # Example config
final_state = agent_app.invoke(inputs) #, config=config)
print(f"Final State Department: {final_state.get('department')}")
print(f"Final State Sentiment: {final_state.get('sentiment')}")
print(f"Agent Response: {final_state.get('response')}")
if final_state.get('error'):
print(f"Error encountered: {final_state.get('error')}")
# except Exception as e:
# print(f"ERROR running agent graph for query '{query}': {e}")
# import traceback
# traceback.print_exc() # Print detailed traceback for debugging
print("\n--- Agent Testing Complete ---")
print("\n--- Testing Agent ---")
for query in test_queries:
print(f"\nInput Query: {query}")
# Define the input for the graph invocation
inputs = {"query": query}
# try:
# Invoke the graph
# The config argument is optional but useful for stateful execution if needed
# config = {"configurable": {"thread_id": "user_123"}} # Example config
final_state = agent_app.invoke(inputs) #, config=config)
print(f"Final State Department: {final_state.get('department')}")
print(f"Final State Sentiment: {final_state.get('sentiment')}")
print(f"Agent Response: {final_state.get('response')}")
if final_state.get('error'):
print(f"Error encountered: {final_state.get('error')}")
# except Exception as e:
# print(f"ERROR running agent graph for query '{query}': {e}")
# import traceback
# traceback.print_exc() # Print detailed traceback for debugging
print("\n--- Agent Testing Complete ---")
Output:

We can see from the output that our agent performs well. First it classifies the query, then routes the decision to either the retrieval node or the human escalation node. Retrieval then kicks in and successfully pulls the relevant context from the vector database. Finally, it generates the response as needed. With that, our smart FAQ chatbot is complete.
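The three test queries above all take the RAG path. To also exercise the escalation branch, you can invoke the graph with a negative-sentiment or out-of-scope query. These are illustrative extra queries, not part of the original test set:
# Illustrative extra tests for the human-escalation branch.
escalation_queries = [
    "My order arrived broken and nobody is answering my emails!",  # expected: negative sentiment
    "Can you write me a poem about space travel?",                 # expected: Unknown/Other department
]
for query in escalation_queries:
    final_state = agent_app.invoke({"query": query})
    print(f"\nInput Query: {query}")
    print(f"Sentiment: {final_state.get('sentiment')}, Department: {final_state.get('department')}")
    print(f"Agent Response: {final_state.get('response')}")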
You can access the Colab notebook containing all of the code here.
Conclusion
If you've made it this far, you have learned how to build a smart FAQ chatbot with agentic RAG and LangGraph. We have seen that building an intelligent agent that can reason and make decisions is not that hard. The agentic chatbot we built is low-cost, fast, and fully able to understand the context of the question or input query. The architecture we used here is fully customizable, meaning you can adapt any node of the agent to your specific use case. With agentic RAG, LangGraph, and ChromaDB, building agents has never been easier. I believe what we covered in this guide gives you the foundation to build more complex systems with these tools.