Closed

33 commits
5e6a126
STS - First draft
DVNorbertAcatrinei Dec 30, 2021
f59c671
Minor
DVNorbertAcatrinei Dec 30, 2021
74e8302
Added sts to dbtvault_generator & environment files
DVNorbertAcatrinei Dec 30, 2021
fc879eb
WIP
DVNorbertAcatrinei Jan 10, 2022
496a852
Minor update on sts macro
DVNorbertAcatrinei Jan 10, 2022
b0a6886
Added extra sts tests
DVNorbertAcatrinei Jan 10, 2022
5fa3c50
Minor change in STS macro
DVNorbertAcatrinei Jan 12, 2022
845c6dc
WIP - Added a new version of the STS macro
DVNorbertAcatrinei Jan 12, 2022
eaf5b67
WIP - Minor changes in the 2nd version of the STS macro
DVNorbertAcatrinei Jan 12, 2022
b60c1e6
WIP - Added more tests + changes to sts macro
DVNorbertAcatrinei Jan 14, 2022
7111eb7
Added cycle test for sts + updated base tests
DVNorbertAcatrinei Jan 14, 2022
6982a51
Minor changes to the sts macro
DVNorbertAcatrinei Jan 14, 2022
ba44257
WIP - Added incremental, period mat, rank mat tests for STS
DVNorbertAcatrinei Jan 20, 2022
7040abd
WIP - Removed rank mat and period mat tests for STS
DVNorbertAcatrinei Jan 25, 2022
07e22b4
Updated STS macro - DELETE timestamp currently hardcoded
DVNorbertAcatrinei Jan 25, 2022
27f4aa0
Updated STS base and incremental load tests
DVNorbertAcatrinei Jan 25, 2022
37fcf1e
Merge branch 'develop' into feat/sts
DVAlexHiggs May 10, 2022
60c92d8
SF-STS feature passing all tests
DVCatAreias May 12, 2022
e304948
STS 2 feature is passing all tests
DVCatAreias May 13, 2022
f9e53c0
STS 2 refactor for general case
DVCatAreias May 17, 2022
bbc30fc
STS 2 delete function corrected
DVCatAreias May 17, 2022
52b9d94
STS and STS-IM complete
DVCatAreias May 18, 2022
ad9a2e2
Code readability edit
DVCatAreias May 19, 2022
3fefa25
Merge branch 'develop' into feat/sts
DVTimWilson Jul 12, 2022
568e451
Code review
DVTimWilson Jul 12, 2022
c850073
Period materialisation testing
DVTimWilson Jul 15, 2022
684b658
Delete repeated documented parameters
DVTimWilson Jul 19, 2022
b6a7b68
WIP Add hashdiff parameter and column
DVTimWilson Jul 19, 2022
574b5d8
STS macro with combined staging and load selections
DVTimWilson Jul 20, 2022
c4dc169
Add STS to pipeline
DVTimWilson Jul 20, 2022
0a2a063
Resolve issues noted while documenting
DVTimWilson Jul 20, 2022
4f86c4f
Doh! missed something.
DVTimWilson Jul 20, 2022
ae4e228
Merge branch 'develop' into feat/sts
DVAlexHiggs Aug 22, 2022
134 changes: 134 additions & 0 deletions dbtvault-dev/macros/tables/snowflake/sts.sql
@@ -0,0 +1,134 @@
{%- macro sts(src_pk, src_ldts, src_source, src_status, src_hashdiff, source_model ) -%}

{{- adapter.dispatch('sts', 'dbtvault')(src_pk=src_pk,
src_ldts=src_ldts,
src_source=src_source,
src_status=src_status,
src_hashdiff=src_hashdiff,
source_model=source_model) -}}
{%- endmacro -%}

{%- macro default__sts(src_pk, src_ldts, src_source, src_status, src_hashdiff, source_model) -%}

{% if model.config.materialized != 'incremental' and execute %}

{%- set error_message -%}
STS loading error: The materialization must be incremental.
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
{%- endif -%}

{{- dbtvault.check_required_parameters(src_pk=src_pk, src_ldts=src_ldts,
src_source=src_source, src_status=src_status,
src_hashdiff=src_hashdiff,
source_model=source_model) -}}

{%- set src_pk = dbtvault.escape_column_name(src_pk) -%}
{%- set src_ldts = dbtvault.escape_column_name(src_ldts) -%}
{%- set src_source = dbtvault.escape_column_name(src_source) -%}
{%- set src_status = dbtvault.escape_column_name(src_status) -%}
{%- set src_hashdiff = dbtvault.escape_column_name(src_hashdiff) -%}
{%- set hash_columns = {src_hashdiff: src_status} -%}

{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_ldts, src_source]) -%}
{%- set final_source_cols = dbtvault.expand_column_list(columns=[src_pk, src_ldts, src_source, src_status, src_hashdiff]) -%}

{{ dbtvault.prepend_generated_by() }}

WITH source_data AS (
SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }}
FROM {{ ref(source_model) }} AS a
WHERE {{ dbtvault.multikey(src_pk, prefix='a', condition='IS NOT NULL') }}
{%- if model.config.materialized == 'vault_insert_by_period' %}
AND __PERIOD_FILTER__
{% endif %}
),

{%- if dbtvault.is_any_incremental() %}

stage_datetime AS (
SELECT MAX({{ dbtvault.prefix([src_ldts], 'b') }}) AS LOAD_DATETIME
FROM source_data AS b
),

latest_records AS (
SELECT {{ dbtvault.prefix(final_source_cols, 'c', alias_target='target') }}
FROM (
SELECT {{ dbtvault.prefix(final_source_cols, 'current_records', alias_target='target') }},
RANK() OVER (
PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }}
ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC
) AS rank
FROM {{ this }} AS current_records
) AS c
WHERE c.rank = 1
),

{%- endif %}

records_with_status AS (
SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }},
'I' AS {{ src_status }}
FROM source_data AS stage

{%- if dbtvault.is_any_incremental() %}
WHERE NOT EXISTS (
SELECT 1
FROM latest_records
WHERE ({{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }}
AND {{ dbtvault.prefix([src_status], 'latest_records') }} != 'D')
)

UNION ALL

SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'latest_records') }},
stage_datetime.LOAD_DATETIME AS {{ src_ldts }},
{{ dbtvault.prefix([src_source], 'latest_records') }},
'D' AS {{ src_status }}
FROM latest_records
INNER JOIN stage_datetime
ON 1 = 1
WHERE NOT EXISTS (
SELECT 1
FROM source_data AS stage
WHERE {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }}
)
AND {{ dbtvault.prefix([src_status], 'latest_records') }} != 'D'
AND stage_datetime.LOAD_DATETIME IS NOT NULL

UNION ALL

SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }},
'U' AS {{ src_status }}
FROM source_data AS stage
WHERE EXISTS (
SELECT 1
FROM latest_records
WHERE ({{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }}
AND {{ dbtvault.prefix([src_status], 'latest_records') }} != 'D'
AND {{ dbtvault.prefix([src_ldts], 'stage') }} != {{ dbtvault.prefix([src_ldts], 'latest_records') }})
)
{%- endif %}
),

records_with_status_and_hashdiff AS (
SELECT {{ dbtvault.alias_all(source_cols, 'd') }}, {{ dbtvault.alias_all(src_status, 'd') }},
{{ dbtvault.hash_columns(columns=hash_columns) | indent(4) }}
FROM records_with_status AS d
),

records_to_insert AS (
SELECT DISTINCT {{ dbtvault.alias_all(final_source_cols, 'stage') }}
FROM records_with_status_and_hashdiff AS stage
{%- if dbtvault.is_any_incremental() %}
LEFT JOIN latest_records
ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }}
WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }}
OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL
{%- endif %}
)

SELECT * FROM records_to_insert

{%- endmacro -%}
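
In practice, a model using this macro follows the same call pattern that the generator template in test/dbtvault_generator.py (further down in this diff) emits: the macro receives the primary key, load datetime, record source, status and hashdiff column names plus the staging model, tags each record as 'I' (insert), 'U' (update) or 'D' (delete) by comparing the staged data against the latest records in the target, and raises a compiler error when the model is not materialized as incremental. The sketch below is illustrative only; the model and column names are borrowed from the test fixtures later in this PR rather than from any shipped example.

-- models/sts_customer.sql (illustrative sketch, not part of this PR)
{{ config(materialized='incremental') }}

{{ dbtvault.sts(src_pk='CUSTOMER_PK', src_ldts='LOAD_DATE', src_source='SOURCE',
                src_status='STATUS', src_hashdiff='STATUS_HASHDIFF',
                source_model='STG_CUSTOMER') }}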
37 changes: 32 additions & 5 deletions test/dbtvault_generator.py
@@ -29,7 +29,9 @@ def raw_vault_structure(model_name, vault_structure, config=None, **kwargs):
"xts": xts,
"ma_sat": ma_sat,
"bridge": bridge,
"pit": pit
"pit": pit,
"sts": sts,

}

processed_metadata = process_structure_metadata(vault_structure=vault_structure, model_name=model_name,
@@ -47,15 +49,13 @@ def stage(model_name, source_model: dict, derived_columns=None, null_columns=Non
Generate a stage model template
:param model_name: Name of the model file
:param source_model: Model to select from
:param hashed_columns: Dictionary of hashed columns, can be None
:param derived_columns: Dictionary of derived columns, can be None
:param ranked_columns: Dictionary of ranked columns, can be None
:param null_columns: Dictionary of null columns, can be None
:param include_source_columns: Boolean: Whether to extract source columns from source table
:param config: Optional model config
:param depends_on: Optional forced dependency
"""

template = f"""
@@ -339,6 +339,32 @@ def bridge(model_name, src_pk, as_of_dates_table, bridge_walk, stage_tables_ldts
template_to_file(template, model_name)


def sts(model_name, src_pk, src_ldts, src_source,
src_status, src_hashdiff, source_model,
config, depends_on=""):
"""
Generate a status tracking satellite (STS) model template
:param model_name: Name of the model file
:param src_pk: Source pk
:param src_ldts: Source load date timestamp
:param src_source: Source record source column
:param src_status: Source record status
:param src_hashdiff: Column for hashdiff calculation
:param source_model: Model name to select from
:param config: Optional model config
:param depends_on: Optional forced dependency
"""

template = f"""
{depends_on}
{{{{ config({config}) }}}}
{{{{ dbtvault.sts(src_pk={src_pk}, src_ldts={src_ldts}, src_source={src_source},
src_status={src_status}, src_hashdiff={src_hashdiff}, source_model={source_model}) }}}}
"""

template_to_file(template, model_name)


def macro_model(model_name, macro_name, metadata=None):
"""
Generate a model containing a call to a macro
@@ -539,7 +565,8 @@ def process_structure_metadata(vault_structure, model_name, config, **kwargs):
"t_link": "incremental",
"ma_sat": "incremental",
"pit": "pit_incremental",
"bridge": "bridge_incremental"
"bridge": "bridge_incremental",
"sts": "incremental",
}

if config:
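The generator changes above register "sts" in the structure lookup and give it a default incremental materialization, so a behave step can build an STS model the same way it builds the other vault structures. A hypothetical call, with metadata mirroring the fixtures below (the import path and model name are assumptions for illustration only):

# Illustrative sketch only -- generate an STS model through the new "sts" entry.
from test import dbtvault_generator

dbtvault_generator.raw_vault_structure(
    model_name="STS_CUSTOMER",        # hypothetical model name
    vault_structure="sts",            # resolves to the new sts() generator
    src_pk="CUSTOMER_PK",
    src_ldts="LOAD_DATE",
    src_source="SOURCE",
    src_status="STATUS",
    src_hashdiff="STATUS_HASHDIFF",
    source_model="STG_CUSTOMER",
)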
6 changes: 6 additions & 0 deletions test/features/environment.py
@@ -14,6 +14,7 @@
from test.features.staging import fixtures_staging
from test.features.t_links import fixtures_t_link
from test.features.xts import fixtures_xts
from test.features.sts import fixtures_sts

fixture_registry_utils = {
"fixture.enable_sha": behave_fixtures.enable_sha,
@@ -192,6 +193,11 @@
"sqlserver": fixtures_cycle.cycle_custom_null_key_sqlserver,
"databricks": ''},

"fixture.sts":
{"snowflake": fixtures_sts.sts_snowflake,
"bigquery": fixtures_sts.sts_bigquery,
"sqlserver": fixtures_sts.sts_sqlserver,
"databricks": ''},
}

fixture_registry_snowflake = {k: v['snowflake'] for k, v in fixtures_registry.items()}
130 changes: 130 additions & 0 deletions test/features/sts/fixtures_sts.py
@@ -0,0 +1,130 @@
from behave import fixture


@fixture
def sts_snowflake(context):
"""
Define the structures and metadata to load Status Tracking Satellites (STS)
"""

context.hashed_columns = {
"STG_CUSTOMER": {
"CUSTOMER_PK": "CUSTOMER_ID"
}
}

context.vault_structure_columns = {
"STS": {
"src_pk": "CUSTOMER_PK",
"src_status": "STATUS",
"src_ldts": "LOAD_DATE",
"src_source": "SOURCE",
"src_hashdiff": "STATUS_HASHDIFF"
}
}

context.seed_config = {
"RAW_STAGE": {
"column_types": {
"CUSTOMER_ID": "NUMBER(38, 0)",
"CUSTOMER_NAME": "VARCHAR",
"LOAD_DATE": "DATE",
"SOURCE": "VARCHAR"
}
},
"STS": {
"column_types": {
"CUSTOMER_PK": "BINARY(16)",
"LOAD_DATE": "DATE",
"SOURCE": "VARCHAR",
"STATUS": "VARCHAR",
"STATUS_HASHDIFF": "BINARY(16)"
}
}
}

# SQLServer

@fixture
def sts_sqlserver(context):
"""
Define the structures and metadata to load Status Tracking Satellites (STS)
"""

context.hashed_columns = {
"STG_CUSTOMER": {
"CUSTOMER_PK": "CUSTOMER_ID"
}
}

context.vault_structure_columns = {
"STS": {
"src_pk": "CUSTOMER_PK",
"src_status": "STATUS",
"src_ldts": "LOAD_DATE",
"src_source": "SOURCE",
"src_hashdiff": "STATUS_HASHDIFF"
}
}

context.seed_config = {
"RAW_STAGE": {
"column_types": {
"CUSTOMER_ID": "DECIMAL(38, 0)",
"CUSTOMER_NAME": "VARCHAR(10)",
"LOAD_DATE": "DATE",
"SOURCE": "VARCHAR(10)"
}
},
"STS": {
"column_types": {
"CUSTOMER_PK": "BINARY(16)",
"LOAD_DATE": "DATE",
"SOURCE": "VARCHAR(10",
"STATUS": "VARCHAR(10"
}
}
}

# BigQuery

@fixture
def sts_bigquery(context):
"""
Define the structures and metadata to load Status Tracking Satellites (STS)
"""

context.hashed_columns = {
"STG_CUSTOMER": {
"CUSTOMER_PK": "CUSTOMER_ID"
}
}

context.vault_structure_columns = {
"STS": {
"src_pk": "CUSTOMER_PK",
"src_status": "STATUS",
"src_ldts": "LOAD_DATE",
"src_source": "SOURCE",
"src_hashdiff": "STATUS_HASHDIFF"
}
}

context.seed_config = {
"RAW_STAGE": {
"column_types": {
"CUSTOMER_ID": "STRING",
"CUSTOMER_NAME": "STRING",
"LOAD_DATE": "DATE",
"SOURCE": "STRING"
}
},
"STS": {
"column_types": {
"CUSTOMER_PK": "STRING",
"LOAD_DATE": "DATE",
"SOURCE": "STRING",
"STATUS": "STRING"
}
}
}
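
For completeness, these fixtures are wired up through the "fixture.sts" entry added to environment.py above, so scenarios tagged @fixture.sts pick up the platform-specific metadata. A minimal sketch of that dispatch, assuming behave's standard use_fixture helper and a Snowflake target (the hook body here is illustrative, not the project's actual environment.py logic):

# Illustrative sketch only -- apply the STS fixture for a tagged scenario.
from behave import use_fixture
from test.features.sts import fixtures_sts

def before_scenario(context, scenario):
    if "fixture.sts" in scenario.tags:
        use_fixture(fixtures_sts.sts_snowflake, context)  # snowflake target assumed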