两个AI智能体第一次对话-A2A双Agent协作实战
两个AI智能体第一次对话:A2A双Agent协作实战
摘要: 上篇搭了一个Echo Agent,这次搭第二个。让"翻译Agent"和"摘要Agent"通过A2A协议真正对话,跑通双Agent委托任务的全流程。
一、上一篇的Agent只会自言自语
上篇文章你搭了一个Echo Agent,它能收消息、能回消息,但本质上是个"单人相声"——只有你自己跟它对话。
真正有意思的场景是:两个Agent之间互相发消息、互相委托任务。
比如你让翻译Agent把一段中文翻成英文,翻译Agent干完了,自动把结果丢给摘要Agent,摘要Agent提炼出要点,再返回给你。整个过程你只需要发一条消息,剩下的Agent自己协作完成。
今天我就带你搭这个场景。
二、先搞清楚:双Agent怎么通信
回顾单Agent架构
上篇的架构很简单:
你(Client)→ 发消息 → Echo Agent(Server)
一个Client,一个Server。你发一条消息,Agent回一条。完事。
双Agent架构
加一个Agent之后,架构变成这样:
你(Client)→ 发消息 → 翻译Agent(Server)
↓ A2A请求
摘要Agent(Server)
↓ A2A响应
翻译Agent(汇总结果)
↓ 最终响应
你(Client)
关键变化:翻译Agent既是Server(接收你的请求),又是Client(向摘要Agent发请求)。一个Agent可以同时扮演两个角色。
这就是A2A的核心设计——Agent之间是平等的,谁都可以当Client,谁都可以当Server。
通信流程拆解
整个流程分5步:
-
你给翻译Agent发消息:“把这段话翻成英文,再帮我写个摘要”
-
翻译Agent收到消息,先做翻译
-
翻译Agent通过A2A协议把翻译结果发给摘要Agent
-
摘要Agent处理完,把摘要返回给翻译Agent
-
翻译Agent把翻译结果+摘要一起返回给你
三、项目结构
这次我们搭两个独立的Agent,各自有独立的端口和Agent Card。
a2a-dual-agent/
├── translator/ # 翻译Agent
│ ├──
agent_card.json
│ ├──
server.py
│ └──
task_manager.py
├── summarizer/ # 摘要Agent
│ ├──
agent_card.json
│ ├──
server.py
│ └──
task_manager.py
├──
client.py
# 测试客户端
├──
pyproject.toml
└── .env # 存放Ollama配置
两个Agent跑在不同的端口:翻译Agent在10002,摘要Agent在10003。
四、搭建摘要Agent(先搭被调用的那个)
为什么先搭摘要Agent?因为翻译Agent要调用它,得先让它跑起来。
4.1 创建项目
mkdir a2a-dual-agent && cd a2a-dual-agent
uv init --package dual-agent
uv venv .venv
source .venv/bin/activate
uv add git+https://
github.com/google/A2A
#subdirectory=samples/python
uv add click httpx
4.2 摘要Agent的Task Manager
创建 summarizer/ [task_manager.py](http://task_manager.py) :
from
google_a2a.common.server.task_manager
import InMemoryTaskManager
from
google_a2a.common.types
import (
Artifact, Message, SendTaskRequest, SendTaskResponse,
Task, TaskState, TaskStatus,
)
classSummarizerTaskManager(InMemoryTaskManager):
"""摘要Agent:接收文本,返回摘要。"""
asyncdefon_send_task(
self, request: SendTaskRequest
) -> SendTaskResponse:
awaitself.upsert_task(
request.params)
task_id =
request.params.
id
input_text =
request.params.message.parts[
0].text
# 截取前200字作为"摘要"(demo用Ollama生成真实摘要)
# 这里调用Ollama
try:
import httpx
asyncwith httpx.AsyncClient(timeout=30.0) as c:
resp = await c.post(
"http://localhost:11434/api/generate",
json={
"model": "
qwen2.5:7b"
,
"prompt": (
f"请用中文对以下内容写一段50字以内的摘要:\n"
f"{input_text}"
),
"stream": False,
},
)
result = resp.json()
summary = result.get("response", "(摘要生成失败)")
except Exception:
summary = f"摘要(原文截取):{input_text[:100]}..."
task = awaitself._update_task(
task_id=task_id,
task_state=
TaskState.COMPLETED,
response_text=summary,
)
returnSendTaskResponse(id=request.id, result=task)
asyncdef_update_task(
self, task_id: str, task_state: TaskState,
response_text: str
) -> Task:
task = self.tasks[task_id]
parts = [{"type": "text", "text": response_text}]
task.status
= TaskStatus(
state=task_state,
message=Message(role="agent", parts=parts),
)
if task_state ==
TaskState.COMPLETED:
task.artifacts
= [Artifact(parts=parts)]
return task
4.3 摘要Agent的Server
创建 summarizer/ [server.py](http://server.py) :
import logging
import click
from
google_a2a.common.types
import (
AgentSkill, AgentCapabilities, AgentCard
)
from
google_a2a.common.server
import A2AServer
from task_manager import SummarizerTaskManager
logging.basicConfig(level=
logging.INFO)
@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10003, type=int)
defmain(host, port):
skill = AgentSkill(
id="summarize-skill",
name="Text Summarizer",
description="Summarizes any text into a concise summary",
tags=["summary", "nlp", "text"],
examples=["请帮我总结这篇文章的要点"],
inputModes=["text"],
outputModes=["text"],
)
agent_card = AgentCard(
name="Summarizer Agent",
description="Receives text and returns a concise summary.",
url=f"http://{host}:{port}/",
version="0.1.0",
defaultInputModes=["text"],
defaultOutputModes=["text"],
capabilities=AgentCapabilities(streaming=False),
skills=[skill],
)
task_manager = SummarizerTaskManager()
server = A2AServer(
agent_card=agent_card,
task_manager=task_manager,
host=host,
port=port,
)
logging.info(f"Summarizer Agent running at http://{host}:{port}")
server.start()
if __name__ == "__main__":
main()
4.4 启动摘要Agent
cd summarizer
python
server.py
看到 Uvicorn running on http://localhost:10003 就OK了。
五、搭建翻译Agent(它会调用摘要Agent)
翻译Agent是核心角色:它既接收你的请求做翻译,又作为Client去调用摘要Agent。
5.1 翻译Agent的Task Manager
创建 translator/ [task_manager.py](http://task_manager.py) :
import httpx
from
google_a2a.common.server.task_manager
import InMemoryTaskManager
from
google_a2a.common.types
import (
Artifact, Message, SendTaskRequest, SendTaskResponse,
Task, TaskState, TaskStatus,
)
SUMMARIZER_URL = "http://localhost:10003"
OLLAMA_URL = "http://localhost:11434/api/generate"
OLLAMA_MODEL = "
qwen2.5:7b"
classTranslatorTaskManager(InMemoryTaskManager):
"""翻译Agent:翻译文本后,调用摘要Agent生成摘要。"""
asyncdefon_send_task(
self, request: SendTaskRequest
) -> SendTaskResponse:
awaitself.upsert_task(
request.params)
task_id =
request.params.
id
input_text =
request.params.message.parts[
0].text
# 第一步:翻译
translation = awaitself._translate(input_text)
# 更新状态为"工作中"(已完成翻译,开始摘要)
awaitself._update_task(
task_id=task_id,
task_state=
TaskState.WORKING,
response_text=f"翻译完成,正在生成摘要...\n\n翻译结果:\n{translation}",
)
# 第二步:调用摘要Agent
summary = awaitself._call_summarizer(translation)
# 第三步:汇总返回
final_text = (
f"【翻译结果】\n{translation}\n\n"
f"【摘要】\n{summary}"
)
task = awaitself._update_task(
task_id=task_id,
task_state=
TaskState.COMPLETED,
response_text=final_text,
)
returnSendTaskResponse(id=request.id, result=task)
asyncdef_translate(self, text: str) -> str:
"""调用Ollama进行翻译。"""
try:
asyncwith httpx.AsyncClient(timeout=60.0) as c:
resp = await c.post(
OLLAMA_URL,
json={
"model": OLLAMA_MODEL,
"prompt": (
f"请将以下中文翻译成英文,只输出翻译结果:\n"
f"{text}"
),
"stream": False,
},
)
result = resp.json()
return result.get("response", "翻译失败")
except Exception as e:
return f"翻译出错: {str(e)}"
asyncdef_call_summarizer(self, text: str) -> str:
"""通过A2A协议调用摘要Agent。"""
try:
# 1. 发现摘要Agent(获取Agent Card)
asyncwith httpx.AsyncClient() as c:
card_resp = await c.get(
f"{SUMMARIZER_URL}/.well-known/
agent.json"
)
card = card_resp.json()
endpoint = card["url"]
# 2. 发送A2A请求
payload = {
"jsonrpc": "2.0",
"id": "summary-request-001",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{
"type": "text",
"text": text,
}],
}
},
}
resp = await c.post(
endpoint,
json=payload,
headers={"Content-Type": "application/json"},
timeout=60.0,
)
result = resp.json()
# 3. 解析响应
if"result"in result:
task_data = result["result"]
artifacts = task_data.get("artifacts", [])
if artifacts:
parts = artifacts[0].get("parts", [])
if parts:
return parts[0].get("text", "无摘要")
message = task_data.get("status", {}).get("message", {})
parts = message.get("parts", [])
if parts:
return parts[0].get("text", "无摘要")
return"摘要Agent返回格式异常"
except Exception as e:
return f"调用摘要Agent失败: {str(e)}"
asyncdef_update_task(
self, task_id: str, task_state: TaskState,
response_text: str
) -> Task:
task = self.tasks[task_id]
parts = [{"type": "text", "text": response_text}]
task.status
= TaskStatus(
state=task_state,
message=Message(role="agent", parts=parts),
)
if task_state ==
TaskState.COMPLETED:
task.artifacts
= [Artifact(parts=parts)]
return task
5.2 翻译Agent的Server
创建 translator/ [server.py](http://server.py) :
import logging
import click
from
google_a2a.common.types
import (
AgentSkill, AgentCapabilities, AgentCard
)
from
google_a2a.common.server
import A2AServer
from task_manager import TranslatorTaskManager
logging.basicConfig(level=
logging.INFO)
@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002, type=int)
defmain(host, port):
skill = AgentSkill(
id="translate-skill",
name="Translator",
description=(
"Translates Chinese text to English, "
"then delegates summarization to Summarizer Agent"
),
tags=["translation", "nlp", "multi-agent"],
examples=["帮我把这段话翻成英文并写个摘要"],
inputModes=["text"],
outputModes=["text"],
)
agent_card = AgentCard(
name="Translator Agent",
description=(
"Translates text and coordinates with "
"Summarizer Agent via A2A protocol."
),
url=f"http://{host}:{port}/",
version="0.1.0",
defaultInputModes=["text"],
defaultOutputModes=["text"],
capabilities=AgentCapabilities(streaming=False),
skills=[skill],
)
task_manager = TranslatorTaskManager()
server = A2AServer(
agent_card=agent_card,
task_manager=task_manager,
host=host,
port=port,
)
logging.info(
f"Translator Agent running at http://{host}:{port}"
)
server.start()
if __name__ == "__main__":
main()
六、启动测试
6.1 启动两个Agent
开两个终端:
# 终端1:启动摘要Agent
cd summarizer && python
server.py
# 终端2:启动翻译Agent
cd translator && python
server.py
两个终端分别看到Uvicorn启动信息就OK。
6.2 用客户端测试
创建 [client.py](http://client.py) :
import asyncio
import httpx
from
a2a.client
import A2ACardResolver, ClientConfig, create_client
from
a2a.helpers
import new_text_message
from
a2a.types.a2a_pb2
import Role, SendMessageRequest
asyncdefmain():
print("=== 双Agent协作测试 ===\n")
asyncwith httpx.AsyncClient() as httpx_client:
# 发现翻译Agent
resolver = A2ACardResolver(
httpx_client=httpx_client,
base_url="http://localhost:10002",
)
agent_card = await resolver.get_agent_card()
print(f"连接到: {
agent_card.name}"
)
print(f"技能: {[
s.name
for s in
agent_card.skills]}\n"
)
# 创建客户端
client = awaitcreate_client(
agent=agent_card,
client_config=ClientConfig(streaming=False),
)
# 发送消息
text = (
"人工智能正在改变我们工作的方式。"
"从自动写代码到智能客服,AI工具已经渗透到"
"各行各业。但不同AI之间的协作仍然是一个"
"巨大的挑战,A2A协议正是为了解决这个问题而诞生的。"
)
message = new_text_message(text, role=
Role.ROLE_USER)
request = SendMessageRequest(message=message)
print(f"发送: {text}\n")
print("等待翻译Agent和摘要Agent协作处理...\n")
print("=" * 50)
asyncfor chunk in client.send_message(request):
# 提取文本内容
ifhasattr(chunk, 'artifacts') and
chunk.artifacts:
for part in
chunk.artifacts[
0].parts:
print(
part.text)
elif hasattr(chunk, 'status') and
chunk.status.message:
for part in
chunk.status.message.parts:
print(
part.text)
await client.close()
if __name__ == "__main__":
asyncio.run(main())
运行:
python
client.py
6.3 预期输出
=== 双Agent协作测试 ===
连接到: Translator Agent
技能: ['Translator']
发送: 人工智能正在改变我们工作的方式...
等待翻译Agent和摘要Agent协作处理...
==================================================
【翻译结果】
Artificial intelligence is changing the way we work...
【摘要】
本文讨论了AI对各行各业的渗透以及A2A协议解决AI协作挑战的意义。
==================================================
看到这个输出,说明两个Agent成功通过A2A协议完成了一次完整的协作。
七、整个流程发生了什么
拆解一下你刚才看到的结果背后发生了什么:
第1步: 你的 client.py通过A2A协议把中文文本发给翻译Agent(端口10002)。
第2步: 翻译Agent收到消息,调用本地Ollama大模型进行翻译。
第3步: 翻译Agent通过A2A协议(直接用httpx发JSON-RPC请求)把英文翻译结果发给摘要Agent(端口10003)。
第4步: 摘要Agent收到英文文本,调用Ollama生成摘要,返回给翻译Agent。
第5步: 翻译Agent把翻译结果和摘要拼接在一起,作为最终结果返回给你。
整个过程你只发了一条消息。两个Agent之间的通信、任务委托、结果汇总,全部通过A2A协议自动完成。
八、几个值得注意的细节
翻译Agent怎么调用摘要Agent
注意 _call_summarizer 这个方法。它做了一件很关键的事——手动构造A2A请求。
payload = {
"jsonrpc": "2.0",
"id": "summary-request-001",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": text}],
}
},
}
这就是A2A协议的标准格式。翻译Agent作为Client,直接用httpx向摘要Agent的端点发送JSON-RPC请求。没有SDK依赖,没有魔法,就是HTTP + JSON。
这也是A2A的魅力所在:协议就是协议,任何能发HTTP请求的代码都能参与。
为什么要先获取Agent Card
在发请求之前,翻译Agent先访问了摘要Agent的 /.well-known/ [agent.json](http://agent.json) :
card_resp = await c.get(
f"{SUMMARIZER_URL}/.well-known/
agent.json"
)
endpoint = card["url"]
这叫"能力发现"。翻译Agent先搞清楚摘要Agent的地址和能力,再决定怎么调用。在真实场景中,你可能根据Agent Card里的skill信息来决定要不要调用、调用哪个skill。
两个Agent可以独立部署
这个demo里两个Agent跑在同一台电脑上。但在生产环境中,它们可以跑在不同的服务器、不同的云、甚至不同的厂商平台上。只要网络互通,A2A协议就能让它们协作。
九、我踩过的3个新坑
坑1:摘要Agent没启动就测试。 翻译Agent调用摘要Agent时连接被拒绝,报 ConnectionRefusedError。解决方法:先启动摘要Agent(10003端口),再启动翻译Agent(10002端口)。
坑2:Ollama没装或没启动。 两个Agent都依赖Ollama。如果你没装Ollama,两个Agent都会返回"翻译失败"或"摘要生成失败"。解决方法:ollama serve 先启动Ollama服务。
坑3:JSON-RPC响应格式解析错误。 A2A的响应格式跟普通HTTP响应不一样,是JSON-RPC格式的。第一次解析时我直接取 [response.text](http://response.text) ,拿到的是原始JSON字符串而不是解析后的对象。解决方法:先 .json() 解析,再按 result → artifacts → parts → text 的路径提取。
十、下一步
这篇你实现了两个Agent之间的单向委托:翻译Agent调摘要Agent。下一篇,我们要解决一个更关键的问题:安全。
当你把Agent暴露到网络上,怎么确保只有授权的Agent才能调用?A2A协议支持API Key、JWT Bearer、OAuth 2.0三种认证方式,下一篇我会带你逐一实现,给你的Agent加把锁。
下篇预告
《给你的A2A Agent加把锁:认证鉴权实战指南》 —— API Key、JWT Bearer、OAuth 2.0三种认证方式,从开发环境到生产环境的完整安全方案。
更多推荐



所有评论(0)