diff --git a/scripts/artifacts/mailprotect.py b/scripts/artifacts/mailprotect.py index fb4352904..18247850b 100644 --- a/scripts/artifacts/mailprotect.py +++ b/scripts/artifacts/mailprotect.py @@ -30,6 +30,18 @@ from scripts.lavafuncs import lava_process_artifact, lava_insert_sqlite_data from scripts.parse3 import ParseProto +#checking if the file is a valid sqlite database +def is_valid_sqlite(path): + if not os.path.isfile(path): + return False + try: + with open(path, "rb") as f: + header = f.read(16) + return header.startswith(b"SQLite format 3") + except: + return False + + def get_mailprotect(files_found, report_folder, seeker, wrap_text, timezone_offset): iOSversion = iOS.get_version() @@ -44,10 +56,14 @@ def get_mailprotect(files_found, report_folder, seeker, wrap_text, timezone_offs else: continue + if not envelope_db or not protected_db: + logfunc("MailProtect database is missing skipping.") + return() + if version.parse(iOSversion) <= version.parse("11"): logfunc("Unsupported version for iOS emails in iOS " + iOSversion) return () - + if version.parse(iOSversion) < version.parse("13"): head, end = os.path.split(envelope_db) db = sqlite3.connect(os.path.join(report_folder, "emails.db")) @@ -58,22 +74,38 @@ def get_mailprotect(files_found, report_folder, seeker, wrap_text, timezone_offs """ ) db.commit() - + + if not is_valid_sqlite(f"{head}/Protected Index"): + logfunc("Protected Index is not a valid SQLite database skipping mailprotect.") + return () + + if not is_valid_sqlite(envelope_db): + logfunc("Envelope Index is not a valid SQLite database skipping mailprotect.") + return() + cursor.execute( """ create table email2(rowid int, data text) """ ) db.commit() - db.close() + # db.close() with open_sqlite_db_readonly(os.path.join(head, "Envelope Index")) as db: + # attach_query = attach_sqlite_db_readonly(f"{head}/Protected Index", 'PI') + # cursor.execute(attach_query) + # attach_query = attach_sqlite_db_readonly(f"{report_folder}/emails.db", 'emails') + # cursor.execute(attach_query) + + # cursor = db.cursor() + cursor = db.cursor() + attach_query = attach_sqlite_db_readonly(f"{head}/Protected Index", 'PI') - cursor.execute(attach_query) + cursor.execute(attach_query) + attach_query = attach_sqlite_db_readonly(f"{report_folder}/emails.db", 'emails') - cursor.execute(attach_query) + cursor.execute(attach_query) - cursor = db.cursor() cursor.execute( """ select diff --git a/scripts/artifacts/netusage.py b/scripts/artifacts/netusage.py index d9be1c74a..7f91d4291 100644 --- a/scripts/artifacts/netusage.py +++ b/scripts/artifacts/netusage.py @@ -7,6 +7,21 @@ def pad_mac_adr(adr): return ':'.join([i.zfill(2) for i in adr.split(':')]).upper() +def safe_convert_timestamp(ts, timezone_offset): + if ts is None: + return "" + + ts = str(ts) + + if ts.startswith("0000") or ts.startswith("0001") or ts == "0" or ts.strip() == "": + return "" + + try: + return convert_utc_human_to_timezone(convert_ts_human_to_utc(ts), timezone_offset) + except: + return "" + + def get_netusage(files_found, report_folder, seeker, wrap_text, timezone_offset): for file_found in files_found: file_found = str(file_found) @@ -46,18 +61,22 @@ def get_netusage(files_found, report_folder, seeker, wrap_text, timezone_offset) data_headers = ('Last Connect Timestamp','First Usage Timestamp','Last Usage Timestamp','Bundle Name','Process Name','Type','Wifi In (Bytes)','Wifi Out (Bytes)','Mobile/WWAN In (Bytes)','Mobile/WWAN Out (Bytes)','Wired In (Bytes)','Wired Out (Bytes)') # Don't remove the comma, that is required to make this a tuple as there is only 1 element data_list = [] for row in all_rows: - if row[0] is None: - lastconnected = '' - else: - lastconnected = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[0]),timezone_offset) - if row[1] is None: - firstused = '' - else: - firstused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[1]),timezone_offset) - if row[2] is None: - lastused = '' - else: - lastused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[2]),timezone_offset) + # if row[0] is None: + # lastconnected = '' + # else: + # lastconnected = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[0]),timezone_offset) + # if row[1] is None: + # firstused = '' + # else: + # firstused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[1]),timezone_offset) + # if row[2] is None: + # lastused = '' + # else: + # lastused = convert_utc_human_to_timezone(convert_ts_human_to_utc(row[2]),timezone_offset) + lastconnected = safe_convert_timestamp(row[0], timezone_offset) + firstused = safe_convert_timestamp(row[1], timezone_offset) + lastused = safe_convert_timestamp(row[2], timezone_offset) + data_list.append((lastconnected,firstused,lastused,row[3],row[4],row[5],row[6],row[7],row[8],row[9],row[10],row[11])) diff --git a/scripts/artifacts/photosDbexif.py b/scripts/artifacts/photosDbexif.py index f857f873c..a6b348be7 100755 --- a/scripts/artifacts/photosDbexif.py +++ b/scripts/artifacts/photosDbexif.py @@ -66,20 +66,58 @@ def get_photosDbexif(files_found, report_folder, seeker, wrap_text, timezone_off #sqlite portion db = open_sqlite_db_readonly(file_found) cursor = db.cursor() - cursor.execute(''' - SELECT - DATETIME(ZASSET.ZDATECREATED+978307200,'UNIXEPOCH') AS DATECREATED, - DATETIME(ZASSET.ZMODIFICATIONDATE+978307200,'UNIXEPOCH') AS MODIFICATIONDATE, - ZASSET.ZDIRECTORY, - ZASSET.ZFILENAME, - ZASSET.ZLATITUDE, - ZASSET.ZLONGITUDE, - ZADDITIONALASSETATTRIBUTES.ZCREATORBUNDLEID - FROM ZASSET - INNER JOIN ZADDITIONALASSETATTRIBUTES ON ZASSET.Z_PK = ZADDITIONALASSETATTRIBUTES.Z_PK - ''') - all_rows = cursor.fetchall() + try: + cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") + tables = {row[0] for row in cursor.fetchall()} + except Exception as e: + logfunc(f'Can not find table: {e}') + db.close() + continue + + use_zasset = 'ZASSET' in tables + use_zgeneric = 'ZGENERICASSET' in tables + + if not use_zasset and not use_zgeneric: + logfunc('PhotosDbexif: can not find table.') + db.close() + continue + + if use_zasset: + cursor.execute(''' + SELECT + DATETIME(ZASSET.ZDATECREATED+978307200,'UNIXEPOCH') AS DATECREATED, + DATETIME(ZASSET.ZMODIFICATIONDATE+978307200,'UNIXEPOCH') AS MODIFICATIONDATE, + ZASSET.ZDIRECTORY, + ZASSET.ZFILENAME, + ZASSET.ZLATITUDE, + ZASSET.ZLONGITUDE, + ZADDITIONALASSETATTRIBUTES.ZCREATORBUNDLEID + FROM ZASSET + INNER JOIN ZADDITIONALASSETATTRIBUTES ON ZASSET.Z_PK = ZADDITIONALASSETATTRIBUTES.Z_PK + ''') + else: + logfunc('PhotosDbexif: menggunakan ZGENERICASSET.') + select_sql = ''' + SELECT + DATETIME(ZGENERICASSET.ZDATECREATED+978307200,'UNIXEPOCH') AS DATECREATED, + DATETIME(ZGENERICASSET.ZMODIFICATIONDATE+978307200,'UNIXEPOCH') AS MODIFICATIONDATE, + ZGENERICASSET.ZDIRECTORY, + ZGENERICASSET.ZFILENAME, + ZGENERICASSET.ZLATITUDE, + ZGENERICASSET.ZLONGITUDE, + ZADDITIONALASSETATTRIBUTES.ZCREATORBUNDLEID + FROM ZGENERICASSET + INNER JOIN ZADDITIONALASSETATTRIBUTES ON ZGENERICASSET.Z_PK = ZADDITIONALASSETATTRIBUTES.Z_PK + ''' + + try: + cursor.execute(select_sql) + all_rows = cursor.fetchall() + except Exception as e: + logfunc(f'PhotosDbexif: query Photos.sqlite gagal: {e}') + db.close() + continue usageentries = len(all_rows) diff --git a/scripts/artifacts/photosMetadata.py b/scripts/artifacts/photosMetadata.py index 6c10e78d6..00f1901e4 100644 --- a/scripts/artifacts/photosMetadata.py +++ b/scripts/artifacts/photosMetadata.py @@ -12,6 +12,29 @@ from scripts.ilapfuncs import logfunc, tsv, kmlgen, timeline, is_platform_windows, generate_thumbnail, \ open_sqlite_db_readonly, iOS +import plistlib +from io import BytesIO +import json + +def safe_parse_reverse_location(raw): + if b"$archiver" in raw: + try: + return nd.deserialize_plist(BytesIO(raw)) + except Exception: + pass + + try: + return plistlib.loads(raw) + except Exception: + pass + + try: + return json.loads(raw.decode("utf-8")) + except Exception: + pass + + return None + def get_photosMetadata(files_found, report_folder, seeker, wrap_text, timezone_offset): for file_found in files_found: @@ -291,10 +314,21 @@ def get_photosMetadata(files_found, report_folder, seeker, wrap_text, timezone_o with open(pathto, 'rb') as f: try: - deserialized_plist = nd.deserialize_plist(f) - postal_address = deserialized_plist['postalAddress']['_formattedAddress'] - postal_address_subadminarea = deserialized_plist['postalAddress']['_subAdministrativeArea'] - postal_address_sublocality = deserialized_plist['postalAddress']['_subLocality'] + # deserialized_plist = nd.deserialize_plist(f) + raw = f.read() + parsed = safe_parse_reverse_location(raw) + + if parsed and "postalAddress" in parsed: + pa = parsed["postalAddress"] + postal_address = pa.get("_formattedAddress", "") + postal_address_subadminarea = pa.get("_subAdministrativeArea", "") + postal_address_sublocality = pa.get("_subLocality", "") + else: + logfunc(f"[iOS Photos] ReverseLocationData unsupported format for asset {row[0]}") + + # postal_address = deserialized_plist['postalAddress']['_formattedAddress'] + # postal_address_subadminarea = deserialized_plist['postalAddress']['_subAdministrativeArea'] + # postal_address_sublocality = deserialized_plist['postalAddress']['_subLocality'] except (KeyError, ValueError, TypeError) as ex: if str(ex).find("does not contain an '$archiver' key") >= 0: @@ -632,11 +666,24 @@ def get_photosMetadata(files_found, report_folder, seeker, wrap_text, timezone_o wf.write(row[61]) with open(pathto, 'rb') as f: + # try: + # deserialized_plist = nd.deserialize_plist(f) + # postal_address = deserialized_plist['postalAddress']['_formattedAddress'] + # postal_address_subadminarea = deserialized_plist['postalAddress']['_subAdministrativeArea'] + # postal_address_sublocality = deserialized_plist['postalAddress']['_subLocality'] try: - deserialized_plist = nd.deserialize_plist(f) - postal_address = deserialized_plist['postalAddress']['_formattedAddress'] - postal_address_subadminarea = deserialized_plist['postalAddress']['_subAdministrativeArea'] - postal_address_sublocality = deserialized_plist['postalAddress']['_subLocality'] + # deserialized_plist = nd.deserialize_plist(f) + raw = f.read() + parsed = safe_parse_reverse_location(raw) + + if parsed and "postalAddress" in parsed: + pa = parsed["postalAddress"] + postal_address = pa.get("_formattedAddress", "") + postal_address_subadminarea = pa.get("_subAdministrativeArea", "") + postal_address_sublocality = pa.get("_subLocality", "") + else: + logfunc(f"[iOS Photos] ReverseLocationData unsupported format for asset {row[0]}") + except: logfunc('Error reading exported bplist from Asset PK' + row[0]) @@ -975,10 +1022,21 @@ def get_photosMetadata(files_found, report_folder, seeker, wrap_text, timezone_o with open(pathto, 'rb') as f: try: - deserialized_plist = nd.deserialize_plist(f) - postal_address = deserialized_plist['postalAddress']['_formattedAddress'] - postal_address_subadminarea = deserialized_plist['postalAddress']['_subAdministrativeArea'] - postal_address_sublocality = deserialized_plist['postalAddress']['_subLocality'] + raw = f.read() + parsed = safe_parse_reverse_location(raw) + + if parsed and "postalAddress" in parsed: + pa = parsed["postalAddress"] + postal_address = pa.get("_formattedAddress", "") + postal_address_subadminarea = pa.get("_subAdministrativeArea", "") + postal_address_sublocality = pa.get("_subLocality", "") + else: + logfunc(f"[iOS Photos] ReverseLocationData unsupported format for asset {row[0]}") + + # deserialized_plist = nd.deserialize_plist(f) + # postal_address = deserialized_plist['postalAddress']['_formattedAddress'] + # postal_address_subadminarea = deserialized_plist['postalAddress']['_subAdministrativeArea'] + # postal_address_sublocality = deserialized_plist['postalAddress']['_subLocality'] except: logfunc('Error reading exported bplist from Asset PK' + row[0]) diff --git a/scripts/artifacts/tikTokReplied.py b/scripts/artifacts/tikTokReplied.py index 243851055..807bc94c4 100644 --- a/scripts/artifacts/tikTokReplied.py +++ b/scripts/artifacts/tikTokReplied.py @@ -53,6 +53,10 @@ def tiktok_replied(files_found, report_folder, seeker, wrap_text, timezone_offse # There are sometimes more than one table that contacts are contained in. Need to union them all together contacts_tables = [row[0] for row in table_results] + + if not contacts_tables: + logfunc("No contacts tables found, skipping TikTok replied messages. ") + return [], [], report_file # create the contact subquery contacts_subqueries = []