
AI Agentç宿¶æç¥ä¸å³çï¼æµå¼å¤çä¸äºä»¶é©±å¨æ¶æå¨å¤§æ¨¡åè½å°åºç¨çè¿ç¨ä¸ï¼ä¸ä¸ªæ ¸å¿çç¾æ¥ç叿¾ï¼LLMæ¨çæ¯æ¹å¤çå¼çï¼èçå®ä¸ççä¿¡æ¯æ¯æµå¼çââè¡ä»·æ³¢å¨ãä¼ æå¨ä¸æ¥ãç¨æ·æ¶æ¯æ¥è¿æ¶å ¥ãå¦ä½è®©Agent卿µå¼ç¯å¢ä¸ä¿æå®æ¶æç¥ä¸å¿«éå³çï¼æä¸ºå·¥ç¨æ¶æçå ³é®å½é¢ãæ¬æå°ä»æµå¼æ°æ®å¤çãäºä»¶è®¢é ãç¶ææºé©±å¨ãä½å»¶è¿å³çå°èåæ§å¶ï¼æå»ºä¸å¥ååºå¼Agentç³»ç»ãä¸ã宿¶æ°æ®æµï¼Agentçç¥ç»ç³»ç»ä¼ ç»AIåºç¨é常æ¯è¯·æ±-ååºæ¨¡å¼ï¼ä½å¨ç©èç½çæ§ãéè交æãå¨çº¿å®¢æçåºæ¯ä¸ï¼æ°æ®æç»äº§çï¼Agentå¿ é¡»å ·å¤ç¥ç»ç³»ç»è¬çè½åââæç»æç¥ã宿¶ååºãæµå¼æ°æ®ä¸æ¹å¤çææ¬è´¨åºå«ï¼æ°æ®æç»å°è¾¾ä¸é¡ºåºä¸å¯éï¼å¤çå»¶è¿è¦æ±æ¯«ç§çº§ï¼æ°æ®éçè®ºä¸æ éï¼å®¹ééä¾èµcheckpointå¢éæ¢å¤ï¼ç¶æç®¡çæ´ä¸ºå¤æã1.2 Agentæµå¼æ¶æçåå±è®¾è®¡ä¸ä¸ªå®æ´ç宿¶Agentæ¶æå¯å为åå±ï¼æ°æ®ééå±ãäºä»¶æ»çº¿å±ãç¶ææºä¸å³ç弿å±ãå¨ä½æ§è¡å±ãäºãäºä»¶è®¢é 䏿¶æ¯æ»çº¿ï¼è§£è¦çæ ¸å¿åºç¡è®¾æ½äºä»¶é©±å¨æ¶æï¼EDAï¼æ¯å®æ¶Agentç³»ç»ççµéã卿ºè½å®¢æåºæ¯ä¸ï¼ç¨æ·æ¶æ¯ãæ 绪åæãç¥è¯åºæ£ç´¢ãLLMçæå¯è½å¹¶å交ç»ï¼äºä»¶é©±å¨è®©æ¯ä¸ªäºä»¶æä¸ºç¬ç«å¯å¤çå®ä½ï¼Agentå¯ä»¥æä¼å çº§çµæ´»è°åº¦ã2.2 åºäºRedis Streamsçäºä»¶æ»çº¿å®ç°import asyncio import json import redis.asyncio as redis from dataclasses import dataclass, asdict from typing import Callable, Dict, List from datetime import datetime dataclass class AgentEvent: event_id: str event_type: str # äºä»¶ç±»åï¼user_message, sensor_data, alert, etc. source: str # äºä»¶æ¥æº payload: Dict # å®é æ°æ® timestamp: float # äºä»¶åçæ¶é´æ³ priority: int 5 # ä¼å 级 1-10ï¼è¶å°è¶ä¼å context_id: str # å ³èçä¸ä¸æ/ä¼è¯ID class EventBus: åºäºRedis Streamsçè½»é级äºä»¶æ»çº¿ def __init__(self, redis_url: str redis://localhost:6379): self.redis redis.from_url(redis_url, decode_responsesTrue) self.subscribers: Dict[str, List[Callable]] {} self.running False async def publish(self, event: AgentEvent, stream: str agent:events) - str: åå¸äºä»¶å°æå®æµ event_data asdict(event) event_id await self.redis.xadd( stream, {data: json.dumps(event_data)}, maxlen10000 # ä¿çæè¿10000æ¡ï¼é²æ¢å åæ éå¢é¿ ) return event_id async def subscribe(self, stream: str, handler: Callable, group: str None): 订é äºä»¶æµï¼æ¯ææ¶è´¹è ç»æ¨¡å¼å®ç°è´è½½åè¡¡ if group: # å建æ¶è´¹è ç»ï¼å¹çæä½ï¼ try: await self.redis.xgroup_create(stream, group, id0, mkstreamTrue) except redis.ResponseError: pass # ç»å·²åå¨ # æ¶è´¹è ç»è¯»åï¼æ¯æå¤å®ä¾è´è½½åè¡¡ while self.running: messages await self.redis.xreadgroup( group, consumer-1, {stream: }, count10, block1000 ) for stream_name, msgs in messages: for msg_id, fields in msgs: event json.loads(fields[data]) try: await handler(AgentEvent(**event)) await self.redis.xack(stream, group, msg_id) except Exception as e