Flow in Azure ML Pipeline-Auftrag verwenden#

In praktischen Szenarien erfüllen Flows verschiedene Funktionen. Betrachten Sie beispielsweise einen Offline-Flow, der speziell zur Bewertung des Relevanz-Scores für Kommunikationssitzungen zwischen Menschen und Agenten entwickelt wurde. Dieser Flow wird nächtlich ausgelöst und verarbeitet eine beträchtliche Menge an Sitzungsdaten. In einem solchen Kontext sind die Parallel-Komponente und AzureML-Pipeline die optimalen Wahlmöglichkeiten für die Handhabung von groß angelegten, hochgradig widerstandsfähigen und effizienten Offline-Batch-Anforderungen.

Sobald Sie Ihren Flow entwickelt und gründlich getestet haben, führt Sie diese Anleitung durch die Verwendung Ihres Flows als parallele Komponente in einem AzureML-Pipeline-Auftrag.

Voraussetzungen

Um diese Funktion zu ermöglichen, müssen Kunden

  1. die zugehörige CLI oder das zugehörige Paket installieren

    1. Für CLI installieren Sie zuerst die Azure CLI und dann die Erweiterung ml>=2.22.0 über az extension add -n ml;

    2. Für SDK installieren Sie das Paket azure-ai-ml>=1.12.0 über pip install azure-ai-ml>=1.12.0 oder pip install promptflow[azure];

  2. sicherstellen, dass ein $schema in der Zielquelle vorhanden ist

    1. flow.dag.yaml: $schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json

    2. run.yaml: $schema: https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json

  3. sicherstellen, dass Metadaten generiert wurden und aktuell sind

    1. <mein-Flow-Verzeichnis>/.promptflow/flow.tools.json sollte existieren;

    2. Kunden können die Datei über pf flow validate --source <mein-Flow-Verzeichnis> aktualisieren.

Um ein ausführbares End-to-End-Beispiel für die Ausführung eines Beispiel-Flows in einem Azure ML-Workspace zu erkunden, können Sie sich dieses Tutorial-Notebook ansehen: Flow mit Pipeline ausführen

Weitere Informationen zu AzureML und Komponenten

Einen Flow als Komponente registrieren#

Angenommen, es gibt bereits einen Flow und dessen flow.dag.yaml sieht wie folgt aus

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
environment:
  python_requirements_txt: requirements.txt
inputs:
  text:
    type: string
    default: Hello World!
outputs:
  output:
    type: string
    reference: ${llm.output}
nodes:
- name: hello_prompt
  type: prompt
  source:
    type: code
    path: hello.jinja2
  inputs:
    text: ${inputs.text}
- name: llm
  type: python
  source:
    type: code
    path: hello.py
  inputs:
    prompt: ${hello_prompt.output}
    deployment_name: text-davinci-003
    max_tokens: "120"

Kunden können einen Flow entweder über CLI oder SDK als Komponente registrieren

# Register flow as a component
az ml component create --file \<my-flow-directory\>/flow.dag.yaml

# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
az ml component create --file \<my-flow-directory\>/flow.dag.yaml --version 3 --set name=basic_updated
from azure.ai.ml import MLClient, load_component

ml_client = MLClient()

# Register flow as a component
flow_component = load_component("<my-flow-directory>/flow.dag.yaml")
ml_client.components.create_or_update(flow_component)

# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
flow_component.name = "basic_updated"
ml_client.components.create_or_update(flow_component, version="3")

Die generierte Komponente wird eine parallele Komponente sein, deren Definition wie folgt aussehen wird

name: basic
version: 1
display_name: basic
is_deterministic: True
type: parallel
inputs:
  data:
    type: uri_folder
    optional: False
  run_outputs:
    type: uri_folder
    optional: True
  text:
    type: string
    optional: False
    default: Hello World!
outputs:
  flow_outputs:
    type: uri_folder
  debug_info:
    type: uri_folder
...

Neben den festen Eingabe-/Ausgabe-Ports werden alle Verbindungen und Flow-Eingaben als Eingabeparameter der Komponente verfügbar gemacht. Standardwerte können in der Flow-/Run-Definition angegeben werden; sie können auch beim Einreichen des Auftrags gesetzt/überschrieben werden. Eine vollständige Beschreibung der Ports finden Sie im Abschnitt Komponenten-Ports und Run-Einstellungen.

Flow in einem Pipeline-Auftrag verwenden#

Nachdem ein Flow als Komponente registriert wurde, kann er in einem Pipeline-Auftrag wie reguläre registrierte Komponenten referenziert werden. Kunden können einen Flow auch direkt in einem Pipeline-Auftrag verwenden, dann werden beim Einreichen des Auftrags anonyme Komponenten erstellt.

...
inputs:
  basic_input:
    type: uri_file
    path: <path-to-data>
compute: azureml:cpu-cluster
jobs:
  flow_from_registered:
    type: parallel
    component: azureml:my_flow_component:1
    inputs:
      data: ${{parent.inputs.basic_input}}
      text: "${data.text}"
  flow_from_dag:
    type: parallel
    component: <path-to-flow-dag-yaml>
    inputs:
      data: ${{parent.inputs.basic_input}}
      text: "${data.text}"
  flow_from_run:
    type: parallel
    component: <path-to-run-yaml>
    inputs:
      data: ${{parent.inputs.basic_input}}
      text: "${data.text}"
...

Pipeline-Aufträge können über az ml job create --file pipeline.yml eingereicht werden.

Ein vollständiges Beispiel finden Sie hier.

from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline

credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')

flow_component_from_registered = ml_client.components.get("my_flow_component", "1")
flow_component_from_dag = load_component("<path-to-flow-dag-yaml>")
flow_component_from_run = load_component("<path-to-run-yaml>")

@pipeline
def pipeline_func_with_flow(basic_input):
    flow_from_registered = flow_component_from_registered(
        data=data,
        text="${data.text}",
    )
    flow_from_dag = flow_component_from_dag(
        data=data,
        text="${data.text}",
    )
    flow_from_run = flow_component_from_run(
        data=data,
        text="${data.text}",
    )

pipeline_with_flow = pipeline_func_with_flow(basic_input=data_input)
pipeline_with_flow.compute = "cpu-cluster"

pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)

Ein vollständiges Beispiel finden Sie hier.

Wie bei regulären parallelen Komponenten können Kunden in einem Pipeline-Auftrag Run-Einstellungen dafür festlegen. Einige häufig verwendete Run-Einstellungen sind im Abschnitt Komponenten-Ports und Run-Einstellungen aufgeführt; Kunden können auch das offizielle Dokument der parallelen Komponente für weitere Details konsultieren.

...
jobs:
  flow_node:
    type: parallel
    component: <path-to-complicated-run-yaml>
    compute: azureml:cpu-cluster
    instance_count: 2
    max_concurrency_per_instance: 2
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 3
      timeout: 30
    inputs:
      data: ${{parent.inputs.data}}
      url: "${data.url}"
      connections.summarize_text_content.connection: azure_open_ai_connection
      connections.summarize_text_content.deployment_name: text-davinci-003
      environment_variables.AZURE_OPENAI_API_KEY: ${my_connection.api_key}
      environment_variables.AZURE_OPENAI_API_BASE: ${my_connection.api_base}
...
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import RetrySettings

credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')

# Load flow as a component
flow_component = load_component("<path-to-complicated-run-yaml>")

@pipeline
def pipeline_func_with_flow(data):
    flow_node = flow_component(
        data=data,
        url="${data.url}",
        connections={
            "summarize_text_content": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "text-davinci-003",
            },
        },
        environment_variables={
            "AZURE_OPENAI_API_KEY": "${my_connection.api_key}",
            "AZURE_OPENAI_API_BASE": "${my_connection.api_base}",
        }
    )
    flow_node.compute = "cpu-cluster"
    flow_node.instance_count = 2
    flow_node.max_concurrency_per_instance = 2
    flow_node.mini_batch_error_threshold = 5
    flow_node.retry_settings = RetrySettings(timeout=30, max_retries=5)

pipeline_with_flow = pipeline_func_with_flow(data=data_input)

pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)

Umgebung der Komponente#

Standardmäßig basiert die Umgebung der erstellten Komponente auf dem neuesten Promptflow-Runtime-Image. Wenn Kunden eine Python-Anforderungsdatei in flow.dag.yaml angegeben haben, werden diese automatisch auf die Umgebung angewendet.

...
environment:
  python_requirements_txt: requirements.txt

Wenn Kunden eine bestehende Azure ML-Umgebung verwenden oder die Umgebung im Azure ML-Stil definieren möchten, können sie dies wie folgt in run.yaml definieren

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json
flow: <my-flow-directory>
azureml:
  environment: azureml:my-environment:1

Weitere Details zum unterstützten Format von Azure ML-Umgebungen finden Sie in diesem Dokument.

Unterschiede zwischen Flow in Prompt Flow und Pipeline-Auftrag#

In Prompt Flow läuft ein Flow in einer Compute-Sitzung, die für Prompt Flow konzipiert ist; in einem Pipeline-Auftrag läuft ein Flow hingegen auf verschiedenen Arten von Compute-Ressourcen, üblicherweise auf Compute-Clustern.

Vor diesem Hintergrund sollten Sie sich bewusst sein, dass es zu unerwarteten Fehlern kommen kann, wenn Ihr Flow Logik enthält, die von Identitäten oder Umgebungsvariablen abhängt, da der Flow in einem Pipeline-Auftrag auf anderen Umgebungen ausgeführt wird. Möglicherweise sind zusätzliche Konfigurationen erforderlich, damit er funktioniert.

Komponenten-Ports und Run-Einstellungen#

Eingabe-Ports#

Schlüssel

source

type

description

data

fest

uri_folder oder uri_file

erforderlich; zur Übergabe von Eingabedaten. Unterstützte Formate umfassen mltable und Listen von jsonl-Dateien.

run_outputs

fest

uri_folder

optional; zur Übergabe der Ausgabe eines Standard-Flows für einen Evaluations-Flow. Sollte mit einem flow_outputs eines vorherigen Flow-Knotens in der Pipeline verknüpft sein.

Ausgabe-Ports#

Schlüssel

source

type

description

flow_outputs

fest

uri_folder

ein uri_folder mit 1 oder mehreren jsonl-Dateien, die die Ausgaben der Flow-Läufe enthalten

debug_info

fest

uri_folder

ein uri_folder, der Debugging-Informationen des Flow-Laufs enthält, z. B. Laufprotokolle

Parameter#

Schlüssel

source

type

description

<flow-input-name>

aus Flow-Eingaben

string

Der Standardwert wird von den Flow-Eingaben übernommen; wird zur Überschreibung der Spaltenzuordnung für Flow-Eingaben verwendet.

connections.<node-name>.connection

aus Knoten integrierter LLM-Tools

string

Der Standardwert ist der aktuelle Wert, der in flow.dag.yaml oder run.yaml definiert ist; zur Überschreibung verwendeter Verbindungen von entsprechenden Knoten. Die Verbindung sollte im aktuellen Workspace vorhanden sein.

connections.<node-name>.deployment_name

aus Knoten integrierter LLM-Tools

string

Der Standardwert ist der aktuelle Wert, der in flow.dag.yaml oder run.yaml definiert ist; zur Überschreibung von Ziel-Deployment-Namen von entsprechenden Knoten. Das Deployment sollte mit der angegebenen Verbindung verfügbar sein.

connections.<node-name>.<node-input-key>

aus Knoten mit Connection Eingaben

string

Der Standardwert ist der aktuelle Wert, der in flow.dag.yaml oder run.yaml definiert ist; zur Überschreibung verwendeter Verbindungen von entsprechenden Knoten. Die Verbindung sollte im aktuellen Workspace vorhanden sein.

environment_variables.<environment-variable-name>

aus Umgebungsvariablen, die in run.yaml definiert sind

string

Der Standardwert ist der aktuelle Wert, der in run.yaml definiert ist; zur Überschreibung von Umgebungsvariablen während des Flow-Laufs, z. B. AZURE_OPENAI_API_KEY. Beachten Sie, dass Sie auf Workspace-Verbindungen mit Ausdrücken wie {my_connection.api_key} verweisen können.

Run-Einstellungen#

Schlüssel

type

Beschreibung

instance_count

integer

Die Anzahl der zu verwendenden Knoten für den Auftrag. Der Standardwert ist 1.

max_concurrency_per_instance

integer

Die Anzahl der Prozessoren auf jedem Knoten.

mini_batch_error_threshold

integer

Definiert die Anzahl der fehlgeschlagenen Mini-Batches, die in diesem parallelen Auftrag ignoriert werden können. Wenn die Anzahl der fehlgeschlagenen Mini-Batches diesen Schwellenwert überschreitet, wird der parallele Auftrag als fehlgeschlagen markiert.

Ein Mini-Batch wird als fehlgeschlagen markiert, wenn
- die Anzahl der Rückgaben von run() geringer ist als die Anzahl der Mini-Batch-Eingaben.
- Ausnahmen im benutzerdefinierten run()-Code abgefangen werden.

Standardmäßig ist der Wert "-1" eingestellt, was bedeutet, dass alle fehlgeschlagenen Mini-Batches während des parallelen Auftrags ignoriert werden.

retry_settings.max_retries

integer

Definiert die Anzahl der Wiederholungsversuche, wenn ein Mini-Batch fehlschlägt oder eine Zeitüberschreitung auftritt. Wenn alle Wiederholungsversuche fehlschlagen, wird der Mini-Batch als fehlgeschlagen markiert, um von der mini_batch_error_threshold-Berechnung berücksichtigt zu werden.

retry_settings.timeout

integer

Definiert die Zeitüberschreitung in Sekunden für die Ausführung der benutzerdefinierten run()-Funktion. Wenn die Ausführungszeit diesen Schwellenwert überschreitet, wird der Mini-Batch abgebrochen und als fehlgeschlagener Mini-Batch markiert, um einen Wiederholungsversuch auszulösen.

logging_level

INFO, WARNING oder DEBUG

Definiert, welche Protokollebene in die Benutzerprotokolldateien geschrieben wird.

Weitere Run-Einstellungen finden Sie im offiziellen Dokument der parallelen Komponente.