From bc9faa1de3ca59b2fa8cc51e286d6f5d492c083f Mon Sep 17 00:00:00 2001 From: sangram11 Date: Tue, 22 Jul 2025 18:37:00 +0530 Subject: [PATCH 01/10] adding timestamp datatype along with string datatypes --- pyspark_datasources/fake.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pyspark_datasources/fake.py b/pyspark_datasources/fake.py index 2b9447c..4af881c 100644 --- a/pyspark_datasources/fake.py +++ b/pyspark_datasources/fake.py @@ -1,4 +1,6 @@ from typing import List +from datetime import datetime, timedelta +import random from pyspark.sql.datasource import ( DataSource, @@ -6,7 +8,7 @@ DataSourceStreamReader, InputPartition, ) -from pyspark.sql.types import StringType, StructType +from pyspark.sql.types import StringType, StructType, TimestampType def _validate_faker_schema(schema): @@ -19,18 +21,26 @@ def _validate_faker_schema(schema): fake = Faker() for field in schema.fields: try: - getattr(fake, field.name)() + if field.dataType == StringType(): + getattr(fake, field.name)() + elif field.dataType == TimestampType(): + continue except AttributeError: raise Exception( f"Unable to find a method called `{field.name}` in faker. " f"Please check Faker's documentation to see supported methods." ) - if field.dataType != StringType(): + if field.dataType not in (StringType(), TimestampType()): raise Exception( - f"Field `{field.name}` is not a StringType. " + f"Field `{field.name}` is not a StringType or TimestampType(). " f"Only StringType is supported in the fake datasource." ) +class GenerateDateTime: + + @classmethod + def randomDate(cls): + return datetime.utcnow() + timedelta(days = random.randint(-365, 0), hours = random.randint(-23, 0), minutes = random.randint(-59, 0), seconds = random.randint(-59, 0), milliseconds = random.randint(-999, 0)) class FakeDataSource(DataSource): """ @@ -140,7 +150,7 @@ def read(self, partition): for _ in range(num_rows): row = [] for field in self.schema.fields: - value = getattr(fake, field.name)() + value = getattr(fake, field.name)() if field.dataType == StringType() else getattr(GenerateDateTime, 'randomDate')() row.append(value) yield tuple(row) From 1908e271b444f39d759895fff5c0d3458caa708f Mon Sep 17 00:00:00 2001 From: sangram11 Date: Tue, 22 Jul 2025 19:31:28 +0530 Subject: [PATCH 02/10] replaced milliseconds with microseconds --- pyspark_datasources/fake.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyspark_datasources/fake.py b/pyspark_datasources/fake.py index 4af881c..04f8fa1 100644 --- a/pyspark_datasources/fake.py +++ b/pyspark_datasources/fake.py @@ -33,14 +33,14 @@ def _validate_faker_schema(schema): if field.dataType not in (StringType(), TimestampType()): raise Exception( f"Field `{field.name}` is not a StringType or TimestampType(). " - f"Only StringType is supported in the fake datasource." + f"Only StringType and TimestampType are supported in the fake datasource." 
) class GenerateDateTime: @classmethod def randomDate(cls): - return datetime.utcnow() + timedelta(days = random.randint(-365, 0), hours = random.randint(-23, 0), minutes = random.randint(-59, 0), seconds = random.randint(-59, 0), milliseconds = random.randint(-999, 0)) + return datetime.utcnow() + timedelta(days = random.randint(-365, 0), hours = random.randint(-23, 0), minutes = random.randint(-59, 0), seconds = random.randint(-59, 0), microseconds = random.randint(-999000, 0)) class FakeDataSource(DataSource): """ From 1f54cc4f939830adc3bc7c07c7c282662e9e8b68 Mon Sep 17 00:00:00 2001 From: sangram11 Date: Tue, 22 Jul 2025 19:36:23 +0530 Subject: [PATCH 03/10] replaced randomDate with random_datetime for better readability --- pyspark_datasources/fake.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyspark_datasources/fake.py b/pyspark_datasources/fake.py index 04f8fa1..4c67b38 100644 --- a/pyspark_datasources/fake.py +++ b/pyspark_datasources/fake.py @@ -39,7 +39,7 @@ def _validate_faker_schema(schema): class GenerateDateTime: @classmethod - def randomDate(cls): + def random_datetime(cls): return datetime.utcnow() + timedelta(days = random.randint(-365, 0), hours = random.randint(-23, 0), minutes = random.randint(-59, 0), seconds = random.randint(-59, 0), microseconds = random.randint(-999000, 0)) class FakeDataSource(DataSource): @@ -150,7 +150,7 @@ def read(self, partition): for _ in range(num_rows): row = [] for field in self.schema.fields: - value = getattr(fake, field.name)() if field.dataType == StringType() else getattr(GenerateDateTime, 'randomDate')() + value = getattr(fake, field.name)() if field.dataType == StringType() else getattr(GenerateDateTime, 'random_datetime')() row.append(value) yield tuple(row) From f0926492d75ca173db6a96ba1f6bdb8c72ff0e1f Mon Sep 17 00:00:00 2001 From: sangram11 Date: Tue, 22 Jul 2025 19:41:12 +0530 Subject: [PATCH 04/10] Specifying return type for random_datetime --- pyspark_datasources/fake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyspark_datasources/fake.py b/pyspark_datasources/fake.py index 4c67b38..27a8837 100644 --- a/pyspark_datasources/fake.py +++ b/pyspark_datasources/fake.py @@ -39,7 +39,7 @@ def _validate_faker_schema(schema): class GenerateDateTime: @classmethod - def random_datetime(cls): + def random_datetime(cls) -> datetime: return datetime.utcnow() + timedelta(days = random.randint(-365, 0), hours = random.randint(-23, 0), minutes = random.randint(-59, 0), seconds = random.randint(-59, 0), microseconds = random.randint(-999000, 0)) class FakeDataSource(DataSource): From 2f13cef4b54176c683ca49f4dc0bbaeb673a6c0b Mon Sep 17 00:00:00 2001 From: sangram11 Date: Wed, 23 Jul 2025 09:38:52 +0530 Subject: [PATCH 05/10] Adding test cases for timestamp datatype. Also changed the default schema in FakeDataSource. 
--- pyspark_datasources/fake.py | 4 ++-- tests/test_data_sources.py | 10 ++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pyspark_datasources/fake.py b/pyspark_datasources/fake.py index 27a8837..63b29c0 100644 --- a/pyspark_datasources/fake.py +++ b/pyspark_datasources/fake.py @@ -124,7 +124,7 @@ def name(cls): return "fake" def schema(self): - return "name string, date string, zipcode string, state string" + return "name string, date string, zipcode string, state string, creationDate timestamp" def reader(self, schema: StructType) -> "FakeDataSourceReader": _validate_faker_schema(schema) @@ -179,6 +179,6 @@ def read(self, partition): for _ in range(partition.value): row = [] for field in self.schema.fields: - value = getattr(fake, field.name)() + value = getattr(fake, field.name)() if field.dataType == StringType() else getattr(GenerateDateTime, 'random_datetime')() row.append(value) yield tuple(row) diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index ad6a2b0..76a33a2 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -2,7 +2,7 @@ from pyspark.sql import SparkSession from pyspark_datasources import * - +from pyspark.sql.types import * @pytest.fixture def spark(): @@ -30,14 +30,20 @@ def test_fake_datasource_stream(spark): ) spark.sql("SELECT * FROM result").show() assert spark.sql("SELECT * FROM result").count() == 3 + df = spark.table("result") + df_datatypes = [d.dataType for d in df.schema.fields] + assert len(df.columns) == 5 + assert TimestampType() in df_datatypes def test_fake_datasource(spark): spark.dataSource.register(FakeDataSource) df = spark.read.format("fake").load() + df_datatypes = [d.dataType for d in df.schema.fields] df.show() assert df.count() == 3 - assert len(df.columns) == 4 + assert len(df.columns) == 5 + assert TimestampType() in df_datatypes def test_kaggle_datasource(spark): From f5dc95445ef62b00b7b69e010a6a47fa650a2290 Mon Sep 17 00:00:00 2001 From: sangram11 Date: Wed, 23 Jul 2025 10:06:54 +0530 Subject: [PATCH 06/10] Only imported TimestampType to test it for a specific use case. Replaced getattr with directly calling the classmethod random_datetime.
Addressed all comments by coderabbitai --- pyspark_datasources/fake.py | 4 ++-- tests/test_data_sources.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyspark_datasources/fake.py b/pyspark_datasources/fake.py index 63b29c0..5f1fc90 100644 --- a/pyspark_datasources/fake.py +++ b/pyspark_datasources/fake.py @@ -150,7 +150,7 @@ def read(self, partition): for _ in range(num_rows): row = [] for field in self.schema.fields: - value = getattr(fake, field.name)() if field.dataType == StringType() else getattr(GenerateDateTime, 'random_datetime')() + value = getattr(fake, field.name)() if field.dataType == StringType() else GenerateDateTime.random_datetime() row.append(value) yield tuple(row) @@ -179,6 +179,6 @@ def read(self, partition): for _ in range(partition.value): row = [] for field in self.schema.fields: - value = getattr(fake, field.name)() if field.dataType == StringType() else getattr(GenerateDateTime, 'random_datetime')() + value = getattr(fake, field.name)() if field.dataType == StringType() else GenerateDateTime.random_datetime() row.append(value) yield tuple(row) diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index 76a33a2..111ace9 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -2,7 +2,7 @@ from pyspark.sql import SparkSession from pyspark_datasources import * -from pyspark.sql.types import * +from pyspark.sql.types import TimestampType @pytest.fixture def spark(): From 04e07637604ff636e544c1a8e41594823fc2b4a0 Mon Sep 17 00:00:00 2001 From: sangram11 Date: Wed, 23 Jul 2025 10:32:33 +0530 Subject: [PATCH 07/10] Adding ci.yml file to perform continuous integration --- .github/workflows/ci.yml | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..2291262 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,52 @@ +name: CI + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ['3.9', '3.10', '3.11', '3.12'] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Poetry + uses: snok/install-poetry@v1.3.4 + with: + version: latest + virtualenvs-create: true + virtualenvs-in-project: true + + - name: Load cached venv + id: cached-poetry-dependencies + uses: actions/cache@v4 + with: + path: .venv + key: venv-${{ runner.os }}-${{ matrix.python-version }}-${{ hashFiles('**/poetry.lock') }} + + - name: Install dependencies + if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' + run: poetry install --no-interaction --no-root --extras "all" + + - name: Install project + run: poetry install --no-interaction --extras "all" + + - name: Run tests + run: poetry run pytest tests/ -v + + - name: Run tests with coverage + run: | + poetry run pytest tests/ --cov=pyspark_datasources --cov-report=xml --cov-report=term-missing \ No newline at end of file From 121cdabb2202203416e1350469012a8564adb2ed Mon Sep 17 00:00:00 2001 From: sangram11 Date: Wed, 23 Jul 2025 10:45:45 +0530 Subject: [PATCH 08/10] alligning my ci.yml file with master branch --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2291262..3e9caa3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,4 +49,5 @@ jobs: - name: Run tests with coverage run: | - poetry run pytest tests/ --cov=pyspark_datasources --cov-report=xml --cov-report=term-missing \ No newline at end of file + poetry run pytest tests/ --cov=pyspark_datasources --cov-report=xml --cov-report=term-missing + \ No newline at end of file From 3b55d076e541e9d1beef021354e9a6626e2a31d6 Mon Sep 17 00:00:00 2001 From: sangram11 Date: Wed, 23 Jul 2025 10:58:49 +0530 Subject: [PATCH 09/10] removing the ci.yml file as it is causing conflict with master branch --- .github/workflows/ci.yml | 53 ---------------------------------------- 1 file changed, 53 deletions(-) delete mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml deleted file mode 100644 index 3e9caa3..0000000 --- a/.github/workflows/ci.yml +++ /dev/null @@ -1,53 +0,0 @@ -name: CI - -on: - push: - branches: [ main, master ] - pull_request: - branches: [ main, master ] - -jobs: - test: - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ['3.9', '3.10', '3.11', '3.12'] - - steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Install Poetry - uses: snok/install-poetry@v1.3.4 - with: - version: latest - virtualenvs-create: true - virtualenvs-in-project: true - - - name: Load cached venv - id: cached-poetry-dependencies - uses: actions/cache@v4 - with: - path: .venv - key: venv-${{ runner.os }}-${{ matrix.python-version }}-${{ hashFiles('**/poetry.lock') }} - - - name: Install dependencies - if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction --no-root --extras "all" - - - name: Install project - run: poetry install --no-interaction --extras "all" - - - name: Run tests - run: poetry run pytest tests/ -v - - - name: Run tests with coverage - run: | - poetry run pytest tests/ --cov=pyspark_datasources --cov-report=xml --cov-report=term-missing - \ No newline at end of file From b86ca070cb8fd3f17371a8ebcf1df551db5a518c Mon Sep 17 00:00:00 2001 From: sangram11 Date: Sat, 2 Aug 2025 09:24:34 +0530 Subject: [PATCH 10/10] Restored old default schema. 
Tested exclusively for timestamp column by adding test_fake_timestamp_column --- pyspark_datasources/fake.py | 2 +- tests/test_data_sources.py | 21 +++++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pyspark_datasources/fake.py b/pyspark_datasources/fake.py index 5f1fc90..d58b71e 100644 --- a/pyspark_datasources/fake.py +++ b/pyspark_datasources/fake.py @@ -124,7 +124,7 @@ def name(cls): return "fake" def schema(self): - return "name string, date string, zipcode string, state string, creationDate timestamp" + return "name string, date string, zipcode string, state string" def reader(self, schema: StructType) -> "FakeDataSourceReader": _validate_faker_schema(schema) diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index 111ace9..fdc4ab6 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -2,7 +2,7 @@ from pyspark.sql import SparkSession from pyspark_datasources import * -from pyspark.sql.types import TimestampType +from pyspark.sql.types import TimestampType, StringType, StructType, StructField @pytest.fixture def spark(): @@ -31,19 +31,28 @@ def test_fake_datasource_stream(spark): spark.sql("SELECT * FROM result").show() assert spark.sql("SELECT * FROM result").count() == 3 df = spark.table("result") - df_datatypes = [d.dataType for d in df.schema.fields] - assert len(df.columns) == 5 - assert TimestampType() in df_datatypes + assert len(df.columns) == 4 def test_fake_datasource(spark): spark.dataSource.register(FakeDataSource) df = spark.read.format("fake").load() + df.show() + assert df.count() == 3 + assert len(df.columns) == 4 + + +def test_fake_timestamp_column(spark): + spark.dataSource.register(FakeDataSource) + schema = StructType([StructField("name", StringType(), True), StructField("zipcode", StringType(), True), StructField("state", StringType(), True), StructField("date", TimestampType(), True)]) + df = spark.read.format("fake").schema(schema).load() + df_columns = [d.name for d in df.schema.fields] df_datatypes = [d.dataType for d in df.schema.fields] df.show() assert df.count() == 3 - assert len(df.columns) == 5 - assert TimestampType() in df_datatypes + assert len(df.columns) == 4 + assert df_columns == ["name", "zipcode", "state", "date"] + assert df_datatypes[-1] == TimestampType() def test_kaggle_datasource(spark):
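Example usage of the timestamp support added in this series, modeled on the new test_fake_timestamp_column test: a minimal sketch, assuming a local SparkSession, a PySpark build with the Python data source API, and this package installed. String column names must match Faker methods; any TimestampType column is filled with a random recent datetime instead of a Faker value.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

from pyspark_datasources import FakeDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(FakeDataSource)

# String columns are filled by the Faker methods matching their names;
# the TimestampType column is generated by GenerateDateTime.random_datetime().
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("zipcode", StringType(), True),
        StructField("state", StringType(), True),
        StructField("date", TimestampType(), True),
    ]
)

df = spark.read.format("fake").schema(schema).load()
df.show(truncate=False)  # 3 rows by default; "date" holds a random datetime from the past year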