Source code for provenaclient.modules.job_service

'''
Created Date: Monday June 17th 2024 +1000
Author: Peter Baker
-----
Last Modified: Monday June 17th 2024 4:45:39 pm +1000
Modified By: Peter Baker
-----
Description: Job API L3 Module. Includes Job Admin sub module.
-----
HISTORY:
Date      	By	Comments
----------	---	---------------------------------------------------------
'''

from provenaclient.auth.manager import AuthManager
from provenaclient.utils.config import Config
from provenaclient.clients import JobAPIClient
from provenaclient.utils.exceptions import *
from provenaclient.modules.module_helpers import *
from provenaclient.models import HealthCheckResponse, AsyncAwaitSettings, DEFAULT_AWAIT_SETTINGS
from provenaclient.utils.async_job_helpers import wait_for_full_lifecycle, wait_for_full_successful_lifecycle
from ProvenaInterfaces.AsyncJobAPI import *
from typing import List, AsyncGenerator

# L3 interface.


[docs] class JobAdminSubService(ModuleService): _job_api_client: JobAPIClient def __init__(self, auth: AuthManager, config: Config, job_api_client: JobAPIClient) -> None: """Initialises a new job admin sub-service object, which sits between the user and the job-service api operations. Parameters ---------- auth : AuthManager An abstract interface containing the user's requested auth flow method. config : Config A config object which contains information related to the Provena instance. job_api_client : JobAPIClient This client interacts with the Job API """ self._auth = auth self._config = config # Clients related to the job-api scoped as private. self._job_api_client = job_api_client
[docs] async def launch_job(self, request: AdminLaunchJobRequest) -> AdminLaunchJobResponse: """ Launches a new job. Args: request (AdminLaunchJobRequest): Specified job parameters Returns: AdminLaunchJobResponse: The response """ return await self._job_api_client.admin.launch_job(request=request)
[docs] async def get_job(self, session_id: str) -> AdminGetJobResponse: """ Fetches a job (any job since admin) Args: session_id (str): The session ID of job to fetch Returns: AdminGetJobResponse: The response """ return await self._job_api_client.admin.get_job(session_id=session_id)
[docs] async def list_jobs(self, list_jobs_request: AdminListJobsRequest) -> AdminListJobsResponse: """ Lists all jobs. Args: list_jobs_request (AdminListJobsRequest): The request including pagination information. Returns: AdminListJobsResponse: The list of jobs response. """ return await self._job_api_client.admin.list_jobs(list_jobs_request=list_jobs_request)
[docs] async def list_all_jobs(self, list_jobs_request: AdminListJobsRequest, limit: Optional[int] = None) -> List[JobStatusTable]: """ Lists all jobs for the given user. Will automatically paginate until list is exhausted. Args: list_jobs_request (AdminListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ all_jobs: List[JobStatusTable] = [] count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.admin.list_jobs(list_jobs_request=list_jobs_request) count += len(new_list.jobs) all_jobs.extend(new_list.jobs) if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_jobs_request.pagination_key = new_list.pagination_key return all_jobs
[docs] async def for_all_jobs(self, list_jobs_request: AdminListJobsRequest, limit: Optional[int] = None) -> AsyncGenerator[JobStatusTable, None]: """ Lists all jobs for the given user. Returns lazy generator for use in for loops. Will automatically paginate until list is exhausted. Args: list_jobs_request (AdminListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.admin.list_jobs(list_jobs_request=list_jobs_request) count += len(new_list.jobs) for job in new_list.jobs: yield job if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_jobs_request.pagination_key = new_list.pagination_key
[docs] async def list_job_batch(self, list_request: AdminListByBatchRequest) -> AdminListByBatchResponse: """ List jobs by batch ID, returning a list of jobs in the batch. Args: list_request (AdminListByBatchRequest): The request including batch ID Returns: AdminListByBatchResponse: The list of jobs in the batch """ return await self._job_api_client.admin.list_jobs_in_batch(list_request=list_request)
[docs] async def list_all_jobs_in_batch(self, list_request: AdminListByBatchRequest, limit: Optional[int] = None) -> List[JobStatusTable]: """ Lists all jobs for the given user. Will automatically paginate all entries to exhaust list NOTE this could return more than limit - but figure it may as well return data fetched for efficiency reasons Args: list_jobs_request (AdminListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ all_jobs: List[JobStatusTable] = [] count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.admin.list_jobs_in_batch(list_request=list_request) count += len(new_list.jobs) all_jobs.extend(new_list.jobs) if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_request.pagination_key = new_list.pagination_key return all_jobs
[docs] async def for_all_jobs_in_batch(self, list_request: AdminListByBatchRequest, limit: Optional[int] = None) -> AsyncGenerator[JobStatusTable, None]: """ Lists all jobs for the given user. Will automatically paginate all entries to exhaust list NOTE this could return more than limit - but figure it may as well return data fetched for efficiency reasons Args: list_jobs_request (AdminListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.admin.list_jobs_in_batch(list_request=list_request) count += len(new_list.jobs) for job in new_list.jobs: yield job if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_request.pagination_key = new_list.pagination_key
[docs] class JobService(ModuleService): _job_api_client: JobAPIClient admin: JobAdminSubService def __init__(self, auth: AuthManager, config: Config, job_api_client: JobAPIClient) -> None: """Initialises a new job-service object, which sits between the user and the job-service api operations. Parameters ---------- auth : AuthManager An abstract interface containing the user's requested auth flow method. config : Config A config object which contains information related to the Provena instance. job_api_client : JobAPIClient This client interacts with the Job API """ self._auth = auth self._config = config # Clients related to the job-api scoped as private. self._job_api_client = job_api_client # Sub modules self.admin = JobAdminSubService( auth=auth, config=config, job_api_client=job_api_client )
[docs] async def get_health_check(self) -> HealthCheckResponse: """ Health check the API Returns: HealthCheckResponse: Response """ return await self._job_api_client.get_health_check()
[docs] async def fetch_job(self, session_id: str) -> GetJobResponse: """ Fetches a job by session id Args: session_id (str): The session ID Returns: GetJobResponse: The job fetched """ return await self._job_api_client.fetch_job(session_id=session_id)
[docs] async def list_jobs(self, list_jobs_request: ListJobsRequest) -> ListJobsResponse: """ Lists all jobs for the given user Can return pagination if page size exceeded Args: list_jobs_request (ListJobsRequest): The request including details Returns: ListJobsResponse: The list of jobs """ return await self._job_api_client.list_jobs(list_jobs_request=list_jobs_request)
[docs] async def list_all_jobs(self, list_jobs_request: ListJobsRequest, limit: Optional[int] = None) -> List[JobStatusTable]: """ Lists all jobs for the given user. Will automatically paginate until list is exhausted. Args: list_jobs_request (ListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ all_jobs: List[JobStatusTable] = [] count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.list_jobs(list_jobs_request=list_jobs_request) count += len(new_list.jobs) all_jobs.extend(new_list.jobs) if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_jobs_request.pagination_key = new_list.pagination_key return all_jobs
[docs] async def for_all_jobs(self, list_jobs_request: ListJobsRequest, limit: Optional[int] = None) -> AsyncGenerator[JobStatusTable, None]: """ Lists all jobs for the given user. Returns lazy generator for use in for loops. Will automatically paginate until list is exhausted. Args: list_jobs_request (ListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.list_jobs(list_jobs_request=list_jobs_request) count += len(new_list.jobs) for job in new_list.jobs: yield job if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_jobs_request.pagination_key = new_list.pagination_key
[docs] async def list_jobs_in_batch(self, list_request: ListByBatchRequest) -> ListByBatchResponse: """ Gets all jobs within a batch. Can return pagination if page size exceeded Args: list_request (ListByBatchRequest): The request including batch ID Returns: ListByBatchResponse: The response including list of jobs """ return await self._job_api_client.list_jobs_in_batch(list_request=list_request)
[docs] async def list_all_jobs_in_batch(self, list_request: ListByBatchRequest, limit: Optional[int] = None) -> List[JobStatusTable]: """ Lists all jobs for the given user. Will automatically paginate all entries to exhaust list NOTE this could return more than limit - but figure it may as well return data fetched for efficiency reasons Args: list_jobs_request (ListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ all_jobs: List[JobStatusTable] = [] count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.list_jobs_in_batch(list_request=list_request) count += len(new_list.jobs) all_jobs.extend(new_list.jobs) if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_request.pagination_key = new_list.pagination_key return all_jobs
[docs] async def for_all_jobs_in_batch(self, list_request: ListByBatchRequest, limit: Optional[int] = None) -> AsyncGenerator[JobStatusTable, None]: """ Lists all jobs for the given user. Will automatically paginate all entries to exhaust list NOTE this could return more than limit - but figure it may as well return data fetched for efficiency reasons Args: list_jobs_request (ListJobsRequest): The request including details limit (Optional[int]): Total record limit to enforce, if any Returns: ListJobsResponse: The list of jobs """ count = 0 # paginate until limit hit while True: new_list = await self._job_api_client.list_jobs_in_batch(list_request=list_request) count += len(new_list.jobs) for job in new_list.jobs: yield job if limit is not None and count >= limit: break if new_list.pagination_key is None: break list_request.pagination_key = new_list.pagination_key
[docs] async def await_job_completion(self, session_id: str, settings: AsyncAwaitSettings = DEFAULT_AWAIT_SETTINGS) -> JobStatusTable: """ Awaits completion of a given job then provides the job info. Completion is defined as a job status which is not pending or in progress. Args: session_id (str): The ID of the job to monitor and await completion. Returns: JobStatusTable: The entry at the latest point. """ return await wait_for_full_lifecycle( session_id=session_id, client=self._job_api_client, settings=settings )
[docs] async def await_successful_job_completion(self, session_id: str, settings: AsyncAwaitSettings = DEFAULT_AWAIT_SETTINGS) -> JobStatusTable: """ Awaits successful completion of a given job then provides the job info. Completion is defined as a job status which is not pending or in progress. Args: session_id (str): The ID of the job to monitor and await completion. Returns: JobStatusTable: The entry at the latest point. """ return await wait_for_full_successful_lifecycle( session_id=session_id, client=self._job_api_client, settings=settings )