Source code for provenaclient.modules.auth

'''
Created Date: Friday May 31st 2024 +1000
Author: Peter Baker
-----
Last Modified: Friday May 31st 2024 9:50:26 am +1000
Modified By: Peter Baker
-----
Description: Auth API L3 Module.
-----
HISTORY:
Date      	By	Comments
----------	---	---------------------------------------------------------
'''

from provenaclient.auth.manager import AuthManager
from provenaclient.utils.config import Config
from provenaclient.clients import AuthClient
from provenaclient.modules.module_helpers import *
from provenaclient.models import HealthCheckResponse
from ProvenaInterfaces.AuthAPI import *

# L3 interface.


[docs] class AdminAuthSubModule(ModuleService): _auth_client: AuthClient def __init__(self, auth: AuthManager, config: Config, auth_client: AuthClient) -> None: """ Admin sub module of the Auth API functionality Parameters ---------- auth : AuthManager An abstract interface containing the user's requested auth flow method. config : Config A config object which contains information related to the Provena instance. auth_client: AuthClient The instantiated auth client """ self._auth = auth self._config = config # Clients related to the datastore scoped as private. self._auth_client = auth_client
[docs] async def get_all_pending_request_history(self) -> AccessRequestList: """ Gets all requests with pending status Returns: AccessRequestList: The response object """ return await self._auth_client.admin.get_all_pending_request_history()
[docs] async def get_all_request_history(self) -> AccessRequestList: """ Gets all requests Returns: AccessRequestList: The response object """ return await self._auth_client.admin.get_all_request_history()
[docs] async def get_user_pending_request_history(self, username: str) -> AccessRequestList: """ Gets pending requests for specified user Args: username (str): Username to query Returns: AccessRequestList: The response list """ return await self._auth_client.admin.get_user_pending_request_history(username=username)
[docs] async def get_user_request_history(self, username: str) -> AccessRequestList: """ Gets all requests for specified user Args: username (str): Username to query Returns: AccessRequestList: The response list """ return await self._auth_client.admin.get_user_request_history(username=username)
[docs] async def post_add_note(self, note: RequestAddNote) -> None: """ Adds a note to an existing request Args: note (RequestAddNote): Payload incl note info """ return await self._auth_client.admin.post_add_note( note=note )
[docs] async def post_change_request_state(self, send_email_alert: bool, change: AccessRequestStatusChange) -> ChangeStateStatus: """ Change state of a request. Args: send_email_alert (bool): Should trigger email alert? change (AccessRequestStatusChange): The details of change Returns: ChangeStateStatus: The response object """ return await self._auth_client.admin.post_change_request_state( change=change, send_email_alert=send_email_alert )
[docs] async def get_list_groups(self) -> ListGroupsResponse: """ Gets a list of groups Returns: ListGroupsResponse: List of groups """ return await self._auth_client.admin.get_list_groups()
[docs] async def get_describe_group(self, group_id: str) -> DescribeGroupResponse: """ Describes a group by ID Args: group_id (str): The group Returns: DescribeGroupResponse: Description """ return await self._auth_client.admin.get_describe_group(group_id=group_id)
[docs] async def get_list_members(self, group_id: str) -> ListMembersResponse: """ Lists members of group Args: group_id (str): The gruop Returns: ListMembersResponse: The list of members """ return await self._auth_client.admin.get_list_members(group_id=group_id)
[docs] async def get_list_group_membership(self, username: str) -> ListUserMembershipResponse: """ Gets list of groups a user is in Args: username (str): The username Returns: ListUserMembershipResponse: The list of groups """ return await self._auth_client.admin.get_list_group_membership(username=username)
[docs] async def get_check_user_membership(self, username: str, group_id: str) -> CheckMembershipResponse: """ Checks user membership within a group Args: username (str): The username to target group_id (str): The group to check Returns: CheckMembershipResponse: Response """ return await self._auth_client.admin.get_check_user_membership(username=username, group_id=group_id)
[docs] async def post_groups_add_member(self, group_id: str, user: GroupUser) -> AddMemberResponse: """ Adds a member to a group Args: group_id (str): Id of group user (GroupUser): The user to add Returns: AddMemberResponse: The response """ return await self._auth_client.admin.post_groups_add_member(user=user, group_id=group_id)
[docs] async def delete_remove_member(self, group_id: str, username: str) -> RemoveMemberResponse: """ Removes member from group Args: group_id (str): The id of group username (str): The user to remove Returns: RemoveMemberResponse: The response """ return await self._auth_client.admin.delete_remove_member(username=username, group_id=group_id)
[docs] async def post_add_group(self, group: UserGroupMetadata) -> AddGroupResponse: """ Adds a group/creates group Args: group (UserGroupMetadata): The group details Returns: AddGroupResponse: The response """ return await self._auth_client.admin.post_add_group(group=group)
[docs] async def put_update_group(self, group: UserGroupMetadata) -> UpdateGroupResponse: """ Updates group details Args: group (UserGroupMetadata): The group metadata Returns: UpdateGroupResponse: The response """ return await self._auth_client.admin.put_update_group(group=group)
[docs] async def get_export_groups(self) -> GroupsExportResponse: """ Exports all group details in specified format. Returns: GroupsExportResponse: The data dump """ return await self._auth_client.admin.get_export_groups()
[docs] async def post_import_groups(self, body: GroupsImportRequest) -> GroupsImportResponse: """ Imports groups from data dump back in Args: body (GroupsImportRequest): The import request incl. dump Returns: GroupsImportResponse: The response """ return await self._auth_client.admin.post_import_groups(body=body)
[docs] async def post_restore_groups_from_table(self, table_name: str, body: GroupsRestoreRequest) -> GroupsImportResponse: """ Restores groups by first dumping from valid group table. Needs permissions to table. Args: table_name (str): The table name body (GroupsRestoreRequest): The request Returns: GroupsImportResponse: The response details """ return await self._auth_client.admin.post_restore_groups_from_table(body=body, table_name=table_name)
[docs] class Auth(ModuleService): # Internal clients _auth_client: AuthClient # Sub Modules admin: AdminAuthSubModule def __init__(self, auth: AuthManager, config: Config, auth_client: AuthClient) -> None: """ Sets up the auth API L3 module. Includes admin sub module. Parameters ---------- auth : AuthManager An abstract interface containing the user's requested auth flow method. config : Config A config object which contains information related to the Provena instance. """ self._auth = auth self._config = config # Clients related to the datastore scoped as private. self._auth_client = auth_client # Sub Modules self.admin = AdminAuthSubModule( auth=self._auth, config=self._config, auth_client=self._auth_client )
[docs] async def get_health_check(self) -> HealthCheckResponse: """ Health check the API Returns: HealthCheckResponse: Response """ return await self._auth_client.get_health_check()
[docs] async def get_user_request_history(self) -> AccessRequestList: """ Gets the users access request history Returns: AccessRequestList: The list of requests """ return await self._auth_client.get_user_request_history()
[docs] async def post_user_request_change(self, body: AccessReport, send_email: bool) -> StatusResponse: """ Requests a change by diffing access models Args: body (AccessReport): The new access desired send_email (bool): Email alert Returns: StatusResponse: Ok? """ return await self._auth_client.post_user_request_change(body=body, send_email=send_email)
[docs] async def get_user_pending_request_history(self) -> AccessRequestList: """ Gets only pending requests from history Returns: AccessRequestList: The list """ return await self._auth_client.get_user_pending_request_history()
[docs] async def get_user_generate_access_report(self) -> AccessReportResponse: """ Generates an access report detailing system access. Returns: AccessReportResponse: The response """ return await self._auth_client.get_user_generate_access_report()
[docs] async def get_list_groups(self) -> ListGroupsResponse: """ Lists all groups Returns: ListGroupsResponse: List of groups """ return await self._auth_client.get_list_groups()
[docs] async def get_describe_group(self, group_id: str) -> DescribeGroupResponse: """ Describes a specific gruop Args: group_id (str): The id of group Returns: DescribeGroupResponse: Response with details """ return await self._auth_client.get_describe_group(group_id=group_id)
[docs] async def get_list_membership(self) -> ListUserMembershipResponse: """ Gets the list of groups user is member of Returns: ListUserMembershipResponse: List and details """ return await self._auth_client.get_list_membership()
[docs] async def get_list_group_members(self, group_id: str) -> ListMembersResponse: """ Lists the members of a given group Args: group_id (str): The group to lookup Returns: ListMembersResponse: Members """ return await self._auth_client.get_list_group_members(group_id=group_id)
[docs] async def get_check_membership(self, group_id: str) -> CheckMembershipResponse: """ Checks if user is in specific group Args: group_id (str): The group to check Returns: CheckMembershipResponse: In group? """ return await self._auth_client.get_check_membership(group_id=group_id)