Ergebnisse mit einem Agenten extrahieren#
Wenn Sie ein Multi-Agenten-System zur Lösung einer Aufgabe ausführen, möchten Sie möglicherweise das Ergebnis des Systems extrahieren, sobald es beendet ist. Diese Anleitung zeigt eine Möglichkeit, dies zu erreichen. Da Agent-Instanzen von außen nicht direkt zugänglich sind, verwenden wir einen Agenten, um das Endergebnis an einem zugänglichen Ort zu veröffentlichen.
Wenn Sie Ihr System so modellieren, dass es einen Typ wie FinalResult veröffentlicht, können Sie einen Agenten erstellen, dessen alleinige Aufgabe es ist, diesen zu abonnieren und ihn extern verfügbar zu machen. Für einfache Agenten wie diesen ist die Klasse ClosureAgent eine Option, um den Boilerplate-Code zu reduzieren. Dies ermöglicht es Ihnen, eine Funktion zu definieren, die als Nachrichtenhandler des Agenten fungiert. In diesem Beispiel verwenden wir eine Warteschlange, die zwischen dem Agenten und dem externen Code geteilt wird, um das Ergebnis zu übergeben.
Hinweis
Wenn Sie überlegen, wie Sie Ergebnisse aus einem Multi-Agenten-System extrahieren, müssen Sie immer die Abonnements des Agenten und die Themen, zu denen er veröffentlicht, berücksichtigen. Das liegt daran, dass der Agent nur Nachrichten von Themen empfängt, die er abonniert hat.
import asyncio
from dataclasses import dataclass
from autogen_core import (
ClosureAgent,
ClosureContext,
DefaultSubscription,
DefaultTopicId,
MessageContext,
SingleThreadedAgentRuntime,
)
Definieren Sie eine Datenklasse für das Endergebnis.
@dataclass
class FinalResult:
value: str
Erstellen Sie eine Warteschlange, um das Ergebnis vom Agenten an den externen Code zu übergeben.
queue = asyncio.Queue[FinalResult]()
Erstellen Sie eine Funktions-Closure für die Ausgabe des Endergebnisses in die Warteschlange. Die Funktion muss der Signatur Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]] entsprechen, wobei T der Typ der Nachricht ist, die der Agent empfangen wird. Sie können Union-Typen verwenden, um mehrere Nachrichtentypen zu verarbeiten.
async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
await queue.put(message)
Lassen Sie uns eine Laufzeitumgebung erstellen und einen ClosureAgent registrieren, der das Endergebnis in die Warteschlange veröffentlicht.
runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
AgentType(type='output_result')
Wir können das Sammeln der Endergebnisse simulieren, indem wir sie direkt in die Laufzeitumgebung veröffentlichen.
runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()
Wir können uns die Warteschlange ansehen, um das Endergebnis zu sehen.
while not queue.empty():
print((result := await queue.get()).value)
Result 1
Result 2