13 changes: 11 additions & 2 deletions pgdatadiff/main.py
@@ -1,13 +1,14 @@
"""
Usage:
pgdatadiff --firstdb=<firstconnectionstring> --seconddb=<secondconnectionstring> [--only-data|--only-sequences] [--count-only] [--chunk-size=<size>]
pgdatadiff --firstdb=<firstconnectionstring> --seconddb=<secondconnectionstring> [--schema=<schema>] [--only-data|--only-sequences] [--count-only] [--chunk-size=<size>]
pgdatadiff --version

Options:
-h --help Show this screen.
--version Show version.
--firstdb=postgres://postgres:password@localhost/firstdb The connection string of the first DB
--seconddb=postgres://postgres:password@localhost/seconddb The connection string of the second DB
--schema=<schema> Set the schema (by default, public)
--only-data Only compare data, exclude sequences
--only-sequences Only compare sequences, exclude data
--count-only Do a quick test based on counts alone
@@ -26,12 +27,20 @@ def main():
__doc__, version=pkg_resources.require("pgdatadiff")[0].version)
first_db_connection_string=arguments['--firstdb']
second_db_connection_string=arguments['--seconddb']

if not first_db_connection_string.startswith("postgres://") or \
not second_db_connection_string.startswith("postgres://"):
print(red("Only Postgres DBs are supported"))
return 1

differ = DBDiff(first_db_connection_string, second_db_connection_string,
schema_name=arguments['--schema']

if not schema_name:
schema_name='public'

print(f"Checking database using schema [{schema_name}]...")

differ = DBDiff(first_db_connection_string, second_db_connection_string, schema_name,
chunk_size=arguments['--chunk-size'],
count_only=arguments['--count-only'])

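Worth noting for the new option: docopt reports --schema as None when it is not passed on the command line, which is exactly what the 'public' fallback above handles. A tiny sketch of that behaviour (the usage string is trimmed and the connection strings are placeholders):

from docopt import docopt

doc = """
Usage:
  pgdatadiff --firstdb=<firstconnectionstring> --seconddb=<secondconnectionstring> [--schema=<schema>]
"""

# --schema omitted on purpose: docopt returns None for it.
args = docopt(doc, argv=["--firstdb=postgres://u:p@localhost/db1",
                         "--seconddb=postgres://u:p@localhost/db2"])
schema_name = args["--schema"] or "public"  # same default main() applies
print(schema_name)  # -> public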
45 changes: 33 additions & 12 deletions pgdatadiff/pgdatadiff.py
@@ -19,7 +19,7 @@ def make_session(connection_string):

class DBDiff(object):

def __init__(self, firstdb, seconddb, chunk_size=10000, count_only=False):
def __init__(self, firstdb, seconddb, schema_name, chunk_size=10000, count_only=False):
firstsession, firstengine = make_session(firstdb)
secondsession, secondengine = make_session(seconddb)
self.firstsession = firstsession
@@ -32,13 +32,14 @@ def __init__(self, firstdb, seconddb, chunk_size=10000, count_only=False):
self.secondinspector = inspect(secondengine)
self.chunk_size = int(chunk_size)
self.count_only = count_only
self.schema_name = schema_name

def diff_table_data(self, tablename):
try:
firsttable = Table(tablename, self.firstmeta, autoload=True)
firsttable = Table(tablename, self.firstmeta, autoload=True, schema=self.schema_name)
firstquery = self.firstsession.query(
firsttable)
secondtable = Table(tablename, self.secondmeta, autoload=True)
secondtable = Table(tablename, self.secondmeta, autoload=True, schema=self.schema_name)
secondquery = self.secondsession.query(
secondtable)
if firstquery.count() != secondquery.count():
@@ -48,8 +49,7 @@ def diff_table_data(self, tablename):
return None, "tables are empty"
if self.count_only is True:
return True, "Counts are the same"
pk = ",".join(self.firstinspector.get_pk_constraint(tablename)[
'constrained_columns'])
pk = ",".join([f'"{x}"' for x in self.firstinspector.get_pk_constraint(tablename)['constrained_columns']])
if not pk:
return None, "no primary key(s) on this table." \
" Comparision is not possible."
@@ -61,13 +61,18 @@
SELECT md5(array_agg(md5((t.*)::varchar))::varchar)
FROM (
SELECT *
FROM {tablename}
FROM "{self.schema_name}"."{tablename}"
ORDER BY {pk} limit :row_limit offset :row_offset
) AS t;
"""

position = 0
SQL_DIFFERENCE_BLOCK = f"""
SELECT (t.*)::varchar
FROM "{self.schema_name}"."{tablename}" t
ORDER BY {pk} limit :row_limit offset :row_offset;
"""

position = 0
while position <= firstquery.count():
firstresult = self.firstsession.execute(
SQL_TEMPLATE_HASH,
@@ -78,19 +83,35 @@
{"row_limit": self.chunk_size,
"row_offset": position}).fetchone()
if firstresult != secondresult:
# OK - data is different - show the first rows which differ
firstdiff = self.firstsession.execute(
SQL_DIFFERENCE_BLOCK,
{"row_limit": self.chunk_size,
"row_offset": position}).fetchall()
seconddiff = self.secondsession.execute(
SQL_DIFFERENCE_BLOCK,
{"row_limit": self.chunk_size,
"row_offset": position}).fetchall()
index = position
for first, second in zip(firstdiff, seconddiff):
first_row = first[0]
second_row = second[0]
if first_row != second_row:
return False, f"data first differs at position: {index}\n1st: {first_row}\n2nd: {second_row}" \
f"\nComparison ends.\n"
index += 1
return False, f"data is different - position {position} -" \
f" {position + self.chunk_size}"
position += self.chunk_size
return True, "data is identical."

def get_all_sequences(self):
GET_SEQUENCES_SQL = """SELECT c.relname FROM
pg_class c WHERE c.relkind = 'S';"""
GET_SEQUENCES_SQL = f"""SELECT sequence_name FROM information_schema.sequences WHERE sequence_schema = '{self.schema_name}';"""
return [x[0] for x in
self.firstsession.execute(GET_SEQUENCES_SQL).fetchall()]
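The sequence listing now comes from information_schema.sequences filtered on sequence_schema rather than from pg_class, so only sequences in the selected schema are compared. A minimal standalone sketch of that query through SQLAlchemy (connection string and schema are placeholders; a bound parameter is used here purely for the example):

from sqlalchemy import create_engine, text

# Placeholder connection string in the same postgres:// form the tool expects.
engine = create_engine("postgres://postgres:password@localhost/firstdb")
with engine.connect() as conn:
    rows = conn.execute(
        text("SELECT sequence_name FROM information_schema.sequences"
             " WHERE sequence_schema = :schema"),
        {"schema": "public"},  # placeholder schema
    ).fetchall()
print([row[0] for row in rows])  # sequence names from the chosen schema only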

def diff_sequence(self, seq_name):
GET_SEQUENCES_VALUE_SQL = f"SELECT last_value FROM {seq_name};"
GET_SEQUENCES_VALUE_SQL = f"SELECT last_value FROM \"{seq_name}\";"

try:
firstvalue = \
@@ -110,7 +131,7 @@ def diff_sequence(self, seq_name):
if firstvalue > secondvalue:
return False, f"first sequence is greater than" \
f" the second({firstvalue} vs {secondvalue})."
return True, f"sequences are identical- ({firstvalue})."
return True, f"sequences are identical - ({firstvalue})."

def diff_all_sequences(self):
print(bold(red('Starting sequence analysis.')))
@@ -140,7 +161,7 @@ def diff_all_table_data(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=sa_exc.SAWarning)
tables = sorted(
self.firstinspector.get_table_names(schema="public"))
self.firstinspector.get_table_names(schema=self.schema_name))
for table in tables:
with Halo(
text=f"Analysing table {table}. "
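Taken together, DBDiff now takes the schema as a required third positional argument. A hypothetical direct use of the class, assuming the import path pgdatadiff.pgdatadiff and placeholder connection strings:

from pgdatadiff.pgdatadiff import DBDiff  # assumed import path

first = "postgres://postgres:password@localhost/firstdb"    # placeholder
second = "postgres://postgres:password@localhost/seconddb"  # placeholder

differ = DBDiff(first, second, "public",  # schema_name is now required
                chunk_size=10000,         # rows hashed per comparison block
                count_only=False)         # False: compare data, not just row counts
differ.diff_all_table_data()
differ.diff_all_sequences()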
2 changes: 1 addition & 1 deletion setup.py
@@ -17,7 +17,7 @@
install_requires=[
'SQLAlchemy<=1.3.11',
'halo<=0.0.28',
'psycopg2<=2.8.4',
'psycopg2-binary<=2.8.4',
'fabulous<=0.3.0',
'docopt<=0.6.2'
],