From 94fdb285231106d2decb83e80e65d71abab515ba Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 23 Feb 2026 16:49:36 +0000 Subject: [PATCH 1/2] Fix dollar-quoted strings causing false "Encountered */" errors The validate_comment_markers pre-lex validation was not skipping dollar-quoted strings ($$...$$), so content like $$/\*\s*...\*/$$ was incorrectly detected as an unmatched block comment close. https://claude.ai/code/session_01HZFB5jseiADk1UEzJi1Y8E --- src/analyzer.rs | 9 +- src/lexer.rs | 2 +- .../221_dbt_config_dollar_quoted.sql | 300 ++++++++++++++++++ tests/golden_test.rs | 4 + 4 files changed, 313 insertions(+), 2 deletions(-) create mode 100644 tests/data/unformatted/221_dbt_config_dollar_quoted.sql diff --git a/src/analyzer.rs b/src/analyzer.rs index 7448274..8557b4c 100644 --- a/src/analyzer.rs +++ b/src/analyzer.rs @@ -3,7 +3,7 @@ use memchr::memchr; use crate::action::Action; use crate::comment::Comment; use crate::error::SqlfmtError; -use crate::lexer::{self, LexState}; +use crate::lexer::{self, scan_dollar_string, LexState}; use crate::line::Line; use crate::node::{Node, NodeIndex}; use crate::node_manager::NodeManager; @@ -797,6 +797,13 @@ impl Analyzer { i = skip_jinja_block(bytes, i); continue; } + if bytes[i] == b'$' { + let ds_len = scan_dollar_string(&bytes[i..]); + if ds_len > 0 { + i += ds_len; + continue; + } + } if i + 1 < len && bytes[i] == b'/' && bytes[i + 1] == b'*' { in_comment = true; i += 2; diff --git a/src/lexer.rs b/src/lexer.rs index f19c0b9..c69c2a4 100644 --- a/src/lexer.rs +++ b/src/lexer.rs @@ -394,7 +394,7 @@ fn scan_block_comment(bytes: &[u8]) -> usize { } /// Scan a dollar-quoted string ($tag$...$tag$). `bytes` starts at `$`. -fn scan_dollar_string(bytes: &[u8]) -> usize { +pub(crate) fn scan_dollar_string(bytes: &[u8]) -> usize { // Find the end of the opening tag let mut tag_end = 1; while tag_end < bytes.len() diff --git a/tests/data/unformatted/221_dbt_config_dollar_quoted.sql b/tests/data/unformatted/221_dbt_config_dollar_quoted.sql new file mode 100644 index 0000000..1dbc6fe --- /dev/null +++ b/tests/data/unformatted/221_dbt_config_dollar_quoted.sql @@ -0,0 +1,300 @@ +-- disable-parser +{{ + config( + materialized='incremental', + transient=false, + unique_key='snowflake_query_id', + meta={'final_schema': 'metadata'}, + incremental_strategy='merge', + on_schema_change='sync_all_columns', + full_refresh=false, + snowflake_warehouse=auto_scale_model(), + cluster_by=['query_start_time::date'], + ) +}} +{% set looker_query_context = "'-- Looker Query Context'" %} +SELECT qh.query_id AS snowflake_query_id + , qh.database_id AS database_id + , qh.schema_id AS schema_id + , qh.session_id AS snowflake_session_id + , qh.warehouse_id AS warehouse_id + , qh.database_name AS database_name + , qh.query_text AS query_text + , qh.query_type AS query_type + , qh.role_name AS snowflake_role_name + , qh.rows_produced AS query_result_rows_produced + , qh.schema_name AS schema_name + , qh.user_name AS snowflake_user_name + , qh.warehouse_name AS warehouse_name + , qh.warehouse_size AS warehouse_size + , qh.warehouse_type AS warehouse_type + , qh.cluster_number AS warehouse_cluster_nbr + , CASE + WHEN qh.user_name IN ('LOOKER') THEN 'looker' + WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):source = 'avalanche' THEN 'avalanche' + WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):source = 'avalanche-v2' THEN 'avalanche-v2' + -- if dag_id is present and it is not dbt then tag as "airflow" + WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):dag_id IS NOT NULL + AND TRY_PARSE_JSON(qh.query_tag::VARIANT):source != 'dbt' THEN 'airflow' + -- if source is dbt then tag as dbt even if it came from airflow + WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):source = 'dbt' THEN 'dbt' + WHEN RLIKE ( qh.query_text , '.*Query generated by Periscope Data.*' , 'mes' ) = TRUE THEN 'periscope' + END AS query_source + , TRY_PARSE_JSON(qh.query_tag::VARIANT):dag_id::TEXT AS airflow_dag_id + , TRY_PARSE_JSON(qh.query_tag::VARIANT):task_id::TEXT AS airflow_task_id + , TRY_PARSE_JSON(qh.query_tag::VARIANT):airflow_try::TEXT AS airflow_try + , TRY_PARSE_JSON(qh.query_tag::VARIANT):airflow_run_id::TEXT AS airflow_run_id + , TRY_PARSE_JSON(qh.query_tag::VARIANT):model_name::TEXT AS dbt_model + , TRY_PARSE_JSON(qh.query_tag::VARIANT):model_group::TEXT AS dbt_model_group + , TRY_PARSE_JSON(qh.query_tag::VARIANT):workstream::TEXT AS workstream + , TRY_PARSE_JSON(qh.query_tag::VARIANT) AS query_tags + , qh.execution_status AS query_status + , qh.error_code::TEXT AS error_code + , qh.error_message AS error_message + , qh.end_time AS query_end_time + , qh.start_time AS query_start_time + , qh.total_elapsed_time AS total_elapsed_time + , qh.compilation_time AS compilation_time + , qh.execution_time AS execution_time + , qh.queued_provisioning_time AS queued_provisioning_time + , qh.queued_repair_time AS queued_repair_time + , qh.queued_overload_time AS queued_overload_time + , qh.transaction_blocked_time AS transaction_blocked_time + , qh.partitions_scanned AS partitions_scanned + , qh.partitions_total AS partitions_total + , qh.credits_used_cloud_services AS credits_used_cloud_services + , qh.bytes_spilled_to_local_storage AS query_bytes_spillover_local + , qh.bytes_spilled_to_remote_storage AS query_bytes_spillover_remote + , qh.bytes_scanned AS query_bytes_scanned + , qh.rows_inserted AS rows_inserted + , qh.rows_updated AS rows_updated + , qh.rows_deleted AS rows_deleted + , TRY_PARSE_JSON( + REPLACE( + REGEXP_SUBSTR( + qh.query_text + , '(Query generated by Periscope Data.*)''' + , 1 + , 1 + , 'mes' + , 1 + ) + , 'Query generated by Periscope Data' + ) + ):user_email::VARCHAR AS periscope_user_name + , CAST(IFNULL(qah.credits_attributed_compute, 0) + IFNULL(qah.credits_used_query_acceleration, 0) + qh.credits_used_cloud_services AS NUMBER(38,12)) AS weighted_credits_used + , CAST( ( weighted_credits_used * sar.rate ) AS DECIMAL(30,12) ) AS estimated_cost + , qh.query_load_percent + , qh.child_queries_wait_time + , qh.role_type + , qh.query_hash + , qh.query_hash_version + , qh.query_parameterized_hash + , qh.query_parameterized_hash_version + , CASE + WHEN POSITION( + {{ looker_query_context }} IN qh.query_text ) > 0 THEN SUBSTR( + qh.query_text + , POSITION( + {{ looker_query_context }} IN qh.query_text ) + ) + ELSE NULL + END AS raw_looker_comment + , CASE + WHEN raw_looker_comment IS NOT NULL THEN REGEXP_SUBSTR( + raw_looker_comment + , '"user_id":([0-9]+)' + , 1 + , 1 + , 'e' + ) + ELSE NULL + END AS looker_user_id + , CASE + WHEN raw_looker_comment IS NOT NULL THEN REGEXP_SUBSTR( + raw_looker_comment + , '"history_slug":"([^"]+)"' + , 1 + , 1 + , 'e' + ) + ELSE NULL + END AS looker_history_slug + , CASE + WHEN raw_looker_comment IS NOT NULL THEN REGEXP_SUBSTR( + raw_looker_comment + , '"instance_slug":"([^"]+)"' + , 1 + , 1 + , 'e' + ) + ELSE NULL + END AS looker_instance_slug + , try_parse_json(regexp_substr(query_text, $$/\*\s*({.*"app":.*})\s*\*/$$, 1, 1, 'ie')) as select_dbt_comment_meta + , qh.list_external_files_time + , qh.query_acceleration_bytes_scanned + , TRY_PARSE_JSON(qh.query_tag::VARIANT):team::TEXT AS pod + , TRY_PARSE_JSON(qh.query_tag::VARIANT):service::TEXT AS avalanche_service + , TRY_PARSE_JSON(qh.query_tag::VARIANT):table_name::TEXT AS avalanche_target_table + FROM {{ source('snowflake', 'query_history') }} AS qh + LEFT JOIN {{ ref('snowflake_amortized_rates') }} AS sar + ON DATE_TRUNC(DAY, qh.start_time) = sar.date_day + LEFT JOIN {{ source('snowflake', 'query_attribution_history') }} AS qah + ON qh.query_id = qah.query_id +{% if is_incremental() %} +-- this filter will only be applied on an incremental run + WHERE qh.start_time > ( + SELECT DATEADD(DAY, -2, MAX(query_start_time)) + FROM {{ this }} + ) +QUALIFY ROW_NUMBER() OVER (PARTITION BY qh.query_id ORDER BY qh.start_time DESC) = 1 +{% endif %} +)))))__SQLFMT_OUTPUT__((((( +-- disable-parser +{{ + config( + materialized='incremental', + transient=false, + unique_key='snowflake_query_id', + meta={'final_schema': 'metadata'}, + incremental_strategy='merge', + on_schema_change='sync_all_columns', + full_refresh=false, + snowflake_warehouse=auto_scale_model(), + cluster_by=['query_start_time::date'], + ) +}} +{% set looker_query_context = "'-- Looker Query Context'" %} +select + qh.query_id as snowflake_query_id, + qh.database_id as database_id, + qh.schema_id as schema_id, + qh.session_id as snowflake_session_id, + qh.warehouse_id as warehouse_id, + qh.database_name as database_name, + qh.query_text as query_text, + qh.query_type as query_type, + qh.role_name as snowflake_role_name, + qh.rows_produced as query_result_rows_produced, + qh.schema_name as schema_name, + qh.user_name as snowflake_user_name, + qh.warehouse_name as warehouse_name, + qh.warehouse_size as warehouse_size, + qh.warehouse_type as warehouse_type, + qh.cluster_number as warehouse_cluster_nbr, + case + when qh.user_name in ('LOOKER') + then 'looker' + when try_parse_json(qh.query_tag::variant):source = 'avalanche' + then 'avalanche' + when try_parse_json(qh.query_tag::variant):source = 'avalanche-v2' + then 'avalanche-v2' + -- if dag_id is present and it is not dbt then tag as "airflow" + when + try_parse_json(qh.query_tag::variant):dag_id is not null + and try_parse_json(qh.query_tag::variant):source != 'dbt' + then 'airflow' + -- if source is dbt then tag as dbt even if it came from airflow + when try_parse_json(qh.query_tag::variant):source = 'dbt' + then 'dbt' + when + rlike (qh.query_text, '.*Query generated by Periscope Data.*', 'mes') = true + then 'periscope' + end as query_source, + try_parse_json(qh.query_tag::variant):dag_id::text as airflow_dag_id, + try_parse_json(qh.query_tag::variant):task_id::text as airflow_task_id, + try_parse_json(qh.query_tag::variant):airflow_try::text as airflow_try, + try_parse_json(qh.query_tag::variant):airflow_run_id::text as airflow_run_id, + try_parse_json(qh.query_tag::variant):model_name::text as dbt_model, + try_parse_json(qh.query_tag::variant):model_group::text as dbt_model_group, + try_parse_json(qh.query_tag::variant):workstream::text as workstream, + try_parse_json(qh.query_tag::variant) as query_tags, + qh.execution_status as query_status, + qh.error_code::text as error_code, + qh.error_message as error_message, + qh.end_time as query_end_time, + qh.start_time as query_start_time, + qh.total_elapsed_time as total_elapsed_time, + qh.compilation_time as compilation_time, + qh.execution_time as execution_time, + qh.queued_provisioning_time as queued_provisioning_time, + qh.queued_repair_time as queued_repair_time, + qh.queued_overload_time as queued_overload_time, + qh.transaction_blocked_time as transaction_blocked_time, + qh.partitions_scanned as partitions_scanned, + qh.partitions_total as partitions_total, + qh.credits_used_cloud_services as credits_used_cloud_services, + qh.bytes_spilled_to_local_storage as query_bytes_spillover_local, + qh.bytes_spilled_to_remote_storage as query_bytes_spillover_remote, + qh.bytes_scanned as query_bytes_scanned, + qh.rows_inserted as rows_inserted, + qh.rows_updated as rows_updated, + qh.rows_deleted as rows_deleted, + try_parse_json( + replace( + regexp_substr( + qh.query_text, + '(Query generated by Periscope Data.*)' '', + 1, + 1, + 'mes', + 1 + ), + 'Query generated by Periscope Data' + ) + ):user_email::varchar as periscope_user_name, + cast( + ifnull(qah.credits_attributed_compute, 0) + + ifnull(qah.credits_used_query_acceleration, 0) + + qh.credits_used_cloud_services as number(38, 12) + ) as weighted_credits_used, + cast((weighted_credits_used * sar.rate) as decimal(30, 12)) as estimated_cost, + qh.query_load_percent, + qh.child_queries_wait_time, + qh.role_type, + qh.query_hash, + qh.query_hash_version, + qh.query_parameterized_hash, + qh.query_parameterized_hash_version, + case + when position({{ looker_query_context }} in qh.query_text) > 0 + then + substr(qh.query_text, position({{ looker_query_context }} in qh.query_text)) + else null + end as raw_looker_comment, + case + when raw_looker_comment is not null + then regexp_substr(raw_looker_comment, '"user_id":([0-9]+)', 1, 1, 'e') + else null + end as looker_user_id, + case + when raw_looker_comment is not null + then regexp_substr(raw_looker_comment, '"history_slug":"([^"]+)"', 1, 1, 'e') + else null + end as looker_history_slug, + case + when raw_looker_comment is not null + then regexp_substr(raw_looker_comment, '"instance_slug":"([^"]+)"', 1, 1, 'e') + else null + end as looker_instance_slug, + try_parse_json( + regexp_substr(query_text, $$/\*\s*({.*"app":.*})\s*\*/$$, 1, 1, 'ie') + ) as select_dbt_comment_meta, + qh.list_external_files_time, + qh.query_acceleration_bytes_scanned, + try_parse_json(qh.query_tag::variant):team::text as pod, + try_parse_json(qh.query_tag::variant):service::text as avalanche_service, + try_parse_json(qh.query_tag::variant):table_name::text as avalanche_target_table +from {{ source("snowflake", "query_history") }} as qh +left join + {{ ref("snowflake_amortized_rates") }} as sar + on date_trunc(day, qh.start_time) = sar.date_day +left join + {{ source("snowflake", "query_attribution_history") }} as qah + on qh.query_id = qah.query_id +{% if is_incremental() %} + -- this filter will only be applied on an incremental run + where + qh.start_time > (select dateadd(day, -2, max(query_start_time)) from {{ this }}) + qualify row_number() over (partition by qh.query_id order by qh.start_time desc) = 1 +{% endif %} diff --git a/tests/golden_test.rs b/tests/golden_test.rs index 1d07f40..bad0137 100644 --- a/tests/golden_test.rs +++ b/tests/golden_test.rs @@ -459,6 +459,10 @@ golden_test_clickhouse!( golden_unformatted_220_clickhouse_joins, "tests/data/unformatted/220_clickhouse_joins.sql" ); +golden_test!( + golden_unformatted_221_dbt_config_dollar_quoted, + "tests/data/unformatted/221_dbt_config_dollar_quoted.sql" +); // ============================================================================= // Unformatted golden tests — 300-series (Jinja formatting) From 70da8e9f4c5221516b7b47ebd846312f76d8c796 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 23 Feb 2026 16:59:39 +0000 Subject: [PATCH 2/2] Sanitize and distill dollar-quoted string test fixture Replace proprietary SQL with a minimal, generic fixture that exercises the same bug: dollar-quoted strings containing /* and */ patterns being misinterpreted as block comment markers. Reduces the fixture from ~300 lines to ~65 lines while preserving all key elements (dbt config block, dollar-quoted regex, jinja refs, incremental block). https://claude.ai/code/session_01HZFB5jseiADk1UEzJi1Y8E --- .../221_dbt_config_dollar_quoted.sql | 306 ++---------------- 1 file changed, 35 insertions(+), 271 deletions(-) diff --git a/tests/data/unformatted/221_dbt_config_dollar_quoted.sql b/tests/data/unformatted/221_dbt_config_dollar_quoted.sql index 1dbc6fe..d8d0a1e 100644 --- a/tests/data/unformatted/221_dbt_config_dollar_quoted.sql +++ b/tests/data/unformatted/221_dbt_config_dollar_quoted.sql @@ -3,298 +3,62 @@ config( materialized='incremental', transient=false, - unique_key='snowflake_query_id', - meta={'final_schema': 'metadata'}, + unique_key='event_id', + meta={'final_schema': 'analytics'}, incremental_strategy='merge', on_schema_change='sync_all_columns', full_refresh=false, - snowflake_warehouse=auto_scale_model(), - cluster_by=['query_start_time::date'], ) }} -{% set looker_query_context = "'-- Looker Query Context'" %} -SELECT qh.query_id AS snowflake_query_id - , qh.database_id AS database_id - , qh.schema_id AS schema_id - , qh.session_id AS snowflake_session_id - , qh.warehouse_id AS warehouse_id - , qh.database_name AS database_name - , qh.query_text AS query_text - , qh.query_type AS query_type - , qh.role_name AS snowflake_role_name - , qh.rows_produced AS query_result_rows_produced - , qh.schema_name AS schema_name - , qh.user_name AS snowflake_user_name - , qh.warehouse_name AS warehouse_name - , qh.warehouse_size AS warehouse_size - , qh.warehouse_type AS warehouse_type - , qh.cluster_number AS warehouse_cluster_nbr +{% set app_comment_pattern = "'-- App Context'" %} +SELECT e.event_id AS event_id + , e.event_type AS event_type + , e.user_id AS user_id + , e.created_at AS created_at , CASE - WHEN qh.user_name IN ('LOOKER') THEN 'looker' - WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):source = 'avalanche' THEN 'avalanche' - WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):source = 'avalanche-v2' THEN 'avalanche-v2' - -- if dag_id is present and it is not dbt then tag as "airflow" - WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):dag_id IS NOT NULL - AND TRY_PARSE_JSON(qh.query_tag::VARIANT):source != 'dbt' THEN 'airflow' - -- if source is dbt then tag as dbt even if it came from airflow - WHEN TRY_PARSE_JSON(qh.query_tag::VARIANT):source = 'dbt' THEN 'dbt' - WHEN RLIKE ( qh.query_text , '.*Query generated by Periscope Data.*' , 'mes' ) = TRUE THEN 'periscope' - END AS query_source - , TRY_PARSE_JSON(qh.query_tag::VARIANT):dag_id::TEXT AS airflow_dag_id - , TRY_PARSE_JSON(qh.query_tag::VARIANT):task_id::TEXT AS airflow_task_id - , TRY_PARSE_JSON(qh.query_tag::VARIANT):airflow_try::TEXT AS airflow_try - , TRY_PARSE_JSON(qh.query_tag::VARIANT):airflow_run_id::TEXT AS airflow_run_id - , TRY_PARSE_JSON(qh.query_tag::VARIANT):model_name::TEXT AS dbt_model - , TRY_PARSE_JSON(qh.query_tag::VARIANT):model_group::TEXT AS dbt_model_group - , TRY_PARSE_JSON(qh.query_tag::VARIANT):workstream::TEXT AS workstream - , TRY_PARSE_JSON(qh.query_tag::VARIANT) AS query_tags - , qh.execution_status AS query_status - , qh.error_code::TEXT AS error_code - , qh.error_message AS error_message - , qh.end_time AS query_end_time - , qh.start_time AS query_start_time - , qh.total_elapsed_time AS total_elapsed_time - , qh.compilation_time AS compilation_time - , qh.execution_time AS execution_time - , qh.queued_provisioning_time AS queued_provisioning_time - , qh.queued_repair_time AS queued_repair_time - , qh.queued_overload_time AS queued_overload_time - , qh.transaction_blocked_time AS transaction_blocked_time - , qh.partitions_scanned AS partitions_scanned - , qh.partitions_total AS partitions_total - , qh.credits_used_cloud_services AS credits_used_cloud_services - , qh.bytes_spilled_to_local_storage AS query_bytes_spillover_local - , qh.bytes_spilled_to_remote_storage AS query_bytes_spillover_remote - , qh.bytes_scanned AS query_bytes_scanned - , qh.rows_inserted AS rows_inserted - , qh.rows_updated AS rows_updated - , qh.rows_deleted AS rows_deleted - , TRY_PARSE_JSON( - REPLACE( - REGEXP_SUBSTR( - qh.query_text - , '(Query generated by Periscope Data.*)''' - , 1 - , 1 - , 'mes' - , 1 - ) - , 'Query generated by Periscope Data' - ) - ):user_email::VARCHAR AS periscope_user_name - , CAST(IFNULL(qah.credits_attributed_compute, 0) + IFNULL(qah.credits_used_query_acceleration, 0) + qh.credits_used_cloud_services AS NUMBER(38,12)) AS weighted_credits_used - , CAST( ( weighted_credits_used * sar.rate ) AS DECIMAL(30,12) ) AS estimated_cost - , qh.query_load_percent - , qh.child_queries_wait_time - , qh.role_type - , qh.query_hash - , qh.query_hash_version - , qh.query_parameterized_hash - , qh.query_parameterized_hash_version - , CASE - WHEN POSITION( - {{ looker_query_context }} IN qh.query_text ) > 0 THEN SUBSTR( - qh.query_text - , POSITION( - {{ looker_query_context }} IN qh.query_text ) - ) - ELSE NULL - END AS raw_looker_comment - , CASE - WHEN raw_looker_comment IS NOT NULL THEN REGEXP_SUBSTR( - raw_looker_comment - , '"user_id":([0-9]+)' - , 1 - , 1 - , 'e' - ) - ELSE NULL - END AS looker_user_id - , CASE - WHEN raw_looker_comment IS NOT NULL THEN REGEXP_SUBSTR( - raw_looker_comment - , '"history_slug":"([^"]+)"' - , 1 - , 1 - , 'e' - ) - ELSE NULL - END AS looker_history_slug - , CASE - WHEN raw_looker_comment IS NOT NULL THEN REGEXP_SUBSTR( - raw_looker_comment - , '"instance_slug":"([^"]+)"' - , 1 - , 1 - , 'e' - ) - ELSE NULL - END AS looker_instance_slug - , try_parse_json(regexp_substr(query_text, $$/\*\s*({.*"app":.*})\s*\*/$$, 1, 1, 'ie')) as select_dbt_comment_meta - , qh.list_external_files_time - , qh.query_acceleration_bytes_scanned - , TRY_PARSE_JSON(qh.query_tag::VARIANT):team::TEXT AS pod - , TRY_PARSE_JSON(qh.query_tag::VARIANT):service::TEXT AS avalanche_service - , TRY_PARSE_JSON(qh.query_tag::VARIANT):table_name::TEXT AS avalanche_target_table - FROM {{ source('snowflake', 'query_history') }} AS qh - LEFT JOIN {{ ref('snowflake_amortized_rates') }} AS sar - ON DATE_TRUNC(DAY, qh.start_time) = sar.date_day - LEFT JOIN {{ source('snowflake', 'query_attribution_history') }} AS qah - ON qh.query_id = qah.query_id -{% if is_incremental() %} --- this filter will only be applied on an incremental run - WHERE qh.start_time > ( - SELECT DATEADD(DAY, -2, MAX(query_start_time)) + WHEN e.source = 'api' THEN 'api' + WHEN e.source = 'web' THEN 'web' + ELSE 'other' + END AS event_source + , try_parse_json(regexp_substr(e.payload, $$/\*\s*({.*"app":.*})\s*\*/$$, 1, 1, 'ie')) as event_meta + , e.duration_ms AS duration_ms + FROM {{ source('app', 'events') }} AS e + LEFT JOIN {{ ref('dim_users') }} AS u + ON e.user_id = u.user_id +{% if is_incremental() %} + WHERE e.created_at > ( + SELECT DATEADD(DAY, -2, MAX(created_at)) FROM {{ this }} ) -QUALIFY ROW_NUMBER() OVER (PARTITION BY qh.query_id ORDER BY qh.start_time DESC) = 1 -{% endif %} +{% endif %} )))))__SQLFMT_OUTPUT__((((( -- disable-parser {{ config( materialized='incremental', transient=false, - unique_key='snowflake_query_id', - meta={'final_schema': 'metadata'}, + unique_key='event_id', + meta={'final_schema': 'analytics'}, incremental_strategy='merge', on_schema_change='sync_all_columns', full_refresh=false, - snowflake_warehouse=auto_scale_model(), - cluster_by=['query_start_time::date'], ) }} -{% set looker_query_context = "'-- Looker Query Context'" %} +{% set app_comment_pattern = "'-- App Context'" %} select - qh.query_id as snowflake_query_id, - qh.database_id as database_id, - qh.schema_id as schema_id, - qh.session_id as snowflake_session_id, - qh.warehouse_id as warehouse_id, - qh.database_name as database_name, - qh.query_text as query_text, - qh.query_type as query_type, - qh.role_name as snowflake_role_name, - qh.rows_produced as query_result_rows_produced, - qh.schema_name as schema_name, - qh.user_name as snowflake_user_name, - qh.warehouse_name as warehouse_name, - qh.warehouse_size as warehouse_size, - qh.warehouse_type as warehouse_type, - qh.cluster_number as warehouse_cluster_nbr, - case - when qh.user_name in ('LOOKER') - then 'looker' - when try_parse_json(qh.query_tag::variant):source = 'avalanche' - then 'avalanche' - when try_parse_json(qh.query_tag::variant):source = 'avalanche-v2' - then 'avalanche-v2' - -- if dag_id is present and it is not dbt then tag as "airflow" - when - try_parse_json(qh.query_tag::variant):dag_id is not null - and try_parse_json(qh.query_tag::variant):source != 'dbt' - then 'airflow' - -- if source is dbt then tag as dbt even if it came from airflow - when try_parse_json(qh.query_tag::variant):source = 'dbt' - then 'dbt' - when - rlike (qh.query_text, '.*Query generated by Periscope Data.*', 'mes') = true - then 'periscope' - end as query_source, - try_parse_json(qh.query_tag::variant):dag_id::text as airflow_dag_id, - try_parse_json(qh.query_tag::variant):task_id::text as airflow_task_id, - try_parse_json(qh.query_tag::variant):airflow_try::text as airflow_try, - try_parse_json(qh.query_tag::variant):airflow_run_id::text as airflow_run_id, - try_parse_json(qh.query_tag::variant):model_name::text as dbt_model, - try_parse_json(qh.query_tag::variant):model_group::text as dbt_model_group, - try_parse_json(qh.query_tag::variant):workstream::text as workstream, - try_parse_json(qh.query_tag::variant) as query_tags, - qh.execution_status as query_status, - qh.error_code::text as error_code, - qh.error_message as error_message, - qh.end_time as query_end_time, - qh.start_time as query_start_time, - qh.total_elapsed_time as total_elapsed_time, - qh.compilation_time as compilation_time, - qh.execution_time as execution_time, - qh.queued_provisioning_time as queued_provisioning_time, - qh.queued_repair_time as queued_repair_time, - qh.queued_overload_time as queued_overload_time, - qh.transaction_blocked_time as transaction_blocked_time, - qh.partitions_scanned as partitions_scanned, - qh.partitions_total as partitions_total, - qh.credits_used_cloud_services as credits_used_cloud_services, - qh.bytes_spilled_to_local_storage as query_bytes_spillover_local, - qh.bytes_spilled_to_remote_storage as query_bytes_spillover_remote, - qh.bytes_scanned as query_bytes_scanned, - qh.rows_inserted as rows_inserted, - qh.rows_updated as rows_updated, - qh.rows_deleted as rows_deleted, - try_parse_json( - replace( - regexp_substr( - qh.query_text, - '(Query generated by Periscope Data.*)' '', - 1, - 1, - 'mes', - 1 - ), - 'Query generated by Periscope Data' - ) - ):user_email::varchar as periscope_user_name, - cast( - ifnull(qah.credits_attributed_compute, 0) - + ifnull(qah.credits_used_query_acceleration, 0) - + qh.credits_used_cloud_services as number(38, 12) - ) as weighted_credits_used, - cast((weighted_credits_used * sar.rate) as decimal(30, 12)) as estimated_cost, - qh.query_load_percent, - qh.child_queries_wait_time, - qh.role_type, - qh.query_hash, - qh.query_hash_version, - qh.query_parameterized_hash, - qh.query_parameterized_hash_version, - case - when position({{ looker_query_context }} in qh.query_text) > 0 - then - substr(qh.query_text, position({{ looker_query_context }} in qh.query_text)) - else null - end as raw_looker_comment, - case - when raw_looker_comment is not null - then regexp_substr(raw_looker_comment, '"user_id":([0-9]+)', 1, 1, 'e') - else null - end as looker_user_id, - case - when raw_looker_comment is not null - then regexp_substr(raw_looker_comment, '"history_slug":"([^"]+)"', 1, 1, 'e') - else null - end as looker_history_slug, + e.event_id as event_id, + e.event_type as event_type, + e.user_id as user_id, + e.created_at as created_at, case - when raw_looker_comment is not null - then regexp_substr(raw_looker_comment, '"instance_slug":"([^"]+)"', 1, 1, 'e') - else null - end as looker_instance_slug, + when e.source = 'api' then 'api' when e.source = 'web' then 'web' else 'other' + end as event_source, try_parse_json( - regexp_substr(query_text, $$/\*\s*({.*"app":.*})\s*\*/$$, 1, 1, 'ie') - ) as select_dbt_comment_meta, - qh.list_external_files_time, - qh.query_acceleration_bytes_scanned, - try_parse_json(qh.query_tag::variant):team::text as pod, - try_parse_json(qh.query_tag::variant):service::text as avalanche_service, - try_parse_json(qh.query_tag::variant):table_name::text as avalanche_target_table -from {{ source("snowflake", "query_history") }} as qh -left join - {{ ref("snowflake_amortized_rates") }} as sar - on date_trunc(day, qh.start_time) = sar.date_day -left join - {{ source("snowflake", "query_attribution_history") }} as qah - on qh.query_id = qah.query_id + regexp_substr(e.payload, $$/\*\s*({.*"app":.*})\s*\*/$$, 1, 1, 'ie') + ) as event_meta, + e.duration_ms as duration_ms +from {{ source("app", "events") }} as e +left join {{ ref("dim_users") }} as u on e.user_id = u.user_id {% if is_incremental() %} - -- this filter will only be applied on an incremental run - where - qh.start_time > (select dateadd(day, -2, max(query_start_time)) from {{ this }}) - qualify row_number() over (partition by qh.query_id order by qh.start_time desc) = 1 + where e.created_at > (select dateadd(day, -2, max(created_at)) from {{ this }}) {% endif %}