跳转到主要内容
本教程将带您了解如何使用 xParse + LangGraph 构建 Agentic RAG,智能重写问题、检索与回答,实现更准确的企业知识问答。

什么是 Agentic RAG?

Agentic RAG(Retrieval-Augmented Generation)是一种结合信息检索和生成式 AI 的技术。与传统的 RAG 不同,Agentic RAG 通过智能决策机制,能够:
  1. 智能判断:判断问题是否需要检索,还是可以直接回答
  2. 问题重写:当检索结果不相关时,自动重写问题以获取更好的结果
  3. 相关性评估:评估检索到的文档片段是否与问题相关
  4. 迭代优化:通过多轮检索和重写,逐步优化答案质量
Agentic RAG 的核心流程包括:
  1. 文档处理:将非结构化文档转换为向量表示
  2. 向量存储:将向量数据存储到向量数据库
  3. 智能检索:根据用户问题智能检索相关文档片段
  4. 问题重写:当检索结果不相关时,重写问题再次检索
  5. 生成回答:基于检索到的相关上下文生成高质量答案

Agentic RAG 工作流程

用户问题:"如何配置数据库连接池?"

[LangGraph 工作流]  ←  [xParse]

[判断节点] 是否需要检索?
    ├─ 是 → [检索节点] 向量检索
    │         ↓ 
    │   [评估节点] 检索结果是否相关?
    │         ├─ 相关 → [生成节点] 生成答案
    │         └─ 不相关 → [重写节点] 重写问题
    │                           ↓
    │                    [检索节点] 再次检索
    │                           ↓
    │                    [评估节点] 再次评估
    │                           ↓
    │                    [生成节点] 生成答案
    └─ 否 → [生成节点] 直接生成答案

环境准备

首先安装必要的依赖:
python -m venv .venv && source .venv/bin/activate
pip install "xparse-client>=0.2.10" langchain langchain-community langchain-core \
            langgraph langchain-milvus python-dotenv
创建 .env 文件存储配置:
# .env
XTI_APP_ID=your-app-id
XTI_SECRET_CODE=your-secret-code
MILVUS_DB_PATH=./agentic_rag_vectors.db
DASHSCOPE_API_KEY=your-dashscope-key
提示:XTI_APP_IDXTI_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。
下面我们将分步骤构建 Agentic RAG 系统。首先导入必要的库:
import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from xparse_client import create_pipeline_from_config
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.chat_models import ChatTongyi
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, BaseMessage

# 加载环境变量
load_dotenv()

Step 1: 初始化 xParse Pipeline

首先,我们需要使用 xParse Pipeline 处理文档,将文档解析、分块、向量化并存入 Milvus 向量数据库。这是知识库构建的基础步骤。 配置 Pipeline:
PIPELINE_CONFIG = {
    "source": {
        "type": "local",
        "directory": "./knowledge_base",  # 文档存放目录
        "pattern": ["*.pdf", "*.docx", "*.txt"]
    },
    "destination": {
        "type": "milvus",
        "db_path": os.getenv("MILVUS_DB_PATH"),
        "collection_name": "agentic_rag_docs",
        "dimension": 1024
    },
    "api_base_url": "https://api.textin.com/api/xparse",
    "api_headers": {
        "x-ti-app-id": os.getenv("XTI_APP_ID"),
        "x-ti-secret-code": os.getenv("XTI_SECRET_CODE")
    },
    "stages": [
        {
            "type": "parse",
            "config": {"provider": "textin"}
        },
        {
            "type": "chunk",
            "config": {
                "strategy": "by_title",
                "new_after_n_chars": 480,
                "max_characters": 1500,
                "overlap": 80
            }
        },
        {
            "type": "embed",
            "config": {
                "provider": "qwen",
                "model_name": "text-embedding-v4"
            }
        }
    ]
}

# 初始化 Pipeline(全局复用)
pipeline = create_pipeline_from_config(PIPELINE_CONFIG)

def build_knowledge_base():
    """构建知识库"""
    print("开始构建知识库...")
    pipeline.run()
    print("知识库构建完成!")

Step 2: 初始化向量数据库和大模型

接下来,我们需要初始化向量数据库和大模型。重要:向量数据库必须使用与 Pipeline 相同的 embedding 模型,以确保语义空间一致。
# 使用与 Pipeline 相同的 embedding 配置
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="agentic_rag_docs",
    connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
    vector_field="embeddings",
    primary_field="element_id",
    text_field="text",
    enable_dynamic_field=True
)

# 初始化大模型
llm = ChatTongyi(
    model="qwen-max",
    top_p=0.8,
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)

Step 3: 定义状态结构

LangGraph 使用状态来管理工作流中的数据流。我们需要定义一个 GraphState 来存储工作流中的各种信息:
class GraphState(TypedDict):
    """工作流状态"""
    messages: Annotated[list[BaseMessage], add_messages]
    question: str  # 原始问题
    rewritten_question: str  # 重写后的问题
    documents: list  # 检索到的文档
    generation: str  # 生成的答案
    next: str  # 下一步操作
    retrieval_count: int  # 检索次数
状态字段说明:
  • question:用户提出的原始问题
  • rewritten_question:重写后的问题(用于优化检索)
  • documents:检索到的相关文档片段
  • generation:最终生成的答案
  • next:指示下一步应该执行哪个节点
  • retrieval_count:检索次数(用于防止无限循环)

Step 4: 定义节点函数

工作流由多个节点组成,每个节点负责特定的任务。让我们逐个实现这些节点:

4.1 判断是否需要检索

should_retrieve 节点使用 LLM 判断问题是否需要从知识库检索信息:
def should_retrieve(state: GraphState) -> GraphState:
    """判断是否需要检索"""
    question = state["question"]
    
    # 使用 LLM 判断问题是否需要检索知识库
    prompt = f"""判断以下问题是否需要从知识库中检索信息才能回答。

            问题:{question}

            如果问题需要特定的文档、数据或知识库信息才能回答,返回 "retrieve"。
            如果问题是一般性对话、问候或不需要特定信息的简单问题,返回 "generate"。

            只返回 "retrieve" 或 "generate",不要返回其他内容。"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    decision = response.content.strip().lower()
    
    next_step = "retrieve" if "retrieve" in decision else "generate"
    
    return {
        **state,
        "next": next_step
    }

4.2 检索相关文档

retrieve 节点使用向量检索在知识库中查找相关内容:
def retrieve(state: GraphState) -> GraphState:
    """检索相关文档"""
    question = state.get("rewritten_question") or state["question"]
    retrieval_count = state.get("retrieval_count", 0)
    
    # 向量检索
    docs = vector_store.similarity_search(question, k=5)
    
    # 格式化检索结果
    documents = []
    for doc in docs:
        documents.append({
            "content": doc.page_content,
            "metadata": doc.metadata
        })
    
    return {
        **state,
        "documents": documents,
        "retrieval_count": retrieval_count + 1  # 增加检索计数
    }
注意:这里优先使用 rewritten_question(如果存在),否则使用原始问题。每次检索后,retrieval_count 会增加 1。

4.3 评估检索结果的相关性

grade_documents 节点评估检索到的文档是否与问题相关:
def grade_documents(state: GraphState) -> GraphState:
    """评估检索结果的相关性"""
    question = state.get("rewritten_question") or state["question"]
    documents = state["documents"]
    retrieval_count = state.get("retrieval_count", 0)
    
    # 限制最多重写 2 次(即最多检索 3 次)
    if retrieval_count >= 2:
        return {
            **state,
            "next": "generate"
        }
    
    if not documents:
        return {
            **state,
            "next": "rewrite"
        }
    
    # 构建评估提示
    docs_text = "\n\n".join([
        f"文档 {i+1}:\n{doc['content'][:300]}..."
        for i, doc in enumerate(documents[:3])
    ])
    
    prompt = f"""评估以下检索到的文档是否与问题相关。

            问题:{question}

            检索到的文档:
            {docs_text}

            如果文档与问题高度相关,能够回答问题,返回 "generate"。
            如果文档与问题不相关或相关性很低,返回 "rewrite"。

            只返回 "generate" 或 "rewrite",不要返回其他内容。"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    decision = response.content.strip().lower()
    
    next_step = "generate" if "generate" in decision else "rewrite"
    
    return {
        **state,
        "next": next_step
    }
如果文档不相关,将触发问题重写。为了避免无限循环,当检索次数达到 2 次时,即使文档不相关也会强制生成答案。

4.4 重写问题

rewrite_question 节点基于检索结果或原始问题,生成更优化的查询:
def rewrite_question(state: GraphState) -> GraphState:
    """重写问题"""
    question = state["question"]
    documents = state.get("documents", [])
    
    if documents:
        # 基于检索结果重写问题
        docs_summary = "\n".join([
            f"- {doc['content'][:200]}..."
            for doc in documents[:2]
        ])
        prompt = f"""原始问题:{question}

                当前检索到的文档摘要:
                {docs_summary}

                这些文档与问题不够相关。请重写问题,使其能够更好地匹配知识库中的内容。
                重写时应该:
                1. 保持问题的核心意图
                2. 使用更具体的关键词
                3. 考虑知识库可能使用的术语

                只返回重写后的问题,不要返回其他内容。"""
    else:
        # 首次重写
        prompt = f"""原始问题:{question}

                请重写这个问题,使其更具体、更清晰,便于在知识库中检索相关信息。
                重写时应该:
                1. 保持问题的核心意图
                2. 使用更具体的关键词
                3. 考虑知识库可能使用的术语

                只返回重写后的问题,不要返回其他内容。"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    rewritten = response.content.strip()
    
    return {
        **state,
        "rewritten_question": rewritten
    }

4.5 生成答案

generate 节点基于检索结果或直接生成答案:
def generate(state: GraphState) -> GraphState:
    """生成答案"""
    question = state["question"]
    documents = state.get("documents", [])
    
    if documents:
        # 基于检索结果生成答案
        context = "\n\n".join([
            f"文档来源:{doc['metadata'].get('filename', '未知')}\n内容:{doc['content']}"
            for i, doc in enumerate(documents)
        ])
        
        prompt = f"""基于以下文档内容回答用户问题。

                文档内容:
                {context}

                用户问题:{question}

                请基于文档内容回答问题。如果文档中没有相关信息,请说明。
                在回答中引用具体的文档来源。"""
    else:
        # 直接生成答案(不需要检索的情况)
        prompt = f"""回答以下问题:{question}"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    return {
        **state,
        "generation": response.content
    }

Step 5: 构建 LangGraph 工作流

现在我们将所有节点组合成一个完整的工作流:
# 创建状态图
workflow = StateGraph(GraphState)

# 添加节点
workflow.add_node("should_retrieve", should_retrieve)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("rewrite_question", rewrite_question)
workflow.add_node("generate", generate)

# 设置入口点
workflow.set_entry_point("should_retrieve")

# 添加条件边
workflow.add_conditional_edges(
    "should_retrieve",
    lambda state: state.get("next", "generate"),
    {
        "retrieve": "retrieve",
        "generate": "generate"
    }
)

workflow.add_edge("retrieve", "grade_documents")

workflow.add_conditional_edges(
    "grade_documents",
    lambda state: state.get("next", "generate"),
    {
        "generate": "generate",
        "rewrite": "rewrite_question"
    }
)

workflow.add_edge("rewrite_question", "retrieve")
workflow.add_edge("generate", END)

# 编译工作流
app = workflow.compile()
工作流逻辑:
  1. should_retrieve 开始
  2. 如果需要检索,进入 retrievegrade_documents
  3. 如果文档相关,进入 generate;如果不相关,进入 rewrite_questionretrieve(循环)
  4. 如果不需要检索,直接进入 generate

Step 6: 使用示例

创建一个便捷的提问函数:
def ask_question(question: str) -> str:
    """提问并获取答案"""
    initial_state = {
        "messages": [HumanMessage(content=question)],
        "question": question,
        "rewritten_question": "",
        "documents": [],
        "generation": "",
        "next": "",
        "retrieval_count": 0  # 初始化检索计数
    }
    
    # 运行工作流
    result = app.invoke(initial_state)
    
    return result["generation"]
使用示例:
if __name__ == "__main__":
    # 首次运行需要构建知识库
    build_knowledge_base()
    
    # 提问示例
    questions = [
        "如何配置数据库连接池?",
        "产品的定价策略是什么?",
        "你好"  # 简单问候,不需要检索
    ]
    
    for question in questions:
        print("=" * 60)
        print(f"问题:{question}")
        print("=" * 60)
        answer = ask_question(question)
        print(f"回答:{answer}\n")

完整代码示例

下面是一个完整的、可以直接运行的示例:
import os
from typing import TypedDict, Annotated, Literal
from dotenv import load_dotenv
from xparse_client import create_pipeline_from_config
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.chat_models import ChatTongyi
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage

# 加载环境变量
load_dotenv()

# ========== Step 1: 初始化 xParse Pipeline ==========

PIPELINE_CONFIG = {
    "source": {
        "type": "local",
        "directory": "./knowledge_base",  # 文档存放目录
        "pattern": ["*.pdf", "*.docx", "*.txt"]
    },
    "destination": {
        "type": "milvus",
        "db_path": os.getenv("MILVUS_DB_PATH"),
        "collection_name": "agentic_rag_docs",
        "dimension": 1024
    },
    "api_base_url": "https://api.textin.com/api/xparse",
    "api_headers": {
        "x-ti-app-id": os.getenv("XTI_APP_ID"),
        "x-ti-secret-code": os.getenv("XTI_SECRET_CODE")
    },
    "stages": [
        {
            "type": "parse",
            "config": {"provider": "textin"}
        },
        {
            "type": "chunk",
            "config": {
                "strategy": "by_title",
                "new_after_n_chars": 480,
                "max_characters": 1500,
                "overlap": 80
            }
        },
        {
            "type": "embed",
            "config": {
                "provider": "qwen",
                "model_name": "text-embedding-v4"
            }
        }
    ]
}

# 初始化 Pipeline(全局复用)
pipeline = create_pipeline_from_config(PIPELINE_CONFIG)

def build_knowledge_base():
    """构建知识库"""
    print("开始构建知识库...")
    pipeline.run()
    print("知识库构建完成!")

# ========== Step 2: 初始化向量数据库和大模型 ==========

# 使用与 Pipeline 相同的 embedding 配置
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="agentic_rag_docs",
    connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
    vector_field="embeddings",
    primary_field="element_id",
    text_field="text",
    enable_dynamic_field=True
)

# 初始化大模型
llm = ChatTongyi(
    model="qwen-max",
    top_p=0.8,
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)

# ========== Step 3: 定义状态结构 ==========

class GraphState(TypedDict):
    """工作流状态"""
    messages: Annotated[list[BaseMessage], add_messages]
    question: str  # 原始问题
    rewritten_question: str  # 重写后的问题
    documents: list  # 检索到的文档
    generation: str  # 生成的答案
    next: str  # 下一步操作
    retrieval_count: int  # 检索次数

# ========== Step 4: 定义节点函数 ==========

def should_retrieve(state: GraphState) -> GraphState:
    """判断是否需要检索"""
    question = state["question"]
    
    # 使用 LLM 判断问题是否需要检索知识库
    prompt = f"""判断以下问题是否需要从知识库中检索信息才能回答。

            问题:{question}

            如果问题需要特定的文档、数据或知识库信息才能回答,返回 "retrieve"。
            如果问题是一般性对话、问候或不需要特定信息的简单问题,返回 "generate"。

            只返回 "retrieve" 或 "generate",不要返回其他内容。"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    decision = response.content.strip().lower()
    
    next_step = "retrieve" if "retrieve" in decision else "generate"
    
    return {
        **state,
        "next": next_step
    }

def retrieve(state: GraphState) -> GraphState:
    """检索相关文档"""
    question = state.get("rewritten_question") or state["question"]
    retrieval_count = state.get("retrieval_count", 0)
    
    # 向量检索
    docs = vector_store.similarity_search(question, k=5)
    
    # 格式化检索结果
    documents = []
    for doc in docs:
        documents.append({
            "content": doc.page_content,
            "metadata": doc.metadata
        })
    
    return {
        **state,
        "documents": documents,
        "retrieval_count": retrieval_count + 1  # 增加检索计数
    }

def grade_documents(state: GraphState) -> GraphState:
    """评估检索结果的相关性"""
    question = state.get("rewritten_question") or state["question"]
    documents = state["documents"]
    retrieval_count = state.get("retrieval_count", 0)
    
    # 限制最多重写 2 次(即最多检索 3 次)
    if retrieval_count >= 2:
        return {
            **state,
            "next": "generate"
        }
    
    if not documents:
        return {
            **state,
            "next": "rewrite"
        }
    
    # 构建评估提示
    docs_text = "\n\n".join([
        f"文档 {i+1}:\n{doc['content'][:300]}..."
        for i, doc in enumerate(documents[:3])
    ])
    
    prompt = f"""评估以下检索到的文档是否与问题相关。

            问题:{question}

            检索到的文档:
            {docs_text}

            如果文档与问题高度相关,能够回答问题,返回 "generate"。
            如果文档与问题不相关或相关性很低,返回 "rewrite"。

            只返回 "generate" 或 "rewrite",不要返回其他内容。"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    decision = response.content.strip().lower()
    
    next_step = "generate" if "generate" in decision else "rewrite"
    
    return {
        **state,
        "next": next_step
    }

def rewrite_question(state: GraphState) -> GraphState:
    """重写问题"""
    question = state["question"]
    documents = state.get("documents", [])
    
    # 如果已经有重写的问题,基于之前的重写再次优化
    previous_rewrite = state.get("rewritten_question", "")
    
    if documents:
        # 基于检索结果重写问题
        docs_summary = "\n".join([
            f"- {doc['content'][:200]}..."
            for doc in documents[:2]
        ])
        prompt = f"""原始问题:{question}

                当前检索到的文档摘要:
                {docs_summary}

                这些文档与问题不够相关。请重写问题,使其能够更好地匹配知识库中的内容。
                重写时应该:
                1. 保持问题的核心意图
                2. 使用更具体的关键词
                3. 考虑知识库可能使用的术语

                只返回重写后的问题,不要返回其他内容。"""
    else:
        # 首次重写
        prompt = f"""原始问题:{question}

                请重写这个问题,使其更具体、更清晰,便于在知识库中检索相关信息。
                重写时应该:
                1. 保持问题的核心意图
                2. 使用更具体的关键词
                3. 考虑知识库可能使用的术语

                只返回重写后的问题,不要返回其他内容。"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    rewritten = response.content.strip()
    
    return {
        **state,
        "rewritten_question": rewritten
    }

def generate(state: GraphState) -> GraphState:
    """生成答案"""
    question = state["question"]
    documents = state.get("documents", [])
    
    if documents:
        # 基于检索结果生成答案
        context = "\n\n".join([
            f"文档来源:{doc['metadata'].get('filename', '未知')}\n内容:{doc['content']}"
            for i, doc in enumerate(documents)
        ])
        
        prompt = f"""基于以下文档内容回答用户问题。

                文档内容:
                {context}

                用户问题:{question}

                请基于文档内容回答问题。如果文档中没有相关信息,请说明。
                在回答中引用具体的文档来源。"""
    else:
        # 直接生成答案(不需要检索的情况)
        prompt = f"""回答以下问题:{question}"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    return {
        **state,
        "generation": response.content
    }

# ========== Step 5: 构建 LangGraph 工作流 ==========

# 创建状态图
workflow = StateGraph(GraphState)

# 添加节点
workflow.add_node("should_retrieve", should_retrieve)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("rewrite_question", rewrite_question)
workflow.add_node("generate", generate)

# 设置入口点
workflow.set_entry_point("should_retrieve")

# 添加条件边
workflow.add_conditional_edges(
    "should_retrieve",
    lambda state: state.get("next", "generate"),
    {
        "retrieve": "retrieve",
        "generate": "generate"
    }
)

workflow.add_edge("retrieve", "grade_documents")

workflow.add_conditional_edges(
    "grade_documents",
    lambda state: state.get("next", "generate"),
    {
        "generate": "generate",
        "rewrite": "rewrite_question"
    }
)

workflow.add_edge("rewrite_question", "retrieve")
workflow.add_edge("generate", END)

# 编译工作流
app = workflow.compile()

# ========== Step 6: 使用示例 ==========

def ask_question(question: str) -> str:
    """提问并获取答案"""
    initial_state = {
        "messages": [HumanMessage(content=question)],
        "question": question,
        "rewritten_question": "",
        "documents": [],
        "generation": "",
        "next": "",
        "retrieval_count": 0  # 初始化检索计数
    }
    
    # 运行工作流
    result = app.invoke(initial_state)
    
    return result["generation"]

if __name__ == "__main__":
    # 首次运行需要构建知识库
    build_knowledge_base()
    
    # 提问示例
    questions = [
        "如何配置数据库连接池?",
        "产品的定价策略是什么?",
        "你好"  # 简单问候,不需要检索
    ]
    
    for question in questions:
        print("=" * 60)
        print(f"问题:{question}")
        print("=" * 60)
        answer = ask_question(question)
        print(f"回答:{answer}\n")

与普通 RAG 的区别

普通 RAG

用户问题 → 向量检索 → 生成答案

Agentic RAG

用户问题 → 判断是否需要检索
         ├─ 需要 → 检索 → 评估相关性
         │         ├─ 相关 → 生成答案
         │         └─ 不相关 → 重写问题 → 重新检索 → ...
         └─ 不需要 → 直接生成答案
核心优势
  1. 智能决策:自动判断是否需要检索,避免不必要的检索
  2. 质量保证:评估检索结果的相关性,确保答案质量
  3. 迭代优化:通过问题重写,逐步优化检索结果
  4. 灵活应对:能够处理简单问题和复杂问题

实际应用场景

场景 1: 企业知识库问答

需求:员工可以通过自然语言提问,快速找到产品文档、技术文档等信息。 实现
  • 使用 xParse Pipeline 处理企业文档
  • Agentic RAG 自动判断问题类型
  • 智能检索和重写,确保找到最相关的信息

场景 2: 客服助手

需求:客服团队需要快速回答客户问题,从 FAQ 和产品手册中找到答案。 实现
  • 将 FAQ 和产品手册存入知识库
  • Agentic RAG 能够:
    • 识别简单问候,直接回答
    • 识别需要检索的问题,智能检索
    • 当检索结果不相关时,自动优化查询

场景 3: 技术文档问答

需求:开发者可以通过自然语言提问,快速找到 API 文档、使用示例等。 实现
  • 使用 by_title 分块策略,保持文档结构
  • Agentic RAG 能够理解技术术语,重写查询以匹配文档中的关键词

进阶优化

1. 添加检索次数限制

避免无限循环重写和检索:
class GraphState(TypedDict):
    # ... 其他字段
    retrieval_count: int  # 检索次数

def grade_documents(state: GraphState) -> GraphState:
    """评估检索结果的相关性"""
    # 限制最多重写 2 次
    if state.get("retrieval_count", 0) >= 2:
        return {
            **state,
            "next": "generate"
        }
    
    # ... 原有的评估逻辑

2. 添加对话历史

支持多轮对话:
class GraphState(TypedDict):
    # ... 其他字段
    chat_history: list[BaseMessage]  # 对话历史

def generate(state: GraphState) -> GraphState:
    """生成答案"""
    # 在生成时考虑对话历史
    messages = state.get("chat_history", []) + [
        HumanMessage(content=state["question"])
    ]
    # ... 生成逻辑

3. 混合检索策略

结合向量检索和关键词检索:
def retrieve(state: GraphState) -> GraphState:
    """检索相关文档"""
    question = state.get("rewritten_question") or state["question"]
    
    # 向量检索
    vector_docs = vector_store.similarity_search(question, k=3)
    
    # 关键词检索(如果 Milvus 支持)
    # keyword_docs = keyword_search(question, k=2)
    
    # 合并结果
    documents = []
    for doc in vector_docs:
        documents.append({
            "content": doc.page_content,
            "metadata": doc.metadata,
            "source": "vector"
        })
    
    return {
        **state,
        "documents": documents
    }

4. 添加引用来源

在答案中标注信息来源:
def generate(state: GraphState) -> GraphState:
    """生成答案"""
    # ... 生成逻辑
    
    # 添加引用来源
    if documents:
        sources = [
            doc['metadata'].get('filename', '未知')
            for doc in documents
        ]
        generation_with_sources = f"{generation}\n\n📚 参考来源:{', '.join(set(sources))}"
        return {
            **state,
            "generation": generation_with_sources
        }

常见问题

Q: Agentic RAG 比普通 RAG 慢吗? A: 可能会稍慢一些,因为增加了判断和评估步骤。但通过智能决策,可以避免不必要的检索,整体效率可能更高。 Q: 如何控制重写次数? A: 在 grade_documents 函数中添加检索次数限制,避免无限循环。 Q: 可以使用其他 LLM 吗? A: 可以。LangChain 支持多种 LLM,只需替换 ChatTongyi 为对应的类,如 ChatOpenAI(OpenAI)、ChatZhipuAI(智谱AI)等。 Q: 如何提高检索质量? A:
  1. 优化分块策略,确保文档块大小和重叠合适
  2. 使用高质量的 embedding 模型(如 text-embedding-v4
  3. 调整检索数量(k 值)
  4. 优化问题重写的提示词
Q: 如何处理多轮对话? A: 在状态中添加 chat_history 字段,在生成答案时考虑历史对话上下文。

总结

通过本教程,您已经学会了如何构建一个 Agentic RAG 系统。核心思路是:
  1. Pipeline 负责数据准备:使用 xParse Pipeline 处理文档并构建知识库
  2. LangGraph 负责工作流:构建智能的检索和生成流程
  3. 智能决策:通过判断、评估、重写等步骤,逐步优化答案质量
相比普通 RAG,Agentic RAG 能够:
  • 智能判断是否需要检索
  • 评估检索结果的相关性
  • 自动重写问题以优化检索
  • 生成更准确、更相关的答案
这样,您就有了一个”会思考”的 RAG 系统!

下一步