Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions passboltapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,20 @@ def __init__(
config_path: Optional[str] = None,
new_keys: bool = False,
delete_old_keys: bool = False,
check_certificate: bool = False,
):
"""
:param config: Config as a dictionary
:param config_path: Path to the config file.
:param delete_old_keys: Set true if old keys need to be deleted
:param check_certificate: enables SSL server certificate check.
"""
self.config = config
if config_path:
self.config = configparser.ConfigParser()
self.config.read_file(open(config_path, "r"))
self.requests_session = requests.Session()
self.requests_session.verify = check_certificate

if not self.config:
raise ValueError("Missing config. Provide config as dictionary or path to configuration file.")
Expand Down Expand Up @@ -155,9 +158,7 @@ def get(self, url, return_response_object=False, **kwargs):
r = self.requests_session.get(self.server_url + url, headers=self.get_headers(), **kwargs)
try:
r.raise_for_status()
if return_response_object:
return r
return r.json()
return r if return_response_object else r.json()
except requests.exceptions.HTTPError as e:
logging.error(r.text)
raise e
Expand All @@ -166,9 +167,7 @@ def put(self, url, data, return_response_object=False, **kwargs):
r = self.requests_session.put(self.server_url + url, json=data, headers=self.get_headers(), **kwargs)
try:
r.raise_for_status()
if return_response_object:
return r
return r.json()
return r if return_response_object else r.json()
except requests.exceptions.HTTPError as e:
logging.error(r.text)
raise e
Expand All @@ -177,9 +176,7 @@ def post(self, url, data, return_response_object=False, **kwargs):
r = self.requests_session.post(self.server_url + url, json=data, headers=self.get_headers(), **kwargs)
try:
r.raise_for_status()
if return_response_object:
return r
return r.json()
return r if return_response_object else r.json()
except requests.exceptions.HTTPError as e:
logging.error(r.text)
raise e
Expand Down Expand Up @@ -244,12 +241,10 @@ def iterate_resources(self, params: Optional[dict] = None):
params = params or {}
url_params = urllib.parse.urlencode(params)
if url_params:
url_params = "?" + url_params
response = self.get("/resources.json" + url_params)
url_params = f"?{url_params}"
response = self.get(f"/resources.json{url_params}")
assert "body" in response.keys(), f"Key 'body' not found in response keys: {response.keys()}"
resources = response["body"]
for resource in resources:
yield resource
yield from response["body"]

def list_resources(self, folder_id: Optional[PassboltFolderIdType] = None):
params = {
Expand All @@ -258,8 +253,8 @@ def list_resources(self, folder_id: Optional[PassboltFolderIdType] = None):
}
url_params = urllib.parse.urlencode(params)
if url_params:
url_params = "?" + url_params
response = self.get("/folders.json" + url_params)
url_params = f"?{url_params}"
response = self.get(f"/folders.json{url_params}")
assert "body" in response.keys(), f"Key 'body' not found in response keys: {response.keys()}"
response = response["body"][0]
assert "children_resources" in response.keys(), (
Expand Down Expand Up @@ -289,7 +284,7 @@ def list_users(
else:
params = {"filter[has-access]": resource_or_folder_id, "contain[user]": 1}
params["contain[permission]"] = True
response = self.get(f"/users.json", params=params)
response = self.get("/users.json", params=params)
assert "body" in response.keys(), f"Key 'body' not found in response keys: {response.keys()}"
response = response["body"]
users = constructor(
Expand Down Expand Up @@ -462,8 +457,11 @@ def update_resource(
payload["secrets"] = self._encrypt_secrets(secret_text=secret_text, recipients=recipients)

if payload:
r = self.put(f"/resources/{resource_id}.json", payload, return_response_object=True)
return r
return self.put(
f"/resources/{resource_id}.json",
payload,
return_response_object=True,
)

def describe_group(self, group_id: PassboltGroupIdType):
response = self.get(f"/groups/{group_id}.json", params={"contain[groups_users]": 1})
Expand Down
10 changes: 4 additions & 6 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


def get_my_passwords(passbolt_obj):
result = list()
result = []
for i in passbolt_obj.get(url="/resources.json?api-version=v2")["body"]:
result.append({
"id": i["id"],
Expand All @@ -14,8 +14,7 @@ def get_my_passwords(passbolt_obj):
})
print(i)
for i in result:
resource = passbolt_obj.get(
"/secrets/resource/{}.json?api-version=v2".format(i["id"]))
resource = passbolt_obj.get(f'/secrets/resource/{i["id"]}.json?api-version=v2')
i["password"] = passbolt_obj.decrypt(resource["body"]["data"])
print(result)

Expand All @@ -24,7 +23,7 @@ def get_passwords_basic():
# A simple example to show how to retrieve passwords of a user.
# Note the config file is placed in the project directory.
passbolt_obj = passboltapi.PassboltAPI(config_path="config.ini")
result = list()
result = []
for i in passbolt_obj.get(url="/resources.json?api-version=v2")["body"]:
result.append({
"id": i["id"],
Expand All @@ -34,8 +33,7 @@ def get_passwords_basic():
})
print(i)
for i in result:
resource = passbolt_obj.get(
"/secrets/resource/{}.json?api-version=v2".format(i["id"]))
resource = passbolt_obj.get(f'/secrets/resource/{i["id"]}.json?api-version=v2')
i["password"] = passbolt_obj.decrypt(resource["body"]["data"])
print(result)
passbolt_obj.close_session()
Expand Down