diff --git a/dspace_rest_client/client.py b/dspace_rest_client/client.py index 44db8c3..a8b4248 100644 --- a/dspace_rest_client/client.py +++ b/dspace_rest_client/client.py @@ -21,6 +21,7 @@ import functools import os from uuid import UUID +from urllib.parse import urlparse import requests from requests import Request @@ -1380,6 +1381,179 @@ def get_users_iter(do_paginate, self, sort=None, embeds=None): params["sort"] = sort return do_paginate(url, params) + + def get_user_by_uuid(self, uuid, embeds=None): + """ + Get a single user by UUID + @param uuid: UUID of the user + @param embeds: Optional list of resources to embed in response JSON + @return: User object constructed from the API response or None if not found + """ + url = f"{self.API_ENDPOINT}/eperson/epersons/{uuid}" + params = parse_params(embeds=embeds) + r = self.api_get(url, params=params) + r_json = parse_json(response=r) + return User(r_json) if r_json else None + + def search_user_by_email(self, email): + """ + Search for a user by email + @param email: User's email address + @return: User object if found, None otherwise + """ + url = f"{self.API_ENDPOINT}/eperson/epersons/search/byEmail" + params = {"email": email} + r = self.api_get(url, params=params) + r_json = parse_json(response=r) + return User(r_json) if r_json else None + + def search_users_by_metadata(self, query, embeds=None): + """ + Search users by metadata + @param query: Search query (UUID, name, email, etc.) + @param embeds: Optional list of resources to embed in response JSON + @return: List of User objects matching the query + """ + url = f"{self.API_ENDPOINT}/eperson/epersons/search/byMetadata" + params = parse_params({"query": query}, embeds=embeds) + r = self.api_get(url, params=params) + r_json = parse_json(response=r) + users = [] + if "_embedded" in r_json and "epersons" in r_json["_embedded"]: + users = [ + User(user_resource) for user_resource in r_json["_embedded"]["epersons"] + ] + return users + + def get_eperson_id_of_user(self): + """ + Get the EPerson ID of the current user + authn/status response includes the eperson link + the uuid can be parsed from the eperson link and returned as text + @return: String of the user id or None in case of an error + """ + url = f"{self.API_ENDPOINT}/authn/status" + try: + r = self.api_get(url) + r_json = parse_json(response=r) + if "_links" in r_json: + eperson_href = r_json["_links"]["eperson"]["href"] + path = urlparse(eperson_href).path + uuid = os.path.basename(path) + return uuid + else: + logging.error("EPerson link not found in response.") + return None + except Exception as e: + logging.error("Error retrieving EPerson ID: %s", e) + return None + + def get_special_groups_of_user(self): + """ + Get the special groups of a user + authn/status/specialGroups + @return: List of Group objects or None in case of an error + """ + url = f"{self.API_ENDPOINT}/authn/status/specialGroups" + try: + r = self.api_get(url) + r_json = parse_json(response=r) + if "_embedded" in r_json and "specialGroups" in r_json["_embedded"]: + groups = [ + Group(group_resource) + for group_resource in r_json["_embedded"]["specialGroups"] + ] + return groups + logging.error("Special groups not found in response.") + return None + except Exception as e: + logging.error("Error retrieving special groups: %s", e) + return None + + def get_groups_of_user(self, user_uuid): + """ + Get groups of a user + @param user_uuid: UUID of the user + @return: List of Group objects + """ + url = f"{self.API_ENDPOINT}/eperson/epersons/{user_uuid}/groups" + r = self.api_get(url) + r_json = parse_json(response=r) + groups = [] + if "_embedded" in r_json and "groups" in r_json["_embedded"]: + groups = [ + Group(group_resource) + for group_resource in r_json["_embedded"]["groups"] + ] + return groups + + def search_users_not_in_group(self, group_uuid, query=None, embeds=None): + """ + Search users not in a specific group + @param group_uuid: UUID of the group + @param query: Search query (UUID, name, email, etc.) + @param embeds: Optional list of resources to embed in response JSON + @return: List of User objects matching the query + """ + url = f"{self.API_ENDPOINT}/eperson/epersons/search/isNotMemberOf" + params = parse_params(params={"group": group_uuid, "query": query}, embeds=embeds) + r = self.api_get(url, params=params) + r_json = parse_json(response=r) + users = [] + if "_embedded" in r_json and "epersons" in r_json["_embedded"]: + users = [ + User(user_resource) for user_resource in r_json["_embedded"]["epersons"] + ] + return users + + def update_user_metadata(self, user_uuid, path, value, embeds=None): + """ + Update user metadata + @param user_uuid: UUID of the user + @param metadata_updates: List of metadata updates in the PATCH format + @return: Updated User object or None if the operation fails + """ + url = f"{self.API_ENDPOINT}/eperson/epersons/{user_uuid}" + r = self.api_patch( + url=url, + operation="replace", + path=path, + value=value, + params=parse_params(embeds=embeds), + ) + r_json = parse_json(response=r) + return User(r_json) if r_json else None + + def change_user_password(self, user_uuid, current_password, new_password): + """ + Change the password of a user + @param user_uuid: UUID of the user + @param current_password: Current password of the user + @param new_password: New password for the user + @return: Boolean indicating success or failure + """ + # TODO: ensure this is only triggered when the user management is done in DSpace directly. + # If the user management is done in an external system (e.g. LDAP), this method should not be used. + url = f"{self.API_ENDPOINT}/eperson/epersons/{user_uuid}" + r = self.api_patch( + url, + operation="add", + path="/password", + value={ + "new_password": new_password, + "current_password": current_password, + }, + ) + if r.status_code == 200: + logging.info("Updated Password for user %s", user_uuid) + return True + if r.status_code == 422: + logging.error( + "Password does not respect the rules configured in the regular expression." + ) + return False + logging.error("An error occurred updating the password.") + return False def create_group(self, group, embeds=None): """ @@ -1400,6 +1574,265 @@ def create_group(self, group, embeds=None): ) ) + def get_groups(self, page=0, size=20, embeds=None): + """ + Fetch all groups + @param page: Page number for pagination + @param size: Number of results per page + @param embeds: Optional list of resources to embed in response JSON + @return: List of Group objects + """ + url = f"{self.API_ENDPOINT}/eperson/groups" + params = parse_params({"page": page, "size": size}, embeds=embeds) + response = self.api_get(url, params=params) + response_json = parse_json(response=response) + groups = [] + + if "_embedded" in response_json and "groups" in response_json["_embedded"]: + for group_data in response_json["_embedded"]["groups"]: + groups.append(Group(group_data)) + + return groups + + def get_subgroups(self, parent_uuid, page=0, size=20): + """ + Get all subgroups of a parent group + @param parent_uuid: UUID of the parent group + @param page: Page number for pagination + @param size: Number of results per page + @return: List of Group objects + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{parent_uuid}/subgroups" + params = parse_params({"page": page, "size": size}) + response = self.api_get(url, params=params) + response_json = parse_json(response=response) + subgroups = [] + + if "_embedded" in response_json and "groups" in response_json["_embedded"]: + for group_data in response_json["_embedded"]["groups"]: + subgroups.append(Group(group_data)) + + return subgroups + + def add_subgroup(self, parent_uuid, child_uuid): + """ + Add a subgroup to a parent group + @param parent_uuid: UUID of the parent group + @param child_uuid: UUID of the subgroup to add + @return: Boolean indicating success or failure + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{parent_uuid}/subgroups" + data = f"{self.API_ENDPOINT}/eperson/groups/{child_uuid}" + response = self.api_post_uri(url, uri_list=data, params=None) + if response.status_code == 204: + return True + if response.status_code == 401: + logging.error("You are not authenticated") + return False + if response.status_code == 403: + logging.error("You are not logged in with sufficient permissions") + return False + if response.status_code == 404: + logging.error("The parent group doesn't exist") + return False + if response.status_code == 422: + logging.error( + "The specified group is not found, or if adding the group would create a cyclic reference" + ) + return False + logging.error( + "Failed to add subgroup %s to group %s: %s", + child_uuid, + parent_uuid, + response.text, + ) + return False + + def remove_subgroup(self, parent_uuid, child_uuid): + """ + Remove a subgroup from a parent group + @param parent_uuid: UUID of the parent group + @param child_uuid: UUID of the subgroup to remove + @return: Boolean indicating success or failure + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{parent_uuid}/subgroups/{child_uuid}" + response = self.api_delete(url, params=None) + if response.status_code == 204: + return True + if response.status_code == 401: + logging.error("You are not authenticated") + return False + if response.status_code == 403: + logging.error("You are not logged in with sufficient permissions") + return False + if response.status_code == 404: + logging.error("The parent group doesn't exist") + return False + if response.status_code == 422: + logging.error("The specified group is not found") + return False + logging.error( + "Failed to remove subgroup %s from group %s: %s", + child_uuid, + parent_uuid, + response.text, + ) + return False + + def search_groups_by_metadata(self, query, page=0, size=20): + """ + Search for groups by metadata + @param query: Search query (UUID or group name) + @param page: Page number for pagination + @param size: Number of results per page + @return: List of Group objects + """ + url = f"{self.API_ENDPOINT}/eperson/groups/search/byMetadata" + params = parse_params({"query": query, "page": page, "size": size}) + response = self.api_get(url, params=params) + response_json = parse_json(response=response) + groups = [] + + if "_embedded" in response_json and "groups" in response_json["_embedded"]: + for group_data in response_json["_embedded"]["groups"]: + groups.append(Group(group_data)) + + return groups + + def get_epersons_in_group(self, group_uuid, page=0, size=20): + """ + Fetch all EPersons in a group + @param group_uuid: UUID of the group + @param page: Page number for pagination + @param size: Number of results per page + @return: List of User objects + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{group_uuid}/epersons" + params = parse_params({"page": page, "size": size}) + response = self.api_get(url, params=params) + response_json = parse_json(response=response) + epersons = [] + + if "_embedded" in response_json and "epersons" in response_json["_embedded"]: + for eperson_data in response_json["_embedded"]["epersons"]: + epersons.append(User(eperson_data)) + + return epersons + + def add_eperson_to_group(self, group_uuid, eperson_uuid): + """ + Add an EPerson to a group + @param group_uuid: UUID of the group + @param eperson_uuid: UUID of the EPerson to add + @return: Boolean indicating success or failure + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{group_uuid}/epersons" + # check if the eperson exists and is valid + eperson = self.get_user_by_uuid(eperson_uuid) + if eperson is None: + logging.error("The specified EPerson does not exist") + return False + if not isinstance(eperson, User): + logging.error("Invalid EPerson object") + return False + # check if the group exists and is valid + group = self.get_group_by_uuid(group_uuid) + if group is None: + logging.error("The specified group does not exist") + return False + if not isinstance(group, Group): + logging.error("Invalid Group object") + return False + data = f"{self.API_ENDPOINT}/eperson/epersons/{eperson_uuid}" + response = self.api_post_uri(url, uri_list=data, params=None) + if response.status_code == 204: + return True + if response.status_code == 401: + logging.error("You are not authenticated") + return False + if response.status_code == 403: + logging.error("You are not logged in with sufficient permissions") + return False + if response.status_code == 422: + logging.error("The specified group or EPerson is not found") + return False + logging.error( + "Failed to add EPerson %s to group %s: %s", + eperson_uuid, + group_uuid, + response.text, + ) + return False + + def remove_eperson_from_group(self, group_uuid, eperson_uuid): + """ + Remove an EPerson from a group + @param group_uuid: UUID of the group + @param eperson_uuid: UUID of the EPerson to remove + @return: Boolean indicating success or failure + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{group_uuid}/epersons/{eperson_uuid}" + response = self.api_delete(url, params=None) + if response.status_code == 204: + return True + if response.status_code == 401: + logging.error("You are not authenticated") + return False + if response.status_code == 403: + logging.error("You are not logged in with sufficient permissions") + return False + if response.status_code == 422: + logging.error("The specified group or EPerson is not found") + return False + logging.error( + "Failed to remove EPerson %s from group %s: %s", + eperson_uuid, + group_uuid, + response.text, + ) + return False + + def update_group_name(self, uuid, new_name): + """ + Update the name of a group + @param uuid: UUID of the group + @param new_name: New name for the group + @return: Updated Group object or None if the update fails + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{uuid}" + response = self.api_patch( + url, operation="replace", path="/name", value=new_name + ) + response_json = parse_json(response=response) + return Group(response_json) if response_json else None + + def delete_group(self, uuid): + """ + Delete a group by UUID + @param uuid: UUID of the group + @return: Boolean indicating success or failure + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{uuid}" + response = self.api_delete(url, params=None) + if response.status_code == 204: + return True + else: + logging.error("Failed to delete group %s: %s", uuid, response.text) + return False + + def get_group_by_uuid(self, uuid, embeds=None): + """ + Fetch a single group by UUID + @param uuid: UUID of the group + @param embeds: Optional list of resources to embed in response JSON + @return: Group object or None if not found + """ + url = f"{self.API_ENDPOINT}/eperson/groups/{uuid}" + params = parse_params(embeds=embeds) + response = self.api_get(url, params=params) + response_json = parse_json(response=response) + return Group(response_json) if response_json else None + def start_workflow(self, workspace_item): url = f"{self.API_ENDPOINT}/workflow/workflowitems" res = parse_json(self.api_post_uri(url, params=None, uri_list=workspace_item)) diff --git a/example_eperson_group.py b/example_eperson_group.py new file mode 100644 index 0000000..3e543b8 --- /dev/null +++ b/example_eperson_group.py @@ -0,0 +1,179 @@ + +# This software is licenced under the BSD 3-Clause licence +# available at https://opensource.org/licenses/BSD-3-Clause +# and described in the LICENCE file in the root of this project + +""" +Example Python 3 application using the dspace.py API client library to retrieve basic person +and group information in a DSpace repository +""" + +import logging +import os +import sys + +from dspace_rest_client.client import DSpaceClient + +# Import models as below if needed +# from dspace_rest_d.models import Community, Collection, Item, Bundle, Bitstream + +# Example variables needed for authentication and basic API requests +# SET THESE TO MATCH YOUR TEST SYSTEM BEFORE RUNNING THE EXAMPLE SCRIPT +# You can also leave them out of the constructor and set environment variables instead: +# DSPACE_API_ENDPOINT= +# DSPACE_API_USERNAME= +# DSPACE_API_PASSWORD= +# USER_AGENT= +DEFAULT_URL = "https://localhost:8080/server/api" +DEFAULT_USERNAME = "username@test.system.edu" +DEFAULT_PASSWORD = "password" + +GROUP_UUID = "UUID_OF_GROUP_TO_FETCH" +NEW_GROUP_NAME = "New Test Group" +UPDATED_GROUP_NAME = "Updated Test Group" +PARENT_GROUP_UUID = "UUID_OF_PARENT_GROUP" +CHILD_GROUP_UUID = "UUID_OF_CHILD_GROUP" +EPERSON_UUID = "UUID_OF_EPERSON_TO_FETCH" +QUERY = "Administrator" +SEARCH_EMAIL = "username@test.system.edu" +SEARCH_PERSON_QUERY = "Test" +SEARCH_GROUP_QUERY = "Administrator" + +# Configuration from environment variables +URL = os.environ.get("DSPACE_API_ENDPOINT", DEFAULT_URL) +USERNAME = os.environ.get("DSPACE_API_USERNAME", DEFAULT_USERNAME) +PASSWORD = os.environ.get("DSPACE_API_PASSWORD", DEFAULT_PASSWORD) + +# Instantiate DSpace client +# Note the 'fake_user_agent' setting here -- this will set a string like the following, +# to get by Cloudfront: +# Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) \ +# Chrome/39.0.2171.95 Safari/537.36 +# The default is to *not* fake the user agent, and instead use the default of +# DSpace-Python-REST-Client/x.y.z. +# To specify a custom user agent, set the USER_AGENT env variable and leave/set +# fake_user_agent as False +d = DSpaceClient( + api_endpoint=URL, username=USERNAME, password=PASSWORD, fake_user_agent=True +) + +# Authenticate against the DSpace client +authenticated = d.authenticate() +if not authenticated: + print("Error logging in! Giving up.") + sys.exit(1) + +# --- USER FUNCTIONS --- + +# Get users with pagination +logging.info("Fetching users...") +users = d.get_users(page=0, size=5) +for user in users: + print(f"User: {user.uuid}, Name: {user.name}, Email: {user.email}") + +# Get users using an iterator +logging.info("Fetching users via iterator...") +user_iter = d.get_users_iter() +for user in user_iter: + print(f"Iterated User: {user.uuid}, Name: {user.name}, Email: {user.email}") + +# Get a user by UUID +logging.info("Fetching user by UUID: %s", EPERSON_UUID) +user = d.get_user_by_uuid(EPERSON_UUID) +if user: + print(f"Fetched User: {user.uuid}, Name: {user.name}, Email: {user.email}") + +# Search for a user by email +logging.info("Searching user by email: %s", SEARCH_EMAIL) +user = d.search_user_by_email(SEARCH_EMAIL) +if user: + print(f"Found User: {user.uuid}, Name: {user.name}, Email: {user.email}") + +# Search users by metadata +logging.info("Searching users by metadata: %s", SEARCH_PERSON_QUERY) +users = d.search_users_by_metadata(query=SEARCH_PERSON_QUERY) +for user in users: + print(f"Matched User: {user.uuid}, Name: {user.name}, Email: {user.email}") + +# --- GROUP FUNCTIONS --- + +# Get groups with pagination +logging.info("Fetching groups...") +groups = d.get_groups(page=0, size=5) +for group in groups: + print(f"Group: {group.uuid}, Name: {group.name}") + +# Get a group by UUID +logging.info("Fetching group by UUID: %s", GROUP_UUID) +group = d.get_group_by_uuid(GROUP_UUID) +if group: + print(f"Fetched Group: {group.uuid}, Name: {group.name}") + +# Create a new group +logging.info("Creating a new group...") +new_group = d.create_group({"name": NEW_GROUP_NAME}) +print(new_group) +if new_group is not None: + print(f"Created Group: {new_group.uuid}, Name: {new_group.name}") + # Update group name + new_group_uuid = new_group.uuid + logging.info("Updating group name for %s...", new_group_uuid) + updated_group = d.update_group_name(new_group_uuid, UPDATED_GROUP_NAME) + if updated_group: + print(f"Updated Group: {updated_group.uuid}, Name: {updated_group.name}") +else: + print("""Error creating group! This may be due to a group with the same name already existing. + There is no update of the group name in this case.""") + +# Add a subgroup +logging.info("Adding subgroup %s to %s...", CHILD_GROUP_UUID, PARENT_GROUP_UUID) +if d.add_subgroup(PARENT_GROUP_UUID, CHILD_GROUP_UUID): + print(f"Subgroup {CHILD_GROUP_UUID} added to {PARENT_GROUP_UUID}") + +# Fetch subgroups +logging.info("Fetching subgroups of %s...", PARENT_GROUP_UUID) +subgroups = d.get_subgroups(PARENT_GROUP_UUID) +for subgroup in subgroups: + print(f"Subgroup: {subgroup.uuid}, Name: {subgroup.name}") + +# Remove a subgroup +logging.info("Removing subgroup %s from %s...", CHILD_GROUP_UUID, PARENT_GROUP_UUID) +if d.remove_subgroup(PARENT_GROUP_UUID, CHILD_GROUP_UUID): + print(f"Subgroup {CHILD_GROUP_UUID} removed from {PARENT_GROUP_UUID}") + +# Search groups by metadata +logging.info("Searching groups by metadata: %s", QUERY) +found_groups = d.search_groups_by_metadata(QUERY) +for group in found_groups: + print(f"Matched Group: {group.uuid}, Name: {group.name}") + +# Get EPersons in a group +logging.info("Fetching EPersons in group %s...", GROUP_UUID) +epersons = d.get_epersons_in_group(GROUP_UUID) +for eperson in epersons: + print(f"EPerson: {eperson.uuid}, Name: {eperson.name}, Email: {eperson.email}") + +# Add an EPerson to a group +logging.info("Adding EPerson %s to group %s...", EPERSON_UUID, GROUP_UUID) +if d.add_eperson_to_group(GROUP_UUID, EPERSON_UUID): + print(f"EPerson {EPERSON_UUID} added to group {GROUP_UUID}") + +# Remove an EPerson from a group +logging.info("Removing EPerson %s from group %s...", EPERSON_UUID, GROUP_UUID) +if d.remove_eperson_from_group(GROUP_UUID, EPERSON_UUID): + print(f"EPerson {EPERSON_UUID} removed from group {GROUP_UUID}") + +# Create a new person record +user = { + "canLogIn": True, + "email": "user@institution.edu", + "requireCertificate": False, + "metadata": { + "eperson.firstname": [{"value": "Test"}], + "eperson.lastname": [{"value": "Dummy"}], + }, +} +logging.info("Creating a new person record...") +new_person = d.create_user(user) +if new_person: + print(f"Created Person: {new_person.uuid}, Name: {new_person.name}")