workflow-manager.app.api.manager_endpoints ========================================== .. py:module:: workflow-manager.app.api.manager_endpoints .. autoapi-nested-parse:: Workflow manager endpoints. TODO: How to handle tasks, and where to trigger the device task? - There should be one endpoint for task, workflow and exam - The endpoint processes the list, i.e. all the contained tasks - Does it help to store an index with each task? Attributes ---------- .. autoapisummary:: workflow-manager.app.api.manager_endpoints.EXAM_MANAGER_URI workflow-manager.app.api.manager_endpoints.PATIENT_MANAGER_URI workflow-manager.app.api.manager_endpoints.DEVICE_MANAGER_URI workflow-manager.app.api.manager_endpoints.WORKFLOW_MANAGER_URI workflow-manager.app.api.manager_endpoints.DICOM_BASE_URI workflow-manager.app.api.manager_endpoints.DATA_LAKE_DIR workflow-manager.app.api.manager_endpoints.dg_client workflow-manager.app.api.manager_endpoints.router workflow-manager.app.api.manager_endpoints.oauth2_scheme Functions --------- .. autoapisummary:: workflow-manager.app.api.manager_endpoints.trigger_task workflow-manager.app.api.manager_endpoints.handle_acquisition_task_trigger workflow-manager.app.api.manager_endpoints.handle_dag_task_trigger workflow-manager.app.api.manager_endpoints.callback_results_ready workflow-manager.app.api.manager_endpoints.list_available_tasks Module Contents --------------- .. py:data:: EXAM_MANAGER_URI :value: 'http://exam-manager:8000/api/v1/exam' .. py:data:: PATIENT_MANAGER_URI :value: 'http://patient-manager:8100/api/v1/patient' .. py:data:: DEVICE_MANAGER_URI :value: 'http://device-manager:8000/api/v1/device' .. py:data:: WORKFLOW_MANAGER_URI :value: 'http://workflow-manager:8000/api/v1/workflowmanager' .. py:data:: DICOM_BASE_URI :value: 'https://localhost:8443/api/v1/exam/dcm/' .. py:data:: DATA_LAKE_DIR .. py:data:: dg_client .. py:data:: router .. py:data:: oauth2_scheme .. py:function:: trigger_task(task_id: str, access_token: Annotated[str, Depends(oauth2_scheme)]) -> dict[str, Any] :async: Endpoint to trigger a task in the orchestration engine. Args: task_id (str): The ID of the DAG to be triggered. Returns ------- dict: A dictionary containing the response from the orchestration engine. .. py:function:: handle_acquisition_task_trigger(task: scanhub_libraries.models.AcquisitionTaskOut, patient: scanhub_libraries.models.PatientOut, access_token: str) -> dict[str, str] Handle acquisition task trigger. Parameters ---------- task Acquisition task model patient Patient model access_token User access token Returns ------- Dictionary with status and data, i.e. success message Raises ------ HTTPException Internal error if device task could not be triggered. .. py:function:: handle_dag_task_trigger(task: scanhub_libraries.models.DAGTaskOut, exam_id: str, access_token: str) Handle DAG trigger event. Parameters ---------- task DAG Task to be triggered exam_id Corresponding exam ID access_token User access token Returns ------- Dictionary with status and data, i.e. success message Raises ------ HTTPException Missing input HTTPException Input does not have any result HTTPException Could not trigger DAG .. py:function:: callback_results_ready(task_id: uuid.UUID | str, result_id: uuid.UUID | str, access_token: Annotated[str, Depends(oauth2_scheme)]) -> dict[str, Any] :async: Notify that results are ready via callback endpoint. Args: dag_id (str): The ID of the DAG. access_token (str): The access token for authentication. Returns ------- dict: A dictionary containing a success message. .. py:function:: list_available_tasks() :async: Endpoint to list the available tasks from the orchestration engine. Returns ------- dict: A dictionary containing the list of available dagster jobs.