Source code for workflow-manager.app.api.dagster_queries
"""GraphQL queries for interacting with Dagster."""
import requests
# GraphQL endpoint that this module's queries are POSTed to.
DAGSTER_URL = "http://dagster-webserver:3000/dagster/graphql"
[docs]
def list_dagster_jobs() -> list[dict]:
    """Fetch every job exposed by Dagster through its GraphQL endpoint.

    Sends one ``repositoriesOrError`` query to ``DAGSTER_URL`` and flattens
    the repository -> pipeline nesting of the reply into a single list.

    Returns:
        One dict per pipeline with the keys ``job_id`` (formatted as
        ``"<location>::<repository>::<job_name>"``), ``job_name``,
        ``repository`` and ``location``.

    Raises:
        requests.HTTPError: If the server replies with an error status
            (raised by ``raise_for_status``).
    """
    # The query text is kept flush-left so the string sent to the server
    # is exactly the same as before.
    graphql_query = """
query {
repositoriesOrError {
... on RepositoryConnection {
nodes {
name
location {
name
}
pipelines {
name
}
}
}
}
}
"""
    reply = requests.post(DAGSTER_URL, json={"query": graphql_query}, timeout=3)
    reply.raise_for_status()
    payload = reply.json()

    collected = []
    for repository in payload["data"]["repositoriesOrError"]["nodes"]:
        repo_name, location_name = repository["name"], repository["location"]["name"]
        # One entry per pipeline; the composite ID joins the three names
        # with "::" in location, repository, job order.
        collected.extend(
            {
                "job_id": f"{location_name}::{repo_name}::{pipeline['name']}",
                "job_name": pipeline["name"],
                "repository": repo_name,
                "location": location_name,
            }
            for pipeline in repository["pipelines"]
        )
    return collected
[docs]
def parse_job_id(job_id: str) -> tuple[str, str, str]:
    """Split a composite job ID into its three components.

    The ID is expected in the form ``location::repository::job_name``.
    The returned tuple is in the opposite order:
    ``(job_name, repository, location)``.

    Raises:
        ValueError: If the ID does not consist of exactly three
            ``::``-separated segments.
    """
    segments = job_id.split("::")
    if len(segments) == 3:
        loc, repo, name = segments
        return name, repo, loc
    raise ValueError("Invalid job ID format")