Distributed Agent Runtime#

Achtung

Die verteilte Agenten-Runtime ist eine experimentelle Funktion. Erwarten Sie Breaking Changes an der API.

Eine verteilte Agenten-Runtime erleichtert die Kommunikation und die Verwaltung des Agentenlebenszyklus über Prozessgrenzen hinweg. Sie besteht aus einem Host-Dienst und mindestens einer Worker-Runtime.

Der Host-Dienst pflegt Verbindungen zu allen aktiven Worker-Runtimes, erleichtert die Nachrichtenübermittlung und verwaltet Sitzungen für alle direkten Nachrichten (d. h. RPCs). Eine Worker-Runtime verarbeitet Anwendungscode (Agenten) und stellt eine Verbindung zum Host-Dienst her. Sie wirbt auch für die von ihr unterstützten Agenten beim Host-Dienst, damit der Host-Dienst Nachrichten an den richtigen Worker liefern kann.

Hinweis

Die verteilte Agenten-Runtime erfordert zusätzliche Abhängigkeiten. Installieren Sie diese mit

pip install "autogen-ext[grpc]"

Wir können einen Host-Dienst mit GrpcWorkerAgentRuntimeHost starten.

from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost

host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
host.start()  # Start a host service in the background.

Der obige Code startet den Host-Dienst im Hintergrund und akzeptiert Worker-Verbindungen auf Port 50051.

Bevor wir Worker-Runtimes ausführen, definieren wir unseren Agenten. Der Agent wird bei jeder empfangenen Nachricht eine neue Nachricht veröffentlichen. Er verfolgt auch, wie viele Nachrichten er veröffentlicht hat, und hört auf, neue Nachrichten zu veröffentlichen, sobald er 5 Nachrichten veröffentlicht hat.

from dataclasses import dataclass

from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler


@dataclass
class MyMessage:
    content: str


@default_subscription
class MyAgent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__("My agent")
        self._name = name
        self._counter = 0

    @message_handler
    async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:
        self._counter += 1
        if self._counter > 5:
            return
        content = f"{self._name}: Hello x {self._counter}"
        print(content)
        await self.publish_message(MyMessage(content=content), DefaultTopicId())

Jetzt können wir die Worker-Agenten-Runtimes einrichten. Wir verwenden GrpcWorkerAgentRuntime. Wir richten zwei Worker-Runtimes ein. Jede Runtime hostet einen Agenten. Alle Agenten veröffentlichen und abonnieren das Standard-Thema, damit sie alle veröffentlichten Nachrichten sehen können.

Um die Agenten auszuführen, veröffentlichen wir eine Nachricht von einem Worker.

import asyncio

from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime

worker1 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker1.start()
await MyAgent.register(worker1, "worker1", lambda: MyAgent("worker1"))

worker2 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker2.start()
await MyAgent.register(worker2, "worker2", lambda: MyAgent("worker2"))

await worker2.publish_message(MyMessage(content="Hello!"), DefaultTopicId())

# Let the agents run for a while.
await asyncio.sleep(5)
worker1: Hello x 1
worker2: Hello x 1
worker2: Hello x 2
worker1: Hello x 2
worker1: Hello x 3
worker2: Hello x 3
worker2: Hello x 4
worker1: Hello x 4
worker1: Hello x 5
worker2: Hello x 5

Wir können sehen, dass jeder Agent genau 5 Nachrichten veröffentlicht hat.

Um die Worker-Runtimes zu stoppen, können wir stop() aufrufen.

await worker1.stop()
await worker2.stop()

# To keep the worker running until a termination signal is received (e.g., SIGTERM).
# await worker1.stop_when_signal()

Wir können stop() aufrufen, um den Host-Dienst zu stoppen.

await host.stop()

# To keep the host service running until a termination signal (e.g., SIGTERM)
# await host.stop_when_signal()

Sprachenübergreifende Runtimes#

Der oben beschriebene Prozess ist weitgehend derselbe. Alle Nachrichtentypen MÜSSEN jedoch gemeinsam genutzte Protobuf-Schemas für alle sprachübergreifenden Nachrichtentypen verwenden.

Nächste Schritte#

Vollständige Beispiele für die Verwendung der verteilten Runtime finden Sie in den folgenden Beispielen