Quickstart
Initialising the provena client
To use the Provena client, you need to provide both an auth object, which selects the authentication method, and a config object, which holds the configuration details for the Provena instance you want to interface with.
Replace my-provena.cloud with the URL of the Provena instance and change the realm_name value to the appropriate Keycloak realm.
client_config = Config(
    domain="my-provena.cloud",
    realm_name="provena"
)
auth = DeviceFlow(config=client_config, client_id="client-tools")
client = ProvenaClient(auth=auth, config=client_config)
await client.datastore.get_health_check()
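The snippets in this guide use top-level await, which works in a Jupyter notebook or an asyncio REPL but not in a plain Python script. A minimal sketch of wrapping the calls in a coroutine and running it with asyncio.run() follows. FakeClient and FakeDatastore are hypothetical stand-ins so the example runs without a Provena instance; in real use you would pass the ProvenaClient built above, and the "healthy" return value is a placeholder rather than the real response shape.

```python
import asyncio


class FakeDatastore:
    """Hypothetical stand-in for client.datastore; not part of the Provena client."""

    async def get_health_check(self) -> str:
        return "healthy"  # placeholder value, not the real response shape


class FakeClient:
    """Hypothetical stand-in for ProvenaClient."""

    def __init__(self) -> None:
        self.datastore = FakeDatastore()


async def main(client) -> str:
    # Inside a coroutine, `await` works the same way as in the notebook snippets.
    return await client.datastore.get_health_check()


if __name__ == "__main__":
    # asyncio.run() creates an event loop, runs main() to completion, then closes the loop.
    print(asyncio.run(main(FakeClient())))
```

Note that asyncio.run() cannot be called from inside an already running event loop, so in a notebook keep using await directly.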
Finding and searching items from the Registry
Fetching items from the Registry
You can fetch any item from the Registry using general_fetch_item() and the id of the item.
res = await client.registry.general_fetch_item(id=id)
print(f"{res.item['display_name']} {res.item['item_subtype']}")
Each sub-type also has an associated fetch() function.
res = await client.registry.model_run.fetch(id=id)
You can use the ID of an item if you know it ahead of time, or you can list the items to find it.
Listing items in the Registry
General list function
You can list items from the Registry using list_general_registry_items(). This function expects an instance of GeneralListRequest and returns a generic result list.
from ProvenaInterfaces.RegistryAPI import GeneralListRequest
general_list_request = GeneralListRequest(
    filter_by=None,
    sort_by=None,
    pagination_key=None
)
res = await client.registry.list_general_registry_items(general_list_request)
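The pagination_key field on the request suggests that large result sets come back one page at a time. The loop below is a sketch of collecting every page, assuming each response exposes an items list and a pagination_key that is None on the last page. That response shape is an assumption, and FakePage and FakeRegistry are hypothetical stand-ins, so check the actual response model before relying on it.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakePage:
    """Hypothetical response shape: one page of items plus a key for the next page."""

    items: List[Dict[str, Any]]
    pagination_key: Optional[Dict[str, Any]] = None


class FakeRegistry:
    """Hypothetical in-memory registry that serves items a few at a time."""

    def __init__(self, items: List[Dict[str, Any]], page_size: int = 2) -> None:
        self._items = items
        self._page_size = page_size

    async def list_page(self, pagination_key: Optional[Dict[str, Any]]) -> FakePage:
        start = pagination_key["offset"] if pagination_key else 0
        end = start + self._page_size
        # Only hand out a key when there is another page to fetch.
        next_key = {"offset": end} if end < len(self._items) else None
        return FakePage(items=self._items[start:end], pagination_key=next_key)


async def list_all(registry: FakeRegistry) -> List[Dict[str, Any]]:
    """Request pages until the response no longer carries a pagination key."""
    collected: List[Dict[str, Any]] = []
    key: Optional[Dict[str, Any]] = None
    while True:
        page = await registry.list_page(key)
        collected.extend(page.items)
        key = page.pagination_key
        if key is None:
            return collected
```

With the real client, list_page() would correspond to building a GeneralListRequest with the current pagination_key and passing it to list_general_registry_items().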
Sub-type list function
You can also list items from the sub-type modules via list_items(). In this case, only registry items of that sub-type are returned. You may need to cast the returned object into the matching response type using pydantic's parse_obj_as().
from pydantic import parse_obj_as
list_model_run_workflow = await client.registry.model_run_workflow.list_items(list_items_payload=general_list_request)
model_run_workflow_templates = parse_obj_as(ModelRunWorkflowTemplateListResponse, list_model_run_workflow)
print(f"Found {model_run_workflow_templates.total_item_count} model_run_workflow_templates")
Iterate over the list response like so.
for item in model_run_workflow_templates.items:
    m = parse_obj_as(ItemModelRunWorkflowTemplate, item)
    print(m)
Listing dataset items
Datasets are handled separately from other registry items. List them from the client library's datastore module.
list_dataset = await client.datastore.list_all_datasets()
print(f"Found {len(list_dataset)} datasets")
for item in list_dataset:
    d = parse_obj_as(ItemDataset, item)
    print(d)
Search for items
Provena provides a general search engine to search all items in the Registry.
Currently, a search returns a QueryResult object with:
- status (search success or failure, with success and details attributes)
- results (list of results, with id and score attributes for each result)
- warnings
qres = await client.search.search_registry(query="fire", limit=10, subtype_filter=ItemSubType.DATASET)
assert qres.status.success
for r in qres.results:
print(f"{r.id} {r.score}")
- subtype_filter can be None if you want to search over all subtypes
- limit can also be None to remove the limit on the number of returned items
Using the returned id for a result, you can issue a fetch request (see above).
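When a search returns several results, the follow-up fetches can run concurrently instead of one after another. The sketch below uses asyncio.gather() for this. FakeResult and fake_fetch are hypothetical stand-ins; with the real client, the fetch argument would be a small wrapper around client.registry.general_fetch_item(id=...).

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


@dataclass
class FakeResult:
    """Hypothetical stand-in for one search result (id and score)."""

    id: str
    score: float


async def fake_fetch(item_id: str) -> Dict[str, str]:
    """Hypothetical stand-in for a registry fetch call."""
    await asyncio.sleep(0)  # yield to the event loop, as a network call would
    return {"id": item_id, "display_name": f"Item {item_id}"}


async def fetch_all(
    results: List[FakeResult],
    fetch: Callable[[str], Awaitable[Dict[str, str]]] = fake_fetch,
) -> List[Dict[str, str]]:
    # gather() runs the fetches concurrently and returns results in input order.
    return await asyncio.gather(*(fetch(r.id) for r in results))
```

Because gather() preserves input order, the fetched items line up with the original search results.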
Registering
Registering items creates an object in the registry with required metadata/payload values and issues a unique ID (via Handle).
See the following Provena docs section for more info on the Registry: https://docs.provena.io/registry/
Registering datasets
Background information about registering a dataset in Provena is found here: https://docs.provena.io/data-store/registering-a-dataset.html
Minting a dataset.
register_response = await client.datastore.mint_dataset(dataset_mint_info=metadata)
assert register_response.status.success
print(f"Created Dataset Handle: {register_response.handle}. Access entity link: https://hdl.handle.net/{register_response.handle}")
The dataset may already exist, in which case you may want to version it instead of minting a new one. Find out more about versioning here: https://docs.provena.io/versioning/versioning-overview.html
First we must find the latest version of the dataset. One way to do this would be to use the following function.
async def latest_version_of_dataset(dataset_id):
    fetch_resp = await client.datastore.fetch_dataset(id=dataset_id)
    next_version = fetch_resp.item.versioning_info.next_version
    if next_version is None or next_version == "":
        # No next version recorded, so assume this is the latest version
        return dataset_id
    # Otherwise follow the chain to the next version
    return await latest_version_of_dataset(next_version)
Then apply that function…
print(f"Find the latest version of this dataset {dataset_item.id}")
latest_version = await latest_version_of_dataset(dataset_item.id)
#use the latest_version id for the version request
version_request = VersionRequest(
    id=latest_version,
    reason="Updating dataset"
)
register_response = await client.datastore.version_dataset(version_request=version_request)
print(f"New Version of Dataset Handle: {register_response.new_version_id}. Access entity link: https://hdl.handle.net/{register_response.new_version_id}")
This creates a new dataset with its metadata copied over. We will then update the dataset metadata.
register_response = await client.datastore.update_dataset_metadata(
    handle_id=register_response.new_version_id,
    reason="Updating metadata for new version",
    metadata_payload=metadata
)
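The version lookup follows next_version links until it reaches an item without one. The same traversal can be written as a loop, which avoids deep recursion on long version chains and makes it easy to guard against a chain that loops back on itself. The following is a sketch only: FAKE_NEXT_VERSION and fake_next_version are hypothetical stand-ins for fetching an item and reading its versioning_info.next_version.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional

# Hypothetical version chain: each id maps to the id of its next version.
FAKE_NEXT_VERSION: Dict[str, Optional[str]] = {"v1": "v2", "v2": "v3", "v3": None}


async def fake_next_version(item_id: str) -> Optional[str]:
    """Hypothetical stand-in for fetching an item and reading its next_version."""
    return FAKE_NEXT_VERSION[item_id]


async def latest_version(
    item_id: str,
    get_next: Callable[[str], Awaitable[Optional[str]]] = fake_next_version,
) -> str:
    """Follow next_version links iteratively until an item has none."""
    seen = set()
    current = item_id
    while True:
        if current in seen:
            # Guard against a malformed chain that would otherwise loop forever.
            raise RuntimeError(f"Version chain loops back to {current}")
        seen.add(current)
        next_id = await get_next(current)
        if not next_id:  # covers both None and the empty string
            return current
        current = next_id
```

With the real client, get_next would fetch the item and return fetch_resp.item.versioning_info.next_version.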
Registering a model run
Build the payload
from ProvenaInterfaces.ProvenanceAPI import ModelRunRecord, TemplatedDataset, DatasetType, AssociationInfo
from ProvenaInterfaces.AsyncJobAPI import JobStatus
# Building the Model Run Payload.
model_run_payload = ModelRunRecord(
    workflow_template_id=model_run_workflow_template_item.id,
    model_version=None,
    inputs=[
        TemplatedDataset(
            dataset_template_id=input_dataset_template_item.id,
            dataset_id=input_dataset_item.id,
            dataset_type=DatasetType.DATA_STORE
        )
    ],
    outputs=[
        TemplatedDataset(
            dataset_template_id=output_dataset_template_item.id,
            dataset_id=output_dataset_item.id,
            dataset_type=DatasetType.DATA_STORE
        )
    ],
    annotations=None,
    display_name="Notebook Model Run Testing",
    description="Standard Provena Model Run Example",
    study_id=None,
    associations=AssociationInfo(
        modeller_id=person_item.id,
        requesting_organisation_id=organisation_item.id
    ),
    start_time=start_time,
    end_time=end_time
)
Register the payload
model_run_register_result = await client.prov_api.register_model_run(model_run_payload=model_run_payload)
# Check the response of the model run registration
print("Status of registration", model_run_register_result.status)
print("Job Session ID", model_run_register_result.session_id)
from pprint import pprint

# Check the job to see if it's complete. We will do this by polling the job_api
job_result = await client.job_api.await_successful_job_completion(session_id=model_run_register_result.session_id)
while job_result.status != JobStatus.SUCCEEDED:  # Keep polling until the status turns to "SUCCEEDED"
    job_result = await client.job_api.await_successful_job_completion(session_id=model_run_register_result.session_id)
    pprint(job_result.result)
    pprint(job_result.job_type)
    print()
print("Current job status:", job_result.status)
Inspect the result of a successful model run record registration.
from pprint import pprint
model_run_record = job_result.result["record"]
pprint(model_run_record)
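A polling loop is usually easier to live with when it pauses between checks, gives up after a deadline, and stops early on failure. The sketch below shows that pattern in isolation. FakeJobApi, fetch_status() and the plain string statuses are hypothetical and not part of the Provena client; with the real client you would compare against JobStatus values instead.

```python
import asyncio
import time


class FakeJobApi:
    """Hypothetical job API: reports each status in turn, then repeats the last one."""

    def __init__(self, statuses=("IN_PROGRESS", "IN_PROGRESS", "SUCCEEDED")) -> None:
        self._statuses = list(statuses)

    async def fetch_status(self, session_id: str) -> str:
        if len(self._statuses) > 1:
            return self._statuses.pop(0)
        return self._statuses[0]


async def wait_for_job(job_api, session_id: str,
                       poll_interval: float = 0.01, timeout: float = 1.0) -> str:
    """Poll until the job succeeds, fails, or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = await job_api.fetch_status(session_id)
        if status == "SUCCEEDED":
            return status
        if status == "FAILED":
            raise RuntimeError(f"Job {session_id} failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {session_id} still {status} after {timeout}s")
        # Sleep between polls so the loop does not hammer the API.
        await asyncio.sleep(poll_interval)
```

The interval and timeout values here are kept tiny so the example finishes quickly; real jobs would likely need seconds between polls and a timeout of minutes.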
To understand more about what it means to register a model run, see https://docs.provena.io/provenance/registering-model-runs/registration-process/overview.html.
Registering other items via the Registry
Apart from Dataset and Model Run, registering the other registry subtypes is straightforward. Instantiate the relevant DomainInfo payload, then use the sub-type module in the registry client module to create the item.
The workflow below uses the "Model" sub-type, but you can substitute other types, e.g. Dataset Template, Model Run Workflow Template, Organisation, Person, Study.
First build the DomainInfo payload for the Model subtype using ModelDomainInfo.
from ProvenaInterfaces.RegistryModels import *
model_payload = ModelDomainInfo(
    display_name=model_item_payload['display_name'],
    name=model_item_payload['name'],
    description=model_item_payload['description'],
    documentation_url=model_item_payload['documentation_url'],
    source_url=model_item_payload['source_url'],
    user_metadata=model_item_payload['user_metadata']
)
Register the Model subtype instance via create_item().
model_register_result = await client.registry.model.create_item(create_item_request=model_payload)
assert model_register_result.status.success
Just like a Dataset, the item may already exist, in which case you may want to version it instead of creating a new item. Find out more about versioning here: https://docs.provena.io/versioning/versioning-overview.html
First we must find the latest version of the item. Continuing with the Model example, one way to find the latest version of an item of the Model subtype is:
async def latest_version_of_model(item_id):
    fetch_resp = await client.registry.model.fetch(id=item_id)
    next_version = fetch_resp.item.versioning_info.next_version
    if next_version is None or next_version == "":
        # No next version recorded, so assume this is the latest version
        return item_id
    # Otherwise follow the chain to the next version
    return await latest_version_of_model(next_version)
We can then apply latest_version_of_model() in the steps to register a new version of the Model.
print(f"Find the latest version of this {model_item.id}")
latest_version = await latest_version_of_model(model_item.id)
print(f"Latest version: {latest_version}")
#use the latest_version id for the version request
version_request = VersionRequest(
    id=latest_version,
    reason="Updating model"
)
register_response = await client.registry.model.version_item(version_request=version_request)
print(f"New Version of Model Handle: {register_response.new_version_id}. Access entity link: https://hdl.handle.net/{register_response.new_version_id}")
#update the metadata
register_response = await client.registry.model.update(
    id=register_response.new_version_id,
    reason="Updating metadata for new version",
    domain_info=model_payload
)