workflow-manager.app.api.workflow
=================================

.. py:module:: workflow-manager.app.api.workflow

.. autoapi-nested-parse::

   Workflow manager endpoints.

   TODO: How to handle tasks, and where to trigger the device task?

   - There should be one endpoint for task, workflow and exam
   - The endpoint processes the list, i.e. all the contained tasks
   - Does it help to store an index with each task?
   - Which endpoint triggers the acquisition? (device manager?)
   - cleanup: currently there is trigger_task(...) (used by frontend) and process(...)


Attributes
----------

.. autoapisummary::

   workflow-manager.app.api.workflow.router
   workflow-manager.app.api.workflow.oauth2_scheme
   workflow-manager.app.api.workflow.orchestration_engine
   workflow-manager.app.api.workflow.SEQUENCE_MANAGER_URI
   workflow-manager.app.api.workflow.EXAM_MANAGER_URI
   workflow-manager.app.api.workflow.PATIENT_MANAGER_URI
   workflow-manager.app.api.workflow.DEVICE_MANAGER_URI
   workflow-manager.app.api.workflow.data_lake_directory
   workflow-manager.app.api.workflow.workflows


Functions
---------

.. autoapisummary::

   workflow-manager.app.api.workflow.hello_world
   workflow-manager.app.api.workflow.simulate_reconstruction_task
   workflow-manager.app.api.workflow.trigger_task
   workflow-manager.app.api.workflow.calc_age_from_date
   workflow-manager.app.api.workflow.start_scan
   workflow-manager.app.api.workflow.list_available_tasks
   workflow-manager.app.api.workflow.process
   workflow-manager.app.api.workflow.handle_device_task
   workflow-manager.app.api.workflow.handle_processing_task
   workflow-manager.app.api.workflow.upload_and_trigger


Module Contents
---------------

.. py:data:: router

.. py:data:: oauth2_scheme

.. py:data:: orchestration_engine

.. py:data:: SEQUENCE_MANAGER_URI
   :value: 'sequence-manager:8000'


.. py:data:: EXAM_MANAGER_URI
   :value: 'exam-manager:8000'


.. py:data:: PATIENT_MANAGER_URI
   :value: 'patient-manager:8100'


.. py:data:: DEVICE_MANAGER_URI
   :value: 'device-manager:8000'


.. py:data:: data_lake_directory

.. py:data:: workflows
   :type:  Dict[str, Dict[str, Any]]

.. py:function:: hello_world() -> dict[str, str]
   :async:


   Hello world endpoint.


.. py:function:: simulate_reconstruction_task(task, headers)

   Simulate a reconstruction task by updating the task progress and finally creating a result for the task.

   Args:
       task (Task): The task to simulate. It is updated in the database as (virtual) progress is made.
           The attribute task.args["contrast"] is expected to be present and is used as the filename attribute of the result.
       headers (dict): The headers for calls to the exam-manager.


.. py:function:: trigger_task(task_id: str, background_tasks: fastapi.BackgroundTasks, access_token: Annotated[str, Depends(oauth2_scheme)]) -> dict[str, Any]
   :async:


   Endpoint to trigger a task in the orchestration engine.

   Args:
       task_id (str): The ID of the DAG to be triggered.

   Returns
   -------
       dict: A dictionary containing the response from the orchestration engine.


.. py:function:: calc_age_from_date(birth_date: datetime.date) -> int

   Calculate age in years from a given birth date.

   Parameters
   ----------
   birth_date
       Date of birth

   Returns
   -------
       Age in years as int


.. py:function:: start_scan(task: scanhub_libraries.models.AcquisitionTaskOut, access_token: str)
   :async:


   Load the device and sequence data from the database and start the scan for task types DEVICE_TASK_SIMULATOR and DEVICE_TASK_SDK.

   Parameters
   ----------
   task_type
       the type of the task to be started, must be one of DEVICE_TASK_SIMULATOR and DEVICE_TASK_SDK
   device_id
       the id of the device to start the task on
   sequence_id
       the id of the sequence to start
   record_id
       the record id
   acquisition_limits
       the acquisition limits as defined in the pydantic model
   sequence_parameters
       the sequence parameters
   access_token
       the access token of the current user

   Raises
   ------
   HTTPException
       if something goes wrong.


.. py:function:: list_available_tasks()
   :async:


   Endpoint to list the available tasks from the orchestration engine. Currently, only Airflow is supported.

   Returns
   -------
       dict: A dictionary containing the list of available tasks (DAGs) for Airflow.


.. py:function:: process(workflow_id: uuid.UUID | str) -> dict[str, str]
   :async:


   Process a workflow.

   Parameters
   ----------
   workflow_id
       UUID of the workflow to process

   Returns
   -------
       Workflow process response


.. py:function:: handle_device_task(task: scanhub_libraries.models.TaskOut)
   :async:


   Handle a device task by creating a scan job and starting the scan.


.. py:function:: handle_processing_task(task: scanhub_libraries.models.TaskOut)
   :async:


   Handle a processing task by triggering the appropriate orchestration engine.


.. py:function:: upload_and_trigger(dag_id: str, access_token: Annotated[str, Depends(oauth2_scheme)], file: fastapi.UploadFile = File(...)) -> Dict[str, Any]
   :async:


   Upload a file and trigger an Airflow DAG.

   Parameters
   ----------
   dag_id
       The ID of the DAG to be triggered.
   file, optional
       Data upload, e.g. reconstruction result, by default File(...)

   Returns
   -------
       dict: A dictionary containing a message and data.
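
The age calculation documented for ``calc_age_from_date`` can be illustrated with a minimal sketch. This is not the module's actual implementation, which is not shown in this reference. It assumes that "age in years" means completed years, and it adds a hypothetical ``today`` parameter (not part of the documented signature) so that the result is reproducible.

.. code-block:: python

   from __future__ import annotations

   from datetime import date


   def calc_age_from_date(birth_date: date, today: date | None = None) -> int:
       """Return the number of completed years between birth_date and today.

       The ``today`` argument is a hypothetical addition for this sketch;
       the documented function only takes ``birth_date``.
       """
       today = today or date.today()
       # Compare (month, day) tuples to check whether this year's birthday
       # has already occurred; if not, one year is subtracted.
       had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
       return today.year - birth_date.year - (0 if had_birthday else 1)


   # Usage: the day before the 24th birthday still counts as 23 completed years.
   age = calc_age_from_date(date(2000, 6, 15), today=date(2024, 6, 14))

Comparing ``(month, day)`` tuples avoids dividing a day difference by 365, which drifts across leap years. With this comparison, a person born on 29 February is treated as one year older from 1 March in non-leap years.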