Skip to content

Commit 950baf7

Browse files
committed
add basic injector_contract client
Signed-off-by: Antoine MAZEAS <antoine.mazeas@filigran.io>
1 parent 7e5c86b commit 950baf7

File tree

8 files changed

+200
-0
lines changed

8 files changed

+200
-0
lines changed

pyobas/apis/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .inject_expectation import * # noqa: F401,F403
88
from .inject_expectation_trace import * # noqa: F401,F403
99
from .injector import * # noqa: F401,F403
10+
from .injector_contract import * # noqa: F401,F403
1011
from .kill_chain_phase import * # noqa: F401,F403
1112
from .me import * # noqa: F401,F403
1213
from .organization import * # noqa: F401,F403

pyobas/apis/injector_contract.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import json
2+
from typing import Any, Dict
3+
4+
from pyobas import exceptions as exc
5+
from pyobas.apis.inputs.search import InjectorContractSearchPaginationInput
6+
from pyobas.base import RESTManager, RESTObject
7+
from pyobas.mixins import CreateMixin, DeleteMixin, UpdateMixin
8+
from pyobas.utils import RequiredOptional
9+
10+
11+
class InjectorContract(RESTObject):
12+
pass
13+
14+
15+
class InjectorContractManager(CreateMixin, UpdateMixin, DeleteMixin, RESTManager):
16+
_path = "/injector_contracts"
17+
_obj_cls = InjectorContract
18+
_create_attrs = (
19+
RequiredOptional(
20+
required=(
21+
"contract_content",
22+
"contract_id",
23+
"contract_labels",
24+
"injector_id",
25+
),
26+
optional=(
27+
"contract_attack_patterns_ids",
28+
"contract_attack_patterns_external_ids",
29+
"contract_manual",
30+
"contract_platforms",
31+
"external_contract_id",
32+
"is_atomic_testing",
33+
),
34+
),
35+
)
36+
_update_attrs = (
37+
RequiredOptional(
38+
required=(
39+
"contract_content",
40+
"contract_labels",
41+
),
42+
optional=(
43+
"contract_attack_patterns_ids",
44+
"contract_manual",
45+
"contract_platforms",
46+
"is_atomic_testing",
47+
),
48+
),
49+
)
50+
51+
@exc.on_http_error(exc.OpenBASUpdateError)
52+
def search(
53+
self, input: InjectorContractSearchPaginationInput, **kwargs: Any
54+
) -> Dict[str, Any]:
55+
path = f"{self.path}/search"
56+
# force the serialisation here since we only need a naive serialisation to json
57+
result = self.openbas.http_post(
58+
path, post_data=json.dumps(input, default=lambda x: x.__dict__), **kwargs
59+
)
60+
return result
61+
62+
@exc.on_http_error(exc.OpenBASUpdateError)
63+
def upsert(self, user: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
64+
path = f"{self.path}/upsert"
65+
result = self.openbas.http_post(path, post_data=user, **kwargs)
66+
return result

pyobas/apis/inputs/__init__.py

Whitespace-only changes.

pyobas/apis/inputs/search.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from typing import Dict, List
2+
3+
4+
class Filter:
5+
def __init__(self, key: str, mode: str, operator: str, values: List[str]):
6+
self.key = key
7+
self.mode = mode
8+
self.operator = operator
9+
self.values = values
10+
11+
12+
class FilterGroup:
13+
def __init__(self, mode: str, filters: List[Filter]):
14+
self.mode = mode
15+
self.filters = filters
16+
17+
18+
class SearchPaginationInput:
19+
def __init__(
20+
self,
21+
page: int,
22+
size: int,
23+
filter_group: FilterGroup,
24+
text_search: str,
25+
sorts: Dict[str, str],
26+
):
27+
self.size = size
28+
self.page = page
29+
self.filter_group = filter_group
30+
self.text_search = text_search
31+
self.sorts = sorts
32+
33+
34+
class InjectorContractSearchPaginationInput(SearchPaginationInput):
35+
def __init__(
36+
self,
37+
page: int,
38+
size: int,
39+
filter_group: FilterGroup,
40+
text_search: str,
41+
sorts: Dict[str, str],
42+
full_details: bool = True,
43+
):
44+
super().__init__(page, size, filter_group, text_search, sorts)
45+
self.full_details = full_details

pyobas/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ def __init__(
6161
self.collector = apis.CollectorManager(self)
6262
self.cve = apis.CveManager(self)
6363
self.inject = apis.InjectManager(self)
64+
self.injector_contract = apis.InjectorContractManager(self)
6465
self.document = apis.DocumentManager(self)
6566
self.kill_chain_phase = apis.KillChainPhaseManager(self)
6667
self.attack_pattern = apis.AttackPatternManager(self)

pyobas/mixins.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,3 +216,27 @@ def create(
216216
assert not isinstance(server_data, requests.Response)
217217
assert self._obj_cls is not None
218218
return self._obj_cls(self, server_data)
219+
220+
221+
class DeleteMixin(_RestManagerBase):
222+
_computed_path: Optional[str]
223+
_from_parent_attrs: Dict[str, Any]
224+
_obj_cls: Optional[Type[base.RESTObject]]
225+
_parent: Optional[base.RESTObject]
226+
_parent_attrs: Dict[str, Any]
227+
_path: Optional[str]
228+
openbas: pyobas.OpenBAS
229+
230+
@exc.on_http_error(exc.OpenBASCreateError)
231+
def delete(
232+
self, id: Optional[Union[str, int]] = None, **kwargs: Any
233+
) -> requests.Response:
234+
if id is None:
235+
path = self.path
236+
else:
237+
path = f"{self.path}/{utils.EncodedId(id)}"
238+
239+
result = self.openbas.http_delete(path, **kwargs)
240+
if TYPE_CHECKING:
241+
assert isinstance(result, requests.Response)
242+
return result

test/apis/injector_contract/__init__.py

Whitespace-only changes.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import json
2+
from unittest import TestCase, main, mock
3+
from unittest.mock import ANY
4+
5+
from pyobas import OpenBAS
6+
from pyobas.apis.inputs.search import (
7+
Filter,
8+
FilterGroup,
9+
InjectorContractSearchPaginationInput,
10+
)
11+
12+
13+
def create_mock_api_client():
14+
return mock.MagicMock()
15+
16+
17+
def mock_response(**kwargs):
18+
class MockResponse:
19+
def __init__(self, json_data, status_code):
20+
self.json_data = json_data
21+
self.status_code = status_code
22+
self.history = None
23+
self.content = None
24+
self.headers = {"Content-Type": "application/json"}
25+
26+
def json(self):
27+
return self.json_data
28+
29+
return MockResponse(None, 200)
30+
31+
32+
class TestInjectorContract(TestCase):
33+
@mock.patch("requests.Session.request", side_effect=mock_response)
34+
def test_search_input_correctly_serialised(self, mock_request):
35+
api_client = OpenBAS("url", "token")
36+
37+
search_input = InjectorContractSearchPaginationInput(
38+
0,
39+
20,
40+
FilterGroup("or", [Filter("prop", "and", "eq", ["titi", "toto"])]),
41+
None,
42+
None,
43+
)
44+
45+
expected_json = json.dumps(search_input, default=lambda x: x.__dict__)
46+
api_client.injector_contract.search(search_input)
47+
48+
mock_request.assert_called_once_with(
49+
method="post",
50+
url="url/api/injector_contracts/search",
51+
params={},
52+
data=None,
53+
timeout=None,
54+
stream=False,
55+
verify=True,
56+
json=expected_json,
57+
headers=ANY,
58+
auth=ANY,
59+
)
60+
61+
62+
if __name__ == "__main__":
63+
main()

0 commit comments

Comments
 (0)