Source code for provenaclient.modules.prov

Created Date: Monday June 17th 2024 +1000
Author: Peter Baker
Last Modified: Friday November 29th 2024 4:21:39 pm +1000
Modified By: Parth Kulkarni
Description: Provenance API L3 module. Includes the ProvAPI sub module. Contains IO helper functions for writing/reading files.
Date      	By	Comments
----------	---	---------------------------------------------------------

29-11-2024 | Parth Kulkarni | Added generate-report functionality. 


from provenaclient.auth.manager import AuthManager
from provenaclient.utils.config import Config
from provenaclient.clients import ProvClient
from provenaclient.utils.exceptions import *
from provenaclient.modules.module_helpers import *
from provenaclient.utils.helpers import read_file_helper, write_file_helper, get_and_validate_file_path
from typing import List
from provenaclient.models.general import CustomLineageResponse, HealthCheckResponse
from ProvenaInterfaces.ProvenanceAPI import LineageResponse, ModelRunRecord, ConvertModelRunsResponse, RegisterModelRunResponse, RegisterBatchModelRunRequest, RegisterBatchModelRunResponse, PostUpdateModelRunResponse, GenerateReportRequest
from ProvenaInterfaces.RegistryAPI import ItemModelRun
from ProvenaInterfaces.SharedTypes import StatusResponse

# L3 interface.


[docs] class ProvAPIAdminSubModule(ModuleService): _prov_api_client: ProvClient def __init__(self, auth: AuthManager, config: Config, prov_api_client: ProvClient) -> None: """ Admin sub module of the Prov API providing functionality for the admin endpoints. Parameters ---------- auth : AuthManager An abstract interface containing the user's requested auth flow method. config : Config A config object which contains information related to the Provena instance. auth_client: AuthClient The instantiated auth client """ self._auth = auth self._config = config # Clients related to the prov_api scoped as private. self._prov_api_client = prov_api_client
[docs] async def generate_config_file(self, required_only: bool = True, file_path: Optional[str] = None, write_to_file: bool = False) -> str: """Generates a nicely formatted .env file of the current required/non supplied properties Used to quickly bootstrap a local environment or to understand currently deployed API. Parameters ---------- required_only : bool, optional By default True file_path: str, optional The path you want to save the config file at WITH the file name. If you don't specify a path this will be saved in a relative directory. write_to_file: bool, By default False A boolean flag to indicate whether you want to save the config response to a file or not. Returns ---------- str: Response containing the config text. """ file_path = get_and_validate_file_path( file_path=file_path, write_to_file=write_to_file, default_file_name=DEFAULT_CONFIG_FILE_NAME) config_text: str = await self._prov_api_client.admin.generate_config_file(required_only=required_only) if config_text is None: raise ValueError( f"No data returned for generate config file endpoint.") # Write to file if config text is not None, write to file is True and file path is not None. if write_to_file: if file_path is None: raise ValueError("File path is not set for writing the CSV.") write_file_helper(file_path=file_path, content=config_text) return config_text
[docs] async def store_record(self, registry_record: ItemModelRun, validate_record: bool = True) -> StatusResponse: """An admin only endpoint which enables the reupload/storage of an existing completed provenance record. Parameters ---------- registry_record : ItemModelRun The completed registry record for the model run. validate_record: bool Optional Should the ids in the payload be validated?, by default True Returns ------- StatusResponse A status response indicating the success of the request and any other details. """ return await self._prov_api_client.admin.store_record(registry_record=registry_record, validate_record=validate_record)
[docs] async def store_multiple_records(self, registry_record: List[ItemModelRun], validate_record: bool = True) -> StatusResponse: """An admin only endpoint which enables the reupload/storage of an existing but multiple completed provenance record. Parameters ---------- registry_record : List[ItemModelRun] List of the completed registry record for the model run validate_record validate_record: bool Optional Should the ids in the payload be validated?, by default True Returns ------- StatusResponse A status response indicating the success of the request and any other details. """ return await self._prov_api_client.admin.store_multiple_records(registry_record=registry_record, validate_record=validate_record)
[docs] async def store_all_registry_records(self, validate_record: bool = True) -> StatusResponse: """Applies the store record endpoint action across a list of ItemModelRuns ' which is found by querying the registry model run list endpoint directly. Parameters ---------- validate_record : bool Optional Should the ids in the payload be validated?, by default True Returns ------- StatusResponse A status response indicating the success of the request and any other details. """ return await self._prov_api_client.admin.store_all_registry_records(validate_record=validate_record)
[docs] class Prov(ModuleService): _prov_client: ProvClient def __init__(self, auth: AuthManager, config: Config, prov_client: ProvClient) -> None: """Initialises a new datastore object, which sits between the user and the datastore api operations. Parameters ---------- auth : AuthManager An abstract interface containing the user's requested auth flow method. config : Config A config object which contains information related to the Provena instance. datastore_client : DatastoreClient This client interacts with the Datastore API's. """ self._auth = auth self._config = config # Clients related to the prov-api scoped as private. self._prov_api_client = prov_client # Submodules self.admin = ProvAPIAdminSubModule(auth, config, prov_client)
[docs] async def get_health_check(self) -> HealthCheckResponse: """Checks the health status of the PROV-API. Returns ------- HealthCheckResponse Response containing the PROV-API health information. """ return await self._prov_api_client.get_health_check()
[docs] async def update_model_run(self, model_run_id: str, reason: str, record: ModelRunRecord) -> PostUpdateModelRunResponse: """Updates an existing model run with new information. This function triggers an asynchronous update of a model run. The update is processed as a job, and the job session ID is returned for tracking the update progress. Args: model_run_id (str): The ID of the model run to update reason (str): The reason for updating the model run record (ModelRunRecord): The new model run record details Returns: PostUpdateModelRunResponse: Response containing the job session ID tracking the update Example: ```python response = await prov_api.update_model_run( model_run_id="10378.1/1234567", reason="Updating input dataset information", record=updated_model_run_record ) # Get the session ID to track progress session_id = response.session_id ``` """ return await self._prov_api_client.post_update_model_run( model_run_id=model_run_id, reason=reason, record=record )
[docs] async def explore_upstream(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Explores in the upstream direction (inputs/associations) starting at the specified node handle ID. The search depth is bounded by the depth parameter which has a default maximum of 100. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the upstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ upstream_response = await self._prov_api_client.explore_upstream(starting_id=starting_id, depth=depth) typed_upstream_response = CustomLineageResponse.parse_obj(upstream_response.dict()) return typed_upstream_response
[docs] async def explore_downstream(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Explores in the downstream direction (inputs/associations) starting at the specified node handle ID. The search depth is bounded by the depth parameter which has a default maximum of 100. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the downstream direction, by default 100 Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ typed_downstream_response = await self._prov_api_client.explore_downstream(starting_id=starting_id, depth=depth) typed_downstream_response = CustomLineageResponse.parse_obj(typed_downstream_response.dict()) return typed_downstream_response
[docs] async def get_contributing_datasets(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches datasets (inputs) which involved in a model run naturally in the upstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the upstream direction, by default 100 Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ contributing_datasets = await self._prov_api_client.get_contributing_datasets(starting_id=starting_id, depth=depth) typed_contributing_datasets = CustomLineageResponse.parse_obj(contributing_datasets.dict()) return typed_contributing_datasets
[docs] async def get_effected_datasets(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches datasets (outputs) which are derived from the model run naturally in the downstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the downstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ effected_datasets_response = await self._prov_api_client.get_effected_datasets(starting_id=starting_id, depth=depth) typed_effected_datasets = CustomLineageResponse.parse_obj(effected_datasets_response.dict()) return typed_effected_datasets
[docs] async def get_contributing_agents(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches agents (organisations or peoples) that are involved or impacted by the model run. naturally in the upstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the upstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ contributing_agents_response = await self._prov_api_client.get_contributing_agents(starting_id=starting_id, depth=depth) typed_contributing_agents = CustomLineageResponse.parse_obj(contributing_agents_response.dict()) return typed_contributing_agents
[docs] async def get_effected_agents(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches agents (organisations or peoples) that are involved or impacted by the model run. naturally in the downstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the downstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ effected_agents_response = await self._prov_api_client.get_effected_agents(starting_id=starting_id, depth=depth) typed_effected_agents = CustomLineageResponse.parse_obj(effected_agents_response.dict()) return typed_effected_agents
[docs] async def register_batch_model_runs(self, batch_model_run_payload: RegisterBatchModelRunRequest) -> RegisterBatchModelRunResponse: """This function allows you to register multiple model runs in one go (batch) asynchronously. Note: You can utilise the returned session ID to poll on the JOB API to check status of the model run registration(s). Parameters ---------- batch_model_run_payload : RegisterBatchModelRunRequest A list of model runs (ModelRunRecord objects) Returns ------- RegisterBatchModelRunResponse The job session id derived from job-api for the model-run batch. """ return await self._prov_api_client.register_batch_model_runs(model_run_batch_payload=batch_model_run_payload)
[docs] async def register_model_run(self, model_run_payload: ModelRunRecord) -> RegisterModelRunResponse: """Asynchronously registers a single model run. Note: You can utilise the returned session ID to poll on the JOB API to check status of the model run registration. Parameters ---------- model_run_payload : ModelRunRecord Contains information needed for the model run such as workflow template, inputs, outputs, description etc. Returns ------- RegisterModelRunResponse The job session id derived from job-api for the model-run. """ return await self._prov_api_client.register_model_run(model_run_payload=model_run_payload)
[docs] async def generate_csv_template(self, workflow_template_id: str, file_path: Optional[str] = None, write_to_csv: bool = False) -> str: """Generates a model run csv template to be utilised for creating model runs through csv format.. Parameters ---------- workflow_template_id : str An ID of a created and existing model run workflow template. path_to_save_csv: str, optional The path you want to save the csv file at WITH csv file name. If you don't specify a path this will be saved in a relative directory. write_to_csv: bool, By default False A boolean flag to indicate whether you want to save the template to a csv file or not. Returns ---------- str: Response containing the csv template text (encoded in a csv format). """ file_path = get_and_validate_file_path( file_path=file_path, write_to_file=write_to_csv, default_file_name=workflow_template_id + ".csv") csv_text = await self._prov_api_client.generate_csv_template(workflow_template_id=workflow_template_id) if csv_text is None: raise ValueError( f"No data returned for generate CSV template workflow template ID {workflow_template_id}") # Write to file if CSV content is returned and write_to_csv is True and file path is assigned. if write_to_csv: if file_path is None: raise ValueError("File path is not set for writing the CSV.") write_file_helper(file_path=file_path, content=csv_text) return csv_text
[docs] async def convert_model_runs(self, model_run_content: str) -> ConvertModelRunsResponse: """Converts model run with model_run_content provided as a string. Parameters ---------- model_run_content : str The model run information containing the necessary parameters for model run lodge. Returns ------- ConvertModelRunsResponse Returns the model run information in an interactive python datatype. Raises ------ Exception Exception raised when converting string to bytes. """ response = await self._prov_api_client.convert_model_runs_to_csv(csv_file_contents=model_run_content) return response
[docs] async def convert_model_runs_to_csv_with_file(self, file_path: str) -> ConvertModelRunsResponse: """Reads a CSV file, and it's defined model run contents and lodges a model run. Parameters ---------- file_path : str The path of an existing created CSV file containing the necessary parameters for model run lodge. Returns ------- ConvertModelRunsResponse Returns the model run information in an interactive python datatype. """ file_content = read_file_helper(file_path=file_path) response = await self._prov_api_client.convert_model_runs_to_csv(csv_file_contents=file_content) return response
[docs] async def regenerate_csv_from_model_run_batch(self, batch_id: str, file_path: Optional[str] = None, write_to_csv: bool = False) -> str: """Regenerate/create a csv file containing model run information from a model run batch job. The batch id must exist in the system. Parameters ---------- batch_id : str Obtained from creating a batch model run. file_path: str, optional The path you want to save the csv file at WITH CSV file name. If you don't specify a path this will be saved in a relative directory. write_to_csv: bool, By default False A boolean flag to indicate whether you want to save the template to a csv file or not. Returns ---------- str: Response containing the model run information (encoded in csv format). """ file_path = get_and_validate_file_path( file_path=file_path, write_to_file=write_to_csv, default_file_name=batch_id + ".csv") csv_text: str = await self._prov_api_client.regenerate_csv_from_model_run_batch(batch_id=batch_id) if csv_text is None: raise ValueError(f"No data returned for batch ID {batch_id}") # Write to file if CSV content is returned and write_to_csv is True and file path is assigned. if write_to_csv: if file_path is None: raise ValueError("File path is not set for writing the CSV.") write_file_helper(file_path=file_path, content=csv_text) return csv_text
[docs] async def generate_report(self, report_request:GenerateReportRequest, file_path: str = DEFAULT_RELATIVE_FILE_PATH) -> None: """Generates a provenance report from a Study or Model Run Entity containing the associated inputs, model runs and outputs involved. The report is generated in `.docx` and saved at relative directory level. Parameters ---------- report_request : GenerateReportRequest The request object containing the parameters for generating the report, including the `id`, `item_subtype`, and `depth`. """ # Calls API endpoint to generate report document. generated_word_file = await self._prov_api_client.generate_report( report_request=report_request ) # Sanitize the id to avoid file system errors sanitized_filename ="/", "_") + " - Study Close Out Report.docx" # Append file path and file-name together file_path = file_path + sanitized_filename # Writes content into word docx file. write_file_helper(file_path=file_path,content = generated_word_file)