diff --git a/README.md b/README.md index aaad602..0c15e84 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,8 @@ Check `pgdatadiff --help` Docker images are available. -`docker run -it davidjmarkey/pgdatadiff:0.2.1 /usr/bin/pgdatadiff` +``` +docker run -it davidjmarkey/pgdatadiff:0.2.1 /usr/bin/pgdatadiff --help +``` diff --git a/pgdatadiff/main.py b/pgdatadiff/main.py index db41464..d2361ee 100644 --- a/pgdatadiff/main.py +++ b/pgdatadiff/main.py @@ -1,6 +1,6 @@ """ Usage: - pgdatadiff --firstdb= --seconddb= [--only-data|--only-sequences] [--count-only] [--chunk-size=] + pgdatadiff --firstdb= --seconddb= [--schema=] [--only-data|--only-sequences] [--count-only] [--count-with-max] [--chunk-size=] [--exclude-tables=] [--include-tables=] pgdatadiff --version Options: @@ -8,10 +8,14 @@ --version Show version. --firstdb=postgres://postgres:password@localhost/firstdb The connection string of the first DB --seconddb=postgres://postgres:password@localhost/seconddb The connection string of the second DB + --schema="public" The schema of tables in comparison --only-data Only compare data, exclude sequences --only-sequences Only compare seqences, exclude data + --exclude-tables="" Exclude tables from data comparison Must be a comma separated string + --include-tables="" Only include tables in data comparison Must be a comma separated string --count-only Do a quick test based on counts alone --chunk-size=10000 The chunk size when comparing data [default: 10000] + --count-with-max Use MAX(id) when a table uses a sequence, otherwise use COUNT. """ import pkg_resources @@ -33,7 +37,11 @@ def main(): differ = DBDiff(first_db_connection_string, second_db_connection_string, chunk_size=arguments['--chunk-size'], - count_only=arguments['--count-only']) + count_only=arguments['--count-only'], + count_with_max=arguments['--count-with-max'], + exclude_tables=arguments['--exclude-tables'], + include_tables=arguments['--include-tables'], + schema=arguments['--schema']) if not arguments['--only-sequences']: if differ.diff_all_table_data(): diff --git a/pgdatadiff/pgdatadiff.py b/pgdatadiff/pgdatadiff.py index 1bb9be1..da6b177 100644 --- a/pgdatadiff/pgdatadiff.py +++ b/pgdatadiff/pgdatadiff.py @@ -1,6 +1,6 @@ import warnings -from fabulous.color import bold, green, red +from fabulous.color import bold, green, red, yellow from halo import Halo from sqlalchemy import exc as sa_exc from sqlalchemy.engine import create_engine @@ -19,7 +19,7 @@ def make_session(connection_string): class DBDiff(object): - def __init__(self, firstdb, seconddb, chunk_size=10000, count_only=False): + def __init__(self, firstdb, seconddb, schema, chunk_size=10000, count_only=False, count_with_max=False, progress=True, exclude_tables="", include_tables=""): firstsession, firstengine = make_session(firstdb) secondsession, secondengine = make_session(seconddb) self.firstsession = firstsession @@ -32,21 +32,49 @@ def __init__(self, firstdb, seconddb, chunk_size=10000, count_only=False): self.secondinspector = inspect(secondengine) self.chunk_size = int(chunk_size) self.count_only = count_only + self.count_with_max = count_with_max + self.progress = progress + if exclude_tables is None: + self.exclude_tables = [] + else: + self.exclude_tables = exclude_tables.split(',') + if include_tables is None: + self.include_tables = [] + else: + self.include_tables = (include_tables or "").split(',') + self.schema_names = self.firstinspector.get_schema_names() + self.schema = schema or 'public' + if self.schema not in self.schema_names: + raise ValueError("Schema not found, check if argument has valid schema name") + print(f"Comparing for schema {self.schema}") def diff_table_data(self, tablename): try: firsttable = Table(tablename, self.firstmeta, autoload=True) - firstquery = self.firstsession.query( - firsttable) + firstquery = self.firstsession.query(firsttable) secondtable = Table(tablename, self.secondmeta, autoload=True) - secondquery = self.secondsession.query( - secondtable) - if firstquery.count() != secondquery.count(): + secondquery = self.secondsession.query(secondtable) + if self.count_with_max is True: + column = self.column_using_sequence(tablename) + pk_columns = self.firstinspector.get_pk_constraint(tablename)['constrained_columns'] + if column is not None and column in pk_columns: + GET_MAX_SQL = f"SELECT MAX({column}) FROM {tablename}" + first_max_count = self.firstsession.execute(GET_MAX_SQL).fetchone()[0] + second_max_count = self.secondsession.execute(GET_MAX_SQL).fetchone()[0] + if first_max_count != second_max_count: + return False, f"MAX value are different" \ + f" {first_max_count} != {second_max_count}" + if first_max_count == 0: + return None, "using MAX value, tables are empty because MAX on first db is zero" + return True, "MAX Value is same for both tables" + first_count = firstquery.count() + second_count = secondquery.count() + if first_count != second_count: return False, f"counts are different" \ - f" {firstquery.count()} != {secondquery.count()}" - if firstquery.count() == 0: + f" {first_count} != {second_count}" + if first_count == 0: return None, "tables are empty" - if self.count_only is True: + if self.count_only is True or self.count_with_max is True: return True, "Counts are the same" pk = ",".join(self.firstinspector.get_pk_constraint(tablename)[ 'constrained_columns']) @@ -58,17 +86,17 @@ def diff_table_data(self, tablename): return False, "table is missing" SQL_TEMPLATE_HASH = f""" - SELECT md5(array_agg(md5((t.*)::varchar))::varchar) + SELECT md5(array_agg(md5((t.*)::text))::text) FROM ( SELECT * - FROM {tablename} + FROM {self.schema}.{tablename} ORDER BY {pk} limit :row_limit offset :row_offset ) AS t; """ position = 0 - while position <= firstquery.count(): + while position <= first_count: firstresult = self.firstsession.execute( SQL_TEMPLATE_HASH, {"row_limit": self.chunk_size, @@ -78,19 +106,42 @@ def diff_table_data(self, tablename): {"row_limit": self.chunk_size, "row_offset": position}).fetchone() if firstresult != secondresult: - return False, f"data is different - position {position} -" \ + return False, f"data is different - for rows from {position} - to" \ f" {position + self.chunk_size}" position += self.chunk_size + self.display_progress(position, first_count) return True, "data is identical." + def display_progress(self, position, first_count): + if position > first_count: + position = first_count + if first_count > self.chunk_size and self.progress is True: + print(f' Progress: {"{:2.1f}".format(position/first_count*100)}%') + + def column_using_sequence(self, tablename): + GET_COLUMN_OF_TABLES_WITH_SEQUENCES = f"""SELECT + attrib.attname AS column_name + FROM pg_class AS seqclass + JOIN pg_depend AS dep + ON ( seqclass.relfilenode = dep.objid ) + JOIN pg_class AS depclass + ON ( dep.refobjid = depclass.relfilenode ) + JOIN pg_attribute AS attrib + ON ( attrib.attnum = dep.refobjsubid + AND attrib.attrelid = dep.refobjid ) + WHERE seqclass.relkind = 'S' AND depclass.relname = '{tablename}';""" + response = self.firstsession.execute(GET_COLUMN_OF_TABLES_WITH_SEQUENCES).fetchone() + if response is None: + return None + return response[0] + def get_all_sequences(self): - GET_SEQUENCES_SQL = """SELECT c.relname FROM - pg_class c WHERE c.relkind = 'S';""" + GET_SEQUENCES_SQL = f"""SELECT sequence_name FROM information_schema.sequences WHERE sequence_schema = '{self.schema}';""" return [x[0] for x in self.firstsession.execute(GET_SEQUENCES_SQL).fetchall()] def diff_sequence(self, seq_name): - GET_SEQUENCES_VALUE_SQL = f"SELECT last_value FROM {seq_name};" + GET_SEQUENCES_VALUE_SQL = f"SELECT last_value FROM {self.schema}.{seq_name};" try: firstvalue = \ @@ -113,7 +164,7 @@ def diff_sequence(self, seq_name): return True, f"sequences are identical- ({firstvalue})." def diff_all_sequences(self): - print(bold(red('Starting sequence analysis.'))) + print(bold(red(f'Starting sequence analysis for schema -> {self.schema}'))) sequences = sorted(self.get_all_sequences()) failures = 0 for sequence in sequences: @@ -139,9 +190,17 @@ def diff_all_table_data(self): print(bold(red('Starting table analysis.'))) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=sa_exc.SAWarning) - tables = sorted( - self.firstinspector.get_table_names(schema="public")) + tables = sorted(self.firstinspector.get_table_names(schema=self.schema)) + if len(self.include_tables) > 0: + # Intersection of 2 array + tables = [value for value in tables if value in self.include_tables] + if len(tables) == 0: + print(bold(red(f'No tables found in schema: {self.schema}'))) + return 0 for table in tables: + if table in self.exclude_tables: + print(bold(yellow(f"Ignoring table {table} (excluded)"))) + continue with Halo( text=f"Analysing table {table}. " f"[{tables.index(table) + 1}/{len(tables)}]",