# Source code for provenaclient.modules.prov

'''
Created Date: Monday June 17th 2024 +1000
Author: Peter Baker
-----
Last Modified: Friday November 29th 2024 4:21:39 pm +1000
Modified By: Parth Kulkarni
-----
Description: Provenance API L3 module. Includes the ProvAPI sub module. Contains IO helper functions for writing/reading files.
-----
HISTORY:
Date      	By	Comments
----------	---	---------------------------------------------------------

29-11-2024 | Parth Kulkarni | Added generate-report functionality. 
22-08-2025 | Peter Baker | Added delete model run capability

'''

import os
from typing import List

from ProvenaInterfaces.ProvenanceAPI import LineageResponse, ModelRunRecord, ConvertModelRunsResponse, RegisterModelRunResponse, RegisterBatchModelRunRequest, RegisterBatchModelRunResponse, PostUpdateModelRunResponse, GenerateReportRequest, PostDeleteGraphResponse
from ProvenaInterfaces.RegistryAPI import ItemModelRun, ItemSubType
from ProvenaInterfaces.SharedTypes import StatusResponse

from provenaclient.auth.manager import AuthManager
from provenaclient.utils.config import Config
from provenaclient.clients import ProvClient, RegistryClient
from provenaclient.utils.exceptions import *
from provenaclient.modules.module_helpers import *
from provenaclient.utils.helpers import read_file_helper, write_file_helper, get_and_validate_file_path
from provenaclient.models.general import CustomLineageResponse, HealthCheckResponse

# L3 interface.

# Default traversal depth used by the lineage exploration methods below.
PROV_API_DEFAULT_SEARCH_DEPTH = 3
# Default file name used by generate_config_file when no path is supplied.
DEFAULT_CONFIG_FILE_NAME = "prov-api.env"
# Default (relative) directory used by generate_report when no path is supplied.
DEFAULT_RELATIVE_FILE_PATH = "./"


class ProvAPIAdminSubModule(ModuleService):
    # Private L2 clients used to reach the prov and registry APIs.
    _prov_api_client: ProvClient
    _registry_api_client: RegistryClient

    def __init__(self, auth: AuthManager, config: Config, prov_api_client: ProvClient, registry_api_client: RegistryClient) -> None:
        """Admin sub module of the Prov API providing functionality for the admin endpoints.

        Parameters
        ----------
        auth : AuthManager
            An abstract interface containing the user's requested auth flow method.
        config : Config
            A config object which contains information related to the Provena instance.
        prov_api_client : ProvClient
            The instantiated prov API client.
        registry_api_client : RegistryClient
            The instantiated registry API client (used for registry-side deletions).
        """
        self._auth = auth
        self._config = config

        # Clients related to the prov_api scoped as private.
        self._prov_api_client = prov_api_client
        self._registry_api_client = registry_api_client
[docs] async def generate_config_file(self, required_only: bool = True, file_path: Optional[str] = None, write_to_file: bool = False) -> str: """Generates a nicely formatted .env file of the current required/non supplied properties Used to quickly bootstrap a local environment or to understand currently deployed API. Parameters ---------- required_only : bool, optional By default True file_path: str, optional The path you want to save the config file at WITH the file name. If you don't specify a path this will be saved in a relative directory. write_to_file: bool, By default False A boolean flag to indicate whether you want to save the config response to a file or not. Returns ---------- str: Response containing the config text. """ file_path = get_and_validate_file_path( file_path=file_path, write_to_file=write_to_file, default_file_name=DEFAULT_CONFIG_FILE_NAME) config_text: str = await self._prov_api_client.admin.generate_config_file(required_only=required_only) if config_text is None: raise ValueError( f"No data returned for generate config file endpoint.") # Write to file if config text is not None, write to file is True and file path is not None. if write_to_file: if file_path is None: raise ValueError("File path is not set for writing the CSV.") write_file_helper(file_path=file_path, content=config_text) return config_text
[docs] async def store_record(self, registry_record: ItemModelRun, validate_record: bool = True) -> StatusResponse: """An admin only endpoint which enables the reupload/storage of an existing completed provenance record. Parameters ---------- registry_record : ItemModelRun The completed registry record for the model run. validate_record: bool Optional Should the ids in the payload be validated?, by default True Returns ------- StatusResponse A status response indicating the success of the request and any other details. """ return await self._prov_api_client.admin.store_record(registry_record=registry_record, validate_record=validate_record)
[docs] async def store_multiple_records(self, registry_record: List[ItemModelRun], validate_record: bool = True) -> StatusResponse: """An admin only endpoint which enables the reupload/storage of an existing but multiple completed provenance record. Parameters ---------- registry_record : List[ItemModelRun] List of the completed registry record for the model run validate_record validate_record: bool Optional Should the ids in the payload be validated?, by default True Returns ------- StatusResponse A status response indicating the success of the request and any other details. """ return await self._prov_api_client.admin.store_multiple_records(registry_record=registry_record, validate_record=validate_record)
[docs] async def store_all_registry_records(self, validate_record: bool = True) -> StatusResponse: """Applies the store record endpoint action across a list of ItemModelRuns ' which is found by querying the registry model run list endpoint directly. Parameters ---------- validate_record : bool Optional Should the ids in the payload be validated?, by default True Returns ------- StatusResponse A status response indicating the success of the request and any other details. """ return await self._prov_api_client.admin.store_all_registry_records(validate_record=validate_record)
[docs] async def delete_model_run_provenance(self, model_run_id: str, trial_mode: bool = False) -> PostDeleteGraphResponse: """Deletes a model run by its ID - provenance store ONLY""" return await self._prov_api_client.admin.delete_model_run_provenance(model_run_id=model_run_id, trial_mode=trial_mode)
[docs] async def delete_model_run_provenance_and_registry(self, model_run_id: str, trial_mode: bool = False) -> PostDeleteGraphResponse: """Deletes a model run by its ID in both the registry AND in the provenance store.""" # First, delete from provenance diff = await self._prov_api_client.admin.delete_model_run_provenance(model_run_id=model_run_id, trial_mode=trial_mode) if (trial_mode): # done if we are in trial mode - do not execute action return diff else: await self._registry_api_client.admin.delete_item(id=model_run_id, item_subtype=ItemSubType.MODEL_RUN) return diff
class Prov(ModuleService):
    # Private L2 client for the prov API (assigned as an instance attribute in __init__).
    _prov_api_client: ProvClient

    def __init__(self, auth: AuthManager, config: Config, prov_client: ProvClient, registry_client: RegistryClient) -> None:
        """Initialises a new prov module, which sits between the user and the prov API operations.

        Parameters
        ----------
        auth : AuthManager
            An abstract interface containing the user's requested auth flow method.
        config : Config
            A config object which contains information related to the Provena instance.
        prov_client : ProvClient
            This client interacts with the Prov API.
        registry_client : RegistryClient
            This client interacts with the Registry API (used by the admin sub module).
        """
        self._auth = auth
        self._config = config

        # Clients related to the prov-api scoped as private.
        self._prov_api_client = prov_client

        # Submodules
        self.admin = ProvAPIAdminSubModule(
            auth, config, prov_client, registry_client)
[docs] async def get_health_check(self) -> HealthCheckResponse: """Checks the health status of the PROV-API. Returns ------- HealthCheckResponse Response containing the PROV-API health information. """ return await self._prov_api_client.get_health_check()
[docs] async def update_model_run(self, model_run_id: str, reason: str, record: ModelRunRecord) -> PostUpdateModelRunResponse: """Updates an existing model run with new information. This function triggers an asynchronous update of a model run. The update is processed as a job, and the job session ID is returned for tracking the update progress. Args: model_run_id (str): The ID of the model run to update reason (str): The reason for updating the model run record (ModelRunRecord): The new model run record details Returns: PostUpdateModelRunResponse: Response containing the job session ID tracking the update Example: ```python response = await prov_api.update_model_run( model_run_id="10378.1/1234567", reason="Updating input dataset information", record=updated_model_run_record ) # Get the session ID to track progress session_id = response.session_id ``` """ return await self._prov_api_client.post_update_model_run( model_run_id=model_run_id, reason=reason, record=record )
[docs] async def explore_upstream(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Explores in the upstream direction (inputs/associations) starting at the specified node handle ID. The search depth is bounded by the depth parameter which has a default maximum of 100. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the upstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ upstream_response = await self._prov_api_client.explore_upstream(starting_id=starting_id, depth=depth) typed_upstream_response = CustomLineageResponse.parse_obj( upstream_response.dict()) return typed_upstream_response
[docs] async def explore_downstream(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Explores in the downstream direction (inputs/associations) starting at the specified node handle ID. The search depth is bounded by the depth parameter which has a default maximum of 100. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the downstream direction, by default 100 Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ typed_downstream_response = await self._prov_api_client.explore_downstream(starting_id=starting_id, depth=depth) typed_downstream_response = CustomLineageResponse.parse_obj( typed_downstream_response.dict()) return typed_downstream_response
[docs] async def get_contributing_datasets(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches datasets (inputs) which involved in a model run naturally in the upstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the upstream direction, by default 100 Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ contributing_datasets = await self._prov_api_client.get_contributing_datasets(starting_id=starting_id, depth=depth) typed_contributing_datasets = CustomLineageResponse.parse_obj( contributing_datasets.dict()) return typed_contributing_datasets
[docs] async def get_effected_datasets(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches datasets (outputs) which are derived from the model run naturally in the downstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the downstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ effected_datasets_response = await self._prov_api_client.get_effected_datasets(starting_id=starting_id, depth=depth) typed_effected_datasets = CustomLineageResponse.parse_obj( effected_datasets_response.dict()) return typed_effected_datasets
[docs] async def get_contributing_agents(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches agents (organisations or peoples) that are involved or impacted by the model run. naturally in the upstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the upstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ contributing_agents_response = await self._prov_api_client.get_contributing_agents(starting_id=starting_id, depth=depth) typed_contributing_agents = CustomLineageResponse.parse_obj( contributing_agents_response.dict()) return typed_contributing_agents
[docs] async def get_effected_agents(self, starting_id: str, depth: int = PROV_API_DEFAULT_SEARCH_DEPTH) -> CustomLineageResponse: """Fetches agents (organisations or peoples) that are involved or impacted by the model run. naturally in the downstream direction. Parameters ---------- starting_id : str The ID of the entity to start at. depth : int, optional The depth to traverse in the downstream direction, by default 100. Returns ------- CustomLineageResponse A typed response containing the status, node count, and networkx serialised graph response. """ effected_agents_response = await self._prov_api_client.get_effected_agents(starting_id=starting_id, depth=depth) typed_effected_agents = CustomLineageResponse.parse_obj( effected_agents_response.dict()) return typed_effected_agents
[docs] async def register_batch_model_runs(self, batch_model_run_payload: RegisterBatchModelRunRequest) -> RegisterBatchModelRunResponse: """This function allows you to register multiple model runs in one go (batch) asynchronously. Note: You can utilise the returned session ID to poll on the JOB API to check status of the model run registration(s). Parameters ---------- batch_model_run_payload : RegisterBatchModelRunRequest A list of model runs (ModelRunRecord objects) Returns ------- RegisterBatchModelRunResponse The job session id derived from job-api for the model-run batch. """ return await self._prov_api_client.register_batch_model_runs(model_run_batch_payload=batch_model_run_payload)
[docs] async def register_model_run(self, model_run_payload: ModelRunRecord) -> RegisterModelRunResponse: """Asynchronously registers a single model run. Note: You can utilise the returned session ID to poll on the JOB API to check status of the model run registration. Parameters ---------- model_run_payload : ModelRunRecord Contains information needed for the model run such as workflow template, inputs, outputs, description etc. Returns ------- RegisterModelRunResponse The job session id derived from job-api for the model-run. """ return await self._prov_api_client.register_model_run(model_run_payload=model_run_payload)
[docs] async def generate_csv_template(self, workflow_template_id: str, file_path: Optional[str] = None, write_to_csv: bool = False) -> str: """Generates a model run csv template to be utilised for creating model runs through csv format.. Parameters ---------- workflow_template_id : str An ID of a created and existing model run workflow template. path_to_save_csv: str, optional The path you want to save the csv file at WITH csv file name. If you don't specify a path this will be saved in a relative directory. write_to_csv: bool, By default False A boolean flag to indicate whether you want to save the template to a csv file or not. Returns ---------- str: Response containing the csv template text (encoded in a csv format). """ file_path = get_and_validate_file_path( file_path=file_path, write_to_file=write_to_csv, default_file_name=workflow_template_id + ".csv") csv_text = await self._prov_api_client.generate_csv_template(workflow_template_id=workflow_template_id) if csv_text is None: raise ValueError( f"No data returned for generate CSV template workflow template ID {workflow_template_id}") # Write to file if CSV content is returned and write_to_csv is True and file path is assigned. if write_to_csv: if file_path is None: raise ValueError("File path is not set for writing the CSV.") write_file_helper(file_path=file_path, content=csv_text) return csv_text
[docs] async def convert_model_runs(self, model_run_content: str) -> ConvertModelRunsResponse: """Converts model run with model_run_content provided as a string. Parameters ---------- model_run_content : str The model run information containing the necessary parameters for model run lodge. Returns ------- ConvertModelRunsResponse Returns the model run information in an interactive python datatype. Raises ------ Exception Exception raised when converting string to bytes. """ response = await self._prov_api_client.convert_model_runs_to_csv(csv_file_contents=model_run_content) return response
[docs] async def convert_model_runs_to_csv_with_file(self, file_path: str) -> ConvertModelRunsResponse: """Reads a CSV file, and it's defined model run contents and lodges a model run. Parameters ---------- file_path : str The path of an existing created CSV file containing the necessary parameters for model run lodge. Returns ------- ConvertModelRunsResponse Returns the model run information in an interactive python datatype. """ file_content = read_file_helper(file_path=file_path) response = await self._prov_api_client.convert_model_runs_to_csv(csv_file_contents=file_content) return response
[docs] async def regenerate_csv_from_model_run_batch(self, batch_id: str, file_path: Optional[str] = None, write_to_csv: bool = False) -> str: """Regenerate/create a csv file containing model run information from a model run batch job. The batch id must exist in the system. Parameters ---------- batch_id : str Obtained from creating a batch model run. file_path: str, optional The path you want to save the csv file at WITH CSV file name. If you don't specify a path this will be saved in a relative directory. write_to_csv: bool, By default False A boolean flag to indicate whether you want to save the template to a csv file or not. Returns ---------- str: Response containing the model run information (encoded in csv format). """ file_path = get_and_validate_file_path( file_path=file_path, write_to_file=write_to_csv, default_file_name=batch_id + ".csv") csv_text: str = await self._prov_api_client.regenerate_csv_from_model_run_batch(batch_id=batch_id) if csv_text is None: raise ValueError(f"No data returned for batch ID {batch_id}") # Write to file if CSV content is returned and write_to_csv is True and file path is assigned. if write_to_csv: if file_path is None: raise ValueError("File path is not set for writing the CSV.") write_file_helper(file_path=file_path, content=csv_text) return csv_text
[docs] async def generate_report(self, report_request: GenerateReportRequest, file_path: str = DEFAULT_RELATIVE_FILE_PATH) -> None: """Generates a provenance report from a Study or Model Run Entity containing the associated inputs, model runs and outputs involved. The report is generated in `.docx` and saved at relative directory level. Parameters ---------- report_request : GenerateReportRequest The request object containing the parameters for generating the report, including the `id`, `item_subtype`, and `depth`. """ # Calls API endpoint to generate report document. generated_word_file = await self._prov_api_client.generate_report( report_request=report_request ) # Sanitize the id to avoid file system errors sanitized_filename = report_request.id.replace( "/", "_") + " - Study Close Out Report.docx" # Append file path and file-name together file_path = file_path + sanitized_filename # Writes content into word docx file. write_file_helper(file_path=file_path, content=generated_word_file)