diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9e87cfc019b..872dac388e8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -258,6 +258,12 @@ jobs:
           SNUBA_IMAGE=ghcr.io/getsentry/snuba-ci:${{ github.event.pull_request.head.sha || github.sha }} SNUBA_SETTINGS=test_initialization TEST_LOCATION=test_initialization docker compose -f docker-compose.gcb.yml run --rm snuba-test
         if: ${{ matrix.snuba_settings == 'test' }}
+      - name: Docker Snuba E2E Consumer Tests
+        run: |
+          SNUBA_IMAGE=ghcr.io/getsentry/snuba-ci:${{ github.event.pull_request.head.sha || github.sha }} SNUBA_SETTINGS=test_initialization TEST_LOCATION=test_consumer_e2e docker compose -f docker-compose.gcb.yml run --rm snuba-test
+        if: ${{ matrix.snuba_settings == 'test' }}
+
       - name: Upload test results to Codecov
         if: ${{ !cancelled() }}
         uses: codecov/test-results-action@v1
diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py
index 89c81c0aa32..0814e9cddac 100644
--- a/snuba/cli/rust_consumer.py
+++ b/snuba/cli/rust_consumer.py
@@ -200,6 +200,12 @@
     type=click.Choice(["v1", "v2"]),
     help="Specify which consumer version to use, v1 is stable, v2 is experimental",
 )
+@click.option(
+    "--raw-events-topic",
+    default=None,
+    type=str,
+    help="Override the raw events topic",
+)
 def rust_consumer(
     *,
     storage_names: Sequence[str],
diff --git a/test_consumer_e2e/__init__.py b/test_consumer_e2e/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test_consumer_e2e/test_items_consumer.py b/test_consumer_e2e/test_items_consumer.py
new file mode 100644
index 00000000000..0fe78415a4f
--- /dev/null
+++ b/test_consumer_e2e/test_items_consumer.py
@@ -0,0 +1,153 @@
+import subprocess
+import time
+import uuid
+from contextlib import contextmanager
+from datetime import UTC, datetime, timedelta
+from typing import Generator, Optional
+
+from confluent_kafka import Message as KafkaMessage
+from confluent_kafka import Producer
+from confluent_kafka.admin import AdminClient, NewTopic
+from google.protobuf.timestamp_pb2 import Timestamp
+from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
+from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
+
+from snuba.clusters.cluster import ClickhouseClientSettings
+from snuba.datasets.storages.factory import get_storage
+from snuba.datasets.storages.storage_key import StorageKey
+
+# Kafka configuration
+kafka_config = {"bootstrap.servers": "localhost:9092", "client.id": "items_producer"}
+
+producer = Producer(kafka_config)
+
+
+def generate_item_message(start_timestamp: Optional[datetime] = None) -> bytes:
+    if start_timestamp is None:
+        start_timestamp = datetime.now(tz=UTC)
+
+    item_timestamp = Timestamp()
+    item_timestamp.FromDatetime(start_timestamp)
+
+    received = Timestamp()
+    received.GetCurrentTime()
+
+    end_timestamp = start_timestamp + timedelta(seconds=1)
+
+    attributes = {
+        "category": AnyValue(string_value="http"),
+        "description": AnyValue(string_value="/api/0/events/"),
+        "environment": AnyValue(string_value="production"),
+        "http.status_code": AnyValue(string_value="200"),
+        "op": AnyValue(string_value="http.server"),
+        "platform": AnyValue(string_value="python"),
+        "sentry.received": AnyValue(double_value=received.seconds),
+        "sentry.start_timestamp_precise": AnyValue(
+            double_value=start_timestamp.timestamp()
+        ),
+        "sentry.end_timestamp_precise": AnyValue(
+            double_value=end_timestamp.timestamp()
+        ),
+        "start_timestamp_ms": AnyValue(
+            double_value=int(start_timestamp.timestamp() * 1000)
+        ),
+    }
+
+    return TraceItem(
+        organization_id=1,
+        project_id=1,
+        item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
+        timestamp=item_timestamp,
+        trace_id=uuid.uuid4().hex,
+        item_id=uuid.uuid4().int.to_bytes(16, byteorder="little"),
+        received=received,
+        retention_days=90,
+        server_sample_rate=1.0,
+        attributes=attributes,
+    ).SerializeToString()
+
+
+@contextmanager
+def tmp_kafka_topic() -> Generator[str, None, None]:
+    # Create a unique topic name so concurrent runs do not interfere
+    topic_name = f"snuba-items-{uuid.uuid4().hex}"
+
+    # Create the topic
+    admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})
+    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
+    admin_client.create_topics([new_topic])
+
+    try:
+        yield topic_name
+    finally:
+        # Clean up the topic
+        admin_client.delete_topics([topic_name])
+
+
+def test_items_consumer() -> None:
+    # Launch the consumer process
+    with tmp_kafka_topic() as topic:
+        consumer_process = subprocess.Popen(
+            [
+                "snuba",
+                "rust-consumer",
+                "--raw-events-topic",
+                topic,
+                "--consumer-version",
+                "v2",
+                "--consumer-group",
+                f"eap_items_consumer_{uuid.uuid4().hex}",
+                "--storage",
+                "eap_items",
+                "--no-strict-offset-reset",
+            ],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        )
+        storage = get_storage(StorageKey("eap_items"))
+        storage.get_cluster().get_query_connection(
+            ClickhouseClientSettings.QUERY
+        ).execute("TRUNCATE TABLE IF EXISTS eap_items_1_local")
+
+        # Wait for the consumer to initialize
+        time.sleep(2)
+        num_items = 1000
+
+        def delivery_report(err: Optional[Exception], msg: KafkaMessage) -> None:
+            if err is not None:
+                raise err
+
+        try:
+            count = 0
+            while count < num_items:
+                count += 1
+                if count % 100 == 0:
+                    # Serve delivery callbacks periodically so the queue drains
+                    producer.poll(0)
+                message = generate_item_message()
+                producer.produce(topic, message, callback=delivery_report)
+        finally:
+            # Wait for any outstanding messages to be delivered
+            producer.flush()
+
+        try:
+            # The consumer writes asynchronously, so poll ClickHouse until
+            # all rows arrive instead of asserting immediately after the
+            # producer flush.
+            connection = storage.get_cluster().get_query_connection(
+                ClickhouseClientSettings.QUERY
+            )
+            deadline = time.time() + 60
+            row_count = 0
+            while time.time() < deadline:
+                res = connection.execute(
+                    "SELECT count(*) FROM default.eap_items_1_local"
+                )
+                row_count = res.results[0][0]
+                if row_count == num_items:
+                    break
+                time.sleep(1)
+            assert row_count == num_items
+        finally:
+            consumer_process.terminate()
+            consumer_process.wait()
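
Note: the new CI step runs the suite through the same compose file as the other Snuba tests. A minimal sketch for running it locally the same way, assuming the docker-compose.gcb.yml services are available and that SNUBA_IMAGE points at a locally built snuba-ci image (the image tag below is a placeholder, not from this change):

    SNUBA_IMAGE=snuba-ci:local SNUBA_SETTINGS=test_initialization TEST_LOCATION=test_consumer_e2e docker compose -f docker-compose.gcb.yml run --rm snuba-test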
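
When debugging this test, it can help to read a produced message back and decode it by hand. A minimal sketch, assuming Kafka on localhost:9092 and a topic created by tmp_kafka_topic (the topic name and consumer group below are hypothetical placeholders); ParseFromString is the standard protobuf inverse of the SerializeToString call in generate_item_message:

    from confluent_kafka import Consumer
    from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",
            "group.id": "items-debug",  # hypothetical debug group
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["snuba-items-<uuid>"])  # topic created by tmp_kafka_topic
    msg = consumer.poll(10.0)
    if msg is not None and msg.error() is None:
        item = TraceItem()
        item.ParseFromString(msg.value())  # decode the serialized TraceItem
        print(item.trace_id, item.attributes["category"].string_value)
    consumer.close()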