workflow-manager.app.api.orchestration_engine#

Orchestration engine module for the workflow manager service.

Classes#

OrchestrationEngine

OrchestrationEngine is responsible for interacting with different orchestration engines.

Module Contents#

class workflow-manager.app.api.orchestration_engine.OrchestrationEngine#

OrchestrationEngine is responsible for interacting with different orchestration engines.

engine#
kestra_api_url#
airflow_api_url#
airflow_username#
airflow_password#
get_available_tasks()#

Retrieve the available tasks from the orchestration engine.

Currently, only Airflow is supported.

Returns#

list: A list of available tasks (DAGs) for Airflow.

Raises#

ValueError: If the orchestration engine is not Airflow.
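The dispatch described above can be sketched as follows. This is a minimal illustration, not the actual class: `EngineSketch` is a hypothetical stand-in, the comparison against the string `"airflow"` is an assumption about how `engine` is stored, and the stubbed `_get_airflow_dags` returns fixed data instead of calling Airflow.

```python
from typing import List


class EngineSketch:
    """Hypothetical stand-in for OrchestrationEngine (illustration only)."""

    def __init__(self, engine: str) -> None:
        # Assumption: the engine name is kept as a plain string.
        self.engine = engine

    def _get_airflow_dags(self) -> List[str]:
        # Stub: the documented method queries the Airflow API instead.
        return ["example_dag"]

    def get_available_tasks(self) -> List[str]:
        # Only Airflow is supported; any other engine is rejected.
        if self.engine.lower() == "airflow":
            return self._get_airflow_dags()
        raise ValueError(f"Unsupported orchestration engine: {self.engine}")
```

Callers that may be configured with another engine (for example Kestra) would need to catch `ValueError` around this call.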

_get_airflow_dags()#

Get the list of Airflow DAGs.

Returns#

dict: A dictionary containing the list of Airflow DAGs.

Raises#

HTTPException: If the request to Airflow API fails.
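As a rough sketch of what the underlying request might look like, the example below builds (but does not send) a DAG listing request. It assumes the Airflow stable REST API (`GET /dags` under a base URL ending in `/api/v1`) and HTTP basic authentication with `airflow_username` and `airflow_password`. The source does not confirm the endpoint, the auth scheme, or the HTTP client, and `build_list_dags_request` is a hypothetical helper.

```python
import base64
import urllib.request


def build_list_dags_request(
    airflow_api_url: str, username: str, password: str
) -> urllib.request.Request:
    """Build a GET request for the DAG list (assumed endpoint: <base>/dags)."""
    url = airflow_api_url.rstrip("/") + "/dags"
    # HTTP basic auth: base64("username:password") in the Authorization header.
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Basic {token}", "Accept": "application/json"},
        method="GET",
    )
```

Sending the request, and converting a failed response into the documented `HTTPException`, is left out here because the source does not show how the service does either.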

trigger_task(task_id: str, conf: Dict[str, str] | None = None) → Dict[str, Any]#

Triggers a task in the orchestration engine.

Currently, only Airflow is supported.

Args:

task_id (str): The ID of the task to be triggered.

conf (Dict[str, str] | None): Additional configuration parameters to pass to the DAG. Defaults to None.

Returns#

dict: A dictionary containing a success message.

Raises#

HTTPException: If the request to Airflow API fails.
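Because `conf` is typed as `Dict[str, str]`, a caller holding non-string parameters may want to convert them first. The helper below is one possible way to do that; `to_str_conf` is a hypothetical name, and JSON-encoding non-string values is an assumption about what the receiving DAG expects.

```python
import json
from typing import Any, Dict, Mapping


def to_str_conf(params: Mapping[str, Any]) -> Dict[str, str]:
    """Convert arbitrary parameters to the Dict[str, str] shape of `conf`."""
    return {
        # Strings pass through unchanged; other values are JSON-encoded.
        str(key): value if isinstance(value, str) else json.dumps(value)
        for key, value in params.items()
    }
```

The result could then be passed as the `conf` argument, for example `trigger_task("my_dag", conf=to_str_conf({"retries": 3}))`, where `my_dag` is a placeholder DAG ID.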

_trigger_airflow_task(task_id: str, conf: Dict[str, str] | None = None) → Dict[str, Any]#

Trigger an Airflow task.

Args:

task_id (str): The ID of the task to be triggered.

conf (Dict[str, str] | None): Additional configuration parameters to pass to the DAG. Defaults to None.

Returns#

dict: A dictionary containing a success message.

Raises#

HTTPException: If the request to Airflow API fails.
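One way the trigger request might be formed is sketched below. It assumes the Airflow stable REST API, where a DAG run is created with `POST /dags/{dag_id}/dagRuns` and a JSON body containing a `conf` object. Whether the service uses this endpoint is not stated in the source, authentication headers are omitted for brevity, and `build_trigger_request` is a hypothetical helper that builds the request without sending it.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, Optional


def build_trigger_request(
    airflow_api_url: str, task_id: str, conf: Optional[Dict[str, str]] = None
) -> urllib.request.Request:
    """Build a POST request that would start a run of the DAG `task_id`."""
    dag_id = urllib.parse.quote(task_id, safe="")  # escape the ID for use in a path
    url = f"{airflow_api_url.rstrip('/')}/dags/{dag_id}/dagRuns"
    # A missing conf is sent as an empty object.
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```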

get_task_status(task_id: str) → Dict[str, Any]#

Retrieve the status of a task in the orchestration engine.

Currently, only Airflow is supported.

Args:

task_id (str): The ID of the task whose status is to be retrieved.

Returns#

dict: A dictionary containing the task status.

Raises#

ValueError: If the orchestration engine is not Airflow.

_get_airflow_task_status(task_id: str) → Dict[str, Any]#

Get the status of an Airflow task.

Args:

task_id (str): The ID of the task whose status is to be retrieved.

Returns#

dict: A dictionary containing the task status.

Raises#

HTTPException: If the request to Airflow API fails.
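The shape of the returned status dictionary is not documented. As an illustration only, the sketch below reduces a DAG run listing, in the form returned by the Airflow stable REST API (`GET /dags/{dag_id}/dagRuns`, with a `dag_runs` array whose items carry `dag_run_id`, `state`, and `execution_date`), to the state of the most recent run. The function name and the output keys are hypothetical.

```python
from typing import Any, Dict


def summarize_latest_run(task_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Pick the most recent DAG run from a listing and report its state."""
    runs = payload.get("dag_runs", [])
    if not runs:
        # No runs yet: report an unknown status rather than failing.
        return {"task_id": task_id, "status": None, "run_id": None}
    # ISO 8601 timestamps in a uniform format sort correctly as strings.
    latest = max(runs, key=lambda run: run.get("execution_date") or "")
    return {
        "task_id": task_id,
        "status": latest.get("state"),
        "run_id": latest.get("dag_run_id"),
    }
```

Choosing the latest run by `execution_date` is a design assumption; a service could just as well track the run ID returned when the task was triggered.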