Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions devops/create_development_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import os
from uuid import uuid4

import click
import polars as pl
from faker import Faker
from tqdm import tqdm

#####

# Module-level Faker instance shared by all record generation.
# Faker locale codes follow the "language_REGION" convention ("en_US");
# the lowercase "en_us" form is not a documented locale and raises
# AttributeError on Faker versions without case-insensitive matching.
FAKE_CLIENT = Faker("en_US")


def generate_fake_data(num_records: int) -> pl.DataFrame:
    """
    Generate a Polars DataFrame of fake PII data for testing purposes.

    Each record carries a random hex UUID plus Faker-generated name,
    address, city, state, email, and phone fields.

    Args:
        num_records: The number of fake records to generate. A value of 0
            yields an empty DataFrame.

    Returns:
        A Polars DataFrame with one row per generated record.
    """

    # Build all rows in one comprehension instead of append-in-loop
    # (ruff PERF401); tqdm still reports per-record progress.
    records = [
        {
            "id": uuid4().hex,
            "name": FAKE_CLIENT.name(),
            "address": FAKE_CLIENT.address(),
            "city": FAKE_CLIENT.city(),
            "state": FAKE_CLIENT.state(),
            "email": FAKE_CLIENT.email(),
            "phone": FAKE_CLIENT.phone_number(),
        }
        for _ in tqdm(range(num_records), desc="Generating fake data", unit="record")
    ]

    return pl.DataFrame(records)


@click.command()
@click.option(
    "--output-dir", default=".", help="Directory to save the generated CSV file."
)
@click.option(
    "--output-filename", default="fake_data.csv", help="Name of the generated CSV file."
)
@click.option(
    "--num-records",
    default=10_000_000,
    help="Number of fake records to generate (default: 10,000,000).",
)
def cli(output_dir: str, output_filename: str, num_records: int) -> None:
    """Generate a CSV file with fake data for testing purposes."""
    df = generate_fake_data(num_records)

    # Create the destination directory if needed; otherwise write_csv
    # raises FileNotFoundError when a fresh --output-dir is requested.
    os.makedirs(output_dir, exist_ok=True)

    # Combine the output directory and filename to create the full output path
    output_path = os.path.join(output_dir, output_filename)

    # Save the output DataFrame to a CSV file
    df.write_csv(output_path)

    # Notify the end user
    click.echo(f"Generated {num_records} fake records and saved to {output_path}")


#####

if __name__ == "__main__":
cli()
22 changes: 15 additions & 7 deletions klondike/scripts/stream_csv_to_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,32 @@ def stream_csv_to_database(
**bigquery_kwargs: Additional keyword arguments to pass to the BigQuery upload method (e.g., write_disposition, etc.).
"""

lazy_df = pl.scan_csv(
csv_path,
separator=csv_separator,
infer_schema_length=infer_schema_length,
)

records_written = 0
skip_rows_count = 0
logger.info(
f"Streaming data from {csv_path} to table {destination_table_name} in batches of {batch_size}..."
)
for batch_df in lazy_df.collect().iter_slices(n_rows=batch_size):

while True:
batch_df = pl.read_csv(
csv_path,
separator=csv_separator,
infer_schema_length=infer_schema_length,
skip_rows=skip_rows_count,
n_rows=batch_size,
)

if len(batch_df) == 0:
break

logger.debug(f"Uploading batch to table {destination_table_name}...")
connector.write_dataframe(
df=batch_df,
table_name=destination_table_name,
**bigquery_kwargs,
)
records_written += len(batch_df)
skip_rows_count += len(batch_df)

logger.info(
f"Finished streaming {records_written} rows to {destination_table_name}"
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "klondike"
version = "0.3.0"
version = "0.3.1"
authors = [{ "name" = "Ian Richard Ferguson", "email" = "IAN@ianferguson.dev" }]
description = "Klondike is a suite of database connectors, powered by Polars"
requires-python = "~=3.12.0"
Expand Down Expand Up @@ -40,6 +40,9 @@ dev = [
"mypy~=1.9.0", # Strict static type checking
"pre-commit~=3.4.0",
"jupyter>=1.1.1",
"faker==40.4.0",
"tqdm==4.67.3",
"click==8.3.1",
]

[tool.ruff]
Expand Down
Loading