From 8a3789bafee32bd03f764edca7bc83ca9c765de8 Mon Sep 17 00:00:00 2001 From: revprm Date: Sun, 7 Dec 2025 10:03:22 +0700 Subject: [PATCH 1/3] fix: Add error handling for XML parsing in notification history --- scripts/artifacts/notificationHistory.py | 54 ++++++++++++++++++------ 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/scripts/artifacts/notificationHistory.py b/scripts/artifacts/notificationHistory.py index 05c90e48..1ca922d4 100644 --- a/scripts/artifacts/notificationHistory.py +++ b/scripts/artifacts/notificationHistory.py @@ -39,8 +39,21 @@ def get_notificationHistory(files_found, report_folder, seeker, wrap_text): multi_root = True tree = abxread(file_found, multi_root) else: - tree = ET.parse(file_found) - + try: + tree = ET.parse(file_found) + except ET.ParseError: + with open(file_found, "r", encoding="utf-8", errors="ignore") as f: + raw = f.read() + cut_point = raw.rfind("") + if cut_point != -1: + clean_xml = raw[ + : cut_point + 11 + ] + tree = ET.ElementTree(ET.fromstring(clean_xml)) + else: + logfunc(f"Error: Could not recover {file_name}") + continue + root = tree.getroot() for setting in root.findall(".//setting"): if setting.attrib.get('name') == 'notification_history_enabled': @@ -58,22 +71,37 @@ def get_notificationHistory(files_found, report_folder, seeker, wrap_text): data_headers = ('Status', 'User') report.write_artifact_data_table(data_headers, data_list, file_found) report.end_artifact_report() - + tsvname = f'Android Notification History - Status' tsv(report_folder, data_headers, data_list, tsvname) - + else: logfunc('No Android Notification History - Status data available') - + #parsing notification_policy.xml - if file_name.endswith('notification_policy.xml'): + elif file_name.endswith('notification_policy.xml'): data_list = [] if (checkabx(file_found)): multi_root = False tree = abxread(file_found, multi_root) else: - tree = ET.parse(file_found) - + try: + tree = ET.parse(file_found) + except ET.ParseError: + with open(file_found, "r", encoding="utf-8", errors="ignore") as f: + raw = f.read() + if "" in raw: + clean_xml = raw[: raw.rfind("") + 22] + tree = ET.ElementTree(ET.fromstring(clean_xml)) + elif "" in raw: + clean_xml = raw[ + : raw.rfind("") + 24 + ] + tree = ET.ElementTree(ET.fromstring(clean_xml)) + else: + logfunc(f"Error: Could not recover {file_name}") + continue + root = tree.getroot() for elem in root: if elem.tag == 'snoozed-notifications': @@ -94,10 +122,10 @@ def get_notificationHistory(files_found, report_folder, seeker, wrap_text): data_headers = ('Reminder Time', 'Snoozed Notification') report.write_artifact_data_table(data_headers, data_list, file_found) report.end_artifact_report() - + tsvname = f'Android Notification History - Snoozed notifications' tsv(report_folder, data_headers, data_list, tsvname) - + else: logfunc('No Android Notification History - Snoozed notifications data available') @@ -116,7 +144,7 @@ def get_notificationHistory(files_found, report_folder, seeker, wrap_text): major_version = notification_history.major_version if notification_history.HasField('major_version') else None # notification format version should be 1 for notification in notification_history.notification: package_name = notification.package if notification.package else package_map.get(notification.package_index, "") #retrieves package from the map if not stored locally - + #this block tries to fetch the value of each field from within the parsed protobuf file e.g. variable user_id -> recovers the user_id field from the pb fields = ['uid', 'user_id', 'package_index', 'channel_name', 'channel_id','channel_id_index', 'channel_name_index', 'conversation_id', 'conversation_id_index'] defaults = {field: 'Error' for field in fields} @@ -172,10 +200,10 @@ def get_notificationHistory(files_found, report_folder, seeker, wrap_text): file_directory = os.path.dirname(file_found) report.write_artifact_data_table(data_headers, data_pb_list, file_directory, html_escape=False) report.end_artifact_report() - + tsvname = f'Android Notification History - Notifications' tsv(report_folder, data_headers, data_pb_list, tsvname) - + tlactivity = f'Android Notification History - Notifications' timeline(report_folder, tlactivity, data_pb_list, data_headers) else: From dc2894c9202ed95db27ab1d8ebafdab89859631c Mon Sep 17 00:00:00 2001 From: revprm Date: Sun, 7 Dec 2025 10:43:27 +0700 Subject: [PATCH 2/3] fix: Enhance Firefox data retrieval with metadata table checks --- scripts/artifacts/firefox.py | 127 ++++++++++++++++++++++++----------- 1 file changed, 87 insertions(+), 40 deletions(-) diff --git a/scripts/artifacts/firefox.py b/scripts/artifacts/firefox.py index e575e480..380dcd8a 100755 --- a/scripts/artifacts/firefox.py +++ b/scripts/artifacts/firefox.py @@ -5,39 +5,83 @@ from scripts.artifact_report import ArtifactHtmlReport from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly +def table_exists(cursor, table_name): + """Check if a table exists in the database""" + cursor.execute( + """ + SELECT name FROM sqlite_master + WHERE type='table' AND name=? + """, + (table_name,), + ) + return cursor.fetchone() is not None + + def get_firefox(files_found, report_folder, seeker, wrap_text): - + for file_found in files_found: file_found = str(file_found) if not os.path.basename(file_found) == 'places.sqlite': # skip -journal and other files continue - + db = open_sqlite_db_readonly(file_found) cursor = db.cursor() - cursor.execute(''' - SELECT - datetime(moz_places.last_visit_date_local/1000, 'unixepoch') AS LastVisitDate, - moz_places.url AS URL, - moz_places.title AS Title, - moz_places.visit_count_local AS VisitCount, - moz_places.description AS Description, - CASE - WHEN moz_places.hidden = 0 THEN 'No' - WHEN moz_places.hidden = 1 THEN 'Yes' - END AS Hidden, - CASE - WHEN moz_places.typed = 0 THEN 'No' - WHEN moz_places.typed = 1 THEN 'Yes' - END AS Typed, - moz_places.frecency AS Frecency, - moz_places.preview_image_url AS PreviewImageURL - FROM - moz_places - INNER JOIN moz_historyvisits ON moz_places.origin_id = moz_historyvisits.id - INNER JOIN moz_places_metadata ON moz_places.id = moz_places_metadata.id - ORDER BY - moz_places.last_visit_date_local ASC - ''') + + has_metadata_table = table_exists(cursor, "moz_places_metadata") + + if has_metadata_table: + cursor.execute( + """ + SELECT + datetime(moz_places.last_visit_date_local/1000, 'unixepoch') AS LastVisitDate, + moz_places.url AS URL, + moz_places.title AS Title, + moz_places.visit_count_local AS VisitCount, + moz_places.description AS Description, + CASE + WHEN moz_places.hidden = 0 THEN 'No' + WHEN moz_places.hidden = 1 THEN 'Yes' + END AS Hidden, + CASE + WHEN moz_places.typed = 0 THEN 'No' + WHEN moz_places.typed = 1 THEN 'Yes' + END AS Typed, + moz_places.frecency AS Frecency, + moz_places.preview_image_url AS PreviewImageURL + FROM + moz_places + INNER JOIN moz_historyvisits ON moz_places.origin_id = moz_historyvisits.id + INNER JOIN moz_places_metadata ON moz_places.id = moz_places_metadata.id + ORDER BY + moz_places.last_visit_date_local ASC + """ + ) + else: + cursor.execute( + """ + SELECT + datetime(moz_places.last_visit_date_local/1000, 'unixepoch') AS LastVisitDate, + moz_places.url AS URL, + moz_places.title AS Title, + moz_places.visit_count_local AS VisitCount, + moz_places.description AS Description, + CASE + WHEN moz_places.hidden = 0 THEN 'No' + WHEN moz_places.hidden = 1 THEN 'Yes' + END AS Hidden, + CASE + WHEN moz_places.typed = 0 THEN 'No' + WHEN moz_places.typed = 1 THEN 'Yes' + END AS Typed, + moz_places.frecency AS Frecency, + moz_places.preview_image_url AS PreviewImageURL + FROM + moz_places + WHERE moz_places.last_visit_date_local IS NOT NULL + ORDER BY + moz_places.last_visit_date_local ASC + """ + ) all_rows = cursor.fetchall() usageentries = len(all_rows) @@ -52,15 +96,15 @@ def get_firefox(files_found, report_folder, seeker, wrap_text): report.write_artifact_data_table(data_headers, data_list, file_found) report.end_artifact_report() - + tsvname = f'Firefox - Web History' tsv(report_folder, data_headers, data_list, tsvname) - + tlactivity = f'Firefox - Web History' timeline(report_folder, tlactivity, data_list, data_headers) else: logfunc('No Firefox - Web History data available') - + cursor = db.cursor() cursor.execute(''' SELECT @@ -104,15 +148,15 @@ def get_firefox(files_found, report_folder, seeker, wrap_text): report.write_artifact_data_table(data_headers, data_list, file_found) report.end_artifact_report() - + tsvname = f'Firefox - Web Visits' tsv(report_folder, data_headers, data_list, tsvname) - + tlactivity = f'Firefox - Web Visits' timeline(report_folder, tlactivity, data_list, data_headers) else: logfunc('No Firefox - Web Visits data available') - + cursor = db.cursor() cursor.execute(''' SELECT @@ -147,15 +191,16 @@ def get_firefox(files_found, report_folder, seeker, wrap_text): report.write_artifact_data_table(data_headers, data_list, file_found) report.end_artifact_report() - + tsvname = f'Firefox - Bookmarks' tsv(report_folder, data_headers, data_list, tsvname) - + tlactivity = f'Firefox - Bookmarks' timeline(report_folder, tlactivity, data_list, data_headers) else: logfunc('No Firefox - Bookmarks data available') - + + if table_exists(cursor, "moz_places_metadata_search_queries"): cursor = db.cursor() cursor.execute(''' SELECT @@ -178,18 +223,20 @@ def get_firefox(files_found, report_folder, seeker, wrap_text): report.write_artifact_data_table(data_headers, data_list, file_found) report.end_artifact_report() - + tsvname = f'Firefox - Search Terms' tsv(report_folder, data_headers, data_list, tsvname) - + else: logfunc('No Firefox - Search Terms data available') - - db.close() + else: + logfunc('No Firefox - Search Terms data available') + + db.close() __artifacts__ = { "Firefox": ( "Firefox", ('*/org.mozilla.firefox/files/places.sqlite*'), get_firefox) -} \ No newline at end of file +} From 7bd06a5b28ad0b7ea8f8464cb9c9f4c52e04f87e Mon Sep 17 00:00:00 2001 From: revprm Date: Tue, 9 Dec 2025 08:06:42 +0700 Subject: [PATCH 3/3] fix: Improve user database retrieval and error handling in Wire Messenger --- scripts/artifacts/wireMessenger.py | 105 +++++++++++++++++------------ 1 file changed, 62 insertions(+), 43 deletions(-) diff --git a/scripts/artifacts/wireMessenger.py b/scripts/artifacts/wireMessenger.py index cf052453..0dc6f920 100644 --- a/scripts/artifacts/wireMessenger.py +++ b/scripts/artifacts/wireMessenger.py @@ -65,44 +65,54 @@ def get_user_id(files_found): #wire names the database from the user id #this function checks the files found for the database matching the user id -#it also checks that the user id located is in a uuid format +#it also checks that the user id located is in a uuid format def get_user_database(files_found, user_id): #create a regular expression for uuids and compile it - re_uuid = r'[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}$' + re_uuid = r"[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}$" uuid_match = re.compile(re_uuid) - + if re.match(uuid_match, user_id): for file_found in files_found: - is_sqlite3 = lambda file_found: open(file_found, 'rb').read(16) == b'SQLite format 3\x00' file_found = str(file_found) user_id = str(user_id) + + #Skip cache folders entirely (avoids broken symlinks/ghost files) + if "/cache/" in file_found or "\\cache\\" in file_found: + continue + #check if the file found ends with the uuid user id if file_found.endswith(user_id): - #check if the file found is also a sqlite database and is not a directory - if is_sqlite3 and not isdir(file_found): - user_database = file_found - #returns a string of the user database so it can then be queried using sqlite3 - return user_database - else: - logfunc("Located a file ending with the User ID, but it is not an SQLite DB - I'll continue looking!") - pass - else: - continue + if not isdir(file_found): + try: + with open(file_found, "rb") as f: + file_header = f.read(16) + if file_header == b"SQLite format 3\x00": + return file_found + except (IOError, OSError): + #Silently continue if file cannot be opened (e.g. broken link) + continue else: logfunc("User ID is not a UUID - decoding not supported!") pass + return None + def get_wire_profile(files_found, report_folder, seeker, wrap_text): #get the user id and the database of the user user_id = get_user_id(files_found) user_database = get_user_database(files_found, user_id) + + if not user_database: + logfunc("Valid Wire SQLite database not found.") + return + #create an empty list for the profile data profile_data = list() - + #connect to the db db = open_sqlite_db_readonly(user_database) cursor = db.cursor() - #sqlite query time + #sqlite query time cursor.execute(''' SELECT Users._id AS "User ID", Users.name AS "Display Name", @@ -120,7 +130,7 @@ def get_wire_profile(files_found, report_folder, seeker, wrap_text): all_rows = cursor.fetchall() db.close() usage_entries = len(all_rows) - + #check if there were any entries in the database if usage_entries > 0: #iterate through all rows in the database @@ -136,15 +146,15 @@ def get_wire_profile(files_found, report_folder, seeker, wrap_text): #update the boolean value to ensure that the profile picture gets parsed found_profile_picture = True - #check if there was a profile picture located + #check if there was a profile picture located if found_profile_picture: profile_data.append((row[0],row[1],row[2],row[3],row[4], row[5], row[6], row[7], row[8], thumb)) else: profile_data.append((row[0],row[1],row[2],row[3],row[4], row[5], row[6], row[7], row[8], "No profile picture located")) - + else: logfunc("No entries in the SQLite database!") - + #checks if the profile data variable is populated then writes the data to the report if profile_data: description = 'Parses details about the user profile for Wire Messenger' @@ -155,7 +165,7 @@ def get_wire_profile(files_found, report_folder, seeker, wrap_text): 'Verification Device','Device Model','Date Registered','Profile Picture Name', 'Profile Picture') report.write_artifact_data_table(data_headers, profile_data, user_database, html_escape=False) report.end_artifact_report() - + tsvname = 'Wire User Profile' tsv(report_folder, data_headers, profile_data, tsvname) else: @@ -165,13 +175,18 @@ def get_wire_contacts(files_found, report_folder, seeker, wrap_text): #get the user id and the database of the user user_id = get_user_id(files_found) user_database = get_user_database(files_found, user_id) + + if not user_database: + logfunc("Valid Wire SQLite database not found.") + return + #create an empty list for the contacts data contacts_data = list() - + #connect to the db db = open_sqlite_db_readonly(user_database) cursor = db.cursor() - #sqlite query time + #sqlite query time cursor.execute(''' Select Users._id As "User ID", Users.name As "Display Name", @@ -182,28 +197,28 @@ def get_wire_contacts(files_found, report_folder, seeker, wrap_text): From Users Where Users.connection != 'self' ''') - + all_rows = cursor.fetchall() db.close() usage_entries = len(all_rows) - + #check if there were any entries in the database if usage_entries > 0: #iterate through all rows in the database for row in all_rows: - + user_id = row[0] display_name = row[1] handle_id = row[2] conn_status = row[3] conn_time = row[4] profile_picture = row[5] - + contacts_data.append((user_id, display_name, handle_id, conn_status, conn_time, profile_picture)) else: logfunc("No entries in the SQLite database!") - + #checks if the contacts data variable is populated then writes the data to the report if contacts_data: description = 'Parses details about the user contacts for Wire Messenger' @@ -213,19 +228,24 @@ def get_wire_contacts(files_found, report_folder, seeker, wrap_text): data_headers = ('User ID', 'Display Name', 'Handle ID', 'Connection Status', 'Connection Time', 'Profile Picture ID') report.write_artifact_data_table(data_headers, contacts_data, user_database, html_escape=False) report.end_artifact_report() - + tsvname = 'Wire User Contacts' tsv(report_folder, data_headers, contacts_data, tsvname) else: logfunc("No contacts data located!") - + def get_wire_messages(files_found, report_folder, seeker, wrap_text): #get the user id and the database of the user user_id = get_user_id(files_found) user_database = get_user_database(files_found, user_id) + + if not user_database: + logfunc("Valid Wire SQLite database not found.") + return + #create an empty list for the messages data messages_data = list() - + #connect to the db db = open_sqlite_db_readonly(user_database) cursor = db.cursor() @@ -236,9 +256,9 @@ def get_wire_messages(files_found, report_folder, seeker, wrap_text): timestamp FROM MsgDeletion; ''') - + deleted_rows = cursor.fetchall() - + if len(deleted_rows) == 0: cursor.execute(''' Select @@ -259,9 +279,9 @@ def get_wire_messages(files_found, report_folder, seeker, wrap_text): LEFT JOIN Assets2 ON Messages.asset_id = Assets2._id Order By time; ''') - + #closing read only db and opening write mode - #inserting the deleted messages into the Messages table + #inserting the deleted messages into the Messages table else: db.close() db = sqlite3.connect(user_database) @@ -276,7 +296,7 @@ def get_wire_messages(files_found, report_folder, seeker, wrap_text): UPDATE Messages SET msg_type = 'Deleted' WHERE msg_type = ''; ''') - + cursor.execute(''' Select datetime(Messages.time / 1000, 'unixepoch'), @@ -296,16 +316,16 @@ def get_wire_messages(files_found, report_folder, seeker, wrap_text): LEFT JOIN Assets2 ON Messages.asset_id = Assets2._id Order By time; ''') - + all_rows = cursor.fetchall() db.close() usage_entries = len(all_rows) - + #check if there were any entries in the database if usage_entries > 0: #iterate through all rows in the database for row in all_rows: - + date_time = row[0] message_id = row[1] user_id = row[2] @@ -316,13 +336,12 @@ def get_wire_messages(files_found, report_folder, seeker, wrap_text): reacted_by = row[7] call_duration = row[8] asset_id = row[9] - - + messages_data.append((date_time, message_id, user_id,message_type, message_content, reaction, reaction_dt, reacted_by, call_duration, asset_id)) else: logfunc("No entries in the SQLite database!") - + #checks if the messages data variable is populated then writes the data to the report if messages_data: description = 'Parses details about the messages for Wire Messenger' @@ -333,7 +352,7 @@ def get_wire_messages(files_found, report_folder, seeker, wrap_text): 'Reacted By','Call Duration','Asset ID - Check Path: /data/media/0/Pictures/Wire Images/') report.write_artifact_data_table(data_headers, messages_data, user_database, html_escape=False) report.end_artifact_report() - + tsvname = 'Wire User Contacts' tsv(report_folder, data_headers, messages_data, tsvname) else: