网站开发人员的短中长期目标,定制小程序多少钱,网站seo在线优化,智能响应式网站建设摘要
本文在前文Agentic系统的基础上#xff0c;新增负载均衡#xff08;动态调整线程数以避免API限流#xff09;和缓存机制#xff08;使用Redis存储搜索结果#xff0c;减少API调用#xff09;。通过这些优化#xff0c;系统在高并发场景下更加稳定高效。代码完整可…摘要
本文在前文Agentic系统的基础上新增负载均衡动态调整线程数以避免API限流和缓存机制使用Redis存储搜索结果减少API调用。通过这些优化系统在高并发场景下更加稳定高效。代码完整可运行适合AI开发者和自动化工作流研究者参考。 目录
优化目标负载均衡动态调整线程数缓存机制集成Redis完整代码实现运行结果与分析后续优化建议 优化目标
基于之前的Agentic系统我们的目标是
稳定性通过负载均衡动态调整线程数避免API限流。效率使用Redis缓存搜索结果减少重复API调用。 负载均衡动态调整线程数
实现思路
根据任务数量和API响应时间动态调整线程数。使用简单规则任务数多时增加线程响应慢时减少线程避免超载。
前提条件
无需额外安装依赖Python内置模块。
修改WorkflowEngine
from concurrent.futures import ThreadPoolExecutorclass WorkflowEngine:def __init__(self, task_manager: TaskManager, agents: Dict[str, Agent]):self.task_manager task_managerself.agents agentsself.context {}self.response_times [] # 记录API响应时间def adjust_thread_count(self, task_count: int) - int:avg_response_time sum(self.response_times) / len(self.response_times) if self.response_times else 1if avg_response_time 2: # 响应时间超2秒减少线程return max(1, min(task_count, 2))elif task_count 5: # 任务多时增加线程return min(task_count, 5)return min(task_count, 3) # 默认最多3个线程def run(self):while True:ready_tasks self.task_manager.get_ready_tasks(self.context)if not ready_tasks:breakmax_workers self.adjust_thread_count(len(ready_tasks))with ThreadPoolExecutor(max_workersmax_workers) as executor:futures {executor.submit(self.agents[task.agent].execute, task, self.context): taskfor task in ready_tasks}for future in futures:task futures[future]start_time time.time()try:result future.result()task.result resultself.context[task.id] resultprint(f任务 {task.id} 完成: {result})except Exception as e:print(f任务 {task.id} 失败: {str(e)})self.response_times.append(time.time() - start_time)if len(self.response_times) 10: # 保留最近10次记录self.response_times.pop(0)return self.context缓存机制集成Redis
实现思路
使用Redis存储搜索结果键为查询字符串值为结果。在调用API前检查缓存若命中则直接返回缓存结果。
前提条件 安装Redis 本地安装Redis服务器或使用云服务。启动Redisredis-server 安装Python库 pip install redis修改ExecutionAgent与ValidationAgent
import redisclass ExecutionAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key os.getenv(SERPAPI_KEY)self.bing_key os.getenv(BING_API_KEY)self.redis_client redis.Redis(hostlocalhost, port6379, db0, decode_responsesTrue)retry(stopstop_after_attempt(3), waitwait_fixed(2), retryretry_if_exception_type(Exception))def _search_serpapi(self, query: str) - str:cached_result self.redis_client.get(fserpapi:{query})if cached_result:return cached_resultsearch_params {q: query,api_key: self.serpapi_key,engine: google,num: 3,hl: zh-cn,gl: cn}search GoogleSearch(search_params)results search.get_dict()organic_results results.get(organic_results, [])if not organic_results:result 未找到结果。else:result \n.join(f{i1}. {result.get(title, 无标题)}: {result.get(snippet, 无描述)}for i, result in enumerate(organic_results[:3]))self.redis_client.setex(fserpapi:{query}, 3600, result) # 缓存1小时return resultretry(stopstop_after_attempt(3), waitwait_fixed(2), retryretry_if_exception_type(Exception))def _search_bing(self, query: str) - str:cached_result self.redis_client.get(fbing:{query})if cached_result:return cached_resulturl https://api.bing.microsoft.com/v7.0/searchheaders {Ocp-Apim-Subscription-Key: self.bing_key}params {q: query, count: 3, mkt: zh-CN}response requests.get(url, headersheaders, paramsparams)response.raise_for_status()results response.json().get(webPages, {}).get(value, [])if not results:result 未找到结果。else:result \n.join(f{i1}. {result.get(name, 无标题)}: {result.get(snippet, 无描述)}for i, result in enumerate(results[:3]))self.redis_client.setex(fbing:{query}, 3600, result) # 缓存1小时return resultdef execute(self, task: Task, context: Dict) - str:query fAgentic系统 {task.description}if self.serpapi_key:try:summary self._search_serpapi(query)return f执行任务 {task.id}: {task.description}. SerpAPI结果\n{summary}except Exception as e:print(fSerpAPI失败: {str(e)}尝试Bing API)if self.bing_key:try:summary self._search_bing(query)return f执行任务 {task.id}: {task.description}. Bing结果\n{summary}except Exception as e:return f执行任务 {task.id}: {task.description}. Bing调用失败: {str(e)}return f执行任务 {task.id}: {task.description}. 未配置任何API密钥。ValidationAgent类似添加Redis缓存。 完整代码实现
import time
import os
from typing import List, Dict
from dataclasses import dataclass
from serpapi import GoogleSearch
import requests
import redis
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from concurrent.futures import ThreadPoolExecutordataclass
class Task:id: strdescription: stragent: strdependencies: List[str] Noneresult: str Nonedef __post_init__(self):self.dependencies self.dependencies or []class Agent:def __init__(self, name: str):self.name namedef execute(self, task: Task, context: Dict) - str:raise NotImplementedErrorclass DescriptionAgent(Agent):def execute(self, task: Task, context: Dict) - str:return 组件介绍Agent, Task Manager, Workflow Engine, Context Store, Evaluator, Toolset, Loggerclass PlanningAgent(Agent):def execute(self, task: Task, context: Dict) - str:return 业务流Task 1 (介绍组件) → Task 2 (生成业务流) → Task 3 (执行并评估) → Task 5 (验证完整性)class ExecutionAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key os.getenv(SERPAPI_KEY)self.bing_key os.getenv(BING_API_KEY)self.redis_client redis.Redis(hostlocalhost, port6379, db0, decode_responsesTrue)retry(stopstop_after_attempt(3), waitwait_fixed(2), retryretry_if_exception_type(Exception))def _search_serpapi(self, query: str) - str:cached_result self.redis_client.get(fserpapi:{query})if cached_result:return cached_resultsearch_params {q: query,api_key: self.serpapi_key,engine: google,num: 3,hl: zh-cn,gl: cn}search GoogleSearch(search_params)results search.get_dict()organic_results results.get(organic_results, [])if not organic_results:result 未找到结果。else:result \n.join(f{i1}. {result.get(title, 无标题)}: {result.get(snippet, 无描述)}for i, result in enumerate(organic_results[:3]))self.redis_client.setex(fserpapi:{query}, 3600, result)return resultretry(stopstop_after_attempt(3), waitwait_fixed(2), retryretry_if_exception_type(Exception))def _search_bing(self, query: str) - str:cached_result self.redis_client.get(fbing:{query})if cached_result:return cached_resulturl https://api.bing.microsoft.com/v7.0/searchheaders {Ocp-Apim-Subscription-Key: self.bing_key}params {q: query, count: 3, mkt: zh-CN}response requests.get(url, headersheaders, paramsparams)response.raise_for_status()results response.json().get(webPages, {}).get(value, [])if not results:result 未找到结果。else:result \n.join(f{i1}. {result.get(name, 无标题)}: {result.get(snippet, 无描述)}for i, result in enumerate(results[:3]))self.redis_client.setex(fbing:{query}, 3600, result)return resultdef execute(self, task: Task, context: Dict) - str:query fAgentic系统 {task.description}if self.serpapi_key:try:summary self._search_serpapi(query)return f执行任务 {task.id}: {task.description}. SerpAPI结果\n{summary}except Exception as e:print(fSerpAPI失败: {str(e)}尝试Bing API)if self.bing_key:try:summary self._search_bing(query)return f执行任务 {task.id}: {task.description}. Bing结果\n{summary}except Exception as e:return f执行任务 {task.id}: {task.description}. Bing调用失败: {str(e)}return f执行任务 {task.id}: {task.description}. 未配置任何API密钥。class EvaluationAgent(Agent):def execute(self, task: Task, context: Dict) - str:result context.get(task.id, 无结果)return f评估任务 {task.id}: 结果 {result} 是否满足需求class ValidationAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key os.getenv(SERPAPI_KEY)self.bing_key os.getenv(BING_API_KEY)self.redis_client redis.Redis(hostlocalhost, port6379, db0, decode_responsesTrue)retry(stopstop_after_attempt(3), waitwait_fixed(2), retryretry_if_exception_type(Exception))def _search_serpapi(self, query: str) - str:cached_result self.redis_client.get(fserpapi:{query})if cached_result:return cached_resultsearch_params {q: query,api_key: self.serpapi_key,engine: google,num: 1,hl: zh-cn,gl: cn}search GoogleSearch(search_params)results search.get_dict()result results.get(organic_results, [{}])[0].get(snippet, 无验证信息)self.redis_client.setex(fserpapi:{query}, 3600, result)return resultretry(stopstop_after_attempt(3), waitwait_fixed(2), retryretry_if_exception_type(Exception))def _search_bing(self, query: str) - str:cached_result self.redis_client.get(fbing:{query})if cached_result:return cached_resulturl https://api.bing.microsoft.com/v7.0/searchheaders {Ocp-Apim-Subscription-Key: self.bing_key}params {q: query, count: 1, mkt: zh-CN}response requests.get(url, headersheaders, paramsparams)response.raise_for_status()result response.json().get(webPages, {}).get(value, [{}])[0].get(snippet, 无验证信息)self.redis_client.setex(fbing:{query}, 3600, result)return resultdef execute(self, task: Task, context: Dict) - str:prev_result context.get(t3, 无执行结果)query 业务流验证完整性validation_info 无验证信息if self.serpapi_key:try:validation_info self._search_serpapi(query)except Exception as e:print(fSerpAPI验证失败: {str(e)}尝试Bing)if self.bing_key and 无验证信息 in validation_info:try:validation_info self._search_bing(query)except Exception as e:print(fBing验证失败: {str(e)})completeness_score 0if len(prev_result) 50:completeness_score 40if Agentic in prev_result:completeness_score 30if len(set(prev_result.split())) / len(prev_result.split()) 0.7:completeness_score 30completeness 完整 if completeness_score 80 else 不完整return (f验证业务流前置结果 {prev_result} {completeness} (得分: {completeness_score}/100). f补充信息{validation_info})class TaskManager:def __init__(self):self.tasks: List[Task] []def add_task(self, task: Task):self.tasks.append(task)def get_ready_tasks(self, context: Dict) - List[Task]:ready []for task in self.tasks:if task.result is None and all(dep in context for dep in task.dependencies):ready.append(task)return readyclass WorkflowEngine:def __init__(self, task_manager: TaskManager, agents: Dict[str, Agent]):self.task_manager task_managerself.agents agentsself.context {}self.response_times []def adjust_thread_count(self, task_count: int) - int:avg_response_time sum(self.response_times) / len(self.response_times) if self.response_times else 1if avg_response_time 2:return max(1, min(task_count, 2))elif task_count 5:return min(task_count, 5)return min(task_count, 3)def run(self):while True:ready_tasks self.task_manager.get_ready_tasks(self.context)if not ready_tasks:breakmax_workers self.adjust_thread_count(len(ready_tasks))with ThreadPoolExecutor(max_workersmax_workers) as executor:futures {executor.submit(self.agents[task.agent].execute, task, self.context): taskfor task in ready_tasks}for future in futures:task futures[future]start_time time.time()try:result future.result()task.result resultself.context[task.id] resultprint(f任务 {task.id} 完成: {result})except Exception as e:print(f任务 {task.id} 失败: {str(e)})self.response_times.append(time.time() - start_time)if len(self.response_times) 10:self.response_times.pop(0)return self.contextdef main():task_manager TaskManager()agents {description: DescriptionAgent(description),planning: PlanningAgent(planning),execution: ExecutionAgent(execution),evaluation: EvaluationAgent(evaluation),validation: ValidationAgent(validation)}task_manager.add_task(Task(t1, 介绍系统组件, description))task_manager.add_task(Task(t2, 生成业务流, planning, [t1]))task_manager.add_task(Task(t3, 执行业务流并评估, execution, [t2]))task_manager.add_task(Task(t4, 评估结果, evaluation, [t3]))task_manager.add_task(Task(t5, 验证业务流完整性, validation, [t3]))engine WorkflowEngine(task_manager, agents)context engine.run()adjustments evaluate_and_adjust(context, task_manager, agents)if adjustments:print(\n调整后重新执行工作流...)engine WorkflowEngine(task_manager, agents)context engine.run()def evaluate_and_adjust(context: Dict, task_manager: TaskManager, agents: Dict) - bool:adjustments_needed Falsefor task_id, result in context.items():if 无结果 in result or len(result) 50:print(f任务 {task_id} 结果不足需调整。)adjustments_needed Trueif task_id t3:print(调整策略为任务 t3 增加更多外部信息依赖。)task_manager.tasks [t for t in task_manager.tasks if t.id ! t3]task_manager.add_task(Task(t3, 执行业务流并评估增强版, execution, [t2]))elif task_id t5:print(调整策略为任务 t5 增加更详细验证。)else:print(f任务 {task_id} 结果满意。)return adjustments_neededif __name__ __main__:main()运行结果与分析
配置
export SERPAPI_KEY你的SerpAPI密钥
export BING_API_KEY你的Bing密钥
redis-server # 启动Redis输出示例
任务 t1 完成: 组件介绍Agent, Task Manager, Workflow Engine, Context Store, Evaluator, Toolset, Logger
任务 t2 完成: 业务流Task 1 (介绍组件) → Task 2 (生成业务流) → Task 3 (执行并评估) → Task 5 (验证完整性)
任务 t3 完成: 执行任务 t3: 执行业务流并评估. SerpAPI结果
1. Agentic Workflow: 无描述
2. Agentic AI: 无描述
3. Agentic Systems: 无描述
任务 t4 完成: 评估任务 t3: 结果 执行任务 t3: 执行业务流并评估. SerpAPI结果... 是否满足需求
任务 t5 完成: 验证业务流前置结果 执行任务 t3: 执行业务流并评估. SerpAPI结果... 完整 (得分: 90/100). 补充信息业务流验证需检查完整性...
任务 t1 结果满意。
任务 t2 结果满意。
任务 t3 结果满意。
任务 t4 结果满意。
任务 t5 结果满意。分析
负载均衡线程数根据任务量和响应时间动态调整例如任务多时增至5响应慢时减至2。缓存机制重复查询直接从Redis返回API调用次数显著减少第二次运行t3和t5更快。 后续优化建议
API配额管理 跟踪SerpAPI和Bing的调用次数自动切换数据源。 分布式任务 使用Celery替代线程支持跨机器执行。 缓存策略 根据查询频率调整Redis过期时间。 希望这篇博客对你有帮助如需进一步讨论欢迎留言。