autogen_core#
- class Agent(*args, **kwargs)[Quelle]#
Bases:
Protocol- property metadata: AgentMetadata#
Metadaten des Agenten.
- async bind_id_and_runtime(id: AgentId, runtime: AgentRuntime) None[Quelle]#
Funktion zur Bindung einer Agent-Instanz an eine AgentRuntime.
- Parameter:
agent_id (AgentId) – ID des Agenten.
runtime (AgentRuntime) – AgentRuntime-Instanz, an die der Agent gebunden werden soll.
- async on_message(message: Any, ctx: MessageContext) Any[Quelle]#
Nachrichtenhandler für den Agenten. Dies sollte nur vom Runtime aufgerufen werden, nicht von anderen Agenten.
- Parameter:
message (Any) – Empfangene Nachricht. Der Typ ist einer der Typen in subscriptions.
ctx (MessageContext) – Kontext der Nachricht.
- Gibt zurück:
Any – Antwort auf die Nachricht. Kann None sein.
- Löst aus:
CancelledError – Wenn die Nachricht abgebrochen wurde.
CantHandleException – Wenn der Agent die Nachricht nicht bearbeiten kann.
- async save_state() Mapping[str, Any][Quelle]#
Speichert den Zustand des Agenten. Das Ergebnis muss JSON-serialisierbar sein.
- class AgentId(type: str | AgentType, key: str)[Quelle]#
Basiert auf:
objectEine Agenten-ID identifiziert eine Agenten-Instanz innerhalb eines Agenten-Runtimes eindeutig - einschließlich eines verteilten Runtimes. Sie ist die "Adresse" der Agenten-Instanz zum Empfangen von Nachrichten.
Weitere Informationen finden Sie hier: Agenten-Identität und Lebenszyklus
- classmethod from_str(agent_id: str) Self[Quelle]#
Konvertiert einen String im Format
type/keyin eine AgentId.
- class AgentProxy(agent: AgentId, runtime: AgentRuntime)[Quelle]#
Basiert auf:
objectEine Hilfsklasse, die es Ihnen ermöglicht, eine
AgentIdanstelle seiner zugehörigenAgentzu verwenden.- property metadata: Awaitable[AgentMetadata]#
Metadaten des Agenten.
- async send_message(message: Any, *, sender: AgentId, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[Quelle]#
- class AgentRuntime(*args, **kwargs)[Quelle]#
Bases:
Protocol- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[Quelle]#
Sendet eine Nachricht an einen Agenten und erhält eine Antwort.
- Parameter:
message (Any) – Die zu sendende Nachricht.
recipient (AgentId) – Der Agent, an den die Nachricht gesendet werden soll.
sender (AgentId | None, optional) – Agent, der die Nachricht gesendet hat. Sollte **nur** None sein, wenn diese von keinem Agenten gesendet wurde, z. B. direkt an den Runtime extern. Standardwert ist None.
cancellation_token (CancellationToken | None, optional) – Token zum Abbrechen einer laufenden Nachricht. Standardwert ist None.
- Löst aus:
CantHandleException – Wenn der Empfänger die Nachricht nicht bearbeiten kann.
UndeliverableException – Wenn die Nachricht nicht zugestellt werden kann.
Other – Jede andere Ausnahme, die vom Empfänger ausgelöst wird.
- Gibt zurück:
Any – Die Antwort vom Agenten.
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[Quelle]#
Veröffentlicht eine Nachricht an alle Agenten im gegebenen Namensraum oder, falls kein Namensraum angegeben ist, an den Namensraum des Absenders.
Es werden keine Antworten auf Veröffentlichungen erwartet.
- Parameter:
message (Any) – Die zu veröffentlichende Nachricht.
topic_id (TopicId) – Das Thema, zu dem die Nachricht veröffentlicht werden soll.
sender (AgentId | None, optional) – Der Agent, der die Nachricht gesendet hat. Standardwert ist None.
cancellation_token (CancellationToken | None, optional) – Token zum Abbrechen einer laufenden Nachricht. Standardwert ist None.
message_id (str | None, optional) – Die Nachrichten-ID. Wenn None, wird eine neue Nachrichten-ID generiert. Standardwert ist None. Diese Nachrichten-ID muss eindeutig sein und sollte vorzugsweise eine UUID sein.
- Löst aus:
UndeliverableException – Wenn die Nachricht nicht zugestellt werden kann.
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[Quelle]#
Registriert eine Agentenfabrik beim Runtime, die mit einem bestimmten Typ verknüpft ist. Der Typ muss eindeutig sein. Diese API fügt keine Abonnements hinzu.
Hinweis
Dies ist eine Low-Level-API, und normalerweise sollte die register-Methode der Agentenklasse verwendet werden, da diese auch Abonnements automatisch handhabt.
Beispiel
from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main())
- Parameter:
type (str) – Der Typ des Agenten, den diese Factory erstellt. Dies ist nicht dasselbe wie der Agentenklassenname. Der type-Parameter wird verwendet, um zwischen verschiedenen Factory-Funktionen anstelle von Agentenklassen zu unterscheiden.
agent_factory (Callable[[], T]) – Die Factory, die den Agenten erstellt, wobei T ein konkreter Agententyp ist. Innerhalb der Factory verwenden Sie autogen_core.AgentInstantiationContext, um auf Variablen wie den aktuellen Runtime und die Agenten-ID zuzugreifen.
expected_class (type[T] | None, optional) – Die erwartete Klasse des Agenten, die zur Runtime-Validierung der Factory verwendet wird. Standardwert ist None. Wenn None, wird keine Validierung durchgeführt.
- async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId[Quelle]#
Registriert eine Agenten-Instanz beim Runtime. Der Typ darf wiederverwendet werden, aber jede agent_id muss eindeutig sein. Alle Agenten-Instanzen innerhalb eines Typs müssen vom selben Objekttyp sein. Diese API fügt keine Abonnements hinzu.
Hinweis
Dies ist eine Low-Level-API, und normalerweise sollte die register_instance-Methode der Agentenklasse verwendet werden, da diese auch Abonnements automatisch handhabt.
Beispiel
from dataclasses import dataclass from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def main() -> None: runtime: AgentRuntime = ... # type: ignore agent = MyAgent() await runtime.register_agent_instance( agent_instance=agent, agent_id=AgentId(type="my_agent", key="default") ) import asyncio asyncio.run(main())
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[Quelle]#
Versucht, die zugrunde liegende Agenten-Instanz nach Name und Namensraum abzurufen. Dies wird generell nicht empfohlen (daher der lange Name), kann aber in einigen Fällen nützlich sein.
Wenn der zugrunde liegende Agent nicht zugänglich ist, wird eine Ausnahme ausgelöst.
- Parameter:
id (AgentId) – Die Agenten-ID.
type (Type[T], optional) – Der erwartete Typ des Agenten. Standard ist Agent.
- Gibt zurück:
T – Die konkrete Agenteninstanz.
- Löst aus:
LookupError – Wenn der Agent nicht gefunden wird.
NotAccessibleError – Wenn der Agent nicht zugänglich ist, z.B. wenn er sich remote befindet.
TypeError – Wenn der Agent nicht vom erwarteten Typ ist.
- async get(id: AgentId, /, *, lazy: bool = True) AgentId[source]#
- async get(type: AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId
- async save_state() Mapping[str, Any][source]#
Speichert den Zustand der gesamten Laufzeitumgebung, einschließlich aller gehosteten Agenten. Die einzige Möglichkeit, den Zustand wiederherzustellen, ist die Übergabe an
load_state().Die Struktur des Zustands ist implementierungsabhängig und kann jedes JSON-serialisierbare Objekt sein.
- Gibt zurück:
Mapping[str, Any] – Der gespeicherte Zustand.
- async load_state(state: Mapping[str, Any]) None[source]#
Lädt den Zustand der gesamten Laufzeitumgebung, einschließlich aller gehosteten Agenten. Der Zustand sollte derselbe sein wie der von
save_state()zurückgegebene.- Parameter:
state (Mapping[str, Any]) – Der gespeicherte Zustand.
- async agent_metadata(agent: AgentId) AgentMetadata[source]#
Ruft die Metadaten für einen Agenten ab.
- Parameter:
agent (AgentId) – Die Agenten-ID.
- Gibt zurück:
AgentMetadata – Die Agenten-Metadaten.
- async agent_save_state(agent: AgentId) Mapping[str, Any][source]#
Speichert den Zustand eines einzelnen Agenten.
Die Struktur des Zustands ist implementierungsabhängig und kann jedes JSON-serialisierbare Objekt sein.
- Parameter:
agent (AgentId) – Die Agenten-ID.
- Gibt zurück:
Mapping[str, Any] – Der gespeicherte Zustand.
- async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]#
Lädt den Zustand eines einzelnen Agenten.
- async add_subscription(subscription: Subscription) None[source]#
Fügt eine neue Abonnement hinzu, das die Laufzeitumgebung bei der Verarbeitung veröffentlichter Nachrichten erfüllen soll.
- Parameter:
subscription (Subscription) – Das hinzuzufügende Abonnement.
- async remove_subscription(id: str) None[source]#
Entfernt ein Abonnement aus der Laufzeitumgebung.
- Parameter:
id (str) – Die ID des zu entfernenden Abonnements.
- Löst aus:
LookupError – Wenn das Abonnement nicht existiert.
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#
Fügt einen neuen Serialisierer für die Nachrichten serialisierung zur Laufzeitumgebung hinzu.
Hinweis: Dies dedupliziert Serialisierer basierend auf den Eigenschaften type_name und data_content_type.
- Parameter:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – Der/die hinzuzufügende(n) Serialisierer.
- class BaseAgent(description: str)[source]#
-
- property metadata: AgentMetadata#
Metadaten des Agenten.
- async bind_id_and_runtime(id: AgentId, runtime: AgentRuntime) None[source]#
Funktion zur Bindung einer Agent-Instanz an eine AgentRuntime.
- Parameter:
agent_id (AgentId) – ID des Agenten.
runtime (AgentRuntime) – AgentRuntime-Instanz, an die der Agent gebunden werden soll.
- property runtime: AgentRuntime#
- final async on_message(message: Any, ctx: MessageContext) Any[source]#
Nachrichtenhandler für den Agenten. Dies sollte nur vom Runtime aufgerufen werden, nicht von anderen Agenten.
- Parameter:
message (Any) – Empfangene Nachricht. Der Typ ist einer der Typen in subscriptions.
ctx (MessageContext) – Kontext der Nachricht.
- Gibt zurück:
Any – Antwort auf die Nachricht. Kann None sein.
- Löst aus:
CancelledError – Wenn die Nachricht abgebrochen wurde.
CantHandleException – Wenn der Agent die Nachricht nicht bearbeiten kann.
- abstract async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
- async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#
Siehe
autogen_core.AgentRuntime.send_message()für weitere Informationen.
- async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None[source]#
- async save_state() Mapping[str, Any][source]#
Speichert den Zustand des Agenten. Das Ergebnis muss JSON-serialisierbar sein.
- async load_state(state: Mapping[str, Any]) None[source]#
Lädt den Zustand des Agenten, der von save_state erhalten wurde.
- Parameter:
state (Mapping[str, Any]) – Zustand des Agenten. Muss JSON-serialisierbar sein.
- async register_instance(runtime: AgentRuntime, agent_id: AgentId, *, skip_class_subscriptions: bool = True, skip_direct_message_subscription: bool = False) AgentId[source]#
Diese Funktion ähnelt register, wird aber zum Registrieren einer Instanz eines Agenten verwendet. Ein Abonnement, das auf der Agenten-ID basiert, wird erstellt und der Laufzeitumgebung hinzugefügt.
- async classmethod register(runtime: AgentRuntime, type: str, factory: Callable[[], Self | Awaitable[Self]], *, skip_class_subscriptions: bool = False, skip_direct_message_subscription: bool = False) AgentType[source]#
Registriert eine virtuelle Unterklasse einer ABC.
Gibt die Unterklasse zurück, um sie als Klassen-Decorator zu verwenden.
- class CacheStore[source]#
Bases:
ABC,Generic[T],ComponentBase[BaseModel]Dieses Protokoll definiert die grundlegende Schnittstelle für Speicher-/Cache-Operationen.
Unterklassen sollten den Lebenszyklus des zugrunde liegenden Speichers handhaben.
- component_type: ClassVar[ComponentType] = 'cache_store'#
Der logische Typ der Komponente.
- abstract get(key: str, default: T | None = None) T | None[source]#
Ruft ein Element aus dem Speicher ab.
- Parameter:
key – Der Schlüssel, der das Element im Speicher identifiziert.
default (optional) – Der Standardwert, der zurückgegeben wird, wenn der Schlüssel nicht gefunden wird. Standard ist None.
- Gibt zurück:
Der Wert, der dem Schlüssel zugeordnet ist, falls gefunden, andernfalls der Standardwert.
- class InMemoryStore[source]#
Bases:
CacheStore[T],Component[InMemoryStoreConfig]- component_provider_override: ClassVar[str | None] = 'autogen_core.InMemoryStore'#
Überschreibe den Anbieter-String für die Komponente. Dies sollte verwendet werden, um zu verhindern, dass interne Modulnamen Teil des Modulnamens werden.
- component_config_schema#
alias of
InMemoryStoreConfig
- get(key: str, default: T | None = None) T | None[source]#
Ruft ein Element aus dem Speicher ab.
- Parameter:
key – Der Schlüssel, der das Element im Speicher identifiziert.
default (optional) – Der Standardwert, der zurückgegeben wird, wenn der Schlüssel nicht gefunden wird. Standard ist None.
- Gibt zurück:
Der Wert, der dem Schlüssel zugeordnet ist, falls gefunden, andernfalls der Standardwert.
- set(key: str, value: T) None[source]#
Setzt ein Element im Speicher.
- Parameter:
key – Der Schlüssel, unter dem das Element gespeichert werden soll.
value – Der Wert, der im Speicher gespeichert werden soll.
- class CancellationToken[source]#
Basiert auf:
objectEin Token zum Abbrechen ausstehender asynchroner Aufrufe
- cancel() None[source]#
Abbrechen ausstehender asynchroner Aufrufe, die mit diesem Abbruch-Token verknüpft sind.
- class AgentInstantiationContext[source]#
Basiert auf:
objectEine statische Klasse, die einen Kontext für die Agenteninstanziierung bereitstellt.
Diese statische Klasse kann verwendet werden, um während der Agenteninstanziierung – innerhalb der Factory-Funktion oder des Konstruktors der Agentenklasse – auf die aktuelle Laufzeit und die Agenten-ID zuzugreifen.
Beispiel
Abrufen der aktuellen Laufzeit und Agenten-ID innerhalb der Factory-Funktion und des Agentenkonstruktors
import asyncio from dataclasses import dataclass from autogen_core import ( AgentId, AgentInstantiationContext, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler, ) @dataclass class TestMessage: content: str class TestAgent(RoutedAgent): def __init__(self, description: str): super().__init__(description) # Get the current runtime -- we don't use it here, but it's available. _ = AgentInstantiationContext.current_runtime() # Get the current agent ID. agent_id = AgentInstantiationContext.current_agent_id() print(f"Current AgentID from constructor: {agent_id}") @message_handler async def handle_test_message(self, message: TestMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") def test_agent_factory() -> TestAgent: # Get the current runtime -- we don't use it here, but it's available. _ = AgentInstantiationContext.current_runtime() # Get the current agent ID. agent_id = AgentInstantiationContext.current_agent_id() print(f"Current AgentID from factory: {agent_id}") return TestAgent(description="Test agent") async def main() -> None: # Create a SingleThreadedAgentRuntime instance. runtime = SingleThreadedAgentRuntime() # Start the runtime. runtime.start() # Register the agent type with a factory function. await runtime.register_factory("test_agent", test_agent_factory) # Send a message to the agent. The runtime will instantiate the agent and call the message handler. await runtime.send_message(TestMessage(content="Hello, world!"), AgentId("test_agent", "default")) # Stop the runtime. await runtime.stop() asyncio.run(main())
- classmethod current_runtime() AgentRuntime[source]#
- class TopicId(type: str, source: str)[source]#
Basiert auf:
objectTopicId definiert den Gültigkeitsbereich einer Broadcast-Nachricht. Im Wesentlichen implementiert die Agentenlaufzeit ein Publish-Subscribe-Modell über ihre Broadcast-API: Beim Veröffentlichen einer Nachricht muss das Thema angegeben werden.
Weitere Informationen finden Sie hier: Topic
- type: str#
Typ des Ereignisses, das diese topic_id enthält. Entspricht der Cloud-Event-Spezifikation.
Muss dem Muster entsprechen: ^[w-.:=]+Z
Erfahren Sie hier mehr: cloudevents/spec
- source: str#
Identifiziert den Kontext, in dem ein Ereignis aufgetreten ist. Entspricht der Cloud-Event-Spezifikation.
Erfahren Sie hier mehr: cloudevents/spec
- class Subscription(*args, **kwargs)[source]#
Bases:
ProtocolAbonnements definieren die Themen, an denen ein Agent interessiert ist.
- property id: str#
Ruft die ID des Abonnements ab.
Implementierungen sollten eine eindeutige ID für das Abonnement zurückgeben. Normalerweise ist dies eine UUID.
- Gibt zurück:
str – ID des Abonnements.
- is_match(topic_id: TopicId) bool[source]#
Prüft, ob eine gegebene topic_id mit dem Abonnement übereinstimmt.
- Parameter:
topic_id (TopicId) – TopicId zum Prüfen.
- Gibt zurück:
bool – True, wenn die topic_id mit dem Abonnement übereinstimmt, andernfalls False.
- map_to_agent(topic_id: TopicId) AgentId[source]#
Ordnet eine topic_id einem Agenten zu. Sollte nur aufgerufen werden, wenn is_match für die gegebene topic_id True zurückgibt.
- Parameter:
topic_id (TopicId) – TopicId zum Zuordnen.
- Gibt zurück:
AgentId – ID des Agenten, der die topic_id verarbeiten soll.
- Löst aus:
CantHandleException – Wenn das Abonnement die topic_id nicht verarbeiten kann.
- class MessageContext(sender: AgentId | None, topic_id: TopicId | None, is_rpc: bool, cancellation_token: CancellationToken, message_id: str)[source]#
Basiert auf:
object- cancellation_token: CancellationToken#
- class UnknownPayload(type_name: str, data_content_type: str, payload: bytes)[source]#
Basiert auf:
object
- class Image(image: Image)[source]#
Basiert auf:
objectStellt ein Bild dar.
Beispiel
Laden eines Bildes von einer URL
from autogen_core import Image from PIL import Image as PILImage import aiohttp import asyncio async def from_url(url: str) -> Image: async with aiohttp.ClientSession() as session: async with session.get(url) as response: content = await response.read() return Image.from_pil(PILImage.open(content)) image = asyncio.run(from_url("https://example.com/image"))
- class RoutedAgent(description: str)[source]#
Bases:
BaseAgentEine Basisklasse für Agenten, die Nachrichten basierend auf dem Nachrichtentyp und optionalen Abgleichfunktionen an Handler weiterleiten.
Um einen weitergeleiteten Agenten zu erstellen, leiten Sie diese Klasse ab und fügen Sie Nachrichtenhandler als Methoden hinzu, die entweder mit dem Dekorator
event()oderrpc()dekoriert sind.Beispiel
from dataclasses import dataclass from autogen_core import MessageContext from autogen_core import RoutedAgent, event, rpc @dataclass class Message: pass @dataclass class MessageWithContent: content: str @dataclass class Response: pass class MyAgent(RoutedAgent): def __init__(self): super().__init__("MyAgent") @event async def handle_event_message(self, message: Message, ctx: MessageContext) -> None: assert ctx.topic_id is not None await self.publish_message(MessageWithContent("event handled"), ctx.topic_id) @rpc(match=lambda message, ctx: message.content == "special") # type: ignore async def handle_special_rpc_message(self, message: MessageWithContent, ctx: MessageContext) -> Response: return Response()
- async on_message_impl(message: Any, ctx: MessageContext) Any | None[source]#
Eine Nachricht verarbeiten, indem sie an den entsprechenden Nachrichtenhandler weitergeleitet wird. Überschreiben Sie diese Methode nicht in Unterklassen. Fügen Sie stattdessen Nachrichtenhandler als Methoden hinzu, die entweder mit dem Dekorator
event()oderrpc()dekoriert sind.
- async on_unhandled_message(message: Any, ctx: MessageContext) None[source]#
Wird aufgerufen, wenn eine Nachricht empfangen wird, für die kein passender Nachrichtenhandler vorhanden ist. Die Standardimplementierung protokolliert eine Info-Nachricht.
- class ClosureAgent(description: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn')[source]#
Bases:
BaseAgent,ClosureContext- property metadata: AgentMetadata#
Metadaten des Agenten.
- property runtime: AgentRuntime#
- async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
- async save_state() Mapping[str, Any][source]#
Closure-Agenten haben keinen Zustand. Daher gibt diese Methode immer ein leeres Wörterbuch zurück.
- async load_state(state: Mapping[str, Any]) None[source]#
Closure-Agenten haben keinen Zustand. Daher tut diese Methode nichts.
- async classmethod register_closure(runtime: AgentRuntime, type: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn', skip_direct_message_subscription: bool = False, description: str = '', subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None = None) AgentType[source]#
Der Closure-Agent ermöglicht es Ihnen, einen Agenten mithilfe einer Closure, d. h. einer Funktion, zu definieren, ohne eine Klasse definieren zu müssen. Er ermöglicht es, Werte aus der Laufzeit zu extrahieren.
Die Closure kann den erwarteten Nachrichtentyp definieren, oder Any kann verwendet werden, um beliebige Nachrichtentypen zu akzeptieren.
Beispiel
import asyncio from autogen_core import SingleThreadedAgentRuntime, MessageContext, ClosureAgent, ClosureContext from dataclasses import dataclass from autogen_core._default_subscription import DefaultSubscription from autogen_core._default_topic import DefaultTopicId @dataclass class MyMessage: content: str async def main(): queue = asyncio.Queue[MyMessage]() async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None: await queue.put(message) runtime = SingleThreadedAgentRuntime() await ClosureAgent.register_closure( runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()] ) runtime.start() await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId()) await runtime.stop_when_idle() result = await queue.get() print(result) asyncio.run(main())
- Parameter:
runtime (AgentRuntime) – Laufzeit, zu der der Agent registriert werden soll
type (str) – Agententyp des registrierten Agenten
closure (Callable[[ClosureContext, T, MessageContext], Awaitable[Any]]) – Closure zur Verarbeitung von Nachrichten
unknown_type_policy (Literal["error", "warn", "ignore"], optional) – Was zu tun ist, wenn ein Typ angetroffen wird, der nicht mit dem Closure-Typ übereinstimmt. Standardmäßig „warn“.
skip_direct_message_subscription (bool, optional) – Abonnement für direkte Nachrichten für diesen Agenten nicht hinzufügen. Standardmäßig False.
description (str, optional) – Beschreibung der Aufgabe des Agenten. Standardmäßig „“.
subscriptions (Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None, optional) – Liste der Abonnements für diesen Closure-Agenten. Standardmäßig None.
- Gibt zurück:
AgentType – Typ des registrierten Agenten
- class ClosureContext(*args, **kwargs)[source]#
Bases:
Protocol
- message_handler(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][Quelle]#
Decorator für generische Nachrichtenhandler.
Fügen Sie diesen Decorator zu Methoden in einer
RoutedAgent-Klasse hinzu, die sowohl Ereignis- als auch RPC-Nachrichten verarbeiten sollen. Diese Methoden müssen eine bestimmte Signatur aufweisen, die eingehalten werden muss, damit sie gültig ist.Die Methode muss eine async-Methode sein.
Die Methode muss mit dem Decorator @message_handler dekoriert sein.
- Die Methode muss genau 3 Argumente haben
self
message: Die zu verarbeitende Nachricht. Diese muss mit dem Nachrichtentyp typ-hinted sein, den sie verarbeiten soll.
ctx: Ein
autogen_core.MessageContext-Objekt.
Die Methode muss typ-hinted sein mit den Nachrichtentypen, die sie als Antwort zurückgeben kann, oder sie kann None zurückgeben, wenn sie nichts zurückgibt.
Handler können mehr als einen Nachrichtentyp verarbeiten, indem sie eine Union der Nachrichtentypen akzeptieren. Sie können auch mehr als einen Nachrichtentyp zurückgeben, indem sie eine Union der Nachrichtentypen zurückgeben.
- Parameter:
func – Die zu dekorierende Funktion.
strict – Wenn True, löst der Handler eine Ausnahme aus, wenn der Nachrichten- oder Rückgabetyp nicht in den Zieltypen enthalten ist. Wenn False, wird stattdessen eine Warnung protokolliert.
match – Eine Funktion, die die Nachricht und den Kontext als Argumente nimmt und einen booleschen Wert zurückgibt. Dies wird für die sekundäre Weiterleitung nach dem Nachrichtentyp verwendet. Für Handler, die denselben Nachrichtentyp adressieren, wird die Match-Funktion in alphabetischer Reihenfolge der Handler angewendet und der erste passende Handler wird aufgerufen, während die anderen übersprungen werden. Wenn None, wird der erste Handler in alphabetischer Reihenfolge, der denselben Nachrichtentyp übereinstimmt, aufgerufen.
- event(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]], MessageHandler[AgentT, ReceivesT, None]] | MessageHandler[AgentT, ReceivesT, None][Quelle]#
Decorator für Ereignisnachrichtenhandler.
Fügen Sie diesen Decorator zu Methoden in einer
RoutedAgent-Klasse hinzu, die Ereignisnachrichten verarbeiten sollen. Diese Methoden müssen eine bestimmte Signatur aufweisen, die eingehalten werden muss, damit sie gültig ist.Die Methode muss eine async-Methode sein.
Die Methode muss mit dem Decorator @message_handler dekoriert sein.
- Die Methode muss genau 3 Argumente haben
self
message: Die zu verarbeitende Ereignisnachricht. Diese muss mit dem Nachrichtentyp typ-hinted sein, den sie verarbeiten soll.
ctx: Ein
autogen_core.MessageContext-Objekt.
Die Methode muss None zurückgeben.
Handler können mehr als einen Nachrichtentyp verarbeiten, indem sie eine Union der Nachrichtentypen akzeptieren.
- Parameter:
func – Die zu dekorierende Funktion.
strict – Wenn True, löst der Handler eine Ausnahme aus, wenn der Nachrichtentyp nicht in den Zieltypen enthalten ist. Wenn False, wird stattdessen eine Warnung protokolliert.
match – Eine Funktion, die die Nachricht und den Kontext als Argumente nimmt und einen booleschen Wert zurückgibt. Dies wird für die sekundäre Weiterleitung nach dem Nachrichtentyp verwendet. Für Handler, die denselben Nachrichtentyp adressieren, wird die Match-Funktion in alphabetischer Reihenfolge der Handler angewendet und der erste passende Handler wird aufgerufen, während die anderen übersprungen werden. Wenn None, wird der erste Handler in alphabetischer Reihenfolge, der denselben Nachrichtentyp übereinstimmt, aufgerufen.
- rpc(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][Quelle]#
Decorator für RPC-Nachrichtenhandler.
Fügen Sie diesen Decorator zu Methoden in einer
RoutedAgent-Klasse hinzu, die RPC-Nachrichten verarbeiten sollen. Diese Methoden müssen eine bestimmte Signatur aufweisen, die eingehalten werden muss, damit sie gültig ist.Die Methode muss eine async-Methode sein.
Die Methode muss mit dem Decorator @message_handler dekoriert sein.
- Die Methode muss genau 3 Argumente haben
self
message: Die zu verarbeitende Nachricht. Diese muss mit dem Nachrichtentyp typ-hinted sein, den sie verarbeiten soll.
ctx: Ein
autogen_core.MessageContext-Objekt.
Die Methode muss typ-hinted sein mit den Nachrichtentypen, die sie als Antwort zurückgeben kann, oder sie kann None zurückgeben, wenn sie nichts zurückgibt.
Handler können mehr als einen Nachrichtentyp verarbeiten, indem sie eine Union der Nachrichtentypen akzeptieren. Sie können auch mehr als einen Nachrichtentyp zurückgeben, indem sie eine Union der Nachrichtentypen zurückgeben.
- Parameter:
func – Die zu dekorierende Funktion.
strict – Wenn True, löst der Handler eine Ausnahme aus, wenn der Nachrichten- oder Rückgabetyp nicht in den Zieltypen enthalten ist. Wenn False, wird stattdessen eine Warnung protokolliert.
match – Eine Funktion, die die Nachricht und den Kontext als Argumente nimmt und einen booleschen Wert zurückgibt. Dies wird für die sekundäre Weiterleitung nach dem Nachrichtentyp verwendet. Für Handler, die denselben Nachrichtentyp adressieren, wird die Match-Funktion in alphabetischer Reihenfolge der Handler angewendet und der erste passende Handler wird aufgerufen, während die anderen übersprungen werden. Wenn None, wird der erste Handler in alphabetischer Reihenfolge, der denselben Nachrichtentyp übereinstimmt, aufgerufen.
- class TypeSubscription(topic_type: str, agent_type: str | AgentType, id: str | None = None)[Quelle]#
Bases:
SubscriptionDiese Subscription passt auf Topics basierend auf dem Typ und ordnet Agenten zu, indem sie die Quelle des Topics als Agentenschlüssel verwendet.
Diese Subscription führt dazu, dass jede Quelle eine eigene Agentinstanz hat.
Beispiel
from autogen_core import TypeSubscription subscription = TypeSubscription(topic_type="t1", agent_type="a1")
In diesem Fall
Eine Topic-ID mit Typ t1 und Quelle s1 wird von einem Agenten des Typs a1 mit dem Schlüssel s1 verarbeitet.
Eine Topic-ID mit Typ t1 und Quelle s2 wird von einem Agenten des Typs a1 mit dem Schlüssel s2 verarbeitet.
- Parameter:
- property id: str#
Ruft die ID des Abonnements ab.
Implementierungen sollten eine eindeutige ID für das Abonnement zurückgeben. Normalerweise ist dies eine UUID.
- Gibt zurück:
str – ID des Abonnements.
- is_match(topic_id: TopicId) bool[Quelle]#
Prüft, ob eine gegebene topic_id mit dem Abonnement übereinstimmt.
- Parameter:
topic_id (TopicId) – TopicId zum Prüfen.
- Gibt zurück:
bool – True, wenn die topic_id mit dem Abonnement übereinstimmt, andernfalls False.
- map_to_agent(topic_id: TopicId) AgentId[Quelle]#
Ordnet eine topic_id einem Agenten zu. Sollte nur aufgerufen werden, wenn is_match für die gegebene topic_id True zurückgibt.
- Parameter:
topic_id (TopicId) – TopicId zum Zuordnen.
- Gibt zurück:
AgentId – ID des Agenten, der die topic_id verarbeiten soll.
- Löst aus:
CantHandleException – Wenn das Abonnement die topic_id nicht verarbeiten kann.
- class DefaultSubscription(topic_type: str = 'default', agent_type: str | AgentType | None = None)[Quelle]#
Bases:
TypeSubscriptionDie Standard-Subscription ist als sinnvolle Standardeinstellung für Anwendungen konzipiert, die nur einen globalen Geltungsbereich für Agenten benötigen.
Dieses Topic verwendet standardmäßig den Topic-Typ „default“ und versucht, den zu verwendenden Agent-Typ anhand des Instanziierungskontexts zu erkennen.
- Parameter:
topic_type (str, optional) – Der Topic-Typ, zu dem abonniert werden soll. Standard ist „default“.
agent_type (str, optional) – Der Agent-Typ, der für die Subscription verwendet werden soll. Standard ist None, in diesem Fall wird versucht, den Agent-Typ anhand des Instanziierungskontexts zu erkennen.
- class DefaultTopicId(type: str = 'default', source: str | None = None)[Quelle]#
Bases:
TopicIdDefaultTopicId bietet eine sinnvolle Standardeinstellung für die Felder topic_id und source einer TopicId.
Wenn es im Kontext eines Nachrichtenhandlers erstellt wird, wird die Quelle auf die agent_id des Nachrichtenhandlers gesetzt, andernfalls wird sie auf „default“ gesetzt.
- Parameter:
type (str, optional) – Topic-Typ, zu dem eine Nachricht gesendet werden soll. Standard ist „default“.
source (str | None, optional) – Topic-Quelle, zu der eine Nachricht gesendet werden soll. Wenn None, wird die Quelle auf die agent_id des Nachrichtenhandlers gesetzt, wenn sie sich im Kontext eines Nachrichtenhandlers befindet, andernfalls auf „default“. Standard ist None.
- default_subscription(cls: Type[BaseAgentType] | None = None) Callable[[Type[BaseAgentType]], Type[BaseAgentType]] | Type[BaseAgentType][Quelle]#
- class TypePrefixSubscription(topic_type_prefix: str, agent_type: str | AgentType, id: str | None = None)[Quelle]#
Bases:
SubscriptionDieses Abonnement gleicht Themen anhand eines Präfixes ihres Typs ab und ordnet sie Agenten zu, indem die Quelle des Themas als Agentenschlüssel verwendet wird.
Diese Subscription führt dazu, dass jede Quelle eine eigene Agentinstanz hat.
Beispiel
from autogen_core import TypePrefixSubscription subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1")
In diesem Fall
Eine Topic-ID mit Typ t1 und Quelle s1 wird von einem Agenten des Typs a1 mit dem Schlüssel s1 verarbeitet.
Eine Topic-ID mit Typ t1 und Quelle s2 wird von einem Agenten des Typs a1 mit dem Schlüssel s2 verarbeitet.
Ein Thema mit der ID t1SUFFIX und der Quelle s2 wird von einem Agenten vom Typ a1 mit dem Schlüssel s2 behandelt.
- Parameter:
- property id: str#
Ruft die ID des Abonnements ab.
Implementierungen sollten eine eindeutige ID für das Abonnement zurückgeben. Normalerweise ist dies eine UUID.
- Gibt zurück:
str – ID des Abonnements.
- is_match(topic_id: TopicId) bool[Quelle]#
Prüft, ob eine gegebene topic_id mit dem Abonnement übereinstimmt.
- Parameter:
topic_id (TopicId) – TopicId zum Prüfen.
- Gibt zurück:
bool – True, wenn die topic_id mit dem Abonnement übereinstimmt, andernfalls False.
- map_to_agent(topic_id: TopicId) AgentId[Quelle]#
Ordnet eine topic_id einem Agenten zu. Sollte nur aufgerufen werden, wenn is_match für die gegebene topic_id True zurückgibt.
- Parameter:
topic_id (TopicId) – TopicId zum Zuordnen.
- Gibt zurück:
AgentId – ID des Agenten, der die topic_id verarbeiten soll.
- Löst aus:
CantHandleException – Wenn das Abonnement die topic_id nicht verarbeiten kann.
- JSON_DATA_CONTENT_TYPE = 'application/json'#
Der Inhaltstyp für JSON-Daten.
- PROTOBUF_DATA_CONTENT_TYPE = 'application/x-protobuf'#
Der Inhaltstyp für Protobuf-Daten.
- class SingleThreadedAgentRuntime(*, intervention_handlers: List[InterventionHandler] | None = None, tracer_provider: TracerProvider | None = None, ignore_unhandled_exceptions: bool = True)[Quelle]#
Bases:
AgentRuntimeEine Single-Threaded-Agent-Laufzeitumgebung, die alle Nachrichten über eine einzige asyncio-Warteschlange verarbeitet. Nachrichten werden in der Reihenfolge ihres Empfangs zugestellt und die Laufzeitumgebung verarbeitet jede Nachricht in einem separaten asyncio-Task parallel.
Hinweis
Diese Laufzeitumgebung eignet sich für die Entwicklung und eigenständige Anwendungen. Sie ist nicht für Szenarien mit hohem Durchsatz oder hoher Nebenläufigkeit geeignet.
- Parameter:
intervention_handlers (List[InterventionHandler], optional) – Eine Liste von Interventionshandlern, die Nachrichten abfangen können, bevor sie gesendet oder veröffentlicht werden. Standardmäßig None.
tracer_provider (TracerProvider, optional) – Der zu verwendende Tracer-Provider für das Tracing. Standardmäßig None. Sie können zusätzlich die Umgebungsvariable AUTOGEN_DISABLE_RUNTIME_TRACING auf true setzen, um die Telemetrie der Agent-Laufzeitumgebung zu deaktivieren, wenn Sie keinen Zugriff auf den Konstruktor der Laufzeitumgebung haben. Zum Beispiel, wenn Sie ComponentConfig verwenden.
ignore_unhandled_exceptions (bool, optional) – Ob unbehandelte Ausnahmen in Agent-Ereignishandlern ignoriert werden sollen. Alle Hintergrundausnahmen werden beim nächsten Aufruf von process_next oder von einem ausstehenden stop, stop_when_idle oder stop_when ausgelöst. Beachten Sie, dass dies nicht für RPC-Handler gilt. Standardmäßig True.
Examples
Ein einfaches Beispiel für die Erstellung einer Laufzeitumgebung, die Registrierung eines Agenten, das Senden einer Nachricht und das Beenden der Laufzeitumgebung
import asyncio from dataclasses import dataclass from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): @message_handler async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") async def main() -> None: # Create a runtime and register the agent runtime = SingleThreadedAgentRuntime() await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent")) # Start the runtime, send a message and stop the runtime runtime.start() await runtime.send_message(MyMessage("Hello, world!"), recipient=AgentId("my_agent", "default")) await runtime.stop() asyncio.run(main())
Ein Beispiel für die Erstellung einer Laufzeitumgebung, die Registrierung eines Agenten, das Veröffentlichen einer Nachricht und das Beenden der Laufzeitumgebung
import asyncio from dataclasses import dataclass from autogen_core import ( DefaultTopicId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, default_subscription, message_handler, ) @dataclass class MyMessage: content: str # The agent is subscribed to the default topic. @default_subscription class MyAgent(RoutedAgent): @message_handler async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") async def main() -> None: # Create a runtime and register the agent runtime = SingleThreadedAgentRuntime() await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent")) # Start the runtime. runtime.start() # Publish a message to the default topic that the agent is subscribed to. await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId()) # Wait for the message to be processed and then stop the runtime. await runtime.stop_when_idle() asyncio.run(main())
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[Quelle]#
Sendet eine Nachricht an einen Agenten und erhält eine Antwort.
- Parameter:
message (Any) – Die zu sendende Nachricht.
recipient (AgentId) – Der Agent, an den die Nachricht gesendet werden soll.
sender (AgentId | None, optional) – Agent, der die Nachricht gesendet hat. Sollte **nur** None sein, wenn diese von keinem Agenten gesendet wurde, z. B. direkt an den Runtime extern. Standardwert ist None.
cancellation_token (CancellationToken | None, optional) – Token zum Abbrechen einer laufenden Nachricht. Standardwert ist None.
- Löst aus:
CantHandleException – Wenn der Empfänger die Nachricht nicht bearbeiten kann.
UndeliverableException – Wenn die Nachricht nicht zugestellt werden kann.
Other – Jede andere Ausnahme, die vom Empfänger ausgelöst wird.
- Gibt zurück:
Any – Die Antwort vom Agenten.
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[Quelle]#
Veröffentlicht eine Nachricht an alle Agenten im gegebenen Namensraum oder, falls kein Namensraum angegeben ist, an den Namensraum des Absenders.
Es werden keine Antworten auf Veröffentlichungen erwartet.
- Parameter:
message (Any) – Die zu veröffentlichende Nachricht.
topic_id (TopicId) – Das Thema, zu dem die Nachricht veröffentlicht werden soll.
sender (AgentId | None, optional) – Der Agent, der die Nachricht gesendet hat. Standardwert ist None.
cancellation_token (CancellationToken | None, optional) – Token zum Abbrechen einer laufenden Nachricht. Standardwert ist None.
message_id (str | None, optional) – Die Nachrichten-ID. Wenn None, wird eine neue Nachrichten-ID generiert. Standardwert ist None. Diese Nachrichten-ID muss eindeutig sein und sollte vorzugsweise eine UUID sein.
- Löst aus:
UndeliverableException – Wenn die Nachricht nicht zugestellt werden kann.
- async save_state() Mapping[str, Any][Quelle]#
Speichert den Zustand aller instanziierten Agenten.
Diese Methode ruft die Methode
save_state()für jeden Agenten auf und gibt ein Dictionary zurück, das Agent-IDs ihren Zuständen zuordnet.Hinweis
Diese Methode speichert derzeit nicht den Abonnementstatus. Wir werden dies in Zukunft hinzufügen.
- Gibt zurück:
Ein Dictionary, das Agent-IDs ihren Zuständen zuordnet.
- async load_state(state: Mapping[str, Any]) None[Quelle]#
Lädt den Zustand aller instanziierten Agenten.
Diese Methode ruft die Methode
load_state()für jeden Agenten mit dem in der Dictionary bereitgestellten Zustand auf. Die Schlüssel der Dictionary sind die Agent-IDs, und die Werte sind die Zustands-Dictionaries, die von der Methodesave_state()zurückgegeben wurden.Hinweis
Diese Methode lädt derzeit nicht den Abonnementstatus. Wir werden dies in Zukunft hinzufügen.
- async process_next() None[Quelle]#
Verarbeitet die nächste Nachricht in der Warteschlange.
Wenn eine unbehandelte Ausnahme im Hintergrund-Task auftritt, wird sie hier ausgelöst. process_next kann nach dem Auslösen einer unbehandelten Ausnahme nicht erneut aufgerufen werden.
- start() None[Quelle]#
Startet die Nachrichtenverarbeitungsschleife der Laufzeitumgebung. Diese läuft in einem Hintergrund-Task.
Beispiel
import asyncio from autogen_core import SingleThreadedAgentRuntime async def main() -> None: runtime = SingleThreadedAgentRuntime() runtime.start() # ... do other things ... await runtime.stop() asyncio.run(main())
- async close() None[Quelle]#
Ruft
stop()auf, falls zutreffend, und die MethodeAgent.close()für alle instanziierten Agenten
- async stop() None[Quelle]#
Stoppt sofort die Nachrichtenverarbeitungsschleife der Laufzeitumgebung. Die aktuell verarbeitete Nachricht wird abgeschlossen, aber alle nachfolgenden werden verworfen.
- async stop_when_idle() None[Quelle]#
Stoppt die Nachrichtenverarbeitungsschleife der Laufzeitumgebung, wenn keine ausstehende Nachricht verarbeitet oder in die Warteschlange gestellt wird. Dies ist die gebräuchlichste Methode, um die Laufzeitumgebung zu stoppen.
- async stop_when(condition: Callable[[], bool]) None[Quelle]#
Stoppt die Nachrichtenverarbeitungsschleife der Laufzeitumgebung, wenn die Bedingung erfüllt ist.
Vorsicht
Diese Methode wird nicht zur Verwendung empfohlen und ist aus Gründen der Abwärtskompatibilität vorhanden. Sie startet eine aktive Schleife, um die Bedingung ständig zu überprüfen. Es ist wesentlich effizienter, stop_when_idle oder stop aufzurufen. Wenn Sie die Laufzeitumgebung basierend auf einer Bedingung stoppen müssen, sollten Sie einen Hintergrund-Task und asyncio.Event verwenden, um zu signalisieren, wann die Bedingung erfüllt ist und der Hintergrund-Task stop aufrufen soll.
- async agent_metadata(agent: AgentId) AgentMetadata[Quelle]#
Ruft die Metadaten für einen Agenten ab.
- Parameter:
agent (AgentId) – Die Agenten-ID.
- Gibt zurück:
AgentMetadata – Die Agenten-Metadaten.
- async agent_save_state(agent: AgentId) Mapping[str, Any][Quelle]#
Speichert den Zustand eines einzelnen Agenten.
Die Struktur des Zustands ist implementierungsabhängig und kann jedes JSON-serialisierbare Objekt sein.
- Parameter:
agent (AgentId) – Die Agenten-ID.
- Gibt zurück:
Mapping[str, Any] – Der gespeicherte Zustand.
- async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]#
Lädt den Zustand eines einzelnen Agenten.
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]#
Registriert eine Agentenfabrik beim Runtime, die mit einem bestimmten Typ verknüpft ist. Der Typ muss eindeutig sein. Diese API fügt keine Abonnements hinzu.
Hinweis
Dies ist eine Low-Level-API, und normalerweise sollte die register-Methode der Agentenklasse verwendet werden, da diese auch Abonnements automatisch handhabt.
Beispiel
from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main())
- Parameter:
type (str) – Der Typ des Agenten, den diese Factory erstellt. Dies ist nicht dasselbe wie der Agentenklassenname. Der type-Parameter wird verwendet, um zwischen verschiedenen Factory-Funktionen anstelle von Agentenklassen zu unterscheiden.
agent_factory (Callable[[], T]) – Die Factory, die den Agenten erstellt, wobei T ein konkreter Agententyp ist. Innerhalb der Factory verwenden Sie autogen_core.AgentInstantiationContext, um auf Variablen wie den aktuellen Runtime und die Agenten-ID zuzugreifen.
expected_class (type[T] | None, optional) – Die erwartete Klasse des Agenten, die zur Runtime-Validierung der Factory verwendet wird. Standardwert ist None. Wenn None, wird keine Validierung durchgeführt.
- async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId[source]#
Registriert eine Agenten-Instanz beim Runtime. Der Typ darf wiederverwendet werden, aber jede agent_id muss eindeutig sein. Alle Agenten-Instanzen innerhalb eines Typs müssen vom selben Objekttyp sein. Diese API fügt keine Abonnements hinzu.
Hinweis
Dies ist eine Low-Level-API, und normalerweise sollte die register_instance-Methode der Agentenklasse verwendet werden, da diese auch Abonnements automatisch handhabt.
Beispiel
from dataclasses import dataclass from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def main() -> None: runtime: AgentRuntime = ... # type: ignore agent = MyAgent() await runtime.register_agent_instance( agent_instance=agent, agent_id=AgentId(type="my_agent", key="default") ) import asyncio asyncio.run(main())
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]#
Versucht, die zugrunde liegende Agenten-Instanz nach Name und Namensraum abzurufen. Dies wird generell nicht empfohlen (daher der lange Name), kann aber in einigen Fällen nützlich sein.
Wenn der zugrunde liegende Agent nicht zugänglich ist, wird eine Ausnahme ausgelöst.
- Parameter:
id (AgentId) – Die Agenten-ID.
type (Type[T], optional) – Der erwartete Typ des Agenten. Standard ist Agent.
- Gibt zurück:
T – Die konkrete Agenteninstanz.
- Löst aus:
LookupError – Wenn der Agent nicht gefunden wird.
NotAccessibleError – Wenn der Agent nicht zugänglich ist, z.B. wenn er sich remote befindet.
TypeError – Wenn der Agent nicht vom erwarteten Typ ist.
- async add_subscription(subscription: Subscription) None[source]#
Fügt eine neue Abonnement hinzu, das die Laufzeitumgebung bei der Verarbeitung veröffentlichter Nachrichten erfüllen soll.
- Parameter:
subscription (Subscription) – Das hinzuzufügende Abonnement.
- async remove_subscription(id: str) None[source]#
Entfernt ein Abonnement aus der Laufzeitumgebung.
- Parameter:
id (str) – Die ID des zu entfernenden Abonnements.
- Löst aus:
LookupError – Wenn das Abonnement nicht existiert.
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[source]#
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#
Fügt einen neuen Serialisierer für die Nachrichten serialisierung zur Laufzeitumgebung hinzu.
Hinweis: Dies dedupliziert Serialisierer basierend auf den Eigenschaften type_name und data_content_type.
- Parameter:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – Der/die hinzuzufügende(n) Serialisierer.
- ROOT_LOGGER_NAME = 'autogen_core'#
Der Name des Root-Loggers.
- EVENT_LOGGER_NAME = 'autogen_core.events'#
Der Name des Loggers, der für strukturierte Events verwendet wird.
- TRACE_LOGGER_NAME = 'autogen_core.trace'#
Logger-Name für die von Entwicklern beabsichtigte Trace-Protokollierung. Der Inhalt und das Format dieses Protokolls sollten nicht als verbindlich angesehen werden.
- class Component[source]#
Bases:
ComponentFromConfig[ConfigT],ComponentSchemaType[ConfigT],Generic[ConfigT]Um eine Komponentenklasse zu erstellen, erben Sie von dieser Klasse für die konkrete Klasse und von ComponentBase für die Schnittstelle. Implementieren Sie dann zwei Klassenvariablen:
component_config_schema- Eine Pydantic-Modellklasse, die die Konfiguration der Komponente darstellt. Dies ist auch der Typparameter von Component.component_type- Der logische Typ der Komponente.
Beispiel
from __future__ import annotations from pydantic import BaseModel from autogen_core import Component class Config(BaseModel): value: str class MyComponent(Component[Config]): component_type = "custom" component_config_schema = Config def __init__(self, value: str): self.value = value def _to_config(self) -> Config: return Config(value=self.value) @classmethod def _from_config(cls, config: Config) -> MyComponent: return cls(value=config.value)
- class ComponentBase[source]#
Bases:
ComponentToConfig[ConfigT],ComponentLoader,Generic[ConfigT]
- class ComponentFromConfig[source]#
Bases:
Generic[FromConfigT]- classmethod _from_config(config: FromConfigT) Self[source]#
Erstelle eine neue Instanz der Komponente aus einem Konfigurationsobjekt.
- Parameter:
config (T) – Das Konfigurationsobjekt.
- Gibt zurück:
Self – Die neue Instanz der Komponente.
- classmethod _from_config_past_version(config: Dict[str, Any], version: int) Self[source]#
Erstellt eine neue Instanz der Komponente aus einer früheren Version des Konfigurationsobjekts.
Dies wird nur aufgerufen, wenn die Version des Konfigurationsobjekts kleiner als die aktuelle Version ist, da das Schema in diesem Fall nicht bekannt ist.
- class ComponentLoader[source]#
Basiert auf:
object- classmethod load_component(model: ComponentModel | Dict[str, Any], expected: None = None) Self[source]#
- classmethod load_component(model: ComponentModel | Dict[str, Any], expected: Type[ExpectedType]) ExpectedType
Lädt eine Komponente aus einem Modell. Gedacht für die Verwendung mit dem Rückgabewert von
autogen_core.ComponentConfig.dump_component().Beispiel
from autogen_core import ComponentModel from autogen_core.models import ChatCompletionClient component: ComponentModel = ... # type: ignore model_client = ChatCompletionClient.load_component(component)
- Parameter:
model (ComponentModel) – Das Modell, aus dem die Komponente geladen werden soll.
model – _Beschreibung_
expected (Type[ExpectedType] | None, optional) – Expliziter Typ nur, wenn er direkt auf ComponentLoader verwendet wird. Standard ist None.
- Gibt zurück:
Self – Die geladene Komponente.
- Löst aus:
ValueError – Wenn der Anbieter-String ungültig ist.
TypeError – Der Anbieter ist keine Unterklasse von ComponentConfigImpl oder der erwartete Typ stimmt nicht überein.
- Gibt zurück:
Self | ExpectedType – Die geladene Komponente.
- pydantic model ComponentModel[source]#
Bases:
BaseModelModellklasse für eine Komponente. Enthält alle Informationen, die zur Instanziierung einer Komponente erforderlich sind.
JSON-Schema anzeigen
{ "title": "ComponentModel", "description": "Model class for a component. Contains all information required to instantiate a component.", "type": "object", "properties": { "provider": { "title": "Provider", "type": "string" }, "component_type": { "anyOf": [ { "enum": [ "model", "agent", "tool", "termination", "token_provider", "workbench" ], "type": "string" }, { "type": "string" }, { "type": "null" } ], "default": null, "title": "Component Type" }, "version": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Version" }, "component_version": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Component Version" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" }, "label": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Label" }, "config": { "title": "Config", "type": "object" } }, "required": [ "provider", "config" ] }
- Felder:
component_type (Literal['model', 'agent', 'tool', 'termination', 'token_provider', 'workbench'] | str | None)component_version (int | None)config (dict[str, Any])description (str | None)label (str | None)provider (str)version (int | None)
- field component_type: ComponentType | None = None#
Logischer Typ der Komponente. Wenn fehlend, übernimmt die Komponente den Standardtyp des Anbieters.
- field version: int | None = None#
Version der Komponentenspezifikation. Wenn fehlend, übernimmt die Komponente die aktuelle Version der verwendeten Bibliothek. Dies ist offensichtlich gefährlich und sollte für benutzergenerierte temporäre Konfigurationen verwendet werden. Für alle anderen Konfigurationen sollte die Version angegeben werden.
- field component_version: int | None = None#
Version der Komponente. Wenn fehlend, übernimmt die Komponente die Standardversion des Anbieters.
- class ComponentSchemaType[source]#
Bases:
Generic[ConfigT]- component_config_schema: Type[ConfigT]#
Die Pydantic-Modellklasse, die die Konfiguration der Komponente darstellt.
- required_class_vars = ['component_config_schema', 'component_type']#
- class ComponentToConfig[source]#
Bases:
Generic[ToConfigT]Die beiden Methoden, die eine Klasse als Komponente implementieren muss.
- Parameter:
Protocol (ConfigT) – Typ, der von
pydantic.BaseModelabgeleitet ist.
- component_type: ClassVar[Literal['model', 'agent', 'tool', 'termination', 'token_provider', 'workbench'] | str]#
Der logische Typ der Komponente.
- component_version: ClassVar[int] = 1#
Die Version der Komponente. Wenn Schema-Inkompatibilitäten auftreten, sollte diese aktualisiert werden.
- component_provider_override: ClassVar[str | None] = None#
Überschreibe den Anbieter-String für die Komponente. Dies sollte verwendet werden, um zu verhindern, dass interne Modulnamen Teil des Modulnamens werden.
- component_description: ClassVar[str | None] = None#
Eine Beschreibung der Komponente. Wenn nicht angegeben, wird der Docstring der Klasse verwendet.
- component_label: ClassVar[str | None] = None#
Ein menschenlesbares Label für die Komponente. Wenn nicht angegeben, wird der Klassenname der Komponente verwendet.
- _to_config() ToConfigT[Quelle]#
Gib die Konfiguration aus, die erforderlich wäre, um eine neue Instanz einer Komponente zu erstellen, die der Konfiguration dieser Instanz entspricht.
- Gibt zurück:
T – Die Konfiguration der Komponente.
- dump_component() ComponentModel[Quelle]#
Exportiert die Komponente in ein Modell, das wieder geladen werden kann.
- Löst aus:
TypeError – Wenn die Komponente eine lokale Klasse ist.
- Gibt zurück:
ComponentModel – Das Modell, das die Komponente repräsentiert.
- final class DropMessage[Quelle]#
Basiert auf:
objectMarkierungstyp zur Signalisierung, dass eine Nachricht von einem Intervention-Handler verworfen werden soll. Der Typ selbst sollte vom Handler zurückgegeben werden.
- class InterventionHandler(*args, **kwargs)[Quelle]#
Bases:
ProtocolEin Intervention-Handler ist eine Klasse, die verwendet werden kann, um Nachrichten zu modifizieren, zu protokollieren oder zu verwerfen, die von der
autogen_core.base.AgentRuntimeverarbeitet werden.Der Handler wird aufgerufen, wenn die Nachricht an die Laufzeit übermittelt wird.
Derzeit unterstützt nur die
autogen_core.base.SingleThreadedAgentRuntimedies.Hinweis: Die Rückgabe von None aus einer der Methoden des Intervention-Handlers führt zu einer Warnung und wird als "keine Änderung" behandelt. Wenn Sie beabsichtigen, eine Nachricht zu verwerfen, sollten Sie explizit
DropMessagezurückgeben.Beispiel
from autogen_core import DefaultInterventionHandler, MessageContext, AgentId, SingleThreadedAgentRuntime from dataclasses import dataclass from typing import Any @dataclass class MyMessage: content: str class MyInterventionHandler(DefaultInterventionHandler): async def on_send(self, message: Any, *, message_context: MessageContext, recipient: AgentId) -> MyMessage: if isinstance(message, MyMessage): message.content = message.content.upper() return message runtime = SingleThreadedAgentRuntime(intervention_handlers=[MyInterventionHandler()])
- async on_send(message: Any, *, message_context: MessageContext, recipient: AgentId) Any | type[DropMessage][Quelle]#
Wird aufgerufen, wenn eine Nachricht über
autogen_core.base.AgentRuntime.send_message()an die AgentRuntime übermittelt wird.
- async on_publish(message: Any, *, message_context: MessageContext) Any | type[DropMessage][Quelle]#
Wird aufgerufen, wenn eine Nachricht über
autogen_core.base.AgentRuntime.publish_message()an die AgentRuntime veröffentlicht wird.
- class DefaultInterventionHandler(*args, **kwargs)[Quelle]#
Bases:
InterventionHandlerEinfache Klasse, die eine Standardimplementierung für alle Methoden des Intervention-Handlers bietet, die einfach die Nachricht unverändert zurückgibt. Ermöglicht einfaches Unterklassen für die Überschreibung nur der gewünschten Methoden.
- async on_send(message: Any, *, message_context: MessageContext, recipient: AgentId) Any | type[DropMessage][Quelle]#
Wird aufgerufen, wenn eine Nachricht über
autogen_core.base.AgentRuntime.send_message()an die AgentRuntime übermittelt wird.
- async on_publish(message: Any, *, message_context: MessageContext) Any | type[DropMessage][Quelle]#
Wird aufgerufen, wenn eine Nachricht über
autogen_core.base.AgentRuntime.publish_message()an die AgentRuntime veröffentlicht wird.
- trace_create_agent_span(agent_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, agent_id: str | None = None, agent_description: str | None = None) Generator[Span, Any, None][Quelle]#
Kontextmanager zur Erstellung eines Spans für die Agentenerstellung gemäß den OpenTelemetry Semantic Conventions für generative KI-Systeme.
Siehe die Dokumentation der GenAI Semantic Conventions: OpenTelemetry GenAI Semantic Conventions
Warnung
Die GenAI Semantic Conventions befinden sich noch in der Inkubation und können sich in zukünftigen Versionen ändern.
- Parameter:
agent_name (str) – Der Name des erstellten Agenten.
tracer (Optional[trace.Tracer]) – Der Tracer, der zur Erstellung des Spans verwendet wird.
parent (Optional[Span]) – Der übergeordnete Span, zu dem dieser Span verknüpft werden soll.
agent_id (Optional[str]) – Die eindeutige Kennung des Agenten.
agent_description (Optional[str]) – Eine Beschreibung des Agenten.
- trace_invoke_agent_span(agent_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, agent_id: str | None = None, agent_description: str | None = None) Generator[Span, Any, None][Quelle]#
Kontextmanager zur Erstellung eines Spans für die Agentenaufrufung gemäß den OpenTelemetry Semantic Conventions für generative KI-Systeme.
Siehe die Dokumentation der GenAI Semantic Conventions: OpenTelemetry GenAI Semantic Conventions
Warnung
Die GenAI Semantic Conventions befinden sich noch in der Inkubation und können sich in zukünftigen Versionen ändern.
- Parameter:
agent_name (str) – Der Name des aufgerufenen Agenten.
tracer (Optional[trace.Tracer]) – Der Tracer, der zur Erstellung des Spans verwendet wird.
parent (Optional[Span]) – Der übergeordnete Span, zu dem dieser Span verknüpft werden soll.
agent_id (Optional[str]) – Die eindeutige Kennung des Agenten.
agent_description (Optional[str]) – Eine Beschreibung des Agenten.
- trace_tool_span(tool_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, tool_description: str | None = None, tool_call_id: str | None = None) Generator[Span, Any, None][Quelle]#
Kontextmanager zur Erstellung eines Spans für die Tool-Ausführung gemäß den OpenTelemetry Semantic Conventions für generative KI-Systeme.
Siehe die Dokumentation der GenAI Semantic Conventions: OpenTelemetry GenAI Semantic Conventions
Warnung
Die GenAI Semantic Conventions befinden sich noch in der Inkubation und können sich in zukünftigen Versionen ändern.
- Parameter:
tool_name (str) – Der Name des ausgeführten Tools.
tracer (Optional[trace.Tracer]) – Der Tracer, der zur Erstellung des Spans verwendet wird.
parent (Optional[Span]) – Der übergeordnete Span, zu dem dieser Span verknüpft werden soll.
tool_description (Optional[str]) – Eine Beschreibung des Tools.
tool_call_id (Optional[str]) – Eine eindeutige Kennung des Tool-Aufrufs.