workflow-manager.app.api.orchestration_engine
=============================================

.. py:module:: workflow-manager.app.api.orchestration_engine

.. autoapi-nested-parse::

   Orchestration engine file for the workflow manager service.


Classes
-------

.. autoapisummary::

   workflow-manager.app.api.orchestration_engine.OrchestrationEngine


Module Contents
---------------

.. py:class:: OrchestrationEngine

   OrchestrationEngine is responsible for interacting with different orchestration engines.


   .. py:attribute:: engine

   .. py:attribute:: kestra_api_url

   .. py:attribute:: airflow_api_url

   .. py:attribute:: airflow_username

   .. py:attribute:: airflow_password


   .. py:method:: get_available_tasks()

      Retrieve the available tasks from the orchestration engine.
      Currently, only Airflow is supported.

      Returns
      -------
      list: A list of available tasks (DAGs) for Airflow.

      Raises
      ------
      ValueError: If the orchestration engine is not Airflow.


   .. py:method:: _get_airflow_dags()

      Get the list of Airflow DAGs.

      Returns
      -------
      dict: A dictionary containing the list of Airflow DAGs.

      Raises
      ------
      HTTPException: If the request to Airflow API fails.


   .. py:method:: trigger_task(task_id: str, conf: Dict[str, str] | None = None) -> Dict[str, Any]

      Triggers a task in the orchestration engine.
      Currently, only Airflow is supported.

      Args:
          task_id (str): The ID of the task to be triggered.
          conf (Dict[str, str]): Additional configuration parameters to pass to the DAG.

      Returns
      -------
      dict: A dictionary containing a success message.

      Raises
      ------
      HTTPException: If the request to Airflow API fails.


   .. py:method:: _trigger_airflow_task(task_id: str, conf: Dict[str, str] | None = None) -> Dict[str, Any]

      Trigger an Airflow task.

      Args:
          task_id (str): The ID of the task to be triggered.
          conf (Dict[str, str]): Additional configuration parameters to pass to the DAG.

      Returns
      -------
      dict: A dictionary containing a success message.

      Raises
      ------
      HTTPException: If the request to Airflow API fails.


   .. py:method:: get_task_status(task_id: str) -> Dict[str, Any]

      Retrieve the status of a task in the orchestration engine.
      Currently, only Airflow is supported.

      Args:
          task_id (str): The ID of the task whose status is to be retrieved.

      Returns
      -------
      dict: A dictionary containing the task status.

      Raises
      ------
      ValueError: If the orchestration engine is not Airflow.


   .. py:method:: _get_airflow_task_status(task_id: str) -> Dict[str, Any]

      Get the status of an Airflow task.

      Args:
          task_id (str): The ID of the task whose status is to be retrieved.

      Returns
      -------
      dict: A dictionary containing the task status.

      Raises
      ------
      HTTPException: If the request to Airflow API fails.
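
The engine check described for ``get_available_tasks`` and ``get_task_status`` can be illustrated with a small stand-in class. This is only a sketch under stated assumptions, not the service's actual implementation: ``EngineSketch``, ``fake_fetch``, the lowercase engine name ``"airflow"``, the endpoint paths, and the ``"dags"`` / ``"dag_runs"`` response keys are all hypothetical choices made for illustration. The HTTP call is replaced by an injected function so the example runs without a live Airflow instance.

```python
from typing import Any, Callable, Dict, List

# Hypothetical type: takes a URL and returns the decoded JSON body.
JsonFetcher = Callable[[str], Dict[str, Any]]


class EngineSketch:
    """Illustrative stand-in for OrchestrationEngine; not the real class."""

    def __init__(self, engine: str, airflow_api_url: str, fetch_json: JsonFetcher) -> None:
        self.engine = engine
        self.airflow_api_url = airflow_api_url.rstrip("/")
        # Injected so the sketch can run without any network access.
        self._fetch_json = fetch_json

    def _require_airflow(self) -> None:
        # Assumption: the engine is identified by the lowercase string "airflow".
        if self.engine != "airflow":
            raise ValueError(f"Unsupported orchestration engine: {self.engine}")

    def get_available_tasks(self) -> List[Dict[str, Any]]:
        # Only Airflow is supported, so any other engine is rejected up front.
        self._require_airflow()
        # Assumption: the DAG listing is a dict with the DAGs under a "dags" key.
        return self._get_airflow_dags().get("dags", [])

    def _get_airflow_dags(self) -> Dict[str, Any]:
        # Assumption: DAGs are listed at <airflow_api_url>/dags.
        return self._fetch_json(f"{self.airflow_api_url}/dags")

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        self._require_airflow()
        # Assumption: status is read from the DAG's runs endpoint.
        return self._fetch_json(f"{self.airflow_api_url}/dags/{task_id}/dagRuns")


def fake_fetch(url: str) -> Dict[str, Any]:
    """Canned responses standing in for the Airflow REST API."""
    if url.endswith("/dagRuns"):
        return {"dag_runs": [{"dag_run_id": "run_1", "state": "success"}]}
    return {"dags": [{"dag_id": "example_dag"}], "total_entries": 1}


# Supported engine: both calls are forwarded to the (fake) Airflow API.
airflow = EngineSketch("airflow", "http://localhost:8080/api/v1", fake_fetch)
tasks = airflow.get_available_tasks()
status = airflow.get_task_status("example_dag")
print(tasks)
print(status)

# Unsupported engine: the call fails with ValueError before any request is made.
kestra = EngineSketch("kestra", "http://localhost:8080/api/v1", fake_fetch)
try:
    kestra.get_available_tasks()
    unsupported_error = None
except ValueError as exc:
    unsupported_error = str(exc)
print(unsupported_error)
```

Injecting the fetch function keeps the dispatch logic separate from the transport, which makes the ``ValueError`` path easy to exercise in isolation. A real implementation would presumably authenticate with ``airflow_username`` and ``airflow_password`` and translate failed requests into ``HTTPException``, as the method documentation states.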