22 commits
449d4c7
chore: pipeline should fail if records increase buffer size (#62)
aballiet Jun 25, 2025
81feb12
chore: remove loguru logger from kafka consumer (#63)
aballiet Jun 25, 2025
1f3cccc
chore: make streaming_v2 work with clustering and orjson (#64)
anaselmhamdi Jul 4, 2025
5547544
feat: add custom dsm tracking (#65)
anaselmhamdi Jul 4, 2025
6ce5398
chore: DSM working in staging (#67)
anaselmhamdi Jul 4, 2025
a6fb4a3
chore: loop through dsm call (#68)
anaselmhamdi Jul 7, 2025
1ca934f
chore: remove offsets skipping in case msg too large and log consumer…
aballiet Jul 7, 2025
fec0957
chore: remove return (#70)
anaselmhamdi Jul 7, 2025
87ac659
Fix streaming v2 for large rows (#71)
anaselmhamdi Jul 7, 2025
18f1734
feat: add monitoring for large records synced in destination (#72)
aballiet Jul 9, 2025
f5218ad
chore: add better datadog span to trace stream iteration (#73)
anaselmhamdi Jul 9, 2025
7ce4f71
feat: add destination_alias to sync_metadata (#74)
aballiet Jul 9, 2025
f1d7c44
Fix generator did not stop error when enable_tracing is false (#75)
anaselmhamdi Jul 9, 2025
9eb0e67
chore: fix monitor instantiation in destination and removed redundant…
anaselmhamdi Jul 9, 2025
9b64afc
feat(bigquery): allow set bq_max_rows_per_request (#77)
aballiet Jul 10, 2025
8f6453d
feat(bigquery): allow set concurrent thread (#78)
aballiet Jul 10, 2025
4120b76
chore: increase max row size and max request size (#79)
anaselmhamdi Jul 10, 2025
1203f02
chore: add logging when proto serialization fails (#80)
aballiet Jul 11, 2025
af28219
chore: add more logs when proto serialization fails (#81)
aballiet Jul 24, 2025
7dbaac2
chore: catch empty project_id error (#84)
anaselmhamdi Jul 31, 2025
c0bc834
chore: re-init client in append rows
anaselmhamdi Aug 6, 2025
ae83960
chore: add kafka error handling, bizon api quota config
anaselmhamdi Aug 6, 2025
3 changes: 0 additions & 3 deletions .pre-commit-config.yaml
@@ -1,6 +1,3 @@
-default_language_version:
-  python: python3.8
-
 repos:
   - repo: https://github.com/psf/black
     rev: 24.4.2
2 changes: 2 additions & 0 deletions bizon/common/models.py
@@ -75,6 +75,7 @@ class SyncMetadata(BaseModel):
     stream_name: str
     sync_mode: SourceSyncModes
     destination_name: str
+    destination_alias: str

     @classmethod
     def from_bizon_config(cls, job_id: str, config: BizonConfig) -> "SyncMetadata":
@@ -85,4 +86,5 @@ def from_bizon_config(cls, job_id: str, config: BizonConfig) -> "SyncMetadata":
            stream_name=config.source.stream,
            sync_mode=config.source.sync_mode,
            destination_name=config.destination.name,
+           destination_alias=config.destination.alias,
        )
1 change: 1 addition & 0 deletions bizon/connectors/destinations/bigquery/src/config.py
@@ -123,5 +123,6 @@ class BigQueryConfigDetails(AbstractDestinationDetailsConfig):

 class BigQueryConfig(AbstractDestinationConfig):
     name: Literal[DestinationTypes.BIGQUERY]
+    alias: str = "bigquery"
     buffer_size: Optional[int] = 400
     config: BigQueryConfigDetails
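
For context on the destination_alias change (#74), here is a minimal sketch of what the new fields enable: the alias declared on the destination config rides along in SyncMetadata, so downstream code can tell the connector type (destination_name) apart from the logical destination (destination_alias). The classes below are hypothetical stand-ins, not the real bizon models.

from pydantic import BaseModel


class _DestinationConfigSketch(BaseModel):  # stand-in for config.destination
    name: str
    alias: str = "bigquery"  # new field, defaults to "bigquery" as in this diff


class _SyncMetadataSketch(BaseModel):  # only the two fields relevant here
    destination_name: str
    destination_alias: str


dest = _DestinationConfigSketch(name="bigquery_streaming", alias="analytics_bq")
meta = _SyncMetadataSketch(destination_name=dest.name, destination_alias=dest.alias)
print(meta.destination_name, meta.destination_alias)  # bigquery_streaming analytics_bq
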
4 changes: 3 additions & 1 deletion bizon/connectors/destinations/bigquery/src/destination.py
@@ -14,6 +14,7 @@
 from bizon.common.models import SyncMetadata
 from bizon.destination.destination import AbstractDestination
 from bizon.engine.backend.backend import AbstractBackend
+from bizon.monitoring.monitor import AbstractMonitor
 from bizon.source.config import SourceSyncModes
 from bizon.source.source import AbstractSourceCallback

@@ -28,8 +29,9 @@ def __init__(
         config: BigQueryConfigDetails,
         backend: AbstractBackend,
         source_callback: AbstractSourceCallback,
+        monitor: AbstractMonitor,
     ):
-        super().__init__(sync_metadata, config, backend, source_callback)
+        super().__init__(sync_metadata, config, backend, source_callback, monitor)
         self.config: BigQueryConfigDetails = config

         if config.authentication and config.authentication.service_account_key:
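
The same constructor change lands in every destination: an AbstractMonitor is now injected alongside the backend and source callback and forwarded to the base class, where it is kept for later metric calls. A rough sketch of the pattern with simplified stand-ins (not the real bizon classes):

from typing import Optional


class _MonitorSketch:  # stand-in for AbstractMonitor
    def track_large_records_synced(self, num_records: int, extra_tags: Optional[dict] = None) -> None:
        print(f"large_records_synced={num_records} tags={extra_tags}")


class _AbstractDestinationSketch:  # stand-in for AbstractDestination
    def __init__(self, sync_metadata, config, backend, source_callback, monitor: _MonitorSketch):
        self.monitor = monitor  # kept on the instance for later metric calls


class _BigQueryDestinationSketch(_AbstractDestinationSketch):
    def __init__(self, sync_metadata, config, backend, source_callback, monitor: _MonitorSketch):
        # mirrors the updated super().__init__(..., monitor) call in this diff
        super().__init__(sync_metadata, config, backend, source_callback, monitor)


dest = _BigQueryDestinationSketch(None, None, None, None, _MonitorSketch())
dest.monitor.track_large_records_synced(num_records=3, extra_tags={"destination_id": "demo"})
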
11 changes: 6 additions & 5 deletions bizon/connectors/destinations/bigquery_streaming/src/config.py
@@ -41,16 +41,17 @@ class BigQueryStreamingConfigDetails(AbstractDestinationDetailsConfig):
         description="BigQuery Time partitioning type",
     )
     authentication: Optional[BigQueryAuthentication] = None
-    bq_max_rows_per_request: Optional[int] = Field(30000, description="Max rows per buffer streaming request.")
+    bq_max_rows_per_request: Optional[int] = Field(
+        5000,
+        description="Max rows per buffer streaming request. Must not exceed 10000.",
+        le=10000,
+    )
     record_schemas: Optional[list[BigQueryRecordSchemaConfig]] = Field(
         default=None, description="Schema for the records. Required if unnest is set to true."
     )
-    use_legacy_streaming_api: bool = Field(
-        default=False,
-        description="[DEPRECATED] Use the legacy streaming API. This is required for some older BigQuery versions.",
-    )


 class BigQueryStreamingConfig(AbstractDestinationConfig):
     name: Literal[DestinationTypes.BIGQUERY_STREAMING]
+    alias: str = "bigquery"
     config: BigQueryStreamingConfigDetails
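
The cap matters because, per the comments in this diff, BigQuery rejects streaming requests above 10,000 rows; the bound is now enforced at config-parse time instead of relying on a hard-coded constant. A minimal sketch of how the Field(le=10000) bound behaves, using a stand-in model and the pydantic v2 error type:

from typing import Optional
from pydantic import BaseModel, Field, ValidationError


class _StreamingDetailsSketch(BaseModel):  # stand-in, not BigQueryStreamingConfigDetails
    bq_max_rows_per_request: Optional[int] = Field(5000, le=10000)


print(_StreamingDetailsSketch().bq_max_rows_per_request)  # 5000, the new default (was 30000)

try:
    _StreamingDetailsSketch(bq_max_rows_per_request=30000)  # the old default is now rejected
except ValidationError as exc:
    print(exc.errors()[0]["type"])  # "less_than_equal" on pydantic v2
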
bizon/connectors/destinations/bigquery_streaming/src/destination.py
@@ -36,6 +36,7 @@
 )
 from bizon.destination.destination import AbstractDestination
 from bizon.engine.backend.backend import AbstractBackend
+from bizon.monitoring.monitor import AbstractMonitor
 from bizon.source.callback import AbstractSourceCallback

 from .config import BigQueryStreamingConfigDetails
@@ -44,7 +45,6 @@
 class BigQueryStreamingDestination(AbstractDestination):

     # Add constants for limits
-    MAX_ROWS_PER_REQUEST = 5000  # 5000 (max is 10000)
     MAX_REQUEST_SIZE_BYTES = 5 * 1024 * 1024  # 5 MB (max is 10MB)
     MAX_ROW_SIZE_BYTES = 0.9 * 1024 * 1024  # 1 MB

@@ -54,8 +54,9 @@ def __init__(
         config: BigQueryStreamingConfigDetails,
         backend: AbstractBackend,
         source_callback: AbstractSourceCallback,
+        monitor: AbstractMonitor,
     ):  # type: ignore
-        super().__init__(sync_metadata, config, backend, source_callback)
+        super().__init__(sync_metadata, config, backend, source_callback, monitor)
         self.config: BigQueryStreamingConfigDetails = config

         if config.authentication and config.authentication.service_account_key:
@@ -222,7 +223,7 @@ def _insert_batch(self, table, batch):
        try:
            # Handle streaming batch
            if batch.get("stream_batch") and len(batch["stream_batch"]) > 0:
-               return self.bq_client.insert_rows_json(
+               self.bq_client.insert_rows_json(
                    table,
                    batch["stream_batch"],
                    row_ids=[None] * len(batch["stream_batch"]),
@@ -245,6 +246,10 @@
                if load_job.state != "DONE":
                    raise Exception(f"Failed to load rows to BigQuery: {load_job.errors}")

+               self.monitor.track_large_records_synced(
+                   num_records=len(batch["json_batch"]), extra_tags={"destination_id": self.destination_id}
+               )
+
        except Exception as e:
            logger.error(f"Error inserting batch: {str(e)}, type: {type(e)}")
            raise
@@ -347,7 +352,7 @@ def batch(self, iterable):

            # If adding this item would exceed either limit, yield current batch and start new one
            if (
-               len(current_batch) >= self.MAX_ROWS_PER_REQUEST
+               len(current_batch) >= self.bq_max_rows_per_request
                or current_batch_size + item_size > self.MAX_REQUEST_SIZE_BYTES
            ):
                logger.debug(f"Yielding batch of {len(current_batch)} rows, size: {current_batch_size/1024/1024:.2f}MB")
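
Two behavioural notes on this file: large records routed through the JSON load-job fallback are now reported via monitor.track_large_records_synced, and the hard-coded MAX_ROWS_PER_REQUEST is replaced by the configurable bq_max_rows_per_request while the 5 MB request budget stays. A simplified, self-contained sketch of the resulting batching rule (not the real batch method; the real size accounting may differ):

import json

MAX_REQUEST_SIZE_BYTES = 5 * 1024 * 1024  # mirrors the 5 MB budget kept by this diff


def batch_rows(rows, bq_max_rows_per_request=5000):
    """Yield batches capped by row count and by approximate request size."""
    current, current_size = [], 0
    for row in rows:
        item_size = len(json.dumps(row).encode("utf-8"))  # rough per-row size estimate
        if current and (
            len(current) >= bq_max_rows_per_request
            or current_size + item_size > MAX_REQUEST_SIZE_BYTES
        ):
            yield current
            current, current_size = [], 0
        current.append(row)
        current_size += item_size
    if current:
        yield current


sizes = [len(b) for b in batch_rows(({"id": i} for i in range(12_000)))]
print(sizes)  # [5000, 5000, 2000]
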
bizon/connectors/destinations/bigquery_streaming_v2/src/config.py
@@ -41,12 +41,17 @@ class BigQueryStreamingV2ConfigDetails(AbstractDestinationDetailsConfig):
         description="BigQuery Time partitioning type",
     )
     authentication: Optional[BigQueryAuthentication] = None
-    bq_max_rows_per_request: Optional[int] = Field(30000, description="Max rows per buffer streaming request.")
+    bq_max_rows_per_request: Optional[int] = Field(
+        5000,
+        description="Max rows per buffer streaming request. Must not exceed 10000.",
+        le=10000,
+    )
     record_schemas: Optional[list[BigQueryRecordSchemaConfig]] = Field(
         default=None, description="Schema for the records. Required if unnest is set to true."
     )


 class BigQueryStreamingV2Config(AbstractDestinationConfig):
     name: Literal[DestinationTypes.BIGQUERY_STREAMING_V2]
+    alias: str = "bigquery"
     config: BigQueryStreamingV2ConfigDetails
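
The v2 flavour gets the identical cap and alias. As a quick usage sketch (stand-in classes, with a pipeline-style dict as it might come out of a YAML config), the nested details model is coerced automatically and the alias falls back to "bigquery" when omitted:

from typing import Optional
from pydantic import BaseModel, Field


class _V2DetailsSketch(BaseModel):  # stand-in for BigQueryStreamingV2ConfigDetails
    bq_max_rows_per_request: Optional[int] = Field(5000, le=10000)


class _V2ConfigSketch(BaseModel):  # stand-in; the real model constrains `name` with a Literal
    name: str
    alias: str = "bigquery"
    config: _V2DetailsSketch


cfg = _V2ConfigSketch(name="bigquery_streaming_v2", config={"bq_max_rows_per_request": 8000})
print(cfg.alias, cfg.config.bq_max_rows_per_request)  # bigquery 8000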