@volokluev
Member

Up until now we had no way to sanity-check that our consumer implementation wrote every row it needed to into the database. Add a test that spins up the consumer along with Kafka and ClickHouse.

@volokluev requested a review from a team as a code owner June 30, 2025 23:32
@volokluev changed the title from "reliability(consumers): add end to end test" to "reliability(consumers): add end to end test (WIP)" Jun 30, 2025
from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
from google.protobuf.timestamp_pb2 import Timestamp

High severity and reachable issue identified in your code:
Line 11 has a vulnerable usage of protobuf, introducing a high severity vulnerability.

ℹ️ Why this is reachable

A reachable issue is a real security risk because your project actually executes the vulnerable code. This issue is reachable because your code uses a certain version of protobuf.
Affected versions of protobuf are vulnerable to Uncontrolled Recursion. The pure-Python implementation of Protocol Buffers is vulnerable to a denial-of-service attack when processing untrusted data with deeply nested or recursive groups/messages, potentially causing the Python recursion limit to be exceeded.

References: GHSA, CVE

To resolve this comment:
Upgrade this dependency to at least version 5.29.5 at requirements.txt.
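
To confirm which protobuf version an environment actually has installed, a check along these lines may help. This is only a minimal sketch: `parse_version`, `meets_minimum`, and `installed_protobuf_ok` are hypothetical helper names, the parsing handles plain dotted numeric versions only, and 5.29.5 is simply the minimum quoted in this finding.

```python
from importlib.metadata import PackageNotFoundError, version

MINIMUM_PROTOBUF = "5.29.5"  # minimum version quoted in the finding above


def parse_version(text: str) -> tuple[int, ...]:
    """Turn a plain dotted version such as '5.29.5' into (5, 29, 5).

    Assumption: no pre-release or local suffixes; non-numeric parts raise ValueError.
    """
    return tuple(int(part) for part in text.split("."))


def meets_minimum(installed: str, minimum: str = MINIMUM_PROTOBUF) -> bool:
    """Compare two dotted versions component by component."""
    return parse_version(installed) >= parse_version(minimum)


def installed_protobuf_ok() -> bool:
    """Return True if the installed protobuf distribution meets the minimum."""
    try:
        return meets_minimum(version("protobuf"))
    except PackageNotFoundError:
        # protobuf is not installed in this environment at all
        return False
```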

💬 Ignore this finding

To ignore this, reply with:

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

You can view more details on this finding in the Semgrep AppSec Platform here.

Contributor

@onkar left a comment

Not sure if it's ready to review since the PR title has (WIP) in it, but it was open for review at the same time. LGTM mostly, left a few comments.

Experimental alternative to `snuba consumer`
"""

breakpoint()
Contributor

Unintended breakpoint?

storage = get_storage(StorageKey("eap_items"))
storage.get_cluster().get_query_connection(
    ClickhouseClientSettings.QUERY
).execute("TRUNCATE TABLE IF EXISTS eap_items_1_local")
Contributor

This truncates the table after the consumer has started, so the consumer may find the table emptied midway through. Would it be safer to truncate before starting the consumer?

Member Author

There's nothing on the consumer topic at the time it starts so no, it's not an issue

Comment on lines +112 to +113
# Wait for consumer to initialize
time.sleep(2)
Contributor

Will this be fragile? Can we poll until ClickHouse is ready to reduce the fragility?

Member Author

I'm not waiting for ClickHouse; I'm waiting for the consumer to start, and I can start producing to the topic before it has fully started.

    consumer_process.terminate()
except Exception:
    pass
assert res.results[0][0] == num_items
Contributor

Will this fail if some records aren't yet flushed? How about if we retry until a timeout instead of asserting immediately?
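
For illustration, the retry-until-timeout idea could look roughly like the sketch below. `wait_for_count` and `get_count` are hypothetical names, not part of this PR; `get_count` stands in for whatever callable runs the ClickHouse count query in the test.

```python
import time
from typing import Callable


def wait_for_count(
    get_count: Callable[[], int],
    expected: int,
    timeout: float = 30.0,
    interval: float = 0.5,
) -> int:
    """Poll get_count() until it returns `expected` or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        count = get_count()
        if count == expected:
            return count
        if time.monotonic() >= deadline:
            # Fail with the last observed value so the test output is useful
            raise AssertionError(
                f"expected {expected} rows, last saw {count} after {timeout}s"
            )
        time.sleep(interval)
```

In the test, a call such as `wait_for_count(count_rows, num_items)` would replace the immediate assert, where `count_rows` is a hypothetical wrapper around the existing query.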

Member Author

Calling terminate on the consumer process should flush it.
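
One caveat worth sketching, assuming `consumer_process` is a `subprocess.Popen`: `terminate()` only sends the signal and returns immediately, so any flush-on-shutdown has completed only once the process has actually exited. A hedged sketch of waiting for that, with `stop_and_wait` as a hypothetical helper name:

```python
import subprocess


def stop_and_wait(proc: subprocess.Popen, timeout: float = 30.0) -> int:
    """Ask the process to exit, then block until it has actually exited."""
    proc.terminate()  # sends SIGTERM on POSIX and returns immediately
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Last resort; a killed process gets no chance to flush anything
        proc.kill()
        return proc.wait()
```

Querying ClickHouse only after `stop_and_wait(consumer_process)` returns would make the assertion independent of how long the shutdown flush takes.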

@untitaker
Member

You can probably write this test more reliably if it is done entirely from within Rust. We already have some tests there. Then no subprocess is necessary; you'd launch the main function directly.

4 participants