Nachrichten und Kommunikation#

Ein Agent im AutoGen-Kern kann auf Nachrichten reagieren, diese senden und veröffentlichen, und Nachrichten sind das einzige Mittel, mit dem Agenten miteinander kommunizieren können.

Nachrichten#

Nachrichten sind serialisierbare Objekte. Sie können definiert werden mit

  • Einer Unterklasse von Pydantics pydantic.BaseModel, oder

  • Einem Dataclass

Zum Beispiel

from dataclasses import dataclass


@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class ImageMessage:
    url: str
    source: str

Hinweis

Nachrichten sind reine Daten und sollten keine Logik enthalten.

Nachrichten-Handler#

Wenn ein Agent eine Nachricht empfängt, ruft die Laufzeit den Nachrichten-Handler des Agenten auf (on_message()), der die Nachrichtenverarbeitungslogik des Agenten implementieren sollte. Wenn diese Nachricht vom Agenten nicht verarbeitet werden kann, sollte der Agent eine CantHandleException auslösen.

Die Basisklasse BaseAgent bietet keine Nachrichtenverarbeitungslogik. Es wird nicht empfohlen, die Methode on_message() direkt zu implementieren, es sei denn, für fortgeschrittene Anwendungsfälle.

Entwickler sollten mit der Implementierung der Basisklasse RoutedAgent beginnen, die eine integrierte Nachrichtenweiterleitungsfähigkeit bietet.

Nachrichten nach Typ weiterleiten#

Die Basisklasse RoutedAgent bietet einen Mechanismus zum Verknüpfen von Nachrichtentypen mit Nachrichten-Handlern über den Dekorator message_handler(), sodass Entwickler die Methode on_message() nicht implementieren müssen.

Beispielsweise reagiert der folgende typ-weiterleitende Agent auf TextMessage und ImageMessage mit unterschiedlichen Nachrichten-Handlern

from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


class MyAgent(RoutedAgent):
    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you said {message.content}!")

    @message_handler
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you sent me {message.url}!")

Erstellen Sie die Agentenlaufzeit und registrieren Sie den Agententyp (siehe Agent und Agentenlaufzeit)

runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')

Testen Sie diesen Agenten mit TextMessage und ImageMessage.

runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()
Hello, User, you said Hello, World!!
Hello, User, you sent me https://example.com/image.jpg!

Die Laufzeit erstellt automatisch eine Instanz von MyAgent mit der Agenten-ID AgentId("my_agent", "default"), wenn die erste Nachricht zugestellt wird.

Nachrichten desselben Typs weiterleiten#

In einigen Szenarien ist es nützlich, Nachrichten desselben Typs an verschiedene Handler weiterzuleiten. Zum Beispiel sollten Nachrichten von verschiedenen Absender-Agenten unterschiedlich behandelt werden. Sie können den Parameter match des Dekorators message_handler() verwenden.

Der Parameter match verknüpft Handler für denselben Nachrichtentyp mit einer bestimmten Nachricht – er ist zweitrangig zur Weiterleitung nach Nachrichtentyp. Er akzeptiert eine aufrufbare Funktion, die die Nachricht und den MessageContext als Argumente erhält und einen booleschen Wert zurückgibt, der angibt, ob die Nachricht vom dekorierten Handler verarbeitet werden soll. Die aufrufbaren Funktionen werden in alphabetischer Reihenfolge der Handler geprüft.

Hier ist ein Beispiel für einen Agenten, der Nachrichten basierend auf dem Absender-Agenten mit dem Parameter match weiterleitet

class RoutedBySenderAgent(RoutedAgent):
    @message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignore
    async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you sent me {message.url}!")

Der obige Agent verwendet das Feld source der Nachricht, um den Absender-Agenten zu bestimmen. Sie können auch das Feld sender des MessageContext verwenden, um den Absender-Agenten anhand der Agenten-ID zu bestimmen, falls verfügbar.

Lassen Sie uns diesen Agenten mit Nachrichten mit unterschiedlichen source-Werten testen

runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hello from user 1 handler, user1-test, you said Hello, World!!
Hello from user 2 handler, user2-test, you said Hello, World!!
Hello, user2-test, you sent me https://example.com/image.jpg!

Im obigen Beispiel wird die erste ImageMessage nicht verarbeitet, da das Feld source der Nachricht nicht mit der match-Bedingung des Handlers übereinstimmt.

Direkte Nachrichtenübermittlung#

Es gibt zwei Arten der Kommunikation im AutoGen-Kern

  • Direkte Nachrichtenübermittlung: sendet eine direkte Nachricht an einen anderen Agenten.

  • Broadcast: veröffentlicht eine Nachricht an ein Thema.

Betrachten wir zunächst die direkte Nachrichtenübermittlung. Um eine direkte Nachricht an einen anderen Agenten zu senden, verwenden Sie innerhalb eines Nachrichten-Handlers die Methode autogen_core.BaseAgent.send_message(), von der Laufzeit aus die Methode autogen_core.AgentRuntime.send_message(). Das Warten auf Aufrufe dieser Methoden gibt den Rückgabewert des Nachrichten-Handlers des empfangenden Agenten zurück. Wenn der Handler des empfangenden Agenten None zurückgibt, wird None zurückgegeben.

Hinweis

Wenn der aufgerufene Agent während des Wartens des Absenders eine Ausnahme auslöst, wird die Ausnahme an den Absender zurückgegeben.

Anfrage/Antwort#

Die direkte Nachrichtenübermittlung kann für Anfrage/Antwort-Szenarien verwendet werden, bei denen der Absender eine Antwort vom Empfänger erwartet. Der Empfänger kann auf die Nachricht antworten, indem er einen Wert aus seinem Nachrichten-Handler zurückgibt. Man kann dies als Funktionsaufruf zwischen Agenten betrachten.

Betrachten Sie zum Beispiel die folgenden Agenten

from dataclasses import dataclass

from autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


@dataclass
class Message:
    content: str


class InnerAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"Hello from inner, {message.content}")


class OuterAgent(RoutedAgent):
    def __init__(self, description: str, inner_agent_type: str):
        super().__init__(description)
        self.inner_agent_id = AgentId(inner_agent_type, self.id.key)

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")
        # Send a direct message to the inner agent and receives a response.
        response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)
        print(f"Received inner response: {response.content}")

Nach Erhalt einer Nachricht sendet der OuterAgent eine direkte Nachricht an den InnerAgent und erhält eine Nachricht als Antwort.

Wir können diese Agenten testen, indem wir eine Message an den OuterAgent senden.

runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Received message: Hello, World!
Received inner response: Hello from inner, Hello from outer, Hello, World!

Beide Ausgaben werden vom Nachrichten-Handler des OuterAgent erzeugt, die zweite Ausgabe basiert jedoch auf der Antwort des InnerAgent.

Im Allgemeinen eignet sich die direkte Nachrichtenübermittlung für Szenarien, in denen der Absender und der Empfänger eng gekoppelt sind – sie werden zusammen erstellt und der Absender ist mit einer bestimmten Instanz des Empfängers verknüpft. Zum Beispiel führt ein Agent Tool-Aufrufe aus, indem er direkte Nachrichten an eine Instanz von ToolAgent sendet und die Antworten verwendet, um eine Aktions-Beobachtungs-Schleife zu bilden.

Broadcast#

Broadcast ist effektiv das Publish/Subscribe-Modell mit Thema und Abonnement. Lesen Sie Thema und Abonnement, um die Kernkonzepte zu verstehen.

Der Hauptunterschied zwischen direkter Nachrichtenübermittlung und Broadcast besteht darin, dass Broadcast nicht für Anfrage/Antwort-Szenarien verwendet werden kann. Wenn ein Agent eine Nachricht veröffentlicht, ist dies ein Einwegprozess. Er kann keine Antwort von anderen Agenten erhalten, auch wenn der Handler eines empfangenden Agenten einen Wert zurückgibt.

Hinweis

Wenn eine Antwort auf eine veröffentlichte Nachricht gegeben wird, wird sie verworfen.

Hinweis

Wenn ein Agent einen Nachrichtentyp veröffentlicht, für den er abonniert ist, empfängt er die veröffentlichte Nachricht nicht. Dies dient zur Vermeidung von Endlosschleifen.

Themen abonnieren und veröffentlichen#

Typenbasierte Abonnements ordnen Nachrichten, die an Themen eines bestimmten Thema-Typs veröffentlicht werden, Agenten eines bestimmten Agenten-Typs zu. Um einen Agenten, der von RoutedAgent erbt, für ein Thema eines bestimmten Thema-Typs zu abonnieren, können Sie den Klassen-Dekorator type_subscription() verwenden.

Das folgende Beispiel zeigt eine Klasse ReceiverAgent, die Themen des Thema-Typs "default" über den Dekorator type_subscription() abonniert und die empfangenen Nachrichten ausgibt.

from autogen_core import RoutedAgent, message_handler, type_subscription


@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Received a message: {message.content}")

Um eine Nachricht aus dem Handler eines Agenten zu veröffentlichen, verwenden Sie die Methode publish_message() und geben Sie eine TopicId an. Dieser Aufruf muss immer noch abgewartet werden, damit die Laufzeit die Zustellung der Nachricht an alle Abonnenten planen kann, aber er wird immer None zurückgeben. Wenn ein Agent beim Verarbeiten einer veröffentlichten Nachricht eine Ausnahme auslöst, wird dies protokolliert, aber nicht an den veröffentlichenden Agenten zurückgegeben.

Das folgende Beispiel zeigt einen BroadcastingAgent, der beim Empfang einer Nachricht eine Nachricht an ein Thema veröffentlicht.

from autogen_core import TopicId


class BroadcastingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=TopicId(type="default", source=self.id.key),
        )

BroadcastingAgent veröffentlicht Nachrichten an ein Thema mit dem Typ "default" und der Quelle, die dem Agentenschlüssel der Agenteninstanz zugewiesen ist.

Abonnements werden bei der Agentenlaufzeit registriert, entweder als Teil der Registrierung des Agententyps oder über eine separate API-Methode. Hier registrieren wir TypeSubscription für den empfangenden Agenten mit dem Dekorator type_subscription() und für den Broadcast-Agenten ohne den Dekorator.

from autogen_core import TypeSubscription

runtime = SingleThreadedAgentRuntime()

# Option 1: with type_subscription decorator
# The type_subscription class decorator automatically adds a TypeSubscription to
# the runtime when the agent is registered.
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))

# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))

# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(
    Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!

Wie im obigen Beispiel gezeigt, können Sie auch direkt über die Methode publish_message() der Laufzeit eine Nachricht an ein Thema veröffentlichen, ohne eine Agenteninstanz erstellen zu müssen.

Aus der Ausgabe können Sie sehen, dass zwei Nachrichten vom empfangenden Agenten empfangen wurden: eine wurde über die Laufzeit veröffentlicht und die andere vom Broadcast-Agenten.

Standardthema und -abonnements#

Im obigen Beispiel haben wir TopicId und TypeSubscription verwendet, um das Thema bzw. die Abonnements anzugeben. Dies ist für viele Szenarien die richtige Vorgehensweise. Wenn es jedoch einen einzigen Geltungsbereich für die Veröffentlichung gibt, d.h. alle Agenten veröffentlichen und abonnieren alle gesendeten Nachrichten, können wir die Komfortklassen DefaultTopicId und default_subscription() verwenden, um unseren Code zu vereinfachen.

DefaultTopicId dient zur Erstellung eines Themas, das "default" als Standardwert für den Thema-Typ und den Agentenschlüssel des veröffentlichenden Agenten als Standardwert für die Thema-Quelle verwendet. default_subscription() dient zur Erstellung eines Typen-Abonnements, das das Standardthema abonniert. Wir können BroadcastingAgent vereinfachen, indem wir DefaultTopicId und default_subscription() verwenden.

from autogen_core import DefaultTopicId, default_subscription


@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        # Publish a message to all agents in the same namespace.
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=DefaultTopicId(),
        )

Wenn die Laufzeit register() aufruft, um den Agententyp zu registrieren, erstellt sie eine TypeSubscription, deren Thema-Typ "default" als Standardwert verwendet und deren Agenten-Typ denselben Agenten-Typ verwendet, der im selben Kontext registriert wird.

runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(
    runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!

Hinweis

Wenn Ihr Szenario es allen Agenten erlaubt, alle gesendeten Nachrichten zu veröffentlichen und zu abonnieren, verwenden Sie DefaultTopicId und default_subscription(), um Ihre Agentenklassen zu dekorieren.