Einleitung

Dieses Repository enthält die Implementierung des Data Collection Prozesses für das Training der Large Action Models (LAMs) im Paper Large Action Models: From Inception to Implementation. Der Data Collection Prozess ist darauf ausgelegt, die Aufgabenverarbeitung zu optimieren und sicherzustellen, dass alle notwendigen Schritte von der Initialisierung bis zur Ausführung nahtlos integriert sind. Dieses Modul ist Teil des UFO Projekts.

Datenfluss

Dataflow nutzt UFO zur Implementierung von instantiation, execution und dataflow für eine gegebene Aufgabe, mit Optionen für Stapelverarbeitung und Einzelverarbeitung.

  1. Instantiation: Instantiation bezieht sich auf den Prozess der Einrichtung und Vorbereitung einer Aufgabe zur Ausführung. Dieser Schritt beinhaltet typischerweise das Auswählen einer Vorlage, Prefill und Filter.
  2. Execution: Execution ist der eigentliche Prozess der Ausführung der Aufgabe. Dieser Schritt beinhaltet die Durchführung der durch die Instantiation spezifizierten Aktionen oder Operationen. Nach der Ausführung wird ein Bewertungsagent die Qualität des gesamten Ausführungsprozesses bewerten.
  3. Dataflow: Dataflow ist der übergreifende Prozess, der Instantiation und Execution zu einer einzigen Pipeline kombiniert. Er bietet eine End-to-End-Lösung für die Verarbeitung von Aufgaben und stellt sicher, dass alle notwendigen Schritte (von der Initialisierung bis zur Ausführung) nahtlos integriert sind.

Sie können instantiation und execution unabhängig voneinander verwenden, wenn Sie nur einen bestimmten Teil des Prozesses durchführen müssen. Wenn beide Schritte für eine Aufgabe erforderlich sind, optimiert der dataflow Prozess diese und ermöglicht Ihnen, Aufgaben von Anfang bis Ende in einer einzigen Pipeline auszuführen.

Die gesamte Verarbeitung des Dataflows ist wie folgt. Gegeben sind Task-Plan-Daten, die LLM werden Task-Aktionsdaten instantiieren, einschließlich der Auswahl einer Vorlage, Prefill und Filter.

Anwendung

1. Pakete installieren

Sie sollten die notwendigen Pakete im UFO-Root-Ordner installieren.

pip install -r requirements.txt

2. LLMs konfigurieren

Bevor Sie Dataflow ausführen, müssen Sie Ihre LLM-Konfigurationen individuell für PrefillAgent und FilterAgent angeben. Sie können Ihre eigene Konfigurationsdatei dataflow/config/config.yaml erstellen, indem Sie dataflow/config/config.yaml.template kopieren und die Konfiguration für PREFILL_AGENT und FILTER_AGENT wie folgt bearbeiten:

OpenAI

VISUAL_MODE: True, # Whether to use the visual mode
API_TYPE: "openai" , # The API type, "openai" for the OpenAI API.  
API_BASE: "https://api.openai.com/v1/chat/completions", # The the OpenAI API endpoint.
API_KEY: "sk-",  # The OpenAI API key, begin with sk-
API_VERSION: "2024-02-15-preview", # "2024-02-15-preview" by default
API_MODEL: "gpt-4-vision-preview",  # The only OpenAI model

Azure OpenAI (AOAI)

VISUAL_MODE: True, # Whether to use the visual mode
API_TYPE: "aoai" , # The API type, "aoai" for the Azure OpenAI.  
API_BASE: "YOUR_ENDPOINT", #  The AOAI API address. Format: https://{your-resource-name}.openai.azure.com
API_KEY: "YOUR_KEY",  # The aoai API key
API_VERSION: "2024-02-15-preview", # "2024-02-15-preview" by default
API_MODEL: "gpt-4-vision-preview",  # The only OpenAI model
API_DEPLOYMENT_ID: "YOUR_AOAI_DEPLOYMENT", # The deployment id for the AOAI API

Sie können auch nicht-visuelle Modelle (z. B. GPT-4) für jeden Agenten verwenden, indem Sie VISUAL_MODE: False und die entsprechenden API_MODEL (openai) und API_DEPLOYMENT_ID (aoai) einstellen.

Nicht-visuelle Modellkonfiguration

Sie können nicht-visuelle Modelle (z. B. GPT-4) für jeden Agenten verwenden, indem Sie die folgenden Einstellungen in der Datei config.yaml konfigurieren:

  • VISUAL_MODE: False # Um den nicht-visuellen Modus zu aktivieren.
  • Geben Sie das entsprechende API_MODEL (OpenAI) und API_DEPLOYMENT_ID (AOAI) für jeden Agenten an.

Stellen Sie sicher, dass Sie diese Einstellungen korrekt konfigurieren, um nicht-visuelle Modelle effektiv zu nutzen.

Weitere Konfigurationen

config_dev.yaml gibt die Pfade relevanter Dateien an und enthält Standardeinstellungen. Die Abgleichstrategie für den Fensterabgleich und den Steuerfilter unterstützt Optionen: 'contains', 'fuzzy' und 'regex', was flexible Abgleichstrategien für Benutzer ermöglicht. MAX_STEPS ist die maximale Schrittanzahl für die Ausführung des Flows, die vom Benutzer eingestellt werden kann.

Hinweis

Die spezifische Implementierung und die Aufrufmethode der Abgleichstrategie finden Sie unter windows_app_env.

Hinweis

VORSICHT! Wenn Sie GitHub oder andere Open-Source-Tools verwenden, setzen Sie Ihre config.yaml nicht online, da sie Ihre privaten Schlüssel enthält.

3. Dateien vorbereiten

Bestimmte Dateien müssen vor der Ausführung der Aufgabe vorbereitet werden.

3.1. Aufgaben als JSON

Die zu instanziierenden Aufgaben müssen in einem Ordner mit JSON-Dateien organisiert werden. Der Standardordnerpfad ist auf dataflow/tasks eingestellt. Dieser Pfad kann in der Datei dataflow/config/config.yaml geändert werden, oder Sie können ihn im Terminal angeben, wie in 4. Start Running erwähnt. Zum Beispiel könnte eine Aufgabe, die in dataflow/tasks/prefill/ gespeichert ist, wie folgt aussehen:

{
    // The app you want to use
    "app": "word",
    // A unique ID to distinguish different tasks 
    "unique_id": "1",
    // The task and steps to be instantiated
    "task": "Type 'hello' and set the font type to Arial",
    "refined_steps": [
        "Type 'hello'",
        "Set the font to Arial"
    ]
}

3.2. Vorlagen und Beschreibungen

Sie sollten eine App-Datei als Referenz für die Instanziierung in einem Ordner ablegen, der nach der App benannt ist.

Zum Beispiel, wenn Sie template1.docx für Word haben, sollte es sich unter dataflow/templates/word/template1.docx befinden.

Zusätzlich sollte für jeden App-Ordner eine Datei description.json unter dataflow/templates/word/description.json vorhanden sein, die jede Vorlagendatei detailliert beschreibt. Sie könnte wie folgt aussehen:

{
    "template1.docx": "A document with a rectangle shape",
    "template2.docx": "A document with a line of text"
}

Wenn eine Datei description.json nicht vorhanden ist, wird eine Vorlagendatei zufällig ausgewählt.

3.3. Finale Struktur

Stellen Sie sicher, dass die folgenden Dateien vorhanden sind:

  • Zu instanziierende JSON-Dateien
  • Vorlagen als Referenzen für die Instanziierung
  • Beschreibungsdatei im JSON-Format

Die Struktur der Dateien kann wie folgt sein:

dataflow/
|
├── tasks
│   └── prefill
│       ├── bulleted.json
│       ├── delete.json
│       ├── draw.json
│       ├── macro.json
│       └── rotate.json
├── templates
│   └── word
│       ├── description.json
│       ├── template1.docx
│       ├── template2.docx
│       ├── template3.docx
│       ├── template4.docx
│       ├── template5.docx
│       ├── template6.docx
│       └── template7.docx
└── ...

4. Starten der Ausführung

Nachdem die vorherigen Schritte abgeschlossen sind, können Sie die folgenden Befehle in der Befehlszeile verwenden. Wir bieten Einzel-/Stapelverarbeitung an, für die Sie den Pfad der einzelnen Datei / des Ordnerpfads angeben müssen. Bestimmen Sie den vom Benutzer bereitgestellten Pfadtyp und entscheiden Sie automatisch, ob eine einzelne Aufgabe oder mehrere Aufgaben verarbeitet werden.

Außerdem können Sie wählen, ob Sie die Abschnitte instantiation / execution einzeln verwenden oder sie als ganzen Abschnitt, der als dataflow bezeichnet wird, verwenden.

Der Standard-Task-Hub ist in dataflow/config_dev.yaml als "TASKS_HUB" festgelegt.

  • Dataflow-Aufgabe
python -m dataflow -dataflow --task_path path_to_task_file
  • Instanziierungsaufgabe
python -m dataflow -instantiation --task_path path_to_task_file
  • Ausführungsaufgabe
python -m dataflow -execution --task_path path_to_task_file

Hinweis

  1. Benutzer sollten vorsichtig sein, die Originaldateien bei der Verwendung dieses Projekts zu speichern, andernfalls werden die Dateien geschlossen, wenn die Anwendung beendet wird.
  2. Nach dem Start des Projekts sollten Benutzer das Anwendungsfenster nicht schließen, während das Programm Screenshots macht.

Arbeitsablauf

Instanziierung

Es gibt drei wichtige Schritte im Instanziierungsprozess:

  1. Auswahl einer Vorlage Datei entsprechend der angegebenen App und Anweisung.
  2. Prefill der Aufgabe unter Verwendung des aktuellen Screenshots.
  3. Filter der erstellten Aufgabe.

Gegeben die initiale Aufgabe wählt der Dataflow zuerst eine Vorlage (Phase 1), dann füllt er die initiale Aufgabe basierend auf der Word-Umgebung vor, um Task-Aktionsdaten zu erhalten (Phase 2). Schließlich filtert er die erstellte Aufgabe, um die Qualität der Task-Aktionsdaten zu bewerten (Phase 3).

Hinweis

Die detailliertere Code-Design-Dokumentation für die Instanziierung finden Sie unter instantiation.

Ausführung

Die instanziierten Pläne werden von einer Ausführungsaufgabe ausgeführt. Nach der Ausführung bewertet ein Bewertungsagent die Qualität des gesamten Ausführungsprozesses.

Hinweis

Die detailliertere Code-Design-Dokumentation für die Ausführung finden Sie unter execution.

Ergebnis

Die Ergebnisse werden im Verzeichnis results\ unter instantiation, execution und dataflow gespeichert und werden weiter in Unterverzeichnissen basierend auf den Ausführungsergebnissen gespeichert.

Hinweis

Detailliertere Informationen zu den Ergebnissen finden Sie unter result.

Schnellstart

Wir bereiten zwei Fälle vor, um den Dataflow zu demonstrieren, die sich unter dataflow\tasks\prefill befinden. Nach der Installation der erforderlichen Pakete können Sie den folgenden Befehl in der Befehlszeile eingeben:

python -m dataflow -dataflow

Und Sie können die Hinweise im Terminal sehen, was bedeutet, dass der Dataflow funktioniert.

Nachdem die beiden Aufgaben abgeschlossen sind, erscheinen die Aufgaben- und Ausgabedateien wie folgt:

UFO/
├── dataflow/
│   └── results/
│       ├── saved_document/         # Directory for saved documents
│       │   ├── bulleted.docx       # Result of the "bulleted" task
│       │   └── rotate.docx         # Result of the "rotate" task
│       ├── dataflow/                    # Dataflow results directory
│       │   ├── execution_pass/     # Successfully executed tasks
│       │   │   ├── bulleted.json   # Execution result for the "bulleted" task
│       │   │   ├── rotate.json      # Execution result for the "rotate" task
│       │   │   └── ...
└── ...

Die spezifischen Ergebnisse können in result im JSON-Format zusammen mit Beispieldaten referenziert werden.

Log-Dateien

Die entsprechenden Logs finden Sie in den Verzeichnissen logs/bulleted und logs/rotate, wie unten gezeigt. Detaillierte Logs für jeden Workflow werden aufgezeichnet und erfassen jeden Schritt des Ausführungsprozesses.

Referenz

AppEnum

Basiert auf: Enum

Enum-Klasse für Anwendungen.

Initialisiert die Anwendungs-Enum.

Parameter
  • id (int) –

    Die ID der Anwendung.

  • description (str) –

    Die Beschreibung der Anwendung.

  • file_extension (str) –

    Die Dateiendung der Anwendung.

  • win_app (str) –

    Der Windows-Anwendungsname.

Quellcode in dataflow/data_flow_controller.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(self, id: int, description: str, file_extension: str, win_app: str):
    """
    Initialize the application enum.
    :param id: The ID of the application.
    :param description: The description of the application.
    :param file_extension: The file extension of the application.
    :param win_app: The Windows application name.
    """

    self.id = id
    self.description = description
    self.file_extension = file_extension
    self.win_app = win_app
    self.app_root_name = win_app.upper() + ".EXE"

TaskObject

Initialisiert das Aufgabenobjekt.

Parameter
  • task_file_path (str) –

    Der Pfad zur Aufgabendatei.

  • task_type (str) –

    Der task_type des Aufgabenobjekts (dataflow, instantiation oder execution).

Quellcode in dataflow/data_flow_controller.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def __init__(self, task_file_path: str, task_type: str) -> None:
    """
    Initialize the task object.
    :param task_file_path: The path to the task file.
    :param task_type: The task_type of the task object (dataflow, instantiation, or execution).
    """

    self.task_file_path = task_file_path
    self.task_file_base_name = os.path.basename(task_file_path)
    self.task_file_name = self.task_file_base_name.split(".")[0]

    task_json_file = load_json_file(task_file_path)
    self.app_object = self._choose_app_from_json(task_json_file["app"])
    # Initialize the task attributes based on the task_type
    self._init_attr(task_type, task_json_file)

DataFlowController

Flow-Controller-Klasse zur Verwaltung des Instanziierungs- und Ausführungsprozesses.

Initialisiert den Flow-Controller.

Parameter
  • task_path (str) –

    Der Pfad zur Aufgabendatei.

  • task_type (str) –

    Der task_type des Flow-Controllers (instantiation, execution oder dataflow).

Quellcode in dataflow/data_flow_controller.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def __init__(self, task_path: str, task_type: str) -> None:
    """
    Initialize the flow controller.
    :param task_path: The path to the task file.
    :param task_type: The task_type of the flow controller (instantiation, execution, or dataflow).
    """

    self.task_object = TaskObject(task_path, task_type)
    self.app_env = None
    self.app_name = self.task_object.app_object.description.lower()
    self.task_file_name = self.task_object.task_file_name

    self.schema = self._load_schema(task_type)

    self.task_type = task_type
    self.task_info = self.init_task_info()
    self.result_hub = _configs["RESULT_HUB"].format(task_type=task_type)

instantiated_plan property writable

Ruft den instanziierten Plan aus den Aufgabeninformationen ab.

Rückgabe
  • List[Dict[str, Any]]

    Der instanziierte Plan.

template_copied_path property

Ruft den kopierten Vorlagenpfad aus den Aufgabeninformationen ab.

Rückgabe
  • str

    Der kopierte Vorlagenpfad.

execute_execution(request, plan)

Führt den Ausführungsprozess aus.

Parameter
  • request (str) –

    Die auszuführende Aufgabenanforderung.

  • plan (Dict[str, any]) –

    Der Ausführungsplan, der detaillierte Schritte enthält.

Quellcode in dataflow/data_flow_controller.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def execute_execution(self, request: str, plan: Dict[str, any]) -> None:
    """
    Execute the execution process.
    :param request: The task request to be executed.
    :param plan: The execution plan containing detailed steps.
    """

    print_with_color("Executing the execution process...", "blue")
    execute_flow = None

    try:
        self.app_env.start(self.template_copied_path)
        # Initialize the execution context and flow
        context = Context()
        execute_flow = ExecuteFlow(self.task_file_name, context, self.app_env)

        # Execute the plan
        executed_plan, execute_result = execute_flow.execute(request, plan)

        # Update the instantiated plan
        self.instantiated_plan = executed_plan
        # Record execution results and time metrics
        self.task_info["execution_result"]["result"] = execute_result
        self.task_info["time_cost"]["execute"] = execute_flow.execution_time
        self.task_info["time_cost"]["execute_eval"] = execute_flow.eval_time

    except Exception as e:
        # Handle and log any exceptions that occur during execution
        self.task_info["execution_result"]["error"] = {
            "type": str(type(e).__name__),
            "message": str(e),
            "traceback": traceback.format_exc(),
        }
        print_with_color(f"Error in Execution: {e}", "red")
        raise e
    finally:
        # Record the total time cost of the execution process
        if execute_flow and hasattr(execute_flow, "execution_time"):
            self.task_info["time_cost"]["execute"] = execute_flow.execution_time
        else:
            self.task_info["time_cost"]["execute"] = None
        if execute_flow and hasattr(execute_flow, "eval_time"):
            self.task_info["time_cost"]["execute_eval"] = execute_flow.eval_time
        else:
            self.task_info["time_cost"]["execute_eval"] = None

execute_instantiation()

Führt den Instanziierungsprozess aus.

Rückgabe
  • Optional[List[Dict[str, Any]]]

    Der Instanziierungsplan, falls erfolgreich.

Quellcode in dataflow/data_flow_controller.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def execute_instantiation(self) -> Optional[List[Dict[str, Any]]]:
    """
    Execute the instantiation process.
    :return: The instantiation plan if successful.
    """

    print_with_color(
        f"Instantiating task {self.task_object.task_file_name}...", "blue"
    )

    template_copied_path = self.instantiation_single_flow(
        ChooseTemplateFlow,
        "choose_template",
        init_params=[self.task_object.app_object.file_extension],
        execute_params=[],
    )

    if template_copied_path:
        self.app_env.start(template_copied_path)

        prefill_result = self.instantiation_single_flow(
            PrefillFlow,
            "prefill",
            init_params=[self.app_env],
            execute_params=[
                template_copied_path,
                self.task_object.task,
                self.task_object.refined_steps,
            ],
        )
        self.app_env.close()

        if prefill_result:
            self.instantiation_single_flow(
                FilterFlow,
                "instantiation_evaluation",
                init_params=[],
                execute_params=[prefill_result["instantiated_request"]],
            )
            return prefill_result["instantiated_plan"]

init_task_info()

Initialisiert die Aufgabeninformationen.

Rückgabe
  • Dict[str, Any]

    Die initialisierten Aufgabeninformationen.

Quellcode in dataflow/data_flow_controller.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def init_task_info(self) -> Dict[str, Any]:
    """
    Initialize the task information.
    :return: The initialized task information.
    """
    init_task_info = None
    if self.task_type == "execution":
        # read from the instantiated task file
        init_task_info = load_json_file(self.task_object.task_file_path)
    else:
        init_task_info = {
            "unique_id": self.task_object.unique_id,
            "app": self.app_name,
            "original": {
                "original_task": self.task_object.task,
                "original_steps": self.task_object.refined_steps,
            },
            "execution_result": {"result": None, "error": None},
            "instantiation_result": {
                "choose_template": {"result": None, "error": None},
                "prefill": {"result": None, "error": None},
                "instantiation_evaluation": {"result": None, "error": None},
            },
            "time_cost": {},
        }
    return init_task_info

instantiation_single_flow(flow_class, flow_type, init_params=None, execute_params=None)

Führt einen einzelnen Flow-Prozess in der Instanziierungsphase aus.

Parameter
  • flow_class (AppAgentProcessor) –

    Die zu instanziierende Flow-Klasse.

  • flow_type (str) –

    Der Typ des Flows.

  • init_params

    Die Initialisierungsparameter für den Flow.

  • execute_params

    Die Ausführungsparameter für den Flow.

Rückgabe
  • Optional[Dict[str, Any]]

    Das Ergebnis des Flow-Prozesses.

Quellcode in dataflow/data_flow_controller.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def instantiation_single_flow(
    self,
    flow_class: AppAgentProcessor,
    flow_type: str,
    init_params=None,
    execute_params=None,
) -> Optional[Dict[str, Any]]:
    """
    Execute a single flow process in the instantiation phase.
    :param flow_class: The flow class to instantiate.
    :param flow_type: The type of the flow.
    :param init_params: The initialization parameters for the flow.
    :param execute_params: The execution parameters for the flow.
    :return: The result of the flow process.
    """

    flow_instance = None
    try:
        flow_instance = flow_class(self.app_name, self.task_file_name, *init_params)
        result = flow_instance.execute(*execute_params)
        self.task_info["instantiation_result"][flow_type]["result"] = result
        return result
    except Exception as e:
        self.task_info["instantiation_result"][flow_type]["error"] = {
            "type": str(e.__class__),
            "error_message": str(e),
            "traceback": traceback.format_exc(),
        }
        print_with_color(f"Error in {flow_type}: {e} {traceback.format_exc()}")
    finally:
        if flow_instance and hasattr(flow_instance, "execution_time"):
            self.task_info["time_cost"][flow_type] = flow_instance.execution_time
        else:
            self.task_info["time_cost"][flow_type] = None

reformat_to_batch(path)

Überträgt das Ergebnis in den Ergebnis-Hub.

Quellcode in dataflow/data_flow_controller.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def reformat_to_batch(self, path) -> None:
    """
    Transfer the result to the result hub.
    """
    os.makedirs(path, exist_ok=True)
    source_files_path = os.path.join(
        self.result_hub,
        self.task_type + "_pass",
    )
    source_template_path = os.path.join(
        os.path.dirname(self.result_hub),
        "saved_document",
    )
    target_file_path = os.path.join(
        path,
        "tasks",
    )
    target_template_path = os.path.join(
        path,
        "files",
    )
    os.makedirs((target_file_path), exist_ok=True)
    os.makedirs((target_template_path), exist_ok=True)

    for file in os.listdir(source_files_path):
        if file.endswith(".json"):
            source_file = os.path.join(source_files_path, file)
            target_file = os.path.join(target_file_path, file)
            target_object = os.path.join(
                target_template_path, file.replace(".json", ".docx")
            )
            is_successed = reformat_json_file(
                target_file,
                target_object,
                load_json_file(source_file),
            )
            if is_successed:
                shutil.copy(
                    os.path.join(
                        source_template_path, file.replace(".json", ".docx")
                    ),
                    target_template_path,
                )

run()

Führt den Instanziierungs- und Ausführungsprozess aus.

Quellcode in dataflow/data_flow_controller.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def run(self) -> None:
    """
    Run the instantiation and execution process.
    """

    start_time = time.time()

    try:
        self.app_env = WindowsAppEnv(self.task_object.app_object)

        if self.task_type == "dataflow":
            plan = self.execute_instantiation()
            self.execute_execution(self.task_object.task, plan)
        elif self.task_type == "instantiation":
            self.execute_instantiation()
        elif self.task_type == "execution":
            plan = self.instantiated_plan
            self.execute_execution(self.task_object.task, plan)
        else:
            raise ValueError(f"Unsupported task_type: {self.task_type}")
    except Exception as e:
        raise e

    finally:
        # Update or record the total time cost of the process
        total_time = round(time.time() - start_time, 3)
        new_total_time = (
            self.task_info.get("time_cost", {}).get("total", 0) + total_time
        )
        self.task_info["time_cost"]["total"] = round(new_total_time, 3)

        self.save_result()

    if _configs["REFORMAT_TO_BATCH"]:
        self.reformat_to_batch(_configs["REFORMAT_TO_BATCH_HUB"])

save_result()

Validiert und speichert das Ergebnis der instanziierten Aufgabe.

Quellcode in dataflow/data_flow_controller.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def save_result(self) -> None:
    """
    Validate and save the instantiated task result.
    """

    validation_error = None

    # Validate the result against the schema
    try:
        validate(instance=self.task_info, schema=self.schema)
    except ValidationError as e:
        # Record the validation error but allow the process to continue
        validation_error = str(e.message)
        print_with_color(f"Validation Error: {e.message}", "yellow")

    # Determine the target directory based on task_type and quality/completeness
    target_file = None

    if self.task_type == "instantiation":
        # Determine the quality of the instantiation
        if not self.task_info["instantiation_result"]["instantiation_evaluation"][
            "result"
        ]:
            target_file = INSTANTIATION_RESULT_MAP[False]
        else:
            is_quality_good = self.task_info["instantiation_result"][
                "instantiation_evaluation"
            ]["result"]["judge"]
            target_file = INSTANTIATION_RESULT_MAP.get(
                is_quality_good, INSTANTIATION_RESULT_MAP[False]
            )

    else:
        # Determine the completion status of the execution
        if not self.task_info["execution_result"]["result"]:
            target_file = EXECUTION_RESULT_MAP["no"]
        else:
            is_completed = self.task_info["execution_result"]["result"]["complete"]
            target_file = EXECUTION_RESULT_MAP.get(
                is_completed, EXECUTION_RESULT_MAP["no"]
            )

    # Construct the full path to save the result
    new_task_path = os.path.join(
        self.result_hub, target_file, self.task_object.task_file_base_name
    )
    os.makedirs(os.path.dirname(new_task_path), exist_ok=True)
    save_json_file(new_task_path, self.task_info)

    print(f"Task saved to {new_task_path}")

    # If validation failed, indicate that the saved result may need further inspection
    if validation_error:
        print(
            "The saved task result does not conform to the expected schema and may require review."
        )