diff --git a/scripts/artifact_report.py b/scripts/artifact_report.py index de88464..80f293c 100755 --- a/scripts/artifact_report.py +++ b/scripts/artifact_report.py @@ -128,6 +128,24 @@ def write_artifact_data_table( if table_responsive: self.report_file.write("") + def add_trips_map(self, map_html): + """ + Add an interactive map section to the report. + + Parameters: + map_html (str): The HTML string for the map to be embedded in the report. + """ + self.report_file.write(f""" +
+ {map_html} +
+ + """) + def add_section_heading(self, heading, size='h2'): heading = html.escape(heading) data = '
' \ diff --git a/scripts/artifacts/volkswagen_Core_Information.py b/scripts/artifacts/volkswagen_Core_Information.py new file mode 100644 index 0000000..47c3109 --- /dev/null +++ b/scripts/artifacts/volkswagen_Core_Information.py @@ -0,0 +1,157 @@ +__artifacts_v2__ = { + "VW_MIB2_Core_Info": { + "name": "Core Info", + "description": "Retrieve core-related data from Volkswagen Vehicles", + "author": "@posiwer", + "version": "1.0", + "date": "2026-01-05", + "requirements": "none", + "category": "Volkswagen Vehicle", + "notes": "", + "paths": ('*/startup-*/*SESSION-*', ), + "function": "get_Core_Information" + } +} + +import os +import gzip +import shutil +import json +import re +from datetime import datetime +from scripts.artifact_report import ArtifactHtmlReport +from scripts.ilapfuncs import tsv, logfunc + +def extract_gz_sessions(files_found, output_folder): + os.makedirs(output_folder, exist_ok=True) + extracted_files = [] + for file in files_found: + filename = os.path.basename(file) + parent_folder = os.path.basename(os.path.dirname(file)) + + if filename.startswith("SESSION-") and filename.endswith(".gz"): + expected_uncompressed = filename[:-3] + sibling_uncompressed = os.path.join(os.path.dirname(file), expected_uncompressed) + + if os.path.exists(sibling_uncompressed): + logfunc(f"[INFO] Skipping {filename} (uncompressed version exists)") + continue + + output_filename = f"{parent_folder}_{expected_uncompressed}" + output_path = os.path.join(output_folder, output_filename) + + try: + with gzip.open(file, 'rb') as f_in, open(output_path, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + + extracted_files.append(output_path) + except Exception as e: + logfunc(f"[ERROR] Failed to extract {filename}: {str(e)}") + return extracted_files + +def copy_existing_sessions(files_found, output_folder): + os.makedirs(output_folder, exist_ok=True) + copied_files = [] + for file in files_found: + filename = os.path.basename(file) + parent_folder = os.path.basename(os.path.dirname(file)) + if filename.startswith("SESSION-") and not filename.endswith(".gz"): + output_filename = f"{parent_folder}_{filename}" + output_path = os.path.join(output_folder, output_filename) + + try: + shutil.copy(file, output_path) + copied_files.append(output_path) + except Exception as e: + logfunc(f"[ERROR] Failed to copy {filename}: {str(e)}") + return copied_files + +def parse_vcard_fields(vcard): + name = number = call_type = date = '' + lines = vcard.splitlines() + + for line in lines: + if line.startswith('FN:') or line.startswith('N:'): + name = line.split(':', 1)[1].strip() + elif line.startswith('TEL'): + number = line.split(':', 1)[1].strip() + elif 'X-IRMC-CALL-DATETIME' in line: + parts = line.split(':') + if len(parts) == 2: + raw_dt = parts[1].strip() + meta = parts[0] + if 'TYPE=' in meta: + call_type = meta.split('TYPE=')[-1].strip().capitalize() + else: + call_type = 'Unknown' + try: + dt = datetime.strptime(raw_dt, "%Y%m%dT%H%M%S") + date = dt.strftime("%Y-%m-%d %H:%M:%S") + except: + date = raw_dt + + if all([name, number, call_type, date]): + return (date, number, name, call_type) + return None + +def extract_connected_device(line): + if "serviceState=" in line and "Connected" in line and "bdAddr=" in line and "name=" in line: + mac_match = re.search(r'bdAddr=\s*0x([0-9a-fA-F]+)', line) + name_match = re.search(r"name=\s*'([^']+)'", line) + if mac_match and name_match: + mac_raw = mac_match.group(1) + mac = ':'.join(mac_raw[i:i+2] for i in range(0, len(mac_raw), 2)).upper() + name = name_match.group(1).strip() + return name, mac + return None + +def get_Core_Information(files_found, report_folder, seeker, wrap_text, time_offset): + extraction_folder = os.path.join(report_folder, "Extracted Sessions") + extracted = extract_gz_sessions(files_found, extraction_folder) + copied = copy_existing_sessions(files_found, extraction_folder) + session_files = extracted + copied + + records = [] + + for sf in session_files: + source_file = os.path.basename(sf) + try: + with open(sf, 'r', encoding='utf-8', errors='ignore') as f: + lines = f.readlines() + + current_device = ("Unknown", "Unknown") + + for idx, line in enumerate(lines): + if "serviceState=" in line and "Connected" in line: + device_info = extract_connected_device(line) + if device_info: + current_device = device_info + + if 'BT_APPL_PIM_DATA_IND' in line and 'BEGIN:VCARD' in line: + start = line.find('BEGIN:VCARD') + end = line.find('END:VCARD') + len('END:VCARD') + if start != -1 and end != -1: + vcard_raw = line[start:end].replace('..', '\n').strip() + parsed = parse_vcard_fields(vcard_raw) + if parsed: + date, number, name, call_type = parsed + records.append(( + source_file, date, number, name, call_type, + current_device[0], current_device[1] + )) + except Exception as e: + logfunc(f"[ERROR] Failed to process {source_file}: {str(e)}") + continue + + if records: + headers = ['Source File', 'Date', 'Phone Number', 'Contact Name', 'Call Type', 'Device Name', 'Device MAC'] + report = ArtifactHtmlReport('Bluetooth Call Logs') + report.start_artifact_report(report_folder, 'Bluetooth Call Logs') + report.add_script() + report.write_artifact_data_table(headers, records, 'core/startup-*/SESSION-*') + report.end_artifact_report() + + tsv(report_folder, headers, records, 'Bluetooth Call Logs') + + with open(os.path.join(report_folder, 'bluetooth_calls.json'), 'w', encoding='utf-8') as f: + json.dump([dict(zip(headers, r)) for r in records], f, indent=4, ensure_ascii=False) diff --git a/scripts/artifacts/volkswagen_Trips.py b/scripts/artifacts/volkswagen_Trips.py new file mode 100644 index 0000000..363316f --- /dev/null +++ b/scripts/artifacts/volkswagen_Trips.py @@ -0,0 +1,234 @@ +__artifacts_v2__ = { + "VW_MIB2_Trips": { + "name": "Trip Data", + "description": "Retrieve Trip Data from Volkswagen Vehicles", + "author": "@posiwer", + "version": "1.0", + "date": "2026-01-05", + "requirements": "none", + "category": "Volkswagen Vehicle", + "notes": "", + "paths": ('*/*user1.db*'), + "function": "get_Trips" + } +} + +import os +import sqlite3 +import json +from scripts.artifact_report import ArtifactHtmlReport +from scripts.ilapfuncs import tsv + +def get_Trips(files_found, report_folder, seeker, wrap_text, time_offset): + file_found = str(files_found[0]) + + if not file_found.endswith('user1.db'): + return + + db = sqlite3.connect(file_found) + cursor = db.cursor() + + cursor.execute('''SELECT id, latitude, longitude, + DATETIME(unixtime, 'unixepoch') AS datetime, seqNo, tripId, gxHash, gyHash, forceSave + FROM trips''') + all_rows = cursor.fetchall() + + if len(all_rows) == 0: + return + + headers = ['ID', 'Latitude', 'Longitude', 'Date & Time', 'Sequence Number', 'Trip ID', 'GX Hash', 'GY Hash', 'Force Save'] + + report = ArtifactHtmlReport('Trips') + report.start_artifact_report(report_folder, 'Trips') + report.add_script() + + data_list = [] + unique_dates = set() + + for row in all_rows: + data_list.append(row) + unique_dates.add(row[3].split(' ')[0]) + + sorted_dates = sorted(unique_dates, reverse=True) + + report.add_section_heading('Trips Table') + report.write_artifact_data_table(headers, data_list, file_found) + + map_html = f""" +
+ + +
+ +
+ + + + + """ + + report.add_section_heading('Trips Map') + report.add_trips_map(map_html) + + tsv(report_folder, headers, data_list, 'Trips') + + json_path = os.path.join(report_folder, 'trips.json') + with open(json_path, mode='w', encoding='utf-8') as json_file: + json.dump([{headers[i]: row[i] for i in range(len(headers))} for row in data_list], json_file, indent=4, ensure_ascii=False) + + # Parking Status + cursor.execute('''SELECT id, DATETIME(visitTime, 'unixepoch') AS visitTime, latitude, longitude, tripId, DATETIME(lastDecay, 'unixepoch') AS lastDecay, duration_mins, description + FROM parkingStatus''') + all_rows = cursor.fetchall() + + if len(all_rows) == 0: + return + + headers = ['ID', 'Visit Time', 'Latitude', 'Longitude', 'Trip ID', 'Last Decay', 'Duration (mins)', 'Description'] + + report = ArtifactHtmlReport('Parking Status') + report.start_artifact_report(report_folder, 'Parking Status') + report.add_script() + + data_list = [] + + for row in all_rows: + data_list.append((row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7])) + + report.add_section_heading('Parking Status Table') + report.write_artifact_data_table(headers, data_list, file_found) + + map_html = f""" +
+ + + + + """ + + report.add_section_heading('Parking Status Map') + report.add_trips_map(map_html) + + tsv(report_folder, headers, data_list, 'Parking Status') + + db.close() diff --git a/scripts/artifacts/volkswagen_User_Information.py b/scripts/artifacts/volkswagen_User_Information.py new file mode 100644 index 0000000..0f8c3ac --- /dev/null +++ b/scripts/artifacts/volkswagen_User_Information.py @@ -0,0 +1,213 @@ +__artifacts_v2__ = { + "VW_MIB2_User_Info": { + "name": "User Info", + "description": "Retrieve user-related data from Volkswagen Vehicles", + "author": "@posiwer", + "version": "1.0", + "date": "2026-01-05", + "requirements": "none", + "category": "Volkswagen Vehicle", + "notes": "Updated to include contact photos when available", + "paths": ('*/*db00000*', '*/*photo*'), + "function": "get_User_Information" + } +} + +import os +import sqlite3 +import json +import shutil +from scripts.artifact_report import ArtifactHtmlReport +from scripts.ilapfuncs import tsv + +def get_User_Information(files_found, report_folder, seeker, wrap_text, time_offset): + db_file = None + photo_files = [] + + for file in files_found: + if file.endswith('db0000060'): + db_file = file + elif file.lower().endswith('.jpg'): + photo_files.append(file) + + if not db_file: + return + + photos_dir = os.path.join(report_folder, 'Contact Photos') + os.makedirs(photos_dir, exist_ok=True) + + conn = sqlite3.connect(db_file) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + cursor.execute(''' + SELECT + pt.persID, + prt.profileID as deviceID, + prt.profileName AS deviceName, + pt.pictureID, + UPPER( + SUBSTR(prt.macAddress, 1, 2) || ':' || + SUBSTR(prt.macAddress, 3, 2) || ':' || + SUBSTR(prt.macAddress, 5, 2) || ':' || + SUBSTR(prt.macAddress, 7, 2) || ':' || + SUBSTR(prt.macAddress, 9, 2) || ':' || + SUBSTR(prt.macAddress, 11, 2) + ) AS MacAddress, + pnt.phoneNumber, + COALESCE(gt2.graphem, '') || + CASE + WHEN gt1.graphem IS NOT NULL AND gt2.graphem IS NOT NULL THEN ' ' + ELSE '' + END || + COALESCE(gt1.graphem, '') AS fullName, + pt.pictureType AS hasPicture + FROM phoneNumberTable AS pnt + LEFT JOIN personalTable AS pt ON pt.persID = pnt.persID + LEFT JOIN graphemTable AS gt1 ON pt.lastNameGraphem = gt1.graphemID + LEFT JOIN graphemTable AS gt2 ON pt.firstNameGraphem = gt2.graphemID + LEFT JOIN profileTable AS prt ON pt.profileID = prt.profileID; + ''') + + contacts = cursor.fetchall() + + contact_data = [] + extracted_photos = 0 + + for contact in contacts: + picture_id = str(contact['pictureID']) + pers_id = contact['persID'] + full_name = contact['fullName'] or "Unknown Contact" + phone_number = contact['phoneNumber'] or "No Number" + device_name = contact['deviceName'] or "Unknown Device" + + safe_name = "".join(c for c in full_name if c.isalnum() or c in (' ', '_')).strip() + safe_name = safe_name.replace(' ', '_') if safe_name else f"Contact {pers_id}" + + matching_photos = [ + f for f in photo_files + if picture_id in os.path.basename(f).lower() + ] + + photo_html = "Photo not Associated" + if matching_photos: + source_photo = matching_photos[0] + + dest_filename = f"{device_name}_{safe_name}_{phone_number}_{picture_id}.jpg" + dest_path = os.path.join(photos_dir, dest_filename) + + shutil.copy2(source_photo, dest_path) + relative_path = os.path.join('Volkswagen Vehicle/Contact Photos', dest_filename) + photo_html = f'' + extracted_photos += 1 + + contact_data.append({ + 'Device': device_name, + 'Name': full_name, + 'Phone': phone_number, + 'PhotoID': picture_id, + 'Photo': photo_html, + 'PersID': pers_id + }) + + + if contact_data: + headers = ['Device', 'Name', 'Phone', 'PhotoID', 'Photo'] + data_list = [ + [item['Device'], item['Name'], item['Phone'], item['PhotoID'], item['Photo']] + for item in contact_data + ] + + report = ArtifactHtmlReport('Phone Numberss') + report.start_artifact_report(report_folder, 'Phone Numbers') + report.add_script() + report.write_artifact_data_table(headers, data_list, db_file, html_escape=False) + report.end_artifact_report() + + tsv_data = [ + [item['Device'], item['Name'], item['Phone'], item['PhotoID'], + 'Yes' if 'href' in item['Photo'] else 'No'] + for item in contact_data + ] + tsv(report_folder, headers, tsv_data, 'Phone Numbers') + + json_data = [] + for item in contact_data: + json_item = { + 'Device': item['Device'], + 'Name': item['Name'], + 'Phone': item['Phone'], + 'PhotoID': item['PhotoID'], + 'PhotoExtracted': 'Yes' if 'href' in item['Photo'] else 'No', + 'PersID': item['PersID'] + } + if 'href' in item['Photo']: + json_item['PhotoPath'] = os.path.join('Contact Photos', + f"{item['Device']}_{item['Name'].replace(' ', '_')}_{item['Phone']}_{item['PhotoID']}.jpg") + json_data.append(json_item) + + json_path = os.path.join(report_folder, 'contacts_with_photos.json') + with open(json_path, 'w', encoding='utf-8') as f: + json.dump(json_data, f, indent=4, ensure_ascii=False) + + + cursor.execute(''' + SELECT + ldt.lastDestinationID, + ldt.navLocID, + ldt.hashCode AS destinationHash, + ldt.name AS destinationName, + ldt.status AS destinationStatus, + nht.countryAbbreviation, + nht.dbVersion, + nht.flags, + nht.name AS historyName, + nht.hash AS historyHash, + nht.status AS historyStatus, + nlt.navData + FROM lastDestinationTable AS ldt + LEFT JOIN navHistoryTable AS nht ON ldt.navLocID = nht.navLocID + LEFT JOIN navLocationTable AS nlt ON ldt.navLocID = nlt.navID; + ''') + + nav_data_rows = cursor.fetchall() + + if len(nav_data_rows) > 0: + nav_headers = [ + 'Last Destination ID', + 'Nav Location ID', + 'Destination Hash', + 'Destination Name', + 'Destination Status', + 'Country Abbreviation', + 'Database Version', + 'Flags', + 'History Name', + 'History Hash', + 'History Status', + 'Nav Data' + ] + + nav_data_list = [] + for row in nav_data_rows: + row_dict = dict(zip(nav_headers, row)) + if row_dict['Nav Data'] is not None: + try: + row_dict['Nav Data'] = row_dict['Nav Data'].decode('utf-8', errors='replace') + except AttributeError: + row_dict['Nav Data'] = str(row_dict['Nav Data']) + nav_data_list.append([str(value) if value is not None else '' for value in row_dict.values()]) + + nav_report = ArtifactHtmlReport('Navigation Data') + nav_report.start_artifact_report(report_folder, 'Navigation Data') + nav_report.add_script() + nav_report.write_artifact_data_table(nav_headers, nav_data_list, db_file) + nav_report.end_artifact_report() + + tsv(report_folder, nav_headers, nav_data_list, 'Navigation Data') + + json_path = os.path.join(report_folder, 'navigation_data.json') + with open(json_path, mode='w', encoding='utf-8') as json_file: + json.dump(nav_data_list, json_file, indent=4, ensure_ascii=False) + + conn.close() \ No newline at end of file