|
9 | 9 | import os
|
10 | 10 | import platform
|
11 | 11 | import ssl
|
| 12 | +import sys |
| 13 | +import time |
12 | 14 | import uuid
|
13 | 15 | from collections.abc import Iterable
|
| 16 | +from pathlib import Path |
14 | 17 | from urllib.parse import urlencode, urlparse
|
15 | 18 |
|
16 | 19 | import certifi
|
17 | 20 | import grpc
|
18 | 21 | import sentry_sdk
|
| 22 | +from google.protobuf.json_format import MessageToJson, ParseDict |
19 | 23 |
|
20 | 24 | from netboxlabs.diode.sdk.diode.v1 import ingester_pb2, ingester_pb2_grpc
|
21 | 25 | from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError
|
|
27 | 31 | _DIODE_SENTRY_DSN_ENVVAR_NAME = "DIODE_SENTRY_DSN"
|
28 | 32 | _CLIENT_ID_ENVVAR_NAME = "DIODE_CLIENT_ID"
|
29 | 33 | _CLIENT_SECRET_ENVVAR_NAME = "DIODE_CLIENT_SECRET"
|
| 34 | +_DRY_RUN_OUTPUT_DIR_ENVVAR_NAME = "DIODE_DRY_RUN_OUTPUT_DIR" |
30 | 35 | _INGEST_SCOPE = "diode:ingest"
|
31 | 36 | _DEFAULT_STREAM = "latest"
|
32 | 37 | _LOGGER = logging.getLogger(__name__)
|
33 | 38 |
|
34 | 39 |
|
| 40 | +def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]: |
| 41 | + """Yield entities from a file with concatenated JSON messages.""" |
| 42 | + path = Path(file_path) |
| 43 | + with path.open("r") as fh: |
| 44 | + request = json.load(fh) |
| 45 | + req_pb = ingester_pb2.IngestRequest() |
| 46 | + ParseDict(request, req_pb) |
| 47 | + yield from req_pb.entities |
| 48 | + |
| 49 | + |
| 50 | +class DiodeClientInterface: |
| 51 | + """Runtime placeholder for the Diode client interface.""" |
| 52 | + |
| 53 | + pass |
| 54 | + |
| 55 | + |
35 | 56 | def _load_certs() -> bytes:
|
36 | 57 | """Loads cacert.pem."""
|
37 | 58 | with open(certifi.where(), "rb") as f:
|
@@ -82,7 +103,7 @@ def _get_optional_config_value(
|
82 | 103 | return value
|
83 | 104 |
|
84 | 105 |
|
85 |
| -class DiodeClient: |
| 106 | +class DiodeClient(DiodeClientInterface): |
86 | 107 | """Diode Client."""
|
87 | 108 |
|
88 | 109 | _name = "diode-sdk-python"
|
@@ -287,6 +308,77 @@ def _authenticate(self, scope: str):
|
287 | 308 | ) + [("authorization", f"Bearer {access_token}")]
|
288 | 309 |
|
289 | 310 |
|
| 311 | +class DiodeDryRunClient(DiodeClientInterface): |
| 312 | + """Client that outputs ingestion requests instead of sending them.""" |
| 313 | + |
| 314 | + _name = "diode-sdk-python-dry-run" |
| 315 | + _version = version_semver() |
| 316 | + _app_name = None |
| 317 | + _app_version = None |
| 318 | + |
| 319 | + def __init__(self, app_name: str = "dryrun", output_dir: str | None = None): |
| 320 | + """Initiate a new dry run client.""" |
| 321 | + self._output_dir = os.getenv(_DRY_RUN_OUTPUT_DIR_ENVVAR_NAME, output_dir) |
| 322 | + self._app_name = app_name |
| 323 | + |
| 324 | + @property |
| 325 | + def name(self) -> str: |
| 326 | + """Retrieve the name.""" |
| 327 | + return self._name |
| 328 | + |
| 329 | + @property |
| 330 | + def version(self) -> str: |
| 331 | + """Retrieve the version.""" |
| 332 | + return self._version |
| 333 | + |
| 334 | + @property |
| 335 | + def app_name(self) -> str: |
| 336 | + """Retrieve the app name.""" |
| 337 | + return self._app_name |
| 338 | + |
| 339 | + @property |
| 340 | + def output_dir(self) -> str | None: |
| 341 | + """Retrieve the dry run output dir.""" |
| 342 | + return self._output_dir |
| 343 | + |
| 344 | + def __enter__(self): |
| 345 | + """Enters the runtime context related to the channel object.""" |
| 346 | + return self |
| 347 | + |
| 348 | + def __exit__(self, exc_type, exc_value, exc_traceback): |
| 349 | + """Exits the runtime context related to the channel object.""" |
| 350 | + |
| 351 | + def ingest( |
| 352 | + self, |
| 353 | + entities: Iterable[Entity | ingester_pb2.Entity | None], |
| 354 | + stream: str | None = _DEFAULT_STREAM, |
| 355 | + ) -> ingester_pb2.IngestResponse: |
| 356 | + """Ingest entities in dry run mode.""" |
| 357 | + request = ingester_pb2.IngestRequest( |
| 358 | + stream=stream, |
| 359 | + id=str(uuid.uuid4()), |
| 360 | + producer_app_name=self._app_name, |
| 361 | + entities=entities, |
| 362 | + sdk_name=self.name, |
| 363 | + sdk_version=self.version, |
| 364 | + ) |
| 365 | + |
| 366 | + output = MessageToJson(request, preserving_proto_field_name=True) |
| 367 | + if self._output_dir: |
| 368 | + timestamp = time.perf_counter_ns() |
| 369 | + path = Path(self._output_dir) |
| 370 | + path.mkdir(parents=True, exist_ok=True) |
| 371 | + filename = "".join( |
| 372 | + c if c.isalnum() or c in ("_", "-") else "_" for c in self._app_name |
| 373 | + ) |
| 374 | + file_path = path / f"{filename}_{timestamp}.json" |
| 375 | + with file_path.open("w") as fh: |
| 376 | + fh.write(output) |
| 377 | + else: |
| 378 | + print(output, file=sys.stdout) |
| 379 | + return ingester_pb2.IngestResponse() |
| 380 | + |
| 381 | + |
290 | 382 | class _DiodeAuthentication:
|
291 | 383 | def __init__(
|
292 | 384 | self,
|
|
0 commit comments