Gleichzeitige Agenten#

In diesem Abschnitt untersuchen wir die Verwendung mehrerer gleichzeitig arbeitender Agenten. Wir behandeln drei Hauptmuster

  1. Einzelne Nachricht & Mehrere Prozessoren
    Zeigt, wie eine einzelne Nachricht gleichzeitig von mehreren Agenten verarbeitet werden kann, die dasselbe Thema abonniert haben.

  2. Mehrere Nachrichten & Mehrere Prozessoren
    Veranschaulicht, wie bestimmte Nachrichtentypen basierend auf Themen an dedizierte Agenten weitergeleitet werden können.

  3. Direkte Nachrichtenübermittlung
    Konzentriert sich auf das Senden von Nachrichten zwischen Agenten und von der Laufzeit an Agenten.

import asyncio
from dataclasses import dataclass

from autogen_core import (
    AgentId,
    ClosureAgent,
    ClosureContext,
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    default_subscription,
    message_handler,
    type_subscription,
)
@dataclass
class Task:
    task_id: str


@dataclass
class TaskResponse:
    task_id: str
    result: str

Einzelne Nachricht & Mehrere Prozessoren#

Das erste Muster zeigt, wie eine einzelne Nachricht gleichzeitig von mehreren Agenten verarbeitet werden kann

  • Jeder Processor Agent abonniert das Standardthema unter Verwendung des Dekorators default_subscription().

  • Beim Veröffentlichen einer Nachricht in das Standardthema verarbeiten alle registrierten Agenten die Nachricht unabhängig voneinander.

Hinweis

Unten abonnieren wir Processor mit dem Dekorator default_subscription(). Es gibt eine alternative Möglichkeit, einen Agenten zu abonnieren, ohne überhaupt auf Dekoratoren zurückzugreifen, wie in Themen abonnieren und veröffentlichen gezeigt. Auf diese Weise kann dieselbe Agentenklasse für verschiedene Themen abonniert werden.

@default_subscription
class Processor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"{self._description} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()

await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))

runtime.start()

await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())

await runtime.stop_when_idle()
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1

Mehrere Nachrichten & Mehrere Prozessoren#

Zweitens demonstriert dieses Muster die Weiterleitung verschiedener Nachrichtentypen an spezifische Prozessoren

  • UrgentProcessor abonniert das Thema "urgent"

  • NormalProcessor abonniert das Thema "normal"

Wir lassen einen Agenten ein bestimmtes Thema abonnieren, indem wir den Dekorator type_subscription() verwenden.

TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")


@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Urgent processor starting task {message.task_id}")
        await asyncio.sleep(1)  # Simulate work
        print(f"Urgent processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)


@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Normal processor starting task {message.task_id}")
        await asyncio.sleep(3)  # Simulate work
        print(f"Normal processor finished task {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
        await self.publish_message(task_response, topic_id=task_results_topic_id)

Nach der Registrierung der Agenten können wir Nachrichten an die Themen "urgent" und "normal" senden

runtime = SingleThreadedAgentRuntime()

await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))

runtime.start()

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1

Ergebnisse sammeln#

Im vorherigen Beispiel haben wir uns zur Überprüfung der Aufgabenerfüllung auf die Konsolenausgabe verlassen. In realen Anwendungen möchten wir die Ergebnisse jedoch typischerweise programmatisch sammeln und verarbeiten.

Um diese Nachrichten zu sammeln, verwenden wir einen ClosureAgent. Wir haben ein dediziertes Thema TASK_RESULTS_TOPIC_TYPE definiert, an das sowohl UrgentProcessor als auch NormalProcessor ihre Ergebnisse senden. Der ClosureAgent verarbeitet dann Nachrichten von diesem Thema.

queue = asyncio.Queue[TaskResponse]()


async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
    await queue.put(message)


runtime.start()

CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
    runtime,
    CLOSURE_AGENT_TYPE,
    collect_result,
    subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
while not queue.empty():
    print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')

Direkte Nachrichten#

Im Gegensatz zu den vorherigen Mustern konzentriert sich dieses Muster auf direkte Nachrichten. Hier demonstrieren wir zwei Möglichkeiten, diese zu senden

  • Direkte Nachrichtenübermittlung zwischen Agenten

  • Senden von Nachrichten von der Laufzeit an bestimmte Agenten

Zu beachtende Punkte im folgenden Beispiel

  • Nachrichten werden mit der AgentId adressiert.

  • Der Absender kann eine Antwort vom Zielagenten erwarten.

  • Wir registrieren die Klasse WorkerAgent nur einmal; dennoch senden wir Aufgaben an zwei verschiedene Worker.

    • Wie? Wie in Agentenlebenszyklus angegeben, ruft die Laufzeit beim Zustellen einer Nachricht über eine AgentId die Instanz ab oder erstellt eine, wenn sie noch nicht existiert. In diesem Fall erstellt die Laufzeit zwei Instanzen von Workern, wenn sie diese beiden Nachrichten sendet.

class WorkerAgent(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"{self.id} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self.id} finished task {message.task_id}")
        return TaskResponse(task_id=message.task_id, result=f"Results by {self.id}")


class DelegatorAgent(RoutedAgent):
    def __init__(self, description: str, worker_type: str):
        super().__init__(description)
        self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]

    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"Delegator received task {message.task_id}.")

        subtask1 = Task(task_id="task-part-1")
        subtask2 = Task(task_id="task-part-2")

        worker1_result, worker2_result = await asyncio.gather(
            self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])
        )

        combined_result = f"Part 1: {worker1_result.result}, " f"Part 2: {worker2_result.result}"
        task_response = TaskResponse(task_id=message.task_id, result=combined_result)
        return task_response
runtime = SingleThreadedAgentRuntime()

await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Worker Agent"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Delegator Agent", "worker"))

runtime.start()

delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)

print(f"Final result: {response.result}")
await runtime.stop_when_idle()
Delegator received task main-task.
worker/worker-1 starting task task-part-1
worker/worker-2 starting task task-part-2
worker/worker-1 finished task task-part-1
worker/worker-2 finished task task-part-2
Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2

Zusätzliche Ressourcen#

Wenn Sie mehr über gleichzeitige Verarbeitung erfahren möchten, lesen Sie das Muster Mixture of Agents, das stark auf gleichzeitigen Agenten basiert.