Merged
4 changes: 3 additions & 1 deletion bizon/connectors/sources/kafka/src/config.py
@@ -54,7 +54,9 @@ class KafkaSourceConfig(SourceConfig):
skip_message_empty_value: bool = Field(
default=True, description="Skip messages with empty value (tombstone messages)"
)

skip_message_invalid_keys: bool = Field(
default=False, description="Skip messages with invalid keys (unparsable JSON keys)"
)
# Kafka consumer configuration
batch_size: int = Field(100, description="Kafka batch size, number of messages to fetch at once.")
consumer_timeout: int = Field(10, description="Kafka consumer timeout in seconds, before returning batch.")
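The new `skip_message_invalid_keys` flag follows the same pydantic `Field` pattern as `skip_message_empty_value` and defaults to `False`, so existing pipelines keep raising on unparsable JSON keys unless they opt in. A minimal sketch of that behavior, using a stand-in model rather than the real `KafkaSourceConfig`:

```python
# Sketch only: a stand-in pydantic model, not bizon's actual KafkaSourceConfig.
from pydantic import BaseModel, Field


class KafkaFlagsSketch(BaseModel):
    skip_message_empty_value: bool = Field(
        default=True, description="Skip messages with empty value (tombstone messages)"
    )
    skip_message_invalid_keys: bool = Field(
        default=False, description="Skip messages with invalid keys (unparsable JSON keys)"
    )


print(KafkaFlagsSketch().skip_message_invalid_keys)      # False: invalid keys still raise by default
print(KafkaFlagsSketch(skip_message_invalid_keys=True))  # opt in to skipping them
```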
57 changes: 42 additions & 15 deletions bizon/connectors/sources/kafka/src/source.py
@@ -262,31 +262,46 @@ def parse_encoded_messages(self, encoded_messages: list) -> List[SourceRecord]:

for message in encoded_messages:

MESSAGE_LOG_METADATA = (
f"Message for topic {message.topic()} partition {message.partition()} and offset {message.offset()}"
)

if message.error():
# If the message is too large, we skip it and update the offset
if message.error().code() == KafkaError.MSG_SIZE_TOO_LARGE:
logger.error(
(
f"Message for topic {message.topic()} partition {message.partition()} and offset {message.offset()} is too large. "
f"Raised MSG_SIZE_TOO_LARGE, if manually setting the offset, the message might not exist. Double-check in Confluent Cloud."
f"{MESSAGE_LOG_METADATA} is too large. "
"Raised MSG_SIZE_TOO_LARGE, if manually setting the offset, the message might not exist. Double-check in Confluent Cloud."
)
)

logger.error(
(
f"Error while consuming message for topic {message.topic()} partition {message.partition()} and offset {message.offset()}: "
f"{message.error()}"
)
)
logger.error(f"{MESSAGE_LOG_METADATA}: {message.error()}")
raise KafkaException(message.error())

# We skip tombstone messages
if self.config.skip_message_empty_value and not message.value():
logger.debug(
f"Message for topic {message.topic()} partition {message.partition()} and offset {message.offset()} is empty, skipping."
)
logger.debug(f"{MESSAGE_LOG_METADATA} is empty, skipping.")
continue

# Parse message keys
if message.key():
try:
message_keys = orjson.loads(message.key().decode("utf-8"))
except orjson.JSONDecodeError as e:
# We skip messages with invalid keys
if self.config.skip_message_invalid_keys:
logger.warning(f"{MESSAGE_LOG_METADATA} has an invalid key={message.key()}, skipping.")
# Skip the message
continue

logger.error(
f"{MESSAGE_LOG_METADATA}: Error while parsing message key: {e}, raw key: {message.key()}"
)
raise e
else:
message_keys = {}

# Decode the message
try:

@@ -297,7 +312,7 @@ def parse_encoded_messages(self, encoded_messages: list) -> List[SourceRecord]:
"offset": message.offset(),
"partition": message.partition(),
"timestamp": message.timestamp()[1],
"keys": orjson.loads(message.key().decode("utf-8")) if message.key() else {},
"keys": message_keys,
"headers": (
{key: value.decode("utf-8") for key, value in message.headers()} if message.headers() else {}
),
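The key-handling branch above decodes `message.key()` with `orjson` and either skips the record or re-raises, depending on `skip_message_invalid_keys`; the parsed `message_keys` then feeds the record's `"keys"` field. A hedged, standalone sketch of that skip-or-raise decision (the `parse_key` helper and its return convention are illustrative, not part of the connector):

```python
# Illustrative helper, not bizon code: mirrors the skip-or-raise decision in the diff.
from typing import Optional

import orjson


def parse_key(raw_key: Optional[bytes], skip_invalid: bool) -> Optional[dict]:
    """Return the parsed key dict, {} when the key is missing, or None to mean 'skip this message'."""
    if not raw_key:
        return {}
    try:
        return orjson.loads(raw_key.decode("utf-8"))
    except orjson.JSONDecodeError:
        if skip_invalid:
            return None  # caller skips the record, like the `continue` in the diff
        raise


print(parse_key(b'{"org_id": 42}', skip_invalid=False))  # {'org_id': 42}
print(parse_key(b"not-json", skip_invalid=True))          # None -> record skipped
```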
@@ -317,16 +332,28 @@ def parse_encoded_messages(self, encoded_messages: list) -> List[SourceRecord]:
except Exception as e:
logger.error(
(
f"Error while decoding message for topic {message.topic()} on partition {message.partition()}: {e} at offset {message.offset()} "
f"{MESSAGE_LOG_METADATA}: Error while decoding message: {e} "
f"with value: {message.value()} and key: {message.key()}"
)
)
# Try to parse error message from the message

# Try to parse error message from the message value
try:
message_raw_text = message.value().decode("utf-8")
logger.error(f"Parsed Kafka value: {message_raw_text}")
except UnicodeDecodeError:
logger.error("Message is not a valid UTF-8 string")
logger.error("Message value is not a valid UTF-8 string")

# Try to parse error message from the message headers
if message.headers():
try:
headers_dict = {key: value.decode("utf-8") for key, value in message.headers()}
logger.error(f"Parsed Kafka headers: {headers_dict}")
except UnicodeDecodeError as header_error:
logger.error(f"Some message headers are not valid UTF-8 strings: {header_error}")
logger.error(f"Raw message headers: {list(message.headers())}")
else:
logger.error("Message headers are None or empty")

logger.error(traceback.format_exc())
raise e
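The widened error path now also tries to decode the message headers. Since the headers come back as a list of `(key, value)` tuples with byte values, a single non-UTF-8 value breaks the whole dict comprehension, which is why the decode is wrapped in its own try/except. A small sketch of that failure mode (the header values here are made up):

```python
# Made-up header values, sketching why the header decode is wrapped in try/except.
headers = [("source", b"billing"), ("trace-id", b"\xff\xfe")]  # second value is not valid UTF-8

try:
    decoded = {key: value.decode("utf-8") for key, value in headers}
    print(f"Parsed Kafka headers: {decoded}")
except UnicodeDecodeError as header_error:
    print(f"Some message headers are not valid UTF-8 strings: {header_error}")
    print(f"Raw message headers: {list(headers)}")
```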
8 changes: 5 additions & 3 deletions bizon/monitoring/datadog/monitor.py
@@ -130,7 +130,12 @@ def trace(self, operation_name: str, resource: str = None, extra_tags: Dict[str,

try:
from ddtrace import tracer
except ImportError:
logger.warning("ddtrace not available, skipping tracing")
yield None
return

try:
# Combine tags
all_tags = self.tags.copy()
if extra_tags:
@@ -145,9 +150,6 @@ def trace(self, operation_name: str, resource: str = None, extra_tags: Dict[str,
span.set_tag(key, value)
span.set_tag("_sampling_priority_v1", 1)
yield span
except ImportError:
logger.warning("ddtrace not available, skipping tracing")
yield None
except Exception as e:
logger.warning(f"Failed to create trace: {e}")
yield None
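Moving the `ddtrace` import check ahead of the span setup means a missing dependency is handled before any span work starts, instead of being caught by the `except ImportError` that previously sat alongside the generic error handler. A hedged sketch of the resulting shape, as a standalone context manager rather than the actual `trace` method:

```python
# Standalone sketch, not bizon's monitor: probe the optional dependency first,
# then open the span only when ddtrace is actually importable.
from contextlib import contextmanager


@contextmanager
def trace_sketch(operation_name: str):
    try:
        from ddtrace import tracer
    except ImportError:
        print("ddtrace not available, skipping tracing")
        yield None
        return

    with tracer.trace(operation_name) as span:
        yield span


# Usage: works with or without ddtrace installed.
with trace_sketch("kafka.consume") as span:
    if span is not None:
        span.set_tag("topic", "orders")
```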