Talk to AI|Manus拆解
Manus工作逻辑
[!question] 我们来探讨一下Manus这个产品。他所谓的Agent的工作逻辑是怎么实现的,比如我给他一个任务,他能自动拆解并执行,这个过程的技术原理是什么,具体的技术方案可能是什么?
核心工作逻辑
Manus 这类 AI Agent 的任务自动拆解和执行,主要基于以下技术架构:
1. 任务理解与规划层
- LLM 作为推理引擎:使用大语言模型(如 GPT-4、Claude 等)理解用户意图
- Chain-of-Thought (CoT) 推理:让模型逐步思考如何将复杂任务分解为子任务
- ReAct 框架:Reasoning(推理)+ Acting(行动)循环模式
具体流程:
用户输入 → LLM 分析意图 → 生成执行计划 → 拆解为子任务序列2. 任务拆解的技术方案
方案 A:Prompt Engineering + 结构化输出
# 伪代码示例
prompt = f"""
任务:{user_task}
请将此任务拆解为可执行的步骤,输出 JSON 格式:
{{
"steps": [
{{"action": "操作类型", "params": {{}}, "description": "说明"}},
...
]
}}
"""
plan = llm.generate(prompt)
方案 B:Planning Agent + Tool Library
- 维护一个工具库(API、浏览器操作、文件操作等)
- LLM 根据可用工具规划执行路径
- 类似 LangChain 的 Agent + Tools 模式
3. 执行层技术栈
浏览器自动化:
- Playwright/Puppeteer:控制浏览器行为
- 视觉定位:使用 GPT-4V 或 Claude 的视觉能力识别页面元素
- DOM 解析:提取页面结构,定位可交互元素
动作执行:
# 示例:基于视觉的点击操作
screenshot = browser.screenshot()
prompt = f"在这个截图中,'登录'按钮的坐标是?"
coordinates = vision_llm.analyze(screenshot, prompt)
browser.click(coordinates)4. 反馈与自我修正机制
执行动作 → 观察结果 → 判断是否成功
↓ 失败
重新规划 → 调整策略 → 再次执行
关键技术:
- 状态检测:通过截图、DOM 变化判断执行结果
- 错误恢复:LLM 分析失败原因,生成新计划
- 记忆系统:向量数据库存储历史执行经验
可能的完整技术架构
┌─────────────────────────────────────┐
│ 用户输入层 (自然语言任务) │
└──────────────┬──────────────────────┘
↓
┌──────────────────────────────────────┐
│ 规划层 (Planning Agent) │
│ - LLM (GPT-4/Claude) │
│ - Prompt Templates │
│ - Task Decomposition │
└──────────────┬───────────────────────┘
↓
┌──────────────────────────────────────┐
│ 工具调度层 (Tool Orchestration) │
│ - Tool Registry │
│ - Parameter Binding │
│ - Execution Queue │
└──────────────┬───────────────────────┘
↓
┌──────────────────────────────────────┐
│ 执行层 (Execution Engine) │
│ - Browser Automation │
│ - API Callers │
│ - File System Operations │
└──────────────┬───────────────────────┘
↓
┌──────────────────────────────────────┐
│ 观察层 (Observation) │
│ - Vision Models (截图分析) │
│ - DOM Parsing │
│ - State Detection │
└──────────────┬───────────────────────┘
↓
┌──────────────────────────────────────┐
│ 反馈循环 (Feedback Loop) │
│ - Success/Failure Detection │
│ - Re-planning │
│ - Memory Update │
└──────────────────────────────────────┘关键技术挑战
- 鲁棒性:网页结构变化、加载延迟、验证码等
- 成本控制:频繁调用 LLM API 的费用
- 安全性:避免执行危险操作
- 准确性:视觉定位可能出错
优化方向
- 本地小模型:用于简单任务的快速决策
- 缓存机制:记录常见操作模式
- 多模态融合:结合 DOM + 视觉 + 可访问性树
- 人类反馈强化学习:从用户纠正中学习
这种架构本质上是 LLM + RPA(机器人流程自动化)+ 计算机视觉 的结合体,通过语言模型的推理能力弥补传统自动化脚本缺乏灵活性的问题。
工作逻辑的具体实现方式
[!question] 请你结合一个具体的QA例子,讲解一下这个工作逻辑的实现方式
场景示例:用户想预订一家餐厅
用户输入:
"帮我在明天晚上7点预订旧金山一家评分高的意大利餐厅,预算100美元左右,最好有户外座位"
完整工作流程拆解
阶段 1: 任务理解与规划(Planning)
1.1 LLM 分析用户意图
// Manus 内部的 Prompt
const planningPrompt = `
用户请求:帮我在明天晚上7点预订旧金山一家评分高的意大利餐厅,预算100美元左右,最好有户外座位
请分析这个请求并生成执行计划。
【可用工具】
1. search_restaurants(location, cuisine, price_range, features)
2. get_restaurant_details(restaurant_id)
3. check_availability(restaurant_id, date, time, party_size)
4. make_reservation(restaurant_id, date, time, party_size, contact_info)
5. get_current_date()
【输出格式】JSON
{
"reasoning": "你的推理过程",
"tasks": [
{
"id": "task_1",
"action": "工具名称",
"params": {...},
"dependencies": [],
"reason": "为什么需要这个步骤"
}
]
}
`;
// LLM 返回的规划结果
const plan = {
"reasoning": "用户需要预订餐厅,我需要:1) 先获取明天的日期 2) 搜索符合条件的餐厅 3) 检查可用性 4) 完成预订",
"tasks": [
{
"id": "task_1",
"action": "get_current_date",
"params": {},
"dependencies": [],
"reason": "需要确定'明天'的具体日期"
},
{
"id": "task_2",
"action": "search_restaurants",
"params": {
"location": "San Francisco",
"cuisine": "Italian",
"price_range": "$$",
"features": ["outdoor seating", "highly rated"]
},
"dependencies": ["task_1"],
"reason": "根据用户偏好搜索餐厅"
},
{
"id": "task_3",
"action": "check_availability",
"params": {
"restaurant_id": "{{task_2.top_result.id}}",
"date": "{{task_1.tomorrow_date}}",
"time": "19:00",
"party_size": 2
},
"dependencies": ["task_1", "task_2"],
"reason": "确认餐厅在指定时间是否有空位"
},
{
"id": "task_4",
"action": "make_reservation",
"params": {
"restaurant_id": "{{task_2.top_result.id}}",
"date": "{{task_1.tomorrow_date}}",
"time": "19:00",
"party_size": 2,
"special_requests": "outdoor seating if available"
},
"dependencies": ["task_3"],
"reason": "完成预订"
}
]
}
阶段 2: 任务执行(Execution)
2.1 执行 Task 1 - 获取日期
// Manus 调用工具
const task1Result = await tools.get_current_date();
// 返回结果
{
"current_date": "2025-09-30",
"tomorrow_date": "2025-10-01"
}
// Manus 的内部状态更新
context.memory.set("task_1", {
status: "completed",
result: { tomorrow_date: "2025-10-01" }
});
此时用户界面显示:
🤔 正在思考...
✅ 已确定预订日期:2025年10月1日
2.2 执行 Task 2 - 搜索餐厅
// 解析参数(替换依赖任务的结果)
const task2Params = {
location: "San Francisco",
cuisine: "Italian",
price_range: "$$",
features: ["outdoor seating", "highly rated"]
};
// 调用搜索工具
const task2Result = await tools.search_restaurants(task2Params);
// 返回结果
{
"restaurants": [
{
"id": "rest_123",
"name": "Trattoria Pinocchio",
"rating": 4.6,
"price_level": "$$",
"outdoor_seating": true,
"address": "401 Columbus Ave"
},
{
"id": "rest_456",
"name": "North Beach Restaurant",
"rating": 4.5,
"price_level": "$$",
"outdoor_seating": true
}
]
}
context.memory.set("task_2", {
status: "completed",
result: { top_result: task2Result.restaurants[0] }
});
用户界面显示:
🔍 正在搜索意大利餐厅...
✅ 找到 2 家符合条件的餐厅,推荐:Trattoria Pinocchio (评分4.6⭐)
2.3 执行 Task 3 - 检查可用性
// 从之前任务获取参数
const task3Params = {
restaurant_id: context.memory.get("task_2").result.top_result.id, // "rest_123"
date: context.memory.get("task_1").result.tomorrow_date, // "2025-10-01"
time: "19:00",
party_size: 2
};
const task3Result = await tools.check_availability(task3Params);
// 返回结果
{
"available": true,
"available_times": ["18:30", "19:00", "19:30"],
"outdoor_tables_available": true
}
用户界面显示:
📅 正在查询 Trattoria Pinocchio 的可用性...
✅ 好消息!明天晚上7点有空位,并且有户外座位
阶段 3: 观察与判断(Observation)
在执行 Task 3 后,Manus 需要判断是否继续执行 Task 4:
// Observation Layer 检查
const shouldProceed = await observationLayer.validate({
taskId: "task_3",
result: task3Result,
expectations: {
available: true // 期望餐厅有空位
}
});
if (shouldProceed.valid) {
// 继续执行 Task 4
await executeTask("task_4");
} else {
// 触发重新规划
await replan({
reason: "首选餐厅不可用",
context: context
});
}
2.4 执行 Task 4 - 完成预订
const task4Params = {
restaurant_id: "rest_123",
date: "2025-10-01",
time: "19:00",
party_size: 2,
special_requests: "outdoor seating if available",
contact_info: {
name: context.user.name,
phone: context.user.phone,
email: context.user.email
}
};
const task4Result = await tools.make_reservation(task4Params);
// 返回结果
{
"confirmation_code": "ABC123XYZ",
"status": "confirmed",
"restaurant": "Trattoria Pinocchio",
"date": "2025-10-01",
"time": "19:00",
"outdoor_seating": true
}
用户界面显示:
🎉 预订成功!
📍 餐厅:Trattoria Pinocchio
📅 时间:2025年10月1日 晚上7:00
👥 人数:2人
🪑 户外座位:已确认
🎫 确认码:ABC123XYZ
阶段 4: 异常处理与重新规划(Re-planning)
假设在 Task 3 中发现餐厅不可用:
// Task 3 返回的结果
{
"available": false,
"reason": "Fully booked at 19:00",
"alternative_times": ["20:00", "20:30"]
}
// Observation Layer 检测到问题
const issue = {
taskId: "task_3",
issue: "preferred_time_unavailable",
alternatives: ["20:00", "20:30"]
};
// 触发重新规划
const replanPrompt = `
原计划执行失败:
- Task 3 失败原因:首选餐厅 Trattoria Pinocchio 在 19:00 没有空位
- 可选替代时间:20:00, 20:30
- 还有另一家符合条件的餐厅:North Beach Restaurant (rest_456)
请生成新的执行计划:
选项 A:预订 Trattoria Pinocchio 的 20:00
选项 B:尝试预订 North Beach Restaurant 的 19:00
请选择最佳方案并生成新任务列表。
`;
// LLM 生成新计划
const newPlan = {
"chosen_strategy": "先尝试选项B(优先保证时间),如果B也不行再考虑A",
"tasks": [
{
"id": "task_5",
"action": "check_availability",
"params": {
"restaurant_id": "rest_456",
"date": "2025-10-01",
"time": "19:00",
"party_size": 2
},
"reason": "尝试第二家餐厅的原定时间"
},
{
"id": "task_6",
"action": "make_reservation",
"params": {
"restaurant_id": "rest_456",
"date": "2025-10-01",
"time": "19:00",
"party_size": 2
},
"dependencies": ["task_5"],
"reason": "如果可用则预订"
}
]
};
// 继续执行新计划
await executePlan(newPlan);
用户界面显示:
⚠️ Trattoria Pinocchio 在7点已满
🔄 正在为您寻找替代方案...
🔍 尝试 North Beach Restaurant...
✅ 找到空位!正在为您预订...
核心技术实现
1. 任务依赖图解析
class TaskExecutor {
async execute(tasks: Task[]) {
// 拓扑排序确保依赖顺序
const sortedTasks = this.topologicalSort(tasks);
for (const task of sortedTasks) {
// 解析依赖任务的结果
const params = this.resolveDependencies(task, this.context);
// 执行工具
const result = await this.callTool(task.action, params);
// 保存结果供后续任务使用
this.context.memory.set(task.id, result);
// 观察结果
const valid = await this.observe(task, result);
if (!valid) {
// 触发重新规划
await this.replan(task, result);
break;
}
}
}
resolveDependencies(task: Task, context: Context) {
// 将 "{{task_2.top_result.id}}" 替换为实际值
let params = JSON.stringify(task.params);
for (const depId of task.dependencies) {
const depResult = context.memory.get(depId);
params = params.replace(
new RegExp(`{{${depId}\\.(.+?)}}`, 'g'),
(match, path) => this.getNestedValue(depResult, path)
);
}
return JSON.parse(params);
}
}2. ReAct 循环实现
async function reactLoop(userQuery: string) {
let iteration = 0;
const maxIterations = 10;
while (iteration < maxIterations) {
// Thought (思考)
const thought = await llm.generate(`
当前状态:${JSON.stringify(context.state)}
用户目标:${userQuery}
下一步应该做什么?
`);
// Action (行动)
const action = parseAction(thought);
const observation = await executeAction(action);
// 判断是否完成
const isComplete = await llm.generate(`
目标:${userQuery}
当前观察:${observation}
任务是否完成?(yes/no)
`);
if (isComplete === 'yes') {
return summarizeResult();
}
iteration++;
}
}
关键差异点对比
| 维度 | 传统脚本 | Manus Agent |
|---|---|---|
| 灵活性 | 固定流程 | 动态规划 |
| 容错性 | 遇错即停 | 自动重试/改变策略 |
| 可扩展性 | 需重写代码 | 添加新工具即可 |
| 用户体验 | 黑盒执行 | 透明的思考过程 |
这就是 Manus 的完整工作逻辑!它本质上是 "LLM 大脑 + 工具手脚 + 反馈眼睛" 的组合体。
开发侧的技术实现
[!question] 以上生产过程的原理,那么站在manus开发者角度是怎么实现的呢,比如LLM的task拆解是通过prompt吗,他如何判断是否正确呢等等,解释这些开发侧的核心问题和解法。
一、核心开发挑战与解决方案
挑战 1:如何让 LLM 稳定地拆解任务?
问题
LLM 是概率模型,同一个 prompt 可能产生不同输出。如何保证任务拆解的稳定性和正确性?
解决方案:结构化 Prompt + Schema 约束
// ❌ 不好的做法:开放式 Prompt
const badPrompt = `
用户说:帮我订餐厅
你觉得需要做什么?
`;
// 结果:LLM 可能回复自然语言,难以解析
// ✅ 好的做法:严格的结构化 Prompt
const goodPrompt = `
你是一个任务规划专家。用户请求如下:
"${userInput}"
【当前上下文】
- 用户位置:${userLocation}
- 当前时间:${currentTime}
- 用户偏好:${userPreferences}
【可用工具列表】
${availableTools.map(tool => `
- ${tool.name}: ${tool.description}
参数: ${JSON.stringify(tool.parameters)}
示例: ${tool.example}
`).join('\n')}
【输出要求】
必须返回有效的 JSON,格式如下:
{
"reasoning": "你的推理过程(200字内)",
"tasks": [
{
"id": "task_1",
"tool": "工具名称(必须从上面列表选择)",
"params": {具体参数},
"dependencies": ["task_0"], // 依赖的任务 ID
"description": "这步做什么"
}
],
"expected_outcome": "预期结果描述"
}
【关键规则】
1. 任务 ID 必须唯一,格式为 task_N
2. dependencies 必须引用已存在的任务 ID
3. 第一个任务的 dependencies 必须为空数组
4. 工具名称必须完全匹配列表中的名称
5. 不要编造工具或参数
现在开始规划:
`;关键技巧 1:使用 JSON Schema 验证
import Ajv from 'ajv';
const taskPlanSchema = {
type: "object",
required: ["reasoning", "tasks", "expected_outcome"],
properties: {
reasoning: { type: "string", minLength: 10, maxLength: 500 },
tasks: {
type: "array",
minItems: 1,
items: {
type: "object",
required: ["id", "tool", "params", "dependencies", "description"],
properties: {
id: { type: "string", pattern: "^task_\\d+$" },
tool: { type: "string", enum: registeredToolNames }, // 只能是已注册的工具
params: { type: "object" },
dependencies: { type: "array", items: { type: "string" } },
description: { type: "string" }
}
}
},
expected_outcome: { type: "string" }
}
};
const ajv = new Ajv();
const validate = ajv.compile(taskPlanSchema);
async function generatePlan(userInput: string): Promise<TaskPlan> {
let attempts = 0;
const maxAttempts = 3;
while (attempts < maxAttempts) {
const llmOutput = await claude.messages.create({
model: "claude-sonnet-4-5-20250929",
max_tokens: 2000,
messages: [{ role: "user", content: buildPrompt(userInput) }]
});
try {
const parsed = JSON.parse(llmOutput.content[0].text);
// JSON Schema 验证
if (validate(parsed)) {
// 额外的业务逻辑验证
if (validateTaskDependencies(parsed.tasks)) {
return parsed;
}
}
// 如果验证失败,告诉 LLM 哪里错了
console.log("验证失败:", validate.errors);
} catch (error) {
console.log(`尝试 ${attempts + 1} 失败:`, error);
}
attempts++;
}
throw new Error("无法生成有效的任务计划");
}关键技巧 2:Few-Shot Learning(给 LLM 看例子)
const promptWithExamples = `
你是任务规划专家。以下是一些优秀的规划示例:
【示例 1】
用户输入:"帮我订明天的餐厅"
输出:
{
"reasoning": "用户需要预订餐厅,我需要:1) 获取明天日期 2) 搜索餐厅 3) 检查可用性 4) 完成预订",
"tasks": [
{
"id": "task_1",
"tool": "get_date",
"params": { "offset_days": 1 },
"dependencies": [],
"description": "获取明天的日期"
},
{
"id": "task_2",
"tool": "search_restaurants",
"params": { "location": "user_location" },
"dependencies": ["task_1"],
"description": "搜索用户位置附近的餐厅"
}
],
"expected_outcome": "成功预订餐厅并返回确认信息"
}
【示例 2】
用户输入:"查一下今天天气"
输出:
{
"reasoning": "这是一个简单的查询任务,只需调用天气 API",
"tasks": [
{
"id": "task_1",
"tool": "get_weather",
"params": { "location": "user_location", "date": "today" },
"dependencies": [],
"description": "获取用户当前位置的天气"
}
],
"expected_outcome": "返回今天的天气信息"
}
现在轮到你了:
用户输入:"${userInput}"
请按照相同的格式输出任务计划。
`;
挑战 2:如何判断任务执行是否正确?
问题
工具执行后返回结果,但如何自动判断结果是否符合预期?
解决方案:多层验证机制
class TaskValidator {
// 层级 1:类型验证(最快)
validateSchema(result: any, expectedSchema: Schema): boolean {
// 检查返回值的数据结构
return ajv.validate(expectedSchema, result);
}
// 层级 2:规则验证(快速)
validateRules(task: Task, result: any): ValidationResult {
const rules = this.getRulesForTool(task.tool);
for (const rule of rules) {
if (!rule.check(result)) {
return {
valid: false,
reason: rule.failureMessage,
severity: rule.severity // 'error' | 'warning'
};
}
}
return { valid: true };
}
// 层级 3:LLM 语义验证(慢但准确)
async validateWithLLM(task: Task, result: any): Promise<ValidationResult> {
const prompt = `
任务描述:${task.description}
预期结果:${task.expectedOutcome}
实际结果:${JSON.stringify(result)}
请判断实际结果是否满足预期:
1. 如果满足,返回:{"valid": true, "confidence": 0.95}
2. 如果不满足,返回:{"valid": false, "reason": "具体原因", "confidence": 0.95}
只返回 JSON,不要解释。
`;
const response = await this.llm.generate(prompt);
return JSON.parse(response);
}
}
// 实际使用:组合三层验证
async function validateTaskResult(task: Task, result: any): Promise<boolean> {
// 快速检查:类型是否正确
if (!validator.validateSchema(result, task.outputSchema)) {
logger.error("Schema validation failed");
return false;
}
// 中速检查:业务规则
const ruleResult = validator.validateRules(task, result);
if (!ruleResult.valid) {
if (ruleResult.severity === 'error') {
return false;
}
// 警告级别:继续,但记录日志
logger.warn(ruleResult.reason);
}
// 慢速检查:只在关键任务使用 LLM 验证
if (task.critical) {
const llmResult = await validator.validateWithLLM(task, result);
if (!llmResult.valid) {
logger.error(`LLM validation failed: ${llmResult.reason}`);
return false;
}
}
return true;
}具体示例:验证餐厅搜索结果
// 工具定义时就声明验证规则
const searchRestaurantTool = {
name: "search_restaurants",
description: "搜索餐厅",
// 输出 Schema
outputSchema: {
type: "object",
required: ["restaurants"],
properties: {
restaurants: {
type: "array",
items: {
type: "object",
required: ["id", "name", "rating"],
properties: {
id: { type: "string" },
name: { type: "string" },
rating: { type: "number", minimum: 0, maximum: 5 }
}
}
}
}
},
// 业务规则
validationRules: [
{
name: "has_results",
check: (result) => result.restaurants.length > 0,
failureMessage: "搜索未返回任何结果",
severity: "error"
},
{
name: "rating_quality",
check: (result) => result.restaurants.some(r => r.rating >= 4.0),
failureMessage: "没有找到高评分餐厅",
severity: "warning"
},
{
name: "location_match",
check: (result, task) => {
const requestedLocation = task.params.location;
return result.restaurants.every(r =>
r.address.includes(requestedLocation)
);
},
failureMessage: "返回的餐厅位置不匹配",
severity: "error"
}
]
};
挑战 3:如何处理工具调用失败?
问题
API 超时、网络错误、返回空结果等异常情况如何处理?
解决方案:智能重试 + 降级策略
class ResilientExecutor {
async executeWithRetry(task: Task): Promise<ExecutionResult> {
const strategy = this.getRetryStrategy(task);
for (let attempt = 1; attempt <= strategy.maxAttempts; attempt++) {
try {
const result = await this.executeTool(task.tool, task.params);
// 验证结果
if (await this.validate(task, result)) {
return { success: true, data: result };
}
// 结果不符合预期,但不是错误
if (attempt < strategy.maxAttempts) {
await this.sleep(strategy.backoff(attempt));
continue;
}
} catch (error) {
if (this.isRetryable(error) && attempt < strategy.maxAttempts) {
logger.warn(`任务 ${task.id} 失败,重试 ${attempt}/${strategy.maxAttempts}`);
await this.sleep(strategy.backoff(attempt));
continue;
}
// 不可重试的错误或达到最大次数
return await this.handleFailure(task, error, attempt);
}
}
// 所有尝试都失败
return await this.fallbackStrategy(task);
}
// 决定如何处理失败
async handleFailure(task: Task, error: Error, attempts: number): Promise<ExecutionResult> {
// 策略 1:使用备用工具
if (task.fallbackTool) {
logger.info(`使用备用工具: ${task.fallbackTool}`);
return this.executeTool(task.fallbackTool, task.params);
}
// 策略 2:降级到简化版本
if (task.degradedVersion) {
logger.info("使用降级版本");
return this.executeTool(task.tool, task.degradedVersion.params);
}
// 策略 3:让 LLM 重新规划
logger.info("触发重新规划");
return {
success: false,
shouldReplan: true,
context: {
failedTask: task,
error: error.message,
attempts: attempts
}
};
}
// 判断错误是否可重试
isRetryable(error: Error): boolean {
const retryableErrors = [
'ETIMEDOUT',
'ECONNRESET',
'ECONNREFUSED',
'RATE_LIMIT_EXCEEDED',
'SERVICE_UNAVAILABLE'
];
return retryableErrors.some(e => error.message.includes(e));
}
}实战案例:餐厅搜索失败的处理
// 场景:Yelp API 失败后自动切换到 Google Places
const searchTask = {
id: "task_1",
tool: "search_yelp",
params: { location: "SF", cuisine: "Italian" },
fallbackTool: "search_google_places", // 备用工具
degradedVersion: { // 降级版本:放宽条件
params: { location: "SF", cuisine: "Italian", min_rating: 3.5 } // 原本是 4.0
}
};
// 执行流程
const result = await executor.executeWithRetry(searchTask);
if (!result.success && result.shouldReplan) {
// LLM 重新规划
const replanPrompt = `
原计划失败:
- 任务:搜索意大利餐厅
- 失败原因:${result.context.error}
- 已尝试:Yelp API(失败)、Google Places API(失败)
请生成新的执行计划,考虑以下选项:
1. 搜索更大范围的餐厅(不限意大利菜)
2. 询问用户是否接受其他菜系
3. 提供附近的热门餐厅列表让用户选择
请选择最佳方案并生成新任务。
`;
const newPlan = await llm.generate(replanPrompt);
await executor.execute(newPlan);
}
挑战 4:如何管理工具注册和版本?
问题
工具越来越多,如何组织?如何让 LLM 选对工具?
解决方案:工具注册中心 + 语义索引
class ToolRegistry {
private tools = new Map<string, Tool>();
private vectorStore: VectorDB;
// 注册工具
register(tool: Tool) {
// 1. 存储工具定义
this.tools.set(tool.name, tool);
// 2. 向量化工具描述(用于语义搜索)
const embedding = await this.embedder.embed(
`${tool.name}: ${tool.description}. Examples: ${tool.examples.join(', ')}`
);
this.vectorStore.upsert({
id: tool.name,
vector: embedding,
metadata: {
name: tool.name,
category: tool.category,
requiredParams: tool.parameters.required,
tags: tool.tags
}
});
}
// 智能工具推荐:根据任务描述找到最相关的工具
async recommendTools(taskDescription: string, limit: number = 5): Promise<Tool[]> {
const queryEmbedding = await this.embedder.embed(taskDescription);
const results = await this.vectorStore.search({
vector: queryEmbedding,
limit: limit * 2, // 多拿一些候选
filter: {
enabled: true // 只搜索启用的工具
}
});
// 进一步用 LLM 筛选
const prompt = `
任务描述:${taskDescription}
候选工具:
${results.map(r => `- ${r.metadata.name}: ${this.tools.get(r.metadata.name).description}`).join('\n')}
请选出最相关的 ${limit} 个工具,返回工具名称的 JSON 数组。
`;
const selected = await this.llm.generate(prompt);
return JSON.parse(selected).map(name => this.tools.get(name));
}
// 为 LLM 生成工具列表(Prompt 的一部分)
async generateToolPrompt(taskDescription: string): Promise<string> {
const recommendedTools = await this.recommendTools(taskDescription, 10);
return recommendedTools.map(tool => `
【工具】${tool.name}
描述:${tool.description}
参数:
${Object.entries(tool.parameters.properties).map(([key, schema]) =>
` - ${key} (${schema.type}${schema.required ? ', 必填' : ''}): ${schema.description}`
).join('\n')}
示例:
${tool.examples.map(ex => ` ${JSON.stringify(ex)}`).join('\n')}
`).join('\n---\n');
}
}
// 使用示例
const registry = new ToolRegistry();
// 注册工具时提供丰富的元数据
registry.register({
name: "search_restaurants",
description: "在指定位置搜索餐厅,支持按菜系、价格、评分筛选",
category: "search",
tags: ["food", "dining", "location-based"],
parameters: {
type: "object",
properties: {
location: { type: "string", description: "搜索位置(城市或地址)" },
cuisine: { type: "string", description: "菜系类型,如 Italian, Chinese" },
price_range: { type: "string", enum: ["$", "$$", "$$$", "$$$$"] },
min_rating: { type: "number", minimum: 0, maximum: 5 }
},
required: ["location"]
},
examples: [
{ location: "San Francisco", cuisine: "Italian", price_range: "$$" },
{ location: "New York", min_rating: 4.5 }
],
execute: async (params) => { /* 实现 */ }
});
// LLM 规划时,自动推荐相关工具
const taskDesc = "帮我找一家靠近金门大桥的海鲜餐厅";
const toolPrompt = await registry.generateToolPrompt(taskDesc);
// 只会推荐 search_restaurants、get_location 等相关工具,不会列出所有 100 个工具
挑战 5:如何让 LLM 理解工具执行的结果?
问题
工具返回的数据可能很复杂(如 JSON 嵌套结构),LLM 如何提取关键信息?
解决方案:结果摘要 + 结构化传递
class ResultProcessor {
// 自动生成结果摘要
async summarizeResult(tool: string, result: any): Promise<Summary> {
// 策略 1:预定义摘要规则(快速)
const rule = this.getSummaryRule(tool);
if (rule) {
return rule.extract(result);
}
// 策略 2:LLM 生成摘要(灵活)
const prompt = `
工具:${tool}
原始结果(JSON):
${JSON.stringify(result, null, 2)}
请提取关键信息,生成简洁摘要(50字内)。
同时提取结构化数据供后续任务使用。
返回格式:
{
"summary": "一句话总结",
"key_data": {提取的关键字段},
"next_step_suggestion": "建议下一步做什么"
}
`;
const response = await this.llm.generate(prompt);
return JSON.parse(response);
}
}
// 示例:餐厅搜索结果处理
const rawResult = {
restaurants: [
{
id: "rest_123",
name: "Trattoria Pinocchio",
rating: 4.6,
reviews: 1250,
price: "$$",
address: "401 Columbus Ave, San Francisco, CA",
outdoor_seating: true,
open_hours: { monday: "11:00-22:00", ... },
menu_url: "https://...",
phone: "+1-415-xxx-xxxx"
},
// ... 更多餐厅
],
total_count: 47,
search_params: { location: "San Francisco", cuisine: "Italian" }
};
// 自动摘要
const summary = await processor.summarizeResult("search_restaurants", rawResult);
// 输出:
{
"summary": "找到47家意大利餐厅,推荐评分4.6的Trattoria Pinocchio,有户外座位",
"key_data": {
"top_restaurant_id": "rest_123",
"top_restaurant_name": "Trattoria Pinocchio",
"has_outdoor": true,
"total_found": 47
},
"next_step_suggestion": "检查 rest_123 的预订可用性"
}
// 在 Prompt 中传递给 LLM 时,使用摘要而非原始数据
const nextPrompt = `
上一步结果:${summary.summary}
关键数据:${JSON.stringify(summary.key_data)}
基于此结果,下一步应该:${summary.next_step_suggestion}
请确认并生成下一步任务。
`;
挑战 6:如何调试和监控 Agent 的行为?
问题
Agent 是"黑盒",如何知道它为什么做出某个决定?如何发现问题?
解决方案:可观测性系统
class AgentObservability {
// 1. 记录完整的执行轨迹
async logTrace(sessionId: string, event: TraceEvent) {
await this.traceDB.insert({
session_id: sessionId,
timestamp: Date.now(),
type: event.type, // 'planning' | 'execution' | 'validation' | 'replan'
data: event.data,
llm_prompt: event.llmPrompt, // 保存发给 LLM 的 Prompt
llm_response: event.llmResponse,
duration_ms: event.duration
});
}
// 2. 实时监控面板
async getSessionInsights(sessionId: string) {
const traces = await this.traceDB.query({ session_id: sessionId });
return {
total_tasks: traces.filter(t => t.type === 'execution').length,
failed_tasks: traces.filter(t => t.type === 'execution' && !t.data.success).length,
replanning_count: traces.filter(t => t.type === 'replan').length,
total_llm_calls: traces.filter(t => t.llm_prompt).length,
total_tokens: traces.reduce((sum, t) => sum + (t.data.tokens_used || 0), 0),
execution_time_ms: traces[traces.length - 1].timestamp - traces[0].timestamp,
// 关键指标
success_rate: this.calculateSuccessRate(traces),
avg_task_duration: this.calculateAvgDuration(traces),
// 可视化数据
timeline: this.buildTimeline(traces)
};
}
// 3. 异常检测
async detectAnomalies(sessionId: string): Promise<Anomaly[]> {
const traces = await this.traceDB.query({ session_id: sessionId });
const anomalies: Anomaly[] = [];
// 检测 1:任务循环(同一个任务执行多次)
const taskCounts = new Map<string, number>();
for (const trace of traces.filter(t => t.type === 'execution')) {
const taskName = trace.data.task_name;
taskCounts.set(taskName, (taskCounts.get(taskName) || 0) + 1);
}
for (const [task, count] of taskCounts.entries()) {
if (count > 3) {
anomalies.push({
type: 'task_loop',
severity: 'high',
message: `任务 "${task}" 执行了 ${count} 次,可能陷入循环`,
recommendation: '检查任务依赖关系和重新规划逻辑'
});
}
}
// 检测 2:频繁重新规划
const replanCount = traces.filter(t => t.type === 'replan').length;
if (replanCount > 5) {
anomalies.push({
type: 'excessive_replanning',
severity: 'medium',
message: `重新规划了 ${replanCount} 次,可能目标不明确或工具不足`,
recommendation: '优化初始 Prompt 或添加更多工具'
});
}
// 检测 3:Token 使用异常
const totalTokens = traces.reduce((sum, t) => sum + (t.data.tokens_used || 0), 0);
if (totalTokens > 50000) {
anomalies.push({
type: 'high_token_usage',
severity: 'medium',
message: `使用了 ${totalTokens} tokens,成本较高`,
recommendation: '考虑缓存常见查询或使用更小的模型'
});
}
return anomalies;
}
// 4. A/B 测试不同的 Prompt
async comparePrompts(userQuery: string, promptVersions: string[]) {
const results = await Promise.all(
promptVersions.map(async (prompt, idx) => {
const startTime = Date.now();
const plan = await this.generatePlan(userQuery, prompt);
const execution = await this.executePlan(plan);
const duration = Date.now() - startTime;
return {
version: `v${idx + 1}`,
prompt: prompt,
success: execution.success,
task_count: plan.tasks.length,
duration_ms: duration,
tokens_used: execution.tokens_used,
user_satisfaction: await this.getUserFeedback(execution.result)
};
})
);
// 生成对比报告
return this.generateComparisonReport(results);
}
}
// 使用示例:调试一个失败的 session
const insights = await observability.getSessionInsights("session_abc123");
console.log(`
任务统计:
- 总任务数:${insights.total_tasks}
- 失败任务:${insights.failed_tasks}
- 重新规划:${insights.replanning_count}
性能指标:
- 成功率:${insights.success_rate}%
- 平均任务耗时:${insights.avg_task_duration}ms
- 总 Token 数:${insights.total_tokens}
`);
const anomalies = await observability.detectAnomalies("session_abc123");
if (anomalies.length > 0) {
console.log("检测到异常:");
anomalies.forEach(a => {
console.log(` [${a.severity}] ${a.message}`);
console.log(` 建议:${a.recommendation}`);
});
}
二、实际开发流程
Step 1: 设计工具系统
// 1. 定义工具接口
interface Tool {
name: string;
description: string;
parameters: JSONSchema;
examples: any[];
execute: (params: any) => Promise<any>;
validate?: (result: any) => boolean;
fallback?: string; // 备用工具名称
}
// 2. 实现具体工具
const weatherTool: Tool = {
name: "get_weather",
description: "获取指定地点的天气信息",
parameters: {
type: "object",
properties: {
location: { type: "string", description: "城市名称或坐标" },
date: { type: "string", description: "日期,格式 YYYY-MM-DD" }
},
required: ["location"]
},
examples: [
{ location: "San Francisco", date: "2025-10-01" },
{ location: "Beijing" }
],
execute: async (params) => {
const response = await fetch(
`https://api.weather.com/v1/forecast?location=${params.location}&date=${params.date}`
);
return response.json();
},
validate: (result) => {
return result.temperature !== undefined && result.condition !== undefined;
},
fallback: "get_weather_backup" // 如果主 API 失败,使用备用
};
// 3. 注册到工具库
toolRegistry.register(weatherTool);Step 2: 构建 Prompt 工程系统
class PromptBuilder {
private templates = new Map<string, PromptTemplate>();
private variables = new Map<string, any>();
// 注册 Prompt 模板(支持版本管理)
registerTemplate(name: string, version: string, template: PromptTemplate) {
const key = `${name}@${version}`;
this.templates.set(key, template);
}
// 构建 Prompt(支持变量替换、条件渲染)
async build(templateName: string, context: Context): Promise<string> {
const template = this.templates.get(templateName);
if (!template) {
throw new Error(`Template ${templateName} not found`);
}
let prompt = template.content;
// 1. 替换变量
for (const [key, value] of Object.entries(context.variables)) {
prompt = prompt.replace(new RegExp(`{{${key}}}`, 'g'), String(value));
}
// 2. 条件渲染
prompt = this.renderConditionals(prompt, context);
// 3. 注入工具列表(动态根据任务推荐)
if (template.includeTools) {
const toolPrompt = await this.toolRegistry.generateToolPrompt(context.userQuery);
prompt = prompt.replace('{{TOOLS}}', toolPrompt);
}
// 4. 注入历史对话(上下文窗口管理)
if (template.includeHistory) {
const history = await this.getRelevantHistory(context.sessionId, context.userQuery);
prompt = prompt.replace('{{HISTORY}}', history);
}
return prompt;
}
// 条件渲染(类似模板引擎)
private renderConditionals(prompt: string, context: Context): string {
// 支持 {{#if condition}} ... {{/if}}
const ifRegex = /{{#if (\w+)}}([\s\S]*?){{\/if}}/g;
return prompt.replace(ifRegex, (match, condition, content) => {
return context.variables[condition] ? content : '';
});
}
// 智能上下文窗口管理(只保留相关历史)
private async getRelevantHistory(sessionId: string, currentQuery: string): Promise<string> {
const allHistory = await this.db.getConversationHistory(sessionId);
// 向量搜索:找出与当前查询最相关的历史对话
const queryEmbedding = await this.embedder.embed(currentQuery);
const relevantMessages = await this.vectorDB.search({
vector: queryEmbedding,
filter: { session_id: sessionId },
limit: 5
});
// 格式化为 Prompt
return relevantMessages.map(msg =>
`${msg.role}: ${msg.content}`
).join('\n');
}
}
// 使用示例:注册规划 Prompt 模板
promptBuilder.registerTemplate("task_planning", "v2.1", {
content: `
你是一个任务规划专家。
【用户请求】
{{userQuery}}
【上下文信息】
- 用户位置:{{userLocation}}
- 当前时间:{{currentTime}}
{{#if hasHistory}}
- 相关历史对话:
{{HISTORY}}
{{/if}}
【可用工具】
{{TOOLS}}
【输出要求】
返回 JSON 格式的任务计划,包含 reasoning 和 tasks 数组。
每个任务必须指定 id、tool、params、dependencies。
【重要规则】
1. 优先使用高可靠性工具
2. 避免不必要的串行依赖,尽量并行
3. 每个任务的 description 要清晰说明目的
4. 如果用户请求不明确,添加 clarification_needed 字段
现在开始规划:
`,
includeTools: true,
includeHistory: true
});
// 构建 Prompt
const prompt = await promptBuilder.build("[email protected]", {
variables: {
userQuery: "帮我订餐厅",
userLocation: "San Francisco",
currentTime: "2025-10-01 18:00",
hasHistory: true
},
sessionId: "session_123"
});Step 3: 实现执行引擎核心逻辑
class ExecutionEngine {
private toolRegistry: ToolRegistry;
private validator: TaskValidator;
private monitor: AgentObservability;
async executePlan(plan: TaskPlan, context: Context): Promise<ExecutionResult> {
const trace = new ExecutionTrace(context.sessionId);
try {
// 1. 验证计划合法性
this.validatePlan(plan);
// 2. 构建任务图(拓扑排序)
const taskGraph = this.buildTaskGraph(plan.tasks);
// 3. 识别可并行的任务
const parallelGroups = this.identifyParallelTasks(taskGraph);
// 4. 逐层执行
for (const [level, tasks] of parallelGroups.entries()) {
trace.log('execution_level', { level, task_count: tasks.length });
// 并行执行同一层的任务
const results = await Promise.allSettled(
tasks.map(task => this.executeTask(task, context, trace))
);
// 检查是否有失败
const failures = results.filter(r => r.status === 'rejected');
if (failures.length > 0) {
// 判断是否需要重新规划
if (this.shouldReplan(tasks, failures)) {
return await this.triggerReplanning(plan, context, failures, trace);
}
// 否则继续执行(可能有些任务不依赖失败的任务)
this.handlePartialFailure(tasks, failures);
}
// 保存本轮结果到上下文
this.updateContext(context, tasks, results);
}
// 5. 生成最终回复
const finalResponse = await this.generateResponse(context, trace);
// 6. 记录完整轨迹
await this.monitor.saveTrace(trace);
return {
success: true,
response: finalResponse,
metadata: {
tasks_executed: plan.tasks.length,
duration_ms: trace.getDuration(),
tokens_used: trace.getTotalTokens()
}
};
} catch (error) {
trace.log('execution_error', { error: error.message });
await this.monitor.saveTrace(trace);
return {
success: false,
error: error.message,
partial_results: context.results
};
}
}
// 执行单个任务(核心方法)
private async executeTask(task: Task, context: Context, trace: ExecutionTrace): Promise<any> {
const startTime = Date.now();
trace.log('task_start', { task_id: task.id, tool: task.tool });
// 1. 解析参数(替换依赖任务的占位符)
const resolvedParams = this.resolveDependencies(task, context);
// 2. 获取工具
const tool = this.toolRegistry.get(task.tool);
if (!tool) {
throw new Error(`工具不存在: ${task.tool}`);
}
// 3. 执行工具(带重试机制)
const result = await this.executeWithRetry(tool, resolvedParams, task);
// 4. 验证结果
const validation = await this.validator.validate(task, result);
if (!validation.valid) {
trace.log('task_validation_failed', {
task_id: task.id,
reason: validation.reason
});
if (validation.severity === 'error') {
throw new Error(`任务验证失败: ${validation.reason}`);
}
}
// 5. 生成结果摘要
const summary = await this.summarizeResult(tool.name, result);
const duration = Date.now() - startTime;
trace.log('task_complete', {
task_id: task.id,
duration_ms: duration,
summary: summary.summary
});
return {
raw: result,
summary: summary,
task_id: task.id
};
}
// 解析任务参数中的依赖引用
private resolveDependencies(task: Task, context: Context): any {
const params = { ...task.params };
// 遍历所有参数,查找占位符
for (const [key, value] of Object.entries(params)) {
if (typeof value === 'string' && value.startsWith('{{')) {
// 格式: {{task_1.result.restaurant_id}}
const match = value.match(/{{(\w+)\.(.+)}}/);
if (match) {
const [_, taskId, path] = match;
const taskResult = context.results.get(taskId);
if (!taskResult) {
throw new Error(`依赖任务 ${taskId} 的结果不存在`);
}
// 通过路径提取嵌套值
params[key] = this.getNestedValue(taskResult, path);
}
}
}
return params;
}
private getNestedValue(obj: any, path: string): any {
return path.split('.').reduce((current, key) => current?.[key], obj);
}
// 判断是否需要重新规划
private shouldReplan(tasks: Task[], failures: any[]): boolean {
// 规则 1:如果关键任务失败,必须重新规划
const criticalTasksFailed = tasks.some(t =>
t.critical && failures.some(f => f.reason.task_id === t.id)
);
// 规则 2:如果失败率超过 50%,重新规划
const failureRate = failures.length / tasks.length;
return criticalTasksFailed || failureRate > 0.5;
}
// 触发重新规划
private async triggerReplanning(
originalPlan: TaskPlan,
context: Context,
failures: any[],
trace: ExecutionTrace
): Promise<ExecutionResult> {
trace.log('replanning_triggered', { failure_count: failures.length });
// 构建重新规划的 Prompt
const replanPrompt = `
原始计划执行遇到问题:
【原始目标】
${originalPlan.expected_outcome}
【已完成的任务】
${Array.from(context.results.entries()).map(([id, result]) =>
`- ${id}: ${result.summary.summary}`
).join('\n')}
【失败的任务】
${failures.map(f =>
`- ${f.reason.task_id}: ${f.reason.error}`
).join('\n')}
【可用工具】
${await this.toolRegistry.generateToolPrompt(originalPlan.expected_outcome)}
请分析失败原因并生成新的执行计划。注意:
1. 可以使用已完成任务的结果
2. 可以更换工具或调整参数
3. 如果目标无法达成,请提供替代方案
返回 JSON 格式的新计划。
`;
// 调用 LLM 生成新计划
const newPlan = await this.llm.generate(replanPrompt);
const parsedPlan = JSON.parse(newPlan);
trace.log('new_plan_generated', { task_count: parsedPlan.tasks.length });
// 递归执行新计划
return this.executePlan(parsedPlan, context);
}
}Step 4: 实现结果生成和展示
class ResponseGenerator {
// 将执行结果转化为用户友好的回复
async generate(context: Context, trace: ExecutionTrace): Promise<string> {
// 收集所有任务的摘要
const summaries = Array.from(context.results.values()).map(r => r.summary);
// 使用 LLM 生成自然语言回复
const prompt = `
用户原始请求:${context.userQuery}
执行过程:
${summaries.map((s, i) => `${i + 1}. ${s.summary}`).join('\n')}
最终结果:
${JSON.stringify(context.finalResult, null, 2)}
请基于以上信息,生成一个简洁、友好的回复给用户。
要求:
1. 直接回答用户的问题
2. 突出关键信息(使用 markdown 格式)
3. 如果有需要用户确认的,明确提出
4. 语气自然,不要太正式
回复:
`;
const response = await this.llm.generate(prompt);
// 添加结构化数据(方便前端渲染)
return {
text: response,
structured_data: this.extractStructuredData(context),
actions: this.suggestNextActions(context)
};
}
// 提取结构化数据供前端展示
private extractStructuredData(context: Context): any {
// 根据任务类型提取不同的数据
const taskTypes = new Set(
Array.from(context.results.values()).map(r => r.task_type)
);
const structured = {};
if (taskTypes.has('search_restaurants')) {
structured.restaurants = context.results
.get('search_restaurants')
?.raw.restaurants.slice(0, 3); // 只返回前 3 个
}
if (taskTypes.has('make_reservation')) {
structured.reservation = {
confirmation_code: context.finalResult.confirmation_code,
restaurant: context.finalResult.restaurant,
time: context.finalResult.time
};
}
return structured;
}
// 建议下一步操作
private suggestNextActions(context: Context): Action[] {
const actions: Action[] = [];
// 如果预订成功,建议添加到日历
if (context.finalResult.confirmation_code) {
actions.push({
type: 'add_to_calendar',
label: '添加到日历',
data: {
title: `餐厅预订: ${context.finalResult.restaurant}`,
time: context.finalResult.time
}
});
}
// 如果搜索了餐厅但未预订,建议预订
if (context.results.has('search_restaurants') && !context.results.has('make_reservation')) {
actions.push({
type: 'make_reservation',
label: '立即预订',
data: {
restaurant_id: context.results.get('search_restaurants').raw.restaurants[0].id
}
});
}
return actions;
}
}
三、关键的工程实践
1. Prompt 版本控制和 A/B 测试
// 实际生产中的 Prompt 管理
class PromptVersionControl {
// 存储在数据库,支持热更新
async getActivePrompt(name: string): Promise<string> {
const version = await this.db.query(`
SELECT content, metadata
FROM prompts
WHERE name = $1 AND active = true
ORDER BY version DESC
LIMIT 1
`, [name]);
return version.content;
}
// A/B 测试:随机选择不同版本的 Prompt
async getPromptWithABTest(name: string, userId: string): Promise<string> {
const variants = await this.db.query(`
SELECT content, variant_id
FROM prompt_variants
WHERE name = $1 AND active = true
`, [name]);
// 根据用户 ID 一致性哈希分配变体
const variantIndex = this.consistentHash(userId, variants.length);
const selected = variants[variantIndex];
// 记录使用的变体(用于后续分析)
await this.logVariantUsage(userId, selected.variant_id);
return selected.content;
}
// 分析不同 Prompt 变体的效果
async analyzeVariants(name: string): Promise<VariantAnalysis[]> {
const stats = await this.db.query(`
SELECT
v.variant_id,
COUNT(*) as usage_count,
AVG(f.rating) as avg_rating,
AVG(e.duration_ms) as avg_duration,
SUM(e.tokens_used) as total_tokens
FROM prompt_variants v
JOIN executions e ON e.prompt_variant_id = v.variant_id
LEFT JOIN feedback f ON f.execution_id = e.id
WHERE v.name = $1
GROUP BY v.variant_id
`, [name]);
return stats.map(s => ({
variant_id: s.variant_id,
metrics: {
usage: s.usage_count,
satisfaction: s.avg_rating,
performance: s.avg_duration,
cost: s.total_tokens * 0.000015 // Claude token 价格
},
confidence_interval: this.calculateCI(s)
}));
}
}2. 成本控制和监控
class CostMonitor {
// 实时跟踪成本
async trackCost(sessionId: string, event: CostEvent) {
const cost = this.calculateCost(event);
await this.db.insert('cost_tracking', {
session_id: sessionId,
event_type: event.type, // 'llm_call', 'tool_execution'
provider: event.provider, // 'anthropic', 'openai'
tokens_used: event.tokens,
cost_usd: cost,
timestamp: Date.now()
});
// 检查是否超过预算
const sessionTotal = await this.getSessionCost(sessionId);
if (sessionTotal > this.budgetLimit) {
throw new BudgetExceededError(`会话成本超过限制: $${sessionTotal}`);
}
}
private calculateCost(event: CostEvent): number {
const pricing = {
'claude-sonnet-4-5': {
input: 0.000003, // $3 per MTok
output: 0.000015 // $15 per MTok
},
'claude-haiku-3': {
input: 0.00000025,
output: 0.00000125
}
};
const rate = pricing[event.model];
return (event.input_tokens * rate.input) + (event.output_tokens * rate.output);
}
// 成本优化建议
async suggestOptimizations(sessionId: string): Promise<Optimization[]> {
const analysis = await this.analyzeCostBreakdown(sessionId);
const suggestions: Optimization[] = [];
// 如果 LLM 调用次数过多
if (analysis.llm_calls > 10) {
suggestions.push({
type: 'reduce_llm_calls',
impact: 'high',
suggestion: '考虑缓存常见查询或批量处理任务',
potential_saving: analysis.llm_cost * 0.4
});
}
// 如果使用了大模型处理简单任务
if (analysis.simple_tasks_with_large_model > 5) {
suggestions.push({
type: 'use_smaller_model',
impact: 'medium',
suggestion: '简单任务使用 Claude Haiku 而非 Sonnet',
potential_saving: analysis.simple_task_cost * 0.8
});
}
return suggestions;
}
}3. 错误恢复和降级策略
class GracefulDegradation {
// 定义服务降级级别
private degradationLevels = {
FULL_SERVICE: 0, // 所有功能正常
REDUCE_QUALITY: 1, // 减少质量但保持功能
ESSENTIAL_ONLY: 2, // 只保留核心功能
FALLBACK_MODE: 3 // 使用备用方案
};
private currentLevel = this.degradationLevels.FULL_SERVICE;
// 根据系统状态自动降级
async handleSystemStress(metrics: SystemMetrics) {
if (metrics.error_rate > 0.3) {
await this.degradeTo(this.degradationLevels.FALLBACK_MODE);
} else if (metrics.response_time > 10000) {
await this.degradeTo(this.degradationLevels.REDUCE_QUALITY);
} else if (metrics.cost_rate > this.costThreshold) {
await this.degradeTo(this.degradationLevels.ESSENTIAL_ONLY);
}
}
private async degradeTo(level: number) {
this.currentLevel = level;
switch (level) {
case this.degradationLevels.REDUCE_QUALITY:
// 使用更便宜的模型
this.llmConfig.model = 'claude-haiku-3';
// 减少工具数量
this.toolRegistry.disableNonEssential();
break;
case this.degradationLevels.ESSENTIAL_ONLY:
// 只保留核心功能
this.disableFeatures(['advanced_search', 'personalization']);
// 增加缓存使用
this.cacheConfig.ttl = 3600; // 1小时
break;
case this.degradationLevels.FALLBACK_MODE:
// 使用预定义的规则替代 LLM
this.useRuleBasedFallback = true;
// 返回模板化回复
this.useTemplateResponses = true;
break;
}
logger.warn(`System degraded to level ${level}`);
}
// 规则引擎降级(不使用 LLM)
private async ruleBasedPlanning(userQuery: string): Promise<TaskPlan> {
// 使用简单的关键词匹配
if (userQuery.includes('餐厅') || userQuery.includes('预订')) {
return {
tasks: [
{ id: 'task_1', tool: 'search_restaurants', params: {} },
{ id: 'task_2', tool: 'make_reservation', params: {}, dependencies: ['task_1'] }
]
};
}
if (userQuery.includes('天气')) {
return {
tasks: [{ id: 'task_1', tool: 'get_weather', params: {} }]
};
}
// 默认返回通用回复
return {
tasks: [{ id: 'task_1', tool: 'general_search', params: { query: userQuery } }]
};
}
}
四、总结:Manus 开发者的核心能力
必备技能清单
1. Prompt Engineering(最重要)
- 结构化 Prompt 设计:用严格的格式、明确的输出要求和 JSON Schema 约束 LLM 的回复,避免随机性
- Few-Shot Learning:在 Prompt 中给 LLM 展示 2-3 个优秀示例,让它模仿正确的输出格式和推理方式
- Chain-of-Thought 引导:要求 LLM 在 JSON 中包含
reasoning字段,把思考过程写出来再给答案,提升准确性 - JSON Schema 约束:用代码验证 LLM 输出的 JSON 结构是否符合预期,不符合就重试或报错
2. 系统设计
- 任务依赖图解析:用拓扑排序算法理清任务的先后顺序,确保 task_2 依赖的 task_1 结果已经拿到
- 并发执行优化:识别哪些任务互不依赖可以同时跑,用
Promise.all并行执行节省时间 - 错误恢复机制:任务失败时自动重试 3 次,还不行就换备用工具或让 LLM 重新规划
- 降级策略:系统压力大时自动切换到更便宜的模型、减少功能或用规则引擎替代 LLM
3. LLM 工程
- Token 优化:只传必要信息给 LLM,用摘要代替原始数据,用缓存避免重复调用,降低成本
- 流式输出:边生成边返回(像 ChatGPT 打字效果),用户体验更好且不用等所有内容生成完
- 上下文窗口管理:用向量搜索只保留与当前对话最相关的历史记录,避免超出 token 限制
- 模型选择策略:简单任务用便宜的 Haiku,复杂推理用强大的 Sonnet,根据任务难度动态选模型
4. 可观测性
- 执行轨迹记录:把 LLM 的每个决策、工具调用、结果验证全记录到数据库,方便事后分析为什么出错
- 异常检测:自动发现异常模式(如同一任务执行 10 次、成本突然暴涨),及时预警
- A/B 测试:同时测试多个版本的 Prompt,用数据说话选出成功率最高的那个
- 成本监控:实时统计每个会话花了多少钱,超预算就停止或降级,避免账单爆炸
关键设计原则
1. 让 LLM 专注于推理,而非记忆
- 用工具提供实时数据:不要让 LLM 回忆"旧金山有哪些餐厅",而是调用 Yelp API 工具获取最新真实数据
- 用向量数据库存储知识:把历史对话、用户偏好等信息存到向量库,需要时通过相似度搜索提取,而不是塞进 Prompt
- 用规则引擎处理确定性逻辑:对于"订单金额大于100就打9折"这种固定规则,直接写代码判断,不需要问 LLM
2. 永远有 Plan B
- 每个工具都有备用方案:主 API(如 Yelp)挂了自动切换到备用 API(如 Google Places),不让用户感知到故障
- 关键路径有降级策略:LLM 调用失败时,改用更便宜的模型或预设的规则引擎继续完成任务
- LLM 失败时用规则引擎:如果 LLM 连续 3 次都没生成有效计划,就用关键词匹配等简单规则给出基础回复
3. 透明化而非黑盒
- 记录完整的决策过程:把"为什么选这个餐厅""为什么调用这个工具"等推理过程存到数据库,方便事后审查
- 可视化执行轨迹:用时间线图表展示 Agent 执行的每个步骤、耗时、成功/失败状态,让开发者一眼看懂流程
- 支持人工干预:在关键节点(如最终预订前)暂停让用户确认,或允许开发者手动修正 Agent 的决策
4. 持续优化
- A/B 测试 Prompt:同时跑两个版本的 Prompt(如"请详细推理" vs "请简洁回答"),统计哪个成功率更高
- 监控成本和性能:追踪每个功能的 token 消耗、响应时间、成功率,发现瓶颈后针对性优化
- 从失败案例学习:分析失败的会话记录,提取共性问题(如"搜索结果为空时经常卡住"),改进 Prompt 或工具逻辑
这些原则的核心思想就是:把 LLM 当作不稳定但强大的推理引擎,用工程手段保证系统整体的稳定性和可控性。
这就是 Manus 背后完整的开发侧实现!核心就是把 LLM 当作"大脑",通过精心设计的 Prompt 和工具系统,让它能够可靠地规划和执行复杂任务。