Distributed Agent Runtime#
Achtung
Die verteilte Agenten-Runtime ist eine experimentelle Funktion. Erwarten Sie Breaking Changes an der API.
Eine verteilte Agenten-Runtime erleichtert die Kommunikation und die Verwaltung des Agentenlebenszyklus über Prozessgrenzen hinweg. Sie besteht aus einem Host-Dienst und mindestens einer Worker-Runtime.
Der Host-Dienst pflegt Verbindungen zu allen aktiven Worker-Runtimes, erleichtert die Nachrichtenübermittlung und verwaltet Sitzungen für alle direkten Nachrichten (d. h. RPCs). Eine Worker-Runtime verarbeitet Anwendungscode (Agenten) und stellt eine Verbindung zum Host-Dienst her. Sie wirbt auch für die von ihr unterstützten Agenten beim Host-Dienst, damit der Host-Dienst Nachrichten an den richtigen Worker liefern kann.
Hinweis
Die verteilte Agenten-Runtime erfordert zusätzliche Abhängigkeiten. Installieren Sie diese mit
pip install "autogen-ext[grpc]"
Wir können einen Host-Dienst mit GrpcWorkerAgentRuntimeHost starten.
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost
host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
host.start() # Start a host service in the background.
Der obige Code startet den Host-Dienst im Hintergrund und akzeptiert Worker-Verbindungen auf Port 50051.
Bevor wir Worker-Runtimes ausführen, definieren wir unseren Agenten. Der Agent wird bei jeder empfangenen Nachricht eine neue Nachricht veröffentlichen. Er verfolgt auch, wie viele Nachrichten er veröffentlicht hat, und hört auf, neue Nachrichten zu veröffentlichen, sobald er 5 Nachrichten veröffentlicht hat.
from dataclasses import dataclass
from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler
@dataclass
class MyMessage:
content: str
@default_subscription
class MyAgent(RoutedAgent):
def __init__(self, name: str) -> None:
super().__init__("My agent")
self._name = name
self._counter = 0
@message_handler
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:
self._counter += 1
if self._counter > 5:
return
content = f"{self._name}: Hello x {self._counter}"
print(content)
await self.publish_message(MyMessage(content=content), DefaultTopicId())
Jetzt können wir die Worker-Agenten-Runtimes einrichten. Wir verwenden GrpcWorkerAgentRuntime. Wir richten zwei Worker-Runtimes ein. Jede Runtime hostet einen Agenten. Alle Agenten veröffentlichen und abonnieren das Standard-Thema, damit sie alle veröffentlichten Nachrichten sehen können.
Um die Agenten auszuführen, veröffentlichen wir eine Nachricht von einem Worker.
import asyncio
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
worker1 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker1.start()
await MyAgent.register(worker1, "worker1", lambda: MyAgent("worker1"))
worker2 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker2.start()
await MyAgent.register(worker2, "worker2", lambda: MyAgent("worker2"))
await worker2.publish_message(MyMessage(content="Hello!"), DefaultTopicId())
# Let the agents run for a while.
await asyncio.sleep(5)
worker1: Hello x 1
worker2: Hello x 1
worker2: Hello x 2
worker1: Hello x 2
worker1: Hello x 3
worker2: Hello x 3
worker2: Hello x 4
worker1: Hello x 4
worker1: Hello x 5
worker2: Hello x 5
Wir können sehen, dass jeder Agent genau 5 Nachrichten veröffentlicht hat.
Um die Worker-Runtimes zu stoppen, können wir stop() aufrufen.
await worker1.stop()
await worker2.stop()
# To keep the worker running until a termination signal is received (e.g., SIGTERM).
# await worker1.stop_when_signal()
Wir können stop() aufrufen, um den Host-Dienst zu stoppen.
await host.stop()
# To keep the host service running until a termination signal (e.g., SIGTERM)
# await host.stop_when_signal()
Sprachenübergreifende Runtimes#
Der oben beschriebene Prozess ist weitgehend derselbe. Alle Nachrichtentypen MÜSSEN jedoch gemeinsam genutzte Protobuf-Schemas für alle sprachübergreifenden Nachrichtentypen verwenden.
Nächste Schritte#
Vollständige Beispiele für die Verwendung der verteilten Runtime finden Sie in den folgenden Beispielen