-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
97 lines (82 loc) · 3.69 KB
/
app.py
File metadata and controls
97 lines (82 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# app.py
import os
import re
import base64
import requests
import json
import sys
import copy
from logging_config import get_logger
from urllib.parse import urlparse
from analysis_engine import AnalysisEngine
# setup logger and default configurations
logger = get_logger(__name__)
# browser-like User-Agent so downloads are less likely to be rejected by
# basic bot filtering on the remote server
DEFAULT_HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
# downloaded files are written to the worker's current working directory
download_dir = os.getcwd()
# load a JSON report template from the given file path
def load_report_template(json_path: str) -> "dict | None":
    """Load and parse the JSON report template at *json_path*.

    Returns the parsed dict on success, or ``None`` when the file is
    missing or contains invalid JSON (callers treat ``None`` as fatal).
    The original annotation claimed ``-> dict`` but the missing-file
    branch returns ``None``; a corrupt file also used to raise an
    uncaught ``JSONDecodeError`` before ``main`` could emit any report.
    """
    if not os.path.exists(json_path):
        logger.error(f"[-] CRITICAL: Template file not found at {json_path}")
        return None
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except json.JSONDecodeError:
        # treat a malformed template the same as a missing one
        logger.error(f"[-] CRITICAL: Template file at {json_path} is not valid JSON")
        return None
# merge analysis results into a copy of the report template
def populate_report_data(template: dict, analysis_data: dict) -> dict:
    """Return a deep copy of *template* with *analysis_data* merged in.

    Keys whose values are dicts on both sides are merged recursively;
    every other key is overwritten with the value from *analysis_data*.
    The input *template* is never mutated.
    """
    merged = copy.deepcopy(template)

    def _merge(dst: dict, src: dict) -> None:
        # recurse where both sides hold a dict, otherwise overwrite
        for k, v in src.items():
            if isinstance(v, dict) and isinstance(dst.get(k), dict):
                _merge(dst[k], v)
            else:
                dst[k] = v

    _merge(merged, analysis_data)
    return merged
# main function to orchestrate the file download and analysis process
def main():
    """Download the target file, run the analysis engine, and emit a report.

    The target comes from the TARGET_URL environment variable and may be a
    base64 ``data:`` URI or a standard HTTP(S) URL.  A final JSON report is
    always printed to stdout, even when analysis fails; any failure marks
    the file as malicious so the report errs on the safe side.
    """
    logger.info("[*] Analysis worker started.")
    report_template = load_report_template("report_template.json")
    if not report_template:
        sys.exit(1)
    live_analysis_data = {}
    try:
        # get the target URL from an environment variable
        target_url = os.getenv("TARGET_URL")
        if not target_url:
            raise ValueError("TARGET_URL environment variable not set.")
        logger.info(f"[*] Analyzing URL")
        # handle both base64 data URIs and standard web URLs
        if target_url.startswith("data:"):
            match = re.match(r'data:.*?;base64,(.*)', target_url)
            if match is None:
                # bug fix: a non-base64 data URI used to raise a confusing
                # AttributeError from match.group(1) on None
                raise ValueError("Unsupported data: URI (expected base64 encoding).")
            file_data = base64.b64decode(match.group(1))
            # bug fix: deriving the name from urlparse(...).path would embed
            # the base64 payload itself into the filename for data URIs
            file_name = "downloaded.bin"
        else:
            res = requests.get(target_url, headers=DEFAULT_HEADERS, timeout=30)
            res.raise_for_status()
            file_data = res.content
            parsed = urlparse(target_url)
            file_name = os.path.basename(parsed.path) or "downloaded.bin"
        # save the file locally for analysis
        download_path = os.path.join(download_dir, file_name)
        with open(download_path, "wb") as f:
            f.write(file_data)
        logger.info(f"[+] File saved to: {download_path} ({len(file_data)} bytes)")
        # initialize and run the analysis engine
        engine = AnalysisEngine(download_path, file_data, file_name)
        live_analysis_data = engine.run_all_analyses()
        # set a final message based on the analysis result
        if not live_analysis_data.get("is_malicious"):
            live_analysis_data["message"] = "The file appears to be safe."
        else:
            live_analysis_data["message"] = "Malicious file detected based on YARA rules and heuristics."
    except Exception as e:
        # handle any exceptions during the process, marking the file as malicious
        logger.exception(f"[!] An unexpected error occurred: {e}")
        live_analysis_data["is_malicious"] = True
        live_analysis_data["message"] = f"Analysis failed with an error: {e}"
        live_analysis_data["risk_score"] = 100
    finally:
        # ensure a final report is always generated and printed to stdout
        final_report = populate_report_data(report_template, live_analysis_data)
        print("<YARA scan result>\n" + json.dumps(final_report, ensure_ascii=False))
        sys.stdout.flush()
        sys.exit(0)
# script entry point: run the worker only when executed directly
if __name__ == "__main__":
    main()