Benutzerdefinierte Agenten#
Möglicherweise haben Sie Agenten mit Verhaltensweisen, die nicht in eine vordefinierte Kategorie fallen. In solchen Fällen können Sie benutzerdefinierte Agenten erstellen.
Alle Agenten in AgentChat erben von der Klasse BaseChatAgent und implementieren die folgenden abstrakten Methoden und Attribute:
on_messages(): Die abstrakte Methode, die das Verhalten des Agenten als Reaktion auf Nachrichten definiert. Diese Methode wird aufgerufen, wenn der Agent aufgefordert wird, eine Antwort inrun()bereitzustellen. Sie gibt einResponse-Objekt zurück.on_reset(): Die abstrakte Methode, die den Agenten in seinen Ausgangszustand zurückversetzt. Diese Methode wird aufgerufen, wenn der Agent aufgefordert wird, sich selbst zurückzusetzen.produced_message_types: Die Liste der möglichenBaseChatMessageNachrichtentypen, die der Agent in seiner Antwort erzeugen kann.
Optional können Sie die Methode on_messages_stream() implementieren, um Nachrichten zu streamen, sobald sie vom Agenten generiert werden. Diese Methode wird von run_stream() aufgerufen, um Nachrichten zu streamen. Wenn diese Methode nicht implementiert ist, verwendet der Agent die Standardimplementierung von on_messages_stream(), die die Methode on_messages() aufruft und alle Nachrichten in der Antwort yieldet.
CountDownAgent#
In diesem Beispiel erstellen wir einen einfachen Agenten, der von einer gegebenen Zahl bis Null herunterzählt und einen Nachrichtenstrom mit der aktuellen Zählung erzeugt.
from typing import AsyncGenerator, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage
from autogen_core import CancellationToken
class CountDownAgent(BaseChatAgent):
def __init__(self, name: str, count: int = 3):
super().__init__(name, "A simple agent that counts down.")
self._count = count
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Calls the on_messages_stream.
response: Response | None = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
response = message
assert response is not None
return response
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
inner_messages: List[BaseAgentEvent | BaseChatMessage] = []
for i in range(self._count, 0, -1):
msg = TextMessage(content=f"{i}...", source=self.name)
inner_messages.append(msg)
yield msg
# The response is returned at the end of the stream.
# It contains the final message and all the inner messages.
yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
async def run_countdown_agent() -> None:
# Create a countdown agent.
countdown_agent = CountDownAgent("countdown")
# Run the agent with a given task and stream the response.
async for message in countdown_agent.on_messages_stream([], CancellationToken()):
if isinstance(message, Response):
print(message.chat_message)
else:
print(message)
# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()
3...
2...
1...
Done!
ArithmeticAgent#
In diesem Beispiel erstellen wir eine Agentenklasse, die einfache arithmetische Operationen mit einer gegebenen Ganzzahl durchführen kann. Anschließend verwenden wir verschiedene Instanzen dieser Agentenklasse in einem SelectorGroupChat, um eine gegebene Ganzzahl durch Anwenden einer Reihe von arithmetischen Operationen in eine andere Ganzzahl zu transformieren.
Die Klasse ArithmeticAgent nimmt eine operator_func entgegen, die eine Ganzzahl entgegennimmt und nach Anwendung einer arithmetischen Operation auf die Ganzzahl eine Ganzzahl zurückgibt. In ihrer Methode on_messages wendet sie die operator_func auf die Ganzzahl in der Eingabenachricht an und gibt eine Antwort mit dem Ergebnis zurück.
from typing import Callable, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import BaseChatMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
class ArithmeticAgent(BaseChatAgent):
def __init__(self, name: str, description: str, operator_func: Callable[[int], int]) -> None:
super().__init__(name, description=description)
self._operator_func = operator_func
self._message_history: List[BaseChatMessage] = []
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Update the message history.
# NOTE: it is possible the messages is an empty list, which means the agent was selected previously.
self._message_history.extend(messages)
# Parse the number in the last message.
assert isinstance(self._message_history[-1], TextMessage)
number = int(self._message_history[-1].content)
# Apply the operator function to the number.
result = self._operator_func(number)
# Create a new message with the result.
response_message = TextMessage(content=str(result), source=self.name)
# Update the message history.
self._message_history.append(response_message)
# Return the response.
return Response(chat_message=response_message)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
Hinweis
Die Methode on_messages kann mit einer leeren Nachrichtenliste aufgerufen werden. In diesem Fall bedeutet dies, dass der Agent zuvor aufgerufen wurde und nun erneut aufgerufen wird, ohne neue Nachrichten vom Aufrufer. Daher ist es wichtig, eine Historie der zuvor vom Agenten empfangenen Nachrichten zu führen und diese Historie zur Generierung der Antwort zu verwenden.
Jetzt können wir einen SelectorGroupChat mit 5 Instanzen von ArithmeticAgent erstellen:
einer, die 1 zur Einganzahl addiert,
einer, die 1 von der Einganzahl subtrahiert,
einer, die die Einganzahl mit 2 multipliziert,
einer, die die Einganzahl durch 2 teilt und auf die nächste ganze Zahl abrundet, und
einer, die die Einganzahl unverändert zurückgibt.
Anschließend erstellen wir einen SelectorGroupChat mit diesen Agenten und legen die entsprechenden Selektoreinstellungen fest:
damit derselbe Agent nacheinander ausgewählt werden kann, um wiederholte Operationen zu ermöglichen, und
passen wir die Selektor-Prompt an, um die Antwort des Modells auf die spezifische Aufgabe zuzuschneiden.
async def run_number_agents() -> None:
# Create agents for number operations.
add_agent = ArithmeticAgent("add_agent", "Adds 1 to the number.", lambda x: x + 1)
multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies the number by 2.", lambda x: x * 2)
subtract_agent = ArithmeticAgent("subtract_agent", "Subtracts 1 from the number.", lambda x: x - 1)
divide_agent = ArithmeticAgent("divide_agent", "Divides the number by 2 and rounds down.", lambda x: x // 2)
identity_agent = ArithmeticAgent("identity_agent", "Returns the number as is.", lambda x: x)
# The termination condition is to stop after 10 messages.
termination_condition = MaxMessageTermination(10)
# Create a selector group chat.
selector_group_chat = SelectorGroupChat(
[add_agent, multiply_agent, subtract_agent, divide_agent, identity_agent],
model_client=OpenAIChatCompletionClient(model="gpt-4o"),
termination_condition=termination_condition,
allow_repeated_speaker=True, # Allow the same agent to speak multiple times, necessary for this task.
selector_prompt=(
"Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
"Current conversation history:\n{history}\n"
"Please select the most appropriate role for the next message, and only return the role name."
),
)
# Run the selector group chat with a given task and stream the response.
task: List[BaseChatMessage] = [
TextMessage(content="Apply the operations to turn the given number into 25.", source="user"),
TextMessage(content="10", source="user"),
]
stream = selector_group_chat.run_stream(task=task)
await Console(stream)
# Use asyncio.run(run_number_agents()) when running in a script.
await run_number_agents()
---------- user ----------
Apply the operations to turn the given number into 25.
---------- user ----------
10
---------- multiply_agent ----------
20
---------- add_agent ----------
21
---------- multiply_agent ----------
42
---------- divide_agent ----------
21
---------- add_agent ----------
22
---------- add_agent ----------
23
---------- add_agent ----------
24
---------- add_agent ----------
25
---------- Summary ----------
Number of messages: 10
Finish reason: Maximum number of messages 10 reached, current message count: 10
Total prompt tokens: 0
Total completion tokens: 0
Duration: 2.40 seconds
Aus der Ausgabe können wir ersehen, dass die Agenten die Einganzahl erfolgreich von 10 auf 25 transformiert haben, indem sie geeignete Agenten ausgewählt haben, die die arithmetischen Operationen sequenziell anwenden.
Verwendung benutzerdefinierter Modellclients in benutzerdefinierten Agenten#
Eines der Hauptmerkmale des Presets AssistantAgent in AgentChat ist, dass es ein Argument model_client entgegennimmt und es bei der Beantwortung von Nachrichten verwenden kann. In einigen Fällen möchten Sie jedoch möglicherweise, dass Ihr Agent einen benutzerdefinierten Modellclient verwendet, der derzeit nicht unterstützt wird (siehe unterstützte Modellclients) oder benutzerdefinierte Modellverhalten.
Dies können Sie mit einem benutzerdefinierten Agenten erreichen, der *Ihren benutzerdefinierten Modellclient* implementiert.
Im folgenden Beispiel gehen wir ein Beispiel für einen benutzerdefinierten Agenten durch, der das Google Gemini SDK direkt verwendet, um auf Nachrichten zu antworten.
Hinweis: Sie müssen das Google Gemini SDK installieren, um dieses Beispiel auszuführen. Sie können es mit dem folgenden Befehl installieren:
pip install google-genai
# !pip install google-genai
import os
from typing import AsyncGenerator, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from google import genai
from google.genai import types
class GeminiAssistantAgent(BaseChatAgent):
def __init__(
self,
name: str,
description: str = "An agent that provides assistance with ability to use tools.",
model: str = "gemini-1.5-flash-002",
api_key: str = os.environ["GEMINI_API_KEY"],
system_message: str
| None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
):
super().__init__(name=name, description=description)
self._model_context = UnboundedChatCompletionContext()
self._model_client = genai.Client(api_key=api_key)
self._system_message = system_message
self._model = model
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
final_response = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
final_response = message
if final_response is None:
raise AssertionError("The stream should have returned the final result.")
return final_response
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
# Add messages to the model context
for msg in messages:
await self._model_context.add_message(msg.to_model_message())
# Get conversation history
history = [
(msg.source if hasattr(msg, "source") else "system")
+ ": "
+ (msg.content if isinstance(msg.content, str) else "")
+ "\n"
for msg in await self._model_context.get_messages()
]
# Generate response using Gemini
response = self._model_client.models.generate_content(
model=self._model,
contents=f"History: {history}\nGiven the history, please provide a response",
config=types.GenerateContentConfig(
system_instruction=self._system_message,
temperature=0.3,
),
)
# Create usage metadata
usage = RequestUsage(
prompt_tokens=response.usage_metadata.prompt_token_count,
completion_tokens=response.usage_metadata.candidates_token_count,
)
# Add response to model context
await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))
# Yield the final response
yield Response(
chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
inner_messages=[],
)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""Reset the assistant by clearing the model context."""
await self._model_context.clear()
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
await Console(gemini_assistant.run_stream(task="What is the capital of New York?"))
---------- user ----------
What is the capital of New York?
---------- gemini_assistant ----------
Albany
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the capital of New York?', type='TextMessage'), TextMessage(source='gemini_assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=5), content='Albany\nTERMINATE\n', type='TextMessage')], stop_reason=None)
Im obigen Beispiel haben wir uns entschieden, model, api_key und system_message als Argumente anzugeben - Sie können auch andere Argumente angeben, die für den von Ihnen verwendeten Modellclient erforderlich sind oder zu Ihrem Anwendungsdesign passen.
Nun wollen wir untersuchen, wie dieser benutzerdefinierte Agent als Teil eines Teams in AgentChat verwendet werden kann.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
# Create the primary agent.
primary_agent = AssistantAgent(
"primary",
model_client=model_client,
system_message="You are a helpful AI assistant.",
)
# Create a critic agent based on our new GeminiAssistantAgent.
gemini_critic_agent = GeminiAssistantAgent(
"gemini_critic",
system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
)
# Define a termination condition that stops the task if the critic approves or after 10 messages.
termination = TextMentionTermination("APPROVE") | MaxMessageTermination(10)
# Create a team with the primary and critic agents.
team = RoundRobinGroupChat([primary_agent, gemini_critic_agent], termination_condition=termination)
await Console(team.run_stream(task="Write a Haiku poem with 4 lines about the fall season."))
await model_client.close()
---------- user ----------
Write a Haiku poem with 4 lines about the fall season.
---------- primary ----------
Crimson leaves cascade,
Whispering winds sing of change,
Chill wraps the fading,
Nature's quilt, rich and warm.
---------- gemini_critic ----------
The poem is good, but it has four lines instead of three. A haiku must have three lines with a 5-7-5 syllable structure. The content is evocative of autumn, but the form is incorrect. Please revise to adhere to the haiku's syllable structure.
---------- primary ----------
Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:
Crimson leaves drift down,
Chill winds whisper through the gold,
Autumn’s breath is near.
---------- gemini_critic ----------
The revised haiku is much improved. It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn. APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='Write a Haiku poem with 4 lines about the fall season.', type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=33, completion_tokens=31), content="Crimson leaves cascade, \nWhispering winds sing of change, \nChill wraps the fading, \nNature's quilt, rich and warm.", type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=86, completion_tokens=60), content="The poem is good, but it has four lines instead of three. A haiku must have three lines with a 5-7-5 syllable structure. The content is evocative of autumn, but the form is incorrect. Please revise to adhere to the haiku's syllable structure.\n", type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=141, completion_tokens=49), content='Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:\n\nCrimson leaves drift down, \nChill winds whisper through the gold, \nAutumn’s breath is near.', type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=211, completion_tokens=32), content='The revised haiku is much improved. It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn. APPROVE\n', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")
Im obigen Abschnitt zeigen wir mehrere sehr wichtige Konzepte:
Wir haben einen benutzerdefinierten Agenten entwickelt, der das Google Gemini SDK zur Beantwortung von Nachrichten verwendet.
Wir zeigen, dass dieser benutzerdefinierte Agent als Teil des breiteren AgentChat-Ökosystems verwendet werden kann – in diesem Fall als Teilnehmer an einem
RoundRobinGroupChat, solange er vonBaseChatAgenterbt.
Den benutzerdefinierten Agenten deklarativ machen#
Autogen bietet eine Component-Schnittstelle, um die Konfiguration von Komponenten serialisierbar in einem deklarativen Format zu machen. Dies ist nützlich zum Speichern und Laden von Konfigurationen sowie zum Teilen von Konfigurationen mit anderen.
Wir erreichen dies, indem wir von der Klasse Component erben und die Methoden _from_config und _to_config implementieren. Die deklarative Klasse kann mithilfe der Methode dump_component in ein JSON-Format serialisiert und mithilfe der Methode load_component aus einem JSON-Format deserialisiert werden.
import os
from typing import AsyncGenerator, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_core import CancellationToken, Component
from pydantic import BaseModel
from typing_extensions import Self
class GeminiAssistantAgentConfig(BaseModel):
name: str
description: str = "An agent that provides assistance with ability to use tools."
model: str = "gemini-1.5-flash-002"
system_message: str | None = None
class GeminiAssistantAgent(BaseChatAgent, Component[GeminiAssistantAgentConfig]): # type: ignore[no-redef]
component_config_schema = GeminiAssistantAgentConfig
# component_provider_override = "mypackage.agents.GeminiAssistantAgent"
def __init__(
self,
name: str,
description: str = "An agent that provides assistance with ability to use tools.",
model: str = "gemini-1.5-flash-002",
api_key: str = os.environ["GEMINI_API_KEY"],
system_message: str
| None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
):
super().__init__(name=name, description=description)
self._model_context = UnboundedChatCompletionContext()
self._model_client = genai.Client(api_key=api_key)
self._system_message = system_message
self._model = model
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
final_response = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
final_response = message
if final_response is None:
raise AssertionError("The stream should have returned the final result.")
return final_response
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
# Add messages to the model context
for msg in messages:
await self._model_context.add_message(msg.to_model_message())
# Get conversation history
history = [
(msg.source if hasattr(msg, "source") else "system")
+ ": "
+ (msg.content if isinstance(msg.content, str) else "")
+ "\n"
for msg in await self._model_context.get_messages()
]
# Generate response using Gemini
response = self._model_client.models.generate_content(
model=self._model,
contents=f"History: {history}\nGiven the history, please provide a response",
config=types.GenerateContentConfig(
system_instruction=self._system_message,
temperature=0.3,
),
)
# Create usage metadata
usage = RequestUsage(
prompt_tokens=response.usage_metadata.prompt_token_count,
completion_tokens=response.usage_metadata.candidates_token_count,
)
# Add response to model context
await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))
# Yield the final response
yield Response(
chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
inner_messages=[],
)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""Reset the assistant by clearing the model context."""
await self._model_context.clear()
@classmethod
def _from_config(cls, config: GeminiAssistantAgentConfig) -> Self:
return cls(
name=config.name, description=config.description, model=config.model, system_message=config.system_message
)
def _to_config(self) -> GeminiAssistantAgentConfig:
return GeminiAssistantAgentConfig(
name=self.name,
description=self.description,
model=self._model,
system_message=self._system_message,
)
Nachdem wir nun die erforderlichen Methoden implementiert haben, können wir den benutzerdefinierten Agenten in und aus einem JSON-Format laden und entladen und dann den Agenten aus dem JSON-Format laden.
Hinweis: Sie sollten die Klassenvariable
component_provider_overrideauf den vollständigen Pfad des Moduls setzen, das die benutzerdefinierte Agentenklasse enthält (z. B. (mypackage.agents.GeminiAssistantAgent)). Dies wird von der Methodeload_componentverwendet, um zu bestimmen, wie die Klasse instanziiert wird.
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
config = gemini_assistant.dump_component()
print(config.model_dump_json(indent=2))
loaded_agent = GeminiAssistantAgent.load_component(config)
print(loaded_agent)
{
"provider": "__main__.GeminiAssistantAgent",
"component_type": "agent",
"version": 1,
"component_version": 1,
"description": null,
"label": "GeminiAssistantAgent",
"config": {
"name": "gemini_assistant",
"description": "An agent that provides assistance with ability to use tools.",
"model": "gemini-1.5-flash-002",
"system_message": "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed."
}
}
<__main__.GeminiAssistantAgent object at 0x11a5c5a90>
Nächste Schritte#
Bisher haben wir gezeigt, wie man benutzerdefinierte Agenten erstellt, benutzerdefinierte Modellclients zu Agenten hinzufügt und benutzerdefinierte Agenten deklarativ macht. Es gibt mehrere Möglichkeiten, wie dieses einfache Beispiel erweitert werden kann:
Erweitern Sie den Gemini-Modellclient, um die Funktionsaufrufe ähnlich wie die Klasse
AssistantAgentzu handhaben. https://ai.google.dev/gemini-api/docs/function-callingImplementieren Sie ein Paket mit einem benutzerdefinierten Agenten und experimentieren Sie mit der Verwendung seines deklarativen Formats in einem Tool wie AutoGen Studio.