workflow-manager.app.api.orchestration_engine#
Orchestration engine file for the workflow manager service.
Classes#
OrchestrationEngine is responsible for interacting with different orchestration engines. |
Module Contents#
- class workflow-manager.app.api.orchestration_engine.OrchestrationEngine#
OrchestrationEngine is responsible for interacting with different orchestration engines.
- engine#
- kestra_api_url#
- airflow_api_url#
- airflow_username#
- airflow_password#
- get_available_tasks()#
Retrieve the available tasks from the orchestration engine.
Currently, only Airflow is supported.
Returns#
list: A list of available tasks (DAGs) for Airflow.
Raises#
ValueError: If the orchestration engine is not Airflow.
- _get_airflow_dags()#
Get the list of Airflow DAGs.
Returns#
dict: A dictionary containing the list of Airflow DAGs.
Raises#
HTTPException: If the request to Airflow API fails.
- trigger_task(task_id: str, conf: Dict[str, str] | None = None) Dict[str, Any] #
Triggers a task in the orchestration engine.
Currently, only Airflow is supported.
- Args:
task_id (str): The ID of the task to be triggered. conf (Dict[str, str]): Additional configuration parameters to pass to the DAG.
Returns#
dict: A dictionary containing a success message.
Raises#
HTTPException: If the request to Airflow API fails.
- _trigger_airflow_task(task_id: str, conf: Dict[str, str] | None = None) Dict[str, Any] #
Trigger an Airflow task.
- Args:
task_id (str): The ID of the task to be triggered. conf (Dict[str, str]): Additional configuration parameters to pass to the DAG.
Returns#
dict: A dictionary containing a success message.
Raises#
HTTPException: If the request to Airflow API fails.
- get_task_status(task_id: str) Dict[str, Any] #
Retrieve the status of a task in the orchestration engine.
Currently, only Airflow is supported.
- Args:
task_id (str): The ID of the task whose status is to be retrieved.
Returns#
dict: A dictionary containing the task status.
Raises#
ValueError: If the orchestration engine is not Airflow.