OpenAI Assistant Agent#
OpenAI Assistant and Azure OpenAI Assistant are server-side APIs for building agents. They can be used to build agents in AutoGen. This cookbook demonstrates how to use OpenAI Assistant to create an agent that can run code and answer questions over documents.
Message Protocol#
First, we need to specify the message protocol for the agent backed by OpenAI Assistant. The message protocol defines the structure of messages handled and published by the agent. For illustration, we define a simple message protocol of 4 message types: TextMessage, Reset, UploadForCodeInterpreter and UploadForFileSearch.
from dataclasses import dataclass


@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class Reset:
    pass


@dataclass
class UploadForCodeInterpreter:
    file_path: str


@dataclass
class UploadForFileSearch:
    file_path: str
    vector_store_id: str
The TextMessage message type is used to communicate with the agent. It has a content field that contains the message content, and a source field for the sender. The Reset message type is a control message that resets the agent's memory. It has no fields. This is useful when we need to start a new conversation with the agent.

The UploadForCodeInterpreter message type is used to upload data files for the code interpreter, and the UploadForFileSearch message type is used to upload documents for file search. Both message types have a file_path field that contains the local path to the file to be uploaded.
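The runtime dispatches each of these dataclasses to a handler based on its type. As a conceptual illustration only (this is not how autogen_core is implemented), the type-based routing can be mimicked with the standard library's functools.singledispatch:

```python
from dataclasses import dataclass
from functools import singledispatch


@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class Reset:
    pass


# Conceptual stand-in for type-based message routing: the dispatched
# implementation is chosen by the type of the message, much like the
# runtime selects a handler by its message type annotation.
@singledispatch
def handle(message) -> str:
    raise NotImplementedError(f"No handler for {type(message)}")


@handle.register
def _(message: TextMessage) -> str:
    return f"reply to {message.source}: got {message.content!r}"


@handle.register
def _(message: Reset) -> str:
    return "memory cleared"


print(handle(TextMessage(content="hi", source="user")))
print(handle(Reset()))
```

In the actual agent, the @message_handler decorator plays this role: the runtime inspects the handler's message type annotation and routes incoming messages accordingly.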
Defining the Agent#
Next, we define the agent class. The agent class constructor takes the following arguments: description, client, assistant_id, thread_id, and assistant_event_handler_factory. The client argument is the asynchronous OpenAI client object, and assistant_event_handler_factory is used to create an assistant event handler to handle OpenAI Assistant events. This can be used to create streaming output from the assistant.

The agent class has the following message handlers:

handle_message: Handles the TextMessage message type and sends back the response from the assistant.
on_reset: Handles the Reset message type and resets the memory of the assistant agent.
on_upload_for_code_interpreter: Handles the UploadForCodeInterpreter message type and uploads the file to the code interpreter.
on_upload_for_file_search: Handles the UploadForFileSearch message type and uploads the document for file search.

The memory of the assistant is stored inside a thread, which is kept on the server side. The thread is referenced by the thread_id argument.
import asyncio
import os
from typing import Any, Callable, List
import aiofiles
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from openai import AsyncAssistantEventHandler, AsyncClient
from openai.types.beta.thread import ToolResources, ToolResourcesFileSearch
class OpenAIAssistantAgent(RoutedAgent):
    """An agent implementation that uses the OpenAI Assistant API to generate
    responses.

    Args:
        description (str): The description of the agent.
        client (openai.AsyncClient): The client to use for the OpenAI API.
        assistant_id (str): The assistant ID to use for the OpenAI API.
        thread_id (str): The thread ID to use for the OpenAI API.
        assistant_event_handler_factory (Callable[[], AsyncAssistantEventHandler], optional):
            A factory function to create an async assistant event handler. Defaults to None.
            If provided, the agent will use the streaming mode with the event handler.
            If not provided, the agent will use the blocking mode to generate responses.
    """

    def __init__(
        self,
        description: str,
        client: AsyncClient,
        assistant_id: str,
        thread_id: str,
        assistant_event_handler_factory: Callable[[], AsyncAssistantEventHandler],
    ) -> None:
        super().__init__(description)
        self._client = client
        self._assistant_id = assistant_id
        self._thread_id = thread_id
        self._assistant_event_handler_factory = assistant_event_handler_factory

    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> TextMessage:
        """Handle a message. This method adds the message to the thread and publishes a response."""
        # Save the message to the thread.
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.beta.threads.messages.create(
                    thread_id=self._thread_id,
                    content=message.content,
                    role="user",
                    metadata={"sender": message.source},
                )
            )
        )
        # Generate a response.
        async with self._client.beta.threads.runs.stream(
            thread_id=self._thread_id,
            assistant_id=self._assistant_id,
            event_handler=self._assistant_event_handler_factory(),
        ) as stream:
            await ctx.cancellation_token.link_future(asyncio.ensure_future(stream.until_done()))
        # Get the last message.
        messages = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id, order="desc", limit=1))
        )
        last_message_content = messages.data[0].content
        # Get the text content from the last message.
        text_content = [content for content in last_message_content if content.type == "text"]
        if not text_content:
            raise ValueError(f"Expected text content in the last message: {last_message_content}")
        return TextMessage(content=text_content[0].text.value, source=self.metadata["type"])

    @message_handler()
    async def on_reset(self, message: Reset, ctx: MessageContext) -> None:
        """Handle a reset message. This method deletes all messages in the thread."""
        # Get all messages in this thread, paginating with the last seen message ID.
        all_msgs: List[str] = []
        while True:
            if not all_msgs:
                msgs = await ctx.cancellation_token.link_future(
                    asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id))
                )
            else:
                msgs = await ctx.cancellation_token.link_future(
                    asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id, after=all_msgs[-1]))
                )
            for msg in msgs.data:
                all_msgs.append(msg.id)
            if not msgs.has_next_page():
                break
        # Delete all the messages.
        for msg_id in all_msgs:
            status = await ctx.cancellation_token.link_future(
                asyncio.ensure_future(
                    self._client.beta.threads.messages.delete(message_id=msg_id, thread_id=self._thread_id)
                )
            )
            assert status.deleted is True

    @message_handler()
    async def on_upload_for_code_interpreter(self, message: UploadForCodeInterpreter, ctx: MessageContext) -> None:
        """Handle an upload for code interpreter. This method uploads a file and updates the thread with the file."""
        # Get the file content.
        async with aiofiles.open(message.file_path, mode="rb") as f:
            file_content = await ctx.cancellation_token.link_future(asyncio.ensure_future(f.read()))
        file_name = os.path.basename(message.file_path)
        # Upload the file.
        file = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.files.create(file=(file_name, file_content), purpose="assistants"))
        )
        # Get existing file ids from tool resources.
        thread = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.beta.threads.retrieve(thread_id=self._thread_id))
        )
        tool_resources: ToolResources = thread.tool_resources if thread.tool_resources else ToolResources()
        if tool_resources.code_interpreter and tool_resources.code_interpreter.file_ids:
            # Keep the files that are already attached and append the new one.
            file_ids = tool_resources.code_interpreter.file_ids + [file.id]
        else:
            file_ids = [file.id]
        # Update thread with new file.
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.beta.threads.update(
                    thread_id=self._thread_id,
                    tool_resources={
                        "code_interpreter": {"file_ids": file_ids},
                    },
                )
            )
        )

    @message_handler()
    async def on_upload_for_file_search(self, message: UploadForFileSearch, ctx: MessageContext) -> None:
        """Handle an upload for file search. This method uploads a file and updates the vector store."""
        # Get the file content.
        async with aiofiles.open(message.file_path, mode="rb") as file:
            file_content = await ctx.cancellation_token.link_future(asyncio.ensure_future(file.read()))
        file_name = os.path.basename(message.file_path)
        # Upload the file.
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.vector_stores.file_batches.upload_and_poll(
                    vector_store_id=message.vector_store_id,
                    files=[(file_name, file_content)],
                )
            )
        )
The agent class is a thin wrapper around the OpenAI Assistant API that implements the message protocol. More features, such as multimodal message handling, can be added by extending the message protocol.
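For instance, a hypothetical ImageMessage type (illustrative only; not part of the protocol above) could be added, with a handler that converts it into the image_url content-block shape accepted by the Assistants messages API. A sketch of the message type and conversion, with no API calls:

```python
from dataclasses import dataclass


# Hypothetical extension to the message protocol: an image message.
# The name and fields are illustrative, not defined elsewhere in this cookbook.
@dataclass
class ImageMessage:
    url: str
    source: str


def to_assistant_content(message: ImageMessage) -> list:
    """Build the content blocks to pass as the `content` argument of
    beta.threads.messages.create for an image message."""
    return [{"type": "image_url", "image_url": {"url": message.url}}]


blocks = to_assistant_content(ImageMessage(url="https://example.com/cat.png", source="user"))
print(blocks)
```

A corresponding @message_handler method on the agent would then pass these blocks as the content argument when saving the message to the thread.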
Assistant Event Handler#
The assistant event handler provides callbacks for handling Assistant-API-specific events. This is useful for handling streaming output from the assistant and for further user-interface integration.
from typing import List

from openai import AsyncAssistantEventHandler, AsyncClient
from openai.types.beta.threads import Message, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, RunStepDelta
from typing_extensions import override


class EventHandler(AsyncAssistantEventHandler):
    @override
    async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
        print(delta.value, end="", flush=True)

    @override
    async def on_run_step_created(self, run_step: RunStep) -> None:
        details = run_step.step_details
        if details.type == "tool_calls":
            for tool in details.tool_calls:
                if tool.type == "code_interpreter":
                    print("\nGenerating code to interpret:\n\n```python")

    @override
    async def on_run_step_done(self, run_step: RunStep) -> None:
        details = run_step.step_details
        if details.type == "tool_calls":
            for tool in details.tool_calls:
                if tool.type == "code_interpreter":
                    print("\n```\nExecuting code...")

    @override
    async def on_run_step_delta(self, delta: RunStepDelta, snapshot: RunStep) -> None:
        details = delta.step_details
        if details is not None and details.type == "tool_calls":
            for tool in details.tool_calls or []:
                if tool.type == "code_interpreter" and tool.code_interpreter and tool.code_interpreter.input:
                    print(tool.code_interpreter.input, end="", flush=True)

    @override
    async def on_message_created(self, message: Message) -> None:
        print(f"{'-'*80}\nAssistant:\n")

    @override
    async def on_message_done(self, message: Message) -> None:
        # Print a citation to the file searched.
        if not message.content:
            return
        content = message.content[0]
        if not content.type == "text":
            return
        text_content = content.text
        annotations = text_content.annotations
        citations: List[str] = []
        for index, annotation in enumerate(annotations):
            # Replace the annotation text with a reference marker.
            text_content.value = text_content.value.replace(annotation.text, f"[{index}]")
            if file_citation := getattr(annotation, "file_citation", None):
                client = AsyncClient()
                cited_file = await client.files.retrieve(file_citation.file_id)
                citations.append(f"[{index}] {cited_file.filename}")
        if citations:
            print("\n".join(citations))
Using the Agent#
First, we need to use the openai client to create the actual assistant, thread, and vector store. Our AutoGen agent will use these.
import openai

# Create an assistant with code interpreter and file search tools.
oai_assistant = openai.beta.assistants.create(
    model="gpt-4o-mini",
    description="An AI assistant that helps with everyday tasks.",
    instructions="Help the user with their task.",
    tools=[{"type": "code_interpreter"}, {"type": "file_search"}],
)

# Create a vector store to be used for file search.
vector_store = openai.vector_stores.create()

# Create a thread which is used as the memory for the assistant.
thread = openai.beta.threads.create(
    tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
)
Then, we create a runtime and register an agent factory function for this agent with the runtime.
from autogen_core import SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()
await OpenAIAssistantAgent.register(
    runtime,
    "assistant",
    lambda: OpenAIAssistantAgent(
        description="OpenAI Assistant Agent",
        client=openai.AsyncClient(),
        assistant_id=oai_assistant.id,
        thread_id=thread.id,
        assistant_event_handler_factory=lambda: EventHandler(),
    ),
)
agent = AgentId("assistant", "default")
Let's turn on logging to see what's happening under the hood.
import logging
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
Let's send a greeting message to the agent and watch the response stream back.
runtime.start()
await runtime.send_message(TextMessage(content="Hello, how are you today!", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'Hello, how are you today!', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
--------------------------------------------------------------------------------
Assistant:
Hello! I'm here and ready to assist you. How can I help you today?
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': "Hello! I'm here and ready to assist you. How can I help you today?", 'source': 'assistant'}
Assistant with Code Interpreter#
Let's ask the agent a math question and see how it uses the code interpreter to answer it.
runtime.start()
await runtime.send_message(TextMessage(content="What is 1332322 x 123212?", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'What is 1332322 x 123212?', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
# Calculating the product of 1332322 and 123212
result = 1332322 * 123212
result
```
Executing code...
--------------------------------------------------------------------------------
Assistant:
The product of 1,332,322 and 123,212 is 164,158,058,264.
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'The product of 1,332,322 and 123,212 is 164,158,058,264.', 'source': 'assistant'}
Let's get some data from the Seattle Open Data portal. We will use the City of Seattle Wage Data. Let's download it first.
import requests

response = requests.get("https://data.seattle.gov/resource/2khk-5ukd.csv")
with open("seattle_city_wages.csv", "wb") as file:
    file.write(response.content)
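Before sending the file to the agent, it can help to sanity-check the download locally. A minimal offline sketch using the standard library's csv module, with an inline two-row sample standing in for the downloaded file (the column names follow the dataset's schema):

```python
import csv
import io

# A two-row inline sample with the same columns as seattle_city_wages.csv.
sample = io.StringIO(
    "department,last_name,first_name,job_title,hourly_rate\n"
    "Police Department,Aagard,Lori,Pol Capt-Precinct,112.70\n"
    "Seattle City Light,Aaltonen,Evan,Pwrline Clear Tree Trimmer,53.06\n"
)
rows = list(csv.DictReader(sample))
print(len(rows), "rows; columns:", list(rows[0].keys()))

# Check that the rate column parses as numbers.
top_rate = max(float(row["hourly_rate"]) for row in rows)
print("max hourly_rate:", top_rate)
```

To check the real file, replace the inline sample with `open("seattle_city_wages.csv", newline="")`.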
Let's send the file to the agent using an UploadForCodeInterpreter message.
runtime.start()
await runtime.send_message(UploadForCodeInterpreter(file_path="seattle_city_wages.csv"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type UploadForCodeInterpreter to assistant: {'file_path': 'seattle_city_wages.csv'}
INFO:autogen_core:Calling message handler for assistant:default with message type UploadForCodeInterpreter sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None
We can now ask the agent questions about the data.
runtime.start()
await runtime.send_message(TextMessage(content="Take a look at the uploaded CSV file.", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'Take a look at the uploaded CSV file.', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
import pandas as pd
# Load the uploaded CSV file to examine its contents
file_path = '/mnt/data/file-oEvRiyGyHc2jZViKyDqL8aoh'
csv_data = pd.read_csv(file_path)
# Display the first few rows of the dataframe to understand its structure
csv_data.head()
```
Executing code...
--------------------------------------------------------------------------------
Assistant:
The uploaded CSV file contains the following columns:
1. **department**: The department in which the individual works.
2. **last_name**: The last name of the employee.
3. **first_name**: The first name of the employee.
4. **job_title**: The job title of the employee.
5. **hourly_rate**: The hourly rate for the employee's position.
Here are the first few entries from the file:
| department | last_name | first_name | job_title | hourly_rate |
|--------------------------------|-----------|------------|------------------------------------|-------------|
| Police Department | Aagard | Lori | Pol Capt-Precinct | 112.70 |
| Police Department | Aakervik | Dag | Pol Ofcr-Detective | 75.61 |
| Seattle City Light | Aaltonen | Evan | Pwrline Clear Tree Trimmer | 53.06 |
| Seattle Public Utilities | Aar | Abdimallik | Civil Engrng Spec,Sr | 64.43 |
| Seattle Dept of Transportation | Abad | Abigail | Admin Spec II-BU | 37.40 |
If you need any specific analysis or information from this data, please let me know!
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': "The uploaded CSV file contains the following columns:\n\n1. **department**: The department in which the individual works.\n2. **last_name**: The last name of the employee.\n3. **first_name**: The first name of the employee.\n4. **job_title**: The job title of the employee.\n5. **hourly_rate**: The hourly rate for the employee's position.\n\nHere are the first few entries from the file:\n\n| department | last_name | first_name | job_title | hourly_rate |\n|--------------------------------|-----------|------------|------------------------------------|-------------|\n| Police Department | Aagard | Lori | Pol Capt-Precinct | 112.70 |\n| Police Department | Aakervik | Dag | Pol Ofcr-Detective | 75.61 |\n| Seattle City Light | Aaltonen | Evan | Pwrline Clear Tree Trimmer | 53.06 |\n| Seattle Public Utilities | Aar | Abdimallik | Civil Engrng Spec,Sr | 64.43 |\n| Seattle Dept of Transportation | Abad | Abigail | Admin Spec II-BU | 37.40 |\n\nIf you need any specific analysis or information from this data, please let me know!", 'source': 'assistant'}
runtime.start()
await runtime.send_message(TextMessage(content="What are the top-10 salaries?", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'What are the top-10 salaries?', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
# Sorting the data by hourly_rate in descending order and selecting the top 10 salaries
top_10_salaries = csv_data[['first_name', 'last_name', 'job_title', 'hourly_rate']].sort_values(by='hourly_rate', ascending=False).head(10)
top_10_salaries.reset_index(drop=True, inplace=True)
top_10_salaries
```
Executing code...
--------------------------------------------------------------------------------
Assistant:
Here are the top 10 salaries based on the hourly rates from the CSV file:
| First Name | Last Name | Job Title | Hourly Rate |
|------------|-----------|------------------------------------|-------------|
| Eric | Barden | Executive4 | 139.61 |
| Idris | Beauregard| Executive3 | 115.90 |
| Lori | Aagard | Pol Capt-Precinct | 112.70 |
| Krista | Bair | Pol Capt-Precinct | 108.74 |
| Amy | Bannister | Fire Chief, Dep Adm-80 Hrs | 104.07 |
| Ginger | Armbruster| Executive2 | 102.42 |
| William | Andersen | Executive2 | 102.42 |
| Valarie | Anderson | Executive2 | 102.42 |
| Paige | Alderete | Executive2 | 102.42 |
| Kathryn | Aisenberg | Executive2 | 100.65 |
If you need any further details or analysis, let me know!
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'Here are the top 10 salaries based on the hourly rates from the CSV file:\n\n| First Name | Last Name | Job Title | Hourly Rate |\n|------------|-----------|------------------------------------|-------------|\n| Eric | Barden | Executive4 | 139.61 |\n| Idris | Beauregard| Executive3 | 115.90 |\n| Lori | Aagard | Pol Capt-Precinct | 112.70 |\n| Krista | Bair | Pol Capt-Precinct | 108.74 |\n| Amy | Bannister | Fire Chief, Dep Adm-80 Hrs | 104.07 |\n| Ginger | Armbruster| Executive2 | 102.42 |\n| William | Andersen | Executive2 | 102.42 |\n| Valarie | Anderson | Executive2 | 102.42 |\n| Paige | Alderete | Executive2 | 102.42 |\n| Kathryn | Aisenberg | Executive2 | 100.65 |\n\nIf you need any further details or analysis, let me know!', 'source': 'assistant'}
Assistant with File Search#
Let's try the document Q&A feature. We first download the Wikipedia page on the Third Anglo-Afghan War.
response = requests.get("https://en.wikipedia.org/wiki/Third_Anglo-Afghan_War")
with open("third_anglo_afghan_war.html", "wb") as file:
    file.write(response.content)
Let's send the file to the agent using an UploadForFileSearch message.
runtime.start()
await runtime.send_message(
UploadForFileSearch(file_path="third_anglo_afghan_war.html", vector_store_id=vector_store.id), agent
)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type UploadForFileSearch to assistant: {'file_path': 'third_anglo_afghan_war.html', 'vector_store_id': 'vs_h3xxPbJFnd1iZ9WdjsQwNdrp'}
INFO:autogen_core:Calling message handler for assistant:default with message type UploadForFileSearch sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None
Let's ask the agent questions about the document. Before asking, we reset the agent's memory to start a fresh conversation.
runtime.start()
await runtime.send_message(Reset(), agent)
await runtime.send_message(
TextMessage(
content="When and where was the treaty of Rawalpindi signed? Answer using the document provided.", source="user"
),
agent,
)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type Reset to assistant: {}
INFO:autogen_core:Calling message handler for assistant:default with message type Reset sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'When and where was the treaty of Rawalpindi signed? Answer using the document provided.', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
--------------------------------------------------------------------------------
Assistant:
The Treaty of Rawalpindi was signed on **8 August 1919**. The location of the signing was in **Rawalpindi**, which is in present-day Pakistan【6:0†source】.
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'The Treaty of Rawalpindi was signed on **8 August 1919**. The location of the signing was in **Rawalpindi**, which is in present-day Pakistan【6:0†source】.', 'source': 'assistant'}
[0] third_anglo_afghan_war.html
That's it! We have successfully built an agent backed by OpenAI Assistant.