autogen_ext.runtimes.grpc#

class GrpcWorkerAgentRuntime(host_address: str, tracer_provider: TracerProvider | None = None, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None, payload_serialization_format: str = JSON_DATA_CONTENT_TYPE)[Quelle]#

Bases: AgentRuntime

Eine Agentenlaufzeit zum Ausführen von entfernten oder sprachübergreifenden Agenten.

Die Agentenkommunikation nutzt Protobufs aus agent_worker.proto und CloudEvent aus cloudevent.proto.

Sprachübergreifende Agenten erfordern zusätzlich, dass alle Agenten gemeinsame Protobuf-Schemata für alle Nachrichtentypen verwenden, die zwischen Agenten gesendet werden.

async start() None[Quelle]#

Starten Sie die Laufzeit in einem Hintergrundtask.

async stop() None[Quelle]#

Stoppen Sie die Laufzeit sofort.

async stop_when_signal(signals: Sequence[Signals] = (signal.SIGTERM, signal.SIGINT)) None[Quelle]#

Stoppen Sie die Laufzeit, wenn ein Signal empfangen wird.

async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[Quelle]#

Senden Sie eine Nachricht an einen Agenten und erhalten Sie eine Antwort.

Parameter:
  • message (Any) – Die zu sendende Nachricht.

  • recipient (AgentId) – Der Agent, an den die Nachricht gesendet werden soll.

  • sender (AgentId | None, optional) – Agent, der die Nachricht gesendet hat. Sollte **nur** None sein, wenn dies von keinem Agenten gesendet wurde, z. B. direkt an die Laufzeit extern. Standardmäßig None.

  • cancellation_token (CancellationToken | None, optional) – Token zum Abbrechen eines laufenden Vorgangs. Standardmäßig None.

Löst aus:
  • CantHandleException – Wenn der Empfänger die Nachricht nicht verarbeiten kann.

  • UndeliverableException – Wenn die Nachricht nicht zugestellt werden kann.

  • Other – Jede andere Ausnahme, die vom Empfänger ausgelöst wird.

Gibt zurück:

Any – Die Antwort vom Agenten.

async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[Quelle]#

Veröffentlichen Sie eine Nachricht für alle Agenten im gegebenen Namespace oder, wenn kein Namespace angegeben ist, für den Namespace des Absenders.

Es werden keine Antworten vom Veröffentlichen erwartet.

Parameter:
  • message (Any) – Die zu veröffentlichende Nachricht.

  • topic_id (TopicId) – Das Thema, zu dem die Nachricht veröffentlicht werden soll.

  • sender (AgentId | None, optional) – Der Agent, der die Nachricht gesendet hat. Standardmäßig None.

  • cancellation_token (CancellationToken | None, optional) – Token zum Abbrechen eines laufenden Vorgangs. Standardmäßig None.

  • message_id (str | None, optional) – Die Nachrichten-ID. Wenn None, wird eine neue Nachrichten-ID generiert. Standardmäßig None. Diese Nachrichten-ID muss eindeutig sein und sollte eine UUID sein.

Löst aus:

UndeliverableException – Wenn die Nachricht nicht zugestellt werden kann.

async save_state() Mapping[str, Any][Quelle]#

Speichert den Zustand der gesamten Laufzeit, einschließlich aller gehosteten Agenten. Die einzige Möglichkeit, den Zustand wiederherzustellen, besteht darin, ihn an load_state() zu übergeben.

Die Struktur des Zustands ist implementierungsabhängig und kann jedes JSON-serialisierbare Objekt sein.

Gibt zurück:

Mapping[str, Any] – Der gespeicherte Zustand.

async load_state(state: Mapping[str, Any]) None[Quelle]#

Lädt den Zustand der gesamten Laufzeit, einschließlich aller gehosteten Agenten. Der Zustand sollte derselbe sein wie der von save_state() zurückgegebene Zustand.

Parameter:

state (Mapping[str, Any]) – Der gespeicherte Zustand.

async agent_metadata(agent: AgentId) AgentMetadata[Quelle]#

Metadaten für einen Agenten abrufen.

Parameter:

agent (AgentId) – Die Agenten-ID.

Gibt zurück:

AgentMetadata – Die Agentenmetadaten.

async agent_save_state(agent: AgentId) Mapping[str, Any][Quelle]#

Speichert den Zustand eines einzelnen Agenten.

Die Struktur des Zustands ist implementierungsabhängig und kann jedes JSON-serialisierbare Objekt sein.

Parameter:

agent (AgentId) – Die Agenten-ID.

Gibt zurück:

Mapping[str, Any] – Der gespeicherte Zustand.

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[Quelle]#

Lädt den Zustand eines einzelnen Agenten.

Parameter:
  • agent (AgentId) – Die Agenten-ID.

  • state (Mapping[str, Any]) – Der gespeicherte Zustand.

async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[Quelle]#

Registriert eine Agentenfabrik mit der Laufzeit, die mit einem bestimmten Typ verknüpft ist. Der Typ muss eindeutig sein. Diese API fügt keine Abonnements hinzu.

Hinweis

Dies ist eine Low-Level-API, und normalerweise sollte die Methode register der Agentenklasse verwendet werden, da diese auch Abonnements automatisch handhabt.

Beispiel

from dataclasses import dataclass

from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    await runtime.register_factory("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
Parameter:
  • type (str) – Der Typ des Agenten, den diese Fabrik erstellt. Es ist nicht dasselbe wie der Agentenklassenname. Der Parameter type wird verwendet, um zwischen verschiedenen Fabrikfunktionen anstelle von Agentenklassen zu unterscheiden.

  • agent_factory (Callable[[], T]) – Die Fabrik, die den Agenten erstellt, wobei T ein konkreter Agent-Typ ist. Innerhalb der Fabrik verwenden Sie autogen_core.AgentInstantiationContext, um auf Variablen wie die aktuelle Laufzeit und die Agenten-ID zuzugreifen.

  • expected_class (type[T] | None, optional) – Die erwartete Klasse des Agenten, die zur Laufzeitvalidierung der Fabrik verwendet wird. Standardmäßig None. Wenn None, wird keine Validierung durchgeführt.

async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId[Quelle]#

Registriert eine Agenteninstanz bei der Laufzeit. Der Typ kann wiederverwendet werden, aber jede Agenten-ID muss eindeutig sein. Alle Agenteninstanzen innerhalb eines Typs müssen vom selben Objekttyp sein. Diese API fügt keine Abonnements hinzu.

Hinweis

Dies ist eine Low-Level-API, und normalerweise sollte die Methode register_instance der Agentenklasse verwendet werden, da diese auch Abonnements automatisch handhabt.

Beispiel

from dataclasses import dataclass

from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    agent = MyAgent()
    await runtime.register_agent_instance(
        agent_instance=agent, agent_id=AgentId(type="my_agent", key="default")
    )


import asyncio

asyncio.run(main())
Parameter:
  • agent_instance (Agent) – Eine konkrete Instanz des Agenten.

  • agent_id (AgentId) – Die Kennung des Agenten. Der Typ des Agenten ist agent_id.type.

async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[Quelle]#

Versuchen Sie, die zugrunde liegende Agenteninstanz nach Namen und Namespace abzurufen. Dies wird generell abgeraten (daher der lange Name), kann aber in einigen Fällen nützlich sein.

Wenn der zugrunde liegende Agent nicht zugänglich ist, wird eine Ausnahme ausgelöst.

Parameter:
  • id (AgentId) – Die Agenten-ID.

  • type (Type[T], optional) – Der erwartete Typ des Agenten. Standardmäßig Agent.

Gibt zurück:

T – Die konkrete Agenteninstanz.

Löst aus:
  • LookupError – Wenn der Agent nicht gefunden wird.

  • NotAccessibleError – Wenn der Agent nicht zugänglich ist, z. B. wenn er sich remote befindet.

  • TypeError – Wenn der Agent nicht vom erwarteten Typ ist.

async add_subscription(subscription: Subscription) None[Quelle]#

Fügt ein neues Abonnement hinzu, das die Laufzeit bei der Verarbeitung veröffentlichter Nachrichten erfüllen soll.

Parameter:

subscription (Subscription) – Das hinzuzufügende Abonnement

async remove_subscription(id: str) None[Quelle]#

Entfernt ein Abonnement aus der Laufzeit.

Parameter:

id (str) – ID des zu entfernenden Abonnements

Löst aus:

LookupError – Wenn das Abonnement nicht existiert

async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[Quelle]#
add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[Quelle]#

Fügt dem Laufzeitumgebung einen neuen Nachrichten-Serialisierungs-Serializer hinzu

Hinweis: Dies dedupliziert Serialisierer basierend auf den Eigenschaften type_name und data_content_type

Parameter:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – Der hinzuzufügende Serialisierer/die hinzuzufügenden Serialisierer

class GrpcWorkerAgentRuntimeHost(address: str, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None)[Quelle]#

Basiert auf: object

start() None[Quelle]#

Startet den Server in einem Hintergrundtask.

async stop(grace: int = 5) None[Quelle]#

Stoppt den Server.

async stop_when_signal(grace: int = 5, signals: Sequence[Signals] = (signal.SIGTERM, signal.SIGINT)) None[Quelle]#

Stoppt den Server, wenn ein Signal empfangen wird.

class GrpcWorkerAgentRuntimeHostServicer[Quelle]#

Bases: AgentRpcServicer

Ein gRPC-Servicer, der den Nachrichtenzustellungsdienst für Agenten hostet.

async OpenChannel(request_iterator: AsyncIterator[Message], context: ServicerContext[Message, Message]) AsyncIterator[Message][Quelle]#

Fehlender zugehöriger Dokumentationskommentar in der .proto-Datei.

async OpenControlChannel(request_iterator: AsyncIterator[ControlMessage], context: ServicerContext[ControlMessage, ControlMessage]) AsyncIterator[ControlMessage][Quelle]#

Fehlender zugehöriger Dokumentationskommentar in der .proto-Datei.

async RegisterAgent(request: RegisterAgentTypeRequest, context: ServicerContext[RegisterAgentTypeRequest, RegisterAgentTypeResponse]) RegisterAgentTypeResponse[Quelle]#

Fehlender zugehöriger Dokumentationskommentar in der .proto-Datei.

async AddSubscription(request: AddSubscriptionRequest, context: ServicerContext[AddSubscriptionRequest, AddSubscriptionResponse]) AddSubscriptionResponse[Quelle]#

Fehlender zugehöriger Dokumentationskommentar in der .proto-Datei.

async RemoveSubscription(request: RemoveSubscriptionRequest, context: ServicerContext[RemoveSubscriptionRequest, RemoveSubscriptionResponse]) RemoveSubscriptionResponse[Quelle]#

Fehlender zugehöriger Dokumentationskommentar in der .proto-Datei.

async GetSubscriptions(request: GetSubscriptionsRequest, context: ServicerContext[GetSubscriptionsRequest, GetSubscriptionsResponse]) GetSubscriptionsResponse[Quelle]#

Fehlender zugehöriger Dokumentationskommentar in der .proto-Datei.