diff --git a/src/clearpass/client.py b/src/clearpass/client.py index 59dd2c3..c25c4fb 100644 --- a/src/clearpass/client.py +++ b/src/clearpass/client.py @@ -1,8 +1,14 @@ +import json import logging import re import requests -import json -import urllib.parse + +from urllib.parse import ( + SplitResult, + quote, + urlencode, + urlunsplit, +) from clearpass.exceptions import TokenError @@ -54,6 +60,7 @@ def hyphenate_mac(macstring): class APIConnection(): def __init__(self, username, password, endpoint, client_id, client_secret): self._baseurl = f"https://{endpoint}/" + self._endpoint = endpoint self._authurl = f"{self._baseurl}api/oauth" self._authpayload = { @@ -111,19 +118,36 @@ def postheaders(self): self._postheaders.update(self.getheaders) return self._postheaders - def _put_api(self, resource, payload): + def _put_api(self, resource, **kwargs): return requests.put( url=f"{self._baseurl}api/{resource}", headers=self.postheaders, - data=json.dumps(payload), verify=False, + **kwargs + ) + + def _post_api(self, resource, **kwargs): + return requests.post( + url=f"{self._baseurl}api/{resource}", + headers=self.postheaders, + verify=False, + **kwargs ) - def _get_api(self, resource, filter=None): + def _get_api(self, resource, **kwargs): url = f"{self._baseurl}api/{resource}" - if filter: - url += f"?filter={urllib.parse.quote(json.dumps(filter))}" + params = {} + for parameter, value in kwargs.items(): + if value is not None: + params[parameter] = json.dumps(value) + url = urlunsplit(SplitResult( + scheme="https", + netloc=f"{self._endpoint}", + path=f"api/{resource}", + query=urlencode(params, quote_via=quote), + fragment=None, + )) return requests.get( url=url, headers=self.getheaders, @@ -176,7 +200,7 @@ def set_mac_address( if attributes is not None: data["attributes"] = attributes - return self._put_api(f"endpoint/{mac_id}", data) + return self._put_api(f"endpoint/{mac_id}", json=data) def enable_mac_address(self, mac): mac_id = self.get_mac_id(mac) @@ -196,3 +220,16 @@ def disable_mac_address(self, mac, disabled_by, reason): if res.status_code == 404: raise ValueError(f"{mac} not found.") return res + + def disconnect_mac_address(self, mac): + res = self._post_api( + f"session-action/disconnect/mac/{mac}", + json={} + ) + if res.status_code == 404: + raise ValueError(f"{mac} not found.") + return res + + def block_mac_address(self, mac, disabled_by, reason): + self.disable_mac_address(mac, disabled_by, reason) + self.disconnect_mac_address(mac) diff --git a/tests/conftest.py b/tests/conftest.py index 617bc56..62ef1e5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,10 +23,11 @@ # To record, `export VCR_RECORD=True` VCR_RECORD = "VCR_RECORD" in os.environ MAC_404 = 'deadbeef1234' # pragma: allowlist secret -TEST_DATA = {'mac': '123123123123', # pragma: allowlist secret - 'disabled_by': 'TESTING', - 'reason': 'Still testing...' - } +TEST_DATA = { + 'mac': '123123123123', # pragma: allowlist secret + 'disabled_by': 'TESTING', + 'reason': 'Still testing...' +} @pytest.fixture diff --git a/tests/test_connector.py b/tests/test_connector.py index df76dc8..c780c69 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -1,3 +1,4 @@ +import os import pytest from conftest import TEST_DATA, MAC_404 @@ -39,3 +40,9 @@ def test_404_enable_mac(cassette, clearpass_client): def test_get_info_for_mac_address(cassette, clearpass_client): result = clearpass_client.get_info_for_mac_address(mac=TEST_DATA['mac']) assert result + + +def test_disconnect_mac_address(cassette, clearpass_client): + result = clearpass_client.disconnect_mac_address( + mac=os.environ["CLEARPASS_MAC"]) + assert result