Gleichzeitige Agenten#
In diesem Abschnitt untersuchen wir die Verwendung mehrerer gleichzeitig arbeitender Agenten. Wir behandeln drei Hauptmuster
Einzelne Nachricht & Mehrere Prozessoren
Zeigt, wie eine einzelne Nachricht gleichzeitig von mehreren Agenten verarbeitet werden kann, die dasselbe Thema abonniert haben.Mehrere Nachrichten & Mehrere Prozessoren
Veranschaulicht, wie bestimmte Nachrichtentypen basierend auf Themen an dedizierte Agenten weitergeleitet werden können.Direkte Nachrichtenübermittlung
Konzentriert sich auf das Senden von Nachrichten zwischen Agenten und von der Laufzeit an Agenten.
import asyncio
from dataclasses import dataclass
from autogen_core import (
AgentId,
ClosureAgent,
ClosureContext,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
default_subscription,
message_handler,
type_subscription,
)
@dataclass
class Task:
task_id: str
@dataclass
class TaskResponse:
task_id: str
result: str
Einzelne Nachricht & Mehrere Prozessoren#
Das erste Muster zeigt, wie eine einzelne Nachricht gleichzeitig von mehreren Agenten verarbeitet werden kann
Jeder
ProcessorAgent abonniert das Standardthema unter Verwendung des Dekoratorsdefault_subscription().Beim Veröffentlichen einer Nachricht in das Standardthema verarbeiten alle registrierten Agenten die Nachricht unabhängig voneinander.
Hinweis
Unten abonnieren wir Processor mit dem Dekorator default_subscription(). Es gibt eine alternative Möglichkeit, einen Agenten zu abonnieren, ohne überhaupt auf Dekoratoren zurückzugreifen, wie in Themen abonnieren und veröffentlichen gezeigt. Auf diese Weise kann dieselbe Agentenklasse für verschiedene Themen abonniert werden.
@default_subscription
class Processor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"{self._description} starting task {message.task_id}")
await asyncio.sleep(2) # Simulate work
print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()
await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))
runtime.start()
await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1
Mehrere Nachrichten & Mehrere Prozessoren#
Zweitens demonstriert dieses Muster die Weiterleitung verschiedener Nachrichtentypen an spezifische Prozessoren
UrgentProcessorabonniert das Thema "urgent"NormalProcessorabonniert das Thema "normal"
Wir lassen einen Agenten ein bestimmtes Thema abonnieren, indem wir den Dekorator type_subscription() verwenden.
TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Urgent processor starting task {message.task_id}")
await asyncio.sleep(1) # Simulate work
print(f"Urgent processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Normal processor starting task {message.task_id}")
await asyncio.sleep(3) # Simulate work
print(f"Normal processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
Nach der Registrierung der Agenten können wir Nachrichten an die Themen "urgent" und "normal" senden
runtime = SingleThreadedAgentRuntime()
await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))
runtime.start()
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
Ergebnisse sammeln#
Im vorherigen Beispiel haben wir uns zur Überprüfung der Aufgabenerfüllung auf die Konsolenausgabe verlassen. In realen Anwendungen möchten wir die Ergebnisse jedoch typischerweise programmatisch sammeln und verarbeiten.
Um diese Nachrichten zu sammeln, verwenden wir einen ClosureAgent. Wir haben ein dediziertes Thema TASK_RESULTS_TOPIC_TYPE definiert, an das sowohl UrgentProcessor als auch NormalProcessor ihre Ergebnisse senden. Der ClosureAgent verarbeitet dann Nachrichten von diesem Thema.
queue = asyncio.Queue[TaskResponse]()
async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
await queue.put(message)
runtime.start()
CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
runtime,
CLOSURE_AGENT_TYPE,
collect_result,
subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
while not queue.empty():
print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')
Direkte Nachrichten#
Im Gegensatz zu den vorherigen Mustern konzentriert sich dieses Muster auf direkte Nachrichten. Hier demonstrieren wir zwei Möglichkeiten, diese zu senden
Direkte Nachrichtenübermittlung zwischen Agenten
Senden von Nachrichten von der Laufzeit an bestimmte Agenten
Zu beachtende Punkte im folgenden Beispiel
Nachrichten werden mit der
AgentIdadressiert.Der Absender kann eine Antwort vom Zielagenten erwarten.
Wir registrieren die Klasse
WorkerAgentnur einmal; dennoch senden wir Aufgaben an zwei verschiedene Worker.Wie? Wie in Agentenlebenszyklus angegeben, ruft die Laufzeit beim Zustellen einer Nachricht über eine
AgentIddie Instanz ab oder erstellt eine, wenn sie noch nicht existiert. In diesem Fall erstellt die Laufzeit zwei Instanzen von Workern, wenn sie diese beiden Nachrichten sendet.
class WorkerAgent(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"{self.id} starting task {message.task_id}")
await asyncio.sleep(2) # Simulate work
print(f"{self.id} finished task {message.task_id}")
return TaskResponse(task_id=message.task_id, result=f"Results by {self.id}")
class DelegatorAgent(RoutedAgent):
def __init__(self, description: str, worker_type: str):
super().__init__(description)
self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"Delegator received task {message.task_id}.")
subtask1 = Task(task_id="task-part-1")
subtask2 = Task(task_id="task-part-2")
worker1_result, worker2_result = await asyncio.gather(
self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])
)
combined_result = f"Part 1: {worker1_result.result}, " f"Part 2: {worker2_result.result}"
task_response = TaskResponse(task_id=message.task_id, result=combined_result)
return task_response
runtime = SingleThreadedAgentRuntime()
await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Worker Agent"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Delegator Agent", "worker"))
runtime.start()
delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)
print(f"Final result: {response.result}")
await runtime.stop_when_idle()
Delegator received task main-task.
worker/worker-1 starting task task-part-1
worker/worker-2 starting task task-part-2
worker/worker-1 finished task task-part-1
worker/worker-2 finished task task-part-2
Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2
Zusätzliche Ressourcen#
Wenn Sie mehr über gleichzeitige Verarbeitung erfahren möchten, lesen Sie das Muster Mixture of Agents, das stark auf gleichzeitigen Agenten basiert.