ACA Dynamische Sitzungen Code-Executor#
Diese Anleitung erläutert die dynamischen Sitzungen von Azure Container Apps in Azure Container Apps und zeigt Ihnen, wie Sie die Azure Container Code Executor-Klasse verwenden.
Die dynamischen Sitzungen von Azure Container Apps sind eine Komponente im Dienst Azure Container Apps. Die Umgebung wird auf entfernten Azure-Instanzen gehostet und führt keinen Code lokal aus. Der Interpreter kann Python-Code in einer Jupyter-Umgebung mit einer vorinstallierten Basis gängiger Pakete ausführen. Benutzerdefinierte Umgebungen können von Benutzern für ihre Anwendungen erstellt werden. Dateien können zusätzlich zu jeder Sitzung hochgeladen oder heruntergeladen werden.
Der Code-Interpreter kann mehrere Code-Sitzungen ausführen, die jeweils durch eine Sitzungs-Identifikationszeichenfolge abgegrenzt sind.
Erstellen eines Container Apps Sitzungspools#
Erstellen Sie in Ihrem Azure-Portal eine neue Ressource vom Typ Container App Session Pool mit dem Pool-Typ Python code interpreter und notieren Sie sich den Pool management endpoint. Das Format für den Endpunkt sollte etwa so aussehen: https://{region}.dynamicsessions.io/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/sessionPools/{session_pool_name}.
Alternativ können Sie die Azure CLI zum Erstellen eines Sitzungspools verwenden.
ACADynamicSessionsCodeExecutor#
Die Klasse ACADynamicSessionsCodeExecutor ist ein Python-Code-Executor, der beliebigen Python-Code auf einer standardmäßigen Serverless Code Interpreter-Sitzung erstellt und ausführt. Seine Schnittstelle lautet wie folgt
Initialisierung#
Zuerst müssen Sie ein Anmeldeobjekt finden oder erstellen, das die Schnittstelle TokenProvider implementiert. Dies ist jedes Objekt, das die folgende Funktion implementiert
def get_token(
self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any
) -> azure.core.credentials.AccessToken
Ein Beispiel für ein solches Objekt ist die Klasse azure.identity.DefaultAzureCredential.
Beginnen wir mit der Installation dieser
# pip install azure.identity
Als Nächstes importieren wir alle notwendigen Module und Klassen für unseren Code
import os
import tempfile
from anyio import open_file
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock
from autogen_ext.code_executors.azure import ACADynamicSessionsCodeExecutor
from azure.identity import DefaultAzureCredential
Nun erstellen wir unseren Azure-Code-Executor und führen einige Testcodes aus, zusammen mit der Überprüfung, ob er korrekt ausgeführt wurde. Wir erstellen den Executor mit einem temporären Arbeitsverzeichnis, um eine saubere Umgebung zu gewährleisten, während wir zeigen, wie jede Funktion verwendet wird
cancellation_token = CancellationToken()
POOL_MANAGEMENT_ENDPOINT = "..."
with tempfile.TemporaryDirectory() as temp_dir:
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
)
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "hello world!" in code_result.output
Als Nächstes versuchen wir, einige Dateien hochzuladen und ihre Integrität zu überprüfen. Alle Dateien, die in den Serverless Code Interpreter hochgeladen werden, werden in das Verzeichnis /mnt/data hochgeladen. Alle herunterladbaren Dateien müssen ebenfalls in diesem Verzeichnis platziert werden. Standardmäßig ist das aktuelle Arbeitsverzeichnis für den Code-Executor auf /mnt/data gesetzt.
with tempfile.TemporaryDirectory() as temp_dir:
test_file_1 = "test_upload_1.txt"
test_file_1_contents = "test1 contents"
test_file_2 = "test_upload_2.txt"
test_file_2_contents = "test2 contents"
async with await open_file(os.path.join(temp_dir, test_file_1), "w") as f: # type: ignore[syntax]
await f.write(test_file_1_contents)
async with await open_file(os.path.join(temp_dir, test_file_2), "w") as f: # type: ignore[syntax]
await f.write(test_file_2_contents)
assert os.path.isfile(os.path.join(temp_dir, test_file_1))
assert os.path.isfile(os.path.join(temp_dir, test_file_2))
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
)
await executor.upload_files([test_file_1, test_file_2], cancellation_token)
file_list = await executor.get_file_list(cancellation_token)
assert test_file_1 in file_list
assert test_file_2 in file_list
code_blocks = [
CodeBlock(
code=f"""
with open("{test_file_1}") as f:
print(f.read())
with open("{test_file_2}") as f:
print(f.read())
""",
language="python",
)
]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0
assert test_file_1_contents in code_result.output
assert test_file_2_contents in code_result.output
Das Herunterladen von Dateien funktioniert auf ähnliche Weise.
with tempfile.TemporaryDirectory() as temp_dir:
test_file_1 = "test_upload_1.txt"
test_file_1_contents = "test1 contents"
test_file_2 = "test_upload_2.txt"
test_file_2_contents = "test2 contents"
assert not os.path.isfile(os.path.join(temp_dir, test_file_1))
assert not os.path.isfile(os.path.join(temp_dir, test_file_2))
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
)
code_blocks = [
CodeBlock(
code=f"""
with open("{test_file_1}", "w") as f:
f.write("{test_file_1_contents}")
with open("{test_file_2}", "w") as f:
f.write("{test_file_2_contents}")
""",
language="python",
),
]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0
file_list = await executor.get_file_list(cancellation_token)
assert test_file_1 in file_list
assert test_file_2 in file_list
await executor.download_files([test_file_1, test_file_2], cancellation_token)
assert os.path.isfile(os.path.join(temp_dir, test_file_1))
async with await open_file(os.path.join(temp_dir, test_file_1), "r") as f: # type: ignore[syntax]
content = await f.read()
assert test_file_1_contents in content
assert os.path.isfile(os.path.join(temp_dir, test_file_2))
async with await open_file(os.path.join(temp_dir, test_file_2), "r") as f: # type: ignore[syntax]
content = await f.read()
assert test_file_2_contents in content
Neue Sitzungen#
Jede Instanz der Klasse ACADynamicSessionsCodeExecutor hat eine eindeutige Sitzungs-ID. Jeder Aufruf eines bestimmten Code-Executors wird in derselben Sitzung ausgeführt, bis die Funktion restart() darauf aufgerufen wird. Vorherige Sitzungen können nicht wiederverwendet werden.
Hier führen wir etwas Code in der Codesitzung aus, starten sie neu und überprüfen dann, ob eine neue Sitzung geöffnet wurde.
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential()
)
code_blocks = [CodeBlock(code="x = 'abcdefg'", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0
code_blocks = [CodeBlock(code="print(x)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "abcdefg" in code_result.output
await executor.restart()
code_blocks = [CodeBlock(code="print(x)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code != 0 and "NameError" in code_result.output
Verfügbare Pakete#
Jede Code-Ausführungsinstanz ist mit den meisten gängigen Paketen vorinstalliert. Die Liste der verfügbaren Pakete und Versionen ist jedoch außerhalb der Ausführungsumgebung nicht verfügbar. Die Paketliste in der Umgebung kann durch Aufruf der Funktion get_available_packages() auf dem Code-Executor abgerufen werden.
print(executor.get_available_packages(cancellation_token))