Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 11a85a6

Browse files
author
Ben Edwards
authored
Merge pull request #76 from pageuppeople-opensource/preserve-after-deletion
Preserve after deletion
2 parents 9454273 + 6b116cf commit 11a85a6

File tree

1 file changed

+184
-172
lines changed

1 file changed

+184
-172
lines changed

rdl/DestinationTableManager.py

Lines changed: 184 additions & 172 deletions
Original file line number | Diff line number | Diff line change
@@ -1,172 +1,184 @@
1-
import io
2-
import os
3-
import logging
4-
from rdl.ColumnTypeResolver import ColumnTypeResolver
5-
6-
from sqlalchemy import MetaData, DateTime, Boolean, BigInteger
7-
from sqlalchemy.schema import Column, Table
8-
from sqlalchemy.sql import func
9-
from rdl.shared import Providers
10-
11-
12-
class DestinationTableManager(object):
    """Create, drop, rename and upsert destination tables in the target database.

    All work is done through the ``target_db`` handle given to the
    constructor. Schema, table and column names are interpolated directly
    into SQL text, so they must come from trusted configuration.
    """

    def __init__(self, target_db, logger=None):
        """Store the database handle and set up logging and type resolution.

        Args:
            target_db: Object used to execute SQL and handed to SQLAlchemy's
                table create/drop calls (presumably a SQLAlchemy engine;
                confirm against the caller).
            logger: Optional logger; defaults to this module's logger.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.target_db = target_db
        self.column_type_resolver = ColumnTypeResolver()

    def create_schema(self, schema_name):
        """Create ``schema_name`` unless it already exists."""
        self.target_db.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

    def table_exists(self, schema_name, table_name):
        """Return the dialect's answer to whether ``schema_name.table_name`` exists."""
        return self.target_db.dialect.has_table(self.target_db, table_name, schema_name)

    def drop_table(self, schema_name, table_name):
        """Drop ``schema_name.table_name``; a missing table is not an error."""
        self.logger.debug(f"Dropping table {schema_name}.{table_name}")

        table = Table(table_name, MetaData(), schema=schema_name)
        table.drop(self.target_db, checkfirst=True)

        self.logger.debug(f"Dropped table {schema_name}.{table_name}")

    def create_table(self, schema_name, table_name, columns_configuration, drop_first):
        """Create ``schema_name.table_name`` from the column configuration.

        Besides the configured columns, three audit columns are appended:
        a timezone-aware timestamp defaulting to ``now()``, an is-deleted
        flag defaulting to false, and a big-integer change version.

        Args:
            schema_name: Schema that will contain the table.
            table_name: Name of the table to create.
            columns_configuration: Iterable of dicts; each ``"destination"``
                entry is passed to :meth:`create_column`.
            drop_first: When truthy, drop any existing table before creating.
        """
        table = Table(table_name, MetaData(), schema=schema_name)

        for column_configuration in columns_configuration:
            table.append_column(self.create_column(column_configuration["destination"]))

        table.append_column(
            Column(
                Providers.AuditColumnsNames.TIMESTAMP,
                DateTime(timezone=True),
                server_default=func.now(),
            )
        )

        table.append_column(
            Column(
                Providers.AuditColumnsNames.IS_DELETED,
                Boolean,
                server_default="f",
                default=False,
            )
        )

        table.append_column(
            Column(Providers.AuditColumnsNames.CHANGE_VERSION, BigInteger)
        )

        if drop_first:
            self.logger.debug(f"Dropping table {schema_name}.{table_name}")
            table.drop(self.target_db, checkfirst=True)
            self.logger.debug(f"Dropped table {schema_name}.{table_name}")

        # checkfirst=False: creation is attempted without an existence check.
        self.logger.debug(f"Creating table {schema_name}.{table_name}")
        table.create(self.target_db, checkfirst=False)
        self.logger.debug(f"Created table {schema_name}.{table_name}")

    def create_column(self, configuration):
        """Build a SQLAlchemy ``Column`` from one destination column configuration.

        Args:
            configuration: Dict with required keys ``"name"`` and
                ``"nullable"`` and an optional ``"primary_key"`` (default
                ``False``); the whole dict is also given to the column type
                resolver.
        """
        return Column(
            configuration["name"],
            self.column_type_resolver.resolve_postgres_type(configuration),
            primary_key=configuration.get("primary_key", False),
            nullable=configuration["nullable"],
        )

    def rename_table(self, schema_name, source_table_name, target_table_name):
        """Swap ``source_table_name`` into place as ``target_table_name``.

        Steps to efficiently rename a table:
          1. Drop target_old if exists.
          2. Begin transaction
          3. Rename target to target_old if it exists.
          4. Rename source to target
          5. commit
          6. Drop target_old if it exists.

        Steps 2-5 are sent to the database as a single statement string.
        """
        old_load_table_name = f"{target_table_name}__old"

        # Step 1
        sql = f"DROP TABLE IF EXISTS {schema_name}.{old_load_table_name} CASCADE; "
        self.logger.debug(f"Table Rename, executing '{sql}'")
        self.target_db.execute(sql)

        # Steps 2-5. Built as a plain string so there is no buffer left
        # unclosed if execute() raises.
        swap_sql = "".join(
            (
                "BEGIN TRANSACTION; ",
                f"ALTER TABLE IF EXISTS {schema_name}.{target_table_name} RENAME TO {old_load_table_name}; ",
                f"ALTER TABLE {schema_name}.{source_table_name} RENAME TO {target_table_name}; ",
                "COMMIT TRANSACTION; ",
            )
        )
        self.logger.debug(f"Table Rename, executing '{swap_sql}'")
        self.target_db.execute(swap_sql)

        # Step 6
        sql = f"DROP TABLE IF EXISTS {schema_name}.{old_load_table_name} CASCADE "
        self.logger.debug(f"Table Rename, executing '{sql}'")
        self.target_db.execute(sql)

    def upsert_table(
        self, schema_name, source_table_name, target_table_name, columns_config
    ):
        """Upsert every row of the source table into the target table.

        Issues one ``INSERT ... SELECT ... ON CONFLICT (...) DO UPDATE``
        statement keyed on the configured primary-key columns. On conflict
        every configured column and the three audit columns are overwritten
        with the incoming (``EXCLUDED``) values.

        Args:
            schema_name: Schema holding both tables.
            source_table_name: Table the rows are read from.
            target_table_name: Table the rows are written to.
            columns_config: Iterable of dicts whose ``"destination"`` entry
                has a ``"name"`` and, for key columns, a truthy
                ``"primary_key"``.

        Raises:
            ValueError: If no column is flagged as a primary key; the
                resulting ``ON CONFLICT()`` clause would be invalid SQL.
        """
        audit_column_names = [
            Providers.AuditColumnsNames.TIMESTAMP,
            Providers.AuditColumnsNames.IS_DELETED,
            Providers.AuditColumnsNames.CHANGE_VERSION,
        ]
        column_names = [column["destination"]["name"] for column in columns_config]

        primary_key_column_names = [
            column["destination"]["name"]
            for column in columns_config
            if column["destination"].get("primary_key")
        ]
        if not primary_key_column_names:
            # Fail before touching the database rather than sending SQL
            # that can never parse.
            raise ValueError(
                f"Cannot upsert into {schema_name}.{target_table_name}: "
                "no primary key columns are configured"
            )

        column_list = ",".join(map(str, column_names + audit_column_names))
        primary_key_column_list = ",".join(map(str, primary_key_column_names))

        sql_parts = [
            f"INSERT INTO {schema_name}.{target_table_name} ({column_list}) \n",
            f" SELECT {column_list} FROM {schema_name}.{source_table_name} \n",
            f" ON CONFLICT({primary_key_column_list}) DO UPDATE SET ",
        ]
        sql_parts.extend(
            "{0} = EXCLUDED.{0},\n".format(name)
            for name in column_names + audit_column_names[:-1]
        )
        # The last assignment terminates the statement.
        sql_parts.append("{0} = EXCLUDED.{0};\n".format(audit_column_names[-1]))

        upsert_sql = "".join(sql_parts)

        self.logger.debug(f"UPSERT executing '{upsert_sql}'")
        self.target_db.execute(upsert_sql)
        self.logger.debug("UPSERT completed")
1+
import io
2+
import os
3+
import logging
4+
from rdl.ColumnTypeResolver import ColumnTypeResolver
5+
6+
from sqlalchemy import MetaData, DateTime, Boolean, BigInteger
7+
from sqlalchemy.schema import Column, Table
8+
from sqlalchemy.sql import func
9+
from rdl.shared import Providers
10+
11+
12+
class DestinationTableManager(object):
    """Create, drop, rename and upsert destination tables in the target database.

    All work is done through the ``target_db`` handle given to the
    constructor. Schema, table and column names are interpolated directly
    into SQL text, so they must come from trusted configuration.
    """

    def __init__(self, target_db, logger=None):
        """Store the database handle and set up logging and type resolution.

        Args:
            target_db: Object used to execute SQL and handed to SQLAlchemy's
                table create/drop calls (presumably a SQLAlchemy engine;
                confirm against the caller).
            logger: Optional logger; defaults to this module's logger.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.target_db = target_db
        self.column_type_resolver = ColumnTypeResolver()

    def create_schema(self, schema_name):
        """Create ``schema_name`` unless it already exists."""
        self.target_db.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

    def table_exists(self, schema_name, table_name):
        """Return the dialect's answer to whether ``schema_name.table_name`` exists."""
        return self.target_db.dialect.has_table(self.target_db, table_name, schema_name)

    def drop_table(self, schema_name, table_name):
        """Drop ``schema_name.table_name``; a missing table is not an error."""
        self.logger.debug(f"Dropping table {schema_name}.{table_name}")

        table = Table(table_name, MetaData(), schema=schema_name)
        table.drop(self.target_db, checkfirst=True)

        self.logger.debug(f"Dropped table {schema_name}.{table_name}")

    def create_table(self, schema_name, table_name, columns_configuration, drop_first):
        """Create ``schema_name.table_name`` from the column configuration.

        Besides the configured columns, three audit columns are appended:
        a timezone-aware timestamp defaulting to ``now()``, an is-deleted
        flag defaulting to false, and a big-integer change version.

        Args:
            schema_name: Schema that will contain the table.
            table_name: Name of the table to create.
            columns_configuration: Iterable of dicts; each ``"destination"``
                entry is passed to :meth:`create_column`.
            drop_first: When truthy, drop any existing table before creating.
        """
        table = Table(table_name, MetaData(), schema=schema_name)

        for column_configuration in columns_configuration:
            table.append_column(self.create_column(column_configuration["destination"]))

        table.append_column(
            Column(
                Providers.AuditColumnsNames.TIMESTAMP,
                DateTime(timezone=True),
                server_default=func.now(),
            )
        )

        table.append_column(
            Column(
                Providers.AuditColumnsNames.IS_DELETED,
                Boolean,
                server_default="f",
                default=False,
            )
        )

        table.append_column(
            Column(Providers.AuditColumnsNames.CHANGE_VERSION, BigInteger)
        )

        if drop_first:
            self.logger.debug(f"Dropping table {schema_name}.{table_name}")
            table.drop(self.target_db, checkfirst=True)
            self.logger.debug(f"Dropped table {schema_name}.{table_name}")

        # checkfirst=False: creation is attempted without an existence check.
        self.logger.debug(f"Creating table {schema_name}.{table_name}")
        table.create(self.target_db, checkfirst=False)
        self.logger.debug(f"Created table {schema_name}.{table_name}")

    def create_column(self, configuration):
        """Build a SQLAlchemy ``Column`` from one destination column configuration.

        Args:
            configuration: Dict with required keys ``"name"`` and
                ``"nullable"`` and an optional ``"primary_key"`` (default
                ``False``); the whole dict is also given to the column type
                resolver.
        """
        return Column(
            configuration["name"],
            self.column_type_resolver.resolve_postgres_type(configuration),
            primary_key=configuration.get("primary_key", False),
            nullable=configuration["nullable"],
        )

    def rename_table(self, schema_name, source_table_name, target_table_name):
        """Swap ``source_table_name`` into place as ``target_table_name``.

        Steps to efficiently rename a table:
          1. Drop target_old if exists.
          2. Begin transaction
          3. Rename target to target_old if it exists.
          4. Rename source to target
          5. commit
          6. Drop target_old if it exists.

        Steps 2-5 are sent to the database as a single statement string.
        """
        old_load_table_name = f"{target_table_name}__old"

        # Step 1
        sql = f"DROP TABLE IF EXISTS {schema_name}.{old_load_table_name} CASCADE; "
        self.logger.debug(f"Table Rename, executing '{sql}'")
        self.target_db.execute(sql)

        # Steps 2-5. Built as a plain string so there is no buffer left
        # unclosed if execute() raises.
        swap_sql = "".join(
            (
                "BEGIN TRANSACTION; ",
                f"ALTER TABLE IF EXISTS {schema_name}.{target_table_name} RENAME TO {old_load_table_name}; ",
                f"ALTER TABLE {schema_name}.{source_table_name} RENAME TO {target_table_name}; ",
                "COMMIT TRANSACTION; ",
            )
        )
        self.logger.debug(f"Table Rename, executing '{swap_sql}'")
        self.target_db.execute(swap_sql)

        # Step 6
        sql = f"DROP TABLE IF EXISTS {schema_name}.{old_load_table_name} CASCADE "
        self.logger.debug(f"Table Rename, executing '{sql}'")
        self.target_db.execute(sql)

    def upsert_table(
        self, schema_name, source_table_name, target_table_name, columns_config
    ):
        """Upsert every row of the source table into the target table.

        Issues one ``INSERT ... SELECT ... ON CONFLICT (...) DO UPDATE``
        statement keyed on the configured primary-key columns. On conflict
        the audit columns and every ordinary column are overwritten with the
        incoming (``EXCLUDED``) values. A column whose destination config has
        a truthy ``"preserve_after_delete"`` instead keeps the value already
        stored in the target table when the incoming row's is-deleted flag
        is true.

        Args:
            schema_name: Schema holding both tables.
            source_table_name: Table the rows are read from.
            target_table_name: Table the rows are written to.
            columns_config: Iterable of dicts whose ``"destination"`` entry
                has a ``"name"`` and optionally ``"primary_key"`` and
                ``"preserve_after_delete"`` flags.

        Raises:
            ValueError: If no column is flagged as a primary key; the
                resulting ``ON CONFLICT()`` clause would be invalid SQL.
        """
        audit_column_names = [
            Providers.AuditColumnsNames.TIMESTAMP,
            Providers.AuditColumnsNames.IS_DELETED,
            Providers.AuditColumnsNames.CHANGE_VERSION,
        ]
        column_names = [column["destination"]["name"] for column in columns_config]

        primary_key_column_names = [
            column["destination"]["name"]
            for column in columns_config
            if column["destination"].get("primary_key")
        ]
        if not primary_key_column_names:
            # Fail before touching the database rather than sending SQL
            # that can never parse.
            raise ValueError(
                f"Cannot upsert into {schema_name}.{target_table_name}: "
                "no primary key columns are configured"
            )

        column_list = ",".join(map(str, column_names + audit_column_names))
        primary_key_column_list = ",".join(map(str, primary_key_column_names))

        sql_parts = [
            f"INSERT INTO {schema_name}.{target_table_name} ({column_list}) \n",
            f" SELECT {column_list} FROM {schema_name}.{source_table_name} \n",
            f" ON CONFLICT({primary_key_column_list}) DO UPDATE SET ",
        ]

        deleted_column = f"EXCLUDED.{Providers.AuditColumnsNames.IS_DELETED}"
        for column_config in columns_config:
            destination = column_config["destination"]
            col_name = destination["name"]
            if destination.get("preserve_after_delete"):
                # Keep the stored value when the incoming row is a deletion.
                sql_parts.append(
                    f"{col_name} = CASE WHEN {deleted_column} = TRUE "
                    f"THEN {target_table_name}.{col_name} "
                    f"ELSE EXCLUDED.{col_name} END, \n"
                )
            else:
                sql_parts.append("{0} = EXCLUDED.{0},\n".format(col_name))

        sql_parts.extend(
            "{0} = EXCLUDED.{0},\n".format(name) for name in audit_column_names[:-1]
        )
        # The last assignment terminates the statement.
        sql_parts.append("{0} = EXCLUDED.{0};\n".format(audit_column_names[-1]))

        upsert_sql = "".join(sql_parts)

        self.logger.debug(f"UPSERT executing '{upsert_sql}'")
        self.target_db.execute(upsert_sql)
        self.logger.debug("UPSERT completed")

0 commit comments

Comments
 (0)