Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions ddtestpy/internal/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ddtestpy.internal.constants import EMPTY_NAME
from ddtestpy.internal.git import GitTag
from ddtestpy.internal.http import BackendConnector
from ddtestpy.internal.http import BackendConnectorSetup
from ddtestpy.internal.http import FileAttachment
from ddtestpy.internal.test_data import ITRSkippingLevel
from ddtestpy.internal.test_data import ModuleRef
Expand All @@ -24,25 +24,19 @@
class APIClient:
def __init__(
self,
site: str,
api_key: str,
service: str,
env: str,
env_tags: t.Dict[str, str],
itr_skipping_level: ITRSkippingLevel,
configurations: t.Dict[str, str],
connector_setup: BackendConnectorSetup,
) -> None:
self.site = site
self.api_key = api_key
self.service = service
self.env = env
self.env_tags = env_tags
self.itr_skipping_level = itr_skipping_level
self.configurations = configurations

self.base_url = f"https://api.{self.site}"

self.connector = BackendConnector(host=f"api.{self.site}", default_headers={"dd-api-key": self.api_key})
self.connector = connector_setup.get_connector_for_subdomain("api")

def close(self) -> None:
self.connector.close()
Expand Down
3 changes: 3 additions & 0 deletions ddtestpy/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
DEFAULT_ENV_NAME = "none"
DEFAULT_SITE = "datadoghq.com"

DEFAULT_AGENT_HOSTNAME = "localhost"
DEFAULT_AGENT_PORT = 8126

TAG_TRUE = "true"
TAG_FALSE = "false"

Expand Down
2 changes: 2 additions & 0 deletions ddtestpy/internal/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class SetupError(Exception):
pass
161 changes: 153 additions & 8 deletions ddtestpy/internal/http.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,197 @@
from __future__ import annotations

from abc import abstractmethod
from dataclasses import dataclass
import gzip
import http.client
import io
import json
import logging
import os
import threading
import time
import typing as t
from urllib.parse import ParseResult
from urllib.parse import urlparse
import uuid

from ddtestpy.internal.constants import DEFAULT_AGENT_HOSTNAME
from ddtestpy.internal.constants import DEFAULT_AGENT_PORT
from ddtestpy.internal.constants import DEFAULT_SITE
from ddtestpy.internal.errors import SetupError
from ddtestpy.internal.utils import asbool


DEFAULT_TIMEOUT_SECONDS = 15.0

log = logging.getLogger(__name__)


class BackendConnectorSetup:
"""
Logic for detecting the backend connection mode (agentless or EVP) and creating new connectors.
"""

@abstractmethod
def get_connector_for_subdomain(self, subdomain: str) -> BackendConnector:
"""
Return a backend connector for the given subdomain (e.g., api, citestcov-intake, citestcycle-intake).

This method must be implemented for each backend connection mode subclass.
"""
pass

@classmethod
def detect_setup(cls) -> BackendConnectorSetup:
"""
Detect which backend connection mode to use and return a configured instance of the corresponding subclass.
"""
if asbool(os.environ.get("DD_CIVISIBILITY_AGENTLESS_ENABLED")):
log.info("Connecting to backend in agentless mode")
return cls._detect_agentless_setup()

else:
log.info("Connecting to backend through agent in EVP proxy mode")
return cls._detect_evp_proxy_setup()

@classmethod
def _detect_agentless_setup(cls) -> BackendConnectorSetup:
"""
Detect settings for agentless backend connection mode.
"""
site = os.environ.get("DD_SITE") or DEFAULT_SITE
api_key = os.environ.get("DD_API_KEY")

if not api_key:
raise SetupError("DD_API_KEY environment variable is not set")

return BackendConnectorAgentlessSetup(site=site, api_key=api_key)

@classmethod
def _detect_evp_proxy_setup(cls) -> BackendConnectorSetup:
"""
Detect settings for EVP proxy mode backend connection mode.
"""
agent_url = os.environ.get("DD_TRACE_AGENT_URL")
if not agent_url:
agent_host = (
os.environ.get("DD_TRACE_AGENT_HOSTNAME") or os.environ.get("DD_AGENT_HOST") or DEFAULT_AGENT_HOSTNAME
)
agent_port = (
os.environ.get("DD_TRACE_AGENT_PORT") or os.environ.get("DD_AGENT_PORT") or str(DEFAULT_AGENT_PORT)
)
agent_url = f"http://{agent_host}:{agent_port}"

agent_url = agent_url.rstrip("/") # Avoid an extra / when concatenating with the base path

# Get info from agent to check if the agent is there, and which EVP proxy version it supports.
try:
connector = BackendConnector(agent_url)
response, response_data = connector.get_json("/info")
endpoints = response_data.get("endpoints", [])
connector.close()
except Exception as e:
raise SetupError(f"Error connecting to Datadog agent at {agent_url}: {e}")

if response.status != 200:
raise SetupError(
f"Error connecting to Datadog agent at {agent_url}: status {response.status}, "
f"response {response_data!r}"
)

if "/evp_proxy/v4/" in endpoints:
return BackendConnectorEVPProxySetup(url=f"{agent_url}/evp_proxy/v4", use_gzip=True)

if "/evp_proxy/v2/" in endpoints:
return BackendConnectorEVPProxySetup(url=f"{agent_url}/evp_proxy/v2", use_gzip=False)

raise SetupError(f"Datadog agent at {agent_url} does not support EVP proxy mode")


class BackendConnectorAgentlessSetup(BackendConnectorSetup):
def __init__(self, site: str, api_key: str) -> None:
self.site = site
self.api_key = api_key

def get_connector_for_subdomain(self, subdomain: str) -> BackendConnector:
return BackendConnector(
url=f"https://{subdomain}.{self.site}",
default_headers={"dd-api-key": self.api_key},
use_gzip=True,
)


class BackendConnectorEVPProxySetup(BackendConnectorSetup):
def __init__(self, url: str, use_gzip: bool) -> None:
self.url = url
self.use_gzip = use_gzip

def get_connector_for_subdomain(self, subdomain: str) -> BackendConnector:
return BackendConnector(
url=self.url,
default_headers={"X-Datadog-EVP-Subdomain": subdomain},
use_gzip=self.use_gzip,
)


class BackendConnector(threading.local):
def __init__(
self,
host: str,
port: int = 443,
url: str,
default_headers: t.Optional[t.Dict[str, str]] = None,
timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
accept_gzip: bool = True,
use_gzip: bool = False,
):
self.conn = http.client.HTTPSConnection(host=host, port=port, timeout=timeout_seconds)
parsed_url = urlparse(url)
self.conn = self._make_connection(parsed_url, timeout_seconds)
self.default_headers = default_headers or {}
if accept_gzip:
self.base_path = parsed_url.path.rstrip("/")
self.use_gzip = use_gzip
if self.use_gzip:
self.default_headers["Accept-Encoding"] = "gzip"

def close(self) -> None:
self.conn.close()

def _make_connection(self, parsed_url: ParseResult, timeout_seconds: float) -> http.client.HTTPConnection:
if parsed_url.scheme == "http":
if not parsed_url.hostname:
raise SetupError(f"No hostname provided in {parsed_url.geturl()}")

return http.client.HTTPConnection(
host=parsed_url.hostname, port=parsed_url.port or 80, timeout=timeout_seconds
)

if parsed_url.scheme == "https":
if not parsed_url.hostname:
raise SetupError(f"No hostname provided in {parsed_url.geturl()}")

return http.client.HTTPSConnection(
host=parsed_url.hostname, port=parsed_url.port or 443, timeout=timeout_seconds
)

# TODO: Unix domain socket support.

raise SetupError(f"Unknown scheme {parsed_url.scheme} in {parsed_url.geturl()}")

# TODO: handle retries
def request(
self,
method: str,
path: str,
data: bytes,
data: t.Optional[bytes] = None,
headers: t.Optional[t.Dict[str, str]] = None,
send_gzip: bool = False,
) -> t.Tuple[http.client.HTTPResponse, bytes]:
full_headers = self.default_headers | (headers or {})

if send_gzip:
if send_gzip and self.use_gzip and data is not None:
data = gzip.compress(data, compresslevel=6)
full_headers["Content-Encoding"] = "gzip"

start_time = time.time()

self.conn.request(method, path, body=data, headers=full_headers)
self.conn.request(method, self.base_path + path, body=data, headers=full_headers)

response = self.conn.getresponse()
if response.headers.get("Content-Encoding") == "gzip":
Expand All @@ -67,6 +207,11 @@ def request(

return response, response_data

def get_json(self, path: str, headers: t.Optional[t.Dict[str, str]] = None, send_gzip: bool = False) -> t.Any:
headers = {"Content-Type": "application/json"} | (headers or {})
response, response_data = self.request("GET", path=path, headers=headers, send_gzip=send_gzip)
return response, json.loads(response_data)

def post_json(
self, path: str, data: t.Any, headers: t.Optional[t.Dict[str, str]] = None, send_gzip: bool = False
) -> t.Any:
Expand Down
9 changes: 8 additions & 1 deletion ddtestpy/internal/pytest/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ddtestpy.internal.coverage_api import install_coverage
from ddtestpy.internal.ddtrace import install_global_trace_filter
from ddtestpy.internal.ddtrace import trace_context
from ddtestpy.internal.errors import SetupError
from ddtestpy.internal.git import get_workspace_path
from ddtestpy.internal.logging import catch_and_log_exceptions
from ddtestpy.internal.logging import setup_logging
Expand Down Expand Up @@ -628,7 +629,13 @@ def pytest_load_initial_conftests(
test_framework="pytest",
test_framework_version=pytest.__version__,
)
session_manager = SessionManager(session=session)

try:
session_manager = SessionManager(session=session)
except SetupError as e:
log.error("%s", e)
yield
return

early_config.stash[SESSION_MANAGER_STASH_KEY] = session_manager

Expand Down
14 changes: 5 additions & 9 deletions ddtestpy/internal/session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from ddtestpy.internal.codeowners import Codeowners
from ddtestpy.internal.constants import DEFAULT_ENV_NAME
from ddtestpy.internal.constants import DEFAULT_SERVICE_NAME
from ddtestpy.internal.constants import DEFAULT_SITE
from ddtestpy.internal.env_tags import get_env_tags
from ddtestpy.internal.git import Git
from ddtestpy.internal.git import GitTag
from ddtestpy.internal.http import BackendConnectorSetup
from ddtestpy.internal.platform import get_platform_tags
from ddtestpy.internal.retry_handlers import AttemptToFixHandler
from ddtestpy.internal.retry_handlers import AutoTestRetriesHandler
Expand Down Expand Up @@ -61,20 +61,16 @@ def __init__(self, session: TestSession) -> None:
self.service = _get_service_name_from_git_repo(self.env_tags) or DEFAULT_SERVICE_NAME

self.env = os.environ.get("DD_ENV") or DEFAULT_ENV_NAME
self.site = os.environ.get("DD_SITE") or DEFAULT_SITE
self.api_key = os.environ.get("DD_API_KEY")

if not self.api_key:
raise RuntimeError("DD_API_KEY environment variable is not set")
self.connector_setup = BackendConnectorSetup.detect_setup()

self.api_client = APIClient(
site=self.site,
api_key=self.api_key,
service=self.service,
env=self.env,
env_tags=self.env_tags,
itr_skipping_level=self.itr_skipping_level,
configurations=self.platform_tags,
connector_setup=self.connector_setup,
)
self.settings = self.api_client.get_settings()
self.known_tests = self.api_client.get_known_tests() if self.settings.known_tests_enabled else set()
Expand All @@ -93,8 +89,8 @@ def __init__(self, session: TestSession) -> None:
# Retry handlers must be set up after collection phase for EFD faulty session logic to work.
self.retry_handlers: t.List[RetryHandler] = []

self.writer = TestOptWriter(site=self.site, api_key=self.api_key)
self.coverage_writer = TestCoverageWriter(site=self.site, api_key=self.api_key)
self.writer = TestOptWriter(connector_setup=self.connector_setup)
self.coverage_writer = TestCoverageWriter(connector_setup=self.connector_setup)
self.session = session
self.session.set_service(self.service)

Expand Down
25 changes: 8 additions & 17 deletions ddtestpy/internal/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import msgpack # type: ignore

import ddtestpy
from ddtestpy.internal.http import BackendConnector
from ddtestpy.internal.http import BackendConnectorSetup
from ddtestpy.internal.http import FileAttachment
from ddtestpy.internal.test_data import TestItem
from ddtestpy.internal.test_data import TestModule
Expand All @@ -31,10 +31,7 @@ class Event(dict[str, t.Any]):


class BaseWriter(ABC):
def __init__(self, site: str, api_key: str) -> None:
self.site = site
self.api_key = api_key

def __init__(self) -> None:
self.lock = threading.RLock()
self.should_finish = threading.Event()
self.flush_interval_seconds = 60
Expand Down Expand Up @@ -86,8 +83,8 @@ def _send_events(self, events: t.List[Event]) -> None:
class TestOptWriter(BaseWriter):
__test__ = False

def __init__(self, site: str, api_key: str) -> None:
super().__init__(site=site, api_key=api_key)
def __init__(self, connector_setup: BackendConnectorSetup) -> None:
super().__init__()

self.metadata: t.Dict[str, t.Dict[str, str]] = {
"*": {
Expand All @@ -108,10 +105,7 @@ def __init__(self, site: str, api_key: str) -> None:
},
}

self.connector = BackendConnector(
host=f"citestcycle-intake.{self.site}",
default_headers={"dd-api-key": self.api_key},
)
self.connector = connector_setup.get_connector_for_subdomain("citestcycle-intake")

self.serializers: t.Dict[t.Type[TestItem[t.Any, t.Any]], EventSerializer[t.Any]] = {
TestRun: serialize_test_run,
Expand Down Expand Up @@ -142,13 +136,10 @@ def _send_events(self, events: t.List[Event]) -> None:
class TestCoverageWriter(BaseWriter):
__test__ = False

def __init__(self, site: str, api_key: str) -> None:
super().__init__(site=site, api_key=api_key)
def __init__(self, connector_setup: BackendConnectorSetup) -> None:
super().__init__()

self.connector = BackendConnector(
host=f"citestcov-intake.{self.site}",
default_headers={"dd-api-key": self.api_key},
)
self.connector = connector_setup.get_connector_for_subdomain("citestcov-intake")

def put_coverage(self, test_run: TestRun, coverage_bitmaps: t.Iterable[t.Tuple[str, bytes]]) -> None:
files = [{"filename": pathname, "bitmap": bitmap} for pathname, bitmap in coverage_bitmaps]
Expand Down
Loading