Beendigung mit Intervention Handler#
Hinweis
Diese Methode ist gültig bei Verwendung von SingleThreadedAgentRuntime.
Es gibt viele verschiedene Möglichkeiten, die Beendigung in autogen_core zu handhaben. Letztendlich ist das Ziel, zu erkennen, dass die Laufzeit nicht mehr ausgeführt werden muss, und Sie können mit den Finalisierungsaufgaben fortfahren. Eine Möglichkeit, dies zu tun, besteht darin, einen autogen_core.base.intervention.InterventionHandler zu verwenden, um eine Beendigungsnachricht zu erkennen und dann darauf zu reagieren.
from dataclasses import dataclass
from typing import Any
from autogen_core import (
DefaultInterventionHandler,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
default_subscription,
message_handler,
)
Zuerst definieren wir eine Dataclass für normale Nachrichten und eine Nachricht, die zur Signalisierung der Beendigung verwendet wird.
@dataclass
class Message:
content: Any
@dataclass
class Termination:
reason: str
Wir programmieren unseren Agenten so, dass er eine Beendigungsnachricht veröffentlicht, wenn er entscheidet, dass es Zeit für die Beendigung ist.
@default_subscription
class AnAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("MyAgent")
self.received = 0
@message_handler
async def on_new_message(self, message: Message, ctx: MessageContext) -> None:
self.received += 1
if self.received > 3:
await self.publish_message(Termination(reason="Reached maximum number of messages"), DefaultTopicId())
Als Nächstes erstellen wir einen InterventionHandler, der die Beendigungsnachricht erkennt und darauf reagiert. Dieser Haken wird in Veröffentlichungen eingehakt und wenn er auf Termination trifft, ändert er seinen internen Status, um anzuzeigen, dass die Beendigung angefordert wurde.
class TerminationHandler(DefaultInterventionHandler):
def __init__(self) -> None:
self._termination_value: Termination | None = None
async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any:
if isinstance(message, Termination):
self._termination_value = message
return message
@property
def termination_value(self) -> Termination | None:
return self._termination_value
@property
def has_terminated(self) -> bool:
return self._termination_value is not None
Schließlich fügen wir diesen Handler zur Laufzeit hinzu und verwenden ihn, um die Beendigung zu erkennen und die Laufzeit zu stoppen, wenn die Beendigungsnachricht empfangen wird.
termination_handler = TerminationHandler()
runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])
await AnAgent.register(runtime, "my_agent", AnAgent)
runtime.start()
# Publish more than 3 messages to trigger termination.
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
# Wait for termination.
await runtime.stop_when(lambda: termination_handler.has_terminated)
print(termination_handler.termination_value)
Termination(reason='Reached maximum number of messages')