Themen- und Abonnement-Beispielszenarien#

Einführung#

In diesem Kochbuch untersuchen wir, wie Broadcasting für die Agentenkommunikation in AutoGen anhand von vier verschiedenen Broadcasting-Szenarien funktioniert. Diese Szenarien veranschaulichen verschiedene Möglichkeiten, Nachrichten zwischen Agenten zu verarbeiten und zu verteilen. Wir verwenden ein einheitliches Beispiel eines Steuerberatungsunternehmens, das Kundenanfragen bearbeitet, um jedes Szenario zu demonstrieren.

Szenarienübersicht#

Stellen Sie sich ein Steuerberatungsunternehmen vor, das seinen Kunden verschiedene Dienstleistungen anbietet, wie z. B. Steuerplanung, Streitbeilegung, Compliance und Vorbereitung. Das Unternehmen beschäftigt ein Team von Steuerspezialisten, die jeweils über Fachkenntnisse in einem dieser Bereiche verfügen, sowie einen Steuerverwaltungssystemmanager, der den Betrieb überwacht.

Kunden reichen Anfragen ein, die von den entsprechenden Spezialisten bearbeitet werden müssen. Die Kommunikation zwischen den Kunden, dem Steuerverwaltungssystemmanager und den Steuerspezialisten wird in diesem System über Broadcasting abgewickelt.

Wir werden untersuchen, wie sich verschiedene Broadcasting-Szenarien auf die Verteilung von Nachrichten zwischen Agenten auswirken und wie sie zur Anpassung des Kommunikationsflusses an spezifische Bedürfnisse verwendet werden können.


Übersicht der Broadcasting-Szenarien#

Wir werden die folgenden Broadcasting-Szenarien behandeln

  1. Single-Tenant, Single Scope of Publishing

  2. Multi-Tenant, Single Scope of Publishing

  3. Single-Tenant, Multiple Scopes of Publishing

  4. Multi-Tenant, Multiple Scopes of Publishing

Jedes Szenario stellt einen anderen Ansatz für die Nachrichtenverteilung und Agenteninteraktion innerhalb des Systems dar. Durch das Verständnis dieser Szenarien können Sie Kommunikationsstrategien für Agenten entwerfen, die am besten zu den Anforderungen Ihrer Anwendung passen.

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId
from autogen_core.models import (
    SystemMessage,
)
class TaxSpecialty(str, Enum):
    PLANNING = "planning"
    DISPUTE_RESOLUTION = "dispute_resolution"
    COMPLIANCE = "compliance"
    PREPARATION = "preparation"


@dataclass
class ClientRequest:
    content: str


@dataclass
class RequestAssessment:
    content: str


class TaxSpecialist(RoutedAgent):
    def __init__(
        self,
        description: str,
        specialty: TaxSpecialty,
        system_messages: List[SystemMessage],
    ) -> None:
        super().__init__(description)
        self.specialty = specialty
        self._system_messages = system_messages
        self._memory: List[ClientRequest] = []

    @message_handler
    async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
        # Process the client request.
        print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
        # Send a response back to the manager
        if ctx.topic_id is None:
            raise ValueError("Topic ID is required for broadcasting")
        await self.publish_message(
            message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
            topic_id=ctx.topic_id,
        )

1. Single-Tenant, Single Scope of Publishing#

Szenarioerklärung#

Im Szenario Single-Tenant, Single Scope of Publishing

  • Alle Agenten arbeiten innerhalb eines einzigen Mandanten (z. B. eines Kunden oder einer Benutzersitzung).

  • Nachrichten werden zu einem einzigen Thema veröffentlicht, und alle Agenten abonnieren dieses Thema.

  • Jeder Agent empfängt jede Nachricht, die zu dem Thema veröffentlicht wird.

Dieses Szenario eignet sich für Situationen, in denen alle Agenten über alle Nachrichten informiert sein müssen und keine Notwendigkeit besteht, die Kommunikation zwischen verschiedenen Agentengruppen oder Sitzungen zu isolieren.

Anwendung in der Steuerspezialistenfirma#

In unserem Steuerspezialistenunternehmen impliziert dieses Szenario

  • Alle Steuerspezialisten erhalten jede Kundenanfrage und interne Nachricht.

  • Alle Agenten arbeiten eng zusammen und haben vollständige Einsicht in die gesamte Kommunikation.

  • Nützlich für Aufgaben oder Teams, bei denen alle Agenten über alle Nachrichten informiert sein müssen.

Wie das Szenario funktioniert#

  • Abonnements: Alle Agenten verwenden das Standardabonnement (z. B. „default“).

  • Veröffentlichung: Nachrichten werden zu dem Standardthema veröffentlicht.

  • Nachrichtenverarbeitung: Jeder Agent entscheidet, ob er basierend auf Inhalt und verfügbaren Handlern auf eine Nachricht reagiert.

Vorteile#

  • Einfachheit: Einfach einzurichten und zu verstehen.

  • Zusammenarbeit: Fördert Transparenz und Zusammenarbeit zwischen Agenten.

  • Flexibilität: Agenten können dynamisch entscheiden, welche Nachrichten sie verarbeiten möchten.

Überlegungen#

  • Skalierbarkeit: Skaliert möglicherweise nicht gut mit einer großen Anzahl von Agenten oder Nachrichten.

  • Effizienz: Agenten erhalten möglicherweise viele irrelevante Nachrichten, was zu unnötiger Verarbeitung führt.

async def run_single_tenant_single_scope() -> None:
    # Create the runtime.
    runtime = SingleThreadedAgentRuntime()

    # Register TaxSpecialist agents for each specialty
    specialist_agent_type_1 = "TaxSpecialist_1"
    specialist_agent_type_2 = "TaxSpecialist_2"
    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_1,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 1",
            specialty=TaxSpecialty.PLANNING,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_2,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 2",
            specialty=TaxSpecialty.DISPUTE_RESOLUTION,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    # Add default subscriptions for each agent type
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))

    # Start the runtime and send a message to agents on default topic
    runtime.start()
    await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
    await runtime.stop_when_idle()


await run_single_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.

==================================================
Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.

2. Multi-Tenant, Single Scope of Publishing#

Szenarioerklärung#

Im Szenario Multi-Tenant, Single Scope of Publishing

  • Es gibt mehrere Mandanten (z. B. mehrere Kunden oder Benutzersitzungen).

  • Jeder Mandant hat sein eigenes isoliertes Thema über die Themenquelle.

  • Alle Agenten innerhalb eines Mandanten abonnieren das Thema des Mandanten. Bei Bedarf werden für jeden Mandanten neue Agenteninstanzen erstellt.

  • Nachrichten sind nur für Agenten innerhalb desselben Mandanten sichtbar.

Dieses Szenario ist nützlich, wenn Sie die Kommunikation zwischen verschiedenen Mandanten isolieren müssen, aber möchten, dass alle Agenten innerhalb eines Mandanten über alle Nachrichten informiert sind.

Anwendung in der Steuerspezialistenfirma#

In diesem Szenario

  • Das Unternehmen betreut gleichzeitig mehrere Kunden (Mandanten).

  • Für jeden Kunden wird ein dedizierter Satz von Agenteninstanzen erstellt.

  • Die Kommunikation jedes Kunden ist von anderen isoliert.

  • Alle Agenten für einen Kunden erhalten Nachrichten, die zu dem Thema dieses Kunden veröffentlicht werden.

Wie das Szenario funktioniert#

  • Abonnements: Agenten abonnieren Themen basierend auf der Identität des Mandanten.

  • Veröffentlichung: Nachrichten werden zu dem Mandanten-spezifischen Thema veröffentlicht.

  • Nachrichtenverarbeitung: Agenten erhalten nur Nachrichten, die für ihren Mandanten relevant sind.

Vorteile#

  • Mandantenisolierung: Gewährleistet Datenschutz und Trennung zwischen Kunden.

  • Zusammenarbeit innerhalb des Mandanten: Agenten können innerhalb ihres Mandanten frei zusammenarbeiten.

Überlegungen#

  • Komplexität: Erfordert die Verwaltung mehrerer Agenten- und Themen-Sets.

  • Ressourcenverbrauch: Mehr Agenteninstanzen können zusätzliche Ressourcen verbrauchen.

async def run_multi_tenant_single_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # List of clients (tenants)
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize sessions and map the topic type to each TaxSpecialist agent type
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish client requests to their respective topics
    for tenant in tenants:
        topic_source = tenant  # The topic source is the client name
        topic_id = DefaultTopicId(source=topic_source)
        await runtime.publish_message(
            ClientRequest(f"{tenant} requires tax services."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ requires tax services.

3. Single-Tenant, Multiple Scopes of Publishing#

Szenarioerklärung#

Im Szenario Single-Tenant, Multiple Scopes of Publishing

  • Alle Agenten arbeiten innerhalb eines einzigen Mandanten.

  • Nachrichten werden zu verschiedenen Themen veröffentlicht.

  • Agenten abonnieren spezifische Themen, die für ihre Rolle oder Spezialisierung relevant sind.

  • Nachrichten werden basierend auf dem Thema an Teilmengen von Agenten weitergeleitet.

Dieses Szenario ermöglicht eine gezielte Kommunikation innerhalb eines Mandanten und ermöglicht eine feinere Steuerung der Nachrichtenverteilung.

Anwendung im Steuerberatungsunternehmen#

In diesem Szenario

  • Der Steuerverwaltungssystemmanager kommuniziert mit spezifischen Spezialisten basierend auf deren Spezialgebieten.

  • Verschiedene Themen repräsentieren verschiedene Spezialisierungen (z. B. „Planung“, „Compliance“).

  • Spezialisten abonnieren nur das Thema, das ihrer Spezialisierung entspricht.

  • Der Manager veröffentlicht Nachrichten zu spezifischen Themen, um die vorgesehenen Spezialisten zu erreichen.

Wie das Szenario funktioniert#

  • Abonnements: Agenten abonnieren Themen, die ihren Spezialisierungen entsprechen.

  • Veröffentlichung: Nachrichten werden zu Themen veröffentlicht, die den vorgesehenen Empfängern entsprechen.

  • Nachrichtenverarbeitung: Nur Agenten, die ein Thema abonniert haben, erhalten dessen Nachrichten.

Vorteile#

  • Gezielte Kommunikation: Nachrichten erreichen nur die relevanten Agenten.

  • Effizienz: Reduziert unnötige Nachrichtenverarbeitung durch Agenten.

Überlegungen#

  • Komplexität der Einrichtung: Erfordert sorgfältige Verwaltung von Themen und Abonnements.

  • Flexibilität: Änderungen an Kommunikationsszenarien können eine Aktualisierung der Abonnements erfordern.

async def run_single_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()
    # Register TaxSpecialist agents for each specialty and add subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish a ClientRequest to each specialist's topic
    for specialty in TaxSpecialty:
        topic_id = TopicId(type=specialty.value, source="default")
        await runtime.publish_message(
            ClientRequest(f"I need assistance with {specialty.value} taxes."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_single_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.

4. Multi-Tenant, Multiple Scopes of Publishing#

Szenarioerklärung#

Im Szenario Multi-Tenant, Multiple Scopes of Publishing

  • Es gibt mehrere Mandanten, jeder mit seiner eigenen Reihe von Agenten.

  • Nachrichten werden innerhalb jedes Mandanten zu mehreren Themen veröffentlicht.

  • Agenten abonnieren Mandanten-spezifische Themen, die für ihre Rolle relevant sind.

  • Kombiniert Mandantenisolierung mit gezielter Kommunikation.

Dieses Szenario bietet das höchste Maß an Kontrolle über die Nachrichtenverteilung und eignet sich für komplexe Systeme mit mehreren Kunden und spezialisierten Kommunikationsbedürfnissen.

Anwendung im Steuerberatungsunternehmen#

In diesem Szenario

  • Das Unternehmen betreut mehrere Kunden, jeder mit dedizierten Agenteninstanzen.

  • Innerhalb jedes Kunden kommunizieren Agenten mit mehreren Themen basierend auf Spezialisierungen.

  • Zum Beispiel abonniert der Planungsspezialist von Kunde A das Thema „Planung“ mit der Quelle „ClientA“.

  • Der Steuerverwaltungssystemmanager für jeden Kunden kommuniziert mit seinen Spezialisten über Mandanten-spezifische Themen.

Wie das Szenario funktioniert#

  • Abonnements: Agenten abonnieren Themen, die sowohl auf der Mandantenidentität als auch auf der Spezialisierung basieren.

  • Veröffentlichung: Nachrichten werden zu Mandanten-spezifischen und Spezialisierungs-spezifischen Themen veröffentlicht.

  • Nachrichtenverarbeitung: Nur Agenten, die dem Mandanten und dem Thema entsprechen, erhalten Nachrichten.

Vorteile#

  • Vollständige Isolierung: Gewährleistet sowohl Mandanten- als auch Kommunikationsisolierung.

  • Granulare Kontrolle: Ermöglicht die präzise Weiterleitung von Nachrichten an vorgesehene Agenten.

Überlegungen#

  • Komplexität: Erfordert sorgfältige Verwaltung von Themen, Mandanten und Abonnements.

  • Ressourcenverbrauch: Eine erhöhte Anzahl von Agenteninstanzen und Themen kann sich auf die Ressourcen auswirken.

async def run_multi_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # Define TypeSubscriptions for each specialty and tenant
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize agents for all specialties and add type subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        for tenant in tenants:
            specialist_subscription = TypeSubscription(
                topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
            )
            await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Send messages for each tenant to each specialty
    for tenant in tenants:
        for specialty in TaxSpecialty:
            topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
            await runtime.publish_message(
                ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
                topic_id=topic_id,
            )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ needs assistance with preparation taxes.