边
边定义了逻辑的路由方式以及图如何决定停止。
一个节点可以有多条出边。如果一个节点有多个出边,那么所有这些目标节点将作为下一个超级步的一部分并行执行。
普通边
如果一直想从节点A到节点B,可以直接使用 add_edge 方法
graph.add_edge("node_a", "node_b")
条件边
如果想有选择地路由到一条或多个节点上,可以使用 add_conditional_edges,此方法接受一个节点的名称和一个在该节点执行后调用的“路由函数”
graph.add_conditional_edges("node_a", routing_function)
与节点类似,routing_function 接受图的当前 state 并返回一个值。
默认情况下,routing_function 的返回值是要将状态发送到的下一个节点的名称(或节点列表)。所有这些节点将作为下一个超级步的一部分并行运行。
def routing_function(state: State):
if xx:
return ["node_b", "node_c"]
return "node_d"
可以选择性地提供一个字典,将 routing_function 的输出映射到下一个节点的名称。
def routing_function(state: State):
if xx:
return True
else:
return False
graph.add_conditional_edges(
"node_a",
routing_function,
{True: "node_b", False: "node_c"}
)
并行边
当一个节点有多条出边时,这些目标节点会作为下一个超级步的一部分并行执行。并行执行是构建高效多智能体系统的核心技术,能让多个任务同时执行,大幅提升性能。
Fan-out(扇出)
Fan-out 是指一个节点的输出分发到多个并行节点的模式。可以通过多次调用 add_edge 从同一个节点创建多条边,实现扇出并行执行。
# 从 START 节点扇出到三个并行节点
graph.add_edge(START, "search_wikipedia")
graph.add_edge(START, "search_web")
graph.add_edge(START, "search_docs")
执行后,search_wikipedia、search_web 和 search_docs 会在同一个 step 中并行执行。
Fan-in(扇入)
Fan-in 是指多个并行节点的输出汇聚到一个节点的模式。所有并行节点执行完成后,会自动同步并汇聚到目标节点。
# Fan-out:从 START 扇出到三个搜索节点
graph.add_edge(START, "search_wikipedia")
graph.add_edge(START, "search_web")
graph.add_edge(START, "search_docs")
# Fan-in:三个搜索节点汇聚到生成答案节点
graph.add_edge("search_wikipedia", "generate_answer")
graph.add_edge("search_web", "generate_answer")
graph.add_edge("search_docs", "generate_answer")
generate_answer 节点会等待所有三个搜索节点完成后才会执行。
条件边返回节点列表
在条件边中,路由函数可以返回一个节点列表,这些节点也会并行执行。
def routing_function(state: State):
if condition:
return ["node_b", "node_c", "node_d"] # 返回多个节点,并行执行
return "node_e" # 返回单个节点
graph.add_conditional_edges("node_a", routing_function)
状态合并(Reducer)
并行执行的节点都会更新同一个 State,状态会根据定义的**归约器函数(Reducer)**进行合并。使用 Annotated 和 operator.add 可以自动合并列表类型的状态。
from typing_extensions import Annotated, TypedDict
from operator import add
class State(TypedDict):
question: str
context: Annotated[list[str], add] # 使用 add 归约器自动合并列表
answer: str
def search_wikipedia(state: State):
return {"context": ["Wikipedia 搜索结果 1", "Wikipedia 搜索结果 2"]}
def search_web(state: State):
return {"context": ["网络搜索结果 1", "网络搜索结果 2"]}
def search_docs(state: State):
return {"context": ["文档搜索结果 1"]}
def generate_answer(state: State):
# 此时 state["context"] 已经包含了所有并行节点的结果
all_context = "\n".join(state["context"])
return {"answer": f"基于以下上下文生成答案:\n{all_context}"}
所有并行节点返回的 context 列表会通过 operator.add 自动合并为一个列表。
完整示例
from typing_extensions import Annotated, TypedDict
from operator import add
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
question: str
context: Annotated[list[str], add]
answer: str
def search_wikipedia(state: State):
return {"context": [f"Wikipedia: {state['question']}"]}
def search_web(state: State):
return {"context": [f"Web: {state['question']}"]}
def search_docs(state: State):
return {"context": [f"Docs: {state['question']}"]}
def generate_answer(state: State):
return {"answer": f"答案基于 {len(state['context'])} 个来源"}
builder = StateGraph(State)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("search_web", search_web)
builder.add_node("search_docs", search_docs)
builder.add_node("generate_answer", generate_answer)
# Fan-out:并行执行三个搜索
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge(START, "search_docs")
# Fan-in:汇聚到生成答案节点
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("search_docs", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()
核心要点
- 并行执行:多个节点在同一个 step 中同时运行
- 自动同步:后续节点会等待所有并行节点完成后再执行
- 状态合并:使用
Annotated[类型, reducer]定义如何合并并行节点的状态更新 - Fan-out/Fan-in:扇出分发任务,扇入汇聚结果,是并行处理的经典模式