promptflow.executor.flow_executor Modul#
- class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)#
Basiert auf:
objectDiese Klasse wird verwendet, um einen einzelnen Flow für verschiedene Eingaben auszuführen.
- Parameter:
flow (Flow) – Der auszuführende Flow.
connections (dict) – Die für den Flow zu verwendenden Verbindungen.
run_tracker (RunTracker) – Der für den Flow zu verwendende Laufverfolger.
cache_manager (AbstractCacheManager) – Der für den Flow zu verwendende Cache-Manager.
loaded_tools (Mapping[str, Callable]) – Die für den Flow zu verwendenden geladenen Werkzeuge.
worker_count (Optional[int]) – Die Anzahl der für den Flow zu verwendenden Worker. Standard ist 16.
raise_ex (Optional[bool]) – Gibt an, ob Ausnahmen ausgelöst werden sollen. Standard ist False.
working_dir (Optional[str]) – Das für den Flow zu verwendende Arbeitsverzeichnis. Standard ist None.
line_timeout_sec (Optional[int]) – Das für den Flow zu verwendende Zeilen-Timeout in Sekunden. Standard ist LINE_TIMEOUT_SEC.
flow_file (Optional[Path]) – Die für den Flow zu verwendende Flow-Datei. Standard ist None.
- property aggregation_nodes#
Gibt die Aggregationsknoten des Flow-Executors zurück.
- Gibt zurück:
Eine Liste von Aggregationsknoten.
- Rückgabetyp:
Liste
- static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) Dict[str, Any]#
- convert_flow_input_types(inputs: dict) Mapping[str, Any]#
Konvertiert die Eingabetypen des gegebenen Eingabewörterbuchs, um sie an die erwarteten Typen des Flows anzupassen.
- Parameter:
inputs (dict) – Ein Wörterbuch, das die Eingaben für den Flow enthält.
- Gibt zurück:
Ein Wörterbuch mit den konvertierten Eingaben.
- Rückgabetyp:
Mapping[str, Any]
- classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) FlowExecutor#
Erstellt eine neue Instanz von FlowExecutor.
- Parameter:
flow_file (Path) – Der Pfad zur Flow-Datei.
connections (Union[dict, ConnectionProvider]) – Die für den Flow zu verwendenden Verbindungen.
working_dir (Optional[str]) – Das für den Flow zu verwendende Arbeitsverzeichnis. Standard ist None.
func (Optional[str]) – Die zu verwendende Funktion für den Flow, wenn .py angegeben ist. Standard ist None.
storage (Optional[AbstractRunStorage]) – Der für den Flow zu verwendende Speicher. Standard ist None.
raise_ex (Optional[bool]) – Gibt an, ob Ausnahmen ausgelöst werden sollen. Standard ist True.
node_override (Optional[Dict[str, Dict[str, Any]]) – Die für den Flow zu verwendenden Knotenüberschreibungen. Standard ist None.
line_timeout_sec (Optional[int]) – Das für den Flow zu verwendende Zeilen-Timeout in Sekunden. Standard ist LINE_TIMEOUT_SEC.
init_kwargs (Optional[Dict[str, Any]]) – Klasseninitialisierungsparameter für aufrufbare Klassen, nur für Flex-Flows unterstützt.
- Gibt zurück:
Eine neue Instanz von FlowExecutor.
- Rückgabetyp:
- enable_streaming_for_llm_flow(stream_required: Callable[[], bool])#
Aktiviert den LLM-Knoten, der mit der Ausgabe verbunden ist, um Streaming-Ergebnisse zurückzugeben, die durch stream_required gesteuert werden.
Wenn der Callback `stream_required` True zurückgibt, gibt der LLM-Knoten einen Generator von Zeichenketten zurück. Andernfalls gibt der LLM-Knoten eine Zeichenkette zurück.
- Parameter:
stream_required (Callable[[], bool]) – Ein Callback, der keine Argumente entgegennimmt und einen booleschen Wert zurückgibt, der angibt, ob Streaming-Ergebnisse für den LLM-Knoten aktiviert werden sollen.
- Gibt zurück:
Keine
- ensure_flow_is_serializable()#
Stellt sicher, dass der Flow serialisierbar ist.
Einige Knoten können einen Generator von Zeichenketten zurückgeben, um Streaming-Ausgaben zu erzeugen. Dies ist nützlich, wenn der Flow als Webdienst bereitgestellt wird. Im interaktiven Modus geht der Executor jedoch davon aus, dass das Knotenergebnis JSON-serialisierbar ist.
Diese Methode fügt jedem Knoten im Flow einen Wrapper hinzu, um die Streaming-Ausgaben zu verbrauchen und sie zu einer Zeichenkette für die Executor-Nutzung zusammenzuführen.
- Gibt zurück:
Keine
- exec(inputs: dict, node_concurrency=16) dict#
Führt den Flow mit den gegebenen Eingaben aus und gibt die Ausgabe zurück.
- Parameter:
inputs (dict) – Ein Wörterbuch, das die Eingabewerte für den Flow enthält.
node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.
- Gibt zurück:
Ein Wörterbuch mit den Ausgabewerten des Flows.
- Rückgabetyp:
dict
- exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) AggregationResult#
Führt den Aggregationsknoten des Flows aus.
- Parameter:
inputs (Mapping[str, Any]) – Eine Zuordnung von Eingabenamen zu ihren Werten.
aggregation_inputs (Mapping[str, Any]) – Eine Zuordnung von Aggregationseingabenamen zu ihren Werten.
run_id (Optional[str]) – Die ID des aktuellen Laufs, falls vorhanden.
node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.
- Gibt zurück:
Das Ergebnis des Aggregationsknotens.
- Rückgabetyp:
AggregationResult
- Löst aus:
FlowError, wenn die Eingaben oder aggregation_inputs ungültig sind.
- exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) LineResult#
Führt eine einzelne Zeile des Flows aus.
- Parameter:
inputs (Mapping[str, Any]) – Die Eingabewerte für die Zeile.
index (Optional[int]) – Der Index der auszuführenden Zeile.
run_id (Optional[str]) – Die ID des Flow-Laufs.
validate_inputs (bool) – Gibt an, ob die Eingabewerte validiert werden sollen.
node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.
allow_generator_output (bool) – Gibt an, ob die Generatorausgabe zugelassen werden soll.
line_timeout_sec (Optional[int]) – Die maximale Wartezeit für eine Zeile der Ausgabe.
- Gibt zurück:
Das Ergebnis der Zeilenausführung.
- Rückgabetyp:
LineResult
- async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) LineResult#
Führt eine einzelne Zeile des Flows aus.
- Parameter:
inputs (Mapping[str, Any]) – Die Eingabewerte für die Zeile.
index (Optional[int]) – Der Index der auszuführenden Zeile.
run_id (Optional[str]) – Die ID des Flow-Laufs.
validate_inputs (bool) – Gibt an, ob die Eingabewerte validiert werden sollen.
node_concurrency (int) – Die maximale Anzahl von Knoten, die gleichzeitig ausgeführt werden können.
allow_generator_output (bool) – Gibt an, ob die Generatorausgabe zugelassen werden soll.
sync_iterator_to_async (bool) – Gibt an, ob die synchrone Iteratorausgabe in eine asynchrone Iteratorausgabe konvertiert werden soll.
- Gibt zurück:
Das Ergebnis der Zeilenausführung.
- Rückgabetyp:
LineResult
- get_inputs_definition()#
- get_status_summary(run_id: str)#
Ruft eine Zusammenfassung des Status eines bestimmten Laufs ab.
- Parameter:
run_id (str) – Die ID des Laufs, für den die Statuszusammenfassung abgerufen werden soll.
- Gibt zurück:
Eine Zusammenfassung des Status des gegebenen Laufs.
- Rückgabetyp:
str
- property has_aggregation_node: bool#
Prüft, ob der Flow-Executor über Aggregationsknoten verfügt.
- Gibt zurück:
True, wenn der Flow-Executor mindestens einen Aggregationsknoten hat, andernfalls False.
- Rückgabetyp:
bool
- classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)#
Lädt und führt einen einzelnen Knoten aus dem Flow aus.
- Parameter:
flow_file (Path) – Der Pfad zur Flow-Datei.
node_name (str) – Der Name des auszuführenden Knotens.
storage (Optional[AbstractRunStorage]) – Der für den Flow zu verwendende Speicher.
output_sub_dir (Optional[str]) – Das Verzeichnis zum Speichern von Bildern für den Flow. Beibehalten nur für Abwärtskompatibilität.
flow_inputs (Optional[Mapping[str, Any]]) – Die für den Flow zu verwendenden Eingaben. Standard ist None.
dependency_nodes_outputs (Optional[Mapping[str, Any]) – Die Ausgaben der Abhängigkeitsknoten. Standard ist None.
connections (Optional[dict]) – Die für den Flow zu verwendenden Verbindungen. Standard ist None.
working_dir (Optional[str]) – Das für den Flow zu verwendende Arbeitsverzeichnis. Standard ist None.
raise_ex (Optional[bool]) – Gibt an, ob Ausnahmen ausgelöst werden sollen. Standard ist False.
- static update_environment_variables_with_connections(connections: dict)#
Aktualisiert Umgebungsvariablen mit Verbindungen.
- Parameter:
connections (dict) – Ein Wörterbuch mit Verbindungsinformationen.
- Gibt zurück:
Ein Wörterbuch mit aktualisierten Umgebungsvariablen.
- Rückgabetyp:
dict
- promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)#
Aktiviert den Stream-Modus für LLM-Werkzeuge, die ihn unterstützen.
- Parameter:
f (function) – Die zu umschließende Funktion.
- Gibt zurück:
Die umschlossene Funktion.
- Rückgabetyp:
Funktion
Die Werkzeuge AzureOpenAI.completion und AzureOpenAI.chat unterstützen sowohl den Stream- als auch den Nicht-Stream-Modus. Der Stream-Modus ist standardmäßig deaktiviert. Verwenden Sie diesen Wrapper, um ihn zu aktivieren.
- promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) LineResult#
Führt den Flow aus, einschließlich Aggregationsknoten.
- Parameter:
flow_file (Path) – Der Pfad zur Flow-Datei.
working_dir (Path) – Das Arbeitsverzeichnis des Flows.
output_dir (Path) – Relativer Pfad relativ zum Arbeitsverzeichnis.
connections (dict) – Ein Wörterbuch mit Verbindungsinformationen.
inputs (Mapping[str, Any]) – Ein Wörterbuch mit den Eingabewerten für den Flow.
enable_stream_output (Optional[bool]) – Gibt an, ob eine Stream-Ausgabe (Generator) für die Flow-Ausgabe zugelassen werden soll. Standard ist False.
run_id (Optional[str]) – Die Lauf-ID wird im Operationskontext festgelegt und für die Sitzung verwendet.
init_kwargs (dict) – Initialisierungsparameter für Flex-Flows, nur unterstützt, wenn der Flow eine aufrufbare Klasse ist.
kwargs (Any) – Andere Schlüsselwortargumente zum Erstellen des Flow-Executors.
- Gibt zurück:
Das Zeilenergebnis der Flow-Ausführung.
- Rückgabetyp:
LineResult
- promptflow.executor.flow_executor.signal_handler(sig, frame)#
Verarbeitet das vom Prozess empfangene Abbruchsignal.
Derzeit wird nur der Ein-Knoten-Lauf mit diesem Handler ausgeführt. Wir geben das Protokoll aus und lösen eine `KeyboardInterrupt`-Ausnahme aus, damit externer Code diese Ausnahme abfangen und den laufenden Knoten abbrechen kann.