Flow in Azure ML Pipeline-Auftrag verwenden#
In praktischen Szenarien erfüllen Flows verschiedene Funktionen. Betrachten Sie beispielsweise einen Offline-Flow, der speziell zur Bewertung des Relevanz-Scores für Kommunikationssitzungen zwischen Menschen und Agenten entwickelt wurde. Dieser Flow wird nächtlich ausgelöst und verarbeitet eine beträchtliche Menge an Sitzungsdaten. In einem solchen Kontext sind die Parallel-Komponente und AzureML-Pipeline die optimalen Wahlmöglichkeiten für die Handhabung von groß angelegten, hochgradig widerstandsfähigen und effizienten Offline-Batch-Anforderungen.
Sobald Sie Ihren Flow entwickelt und gründlich getestet haben, führt Sie diese Anleitung durch die Verwendung Ihres Flows als parallele Komponente in einem AzureML-Pipeline-Auftrag.
Voraussetzungen
Um diese Funktion zu ermöglichen, müssen Kunden
die zugehörige CLI oder das zugehörige Paket installieren
Für CLI installieren Sie zuerst die Azure CLI und dann die Erweiterung
ml>=2.22.0überaz extension add -n ml;Für SDK installieren Sie das Paket
azure-ai-ml>=1.12.0überpip install azure-ai-ml>=1.12.0oderpip install promptflow[azure];
sicherstellen, dass ein
$schemain der Zielquelle vorhanden istflow.dag.yaml:$schema:https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.jsonrun.yaml:$schema:https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json
sicherstellen, dass Metadaten generiert wurden und aktuell sind
<mein-Flow-Verzeichnis>/.promptflow/flow.tools.jsonsollte existieren;Kunden können die Datei über
pf flow validate --source <mein-Flow-Verzeichnis>aktualisieren.
Um ein ausführbares End-to-End-Beispiel für die Ausführung eines Beispiel-Flows in einem Azure ML-Workspace zu erkunden, können Sie sich dieses Tutorial-Notebook ansehen: Flow mit Pipeline ausführen
Weitere Informationen zu AzureML und Komponenten
Einen Flow als Komponente registrieren#
Angenommen, es gibt bereits einen Flow und dessen flow.dag.yaml sieht wie folgt aus
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
environment:
python_requirements_txt: requirements.txt
inputs:
text:
type: string
default: Hello World!
outputs:
output:
type: string
reference: ${llm.output}
nodes:
- name: hello_prompt
type: prompt
source:
type: code
path: hello.jinja2
inputs:
text: ${inputs.text}
- name: llm
type: python
source:
type: code
path: hello.py
inputs:
prompt: ${hello_prompt.output}
deployment_name: text-davinci-003
max_tokens: "120"
Kunden können einen Flow entweder über CLI oder SDK als Komponente registrieren
# Register flow as a component
az ml component create --file \<my-flow-directory\>/flow.dag.yaml
# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
az ml component create --file \<my-flow-directory\>/flow.dag.yaml --version 3 --set name=basic_updated
from azure.ai.ml import MLClient, load_component
ml_client = MLClient()
# Register flow as a component
flow_component = load_component("<my-flow-directory>/flow.dag.yaml")
ml_client.components.create_or_update(flow_component)
# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
flow_component.name = "basic_updated"
ml_client.components.create_or_update(flow_component, version="3")
Die generierte Komponente wird eine parallele Komponente sein, deren Definition wie folgt aussehen wird
name: basic
version: 1
display_name: basic
is_deterministic: True
type: parallel
inputs:
data:
type: uri_folder
optional: False
run_outputs:
type: uri_folder
optional: True
text:
type: string
optional: False
default: Hello World!
outputs:
flow_outputs:
type: uri_folder
debug_info:
type: uri_folder
...
Neben den festen Eingabe-/Ausgabe-Ports werden alle Verbindungen und Flow-Eingaben als Eingabeparameter der Komponente verfügbar gemacht. Standardwerte können in der Flow-/Run-Definition angegeben werden; sie können auch beim Einreichen des Auftrags gesetzt/überschrieben werden. Eine vollständige Beschreibung der Ports finden Sie im Abschnitt Komponenten-Ports und Run-Einstellungen.
Flow in einem Pipeline-Auftrag verwenden#
Nachdem ein Flow als Komponente registriert wurde, kann er in einem Pipeline-Auftrag wie reguläre registrierte Komponenten referenziert werden. Kunden können einen Flow auch direkt in einem Pipeline-Auftrag verwenden, dann werden beim Einreichen des Auftrags anonyme Komponenten erstellt.
...
inputs:
basic_input:
type: uri_file
path: <path-to-data>
compute: azureml:cpu-cluster
jobs:
flow_from_registered:
type: parallel
component: azureml:my_flow_component:1
inputs:
data: ${{parent.inputs.basic_input}}
text: "${data.text}"
flow_from_dag:
type: parallel
component: <path-to-flow-dag-yaml>
inputs:
data: ${{parent.inputs.basic_input}}
text: "${data.text}"
flow_from_run:
type: parallel
component: <path-to-run-yaml>
inputs:
data: ${{parent.inputs.basic_input}}
text: "${data.text}"
...
Pipeline-Aufträge können über az ml job create --file pipeline.yml eingereicht werden.
Ein vollständiges Beispiel finden Sie hier.
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')
flow_component_from_registered = ml_client.components.get("my_flow_component", "1")
flow_component_from_dag = load_component("<path-to-flow-dag-yaml>")
flow_component_from_run = load_component("<path-to-run-yaml>")
@pipeline
def pipeline_func_with_flow(basic_input):
flow_from_registered = flow_component_from_registered(
data=data,
text="${data.text}",
)
flow_from_dag = flow_component_from_dag(
data=data,
text="${data.text}",
)
flow_from_run = flow_component_from_run(
data=data,
text="${data.text}",
)
pipeline_with_flow = pipeline_func_with_flow(basic_input=data_input)
pipeline_with_flow.compute = "cpu-cluster"
pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)
Ein vollständiges Beispiel finden Sie hier.
Wie bei regulären parallelen Komponenten können Kunden in einem Pipeline-Auftrag Run-Einstellungen dafür festlegen. Einige häufig verwendete Run-Einstellungen sind im Abschnitt Komponenten-Ports und Run-Einstellungen aufgeführt; Kunden können auch das offizielle Dokument der parallelen Komponente für weitere Details konsultieren.
...
jobs:
flow_node:
type: parallel
component: <path-to-complicated-run-yaml>
compute: azureml:cpu-cluster
instance_count: 2
max_concurrency_per_instance: 2
mini_batch_error_threshold: 5
retry_settings:
max_retries: 3
timeout: 30
inputs:
data: ${{parent.inputs.data}}
url: "${data.url}"
connections.summarize_text_content.connection: azure_open_ai_connection
connections.summarize_text_content.deployment_name: text-davinci-003
environment_variables.AZURE_OPENAI_API_KEY: ${my_connection.api_key}
environment_variables.AZURE_OPENAI_API_BASE: ${my_connection.api_base}
...
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import RetrySettings
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')
# Load flow as a component
flow_component = load_component("<path-to-complicated-run-yaml>")
@pipeline
def pipeline_func_with_flow(data):
flow_node = flow_component(
data=data,
url="${data.url}",
connections={
"summarize_text_content": {
"connection": "azure_open_ai_connection",
"deployment_name": "text-davinci-003",
},
},
environment_variables={
"AZURE_OPENAI_API_KEY": "${my_connection.api_key}",
"AZURE_OPENAI_API_BASE": "${my_connection.api_base}",
}
)
flow_node.compute = "cpu-cluster"
flow_node.instance_count = 2
flow_node.max_concurrency_per_instance = 2
flow_node.mini_batch_error_threshold = 5
flow_node.retry_settings = RetrySettings(timeout=30, max_retries=5)
pipeline_with_flow = pipeline_func_with_flow(data=data_input)
pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)
Umgebung der Komponente#
Standardmäßig basiert die Umgebung der erstellten Komponente auf dem neuesten Promptflow-Runtime-Image. Wenn Kunden eine Python-Anforderungsdatei in flow.dag.yaml angegeben haben, werden diese automatisch auf die Umgebung angewendet.
...
environment:
python_requirements_txt: requirements.txt
Wenn Kunden eine bestehende Azure ML-Umgebung verwenden oder die Umgebung im Azure ML-Stil definieren möchten, können sie dies wie folgt in run.yaml definieren
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json
flow: <my-flow-directory>
azureml:
environment: azureml:my-environment:1
Weitere Details zum unterstützten Format von Azure ML-Umgebungen finden Sie in diesem Dokument.
Unterschiede zwischen Flow in Prompt Flow und Pipeline-Auftrag#
In Prompt Flow läuft ein Flow in einer Compute-Sitzung, die für Prompt Flow konzipiert ist; in einem Pipeline-Auftrag läuft ein Flow hingegen auf verschiedenen Arten von Compute-Ressourcen, üblicherweise auf Compute-Clustern.
Vor diesem Hintergrund sollten Sie sich bewusst sein, dass es zu unerwarteten Fehlern kommen kann, wenn Ihr Flow Logik enthält, die von Identitäten oder Umgebungsvariablen abhängt, da der Flow in einem Pipeline-Auftrag auf anderen Umgebungen ausgeführt wird. Möglicherweise sind zusätzliche Konfigurationen erforderlich, damit er funktioniert.
Komponenten-Ports und Run-Einstellungen#
Eingabe-Ports#
Schlüssel |
source |
type |
description |
|---|---|---|---|
data |
fest |
uri_folder oder uri_file |
erforderlich; zur Übergabe von Eingabedaten. Unterstützte Formate umfassen |
run_outputs |
fest |
uri_folder |
optional; zur Übergabe der Ausgabe eines Standard-Flows für einen Evaluations-Flow. Sollte mit einem |
Ausgabe-Ports#
Schlüssel |
source |
type |
description |
|---|---|---|---|
flow_outputs |
fest |
uri_folder |
ein uri_folder mit 1 oder mehreren jsonl-Dateien, die die Ausgaben der Flow-Läufe enthalten |
debug_info |
fest |
uri_folder |
ein uri_folder, der Debugging-Informationen des Flow-Laufs enthält, z. B. Laufprotokolle |
Parameter#
Schlüssel |
source |
type |
description |
|---|---|---|---|
<flow-input-name> |
aus Flow-Eingaben |
string |
Der Standardwert wird von den Flow-Eingaben übernommen; wird zur Überschreibung der Spaltenzuordnung für Flow-Eingaben verwendet. |
connections.<node-name>.connection |
aus Knoten integrierter LLM-Tools |
string |
Der Standardwert ist der aktuelle Wert, der in |
connections.<node-name>.deployment_name |
aus Knoten integrierter LLM-Tools |
string |
Der Standardwert ist der aktuelle Wert, der in |
connections.<node-name>.<node-input-key> |
aus Knoten mit |
string |
Der Standardwert ist der aktuelle Wert, der in |
environment_variables.<environment-variable-name> |
aus Umgebungsvariablen, die in |
string |
Der Standardwert ist der aktuelle Wert, der in |
Run-Einstellungen#
Schlüssel |
type |
Beschreibung |
|---|---|---|
instance_count |
integer |
Die Anzahl der zu verwendenden Knoten für den Auftrag. Der Standardwert ist 1. |
max_concurrency_per_instance |
integer |
Die Anzahl der Prozessoren auf jedem Knoten. |
mini_batch_error_threshold |
integer |
Definiert die Anzahl der fehlgeschlagenen Mini-Batches, die in diesem parallelen Auftrag ignoriert werden können. Wenn die Anzahl der fehlgeschlagenen Mini-Batches diesen Schwellenwert überschreitet, wird der parallele Auftrag als fehlgeschlagen markiert. |
retry_settings.max_retries |
integer |
Definiert die Anzahl der Wiederholungsversuche, wenn ein Mini-Batch fehlschlägt oder eine Zeitüberschreitung auftritt. Wenn alle Wiederholungsversuche fehlschlagen, wird der Mini-Batch als fehlgeschlagen markiert, um von der |
retry_settings.timeout |
integer |
Definiert die Zeitüberschreitung in Sekunden für die Ausführung der benutzerdefinierten run()-Funktion. Wenn die Ausführungszeit diesen Schwellenwert überschreitet, wird der Mini-Batch abgebrochen und als fehlgeschlagener Mini-Batch markiert, um einen Wiederholungsversuch auszulösen. |
logging_level |
INFO, WARNING oder DEBUG |
Definiert, welche Protokollebene in die Benutzerprotokolldateien geschrieben wird. |
Weitere Run-Einstellungen finden Sie im offiziellen Dokument der parallelen Komponente.