Skip to content

Commit 464321c

Browse files
author
Radosław Dziadosz
committed
Add dbt-cloud integration command to dp cli
# CR
1 parent 8584855 commit 464321c

File tree

5 files changed

+60
-38
lines changed

5 files changed

+60
-38
lines changed

data_pipelines_cli/cli_commands/dbtcloud.py

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import json
2+
import sys
23
from typing import Any, Dict
34

45
import click
56

67
from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient
78
from ..cli_constants import BUILD_DIR
89
from ..cli_utils import echo_info
9-
from ..config_generation import read_dictionary_from_config_directory
10+
from ..config_generation import read_dictionary_from_config_directory, generate_profiles_yml
11+
from ..dbt_utils import _dump_dbt_vars_from_configs_to_string, run_dbt_command
1012

1113

1214
def read_dbtcloud_config() -> Dict[str, Any]:
@@ -33,6 +35,26 @@ def read_bigquery_config(env: str) -> Dict[str, Any]:
3335
return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "bigquery.yml")
3436

3537

38+
def resolve_env_var(unresolved_text, env):
39+
"""
40+
Resolves environment variables and Jinja expressions in the given text using the dbt show command.
41+
:param unresolved_text: Text to be resolved
42+
:param env: Environment dir name
43+
:return: Text with environment variables and Jinja expressions resolved
44+
"""
45+
profiles_path = generate_profiles_yml(env, False)
46+
dbt_command_result_bytes = run_dbt_command(
47+
("show", "--inline", f"SELECT '{unresolved_text}' AS parsed", "--output", "json"),
48+
env,
49+
profiles_path,
50+
log_format_json=True,
51+
capture_output=True)
52+
decoded_output = dbt_command_result_bytes.stdout.decode(encoding=sys.stdout.encoding or "utf-8")
53+
for line in map(json.loads, decoded_output.splitlines()):
54+
if line.get('data', {}).get('node_name') == "inline_query":
55+
return json.loads(line["data"]["preview"])[0]["parsed"]
56+
57+
3658
@click.command(name="configure-cloud", help="Create dbt Cloud project")
3759
@click.option(
3860
"--account_id",
@@ -72,29 +94,33 @@ def configure_cloud_command(
7294
client = DbtCloudApiClient(f"https://cloud.getdbt.com/api", account_id, token)
7395

7496
dbtcloud_config = read_dbtcloud_config()
97+
base_bq_config = read_bigquery_config("base")
7598
file = open(keyfile)
7699
keyfile_data = json.load(file)
77-
project_id = client.create_project(dbtcloud_config["project_name"])
78-
(repository_id, deploy_key) = client.create_repository(project_id, remote_url)
100+
dbtcloud_project_id = client.create_project(dbtcloud_config["project_name"])
101+
(repository_id, deploy_key) = client.create_repository(dbtcloud_project_id, remote_url)
79102
echo_info("You need to add the generated key text as a deploy key to the target repository.\n"
80103
"This gives dbt Cloud permissions to read / write in the repository\n"
81104
f"{deploy_key}")
82105

83106
environments_projects = {}
84107
for environment in dbtcloud_config["environments"]:
85-
environment_id = create_environment(client, environment, project_id)
108+
bq_config = read_bigquery_config(environment["config_dir"])
109+
environments_projects[environment["name"]] = resolve_env_var(bq_config["project"],
110+
environment["config_dir"])
111+
environment_id = create_environment(client, environment, bq_config["dataset"],
112+
dbtcloud_project_id)
86113
if environment["type"] == "deployment":
87-
client.create_job(project_id, environment_id, dbtcloud_config["schedule_interval"],
88-
"Job - " + environment["name"])
89-
bq_config = read_bigquery_config(environment["bq_config_dir"])
90-
environments_projects[environment["name"]] = bq_config["project"]
114+
dbt_vars = _dump_dbt_vars_from_configs_to_string(environment["config_dir"]).strip()
115+
client.create_job(dbtcloud_project_id, environment_id, environment["schedule_interval"],
116+
"Job - " + environment["name"], dbt_vars)
91117

92-
client.create_environment_variable(project_id, dbtcloud_config["default_gcp_project"],
118+
client.create_environment_variable(dbtcloud_project_id, base_bq_config["project"],
93119
environments_projects)
94120

95-
connection_id = create_bq_connection(client, keyfile_data, project_id)
121+
connection_id = create_bq_connection(client, keyfile_data, dbtcloud_project_id)
96122

97-
client.associate_connection_repository(dbtcloud_config["project_name"], project_id,
123+
client.associate_connection_repository(dbtcloud_config["project_name"], dbtcloud_project_id,
98124
connection_id, repository_id)
99125

100126

@@ -111,7 +137,7 @@ def create_bq_connection(client, keyfile_data, project_id):
111137
project_id=project_id,
112138
name="BQ Connection Name",
113139
is_active=True,
114-
gcp_project_id="{{ env_var(\"DBT_GCP_PROJECT\") }} ",
140+
gcp_project_id="{{ env_var(\"DBT_GCP_PROJECT\") }}",
115141
timeout_seconds=100,
116142
private_key_id=keyfile_data["private_key_id"],
117143
private_key=keyfile_data["private_key"],
@@ -124,17 +150,18 @@ def create_bq_connection(client, keyfile_data, project_id):
124150
)
125151

126152

127-
def create_environment(client, environment, project_id):
153+
def create_environment(client, environment, dataset, project_id):
128154
"""
129155
Creates a dbt Cloud environment with the specified configuration
130156
131157
:param client: API Client
132158
:param environment: Config of environment to be created
133159
:param project_id: ID of the project
160+
:param dataset: Name of target dataset
134161
:return: ID of created environment
135162
"""
136163
if environment["type"] == "deployment":
137-
credentials_id = client.create_credentials(environment["dataset"], project_id)
164+
credentials_id = client.create_credentials(dataset, project_id)
138165
else:
139166
credentials_id = None
140167
environment_id = client.create_environment(project_id, environment["type"], environment["name"],

data_pipelines_cli/cli_commands/publish.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55

66
import click
77
import yaml
8-
from dbt.contracts.graph.compiled import ManifestNode
98
from dbt.contracts.graph.manifest import Manifest
10-
from dbt.contracts.graph.parsed import ColumnInfo
9+
from dbt.contracts.graph.nodes import ColumnInfo
10+
from dbt.contracts.graph.nodes import ManifestNode
1111

1212
from ..cli_constants import BUILD_DIR
1313
from ..cli_utils import echo_info, echo_warning

data_pipelines_cli/dbt_cloud_api_client.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ def create_environment_variable(self, project_id, default, environments):
137137
new_env = {
138138
"env_var": env_var
139139
}
140-
print(new_env)
141140
new_env_data = json.dumps(new_env)
142141

143142
response = self.request(
@@ -260,16 +259,18 @@ def create_bigquery_connection(
260259

261260
return response["data"]["id"]
262261

263-
def create_job(self, project_id, environment_id, schedule_cron, name):
262+
def create_job(self, project_id, environment_id, schedule_cron, name, vars):
264263
"""
265264
Creates a sample job for the given project and environment. The job is triggered by the scheduler
266265
and executes the commands: dbt seed, dbt run and dbt test.
267266
:param project_id: ID of the project
268267
:param environment_id: ID of the environment
269268
:param schedule_cron: Schedule (cron syntax)
270269
:param name: Name of the job
270+
:param vars: Variables passed to commands
271271
:return: ID of created job
272272
"""
273+
273274
job_details = {
274275
"account_id": self.account_id,
275276
"project_id": project_id,
@@ -282,9 +283,9 @@ def create_job(self, project_id, environment_id, schedule_cron, name):
282283
"github_webhook": False
283284
},
284285
"execute_steps": [
285-
"dbt seed",
286-
"dbt test",
287-
"dbt run"
286+
"dbt seed --vars '" + vars + "'",
287+
"dbt run --vars '" + vars + "'",
288+
"dbt test --vars '" + vars + "'"
288289
],
289290
"settings": {
290291
"threads": 1,

docs/configuration.rst

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -392,12 +392,6 @@ dbt Cloud configuration
392392
* - project_name
393393
- string
394394
- Name of the project to be created in dbt Cloud
395-
* - schedule_interval
396-
- string
397-
- The cron expression with which the example job will be run
398-
* - default_gcp_project
399-
- string
400-
- GCP project that will be used by default if a new environment is created in dbt Cloud
401395
* - environments
402396
- Array
403397
- Details of the environments to be created in dbt Cloud
@@ -414,31 +408,31 @@ Configuration of the environments:
414408
* - name
415409
- string
416410
- Name of the environment that will be created in dbt Cloud
417-
* - dataset
411+
* - type
418412
- string
419-
- Target dataset for this environment
413+
- In dbt Cloud, there are two types of environments: deployment and development. Deployment environments determine the settings used when jobs created within that environment are executed. Development environments determine the settings used in the dbt Cloud IDE for that particular dbt Cloud Project. Each dbt Cloud project can only have a single development environment but can have any number of deployment environments.
420414
* - dbt_version
421415
- string
422416
- The dbt version used in this environment
423-
* - bq_config_dir
417+
* - schedule_interval
418+
- string
419+
- The cron expression with which the example job will be run. This setting is only needed for the deployment environment.
420+
* - config_dir
424421
- string
425-
- The name of the dp env directory where the bigquery configuration for the environment is located. The name of the project in GCP will be read from it.
422+
- The name of the dp env directory where the BigQuery configuration for the environment is located. The name of the GCP project and the target dataset will be read from it.
426423

427424
Example ``dbtcloud.yml`` file might look like this:
428425

429426
.. code-block:: yaml
430427
431428
project_name: "Data Pipelines Project"
432-
schedule_interval: "0 12 * * *"
433-
default_gcp_project: "default-project"
434429
environments:
435430
- name: "Develop"
436-
dataset: "dev"
437431
dbt_version: "1.0.0"
438432
type: "development"
439-
bq_config_dir: "dev"
433+
config_dir: "dev"
440434
- name: "Production"
441-
dataset: "prod"
442435
dbt_version: "1.0.0"
443436
type: "deployment"
444-
bq_config_dir: "prod"
437+
config_dir: "prod"
438+
schedule_interval: "0 12 * * *"

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"fsspec==2022.11.0",
1717
"packaging==21.3",
1818
"colorama==0.4.5",
19-
"dbt-core==1.3.1",
19+
"dbt-core==1.5.0",
2020
]
2121

2222
EXTRA_FILESYSTEMS_REQUIRE = {

0 commit comments

Comments
 (0)