企业级多智能体系统实战:从架构到生产部署

Yaqin Hei··20分钟阅读

项目背景

在Cognizant,我领导团队为大型企业客户开发IT运维自动化系统。需求很明确:

  • 目标:自动化60-70%的重复性IT请求
  • 场景:密码重置、权限申请、软件安装、故障排查等
  • 约束:必须与现有的ServiceNow系统集成,确保数据安全

单个LLM无法胜任这个任务,因为:

  1. 任务多样性:不同类型请求需要不同处理流程
  2. 工具调用:需要操作多个后端API
  3. 决策复杂度:需要多步推理和规划
  4. 可靠性要求:不能有幻觉或误操作

所以我们设计了一个多智能体协作系统

系统架构

整体设计

                      ┌─────────────────┐
                      │  用户请求       │
                      └────────┬────────┘
                               │
                        ┌──────▼──────┐
                        │ Router Agent │  ← 路由分发
                        └──────┬──────┘
                               │
           ┌───────────────────┼───────────────────┐
           │                   │                   │
      ┌────▼────┐        ┌────▼────┐        ┌────▼────┐
      │Password │        │Software │        │Ticket   │
      │  Agent  │        │ Agent   │        │ Agent   │
      └────┬────┘        └────┬────┘        └────┬────┘
           │                   │                   │
           └───────────────────┼───────────────────┘
                               │
                        ┌──────▼──────┐
                        │ ServiceNow  │  ← 后端系统
                        │    APIs     │
                        └─────────────┘

核心组件

1. Router Agent - 任务路由器

负责理解用户意图,分发到对应的专门Agent。

class RouterAgent:
    """路由Agent:将用户请求分发到专门Agent"""

    def __init__(self):
        self.specialists = {
            "password_reset": PasswordAgent(),
            "software_install": SoftwareAgent(),
            "ticket_query": TicketAgent(),
            "general_qa": QAAgent(),
        }

        self.llm = ChatOpenAI(model="gpt-4", temperature=0)

    def route(self, user_request: str) -> AgentResponse:
        """路由用户请求"""

        # 使用LLM进行意图识别
        prompt = f"""
        分析以下用户请求,判断应该使用哪个Agent处理。

        可选Agents:
        - password_reset: 密码重置、账号锁定
        - software_install: 软件安装、更新
        - ticket_query: 工单查询、状态跟踪
        - general_qa: 一般性问答

        用户请求: {user_request}

        返回JSON格式:
        {{"agent": "agent_name", "confidence": 0.95, "reasoning": "原因"}}
        """

        response = self.llm.invoke(prompt)
        routing_decision = json.loads(response.content)

        # 置信度检查
        if routing_decision["confidence"] < 0.8:
            return self._ask_clarification(user_request)

        # 调用对应Agent
        agent_name = routing_decision["agent"]
        specialist = self.specialists[agent_name]

        return specialist.handle(user_request)

设计要点:

  • 结构化输出:使用JSON保证解析稳定性
  • 置信度阈值:低于0.8时要求用户澄清
  • 日志记录:记录所有路由决策,用于优化

2. Specialist Agents - 专门Agent

每个专门Agent负责特定类型任务。以密码重置为例:

class PasswordAgent:
    """密码重置专门Agent"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.servicenow = ServiceNowClient()

    def handle(self, request: str) -> AgentResponse:
        """处理密码重置请求"""

        # Step 1: 提取关键信息
        user_info = self._extract_user_info(request)

        # Step 2: 验证用户身份
        if not self._verify_user(user_info):
            return AgentResponse(
                success=False,
                message="身份验证失败,请联系IT支持"
            )

        # Step 3: 安全检查
        if not self._security_check(user_info):
            return AgentResponse(
                success=False,
                message="检测到异常,已创建安全工单"
            )

        # Step 4: 调用ServiceNow API重置密码
        try:
            result = self.servicenow.reset_password(
                user_id=user_info["user_id"],
                email=user_info["email"]
            )

            return AgentResponse(
                success=True,
                message=f"密码已重置,临时密码已发送到 {user_info['email']}"
            )
        except Exception as e:
            logging.error(f"Password reset failed: {e}")
            return AgentResponse(
                success=False,
                message="重置失败,已创建人工处理工单"
            )

    def _extract_user_info(self, request: str) -> dict:
        """提取用户信息"""
        prompt = f"""
        从以下请求中提取用户信息:

        请求: {request}

        返回JSON:
        {{
            "user_id": "提取的用户ID",
            "email": "邮箱",
            "department": "部门"
        }}
        """

        response = self.llm.invoke(prompt)
        return json.loads(response.content)

    def _verify_user(self, user_info: dict) -> bool:
        """验证用户身份"""
        # 调用AD或SSO系统验证
        return self.servicenow.verify_user(user_info)

    def _security_check(self, user_info: dict) -> bool:
        """安全检查"""
        # 检查最近是否有异常登录
        # 检查是否是钓鱼攻击
        recent_failures = self.servicenow.get_login_failures(
            user_id=user_info["user_id"],
            hours=24
        )
        return recent_failures < 5  # 24小时内失败不超过5次

3. Tool Layer - 工具层

统一封装所有外部API调用:

class ServiceNowClient:
    """ServiceNow API客户端"""

    def __init__(self):
        self.base_url = os.getenv("SERVICENOW_URL")
        self.auth = (
            os.getenv("SERVICENOW_USER"),
            os.getenv("SERVICENOW_PASSWORD")
        )

    def reset_password(self, user_id: str, email: str) -> dict:
        """重置密码API"""
        endpoint = f"{self.base_url}/api/now/table/sys_user"

        # 生成临时密码
        temp_password = self._generate_temp_password()

        response = requests.post(
            endpoint,
            auth=self.auth,
            json={
                "user_id": user_id,
                "password": temp_password,
                "force_reset": True
            },
            timeout=10
        )

        response.raise_for_status()

        # 发送邮件
        self._send_password_email(email, temp_password)

        return response.json()

    def create_ticket(self, title: str, description: str, priority: str) -> str:
        """创建工单"""
        endpoint = f"{self.base_url}/api/now/table/incident"

        response = requests.post(
            endpoint,
            auth=self.auth,
            json={
                "short_description": title,
                "description": description,
                "priority": priority,
                "assigned_to": "auto_triage"
            },
            timeout=10
        )

        response.raise_for_status()
        ticket_id = response.json()["result"]["number"]

        return ticket_id

关键技术实现

1. Agent间通信协议

定义标准的消息格式:

from pydantic import BaseModel
from typing import Optional, Dict, Any

class AgentMessage(BaseModel):
    """Agent间通信消息"""
    sender: str                    # 发送者Agent ID
    receiver: str                  # 接收者Agent ID
    message_type: str              # task/query/response
    payload: Dict[str, Any]        # 消息内容
    context: Optional[Dict] = {}   # 上下文信息
    timestamp: float               # 时间戳

class AgentResponse(BaseModel):
    """Agent响应"""
    success: bool
    message: str
    data: Optional[Dict] = {}
    next_steps: Optional[list] = []

2. 状态管理

使用Redis存储会话状态:

class ConversationState:
    """会话状态管理"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def save_state(self, session_id: str, state: dict):
        """保存会话状态"""
        key = f"session:{session_id}"
        self.redis.setex(
            key,
            3600,  # 1小时过期
            json.dumps(state)
        )

    def get_state(self, session_id: str) -> dict:
        """获取会话状态"""
        key = f"session:{session_id}"
        data = self.redis.get(key)
        return json.loads(data) if data else {}

    def add_history(self, session_id: str, message: dict):
        """添加对话历史"""
        state = self.get_state(session_id)
        if "history" not in state:
            state["history"] = []

        state["history"].append(message)
        self.save_state(session_id, state)

3. 错误处理与降级

生产环境的可靠性是核心:

class AgentExecutor:
    """Agent执行器,带重试和降级"""

    def __init__(self, max_retries=3):
        self.max_retries = max_retries

    def execute_with_retry(self, agent_func, *args, **kwargs):
        """带重试的执行"""
        for attempt in range(self.max_retries):
            try:
                return agent_func(*args, **kwargs)

            except RateLimitError:
                # API限流,指数退避
                wait_time = 2 ** attempt
                logging.warning(f"Rate limit, retry in {wait_time}s")
                time.sleep(wait_time)

            except APIError as e:
                # API错误,创建人工工单
                logging.error(f"API error: {e}")
                return self._create_fallback_ticket(*args, **kwargs)

            except Exception as e:
                # 未知错误
                logging.error(f"Unexpected error: {e}")
                if attempt == self.max_retries - 1:
                    return self._create_fallback_ticket(*args, **kwargs)

        # 所有重试失败
        return AgentResponse(
            success=False,
            message="系统暂时不可用,已创建人工处理工单"
        )

    def _create_fallback_ticket(self, *args, **kwargs):
        """降级:创建人工处理工单"""
        servicenow = ServiceNowClient()
        ticket_id = servicenow.create_ticket(
            title="AI Agent处理失败",
            description=f"Request: {args}, Error: {kwargs}",
            priority="high"
        )
        return AgentResponse(
            success=False,
            message=f"已创建工单 {ticket_id},人工将在30分钟内处理"
        )

生产环境优化

1. 性能优化

并行处理:

import asyncio

async def parallel_agent_call(agents, request):
    """并行调用多个Agent"""
    tasks = [agent.handle_async(request) for agent in agents]
    results = await asyncio.gather(*tasks)
    return results

# 示例:同时查询多个数据源
results = await parallel_agent_call(
    agents=[TicketAgent(), KnowledgeBaseAgent(), HistoryAgent()],
    request="查询我的所有工单"
)

缓存热点数据:

from functools import lru_cache

@lru_cache(maxsize=1000)
def get_user_permissions(user_id: str) -> list:
    """缓存用户权限(1小时)"""
    return servicenow.get_permissions(user_id)

2. 监控与告警

关键指标:

import prometheus_client as prom

# 定义指标
agent_requests = prom.Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_name', 'status']
)

agent_latency = prom.Histogram(
    'agent_latency_seconds',
    'Agent response latency',
    ['agent_name']
)

agent_success_rate = prom.Gauge(
    'agent_success_rate',
    'Agent success rate',
    ['agent_name']
)

# 使用
def handle_request_with_metrics(agent, request):
    start_time = time.time()

    try:
        response = agent.handle(request)
        status = "success" if response.success else "failure"

        agent_requests.labels(
            agent_name=agent.__class__.__name__,
            status=status
        ).inc()

        return response

    finally:
        latency = time.time() - start_time
        agent_latency.labels(
            agent_name=agent.__class__.__name__
        ).observe(latency)

3. 安全性保障

输入验证:

from pydantic import BaseModel, validator

class UserRequest(BaseModel):
    """用户请求模型"""
    request_text: str
    user_id: str
    session_id: str

    @validator('request_text')
    def validate_request(cls, v):
        # 检查恶意输入
        dangerous_patterns = [
            'DROP TABLE', 'DELETE FROM', '<script>',
            'eval(', 'exec(', '__import__'
        ]
        for pattern in dangerous_patterns:
            if pattern.lower() in v.lower():
                raise ValueError("检测到危险输入")

        # 限制长度
        if len(v) > 5000:
            raise ValueError("请求过长")

        return v

权限控制:

def require_permission(permission: str):
    """权限装饰器"""
    def decorator(func):
        def wrapper(self, request, *args, **kwargs):
            user_id = request.user_id

            if not self._check_permission(user_id, permission):
                return AgentResponse(
                    success=False,
                    message=f"权限不足,需要: {permission}"
                )

            return func(self, request, *args, **kwargs)

        return wrapper
    return decorator

class PasswordAgent:
    @require_permission("password_reset")
    def handle(self, request):
        # 处理逻辑
        pass

实际效果

部署6个月后的数据:

指标结果
自动化率68% ✅
平均处理时间从45分钟 → 2分钟
用户满意度4.6/5.0
成本节省$2.3M/年
错误率低于2%

关键成功因素:

  1. ✅ 明确的Agent职责划分
  2. ✅ 完善的错误处理和降级机制
  3. ✅ 严格的安全控制
  4. ✅ 详细的监控和日志

经验教训

做得好的地方

  • 模块化设计:新增Agent很容易
  • 降级机制:从未出现完全不可用
  • 监控体系:问题能快速定位

需要改进

  • 幻觉问题:少数情况下LLM会生成错误信息
  • 成本控制:GPT-4调用成本较高
  • 冷启动优化:第一次请求较慢

未来计划

  • 使用开源模型降低成本
  • 引入检索增强(RAG)减少幻觉
  • 实现更复杂的多Agent协作(如拍卖、投票机制)

总结

多智能体系统是构建复杂AI应用的有效范式。关键是:

  1. 清晰的职责划分:每个Agent专注特定任务
  2. 标准化通信:定义清晰的消息协议
  3. 鲁棒的错误处理:必须有降级方案
  4. 完善的监控:生产环境的生命线
  5. 持续优化:根据实际使用数据改进

从我的经验看,多智能体不是"银弹",但在合适的场景下(任务可分解、需要工具调用、要求高可靠性),它是目前最实用的方案。


代码示例 完整项目代码:GitHub Repo

想交流? 如果你也在构建多智能体系统,欢迎联系我分享经验!

Subscribe for updates

Get the latest AI engineering posts delivered to your inbox.

评论