Talk to AI|Manus拆解

2025-07-13 · Junyi Yan、Claude Sonnet 4.5

Manus工作逻辑

[!question] 我们来探讨一下Manus这个产品。他所谓的Agent的工作逻辑是怎么实现的,比如我给他一个任务,他能自动拆解并执行,这个过程的技术原理是什么,具体的技术方案可能是什么?

核心工作逻辑

Manus 这类 AI Agent 的任务自动拆解和执行,主要基于以下技术架构:

1. 任务理解与规划层

  • LLM 作为推理引擎:使用大语言模型(如 GPT-4、Claude 等)理解用户意图
  • Chain-of-Thought (CoT) 推理:让模型逐步思考如何将复杂任务分解为子任务
  • ReAct 框架:Reasoning(推理)+ Acting(行动)循环模式

具体流程:

用户输入 → LLM 分析意图 → 生成执行计划 → 拆解为子任务序列

2. 任务拆解的技术方案

方案 A:Prompt Engineering + 结构化输出

# 伪代码示例
prompt = f"""
任务:{user_task}
请将此任务拆解为可执行的步骤,输出 JSON 格式:
{{
  "steps": [
    {{"action": "操作类型", "params": {{}}, "description": "说明"}},
    ...
  ]
}}
"""
plan = llm.generate(prompt)

方案 B:Planning Agent + Tool Library

  • 维护一个工具库(API、浏览器操作、文件操作等)
  • LLM 根据可用工具规划执行路径
  • 类似 LangChain 的 Agent + Tools 模式

3. 执行层技术栈

浏览器自动化:

  • Playwright/Puppeteer:控制浏览器行为
  • 视觉定位:使用 GPT-4V 或 Claude 的视觉能力识别页面元素
  • DOM 解析:提取页面结构,定位可交互元素

动作执行:

# 示例:基于视觉的点击操作
screenshot = browser.screenshot()
prompt = f"在这个截图中,'登录'按钮的坐标是?"
coordinates = vision_llm.analyze(screenshot, prompt)
browser.click(coordinates)

4. 反馈与自我修正机制

执行动作 → 观察结果 → 判断是否成功 
    ↓ 失败
重新规划 → 调整策略 → 再次执行

关键技术:

  • 状态检测:通过截图、DOM 变化判断执行结果
  • 错误恢复:LLM 分析失败原因,生成新计划
  • 记忆系统:向量数据库存储历史执行经验

可能的完整技术架构

┌─────────────────────────────────────┐
│   用户输入层 (自然语言任务)             │
└──────────────┬──────────────────────┘

┌──────────────────────────────────────┐
│  规划层 (Planning Agent)              │
│  - LLM (GPT-4/Claude)                │
│  - Prompt Templates                  │
│  - Task Decomposition                │
└──────────────┬───────────────────────┘

┌──────────────────────────────────────┐
│  工具调度层 (Tool Orchestration)       │
│  - Tool Registry                     │
│  - Parameter Binding                 │
│  - Execution Queue                   │
└──────────────┬───────────────────────┘

┌──────────────────────────────────────┐
│  执行层 (Execution Engine)            │
│  - Browser Automation                │
│  - API Callers                       │
│  - File System Operations            │
└──────────────┬───────────────────────┘

┌──────────────────────────────────────┐
│  观察层 (Observation)                 │
│  - Vision Models (截图分析)           │
│  - DOM Parsing                       │
│  - State Detection                   │
└──────────────┬───────────────────────┘

┌──────────────────────────────────────┐
│  反馈循环 (Feedback Loop)             │
│  - Success/Failure Detection         │
│  - Re-planning                       │
│  - Memory Update                     │
└──────────────────────────────────────┘

关键技术挑战

  1. 鲁棒性:网页结构变化、加载延迟、验证码等
  2. 成本控制:频繁调用 LLM API 的费用
  3. 安全性:避免执行危险操作
  4. 准确性:视觉定位可能出错

优化方向

  • 本地小模型:用于简单任务的快速决策
  • 缓存机制:记录常见操作模式
  • 多模态融合:结合 DOM + 视觉 + 可访问性树
  • 人类反馈强化学习:从用户纠正中学习

这种架构本质上是 LLM + RPA(机器人流程自动化)+ 计算机视觉 的结合体,通过语言模型的推理能力弥补传统自动化脚本缺乏灵活性的问题。

工作逻辑的具体实现方式

[!question] 请你结合一个具体的QA例子,讲解一下这个工作逻辑的实现方式

场景示例:用户想预订一家餐厅

用户输入

"帮我在明天晚上7点预订旧金山一家评分高的意大利餐厅,预算100美元左右,最好有户外座位"


完整工作流程拆解

阶段 1: 任务理解与规划(Planning)

1.1 LLM 分析用户意图

// Manus 内部的 Prompt
const planningPrompt = `
用户请求:帮我在明天晚上7点预订旧金山一家评分高的意大利餐厅,预算100美元左右,最好有户外座位

请分析这个请求并生成执行计划。

【可用工具】
1. search_restaurants(location, cuisine, price_range, features)
2. get_restaurant_details(restaurant_id)
3. check_availability(restaurant_id, date, time, party_size)
4. make_reservation(restaurant_id, date, time, party_size, contact_info)
5. get_current_date()

【输出格式】JSON
{
  "reasoning": "你的推理过程",
  "tasks": [
    {
      "id": "task_1",
      "action": "工具名称",
      "params": {...},
      "dependencies": [],
      "reason": "为什么需要这个步骤"
    }
  ]
}
`;

// LLM 返回的规划结果
const plan = {
  "reasoning": "用户需要预订餐厅,我需要:1) 先获取明天的日期 2) 搜索符合条件的餐厅 3) 检查可用性 4) 完成预订",
  "tasks": [
    {
      "id": "task_1",
      "action": "get_current_date",
      "params": {},
      "dependencies": [],
      "reason": "需要确定'明天'的具体日期"
    },
    {
      "id": "task_2",
      "action": "search_restaurants",
      "params": {
        "location": "San Francisco",
        "cuisine": "Italian",
        "price_range": "$$",
        "features": ["outdoor seating", "highly rated"]
      },
      "dependencies": ["task_1"],
      "reason": "根据用户偏好搜索餐厅"
    },
    {
      "id": "task_3",
      "action": "check_availability",
      "params": {
        "restaurant_id": "{{task_2.top_result.id}}",
        "date": "{{task_1.tomorrow_date}}",
        "time": "19:00",
        "party_size": 2
      },
      "dependencies": ["task_1", "task_2"],
      "reason": "确认餐厅在指定时间是否有空位"
    },
    {
      "id": "task_4",
      "action": "make_reservation",
      "params": {
        "restaurant_id": "{{task_2.top_result.id}}",
        "date": "{{task_1.tomorrow_date}}",
        "time": "19:00",
        "party_size": 2,
        "special_requests": "outdoor seating if available"
      },
      "dependencies": ["task_3"],
      "reason": "完成预订"
    }
  ]
}

阶段 2: 任务执行(Execution)

2.1 执行 Task 1 - 获取日期

// Manus 调用工具
const task1Result = await tools.get_current_date();

// 返回结果
{
  "current_date": "2025-09-30",
  "tomorrow_date": "2025-10-01"
}

// Manus 的内部状态更新
context.memory.set("task_1", {
  status: "completed",
  result: { tomorrow_date: "2025-10-01" }
});

此时用户界面显示

🤔 正在思考...
✅ 已确定预订日期:2025年10月1日

2.2 执行 Task 2 - 搜索餐厅

// 解析参数(替换依赖任务的结果)
const task2Params = {
  location: "San Francisco",
  cuisine: "Italian",
  price_range: "$$",
  features: ["outdoor seating", "highly rated"]
};

// 调用搜索工具
const task2Result = await tools.search_restaurants(task2Params);

// 返回结果
{
  "restaurants": [
    {
      "id": "rest_123",
      "name": "Trattoria Pinocchio",
      "rating": 4.6,
      "price_level": "$$",
      "outdoor_seating": true,
      "address": "401 Columbus Ave"
    },
    {
      "id": "rest_456",
      "name": "North Beach Restaurant",
      "rating": 4.5,
      "price_level": "$$",
      "outdoor_seating": true
    }
  ]
}

context.memory.set("task_2", {
  status: "completed",
  result: { top_result: task2Result.restaurants[0] }
});

用户界面显示

🔍 正在搜索意大利餐厅...
✅ 找到 2 家符合条件的餐厅,推荐:Trattoria Pinocchio (评分4.6⭐)

2.3 执行 Task 3 - 检查可用性

// 从之前任务获取参数
const task3Params = {
  restaurant_id: context.memory.get("task_2").result.top_result.id, // "rest_123"
  date: context.memory.get("task_1").result.tomorrow_date, // "2025-10-01"
  time: "19:00",
  party_size: 2
};

const task3Result = await tools.check_availability(task3Params);

// 返回结果
{
  "available": true,
  "available_times": ["18:30", "19:00", "19:30"],
  "outdoor_tables_available": true
}

用户界面显示

📅 正在查询 Trattoria Pinocchio 的可用性...
✅ 好消息!明天晚上7点有空位,并且有户外座位

阶段 3: 观察与判断(Observation)

在执行 Task 3 后,Manus 需要判断是否继续执行 Task 4:

// Observation Layer 检查
const shouldProceed = await observationLayer.validate({
  taskId: "task_3",
  result: task3Result,
  expectations: {
    available: true // 期望餐厅有空位
  }
});

if (shouldProceed.valid) {
  // 继续执行 Task 4
  await executeTask("task_4");
} else {
  // 触发重新规划
  await replan({
    reason: "首选餐厅不可用",
    context: context
  });
}

2.4 执行 Task 4 - 完成预订

const task4Params = {
  restaurant_id: "rest_123",
  date: "2025-10-01",
  time: "19:00",
  party_size: 2,
  special_requests: "outdoor seating if available",
  contact_info: {
    name: context.user.name,
    phone: context.user.phone,
    email: context.user.email
  }
};

const task4Result = await tools.make_reservation(task4Params);

// 返回结果
{
  "confirmation_code": "ABC123XYZ",
  "status": "confirmed",
  "restaurant": "Trattoria Pinocchio",
  "date": "2025-10-01",
  "time": "19:00",
  "outdoor_seating": true
}

用户界面显示

🎉 预订成功!
📍 餐厅:Trattoria Pinocchio
📅 时间:2025年10月1日 晚上7:00
👥 人数:2人
🪑 户外座位:已确认
🎫 确认码:ABC123XYZ

阶段 4: 异常处理与重新规划(Re-planning)

假设在 Task 3 中发现餐厅不可用:

// Task 3 返回的结果
{
  "available": false,
  "reason": "Fully booked at 19:00",
  "alternative_times": ["20:00", "20:30"]
}

// Observation Layer 检测到问题
const issue = {
  taskId: "task_3",
  issue: "preferred_time_unavailable",
  alternatives: ["20:00", "20:30"]
};

// 触发重新规划
const replanPrompt = `
原计划执行失败:
- Task 3 失败原因:首选餐厅 Trattoria Pinocchio 在 19:00 没有空位
- 可选替代时间:20:00, 20:30
- 还有另一家符合条件的餐厅:North Beach Restaurant (rest_456)

请生成新的执行计划:
选项 A:预订 Trattoria Pinocchio 的 20:00
选项 B:尝试预订 North Beach Restaurant 的 19:00

请选择最佳方案并生成新任务列表。
`;

// LLM 生成新计划
const newPlan = {
  "chosen_strategy": "先尝试选项B(优先保证时间),如果B也不行再考虑A",
  "tasks": [
    {
      "id": "task_5",
      "action": "check_availability",
      "params": {
        "restaurant_id": "rest_456",
        "date": "2025-10-01",
        "time": "19:00",
        "party_size": 2
      },
      "reason": "尝试第二家餐厅的原定时间"
    },
    {
      "id": "task_6",
      "action": "make_reservation",
      "params": {
        "restaurant_id": "rest_456",
        "date": "2025-10-01",
        "time": "19:00",
        "party_size": 2
      },
      "dependencies": ["task_5"],
      "reason": "如果可用则预订"
    }
  ]
};

// 继续执行新计划
await executePlan(newPlan);

用户界面显示

⚠️ Trattoria Pinocchio 在7点已满
🔄 正在为您寻找替代方案...
🔍 尝试 North Beach Restaurant...
✅ 找到空位!正在为您预订...

核心技术实现

1. 任务依赖图解析

class TaskExecutor {
  async execute(tasks: Task[]) {
    // 拓扑排序确保依赖顺序
    const sortedTasks = this.topologicalSort(tasks);
    
    for (const task of sortedTasks) {
      // 解析依赖任务的结果
      const params = this.resolveDependencies(task, this.context);
      
      // 执行工具
      const result = await this.callTool(task.action, params);
      
      // 保存结果供后续任务使用
      this.context.memory.set(task.id, result);
      
      // 观察结果
      const valid = await this.observe(task, result);
      
      if (!valid) {
        // 触发重新规划
        await this.replan(task, result);
        break;
      }
    }
  }
  
  resolveDependencies(task: Task, context: Context) {
    // 将 "{{task_2.top_result.id}}" 替换为实际值
    let params = JSON.stringify(task.params);
    
    for (const depId of task.dependencies) {
      const depResult = context.memory.get(depId);
      params = params.replace(
        new RegExp(`{{${depId}\\.(.+?)}}`, 'g'),
        (match, path) => this.getNestedValue(depResult, path)
      );
    }
    
    return JSON.parse(params);
  }
}

2. ReAct 循环实现

async function reactLoop(userQuery: string) {
  let iteration = 0;
  const maxIterations = 10;
  
  while (iteration < maxIterations) {
    // Thought (思考)
    const thought = await llm.generate(`
当前状态:${JSON.stringify(context.state)}
用户目标:${userQuery}

下一步应该做什么?
    `);
    
    // Action (行动)
    const action = parseAction(thought);
    const observation = await executeAction(action);
    
    // 判断是否完成
    const isComplete = await llm.generate(`
目标:${userQuery}
当前观察:${observation}

任务是否完成?(yes/no)
    `);
    
    if (isComplete === 'yes') {
      return summarizeResult();
    }
    
    iteration++;
  }
}

关键差异点对比

维度传统脚本Manus Agent
灵活性固定流程动态规划
容错性遇错即停自动重试/改变策略
可扩展性需重写代码添加新工具即可
用户体验黑盒执行透明的思考过程

这就是 Manus 的完整工作逻辑!它本质上是 "LLM 大脑 + 工具手脚 + 反馈眼睛" 的组合体。

开发侧的技术实现

[!question] 以上生产过程的原理,那么站在manus开发者角度是怎么实现的呢,比如LLM的task拆解是通过prompt吗,他如何判断是否正确呢等等,解释这些开发侧的核心问题和解法。

一、核心开发挑战与解决方案

挑战 1:如何让 LLM 稳定地拆解任务?

问题

LLM 是概率模型,同一个 prompt 可能产生不同输出。如何保证任务拆解的稳定性和正确性

解决方案:结构化 Prompt + Schema 约束

// ❌ 不好的做法:开放式 Prompt
const badPrompt = `
用户说:帮我订餐厅
你觉得需要做什么?
`;
// 结果:LLM 可能回复自然语言,难以解析

// ✅ 好的做法:严格的结构化 Prompt
const goodPrompt = `
你是一个任务规划专家。用户请求如下:
"${userInput}"

【当前上下文】
- 用户位置:${userLocation}
- 当前时间:${currentTime}
- 用户偏好:${userPreferences}

【可用工具列表】
${availableTools.map(tool => `
- ${tool.name}: ${tool.description}
  参数: ${JSON.stringify(tool.parameters)}
  示例: ${tool.example}
`).join('\n')}

【输出要求】
必须返回有效的 JSON,格式如下:
{
  "reasoning": "你的推理过程(200字内)",
  "tasks": [
    {
      "id": "task_1",
      "tool": "工具名称(必须从上面列表选择)",
      "params": {具体参数},
      "dependencies": ["task_0"], // 依赖的任务 ID
      "description": "这步做什么"
    }
  ],
  "expected_outcome": "预期结果描述"
}

【关键规则】
1. 任务 ID 必须唯一,格式为 task_N
2. dependencies 必须引用已存在的任务 ID
3. 第一个任务的 dependencies 必须为空数组
4. 工具名称必须完全匹配列表中的名称
5. 不要编造工具或参数

现在开始规划:
`;

关键技巧 1:使用 JSON Schema 验证

import Ajv from 'ajv';

const taskPlanSchema = {
  type: "object",
  required: ["reasoning", "tasks", "expected_outcome"],
  properties: {
    reasoning: { type: "string", minLength: 10, maxLength: 500 },
    tasks: {
      type: "array",
      minItems: 1,
      items: {
        type: "object",
        required: ["id", "tool", "params", "dependencies", "description"],
        properties: {
          id: { type: "string", pattern: "^task_\\d+$" },
          tool: { type: "string", enum: registeredToolNames }, // 只能是已注册的工具
          params: { type: "object" },
          dependencies: { type: "array", items: { type: "string" } },
          description: { type: "string" }
        }
      }
    },
    expected_outcome: { type: "string" }
  }
};

const ajv = new Ajv();
const validate = ajv.compile(taskPlanSchema);

async function generatePlan(userInput: string): Promise<TaskPlan> {
  let attempts = 0;
  const maxAttempts = 3;
  
  while (attempts < maxAttempts) {
    const llmOutput = await claude.messages.create({
      model: "claude-sonnet-4-5-20250929",
      max_tokens: 2000,
      messages: [{ role: "user", content: buildPrompt(userInput) }]
    });
    
    try {
      const parsed = JSON.parse(llmOutput.content[0].text);
      
      // JSON Schema 验证
      if (validate(parsed)) {
        // 额外的业务逻辑验证
        if (validateTaskDependencies(parsed.tasks)) {
          return parsed;
        }
      }
      
      // 如果验证失败,告诉 LLM 哪里错了
      console.log("验证失败:", validate.errors);
      
    } catch (error) {
      console.log(`尝试 ${attempts + 1} 失败:`, error);
    }
    
    attempts++;
  }
  
  throw new Error("无法生成有效的任务计划");
}

关键技巧 2:Few-Shot Learning(给 LLM 看例子)

const promptWithExamples = `
你是任务规划专家。以下是一些优秀的规划示例:

【示例 1】
用户输入:"帮我订明天的餐厅"
输出:
{
  "reasoning": "用户需要预订餐厅,我需要:1) 获取明天日期 2) 搜索餐厅 3) 检查可用性 4) 完成预订",
  "tasks": [
    {
      "id": "task_1",
      "tool": "get_date",
      "params": { "offset_days": 1 },
      "dependencies": [],
      "description": "获取明天的日期"
    },
    {
      "id": "task_2",
      "tool": "search_restaurants",
      "params": { "location": "user_location" },
      "dependencies": ["task_1"],
      "description": "搜索用户位置附近的餐厅"
    }
  ],
  "expected_outcome": "成功预订餐厅并返回确认信息"
}

【示例 2】
用户输入:"查一下今天天气"
输出:
{
  "reasoning": "这是一个简单的查询任务,只需调用天气 API",
  "tasks": [
    {
      "id": "task_1",
      "tool": "get_weather",
      "params": { "location": "user_location", "date": "today" },
      "dependencies": [],
      "description": "获取用户当前位置的天气"
    }
  ],
  "expected_outcome": "返回今天的天气信息"
}

现在轮到你了:
用户输入:"${userInput}"
请按照相同的格式输出任务计划。
`;

挑战 2:如何判断任务执行是否正确?

问题

工具执行后返回结果,但如何自动判断结果是否符合预期?

解决方案:多层验证机制

class TaskValidator {
  // 层级 1:类型验证(最快)
  validateSchema(result: any, expectedSchema: Schema): boolean {
    // 检查返回值的数据结构
    return ajv.validate(expectedSchema, result);
  }
  
  // 层级 2:规则验证(快速)
  validateRules(task: Task, result: any): ValidationResult {
    const rules = this.getRulesForTool(task.tool);
    
    for (const rule of rules) {
      if (!rule.check(result)) {
        return {
          valid: false,
          reason: rule.failureMessage,
          severity: rule.severity // 'error' | 'warning'
        };
      }
    }
    
    return { valid: true };
  }
  
  // 层级 3:LLM 语义验证(慢但准确)
  async validateWithLLM(task: Task, result: any): Promise<ValidationResult> {
    const prompt = `
任务描述:${task.description}
预期结果:${task.expectedOutcome}
实际结果:${JSON.stringify(result)}

请判断实际结果是否满足预期:
1. 如果满足,返回:{"valid": true, "confidence": 0.95}
2. 如果不满足,返回:{"valid": false, "reason": "具体原因", "confidence": 0.95}

只返回 JSON,不要解释。
`;
    
    const response = await this.llm.generate(prompt);
    return JSON.parse(response);
  }
}

// 实际使用:组合三层验证
async function validateTaskResult(task: Task, result: any): Promise<boolean> {
  // 快速检查:类型是否正确
  if (!validator.validateSchema(result, task.outputSchema)) {
    logger.error("Schema validation failed");
    return false;
  }
  
  // 中速检查:业务规则
  const ruleResult = validator.validateRules(task, result);
  if (!ruleResult.valid) {
    if (ruleResult.severity === 'error') {
      return false;
    }
    // 警告级别:继续,但记录日志
    logger.warn(ruleResult.reason);
  }
  
  // 慢速检查:只在关键任务使用 LLM 验证
  if (task.critical) {
    const llmResult = await validator.validateWithLLM(task, result);
    if (!llmResult.valid) {
      logger.error(`LLM validation failed: ${llmResult.reason}`);
      return false;
    }
  }
  
  return true;
}

具体示例:验证餐厅搜索结果

// 工具定义时就声明验证规则
const searchRestaurantTool = {
  name: "search_restaurants",
  description: "搜索餐厅",
  
  // 输出 Schema
  outputSchema: {
    type: "object",
    required: ["restaurants"],
    properties: {
      restaurants: {
        type: "array",
        items: {
          type: "object",
          required: ["id", "name", "rating"],
          properties: {
            id: { type: "string" },
            name: { type: "string" },
            rating: { type: "number", minimum: 0, maximum: 5 }
          }
        }
      }
    }
  },
  
  // 业务规则
  validationRules: [
    {
      name: "has_results",
      check: (result) => result.restaurants.length > 0,
      failureMessage: "搜索未返回任何结果",
      severity: "error"
    },
    {
      name: "rating_quality",
      check: (result) => result.restaurants.some(r => r.rating >= 4.0),
      failureMessage: "没有找到高评分餐厅",
      severity: "warning"
    },
    {
      name: "location_match",
      check: (result, task) => {
        const requestedLocation = task.params.location;
        return result.restaurants.every(r => 
          r.address.includes(requestedLocation)
        );
      },
      failureMessage: "返回的餐厅位置不匹配",
      severity: "error"
    }
  ]
};

挑战 3:如何处理工具调用失败?

问题

API 超时、网络错误、返回空结果等异常情况如何处理?

解决方案:智能重试 + 降级策略

class ResilientExecutor {
  async executeWithRetry(task: Task): Promise<ExecutionResult> {
    const strategy = this.getRetryStrategy(task);
    
    for (let attempt = 1; attempt <= strategy.maxAttempts; attempt++) {
      try {
        const result = await this.executeTool(task.tool, task.params);
        
        // 验证结果
        if (await this.validate(task, result)) {
          return { success: true, data: result };
        }
        
        // 结果不符合预期,但不是错误
        if (attempt < strategy.maxAttempts) {
          await this.sleep(strategy.backoff(attempt));
          continue;
        }
        
      } catch (error) {
        if (this.isRetryable(error) && attempt < strategy.maxAttempts) {
          logger.warn(`任务 ${task.id} 失败,重试 ${attempt}/${strategy.maxAttempts}`);
          await this.sleep(strategy.backoff(attempt));
          continue;
        }
        
        // 不可重试的错误或达到最大次数
        return await this.handleFailure(task, error, attempt);
      }
    }
    
    // 所有尝试都失败
    return await this.fallbackStrategy(task);
  }
  
  // 决定如何处理失败
  async handleFailure(task: Task, error: Error, attempts: number): Promise<ExecutionResult> {
    // 策略 1:使用备用工具
    if (task.fallbackTool) {
      logger.info(`使用备用工具: ${task.fallbackTool}`);
      return this.executeTool(task.fallbackTool, task.params);
    }
    
    // 策略 2:降级到简化版本
    if (task.degradedVersion) {
      logger.info("使用降级版本");
      return this.executeTool(task.tool, task.degradedVersion.params);
    }
    
    // 策略 3:让 LLM 重新规划
    logger.info("触发重新规划");
    return {
      success: false,
      shouldReplan: true,
      context: {
        failedTask: task,
        error: error.message,
        attempts: attempts
      }
    };
  }
  
  // 判断错误是否可重试
  isRetryable(error: Error): boolean {
    const retryableErrors = [
      'ETIMEDOUT',
      'ECONNRESET',
      'ECONNREFUSED',
      'RATE_LIMIT_EXCEEDED',
      'SERVICE_UNAVAILABLE'
    ];
    
    return retryableErrors.some(e => error.message.includes(e));
  }
}

实战案例:餐厅搜索失败的处理

// 场景:Yelp API 失败后自动切换到 Google Places
const searchTask = {
  id: "task_1",
  tool: "search_yelp",
  params: { location: "SF", cuisine: "Italian" },
  fallbackTool: "search_google_places", // 备用工具
  degradedVersion: { // 降级版本:放宽条件
    params: { location: "SF", cuisine: "Italian", min_rating: 3.5 } // 原本是 4.0
  }
};

// 执行流程
const result = await executor.executeWithRetry(searchTask);

if (!result.success && result.shouldReplan) {
  // LLM 重新规划
  const replanPrompt = `
原计划失败:
- 任务:搜索意大利餐厅
- 失败原因:${result.context.error}
- 已尝试:Yelp API(失败)、Google Places API(失败)

请生成新的执行计划,考虑以下选项:
1. 搜索更大范围的餐厅(不限意大利菜)
2. 询问用户是否接受其他菜系
3. 提供附近的热门餐厅列表让用户选择

请选择最佳方案并生成新任务。
`;
  
  const newPlan = await llm.generate(replanPrompt);
  await executor.execute(newPlan);
}

挑战 4:如何管理工具注册和版本?

问题

工具越来越多,如何组织?如何让 LLM 选对工具?

解决方案:工具注册中心 + 语义索引

class ToolRegistry {
  private tools = new Map<string, Tool>();
  private vectorStore: VectorDB;
  
  // 注册工具
  register(tool: Tool) {
    // 1. 存储工具定义
    this.tools.set(tool.name, tool);
    
    // 2. 向量化工具描述(用于语义搜索)
    const embedding = await this.embedder.embed(
      `${tool.name}: ${tool.description}. Examples: ${tool.examples.join(', ')}`
    );
    
    this.vectorStore.upsert({
      id: tool.name,
      vector: embedding,
      metadata: {
        name: tool.name,
        category: tool.category,
        requiredParams: tool.parameters.required,
        tags: tool.tags
      }
    });
  }
  
  // 智能工具推荐:根据任务描述找到最相关的工具
  async recommendTools(taskDescription: string, limit: number = 5): Promise<Tool[]> {
    const queryEmbedding = await this.embedder.embed(taskDescription);
    
    const results = await this.vectorStore.search({
      vector: queryEmbedding,
      limit: limit * 2, // 多拿一些候选
      filter: {
        enabled: true // 只搜索启用的工具
      }
    });
    
    // 进一步用 LLM 筛选
    const prompt = `
任务描述:${taskDescription}

候选工具:
${results.map(r => `- ${r.metadata.name}: ${this.tools.get(r.metadata.name).description}`).join('\n')}

请选出最相关的 ${limit} 个工具,返回工具名称的 JSON 数组。
`;
    
    const selected = await this.llm.generate(prompt);
    return JSON.parse(selected).map(name => this.tools.get(name));
  }
  
  // 为 LLM 生成工具列表(Prompt 的一部分)
  async generateToolPrompt(taskDescription: string): Promise<string> {
    const recommendedTools = await this.recommendTools(taskDescription, 10);
    
    return recommendedTools.map(tool => `
【工具】${tool.name}
描述:${tool.description}
参数:
${Object.entries(tool.parameters.properties).map(([key, schema]) => 
  `  - ${key} (${schema.type}${schema.required ? ', 必填' : ''}): ${schema.description}`
).join('\n')}
示例:
${tool.examples.map(ex => `  ${JSON.stringify(ex)}`).join('\n')}
`).join('\n---\n');
  }
}

// 使用示例
const registry = new ToolRegistry();

// 注册工具时提供丰富的元数据
registry.register({
  name: "search_restaurants",
  description: "在指定位置搜索餐厅,支持按菜系、价格、评分筛选",
  category: "search",
  tags: ["food", "dining", "location-based"],
  parameters: {
    type: "object",
    properties: {
      location: { type: "string", description: "搜索位置(城市或地址)" },
      cuisine: { type: "string", description: "菜系类型,如 Italian, Chinese" },
      price_range: { type: "string", enum: ["$", "$$", "$$$", "$$$$"] },
      min_rating: { type: "number", minimum: 0, maximum: 5 }
    },
    required: ["location"]
  },
  examples: [
    { location: "San Francisco", cuisine: "Italian", price_range: "$$" },
    { location: "New York", min_rating: 4.5 }
  ],
  execute: async (params) => { /* 实现 */ }
});

// LLM 规划时,自动推荐相关工具
const taskDesc = "帮我找一家靠近金门大桥的海鲜餐厅";
const toolPrompt = await registry.generateToolPrompt(taskDesc);
// 只会推荐 search_restaurants、get_location 等相关工具,不会列出所有 100 个工具

挑战 5:如何让 LLM 理解工具执行的结果?

问题

工具返回的数据可能很复杂(如 JSON 嵌套结构),LLM 如何提取关键信息?

解决方案:结果摘要 + 结构化传递

class ResultProcessor {
  // 自动生成结果摘要
  async summarizeResult(tool: string, result: any): Promise<Summary> {
    // 策略 1:预定义摘要规则(快速)
    const rule = this.getSummaryRule(tool);
    if (rule) {
      return rule.extract(result);
    }
    
    // 策略 2:LLM 生成摘要(灵活)
    const prompt = `
工具:${tool}
原始结果(JSON):
${JSON.stringify(result, null, 2)}

请提取关键信息,生成简洁摘要(50字内)。
同时提取结构化数据供后续任务使用。

返回格式:
{
  "summary": "一句话总结",
  "key_data": {提取的关键字段},
  "next_step_suggestion": "建议下一步做什么"
}
`;
    
    const response = await this.llm.generate(prompt);
    return JSON.parse(response);
  }
}

// 示例:餐厅搜索结果处理
const rawResult = {
  restaurants: [
    {
      id: "rest_123",
      name: "Trattoria Pinocchio",
      rating: 4.6,
      reviews: 1250,
      price: "$$",
      address: "401 Columbus Ave, San Francisco, CA",
      outdoor_seating: true,
      open_hours: { monday: "11:00-22:00", ... },
      menu_url: "https://...",
      phone: "+1-415-xxx-xxxx"
    },
    // ... 更多餐厅
  ],
  total_count: 47,
  search_params: { location: "San Francisco", cuisine: "Italian" }
};

// 自动摘要
const summary = await processor.summarizeResult("search_restaurants", rawResult);

// 输出:
{
  "summary": "找到47家意大利餐厅,推荐评分4.6的Trattoria Pinocchio,有户外座位",
  "key_data": {
    "top_restaurant_id": "rest_123",
    "top_restaurant_name": "Trattoria Pinocchio",
    "has_outdoor": true,
    "total_found": 47
  },
  "next_step_suggestion": "检查 rest_123 的预订可用性"
}

// 在 Prompt 中传递给 LLM 时,使用摘要而非原始数据
const nextPrompt = `
上一步结果:${summary.summary}
关键数据:${JSON.stringify(summary.key_data)}

基于此结果,下一步应该:${summary.next_step_suggestion}

请确认并生成下一步任务。
`;

挑战 6:如何调试和监控 Agent 的行为?

问题

Agent 是"黑盒",如何知道它为什么做出某个决定?如何发现问题?

解决方案:可观测性系统

class AgentObservability {
  // 1. 记录完整的执行轨迹
  async logTrace(sessionId: string, event: TraceEvent) {
    await this.traceDB.insert({
      session_id: sessionId,
      timestamp: Date.now(),
      type: event.type, // 'planning' | 'execution' | 'validation' | 'replan'
      data: event.data,
      llm_prompt: event.llmPrompt, // 保存发给 LLM 的 Prompt
      llm_response: event.llmResponse,
      duration_ms: event.duration
    });
  }
  
  // 2. 实时监控面板
  async getSessionInsights(sessionId: string) {
    const traces = await this.traceDB.query({ session_id: sessionId });
    
    return {
      total_tasks: traces.filter(t => t.type === 'execution').length,
      failed_tasks: traces.filter(t => t.type === 'execution' && !t.data.success).length,
      replanning_count: traces.filter(t => t.type === 'replan').length,
      total_llm_calls: traces.filter(t => t.llm_prompt).length,
      total_tokens: traces.reduce((sum, t) => sum + (t.data.tokens_used || 0), 0),
      execution_time_ms: traces[traces.length - 1].timestamp - traces[0].timestamp,
      
      // 关键指标
      success_rate: this.calculateSuccessRate(traces),
      avg_task_duration: this.calculateAvgDuration(traces),
      
      // 可视化数据
      timeline: this.buildTimeline(traces)
    };
  }
  
  // 3. 异常检测
  async detectAnomalies(sessionId: string): Promise<Anomaly[]> {
    const traces = await this.traceDB.query({ session_id: sessionId });
    const anomalies: Anomaly[] = [];
    
    // 检测 1:任务循环(同一个任务执行多次)
    const taskCounts = new Map<string, number>();
    for (const trace of traces.filter(t => t.type === 'execution')) {
      const taskName = trace.data.task_name;
      taskCounts.set(taskName, (taskCounts.get(taskName) || 0) + 1);
    }
    
    for (const [task, count] of taskCounts.entries()) {
      if (count > 3) {
        anomalies.push({
          type: 'task_loop',
          severity: 'high',
          message: `任务 "${task}" 执行了 ${count} 次,可能陷入循环`,
          recommendation: '检查任务依赖关系和重新规划逻辑'
        });
      }
    }
    
    // 检测 2:频繁重新规划
    const replanCount = traces.filter(t => t.type === 'replan').length;
    if (replanCount > 5) {
      anomalies.push({
        type: 'excessive_replanning',
        severity: 'medium',
        message: `重新规划了 ${replanCount} 次,可能目标不明确或工具不足`,
        recommendation: '优化初始 Prompt 或添加更多工具'
      });
    }
    
    // 检测 3:Token 使用异常
    const totalTokens = traces.reduce((sum, t) => sum + (t.data.tokens_used || 0), 0);
    if (totalTokens > 50000) {
      anomalies.push({
        type: 'high_token_usage',
        severity: 'medium',
        message: `使用了 ${totalTokens} tokens,成本较高`,
        recommendation: '考虑缓存常见查询或使用更小的模型'
      });
    }
    
    return anomalies;
  }
  
  // 4. A/B 测试不同的 Prompt
  async comparePrompts(userQuery: string, promptVersions: string[]) {
    const results = await Promise.all(
      promptVersions.map(async (prompt, idx) => {
        const startTime = Date.now();
        const plan = await this.generatePlan(userQuery, prompt);
        const execution = await this.executePlan(plan);
        const duration = Date.now() - startTime;
        
        return {
          version: `v${idx + 1}`,
          prompt: prompt,
          success: execution.success,
          task_count: plan.tasks.length,
          duration_ms: duration,
          tokens_used: execution.tokens_used,
          user_satisfaction: await this.getUserFeedback(execution.result)
        };
      })
    );
    
    // 生成对比报告
    return this.generateComparisonReport(results);
  }
}

// 使用示例:调试一个失败的 session
const insights = await observability.getSessionInsights("session_abc123");
console.log(`
任务统计:
- 总任务数:${insights.total_tasks}
- 失败任务:${insights.failed_tasks}
- 重新规划:${insights.replanning_count}

性能指标:
- 成功率:${insights.success_rate}%
- 平均任务耗时:${insights.avg_task_duration}ms
- 总 Token 数:${insights.total_tokens}
`);

const anomalies = await observability.detectAnomalies("session_abc123");
if (anomalies.length > 0) {
  console.log("检测到异常:");
  anomalies.forEach(a => {
    console.log(`  [${a.severity}] ${a.message}`);
    console.log(`  建议:${a.recommendation}`);
  });
}

二、实际开发流程

Step 1: 设计工具系统

// 1. 定义工具接口
interface Tool {
  name: string;
  description: string;
  parameters: JSONSchema;
  examples: any[];
  execute: (params: any) => Promise<any>;
  validate?: (result: any) => boolean;
  fallback?: string; // 备用工具名称
}

// 2. 实现具体工具
const weatherTool: Tool = {
  name: "get_weather",
  description: "获取指定地点的天气信息",
  parameters: {
    type: "object",
    properties: {
      location: { type: "string", description: "城市名称或坐标" },
      date: { type: "string", description: "日期,格式 YYYY-MM-DD" }
    },
    required: ["location"]
  },
  examples: [
    { location: "San Francisco", date: "2025-10-01" },
    { location: "Beijing" }
  ],
  execute: async (params) => {
    const response = await fetch(
      `https://api.weather.com/v1/forecast?location=${params.location}&date=${params.date}`
    );
    return response.json();
  },
  validate: (result) => {
    return result.temperature !== undefined && result.condition !== undefined;
  },
  fallback: "get_weather_backup" // 如果主 API 失败,使用备用
};

// 3. 注册到工具库
toolRegistry.register(weatherTool);

Step 2: 构建 Prompt 工程系统

class PromptBuilder {
  private templates = new Map<string, PromptTemplate>();
  private variables = new Map<string, any>();
  
  // 注册 Prompt 模板(支持版本管理)
  registerTemplate(name: string, version: string, template: PromptTemplate) {
    const key = `${name}@${version}`;
    this.templates.set(key, template);
  }
  
  // 构建 Prompt(支持变量替换、条件渲染)
  async build(templateName: string, context: Context): Promise<string> {
    const template = this.templates.get(templateName);
    if (!template) {
      throw new Error(`Template ${templateName} not found`);
    }
    
    let prompt = template.content;
    
    // 1. 替换变量
    for (const [key, value] of Object.entries(context.variables)) {
      prompt = prompt.replace(new RegExp(`{{${key}}}`, 'g'), String(value));
    }
    
    // 2. 条件渲染
    prompt = this.renderConditionals(prompt, context);
    
    // 3. 注入工具列表(动态根据任务推荐)
    if (template.includeTools) {
      const toolPrompt = await this.toolRegistry.generateToolPrompt(context.userQuery);
      prompt = prompt.replace('{{TOOLS}}', toolPrompt);
    }
    
    // 4. 注入历史对话(上下文窗口管理)
    if (template.includeHistory) {
      const history = await this.getRelevantHistory(context.sessionId, context.userQuery);
      prompt = prompt.replace('{{HISTORY}}', history);
    }
    
    return prompt;
  }
  
  // 条件渲染(类似模板引擎)
  private renderConditionals(prompt: string, context: Context): string {
    // 支持 {{#if condition}} ... {{/if}}
    const ifRegex = /{{#if (\w+)}}([\s\S]*?){{\/if}}/g;
    
    return prompt.replace(ifRegex, (match, condition, content) => {
      return context.variables[condition] ? content : '';
    });
  }
  
  // 智能上下文窗口管理(只保留相关历史)
  private async getRelevantHistory(sessionId: string, currentQuery: string): Promise<string> {
    const allHistory = await this.db.getConversationHistory(sessionId);
    
    // 向量搜索:找出与当前查询最相关的历史对话
    const queryEmbedding = await this.embedder.embed(currentQuery);
    const relevantMessages = await this.vectorDB.search({
      vector: queryEmbedding,
      filter: { session_id: sessionId },
      limit: 5
    });
    
    // 格式化为 Prompt
    return relevantMessages.map(msg => 
      `${msg.role}: ${msg.content}`
    ).join('\n');
  }
}

// 使用示例:注册规划 Prompt 模板
promptBuilder.registerTemplate("task_planning", "v2.1", {
  content: `
你是一个任务规划专家。

【用户请求】
{{userQuery}}

【上下文信息】
- 用户位置:{{userLocation}}
- 当前时间:{{currentTime}}
{{#if hasHistory}}
- 相关历史对话:
{{HISTORY}}
{{/if}}

【可用工具】
{{TOOLS}}

【输出要求】
返回 JSON 格式的任务计划,包含 reasoning 和 tasks 数组。
每个任务必须指定 id、tool、params、dependencies。

【重要规则】
1. 优先使用高可靠性工具
2. 避免不必要的串行依赖,尽量并行
3. 每个任务的 description 要清晰说明目的
4. 如果用户请求不明确,添加 clarification_needed 字段

现在开始规划:
`,
  includeTools: true,
  includeHistory: true
});

// 构建 Prompt
const prompt = await promptBuilder.build("[email protected]", {
  variables: {
    userQuery: "帮我订餐厅",
    userLocation: "San Francisco",
    currentTime: "2025-10-01 18:00",
    hasHistory: true
  },
  sessionId: "session_123"
});

Step 3: 实现执行引擎核心逻辑

class ExecutionEngine {
  private toolRegistry: ToolRegistry;
  private validator: TaskValidator;
  private monitor: AgentObservability;
  
  async executePlan(plan: TaskPlan, context: Context): Promise<ExecutionResult> {
    const trace = new ExecutionTrace(context.sessionId);
    
    try {
      // 1. 验证计划合法性
      this.validatePlan(plan);
      
      // 2. 构建任务图(拓扑排序)
      const taskGraph = this.buildTaskGraph(plan.tasks);
      
      // 3. 识别可并行的任务
      const parallelGroups = this.identifyParallelTasks(taskGraph);
      
      // 4. 逐层执行
      for (const [level, tasks] of parallelGroups.entries()) {
        trace.log('execution_level', { level, task_count: tasks.length });
        
        // 并行执行同一层的任务
        const results = await Promise.allSettled(
          tasks.map(task => this.executeTask(task, context, trace))
        );
        
        // 检查是否有失败
        const failures = results.filter(r => r.status === 'rejected');
        if (failures.length > 0) {
          // 判断是否需要重新规划
          if (this.shouldReplan(tasks, failures)) {
            return await this.triggerReplanning(plan, context, failures, trace);
          }
          
          // 否则继续执行(可能有些任务不依赖失败的任务)
          this.handlePartialFailure(tasks, failures);
        }
        
        // 保存本轮结果到上下文
        this.updateContext(context, tasks, results);
      }
      
      // 5. 生成最终回复
      const finalResponse = await this.generateResponse(context, trace);
      
      // 6. 记录完整轨迹
      await this.monitor.saveTrace(trace);
      
      return {
        success: true,
        response: finalResponse,
        metadata: {
          tasks_executed: plan.tasks.length,
          duration_ms: trace.getDuration(),
          tokens_used: trace.getTotalTokens()
        }
      };
      
    } catch (error) {
      trace.log('execution_error', { error: error.message });
      await this.monitor.saveTrace(trace);
      
      return {
        success: false,
        error: error.message,
        partial_results: context.results
      };
    }
  }
  
  // 执行单个任务(核心方法)
  private async executeTask(task: Task, context: Context, trace: ExecutionTrace): Promise<any> {
    const startTime = Date.now();
    
    trace.log('task_start', { task_id: task.id, tool: task.tool });
    
    // 1. 解析参数(替换依赖任务的占位符)
    const resolvedParams = this.resolveDependencies(task, context);
    
    // 2. 获取工具
    const tool = this.toolRegistry.get(task.tool);
    if (!tool) {
      throw new Error(`工具不存在: ${task.tool}`);
    }
    
    // 3. 执行工具(带重试机制)
    const result = await this.executeWithRetry(tool, resolvedParams, task);
    
    // 4. 验证结果
    const validation = await this.validator.validate(task, result);
    if (!validation.valid) {
      trace.log('task_validation_failed', { 
        task_id: task.id, 
        reason: validation.reason 
      });
      
      if (validation.severity === 'error') {
        throw new Error(`任务验证失败: ${validation.reason}`);
      }
    }
    
    // 5. 生成结果摘要
    const summary = await this.summarizeResult(tool.name, result);
    
    const duration = Date.now() - startTime;
    trace.log('task_complete', { 
      task_id: task.id, 
      duration_ms: duration,
      summary: summary.summary
    });
    
    return {
      raw: result,
      summary: summary,
      task_id: task.id
    };
  }
  
  // 解析任务参数中的依赖引用
  private resolveDependencies(task: Task, context: Context): any {
    const params = { ...task.params };
    
    // 遍历所有参数,查找占位符
    for (const [key, value] of Object.entries(params)) {
      if (typeof value === 'string' && value.startsWith('{{')) {
        // 格式: {{task_1.result.restaurant_id}}
        const match = value.match(/{{(\w+)\.(.+)}}/);
        if (match) {
          const [_, taskId, path] = match;
          const taskResult = context.results.get(taskId);
          
          if (!taskResult) {
            throw new Error(`依赖任务 ${taskId} 的结果不存在`);
          }
          
          // 通过路径提取嵌套值
          params[key] = this.getNestedValue(taskResult, path);
        }
      }
    }
    
    return params;
  }
  
  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((current, key) => current?.[key], obj);
  }
  
  // 判断是否需要重新规划
  private shouldReplan(tasks: Task[], failures: any[]): boolean {
    // 规则 1:如果关键任务失败,必须重新规划
    const criticalTasksFailed = tasks.some(t => 
      t.critical && failures.some(f => f.reason.task_id === t.id)
    );
    
    // 规则 2:如果失败率超过 50%,重新规划
    const failureRate = failures.length / tasks.length;
    
    return criticalTasksFailed || failureRate > 0.5;
  }
  
  // 触发重新规划
  private async triggerReplanning(
    originalPlan: TaskPlan,
    context: Context,
    failures: any[],
    trace: ExecutionTrace
  ): Promise<ExecutionResult> {
    trace.log('replanning_triggered', { failure_count: failures.length });
    
    // 构建重新规划的 Prompt
    const replanPrompt = `
原始计划执行遇到问题:

【原始目标】
${originalPlan.expected_outcome}

【已完成的任务】
${Array.from(context.results.entries()).map(([id, result]) => 
  `- ${id}: ${result.summary.summary}`
).join('\n')}

【失败的任务】
${failures.map(f => 
  `- ${f.reason.task_id}: ${f.reason.error}`
).join('\n')}

【可用工具】
${await this.toolRegistry.generateToolPrompt(originalPlan.expected_outcome)}

请分析失败原因并生成新的执行计划。注意:
1. 可以使用已完成任务的结果
2. 可以更换工具或调整参数
3. 如果目标无法达成,请提供替代方案

返回 JSON 格式的新计划。
`;

    // 调用 LLM 生成新计划
    const newPlan = await this.llm.generate(replanPrompt);
    const parsedPlan = JSON.parse(newPlan);
    
    trace.log('new_plan_generated', { task_count: parsedPlan.tasks.length });
    
    // 递归执行新计划
    return this.executePlan(parsedPlan, context);
  }
}

Step 4: 实现结果生成和展示

class ResponseGenerator {
  // 将执行结果转化为用户友好的回复
  async generate(context: Context, trace: ExecutionTrace): Promise<string> {
    // 收集所有任务的摘要
    const summaries = Array.from(context.results.values()).map(r => r.summary);
    
    // 使用 LLM 生成自然语言回复
    const prompt = `
用户原始请求:${context.userQuery}

执行过程:
${summaries.map((s, i) => `${i + 1}. ${s.summary}`).join('\n')}

最终结果:
${JSON.stringify(context.finalResult, null, 2)}

请基于以上信息,生成一个简洁、友好的回复给用户。
要求:
1. 直接回答用户的问题
2. 突出关键信息(使用 markdown 格式)
3. 如果有需要用户确认的,明确提出
4. 语气自然,不要太正式

回复:
`;

    const response = await this.llm.generate(prompt);
    
    // 添加结构化数据(方便前端渲染)
    return {
      text: response,
      structured_data: this.extractStructuredData(context),
      actions: this.suggestNextActions(context)
    };
  }
  
  // 提取结构化数据供前端展示
  private extractStructuredData(context: Context): any {
    // 根据任务类型提取不同的数据
    const taskTypes = new Set(
      Array.from(context.results.values()).map(r => r.task_type)
    );
    
    const structured = {};
    
    if (taskTypes.has('search_restaurants')) {
      structured.restaurants = context.results
        .get('search_restaurants')
        ?.raw.restaurants.slice(0, 3); // 只返回前 3 个
    }
    
    if (taskTypes.has('make_reservation')) {
      structured.reservation = {
        confirmation_code: context.finalResult.confirmation_code,
        restaurant: context.finalResult.restaurant,
        time: context.finalResult.time
      };
    }
    
    return structured;
  }
  
  // 建议下一步操作
  private suggestNextActions(context: Context): Action[] {
    const actions: Action[] = [];
    
    // 如果预订成功,建议添加到日历
    if (context.finalResult.confirmation_code) {
      actions.push({
        type: 'add_to_calendar',
        label: '添加到日历',
        data: {
          title: `餐厅预订: ${context.finalResult.restaurant}`,
          time: context.finalResult.time
        }
      });
    }
    
    // 如果搜索了餐厅但未预订,建议预订
    if (context.results.has('search_restaurants') && !context.results.has('make_reservation')) {
      actions.push({
        type: 'make_reservation',
        label: '立即预订',
        data: {
          restaurant_id: context.results.get('search_restaurants').raw.restaurants[0].id
        }
      });
    }
    
    return actions;
  }
}

三、关键的工程实践

1. Prompt 版本控制和 A/B 测试

// 实际生产中的 Prompt 管理
class PromptVersionControl {
  // 存储在数据库,支持热更新
  async getActivePrompt(name: string): Promise<string> {
    const version = await this.db.query(`
      SELECT content, metadata 
      FROM prompts 
      WHERE name = $1 AND active = true
      ORDER BY version DESC 
      LIMIT 1
    `, [name]);
    
    return version.content;
  }
  
  // A/B 测试:随机选择不同版本的 Prompt
  async getPromptWithABTest(name: string, userId: string): Promise<string> {
    const variants = await this.db.query(`
      SELECT content, variant_id 
      FROM prompt_variants 
      WHERE name = $1 AND active = true
    `, [name]);
    
    // 根据用户 ID 一致性哈希分配变体
    const variantIndex = this.consistentHash(userId, variants.length);
    const selected = variants[variantIndex];
    
    // 记录使用的变体(用于后续分析)
    await this.logVariantUsage(userId, selected.variant_id);
    
    return selected.content;
  }
  
  // 分析不同 Prompt 变体的效果
  async analyzeVariants(name: string): Promise<VariantAnalysis[]> {
    const stats = await this.db.query(`
      SELECT 
        v.variant_id,
        COUNT(*) as usage_count,
        AVG(f.rating) as avg_rating,
        AVG(e.duration_ms) as avg_duration,
        SUM(e.tokens_used) as total_tokens
      FROM prompt_variants v
      JOIN executions e ON e.prompt_variant_id = v.variant_id
      LEFT JOIN feedback f ON f.execution_id = e.id
      WHERE v.name = $1
      GROUP BY v.variant_id
    `, [name]);
    
    return stats.map(s => ({
      variant_id: s.variant_id,
      metrics: {
        usage: s.usage_count,
        satisfaction: s.avg_rating,
        performance: s.avg_duration,
        cost: s.total_tokens * 0.000015 // Claude token 价格
      },
      confidence_interval: this.calculateCI(s)
    }));
  }
}

2. 成本控制和监控

class CostMonitor {
  // 实时跟踪成本
  async trackCost(sessionId: string, event: CostEvent) {
    const cost = this.calculateCost(event);
    
    await this.db.insert('cost_tracking', {
      session_id: sessionId,
      event_type: event.type, // 'llm_call', 'tool_execution'
      provider: event.provider, // 'anthropic', 'openai'
      tokens_used: event.tokens,
      cost_usd: cost,
      timestamp: Date.now()
    });
    
    // 检查是否超过预算
    const sessionTotal = await this.getSessionCost(sessionId);
    if (sessionTotal > this.budgetLimit) {
      throw new BudgetExceededError(`会话成本超过限制: $${sessionTotal}`);
    }
  }
  
  private calculateCost(event: CostEvent): number {
    const pricing = {
      'claude-sonnet-4-5': {
        input: 0.000003,  // $3 per MTok
        output: 0.000015  // $15 per MTok
      },
      'claude-haiku-3': {
        input: 0.00000025,
        output: 0.00000125
      }
    };
    
    const rate = pricing[event.model];
    return (event.input_tokens * rate.input) + (event.output_tokens * rate.output);
  }
  
  // 成本优化建议
  async suggestOptimizations(sessionId: string): Promise<Optimization[]> {
    const analysis = await this.analyzeCostBreakdown(sessionId);
    const suggestions: Optimization[] = [];
    
    // 如果 LLM 调用次数过多
    if (analysis.llm_calls > 10) {
      suggestions.push({
        type: 'reduce_llm_calls',
        impact: 'high',
        suggestion: '考虑缓存常见查询或批量处理任务',
        potential_saving: analysis.llm_cost * 0.4
      });
    }
    
    // 如果使用了大模型处理简单任务
    if (analysis.simple_tasks_with_large_model > 5) {
      suggestions.push({
        type: 'use_smaller_model',
        impact: 'medium',
        suggestion: '简单任务使用 Claude Haiku 而非 Sonnet',
        potential_saving: analysis.simple_task_cost * 0.8
      });
    }
    
    return suggestions;
  }
}

3. 错误恢复和降级策略

class GracefulDegradation {
  // 定义服务降级级别
  private degradationLevels = {
    FULL_SERVICE: 0,      // 所有功能正常
    REDUCE_QUALITY: 1,    // 减少质量但保持功能
    ESSENTIAL_ONLY: 2,    // 只保留核心功能
    FALLBACK_MODE: 3      // 使用备用方案
  };
  
  private currentLevel = this.degradationLevels.FULL_SERVICE;
  
  // 根据系统状态自动降级
  async handleSystemStress(metrics: SystemMetrics) {
    if (metrics.error_rate > 0.3) {
      await this.degradeTo(this.degradationLevels.FALLBACK_MODE);
    } else if (metrics.response_time > 10000) {
      await this.degradeTo(this.degradationLevels.REDUCE_QUALITY);
    } else if (metrics.cost_rate > this.costThreshold) {
      await this.degradeTo(this.degradationLevels.ESSENTIAL_ONLY);
    }
  }
  
  private async degradeTo(level: number) {
    this.currentLevel = level;
    
    switch (level) {
      case this.degradationLevels.REDUCE_QUALITY:
        // 使用更便宜的模型
        this.llmConfig.model = 'claude-haiku-3';
        // 减少工具数量
        this.toolRegistry.disableNonEssential();
        break;
        
      case this.degradationLevels.ESSENTIAL_ONLY:
        // 只保留核心功能
        this.disableFeatures(['advanced_search', 'personalization']);
        // 增加缓存使用
        this.cacheConfig.ttl = 3600; // 1小时
        break;
        
      case this.degradationLevels.FALLBACK_MODE:
        // 使用预定义的规则替代 LLM
        this.useRuleBasedFallback = true;
        // 返回模板化回复
        this.useTemplateResponses = true;
        break;
    }
    
    logger.warn(`System degraded to level ${level}`);
  }
  
  // 规则引擎降级(不使用 LLM)
  private async ruleBasedPlanning(userQuery: string): Promise<TaskPlan> {
    // 使用简单的关键词匹配
    if (userQuery.includes('餐厅') || userQuery.includes('预订')) {
      return {
        tasks: [
          { id: 'task_1', tool: 'search_restaurants', params: {} },
          { id: 'task_2', tool: 'make_reservation', params: {}, dependencies: ['task_1'] }
        ]
      };
    }
    
    if (userQuery.includes('天气')) {
      return {
        tasks: [{ id: 'task_1', tool: 'get_weather', params: {} }]
      };
    }
    
    // 默认返回通用回复
    return {
      tasks: [{ id: 'task_1', tool: 'general_search', params: { query: userQuery } }]
    };
  }
}

四、总结:Manus 开发者的核心能力

必备技能清单

1. Prompt Engineering(最重要)

  • 结构化 Prompt 设计:用严格的格式、明确的输出要求和 JSON Schema 约束 LLM 的回复,避免随机性
  • Few-Shot Learning:在 Prompt 中给 LLM 展示 2-3 个优秀示例,让它模仿正确的输出格式和推理方式
  • Chain-of-Thought 引导:要求 LLM 在 JSON 中包含 reasoning 字段,把思考过程写出来再给答案,提升准确性
  • JSON Schema 约束:用代码验证 LLM 输出的 JSON 结构是否符合预期,不符合就重试或报错

2. 系统设计

  • 任务依赖图解析:用拓扑排序算法理清任务的先后顺序,确保 task_2 依赖的 task_1 结果已经拿到
  • 并发执行优化:识别哪些任务互不依赖可以同时跑,用 Promise.all 并行执行节省时间
  • 错误恢复机制:任务失败时自动重试 3 次,还不行就换备用工具或让 LLM 重新规划
  • 降级策略:系统压力大时自动切换到更便宜的模型、减少功能或用规则引擎替代 LLM

3. LLM 工程

  • Token 优化:只传必要信息给 LLM,用摘要代替原始数据,用缓存避免重复调用,降低成本
  • 流式输出:边生成边返回(像 ChatGPT 打字效果),用户体验更好且不用等所有内容生成完
  • 上下文窗口管理:用向量搜索只保留与当前对话最相关的历史记录,避免超出 token 限制
  • 模型选择策略:简单任务用便宜的 Haiku,复杂推理用强大的 Sonnet,根据任务难度动态选模型

4. 可观测性

  • 执行轨迹记录:把 LLM 的每个决策、工具调用、结果验证全记录到数据库,方便事后分析为什么出错
  • 异常检测:自动发现异常模式(如同一任务执行 10 次、成本突然暴涨),及时预警
  • A/B 测试:同时测试多个版本的 Prompt,用数据说话选出成功率最高的那个
  • 成本监控:实时统计每个会话花了多少钱,超预算就停止或降级,避免账单爆炸

关键设计原则

1. 让 LLM 专注于推理,而非记忆

  • 用工具提供实时数据:不要让 LLM 回忆"旧金山有哪些餐厅",而是调用 Yelp API 工具获取最新真实数据
  • 用向量数据库存储知识:把历史对话、用户偏好等信息存到向量库,需要时通过相似度搜索提取,而不是塞进 Prompt
  • 用规则引擎处理确定性逻辑:对于"订单金额大于100就打9折"这种固定规则,直接写代码判断,不需要问 LLM

2. 永远有 Plan B

  • 每个工具都有备用方案:主 API(如 Yelp)挂了自动切换到备用 API(如 Google Places),不让用户感知到故障
  • 关键路径有降级策略:LLM 调用失败时,改用更便宜的模型或预设的规则引擎继续完成任务
  • LLM 失败时用规则引擎:如果 LLM 连续 3 次都没生成有效计划,就用关键词匹配等简单规则给出基础回复

3. 透明化而非黑盒

  • 记录完整的决策过程:把"为什么选这个餐厅""为什么调用这个工具"等推理过程存到数据库,方便事后审查
  • 可视化执行轨迹:用时间线图表展示 Agent 执行的每个步骤、耗时、成功/失败状态,让开发者一眼看懂流程
  • 支持人工干预:在关键节点(如最终预订前)暂停让用户确认,或允许开发者手动修正 Agent 的决策

4. 持续优化

  • A/B 测试 Prompt:同时跑两个版本的 Prompt(如"请详细推理" vs "请简洁回答"),统计哪个成功率更高
  • 监控成本和性能:追踪每个功能的 token 消耗、响应时间、成功率,发现瓶颈后针对性优化
  • 从失败案例学习:分析失败的会话记录,提取共性问题(如"搜索结果为空时经常卡住"),改进 Prompt 或工具逻辑

这些原则的核心思想就是:把 LLM 当作不稳定但强大的推理引擎,用工程手段保证系统整体的稳定性和可控性

这就是 Manus 背后完整的开发侧实现!核心就是把 LLM 当作"大脑",通过精心设计的 Prompt 和工具系统,让它能够可靠地规划和执行复杂任务。

#人工智能
https://junyiyan.xyz/posts/feed.xml