workflow-manager.app.api.workflow#
Workflow manager endpoints.
TODO: How to handle tasks, and where to trigger the device task?
- There should be one endpoint for task, workflow and exam.
- The endpoint processes the list, i.e. all the contained tasks.
- Does it help to store an index with each task?
- Which endpoint triggers the acquisition (device manager?)
- Cleanup: currently there are both trigger_task(…) (used by the frontend) and process(…).
Attributes#
Functions#
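| hello_world() | Hello world endpoint. |
| simulate_reconstruction_task(task, headers) | Simulate reconstruction task by updating the task progress and finally creating a result for the task. |
| trigger_task(task_id, background_tasks, access_token) | Endpoint to trigger a task in the orchestration engine. |
| calc_age_from_date(birth_date) | Calculate age in years from a given birth date. |
| start_scan(task, access_token) | Load the device and sequence data from the database and start the scan for task types DEVICE_TASK_SIMULATOR and DEVICE_TASK_SDK. |
| list_available_tasks() | Endpoint to list the available tasks from the orchestration engine. |
| process(workflow_id) | Process a workflow. |
| handle_device_task(task) | Handle a device task by creating a scan job and starting the scan. |
| handle_processing_task(task) | Handle a processing task by triggering the appropriate orchestration engine. |
| upload_and_trigger(dag_id, access_token, file) | Upload a file and trigger an Airflow DAG. |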
Module Contents#
- workflow-manager.app.api.workflow.router#
- workflow-manager.app.api.workflow.oauth2_scheme#
- workflow-manager.app.api.workflow.orchestration_engine#
- workflow-manager.app.api.workflow.SEQUENCE_MANAGER_URI = 'sequence-manager:8000'#
- workflow-manager.app.api.workflow.EXAM_MANAGER_URI = 'exam-manager:8000'#
- workflow-manager.app.api.workflow.PATIENT_MANAGER_URI = 'patient-manager:8100'#
- workflow-manager.app.api.workflow.DEVICE_MANAGER_URI = 'device-manager:8000'#
- workflow-manager.app.api.workflow.data_lake_directory#
- workflow-manager.app.api.workflow.workflows: Dict[str, Dict[str, Any]]#
- async workflow-manager.app.api.workflow.hello_world() -> dict[str, str]#
Hello world endpoint.
- workflow-manager.app.api.workflow.simulate_reconstruction_task(task, headers)#
Simulate reconstruction task by updating the task progress and finally creating a result for the task.
- Args:
- task (Task): The task to simulate; it is updated in the database as (virtual) progress is made.
The attribute task.args["contrast"] must be present and is used as the filename attribute of the result.
headers (dict): The headers for calls to the exam-manager.
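The behaviour described above (step the progress, report each update, then create a result named after the contrast) can be sketched as follows. This is only an illustration under assumptions: SimTask, SimResult, simulate_progress and the on_update callback are hypothetical stand-ins, not the module's actual models or its calls to the exam-manager.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class SimTask:
    """Hypothetical stand-in for the real Task model."""
    args: Dict[str, str]
    progress: int = 0


@dataclass
class SimResult:
    """Hypothetical stand-in for the result record created at the end."""
    filename: str


def simulate_progress(
    task: SimTask,
    on_update: Callable[[SimTask], None],
    steps: int = 4,
    delay: float = 0.0,
) -> SimResult:
    """Advance the task in equal steps and report every update."""
    for step in range(1, steps + 1):
        time.sleep(delay)  # pretend that reconstruction work is happening
        task.progress = round(100 * step / steps)
        # Assumption: the real function would send this update to the exam-manager.
        on_update(task)
    # The contrast argument becomes the filename of the result.
    return SimResult(filename=task.args["contrast"])
```

Passing the update step as a callback keeps the sketch free of network calls, so it can be run without the other services.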
- async workflow-manager.app.api.workflow.trigger_task(task_id: str, background_tasks: fastapi.BackgroundTasks, access_token: Annotated[str, Depends(oauth2_scheme)]) -> dict[str, Any]#
Endpoint to trigger a task in the orchestration engine.
- Args:
task_id (str): The ID of the DAG to be triggered.
Returns#
dict: A dictionary containing the response from the orchestration engine.
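A client call to this endpoint might be prepared as in the sketch below. Several details are assumptions that this page does not state: the route path /trigger_task/{task_id}, the POST method, the base URL, and that oauth2_scheme expects a bearer token in the Authorization header. The helper build_trigger_request is hypothetical and only builds the request; it does not send it.

```python
from urllib.parse import quote
from urllib.request import Request


def build_trigger_request(base_url: str, task_id: str, access_token: str) -> Request:
    """Build (but do not send) a request that would trigger a task."""
    # Assumption: route path and HTTP method are illustrative, not taken from the source.
    url = f"{base_url.rstrip('/')}/trigger_task/{quote(task_id, safe='')}"
    return Request(
        url,
        method="POST",
        # Assumption: the OAuth2 scheme reads a bearer token from this header.
        headers={"Authorization": f"Bearer {access_token}"},
    )
```

The request object can then be passed to urllib.request.urlopen once the real route and host are known.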
- workflow-manager.app.api.workflow.calc_age_from_date(birth_date: datetime.date) -> int#
Calculate age in years from a given birth date.
Parameters#
- birth_date
Date of birth
Returns#
Age in years as int
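One common way to compute an age in whole years is shown below as a minimal sketch; it is not necessarily how calc_age_from_date is implemented. The function name age_in_years and the optional today parameter are assumptions added so that the example is deterministic.

```python
from datetime import date
from typing import Optional


def age_in_years(birth_date: date, today: Optional[date] = None) -> int:
    """Return the age in whole years on `today` (defaults to the current date)."""
    today = today or date.today()
    # Compare (month, day) tuples to see whether this year's birthday has passed.
    had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
    # Subtract one year if the birthday is still ahead in the current year.
    return today.year - birth_date.year - (0 if had_birthday else 1)
```

With this comparison, someone born on 29 February counts as a year older from 1 March in non-leap years.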
- async workflow-manager.app.api.workflow.start_scan(task: scanhub_libraries.models.AcquisitionTaskOut, access_token: str)#
Load the device and sequence data from the database and start the scan for task types DEVICE_TASK_SIMULATOR and DEVICE_TASK_SDK.
Parameters#
- task_type
the type of the task to be started, must be either DEVICE_TASK_SIMULATOR or DEVICE_TASK_SDK
- device_id
the id of the device to start the task on
- sequence_id
the id of the sequence to start
- record_id
the record id
- acquisition_limits
the acquisition limits as defined in the pydantic model
- sequence_parameters
the sequence parameters
- access_token
the access token of the current user
Raises#
HTTPException if something goes wrong.
- async workflow-manager.app.api.workflow.list_available_tasks()#
Endpoint to list the available tasks from the orchestration engine.
Currently, only Airflow is supported.
Returns#
dict: A dictionary containing the list of available tasks (DAGs) for Airflow.
- async workflow-manager.app.api.workflow.process(workflow_id: uuid.UUID | str) -> dict[str, str]#
Process a workflow.
Parameters#
- workflow_id
UUID of the workflow to process
Returns#
Workflow process response
- async workflow-manager.app.api.workflow.handle_device_task(task: scanhub_libraries.models.TaskOut)#
Handle a device task by creating a scan job and starting the scan.
- async workflow-manager.app.api.workflow.handle_processing_task(task: scanhub_libraries.models.TaskOut)#
Handle a processing task by triggering the appropriate orchestration engine.
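The relationship between process and the two handlers above can be pictured as a dispatch on the task type. The sketch below is an assumption about that structure, not the module's code: SketchTask, process_tasks, the HANDLERS table and the PROCESSING_TASK label are hypothetical, and the handler bodies are placeholders.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


@dataclass
class SketchTask:
    """Hypothetical stand-in for scanhub_libraries.models.TaskOut."""
    id: str
    task_type: str


async def handle_device_task(task: SketchTask) -> str:
    # Placeholder: the real handler creates a scan job and starts the scan.
    return f"device:{task.id}"


async def handle_processing_task(task: SketchTask) -> str:
    # Placeholder: the real handler triggers the orchestration engine.
    return f"processing:{task.id}"


# Assumption: "PROCESSING_TASK" is an illustrative label, not taken from the source.
HANDLERS: Dict[str, Callable[[SketchTask], Awaitable[str]]] = {
    "DEVICE_TASK_SIMULATOR": handle_device_task,
    "DEVICE_TASK_SDK": handle_device_task,
    "PROCESSING_TASK": handle_processing_task,
}


async def process_tasks(tasks: List[SketchTask]) -> List[str]:
    """Run every task of a workflow in order, dispatching on its type."""
    results = []
    for task in tasks:
        handler = HANDLERS.get(task.task_type)
        if handler is None:
            raise ValueError(f"Unsupported task type: {task.task_type}")
        results.append(await handler(task))
    return results
```

A lookup table keeps the dispatch in one place, so supporting a further task type only needs a new entry and handler.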
- async workflow-manager.app.api.workflow.upload_and_trigger(dag_id: str, access_token: Annotated[str, Depends(oauth2_scheme)], file: fastapi.UploadFile = File(...)) -> Dict[str, Any]#
Upload a file and trigger an Airflow DAG.
Parameters#
- dag_id
The ID of the DAG to be triggered.
- file
Data upload, e.g. a reconstruction result. Declared as File(...), which marks the upload as required in FastAPI.
Returns#
dict: A dictionary containing a message and data.