6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
@@ -258,6 +258,12 @@ jobs:
SNUBA_IMAGE=ghcr.io/getsentry/snuba-ci:${{ github.event.pull_request.head.sha || github.sha }} SNUBA_SETTINGS=test_initialization TEST_LOCATION=test_initialization docker compose -f docker-compose.gcb.yml run --rm snuba-test
if: ${{ matrix.snuba_settings == 'test' }}

- name: Docker Snuba E2E Consumer Tests
run: |
SNUBA_IMAGE=ghcr.io/getsentry/snuba-ci:${{ github.event.pull_request.head.sha || github.sha }} SNUBA_SETTINGS=test_initialization TEST_LOCATION=test_consumer_e2e docker compose -f docker-compose.gcb.yml run --rm snuba-test
if: ${{ matrix.snuba_settings == 'test' }}


- name: Upload test results to Codecov
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
11 changes: 7 additions & 4 deletions snuba/cli/rust_consumer.py
@@ -200,6 +200,12 @@
type=click.Choice(["v1", "v2"]),
help="Specify which consumer version to use, v1 is stable, v2 is experimental",
)
@click.option(
"--raw-events-topic",
default=None,
type=str,
help="Override the raw events topic",
)
def rust_consumer(
*,
storage_names: Sequence[str],
@@ -236,10 +242,7 @@ def rust_consumer(
join_timeout_ms: Optional[int],
consumer_version: Optional[str],
) -> None:
"""
Experimental alternative to `snuba consumer`
"""

breakpoint()
[Contributor] Unintended break point?

consumer_config = resolve_consumer_config(
storage_names=storage_names,
raw_topic=raw_events_topic,
Empty file added test_consumer_e2e/__init__.py
Empty file.
141 changes: 141 additions & 0 deletions test_consumer_e2e/test_items_consumer.py
@@ -0,0 +1,141 @@
import subprocess
import time
import uuid
from contextlib import contextmanager
from datetime import UTC, datetime, timedelta
from typing import Generator, Optional

from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
from google.protobuf.timestamp_pb2 import Timestamp

[Semgrep] High severity, reachable issue: line 11 has a vulnerable usage of protobuf. Affected versions are vulnerable to Uncontrolled Recursion: the pure-Python implementation of Protocol Buffers can exceed the Python recursion limit when processing untrusted data with deeply nested or recursive groups/messages, causing a denial of service. To resolve, upgrade the dependency to at least version 5.29.5 in requirements.txt. To ignore this finding, reply with /fp (false positive), /ar (acceptable risk), or /other (all other reasons).
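The suggested remediation is a version floor in requirements.txt (a sketch; the exact constraint style should match how the file pins its other dependencies):

```
protobuf>=5.29.5
```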

from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem

from snuba.clusters.cluster import ClickhouseClientSettings
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey

# Kafka configuration
kafka_config = {"bootstrap.servers": "localhost:9092", "client.id": "items_producer"}

producer = Producer(kafka_config)


def generate_item_message(start_timestamp: Optional[datetime] = None) -> bytes:
if start_timestamp is None:
start_timestamp = datetime.now(tz=UTC)

item_timestamp = Timestamp()
item_timestamp.FromDatetime(start_timestamp)

received = Timestamp()
received.GetCurrentTime()

end_timestamp = start_timestamp + timedelta(seconds=1)

attributes = {
"category": AnyValue(string_value="http"),
"description": AnyValue(string_value="/api/0/events/"),
"environment": AnyValue(string_value="production"),
"http.status_code": AnyValue(string_value="200"),
"op": AnyValue(string_value="http.server"),
"platform": AnyValue(string_value="python"),
"sentry.received": AnyValue(double_value=received.seconds),
"sentry.start_timestamp_precise": AnyValue(
double_value=start_timestamp.timestamp()
),
"sentry.end_timestamp_precise": AnyValue(
double_value=end_timestamp.timestamp()
),
"start_timestamp_ms": AnyValue(
double_value=int(start_timestamp.timestamp() * 1000)
),
}

return TraceItem(
organization_id=1,
project_id=1,
item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
timestamp=item_timestamp,
trace_id=uuid.uuid4().hex,
item_id=uuid.uuid4().int.to_bytes(16, byteorder="little"),
received=received,
retention_days=90,
server_sample_rate=1.0,
attributes=attributes,
).SerializeToString()


@contextmanager
def tmp_kafka_topic() -> Generator[str, None, None]:
# Create a unique topic name
topic_name = f"snuba-items-{uuid.uuid4().hex}"

# Create the topic
admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})
new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
admin_client.create_topics([new_topic])

try:
yield topic_name
finally:
# Clean up the topic
admin_client.delete_topics([topic_name])


def test_items_consumer() -> None:
# Launch the consumer process
with tmp_kafka_topic() as topic:
consumer_process = subprocess.Popen(
[
"snuba",
"rust-consumer",
"--raw-events-topic",
topic,
"--consumer-version",
"v2",
"--consumer-group",
f"eap_items_consumer_{uuid.uuid4().hex}",
"--storage",
"eap_items",
"--no-strict-offset-reset",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
storage = get_storage(StorageKey("eap_items"))
storage.get_cluster().get_query_connection(
ClickhouseClientSettings.QUERY
).execute("TRUNCATE TABLE IF EXISTS eap_items_1_local")
[Contributor] This will truncate after starting the consumer, and the consumer may see an empty table mid-way. Is it safer to truncate before starting the consumer?

[Member, author] There's nothing on the consumer topic at the time it starts, so no, it's not an issue.


# Wait for consumer to initialize
time.sleep(2)
[Contributor, on lines +112 to +113] Will this be fragile? Can we poll until ClickHouse is ready to reduce the fragility?

[Member, author] I'm not waiting for ClickHouse; I'm waiting for the consumer to start, and I can start producing to the topic before it fully starts.
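The fixed `time.sleep(2)` could be made less timing-dependent with a small poll helper. A hypothetical sketch, not part of the PR; `wait_for` and its parameters are invented for illustration:

```python
import time
from typing import Callable


def wait_for(
    predicate: Callable[[], bool], timeout: float = 30.0, interval: float = 0.5
) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False
```

The test could then replace the fixed sleep with a readiness condition, for example that the subprocess is still alive after startup (`wait_for(lambda: consumer_process.poll() is None, timeout=10)`) or that the consumer group appears in the broker's group list.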

num_items = 1000

def delivery_report(err: Optional[Exception], msg: KafkaMessage) -> None:
if err is not None:
raise err

try:
count = 0
while count < num_items:
count += 1
if count % 100 == 0:
producer.poll(0)
message = generate_item_message()
producer.produce(topic, message, callback=delivery_report)
finally:
# Wait for any outstanding messages to be delivered
producer.flush()

res = (
storage.get_cluster()
.get_query_connection(ClickhouseClientSettings.QUERY)
.execute("SELECT count(*) FROM default.eap_items_1_local")
)
try:
consumer_process.terminate()
except Exception:
pass
assert res.results[0][0] == num_items
[Contributor] Will this fail if some records aren't yet flushed? How about retrying until a timeout instead of asserting immediately?

[Member, author] Calling terminate on the consumer process should flush it.
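The retry-until-timeout suggestion could be sketched as follows (hypothetical; `eventually` and its parameters are not part of the PR):

```python
import time
from typing import Any, Callable


def eventually(
    fetch: Callable[[], Any],
    expected: Any,
    timeout: float = 30.0,
    interval: float = 0.5,
) -> None:
    """Re-evaluate `fetch()` until it equals `expected`, or fail after `timeout`."""
    deadline = time.monotonic() + timeout
    last: Any = None
    while time.monotonic() < deadline:
        last = fetch()
        if last == expected:
            return
        time.sleep(interval)
    raise AssertionError(f"expected {expected!r}, last observed {last!r}")
```

In the test this would wrap the ClickHouse count query, e.g. `eventually(lambda: conn.execute("SELECT count(*) FROM default.eap_items_1_local").results[0][0], num_items, timeout=60)`, so the assertion tolerates in-flight batches instead of failing on the first read.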
