workflow-manager.app.api.manager_endpoints#

Workflow manager endpoints.

TODO: How to handle tasks, and where to trigger the device task? - There should be one endpoint for task, workflow and exam - The endpoint processes the list, i.e. all the contained tasks - Does it help to store an index with each task?

Attributes#

Functions#

trigger_task(]) → dict[str, Any])

Endpoint to trigger a task in the orchestration engine.

handle_acquisition_task_trigger(→ dict[str, str])

Handle acquisition task trigger.

handle_dag_task_trigger(task, exam_id, access_token)

Handle DAG trigger event.

callback_results_ready(]) → dict[str, Any])

Notify that results are ready via callback endpoint.

list_available_tasks()

Endpoint to list the available tasks from the orchestration engine.

Module Contents#

workflow-manager.app.api.manager_endpoints.EXAM_MANAGER_URI = 'http://exam-manager:8000/api/v1/exam'#
workflow-manager.app.api.manager_endpoints.PATIENT_MANAGER_URI = 'http://patient-manager:8100/api/v1/patient'#
workflow-manager.app.api.manager_endpoints.DEVICE_MANAGER_URI = 'http://device-manager:8000/api/v1/device'#
workflow-manager.app.api.manager_endpoints.WORKFLOW_MANAGER_URI = 'http://workflow-manager:8000/api/v1/workflowmanager'#
workflow-manager.app.api.manager_endpoints.DICOM_BASE_URI = 'https://localhost:8443/api/v1/exam/dcm/'#
workflow-manager.app.api.manager_endpoints.DATA_LAKE_DIR#
workflow-manager.app.api.manager_endpoints.dg_client#
workflow-manager.app.api.manager_endpoints.router#
workflow-manager.app.api.manager_endpoints.oauth2_scheme#
async workflow-manager.app.api.manager_endpoints.trigger_task(task_id: str, access_token: Annotated[str, Depends(oauth2_scheme)]) dict[str, Any]#

Endpoint to trigger a task in the orchestration engine.

Args:

task_id (str): The ID of the DAG to be triggered.

Returns#

dict: A dictionary containing the response from the orchestration engine.

workflow-manager.app.api.manager_endpoints.handle_acquisition_task_trigger(task: scanhub_libraries.models.AcquisitionTaskOut, patient: scanhub_libraries.models.PatientOut, access_token: str) dict[str, str]#

Handle acquisition task trigger.

Parameters#

task

Acquisition task model

patient

Patient model

access_token

User access token

Returns#

Dictionary with status and data, i.e. success message

Raises#

HTTPException

Internal error if device task could not be triggered.

workflow-manager.app.api.manager_endpoints.handle_dag_task_trigger(task: scanhub_libraries.models.DAGTaskOut, exam_id: str, access_token: str)#

Handle DAG trigger event.

Parameters#

task

DAG Task to be triggered

exam_id

Corresponding exam ID

access_token

User access token

Returns#

Dictionary with status and data, i.e. success message

Raises#

HTTPException

Missing input

HTTPException

Input does not have any result

HTTPException

Could not trigger DAG

async workflow-manager.app.api.manager_endpoints.callback_results_ready(task_id: uuid.UUID | str, result_id: uuid.UUID | str, access_token: Annotated[str, Depends(oauth2_scheme)]) dict[str, Any]#

Notify that results are ready via callback endpoint.

Args:

dag_id (str): The ID of the DAG. access_token (str): The access token for authentication.

Returns#

dict: A dictionary containing a success message.

async workflow-manager.app.api.manager_endpoints.list_available_tasks()#

Endpoint to list the available tasks from the orchestration engine.

Returns#

dict: A dictionary containing the list of available dagster jobs.