workflow-manager.app.api.workflow#

Workflow manager endpoints.

TODO: How to handle tasks, and where to trigger the device task? - There should be one endpoint for task, workflow and exam - The endpoint processes the list, i.e. all the contained tasks - Does it help to store an index with each task? - Which endpoint triggers the acquisition? (devicem manager?) - cleanup: currently there is trigger_task(…) (used by frontend) and process(…)

Attributes#

Functions#

hello_world(→ dict[str, str])

Hello world endpoint.

simulate_reconstruction_task(task, headers)

Simulate reconstruction task by updating the task progress and finally creating a result for the task.

trigger_task(]) → dict[str, Any])

Endpoint to trigger a task in the orchestration engine.

calc_age_from_date(→ int)

Calculate age in years from a given birth date.

start_scan(task, access_token)

Load the device and sequence data from the database and start the scan for task types DEVICE_TASK_SIMULATOR and DEVICE_TASK_SDK.

list_available_tasks()

Endpoint to list the available tasks from the orchestration engine.

process(→ dict[str, str])

Process a workflow.

handle_device_task(task)

Handle a device task by creating a scan job and starting the scan.

handle_processing_task(task)

Handle a processing task by triggering the appropriate orchestration engine.

upload_and_trigger(], file, Any])

Upload a file and trigger an Airflow DAG.

Module Contents#

workflow-manager.app.api.workflow.router#
workflow-manager.app.api.workflow.oauth2_scheme#
workflow-manager.app.api.workflow.orchestration_engine#
workflow-manager.app.api.workflow.SEQUENCE_MANAGER_URI = 'sequence-manager:8000'#
workflow-manager.app.api.workflow.EXAM_MANAGER_URI = 'exam-manager:8000'#
workflow-manager.app.api.workflow.PATIENT_MANAGER_URI = 'patient-manager:8100'#
workflow-manager.app.api.workflow.DEVICE_MANAGER_URI = 'device-manager:8000'#
workflow-manager.app.api.workflow.data_lake_directory#
workflow-manager.app.api.workflow.workflows: Dict[str, Dict[str, Any]]#
async workflow-manager.app.api.workflow.hello_world() dict[str, str]#

Hello world endpoint.

workflow-manager.app.api.workflow.simulate_reconstruction_task(task, headers)#

Simulate reconstruction task by updating the task progress and finally creating a result for the task.

Args:
task (Task): The task to simulate and that gets updated in the database when (virtual) progress is made.

The attribute task.args[“contrast”] is expected to be present and will be set as the results filename attribute.

headers (dict): The headers for calls to the exam-manager.

async workflow-manager.app.api.workflow.trigger_task(task_id: str, background_tasks: fastapi.BackgroundTasks, access_token: Annotated[str, Depends(oauth2_scheme)]) dict[str, Any]#

Endpoint to trigger a task in the orchestration engine.

Args:

task_id (str): The ID of the DAG to be triggered.

Returns#

dict: A dictionary containing the response from the orchestration engine.

workflow-manager.app.api.workflow.calc_age_from_date(birth_date: datetime.date) int#

Calculate age in years from a given birth date.

Parameters#

birth_date

Date of birth

Returns#

Age in years as int

async workflow-manager.app.api.workflow.start_scan(task: scanhub_libraries.models.AcquisitionTaskOut, access_token: str)#

Load the device and sequence data from the database and start the scan for task types DEVICE_TASK_SIMULATOR and DEVICE_TASK_SDK.

Parameters#

task_type

the type of the task to be started, must be one of DEVICE_TASK_SIMULATOR and DEVICE_TASK_SDK

device_id

the id of the device to start the task on

sequence_id

the id of the sequence to start

record_id

the record id

acquisition_limits

the acquisition limits as defined in the pydantic model

sequence_parameters

the sequence parameters

access_token

the access token of the current user

Raises#

HttpException if something goes wrong.

async workflow-manager.app.api.workflow.list_available_tasks()#

Endpoint to list the available tasks from the orchestration engine.

Currently, only Airflow is supported.

Returns#

dict: A dictionary containing the list of available tasks (DAGs) for Airflow.

async workflow-manager.app.api.workflow.process(workflow_id: uuid.UUID | str) dict[str, str]#

Process a workflow.

Parameters#

workflow_id

UUID of the workflow to process

Returns#

Workflow process response

async workflow-manager.app.api.workflow.handle_device_task(task: scanhub_libraries.models.TaskOut)#

Handle a device task by creating a scan job and starting the scan.

async workflow-manager.app.api.workflow.handle_processing_task(task: scanhub_libraries.models.TaskOut)#

Handle a processing task by triggering the appropriate orchestration engine.

async workflow-manager.app.api.workflow.upload_and_trigger(dag_id: str, access_token: Annotated[str, Depends(oauth2_scheme)], file: fastapi.UploadFile = File(...)) Dict[str, Any]#

Upload a file and trigger an Airflow DAG.

Parameters#

dag_id

The ID of the DAG to be triggered.

file, optional

Data upload, e.g. reconstruction result, by default File(…)

Returns#

dict: A dictionary containing a message and data.