promptflow.executor.flow_executor Modul#

class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)#

Basiert auf: object

Diese Klasse wird verwendet, um einen einzelnen Flow für verschiedene Eingaben auszuführen.

Parameter:
  • flow (Flow) – Der auszuführende Flow.

  • connections (dict) – Die für den Flow zu verwendenden Verbindungen.

  • run_tracker (RunTracker) – Der für den Flow zu verwendende Laufverfolger.

  • cache_manager (AbstractCacheManager) – Der für den Flow zu verwendende Cache-Manager.

  • loaded_tools (Mapping[str, Callable]) – Die für den Flow zu verwendenden geladenen Werkzeuge.

  • worker_count (Optional[int]) – Die Anzahl der für den Flow zu verwendenden Worker. Standard ist 16.

  • raise_ex (Optional[bool]) – Gibt an, ob Ausnahmen ausgelöst werden sollen. Standard ist False.

  • working_dir (Optional[str]) – Das für den Flow zu verwendende Arbeitsverzeichnis. Standard ist None.

  • line_timeout_sec (Optional[int]) – Das für den Flow zu verwendende Zeilen-Timeout in Sekunden. Standard ist LINE_TIMEOUT_SEC.

  • flow_file (Optional[Path]) – Die für den Flow zu verwendende Flow-Datei. Standard ist None.

property aggregation_nodes#

Gibt die Aggregationsknoten des Flow-Executors zurück.

Gibt zurück:

Eine Liste von Aggregationsknoten.

Rückgabetyp:

Liste

static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) Dict[str, Any]#
convert_flow_input_types(inputs: dict) Mapping[str, Any]#

Konvertiert die Eingabetypen des gegebenen Eingabewörterbuchs, um sie an die erwarteten Typen des Flows anzupassen.

Parameter:

inputs (dict) – Ein Wörterbuch, das die Eingaben für den Flow enthält.

Gibt zurück:

Ein Wörterbuch mit den konvertierten Eingaben.

Rückgabetyp:

Mapping[str, Any]

classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) FlowExecutor#

Erstellt eine neue Instanz von FlowExecutor.

Parameter:
  • flow_file (Path) – Der Pfad zur Flow-Datei.

  • connections (Union[dict, ConnectionProvider]) – Die für den Flow zu verwendenden Verbindungen.

  • working_dir (Optional[str]) – Das für den Flow zu verwendende Arbeitsverzeichnis. Standard ist None.

  • func (Optional[str]) – Die zu verwendende Funktion für den Flow, wenn .py angegeben ist. Standard ist None.

  • storage (Optional[AbstractRunStorage]) – Der für den Flow zu verwendende Speicher. Standard ist None.

  • raise_ex (Optional[bool]) – Gibt an, ob Ausnahmen ausgelöst werden sollen. Standard ist True.

  • node_override (Optional[Dict[str, Dict[str, Any]]) – Die für den Flow zu verwendenden Knotenüberschreibungen. Standard ist None.

  • line_timeout_sec (Optional[int]) – Das für den Flow zu verwendende Zeilen-Timeout in Sekunden. Standard ist LINE_TIMEOUT_SEC.

  • init_kwargs (Optional[Dict[str, Any]]) – Klasseninitialisierungsparameter für aufrufbare Klassen, nur für Flex-Flows unterstützt.

Gibt zurück:

Eine neue Instanz von FlowExecutor.

Rückgabetyp:

FlowExecutor

enable_streaming_for_llm_flow(stream_required: Callable[[], bool])#

Aktiviert den LLM-Knoten, der mit der Ausgabe verbunden ist, um Streaming-Ergebnisse zurückzugeben, die durch stream_required gesteuert werden.

Wenn der Callback `stream_required` True zurückgibt, gibt der LLM-Knoten einen Generator von Zeichenketten zurück. Andernfalls gibt der LLM-Knoten eine Zeichenkette zurück.

Parameter:

stream_required (Callable[[], bool]) – Ein Callback, der keine Argumente entgegennimmt und einen booleschen Wert zurückgibt, der angibt, ob Streaming-Ergebnisse für den LLM-Knoten aktiviert werden sollen.

Gibt zurück:

Keine

ensure_flow_is_serializable()#

Stellt sicher, dass der Flow serialisierbar ist.

Einige Knoten können einen Generator von Zeichenketten zurückgeben, um Streaming-Ausgaben zu erzeugen. Dies ist nützlich, wenn der Flow als Webdienst bereitgestellt wird. Im interaktiven Modus geht der Executor jedoch davon aus, dass das Knotenergebnis JSON-serialisierbar ist.

Diese Methode fügt jedem Knoten im Flow einen Wrapper hinzu, um die Streaming-Ausgaben zu verbrauchen und sie zu einer Zeichenkette für die Executor-Nutzung zusammenzuführen.

Gibt zurück:

Keine

exec(inputs: dict, node_concurrency=16) dict#

Führt den Flow mit den gegebenen Eingaben aus und gibt die Ausgabe zurück.

Parameter:
  • inputs (dict) – Ein Wörterbuch, das die Eingabewerte für den Flow enthält.

  • node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.

Gibt zurück:

Ein Wörterbuch mit den Ausgabewerten des Flows.

Rückgabetyp:

dict

exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) AggregationResult#

Führt den Aggregationsknoten des Flows aus.

Parameter:
  • inputs (Mapping[str, Any]) – Eine Zuordnung von Eingabenamen zu ihren Werten.

  • aggregation_inputs (Mapping[str, Any]) – Eine Zuordnung von Aggregationseingabenamen zu ihren Werten.

  • run_id (Optional[str]) – Die ID des aktuellen Laufs, falls vorhanden.

  • node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.

Gibt zurück:

Das Ergebnis des Aggregationsknotens.

Rückgabetyp:

AggregationResult

Löst aus:

FlowError, wenn die Eingaben oder aggregation_inputs ungültig sind.

exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) LineResult#

Führt eine einzelne Zeile des Flows aus.

Parameter:
  • inputs (Mapping[str, Any]) – Die Eingabewerte für die Zeile.

  • index (Optional[int]) – Der Index der auszuführenden Zeile.

  • run_id (Optional[str]) – Die ID des Flow-Laufs.

  • validate_inputs (bool) – Gibt an, ob die Eingabewerte validiert werden sollen.

  • node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.

  • allow_generator_output (bool) – Gibt an, ob die Generatorausgabe zugelassen werden soll.

  • line_timeout_sec (Optional[int]) – Die maximale Wartezeit für eine Zeile der Ausgabe.

Gibt zurück:

Das Ergebnis der Zeilenausführung.

Rückgabetyp:

LineResult

async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) LineResult#

Führt eine einzelne Zeile des Flows aus.

Parameter:
  • inputs (Mapping[str, Any]) – Die Eingabewerte für die Zeile.

  • index (Optional[int]) – Der Index der auszuführenden Zeile.

  • run_id (Optional[str]) – Die ID des Flow-Laufs.

  • validate_inputs (bool) – Gibt an, ob die Eingabewerte validiert werden sollen.

  • node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.

  • allow_generator_output (bool) – Gibt an, ob die Generatorausgabe zugelassen werden soll.

  • sync_iterator_to_async (bool) – Gibt an, ob die synchrone Iteratorausgabe in eine asynchrone Iteratorausgabe konvertiert werden soll.

Gibt zurück:

Das Ergebnis der Zeilenausführung.

Rückgabetyp:

LineResult

get_inputs_definition()#
get_status_summary(run_id: str)#

Ruft eine Zusammenfassung des Status eines bestimmten Laufs ab.

Parameter:

run_id (str) – Die ID des Laufs, für den die Statuszusammenfassung abgerufen werden soll.

Gibt zurück:

Eine Zusammenfassung des Status des gegebenen Laufs.

Rückgabetyp:

str

property has_aggregation_node: bool#

Prüft, ob der Flow-Executor über Aggregationsknoten verfügt.

Gibt zurück:

True, wenn der Flow-Executor mindestens einen Aggregationsknoten hat, andernfalls False.

Rückgabetyp:

bool

classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)#

Lädt und führt einen einzelnen Knoten aus dem Flow aus.

Parameter:
  • flow_file (Path) – Der Pfad zur Flow-Datei.

  • node_name (str) – Der Name des auszuführenden Knotens.

  • storage (Optional[AbstractRunStorage]) – Der für den Flow zu verwendende Speicher.

  • output_sub_dir (Optional[str]) – Das Verzeichnis zum Speichern von Bildern für den Flow. Beibehalten nur für Abwärtskompatibilität.

  • flow_inputs (Optional[Mapping[str, Any]]) – Die für den Flow zu verwendenden Eingaben. Standard ist None.

  • dependency_nodes_outputs (Optional[Mapping[str, Any]) – Die Ausgaben der Abhängigkeitsknoten. Standard ist None.

  • connections (Optional[dict]) – Die für den Flow zu verwendenden Verbindungen. Standard ist None.

  • working_dir (Optional[str]) – Das für den Flow zu verwendende Arbeitsverzeichnis. Standard ist None.

  • raise_ex (Optional[bool]) – Gibt an, ob Ausnahmen ausgelöst werden sollen. Standard ist False.

static update_environment_variables_with_connections(connections: dict)#

Aktualisiert Umgebungsvariablen mit Verbindungen.

Parameter:

connections (dict) – Ein Wörterbuch mit Verbindungsinformationen.

Gibt zurück:

Ein Wörterbuch mit aktualisierten Umgebungsvariablen.

Rückgabetyp:

dict

promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)#

Aktiviert den Stream-Modus für LLM-Werkzeuge, die ihn unterstützen.

Parameter:

f (function) – Die zu umschließende Funktion.

Gibt zurück:

Die umschlossene Funktion.

Rückgabetyp:

Funktion

Die Werkzeuge AzureOpenAI.completion und AzureOpenAI.chat unterstützen sowohl den Stream- als auch den Nicht-Stream-Modus. Der Stream-Modus ist standardmäßig deaktiviert. Verwenden Sie diesen Wrapper, um ihn zu aktivieren.

promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) LineResult#

Führt den Flow aus, einschließlich Aggregationsknoten.

Parameter:
  • flow_file (Path) – Der Pfad zur Flow-Datei.

  • working_dir (Path) – Das Arbeitsverzeichnis des Flows.

  • output_dir (Path) – Relativer Pfad relativ zum Arbeitsverzeichnis.

  • connections (dict) – Ein Wörterbuch mit Verbindungsinformationen.

  • inputs (Mapping[str, Any]) – Ein Wörterbuch mit den Eingabewerten für den Flow.

  • enable_stream_output (Optional[bool]) – Gibt an, ob eine Stream-Ausgabe (Generator) für die Flow-Ausgabe zugelassen werden soll. Standard ist False.

  • run_id (Optional[str]) – Die Lauf-ID wird im Operationskontext festgelegt und für die Sitzung verwendet.

  • init_kwargs (dict) – Initialisierungsparameter für Flex-Flows, nur unterstützt, wenn der Flow eine aufrufbare Klasse ist.

  • kwargs (Any) – Andere Schlüsselwortargumente zum Erstellen des Flow-Executors.

Gibt zurück:

Das Zeilenergebnis der Flow-Ausführung.

Rückgabetyp:

LineResult

promptflow.executor.flow_executor.signal_handler(sig, frame)#

Verarbeitet das vom Prozess empfangene Abbruchsignal.

Derzeit wird nur der Ein-Knoten-Lauf mit diesem Handler ausgeführt. Wir geben das Protokoll aus und lösen eine `KeyboardInterrupt`-Ausnahme aus, damit externer Code diese Ausnahme abfangen und den laufenden Knoten abbrechen kann.