From 4bfc1a79119d72bb7b47357476d5f32227e4b4e0 Mon Sep 17 00:00:00 2001 From: kingshy Date: Wed, 18 Jan 2017 16:53:33 -0800 Subject: [PATCH 1/6] new account-actions table to track user activities --- configurations/analytics.sql | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 configurations/analytics.sql diff --git a/configurations/analytics.sql b/configurations/analytics.sql new file mode 100644 index 0000000..3c433ba --- /dev/null +++ b/configurations/analytics.sql @@ -0,0 +1,25 @@ + +-- +-- table to get account_id, action, count(*) +-- Notes: +-- * default null_string is "\N" -- any Optional.absent() values should be replaced with this +-- http://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-null-as +-- * use interleaved sortkey which is better for ad-hoc queries. +-- + +CREATE TABLE account_actions ( + account_id INTEGER, + action VARCHAR(256) NOT NULL, + category VARCHAR(256) DEFAULT NULL, -- ops, app, mobile, fw, ds, biz etc... + result VARCHAR(256) DEFAULT NULL, -- result of action if exist. e.g. 
pass/fail/value + sense_id VARCHAR(64) DEFAULT NULL, + fw INTEGER DEFAULT NULL, -- firmware version + hw INTEGER DEFAULT NULL, -- hardware version 1 or 4 + ts TIMESTAMP WITHOUT TIME ZONE, -- utc + offset_millis INTEGER DEFAULT NULL -- local offset if available +) DISTYLE KEY DISTKEY (account_id) +INTERLEAVED SORTKEY (account_id, ts, action, category, sense_id, fw_version); + +GRANT ALL ON account_actions to migrator; +GRANT ALL ON account_actions to group ops; + From 5e08ac410cd42ae140aec803324d5675cca3cb00 Mon Sep 17 00:00:00 2001 From: kingshy Date: Mon, 23 Jan 2017 13:02:56 -0800 Subject: [PATCH 2/6] make it simple --- configurations/analytics.sql | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/configurations/analytics.sql b/configurations/analytics.sql index 3c433ba..f35d6b7 100644 --- a/configurations/analytics.sql +++ b/configurations/analytics.sql @@ -7,18 +7,14 @@ -- * use interleaved sortkey which is better for ad-hoc queries. -- -CREATE TABLE account_actions ( +CREATE TABLE actions ( account_id INTEGER, - action VARCHAR(256) NOT NULL, - category VARCHAR(256) DEFAULT NULL, -- ops, app, mobile, fw, ds, biz etc... - result VARCHAR(256) DEFAULT NULL, -- result of action if exist. e.g. pass/fail/value - sense_id VARCHAR(64) DEFAULT NULL, - fw INTEGER DEFAULT NULL, -- firmware version - hw INTEGER DEFAULT NULL, -- hardware version 1 or 4 - ts TIMESTAMP WITHOUT TIME ZONE, -- utc + action VARCHAR(256) NOT NULL, -- action that user is performing + result VARCHAR(256) DEFAULT NULL, -- result of action if exist. e.g. 
pass/fail/value + ts TIMESTAMP WITHOUT TIME ZONE, -- utc offset_millis INTEGER DEFAULT NULL -- local offset if available ) DISTYLE KEY DISTKEY (account_id) -INTERLEAVED SORTKEY (account_id, ts, action, category, sense_id, fw_version); +INTERLEAVED SORTKEY (account_id, ts, action); GRANT ALL ON account_actions to migrator; GRANT ALL ON account_actions to group ops; From e016c6f5b7f8d45ca27f344b9d303b165331ceac Mon Sep 17 00:00:00 2001 From: kingshy Date: Fri, 27 Jan 2017 12:13:48 -0800 Subject: [PATCH 3/6] make account_id BIGINT, will make joins with other table faster --- configurations/analytics.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/configurations/analytics.sql b/configurations/analytics.sql index f35d6b7..a7cab71 100644 --- a/configurations/analytics.sql +++ b/configurations/analytics.sql @@ -8,14 +8,14 @@ -- CREATE TABLE actions ( - account_id INTEGER, + account_id BIGINT, action VARCHAR(256) NOT NULL, -- action that user is performing result VARCHAR(256) DEFAULT NULL, -- result of action if exist. e.g. 
pass/fail/value ts TIMESTAMP WITHOUT TIME ZONE, -- utc offset_millis INTEGER DEFAULT NULL -- local offset if available -) DISTYLE KEY DISTKEY (account_id) +) DISTSTYLE KEY DISTKEY (account_id) INTERLEAVED SORTKEY (account_id, ts, action); -GRANT ALL ON account_actions to migrator; -GRANT ALL ON account_actions to group ops; +GRANT ALL ON actions to migrator; +GRANT ALL ON actions to group ops; From 22539ddc8d7e914df23e70e45ed60abc140d0648 Mon Sep 17 00:00:00 2001 From: kingshy Date: Mon, 30 Jan 2017 17:15:04 -0800 Subject: [PATCH 4/6] update snapshots with key_store --- configurations/dynamodb_snapshots.sql | 39 ++ get_speech_stats.py | 60 --- cron_upload.py => scripts/cron_upload.py | 0 scripts/get_speech_stats.py | 152 +++++++ manual_upload.py => scripts/manual_upload.py | 0 scripts/slackbot.py | 24 ++ scripts/test_slack.py | 11 + upload.py => scripts/upload.py | 0 scripts/upload_prev.py | 406 +++++++++++++++++++ snapshots/ddb_copy_fields.yml | 17 + snapshots/snapshot_ddb.py | 56 ++- snapshots/snapshot_rds.sh | 45 +- 12 files changed, 734 insertions(+), 76 deletions(-) delete mode 100644 get_speech_stats.py rename cron_upload.py => scripts/cron_upload.py (100%) create mode 100644 scripts/get_speech_stats.py rename manual_upload.py => scripts/manual_upload.py (100%) create mode 100644 scripts/slackbot.py create mode 100644 scripts/test_slack.py rename upload.py => scripts/upload.py (100%) create mode 100644 scripts/upload_prev.py diff --git a/configurations/dynamodb_snapshots.sql b/configurations/dynamodb_snapshots.sql index 3ec871e..503c08f 100644 --- a/configurations/dynamodb_snapshots.sql +++ b/configurations/dynamodb_snapshots.sql @@ -220,3 +220,42 @@ ALTER TABLE prod_speech_timeline OWNER to migrator; ALTER TABLE prod_speech_results ADD COLUMN fw INTEGER DEFAULT 0; +-- 2017-01-30 +CREATE TABLE key_store ( + aes_key VARCHAR(256), + created_at TIMESTAMP WITHOUT TIME ZONE, + device_id VARCHAR(64), + hw_version INTEGER, + metadata VARCHAR(512), + note 
VARCHAR(512) +) DISTSTYLE KEY DISTKEY (device_id) +INTERLEAVED SORTKEY (device_id, metadata); + +GRANT ALL ON key_store TO group ops; +GRANT ALL ON key_store to migrator; +ALTER TABLE key_store OWNER to migrator; + +CREATE TABLE key_store_admin ( + created_at TIMESTAMP WITHOUT TIME ZONE, + device_id VARCHAR(64), + hw_version INTEGER, + metadata VARCHAR(512), + note VARCHAR(512) +) DISTSTYLE KEY DISTKEY (device_id) +INTERLEAVED SORTKEY (device_id, metadata); + +ALTER TABLE key_store_admin OWNER to migrator; +GRANT SELECT ON key_store_admin to admin_tool; + + +CREATE TABLE pill_key_store ( + aes_key VARCHAR(256), + created_at TIMESTAMP WITHOUT TIME ZONE, + device_id VARCHAR(64), + metadata VARCHAR(512), + note VARCHAR(512) +) DISTSTYLE KEY DISTKEY (device_id) +INTERLEAVED SORTKEY (device_id, metadata); + +GRANT ALL ON pill_key_store TO group ops; +ALTER TABLE pill_key_store OWNER to migrator; diff --git a/get_speech_stats.py b/get_speech_stats.py deleted file mode 100644 index 0de1502..0000000 --- a/get_speech_stats.py +++ /dev/null @@ -1,60 +0,0 @@ -import os -from datetime import datetime -import json -import arrow -import psycopg2 -import psycopg2.extras -import requests - -REDSHIT_HOSTNAME = 'sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com' -REDSHIT_USER = 'migrator' -REDSHIT_DB = 'sensors1' - -SLACK_URL = "https://hooks.slack.com/services/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" -HEADERS = {'content-type': 'application/json'} - -def get_db(): - """Connects to the specific database.""" - conn = psycopg2.connect( - database=REDSHIT_DB, - user=REDSHIT_USER, - password=os.getenv('REDSHIT_PASS'), - host=REDSHIT_HOSTNAME, - port=5439 - ) - return conn - -def main(): - utc_now = arrow.get(datetime.utcnow()) - week_ago = utc_now.replace(days=-7) - date_string = week_ago.format('YYYY-MM-DD') - query = """SELECT date_trunc('day', created_utc) AS day, COUNT(*) AS c - FROM prod_speech_results - WHERE created_utc >= %s GROUP BY day ORDER BY day""" - - db = 
get_db() - with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute(query, (date_string,)) - rows = cur.fetchall() - slack_string = "" - for row in rows: - slack_string += "\n%s | %s" % (str(row['day'])[:10], row['c']) - - attachments = { - "fallback": "Voice command stats", - "fields": [ - { - "title": "Commands per Day", - "value": slack_string - } - ] - } - payload = { - 'channel': '#voice-stats', - 'username': 'shout', - "attachments": [attachments] - } - requests.post(SLACK_URL, data=json.dumps(payload), headers=HEADERS) - -if __name__ == "__main__": - main() diff --git a/cron_upload.py b/scripts/cron_upload.py similarity index 100% rename from cron_upload.py rename to scripts/cron_upload.py diff --git a/scripts/get_speech_stats.py b/scripts/get_speech_stats.py new file mode 100644 index 0000000..b083294 --- /dev/null +++ b/scripts/get_speech_stats.py @@ -0,0 +1,152 @@ +import os +from datetime import datetime +import json +import arrow +import psycopg2 +import psycopg2.extras +import requests + +REDSHIT_HOSTNAME = 'sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com' +REDSHIT_USER = 'migrator' +REDSHIT_DB = 'sensors1' + +SLACK_TOKEN = "T024FJP19/B30RWGBB9/KtT7y8rXKGhaH3yAduGuPgGh" +SLACK_URL = "https://hooks.slack.com/services/%s" % SLACK_TOKEN +HEADERS = {'content-type': 'application/json'} + +def get_db(): + """Connects to the specific database.""" + conn = psycopg2.connect( + database=REDSHIT_DB, + user=REDSHIT_USER, + password=os.getenv('REDSHIT_PASS'), + host=REDSHIT_HOSTNAME, + port=5439 + ) + return conn + +def post_slack(slack_string, title): + attachments = { + "fallback": "Voice command stats", + "fields": [ + { + "title": title, + "value": slack_string + } + ] + } + + payload = { + 'channel': '#voice-stats', + 'username': 'shout', + "attachments": [attachments] + } + + requests.post(SLACK_URL, data=json.dumps(payload), headers=HEADERS) + + +def main(): + utc_now = arrow.get(datetime.utcnow()) + week_ago = 
utc_now.replace(days=-7) + date_string = week_ago.format('YYYY-MM-DD') + + # number of commands per day + query = """SELECT date_trunc('day', created_utc) AS day, COUNT(*) AS c + FROM prod_speech_results + WHERE created_utc >= %s GROUP BY day ORDER BY day""" + + stats = {} + days = [] + db = get_db() + with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute(query, (date_string,)) + rows = cur.fetchall() + slack_string = "" + for row in rows: + # slack_string += "\n%s | %s" % (str(row['day'])[:10], row['c']) + days.append(row['day']) + stats.setdefault(row['day'], {'c':0, 'u': 0}) + stats[row['day']]['c'] = row['c'] + + # if slack_string != "": + # post_slack(slack_string, "Commands per day (testing)") + + + # number of distinct account-ids using speech + query = """SELECT date_trunc('day', ts) AS day, + COUNT(distinct account_id) AS c + FROM prod_speech_timeline + WHERE ts >= %s GROUP BY day ORDER BY day""" + + with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute(query, (date_string,)) + rows = cur.fetchall() + slack_string = "" + for row in rows: + # slack_string += "\n%s | %s" % (str(row['day'])[:10], row['c']) + if row['day'] in stats: + stats[row['day']]['u'] = row['c'] + + # if slack_string != "": + # post_slack(slack_string, + # "Distinct accounts using per day (testing)") + + if stats: + slack_string = "" + for day in days: + slack_string += "\n%s | %s | %s" % ( + day, stats[day]['c'], stats[day]['u']) + + if slack_string != "": + post_slack(slack_string, "Commands and distinct accounts per day") + + cmd_query = """ + SELECT date, fw, + ROUND((1.0*empty_text)/total, 2) as empty_text_perc, + ROUND((1.0*text_try_again)/total, 2) as text_try_again_perc, + ROUND((1.0*text_reject)/total, 2) as text_reject_perc, + ROUND((1.0*ok_cmds)/total, 2) as ok_commands, + total AS total_commands + FROM ( + SELECT DATE_TRUNC('day', created_utc) AS date, fw, + SUM(CASE WHEN (text != '' AND cmd_result='TRY_AGAIN') THEN 1 ELSE 0 
END) AS text_try_again, + SUM(CASE WHEN (text != '' AND cmd_result='REJECTED') THEN 1 ELSE 0 END) AS text_reject, + SUM(CASE WHEN (text = '') THEN 1 ELSE 0 END) as empty_text, + SUM(CASE WHEN (cmd != '') THEN 1 ELSE 0 END) as ok_cmds, + COUNT(*) AS total + FROM prod_speech_results WHERE created_utc >= current_date - INTERVAL '7 day' + GROUP BY date, fw ORDER BY date, fw + ) ORDER BY date, fw + """ + with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute(cmd_query, (date_string,)) + rows = cur.fetchall() + slack_title = "Date | empty_text | reject | try_again | ok_cmd | total " + num_data = 0 + slack_data = {} + for row in rows: + fw_version = str(row['fw']) + if fw_version is None: + fw_version = 'null' + if fw_version in ['-1', '0', '1', 'None']: + continue + slack_data.setdefault(fw_version, []) + num_data += 1 + slack_string = "\n%s | %s | %s | %s | %s | %s" % ( + str(row['date'])[:10], + row['empty_text_perc'], + row['text_reject_perc'], + row['text_try_again_perc'], + row['ok_commands'], + row['total_commands']) + slack_data[fw_version].append(slack_string) + + + for fw in sorted(slack_data): + slack_string = slack_title + for row in slack_data[fw]: + slack_string += row + post_slack(slack_string, "Commands breakdown by FW version %s" % (fw)) + +if __name__ == "__main__": + main() diff --git a/manual_upload.py b/scripts/manual_upload.py similarity index 100% rename from manual_upload.py rename to scripts/manual_upload.py diff --git a/scripts/slackbot.py b/scripts/slackbot.py new file mode 100644 index 0000000..1b6fd38 --- /dev/null +++ b/scripts/slackbot.py @@ -0,0 +1,24 @@ +"""Slack""" +import os +import json +import logging +import requests + +SLACK_TOKEN = os.getenv('REDSHIFT_SLACK') +SLACK_URL = "https://hooks.slack.com/services/%s" % (SLACK_TOKEN) +HEADERS = {'content-type': 'application/json'} + +def post(table, date, text): + """Post a status message to ops""" + if not SLACK_TOKEN: + logging.error("Wrong url") + return + + 
prefix = "*Copy %s %s:* " % (table, date) + payload = {'text': prefix + text, + 'channel': '#ops', 'username': 'redshit'} + try: + requests.post(SLACK_URL, data=json.dumps(payload), headers=HEADERS) + except requests.exceptions.RequestException as err: + logging.error("Fail to send to Slack") + logging.error(err) diff --git a/scripts/test_slack.py b/scripts/test_slack.py new file mode 100644 index 0000000..ead23f1 --- /dev/null +++ b/scripts/test_slack.py @@ -0,0 +1,11 @@ + +import requests +import os +import json + +data = {"text": "testing webhook", "channel": "#ops", "username": "redshit"} +slack_token = os.getenv('REDSHIFT_SLACK') +if slack_token: + slack_url = "https://hooks.slack.com/services/%s" % (slack_token) + headers={'content-type': 'application/json'} + r = requests.post(slack_url, json.dumps(data), headers=headers) diff --git a/upload.py b/scripts/upload.py similarity index 100% rename from upload.py rename to scripts/upload.py diff --git a/scripts/upload_prev.py b/scripts/upload_prev.py new file mode 100644 index 0000000..efb2117 --- /dev/null +++ b/scripts/upload_prev.py @@ -0,0 +1,406 @@ +""" +script to migrate sensors1 database from RDS to Redshift +""" + +import os +import sys + +from datetime import datetime +import time + +import json +import hashlib +import logging + +logging.basicConfig(level=logging.DEBUG) +logging.getLogger('boto').setLevel(logging.ERROR) + +from boto.s3.connection import S3Connection +from boto.s3.key import Key + +import boto +from boto.dynamodb2.table import Table +from boto.dynamodb2.items import Item + +REDSHIFT_HOST = 'sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com' +REDSHIFT_PSQL = "psql -h %s -U migrator -p 5439 -d sensors1" % (REDSHIFT_HOST) +DYNAMODB_TABLE = 'redshift_log' # logs +S3_MAIN_BUCKET = "hello-db-exports" # S3 bucket to store gzipped files +MAX_LINES = 100000000 +CHUNK_SIZE = 1000000 # testing 5000000 # lines + +ERROR_NOT_FOUND = 'File in S3 bucket, but not in local drive' +ERROR_CHECKSUM = 
"Checksum do not match" + +# terminal colors +GREEN = '\033[92m' +ENDC = '\033[0m' +BLUE = '\033[94m' +BOLD = '\033[1m' + + +def main(args): + logging.debug("\n\nStart %s", str(datetime.now())) + + if not os.getenv("aws_secret_access_key") or \ + not os.getenv("aws_access_key_id"): + logging.error("Please set credentials and password env") + return + + table_prefix = args[1] + year = args[2] + month = args[3] + day = args[4] + get_db = args[5] + do_split = args[6] + + yy_mm = "%s_%s" % (year, month) + date = "%s_%s_%s" % (year, month, day) + + table_name = "%s_%s" % (table_prefix, date) # eg device_sensors_par_2015_08_01 + redshift_table = "%s_%s" % (table_prefix, yy_mm) + prefix = "%s_%s" % (table_prefix, date) + folder = "%s_%s" % (table_prefix, date) + + # set s3 bucket name + if "device" in table_prefix: + bucket_name = "device_sensors_%s/%s" % (yy_mm, date) + else: + bucket_name = "tracker_motion_%s/%s" % (yy_mm, date) + + # create data folder if not exist + if not os.path.isdir(folder): + logging.debug("Creating folder for data: %s", folder) + os.makedirs(folder) + + + # 1. 
Get data from DB + datafile = "%s/%s.csv" % (folder, prefix) + if get_db == 'yes': + logging.debug("\n\nGetting data from RDS table %s", table_name) + start_ts = int(time.time()) + + # get_data.sh + os.system("./get_data.sh %s %s %s" % (table_name, prefix, folder)) + + lapse = int(time.time()) - start_ts + logging.debug("get data time taken: %d", lapse) + + line_count = 0 + with open("%s.count" % table_name) as fp: + lines = fp.read().split("\n") + line_count = int(lines[0].split()[0]) + + if line_count == 0: + logging.error("no data retrieved from RDS") + sys.exit() + + logging.debug("\nGet table size from RDS") + start_ts = int(time.time()) + + # get_count.sh + os.system("./get_count.sh %s" % (table_name)) + + lapse = int(time.time()) - start_ts + logging.debug("get count time taken: %d", lapse) + + # check counts + logging.debug("check table size") + lines = open("%s.rds_count" % table_name).read().split("\n") + rds_line_count = int(lines[0].split()[0]) + + if rds_line_count != line_count: + logging.error("Downloaded file %s has insufficient data") + sys.exit() + else: + logging.debug("line counts matches %d", line_count) + + + # make sure data file exists + gzip_datafile = "%s.csv.gz" % prefix + logging.debug("\n\nCheck if datafile %s, %s exist", + datafile, gzip_datafile) + if not os.path.isfile(datafile) and not \ + os.path.isfile("%s/%s" % (folder, gzip_datafile)): + logging.error("Data not downloaded to %s", datafile) + logging.debug("exiting....") + sys.exit() + + + # 2. 
split data into smaller chunks + logging.debug("\n\nSplit data into files of size %s", CHUNK_SIZE) + start_ts = int(time.time()) + file_info = {} + num_lines = 0 + if do_split == 'yes': + num = 0 + with open(datafile) as filep: + added = 0 + for i, line in enumerate(filep): + if added == 0: + splitfile = "%s-0%d" % (prefix, num) + if num < 10: + splitfile = "%s-000%d" % (prefix, num) + elif num < 100: + splitfile = "%s-00%d" % (prefix, num) + elif num < 1000: + splitfile = "%s-00%d" % (prefix, num) + + split_fp = open("%s/%s" % (folder, splitfile), 'w') + + split_fp.write(line) + num_lines += 1 + added += 1 + + if added >= CHUNK_SIZE: + logging.debug("split file %s: %d", splitfile, added) + split_fp.close() + file_info.setdefault(splitfile + ".gz", {}) + file_info[splitfile + ".gz"]['size'] = added + file_info[splitfile + ".gz"]['created'] = datetime.strftime( + datetime.now(), "%Y-%m-%d %H:%M:%S") + added = 0 + num += 1 + + split_fp.close() + logging.debug("split file %s: %d", splitfile, added) + file_info.setdefault(splitfile + ".gz", {}) + file_info[splitfile + ".gz"]['size'] = added + file_info[splitfile + ".gz"]['created'] = datetime.strftime( + datetime.now(), "%Y-%m-%d %H:%M:%S") + + file_info[gzip_datafile] = {} + file_info[gzip_datafile]['size'] = num_lines + file_info[gzip_datafile]['created'] = datetime.strftime( + datetime.now(), "%Y-%m-%d %H:%M:%S") + + lapse = int(time.time()) - start_ts + logging.debug("split file time taken: %d", lapse) + + + # 3. 
upload to S3 + logging.debug("\n\nPreparing to upload to S3 %s/%s/%s", + S3_MAIN_BUCKET, bucket_name, date) + start_ts = int(time.time()) + + onlyfiles = [ f for f in os.listdir(folder) + if os.path.isfile(os.path.join(folder,f)) ] + + aws_key = os.getenv("aws_access_key_id") + aws_secret = os.getenv("aws_secret_access_key") + + conn = S3Connection(aws_key, aws_secret) + bucket = conn.get_bucket(S3_MAIN_BUCKET) + + uploaded = bucket.list(bucket_name) + up_files = [] + for k in uploaded: up_files.append(k.key) + + + for org_filename in onlyfiles: + if '.gz' not in org_filename: + cmd = "pigz %s/%s" % (folder, org_filename) + logging.debug(cmd) + + os.system(cmd) + + filename = "%s.gz" % org_filename + else: + filename = org_filename + + logging.debug("---- Uploading file %s", filename) + k = Key(bucket) + k.key = "%s/%s" % (bucket_name, filename) + + # get md5 checksum for verification + os.system("md5sum %s/%s > md5info" % (folder, filename)) + file_md5 = open("md5info", 'r').read().split(' ')[0] + logging.debug("md5sum = %s", file_md5) + + file_info.setdefault(filename, {}) + file_info[filename]['checksum'] = file_md5 + + if 'gz' in org_filename: + file_info[filename]['size'] = CHUNK_SIZE + t = os.path.getmtime("%s/%s" %(folder, filename)) + modified_dt = datetime.fromtimestamp(t) + file_info[filename]['created'] = datetime.strftime( + modified_dt, "%Y-%m-%d %H:%M:%S") + + if "%s/%s" % (bucket_name, filename) in up_files: + logging.debug("Already uploaded %s", filename) + continue + + aws_md5 = Key.get_md5_from_hexdigest(k, file_md5) + logging.debug("md5: %s, %r", file_md5, aws_md5) + + k.set_contents_from_filename("%s/%s" % (folder, filename), md5=aws_md5) + + lapse = int(time.time()) - start_ts + logging.debug("s3 upload time taken: %d", lapse) + + + # 4. 
check md5 checksum and save to dynamoDB + logging.debug("\n\nChecking MD5 checksums of uploaded files") + start_ts = int(time.time()) + rs_keys = bucket.list(bucket_name) + num_files_uploaded = len(file_info) + + num_errors = 0 + not_found_errors = 0 + dynamo_table = Table('redshift_log') + + for key_val in rs_keys: + if bucket_name not in key_val.key: + continue + + filename = key_val.key.split("/")[-1] + if not filename: + logging.error("Empty filename %r", key_val.key) + continue + + if filename == gzip_datafile: + continue + + if filename not in file_info: + logging.error("file %s not found!! key %r", filename, key_val.key) + not_found_errors += 1 + continue + # checksum + logging.debug("check info for file %s: %r", + filename, file_info[filename]) + + local_checksum = file_info[filename]['checksum'] + s3_checksum = key_val.etag.strip('"') + + d_item = { + 'filename': filename, + 'uploaded_s3': True, + 'size': file_info[filename]['size'], + 'created': file_info[filename]['created'], + 'local_checksum': local_checksum, + 's3_checksum': s3_checksum} + + if local_checksum != s3_checksum: + logging.error("fail checksum file: %s, checksum: %s, etag: %s", + filename, local_checksum, s3_checksum) + num_errors += 1 + d_item['errors'] = ERROR_CHECKSUM + + logging.debug("dynamo item: %r", d_item) + new_item = Item(dynamo_table, data=d_item) + new_item.save(overwrite=True) + + + lapse = int(time.time()) - start_ts + logging.debug("md5 check time taken: %d", lapse) + + logging.debug("\n\nSummary for %s", prefix) + logging.debug("Number of lines: %d", num_lines) + logging.debug("Number of splitted files: %d", len(onlyfiles)) + logging.debug("Number of files uploaded: %d", num_files_uploaded) + logging.debug("Errors-not-found: %d", not_found_errors) + logging.debug("Errors-checksum: %d", num_errors) + + + # create manifest file, format: + # { + # "entries": [ + # {"url":"s3://mybucket/custdata.1","mandatory":true}, + # {"url":"s3://mybucket/custdata.2","mandatory":true}, + # 
{"url":"s3://mybucket/custdata.3","mandatory":true} + # ] + # } + logging.debug("\n\nCreate manifest file") + entries = [] + for filename in file_info: + if filename == gzip_datafile: + logging.debug("do not add original file %s to manifest", datafile) + continue + + logging.debug("check file %s", filename) + try: + d_item = dynamo_table.get_item(filename=filename) + except boto.dynamodb2.exceptions.ItemNotFound, e: + logging.error("cannot find entry %s in dynamo logs", filename) + logging.error("%r", e) + continue + logging.debug("adding to manifest: %s", filename) + entries.append({"url": "s3://%s/%s/%s" % ( + S3_MAIN_BUCKET, bucket_name, filename), + "mandatory": True}) + d_item['added_manifest'] = True + d_item.partial_save() + + + manifest_data = {"entries": entries} + manifest_file = "%s.manifest" % prefix + filep = open("%s/%s" % (folder, manifest_file), "w") + filep.write(json.dumps(manifest_data, separators=(',', ':'), indent=2)) + filep.close() + + # upload the manifest file + k = Key(bucket) + k.key = "%s/%s" % (bucket_name, manifest_file) + + file_md5 = hashlib.md5( + open("%s/%s" % (folder, manifest_file), 'rb').read()).hexdigest() + aws_md5 = Key.get_md5_from_hexdigest(k, file_md5) + k.set_contents_from_filename("%s/%s" % (folder, manifest_file), + md5=aws_md5) + + + # 5. 
set up COPY command to import from S3 to Redshift + logging.debug("\n\nPreparing to COPY data to Redshift") + + # create copy script + copy_filename = "Copy/copy_%s.sh" % prefix + filep = open(copy_filename, "w") + filep.write("#!/bin/bash" + "\n") + filep.write(REDSHIFT_PSQL + " << EOF\n") # psql connection + + credential = "'aws_access_key_id=%s;aws_secret_access_key=%s'" % ( + aws_key, aws_secret) + manifest_file = "'s3://hello-db-exports/%s/%s.manifest'" % ( + bucket_name, prefix) + copy_cmd = "COPY %s from %s credentials %s delimiter ',' gzip manifest;" % ( + redshift_table, manifest_file, credential) + + filep.write(copy_cmd + "\nEOF") # COPY + filep.close() + + os.system("chmod +x %s" % copy_filename) + logging.debug("COPY command: %s", copy_cmd) + + + # okay = raw_input("\n\n" + GREEN + BOLD + "Review manifest and command\n" + + # "Okay to proceed with Redshift COPY?(Y/n)" + ENDC) + + okay = 'Y' + + if okay == 'Y': + logging.debug("Executing COPY command") + start_ts = int(time.time()) + os.system("./%s" % (copy_filename)) + + lapse = int(time.time()) - start_ts + logging.debug("COPY to Redshift time taken: %d", lapse) + + logging.debug("\n\nDone %s", str(datetime.now())) + + +if __name__ == "__main__": + """Example: + python upload.py device_sensors_par 2015 08 03 yes yes > + migrate_2015_08_03.log 2>&1 + """ + + if len(sys.argv) != 7: + print "Usage: python upload.py [table_prefix] [YYYY] [MM] [DD] " + \ + "[get_db] [do_split]\n\n" + print "get_db: yes/no" + print "do_split: yes/no" + print "set DD to -1 to get a monthly table" + sys.exit() + + main(sys.argv) diff --git a/snapshots/ddb_copy_fields.yml b/snapshots/ddb_copy_fields.yml index 8a1fb24..ea5bf86 100644 --- a/snapshots/ddb_copy_fields.yml +++ b/snapshots/ddb_copy_fields.yml @@ -62,6 +62,7 @@ prod_speech_results: - updated - uuid - wake_id + - fw speech_results: - cmd @@ -80,3 +81,19 @@ prod_speech_timeline: - account_id - sense_id - ts + +key_store: + - aes_key + - created_at + - device_id + - 
hw_version + - metadata + - note + +pill_key_store: + - aes_key + - created_at + - device_id + - metadata + - note + diff --git a/snapshots/snapshot_ddb.py b/snapshots/snapshot_ddb.py index aed3a7c..6bf94de 100644 --- a/snapshots/snapshot_ddb.py +++ b/snapshots/snapshot_ddb.py @@ -18,7 +18,6 @@ CAPACITY_CHANGE_WAIT_TIME = 12 # * 10secs WAIT_TIME_CHUNK = 10 CAPACITY_CHANGE_MAX_MINUTES = 20 -COPY_READ_RATIO = 95 # use 95% of original reads AWS_SECRET = os.getenv("aws_secret_access_key") # needed to access dynamo AWS_ACCESS_KEY = os.getenv("aws_access_key_id") @@ -62,7 +61,7 @@ def update_read_throughput(table, updated_read_throughput, write_throughput): def post_slack(text): """Update Slack""" - payload = {'text': text, 'channel': '#research', 'username': 'redshit_snapshots'} + payload = {'text': text, 'channel': '#redshift', 'username': 'redshit_snapshots'} try: requests.post(SLACK_URL, data=json.dumps(payload), headers=SLACK_HEADERS) @@ -97,8 +96,12 @@ def rename_table(from_table_name, to_table_name, drop=False): def grant_permissions(table_name): """grant""" - grant_cmd = "%s -c \"GRANT SELECT ON %s to tim, GROUP data\"" % ( - REDSHIT_PSQL, table_name) + if table_name == 'key_store': + grant_cmd = "%s -c \"GRANT ALL ON %s TO GROUP ops\"" % ( + REDSHIT_PSQL, table_name, table_name) + else: + grant_cmd = "%s -c \"GRANT SELECT ON %s to tim, GROUP data; GRANT ALL ON %s TO GROUP ops\"" % ( + REDSHIT_PSQL, table_name, table_name) print "Step 5: Grant permissions" grant_out = subprocess.check_output(grant_cmd, shell=True) if "ERROR" in grant_out: @@ -123,6 +126,28 @@ def create_table(new_table_name, table_name): return False return True +def extra_key_store_ops(): + drop_cmd = "%s -c \"DROP TABLE IF EXISTS key_store_admin_old\"" % (REDSHIT_PSQL) + os.system(drop_cmd) + + drop_cmd = "%s -c \"DROP TABLE IF EXISTS key_store_admin_new\"" % (REDSHIT_PSQL) + os.system(drop_cmd) + + create_cmd = "%s -c \"CREATE TABLE key_store_admin_new (like key_store_admin)\"" % 
(REDSHIT_PSQL) + os.system(create_cmd) + + select_cmd = "%s -c \"INSERT INTO key_store_admin_new SELECT created_at, device_id, hw_version, metadata, note FROM key_store\"" % (REDSHIT_PSQL) + os.system(select_cmd) + + alter_cmd = "%s -c \"ALTER TABLE key_store_admin RENAME TO key_store_admin_old\"" % (REDSHIT_PSQL) + os.system(alter_cmd) + + alter_cmd = "%s -c \"ALTER TABLE key_store_admin_new RENAME TO key_store_admin\"" % (REDSHIT_PSQL) + os.system(alter_cmd) + + grant_cmd = "%s -c \"GRANT SELECT ON key_store_admin to admin_tool\"" % (REDSHIT_PSQL) + os.system(grant_cmd) + def main(table_name, skip_throughput=False): """main method""" @@ -143,7 +168,10 @@ def main(table_name, skip_throughput=False): # update read capacity before copying if not skip_throughput: - copy_throughput = get_new_throughput(item_count) + if table_name in ['key_store', 'pill_key_store']: + copy_throughput = 2 * org_reads + else: + copy_throughput = get_new_throughput(item_count) print "Updating read throughput to %d for copying" % copy_throughput update_res = update_read_throughput(ddb_table, copy_throughput, org_writes) @@ -155,7 +183,11 @@ def main(table_name, skip_throughput=False): sys.exit(1) print "extra 10 mins sleep for capacity change ...." 
-        time.sleep(600) # wait another 10 minutes
+        time.sleep(900) # wait another 15 minutes
+
+        read_ratio = 95
+        if table_name in ['key_store', 'pill_key_store']:
+            read_ratio = 60
 
     # PREVIOUSLY, truncate table
     # truncate_cmd = "%s -c \"TRUNCATE %s \"" % (REDSHIT_PSQL, table_name)
@@ -187,8 +219,8 @@ def main(table_name, skip_throughput=False):
     psql -h sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com -U migrator -p 5439 -d sensors1 << EOF
     COPY %s %s FROM 'dynamodb://%s'
     credentials 'aws_access_key_id=%s;aws_secret_access_key=%s'
-    READRATIO 95 timeformat 'auto'
-    """ % (new_table_name, fields, table_name, AWS_ACCESS_KEY, AWS_SECRET)
+    READRATIO %d timeformat 'auto'
+    """ % (new_table_name, fields, table_name, AWS_ACCESS_KEY, AWS_SECRET, read_ratio)
 
     print "Step 2: Copy command: %s" % copy_command
     os.system(copy_command)
@@ -207,6 +239,10 @@ def main(table_name, skip_throughput=False):
     if grant_permissions(table_name) is False:
         sys.exit()
 
+    # for key_store, create new table w/o aes
+    if table_name == 'key_store':
+        extra_key_store_ops()
+
     # change table throughput to original values
     if not skip_throughput:
         update_res = update_read_throughput(ddb_table, org_reads, org_writes)
@@ -225,8 +261,8 @@ def main(table_name, skip_throughput=False):
 
 
 if __name__ == "__main__":
-    if len(sys.argv) > 3:
-        print "Usage: python snapshot_ddb.py [dynamodb_table]"
+    if len(sys.argv) < 3:
+        print "Usage: python snapshot_ddb.py [dynamodb_table] [skip_throughput 1=yes/0=no]"
         print "Available tables:"
         for name in COPY_FIELDS:
             print "\t%s" % name
diff --git a/snapshots/snapshot_rds.sh b/snapshots/snapshot_rds.sh
index 49f631c..8c063e7 100755
--- a/snapshots/snapshot_rds.sh
+++ b/snapshots/snapshot_rds.sh
@@ -3,14 +3,16 @@
 if [[ -z "$1" ]] || [[ -z "$2" ]]
 then
     echo "[ERROR] ./copy_rds.sh [table_name] [gzip | nozip]"
-    echo "valid tables: questions response_choices account_questions responses timeline_feedback"
+    echo "valid tables: "
+    echo "questions response_choices account_questions responses timeline_feedback"
+    echo "account_device_map account_tracker_map"
     exit 1
 fi
 
 table_name=$1
 gzipped=$2
 
-TABLES=(questions response_choices account_questions responses timeline_feedback)
+TABLES=(questions response_choices account_questions responses timeline_feedback account_device_map account_tracker_map)
 
 if [[ " ${TABLES[@]} " =~ " $1 " ]]; then
     echo "table $1 is valid"
@@ -36,6 +38,7 @@ fi
 
 echo "migrating ${table_name} ${gzipped} to ${tmp_filename}"
 
+
 #### download RDS table
 echo "RDS Download: ${rds_copy}"
 psql -h common-replica-1.cdawj8qazvva.us-east-1.rds.amazonaws.com -d common -U common << EOF
@@ -50,23 +53,33 @@
 if [ "$gzipped" == "gzip" ]; then
     s3_filename="${s3_filename}.gz"
 fi
 
+
 #### copy to S3, gzipped if needed
 aws s3 cp ${tmp_filename} ${s3_filename}
 
+
+#### DROP temp tables and re-create "new" table
+echo "create temp tables"
+psql -h sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com -U migrator -p 5439 -d sensors1 << EOF
+DROP TABLE ${table_name}_old;
+EOF
+
 sleep 1
 
-#### truncate redshift table
 psql -h sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com -U migrator -p 5439 -d sensors1 << EOF
-TRUNCATE ${table_name};
+CREATE TABLE ${table_name}_new (like ${table_name});
 EOF
 
-sleep 2
+sleep 1
+
 #### copy data from S3 to redshift
 AWS_SECRET_KEY=$AWS_SECRET_ACCESS_KEY
 AWS_ACCESS_KEY=$AWS_ACCESS_KEY_ID
 
-copy_to_redshift="COPY ${table_name} FROM '${s3_filename}' credentials 'aws_access_key_id=${AWS_ACCESS_KEY};aws_secret_access_key=${AWS_SECRET_KEY}' CSV"
+copy_to_redshift="COPY ${table_name}_new FROM '${s3_filename}' credentials 'aws_access_key_id=${AWS_ACCESS_KEY};aws_secret_access_key=${AWS_SECRET_KEY}' CSV"
+
+echo "copying ${s3_filename} to redshift"
 
 if [ "$gzipped" == "gzip" ]; then
     copy_to_redshift="${copy_to_redshift} GZIP"
@@ -78,6 +91,26 @@ psql -h sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com -U migrator -p 54
 ${copy_to_redshift};
 EOF
 
+sleep 1
+
+
+# RENAME tables
+echo "table gymnastics"
+psql -h sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com -U migrator -p 5439 -d sensors1 << EOF
+    ALTER TABLE ${table_name} RENAME TO ${table_name}_old;
+    ALTER TABLE ${table_name}_new RENAME TO ${table_name};
+EOF
+
+sleep 1
+
+
+# RE-GRANT permissions
+echo "enable permissions"
+psql -h sensors2.cy7n0vzxfedi.us-east-1.redshift.amazonaws.com -U migrator -p 5439 -d sensors1 << EOF
+    GRANT SELECT ON ${table_name} TO GROUP data;
+    GRANT ALL ON ${table_name} TO GROUP ops;
+EOF
 
 #### remove temp file
+echo "removing tmp file"
 rm ${tmp_filename}

From f968701c333f9bbcee30f176346c24375767ed5e Mon Sep 17 00:00:00 2001
From: kingshy
Date: Thu, 9 Feb 2017 13:39:26 -0800
Subject: [PATCH 5/6] update sort order

---
 configurations/analytics.sql | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/configurations/analytics.sql b/configurations/analytics.sql
index a7cab71..8145989 100644
--- a/configurations/analytics.sql
+++ b/configurations/analytics.sql
@@ -7,15 +7,20 @@
 -- * use interleaved sortkey which is better for ad-hoc queries.
 --
 
-CREATE TABLE actions (
+CREATE TABLE prod_actions (
     account_id BIGINT,
     action VARCHAR(256) NOT NULL, -- action that user is performing
     result VARCHAR(256) DEFAULT NULL, -- result of action if exist. e.g. pass/fail/value
     ts TIMESTAMP WITHOUT TIME ZONE, -- utc
     offset_millis INTEGER DEFAULT NULL -- local offset if available
 ) DISTSTYLE KEY DISTKEY (account_id)
-INTERLEAVED SORTKEY (account_id, ts, action);
+INTERLEAVED SORTKEY (action, ts, account_id);
+
+GRANT ALL ON prod_actions to migrator;
+GRANT ALL ON prod_actions to group ops;
+
+CREATE TABLE dev_actions (like prod_actions);
+GRANT ALL ON dev_actions to migrator;
+GRANT ALL ON dev_actions to group ops;
 
-GRANT ALL ON actions to migrator;
-GRANT ALL ON actions to group ops;
 

From d09c72d1470802d921983bc259ec8d9ddf1f2942 Mon Sep 17 00:00:00 2001
From: kingshy
Date: Fri, 24 Feb 2017 14:05:37 -0800
Subject: [PATCH 6/6] add prod_alarms schema

---
 configurations/dynamodb_snapshots.sql | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/configurations/dynamodb_snapshots.sql b/configurations/dynamodb_snapshots.sql
index 503c08f..11d8fee 100644
--- a/configurations/dynamodb_snapshots.sql
+++ b/configurations/dynamodb_snapshots.sql
@@ -259,3 +259,14 @@ INTERLEAVED SORTKEY (device_id, metadata);
 
 GRANT ALL ON pill_key_store TO group ops;
 ALTER TABLE pill_key_store OWNER to migrator;
+
+CREATE TABLE prod_alarm (
+    account_id BIGINT,
+    alarm_templates VARCHAR(65535),
+    updated_at BIGINT
+) DISTSTYLE KEY DISTKEY (account_id)
+COMPOUND SORTKEY (account_id, updated_at);
+
+GRANT ALL ON prod_alarm TO group ops;
+ALTER TABLE prod_alarm OWNER to migrator;
+