Skip to content
This repository was archived by the owner on Jun 27, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Check `pgdatadiff --help`

Docker images are available.

`docker run -it davidjmarkey/pgdatadiff:0.2.1 /usr/bin/pgdatadiff`
```
docker run -it davidjmarkey/pgdatadiff:0.2.1 /usr/bin/pgdatadiff --help
```


12 changes: 10 additions & 2 deletions pgdatadiff/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
"""
Usage:
pgdatadiff --firstdb=<firstconnectionstring> --seconddb=<secondconnectionstring> [--only-data|--only-sequences] [--count-only] [--chunk-size=<size>]
pgdatadiff --firstdb=<firstconnectionstring> --seconddb=<secondconnectionstring> [--schema=<schema>] [--only-data|--only-sequences] [--count-only] [--count-with-max] [--chunk-size=<size>] [--exclude-tables=<table1,table2>] [--include-tables=<table1,table2>]
pgdatadiff --version

Options:
-h --help Show this screen.
--version Show version.
--firstdb=postgres://postgres:password@localhost/firstdb The connection string of the first DB
--seconddb=postgres://postgres:password@localhost/seconddb The connection string of the second DB
--schema="public" The schema of tables in comparison
--only-data Only compare data, exclude sequences
--only-sequences Only compare seqences, exclude data
--exclude-tables="" Exclude tables from data comparison Must be a comma separated string
--include-tables="" Only include tables in data comparison Must be a comma separated string
--count-only Do a quick test based on counts alone
--chunk-size=10000 The chunk size when comparing data [default: 10000]
--count-with-max Use MAX(id) when a table uses a sequence, otherwise use COUNT.
"""

import pkg_resources
Expand All @@ -33,7 +37,11 @@ def main():

differ = DBDiff(first_db_connection_string, second_db_connection_string,
chunk_size=arguments['--chunk-size'],
count_only=arguments['--count-only'])
count_only=arguments['--count-only'],
count_with_max=arguments['--count-with-max'],
exclude_tables=arguments['--exclude-tables'],
include_tables=arguments['--include-tables'],
schema=arguments['--schema'])

if not arguments['--only-sequences']:
if differ.diff_all_table_data():
Expand Down
99 changes: 79 additions & 20 deletions pgdatadiff/pgdatadiff.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import warnings

from fabulous.color import bold, green, red
from fabulous.color import bold, green, red, yellow
from halo import Halo
from sqlalchemy import exc as sa_exc
from sqlalchemy.engine import create_engine
Expand All @@ -19,7 +19,7 @@ def make_session(connection_string):

class DBDiff(object):

def __init__(self, firstdb, seconddb, chunk_size=10000, count_only=False):
def __init__(self, firstdb, seconddb, schema, chunk_size=10000, count_only=False, count_with_max=False, progress=True, exclude_tables="", include_tables=""):
firstsession, firstengine = make_session(firstdb)
secondsession, secondengine = make_session(seconddb)
self.firstsession = firstsession
Expand All @@ -32,21 +32,49 @@ def __init__(self, firstdb, seconddb, chunk_size=10000, count_only=False):
self.secondinspector = inspect(secondengine)
self.chunk_size = int(chunk_size)
self.count_only = count_only
self.count_with_max = count_with_max
self.progress = progress
if exclude_tables is None:
self.exclude_tables = []
else:
self.exclude_tables = exclude_tables.split(',')
if include_tables is None:
self.include_tables = []
else:
self.include_tables = (include_tables or "").split(',')
self.schema_names = self.firstinspector.get_schema_names()
self.schema = schema or 'public'
if self.schema not in self.schema_names:
raise ValueError("Schema not found, check if argument has valid schema name")
print(f"Comparing for schema {self.schema}")

def diff_table_data(self, tablename):
try:
firsttable = Table(tablename, self.firstmeta, autoload=True)
firstquery = self.firstsession.query(
firsttable)
firstquery = self.firstsession.query(firsttable)
secondtable = Table(tablename, self.secondmeta, autoload=True)
secondquery = self.secondsession.query(
secondtable)
if firstquery.count() != secondquery.count():
secondquery = self.secondsession.query(secondtable)
if self.count_with_max is True:
column = self.column_using_sequence(tablename)
pk_columns = self.firstinspector.get_pk_constraint(tablename)['constrained_columns']
if column is not None and column in pk_columns:
GET_MAX_SQL = f"SELECT MAX({column}) FROM {tablename}"
first_max_count = self.firstsession.execute(GET_MAX_SQL).fetchone()[0]
second_max_count = self.secondsession.execute(GET_MAX_SQL).fetchone()[0]
if first_max_count != second_max_count:
return False, f"MAX value are different" \
f" {first_max_count} != {second_max_count}"
if first_max_count == 0:
return None, "using MAX value, tables are empty because MAX on first db is zero"
return True, "MAX Value is same for both tables"
first_count = firstquery.count()
second_count = secondquery.count()
if first_count != second_count:
return False, f"counts are different" \
f" {firstquery.count()} != {secondquery.count()}"
if firstquery.count() == 0:
f" {first_count} != {second_count}"
if first_count == 0:
return None, "tables are empty"
if self.count_only is True:
if self.count_only is True or self.count_with_max is True:
return True, "Counts are the same"
pk = ",".join(self.firstinspector.get_pk_constraint(tablename)[
'constrained_columns'])
Expand All @@ -58,17 +86,17 @@ def diff_table_data(self, tablename):
return False, "table is missing"

SQL_TEMPLATE_HASH = f"""
SELECT md5(array_agg(md5((t.*)::varchar))::varchar)
SELECT md5(array_agg(md5((t.*)::text))::text)
FROM (
SELECT *
FROM {tablename}
FROM {self.schema}.{tablename}
ORDER BY {pk} limit :row_limit offset :row_offset
) AS t;
"""

position = 0

while position <= firstquery.count():
while position <= first_count:
firstresult = self.firstsession.execute(
SQL_TEMPLATE_HASH,
{"row_limit": self.chunk_size,
Expand All @@ -78,19 +106,42 @@ def diff_table_data(self, tablename):
{"row_limit": self.chunk_size,
"row_offset": position}).fetchone()
if firstresult != secondresult:
return False, f"data is different - position {position} -" \
return False, f"data is different - for rows from {position} - to" \
f" {position + self.chunk_size}"
position += self.chunk_size
self.display_progress(position, first_count)
return True, "data is identical."

def display_progress(self, position, first_count):
if position > first_count:
position = first_count
if first_count > self.chunk_size and self.progress is True:
print(f' Progress: {"{:2.1f}".format(position/first_count*100)}%')

def column_using_sequence(self, tablename):
GET_COLUMN_OF_TABLES_WITH_SEQUENCES = f"""SELECT
attrib.attname AS column_name
FROM pg_class AS seqclass
JOIN pg_depend AS dep
ON ( seqclass.relfilenode = dep.objid )
JOIN pg_class AS depclass
ON ( dep.refobjid = depclass.relfilenode )
JOIN pg_attribute AS attrib
ON ( attrib.attnum = dep.refobjsubid
AND attrib.attrelid = dep.refobjid )
WHERE seqclass.relkind = 'S' AND depclass.relname = '{tablename}';"""
response = self.firstsession.execute(GET_COLUMN_OF_TABLES_WITH_SEQUENCES).fetchone()
if response is None:
return None
return response[0]

def get_all_sequences(self):
GET_SEQUENCES_SQL = """SELECT c.relname FROM
pg_class c WHERE c.relkind = 'S';"""
GET_SEQUENCES_SQL = f"""SELECT sequence_name FROM information_schema.sequences WHERE sequence_schema = '{self.schema}';"""
return [x[0] for x in
self.firstsession.execute(GET_SEQUENCES_SQL).fetchall()]

def diff_sequence(self, seq_name):
GET_SEQUENCES_VALUE_SQL = f"SELECT last_value FROM {seq_name};"
GET_SEQUENCES_VALUE_SQL = f"SELECT last_value FROM {self.schema}.{seq_name};"

try:
firstvalue = \
Expand All @@ -113,7 +164,7 @@ def diff_sequence(self, seq_name):
return True, f"sequences are identical- ({firstvalue})."

def diff_all_sequences(self):
print(bold(red('Starting sequence analysis.')))
print(bold(red(f'Starting sequence analysis for schema -> {self.schema}')))
sequences = sorted(self.get_all_sequences())
failures = 0
for sequence in sequences:
Expand All @@ -139,9 +190,17 @@ def diff_all_table_data(self):
print(bold(red('Starting table analysis.')))
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=sa_exc.SAWarning)
tables = sorted(
self.firstinspector.get_table_names(schema="public"))
tables = sorted(self.firstinspector.get_table_names(schema=self.schema))
if len(self.include_tables) > 0:
# Intersection of 2 array
tables = [value for value in tables if value in self.include_tables]
if len(tables) == 0:
print(bold(red(f'No tables found in schema: {self.schema}')))
return 0
for table in tables:
if table in self.exclude_tables:
print(bold(yellow(f"Ignoring table {table} (excluded)")))
continue
with Halo(
text=f"Analysing table {table}. "
f"[{tables.index(table) + 1}/{len(tables)}]",
Expand Down