exam-manager.app.dal.task_dal#

Data access layer (DAL) between the FastAPI endpoints and the SQL database.

Functions#

add_task_data(...)

Add new task to database.

get_task_data(...)

Get task by id.

get_all_task_data(...)

Get a list of all tasks assigned to a certain workflow.

get_all_task_template_data(...)

Get a list of all task templates.

delete_task_data(→ bool)

Delete task by id.

update_task_data(...)

Update existing task in database.

Module Contents#

async exam-manager.app.dal.task_dal.add_task_data(payload: scanhub_libraries.models.BaseAcquisitionTask | scanhub_libraries.models.BaseDAGTask, creator) → app.db.postgres.AcquisitionTask | app.db.postgres.DAGTask#

Add new task to database.

Parameters#

payload

Task pydantic base model

creator

The username/id of the user who creates this task

Returns#

Database ORM model of the created task

async exam-manager.app.dal.task_dal.get_task_data(task_id: uuid.UUID) → scanhub_libraries.models.BaseAcquisitionTask | scanhub_libraries.models.BaseDAGTask | None#

Get task by id.

Parameters#

task_id

Id of the requested task

Returns#

Database ORM model with the data of the requested task, or None
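
Because the return annotation includes `None`, callers need to handle that case. The sketch below shows one way to do so, assuming `None` means that no task with the given id exists. `get_task_data` here is an in-memory stand-in, and `require_task` and `_TASKS` are hypothetical names used only for illustration.

```python
from __future__ import annotations

import asyncio
import uuid

# Hypothetical in-memory replacement for the database table.
_TASKS: dict[uuid.UUID, dict] = {}


async def get_task_data(task_id: uuid.UUID) -> dict | None:
    """Stand-in: return the stored task, or None if the id is unknown."""
    return _TASKS.get(task_id)


async def require_task(task_id: uuid.UUID) -> dict:
    """Hypothetical helper that turns a None result into an explicit error."""
    task = await get_task_data(task_id)
    if task is None:
        raise LookupError(f"Task {task_id} not found")
    return task


known_id = uuid.uuid4()
_TASKS[known_id] = {"name": "localizer"}
found = asyncio.run(require_task(known_id))
print(found)
```

In an endpoint, the `LookupError` would typically be replaced by an HTTP 404 response.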

async exam-manager.app.dal.task_dal.get_all_task_data(workflow_id: uuid.UUID) → list[scanhub_libraries.models.BaseAcquisitionTask | scanhub_libraries.models.BaseDAGTask]#

Get a list of all tasks assigned to a certain workflow.

Parameters#

workflow_id

Id of the parent workflow entry to which the tasks are assigned

Returns#

List of task database ORM models

async exam-manager.app.dal.task_dal.get_all_task_template_data() → list[scanhub_libraries.models.BaseAcquisitionTask | scanhub_libraries.models.BaseDAGTask]#

Get a list of all task templates.

Returns#

List of task database ORM models

async exam-manager.app.dal.task_dal.delete_task_data(task_id: uuid.UUID) → bool#

Delete task by id.

Parameters#

task_id

Id of the task to be deleted

Returns#

Success of deletion

async exam-manager.app.dal.task_dal.update_task_data(task_id: uuid.UUID, payload: scanhub_libraries.models.BaseAcquisitionTask | scanhub_libraries.models.BaseDAGTask) → scanhub_libraries.models.BaseAcquisitionTask | scanhub_libraries.models.BaseDAGTask | None#

Update existing task in database.

Parameters#

task_id

Id of the task to be updated

payload

Task pydantic base model with data to be updated

Returns#

Database ORM model of the updated task, or None