From a39831446ad08567fd53a802752b9564ae7b65d5 Mon Sep 17 00:00:00 2001 From: "Mr.ThePlague" Date: Mon, 9 Feb 2026 17:25:49 -0500 Subject: [PATCH] Applying patches --- BloodBash | 72 +++++++++---- test_bloodbash.py | 254 ++++++++++++++++------------------------------ 2 files changed, 141 insertions(+), 185 deletions(-) diff --git a/BloodBash b/BloodBash index e6e7c02..f968b46 100644 --- a/BloodBash +++ b/BloodBash @@ -551,30 +551,54 @@ def print_shadow_credentials(G, domain_filter=None): def print_gpo_content_parsing(G, domain_filter=None): console.rule("[bold magenta]GPO Content Parsing for Exploitable Settings[/bold magenta]") + + # Debug: confirm we have access to add_finding + try: + console.print("[bold cyan]DEBUG: add_finding exists →[/bold cyan]", add_finding) + except NameError: + console.print("[bold red]CRITICAL: add_finding is NOT defined in this scope![/bold red]") + found = False exploitable_keys = ['taskname', 'scriptpath', 'scheduledtask', 'TaskName', 'ScriptPath', 'ScheduledTask'] + for n, d in G.nodes(data=True): if domain_filter and d.get('props', {}).get('domain') != domain_filter: continue - if d['type'].lower() == 'gpo': - props = d.get('props') or {} - exploitable_content = any(props.get(key) for key in exploitable_keys) - if exploitable_content: - found = True - console.print(f"[yellow]Exploitable GPO content[/yellow]: [bold cyan]{d['name']}[/bold cyan]") - for key in exploitable_keys: - if props.get(key): - console.print(f" → [cyan]{key}[/cyan]: {props[key]}") - add_finding("GPO Content", f"GPO {d['name']} has exploitable content") + + if d.get('type', '').lower() != 'gpo': + continue + + name = d.get('name') or d.get('ObjectIdentifier', 'Unnamed GPO') + props = d.get('props') or {} + + lower_props = {k.lower(): v for k, v in props.items()} + + found_keys = [k for k in exploitable_keys if k.lower() in lower_props and lower_props[k.lower()]] + + console.print(f"[dim]→ GPO {name!r} | found_keys = {found_keys}[/dim]") + + if found_keys: + found = True + console.print(f"[yellow]Exploitable GPO content detected[/yellow]: [bold cyan]{name}[/bold cyan]") + + for key in exploitable_keys: + if key.lower() in lower_props: + value = props.get(key) or lower_props.get(key.lower()) + console.print(f" → [cyan]{key}[/cyan]: {value}") + + # ─────────────────────────────────────────────────────────────── + console.print("[bold yellow]>>> About to call add_finding <<< [/bold yellow]") + try: + detail = f"GPO '{name}' has exploitable content: {', '.join(found_keys)}" + add_finding("GPO Content", detail) + console.print("[bold green]>>> add_finding SUCCESSFULLY CALLED <<< [/bold green]") + except Exception as e: + console.print("[bold red]>>> ERROR calling add_finding: [/bold red]", str(e)) + # ─────────────────────────────────────────────────────────────── + if found: - console.print(Panel( - "[bold yellow]Impact:[/bold yellow] GPOs with scheduled tasks or scripts can execute code on affected machines.\n" - "[bold]Abuse:[/bold] Modify GPO to deploy malware or backdoors.\n" - "[bold]Mitigation:[/bold] Audit GPO content; restrict editing rights.\n" - "[bold]Tools:[/bold] SharpGPOAbuse, Group Policy Management Console.", - title="Abuse Suggestions: GPO Content", - border_style="yellow" - )) + console.print("[yellow]At least one exploitable GPO found[/yellow]") + # print_abuse_panel("GPO Content") # ← comment out if noisy else: console.print("[green]No exploitable GPO content found[/green]") @@ -1024,6 +1048,8 @@ def print_kerberoastable(G, domain_filter=None): add_finding("Kerberoastable", f"{count} accounts") else: console.print("[green]None found[/green]") + + def print_as_rep_roastable(G, domain_filter=None): console.rule("[bold magenta]AS-REP Roastable Accounts (DONT_REQ_PREAUTH)[/bold magenta]") found = False @@ -1182,6 +1208,12 @@ def main(): parser.add_argument('--password-not-required', action='store_true') parser.add_argument('--shadow-credentials', action='store_true') parser.add_argument('--gpo-parsing', action='store_true') + parser.add_argument( + "--gpo-content-dir", + type=str, + default=None, + help="Directory containing GPO XML reports (e.g. from Get-GPOReport -ReportType Xml) for content analysis" + ) parser.add_argument('--constrained-delegation', action='store_true') parser.add_argument('--laps', action='store_true') parser.add_argument('--verbose', action='store_true') @@ -1252,8 +1284,12 @@ def main(): print_constrained_delegation(G, args.domain) if args.laps or run_all: print_laps_status(G, args.domain) + if args.all or args.gpo_content_dir: + print_gpo_content_analysis(G, args) if args.export: export_results(G, format_type=args.export, domain_filter=args.domain) + + print_prioritized_findings() elapsed = time.time() - start_time console.print(f"\n[italic green]Completed in {elapsed:.2f} seconds[/italic green]") diff --git a/test_bloodbash.py b/test_bloodbash.py index d901ae4..c87275c 100644 --- a/test_bloodbash.py +++ b/test_bloodbash.py @@ -8,6 +8,7 @@ import json import networkx as nx from rich.console import Console + # Load the BloodBash script by executing it in a controlled namespace bloodbash_globals = {} with open("BloodBash", "r") as f: @@ -19,15 +20,15 @@ def setUp(self): self.test_data_dir = "testData" # Temporary directory for DB/export tests self.temp_dir = tempfile.mkdtemp() - # Reset global findings between tests to avoid cross-test pollution - bloodbash_globals['global_findings'].clear() - + # Save original findings (do NOT clear here anymore) + self.original_findings = bloodbash_globals['global_findings'][:] + def tearDown(self): # Clean up temp directory shutil.rmtree(self.temp_dir) - # Clear global findings after each test - bloodbash_globals['global_findings'].clear() - + # Restore original findings state + bloodbash_globals['global_findings'][:] = self.original_findings + def _load_and_build_graph(self, test_subdir): """Helper to load JSON files from a test subdirectory and build the graph.""" test_dir = os.path.join(self.test_data_dir, test_subdir) @@ -36,17 +37,16 @@ def _load_and_build_graph(self, test_subdir): nodes = bloodbash_globals['load_json_dir'](test_dir) G, _ = bloodbash_globals['build_graph'](nodes) return G - + def _capture_output(self, func, *args, **kwargs): """Helper to capture console output using Rich's Console with StringIO.""" string_io = StringIO() test_console = Console(file=string_io, width=80, legacy_windows=False) - # Patch the console.print to use our test_console with patch.object(bloodbash_globals['console'], 'print', side_effect=test_console.print): func(*args, **kwargs) output = string_io.getvalue() return output - + def test_adcs_vulnerabilities(self): try: G = self._load_and_build_graph("adcs-tests") @@ -57,7 +57,7 @@ def test_adcs_vulnerabilities(self): self.assertIn("ESC3", output) self.assertIn("ESC6", output) self.assertIn("ESC8", output) - + def test_gpo_abuse(self): try: G = self._load_and_build_graph("gpo-tests") @@ -67,7 +67,7 @@ def test_gpo_abuse(self): self.assertIn("Weak GPO", output) self.assertIn("High-risk", output) self.assertIn("Vulnerable-GPO", output) - + def test_dcsync_rights(self): try: G = self._load_and_build_graph("dcsync-tests") @@ -76,7 +76,7 @@ def test_dcsync_rights(self): output = self._capture_output(bloodbash_globals['print_dcsync_rights'], G) self.assertIn("DCSync possible", output) self.assertIn("LOWPRIV@LAB.LOCAL", output) - + def test_rbcd(self): try: G = self._load_and_build_graph("rdbc-tests") @@ -85,7 +85,7 @@ def test_rbcd(self): output = self._capture_output(bloodbash_globals['print_rbcd'], G) self.assertIn("RBCD configured", output) self.assertIn("TARGET-COMPUTER$", output) - + def test_shortest_paths(self): try: G = self._load_and_build_graph("shortest-paths-tests") @@ -94,7 +94,7 @@ def test_shortest_paths(self): output = self._capture_output(bloodbash_globals['print_shortest_paths'], G) self.assertIn("DC1$", output) self.assertIn("USER2@LAB.LOCAL", output) - + def test_dangerous_permissions(self): try: G = self._load_and_build_graph("dangerous-permissions-tests") @@ -104,7 +104,7 @@ def test_dangerous_permissions(self): self.assertIn("Domain Admins", output) self.assertIn("GenericAll", output) self.assertIn("LOWPRIV@LAB.LOCAL", output) - + def test_kerberoastable(self): try: G = self._load_and_build_graph("kerberoastable-tests") @@ -112,7 +112,7 @@ def test_kerberoastable(self): self.skipTest(str(e)) output = self._capture_output(bloodbash_globals['print_kerberoastable'], G) self.assertIn("KERBUSER@LAB.LOCAL", output) - + def test_as_rep_roastable(self): try: G = self._load_and_build_graph("as-rep-roastable-tests") @@ -120,7 +120,7 @@ def test_as_rep_roastable(self): self.skipTest(str(e)) output = self._capture_output(bloodbash_globals['print_as_rep_roastable'], G) self.assertIn("ASREPUSER@LAB.LOCAL", output) - + def test_sessions_localadmin(self): try: G = self._load_and_build_graph("local-admin-sessions-tests") @@ -129,7 +129,7 @@ def test_sessions_localadmin(self): output = self._capture_output(bloodbash_globals['print_sessions_localadmin'], G) self.assertIn("ADMINUSER@LAB.LOCAL", output) self.assertIn("Total LocalAdmin instances", output) - + def test_get_high_value_targets(self): try: G = self._load_and_build_graph("high-value-targets-tests") @@ -139,7 +139,7 @@ def test_get_high_value_targets(self): target_names = [name for _, name, _ in targets] self.assertTrue(any("domain admins" in name.lower() for name in target_names)) self.assertTrue(any("krbtgt" in name.lower() for name in target_names)) - + def test_format_path(self): G = nx.MultiDiGraph() G.add_node("A", name="UserA") @@ -150,7 +150,7 @@ def test_format_path(self): self.assertIn("UserA", formatted) self.assertIn("AdminTo", formatted) self.assertIn("TargetB", formatted) - + def test_domain_filtering(self): try: G = self._load_and_build_graph("domain-filter-tests") @@ -160,7 +160,7 @@ def test_domain_filtering(self): self.assertIn("lab.local", output_filtered.lower()) output_all = self._capture_output(bloodbash_globals['print_verbose_summary'], G, domain_filter=None) self.assertGreater(len(output_all), len(output_filtered)) - + def test_indirect_paths(self): try: G = self._load_and_build_graph("indirect-paths-tests") @@ -170,16 +170,16 @@ def test_indirect_paths(self): self.assertIn("DOMAIN ADMINS@LAB.LOCAL", output) self.assertIn("Indirect paths", output) self.assertIn("via groups", output) - + def test_indirect_dangerous_permissions(self): try: G = self._load_and_build_graph("indirect-permissions-tests") except FileNotFoundError as e: self.skipTest(str(e)) output = self._capture_output(bloodbash_globals['print_dangerous_permissions'], G, indirect=True) - self.assertIn("DOMAIN ADMINS@LAB.LOCAL", output) self.assertIn("Indirect via group", output) - + self.assertIn("DOMAIN ADMINS@LAB.LOCAL", output) + def test_sid_history_abuse(self): try: G = self._load_and_build_graph("sid-history-tests") @@ -188,7 +188,7 @@ def test_sid_history_abuse(self): output = self._capture_output(bloodbash_globals['print_sid_history_abuse'], G) self.assertIn("SID History potential", output) self.assertIn("DOMAIN ADMINS@LAB.LOCAL", output.replace("\n", "")) - + def test_database_persistence(self): try: G = self._load_and_build_graph("adcs-tests") @@ -200,9 +200,9 @@ def test_database_persistence(self): G_loaded, _ = bloodbash_globals['load_graph_from_db'](db_path) self.assertEqual(G.number_of_nodes(), G_loaded.number_of_nodes()) self.assertEqual(G.number_of_edges(), G_loaded.number_of_edges()) - + def test_severity_scoring_and_prioritization(self): - bloodbash_globals['global_findings'].clear() + bloodbash_globals['global_findings'] = [] bloodbash_globals['add_finding']("ESC1-ESC8", "Test ESC issue") bloodbash_globals['add_finding']("Kerberoastable", "Test kerb issue") output = self._capture_output(bloodbash_globals['print_prioritized_findings']) @@ -214,7 +214,7 @@ def test_severity_scoring_and_prioritization(self): kerb_line = next((line for line in lines if "Kerberoastable" in line), None) if esc_line and kerb_line: self.assertLess(lines.index(esc_line), lines.index(kerb_line)) - + def test_export_html(self): try: G = self._load_and_build_graph("adcs-tests") @@ -229,7 +229,7 @@ def test_export_html(self): self.assertIn("", content) self.assertIn("BloodBash Report", content) self.assertIn("Prioritized Findings", content) - + def test_export_csv(self): try: G = self._load_and_build_graph("local-admin-sessions-tests") @@ -243,7 +243,7 @@ def test_export_csv(self): lines = f.readlines() self.assertGreater(len(lines), 1) self.assertIn("Principal", lines[0]) - + def test_get_indirect_paths(self): G = nx.MultiDiGraph() G.add_node("U", name="User") @@ -254,55 +254,44 @@ def test_get_indirect_paths(self): paths = bloodbash_globals['get_indirect_paths'](G, "U", "T") self.assertGreater(len(paths), 0) self.assertIn("G", paths[0]) - + def test_error_handling_invalid_json(self): - """Test that invalid JSON files are handled gracefully without crashing.""" with tempfile.TemporaryDirectory() as temp_dir: invalid_json_path = os.path.join(temp_dir, "invalid.json") with open(invalid_json_path, 'w') as f: f.write("{ invalid json }") with patch.object(bloodbash_globals['console'], 'print') as mock_print: nodes = bloodbash_globals['load_json_dir'](temp_dir) - # Check for partial match in calls (message may vary slightly) self.assertTrue(any("Warning" in str(call) and "invalid.json" in str(call) for call in mock_print.call_args_list)) self.assertEqual(len(nodes), 0) - + def test_case_sensitivity_types_and_labels(self): - """Test that types and labels are handled case-insensitively.""" - # Mock graph with mixed-case data and a high-value target G = nx.MultiDiGraph() G.add_node("U", name="User", type="USER") - G.add_node("C", name="DC1$", type="computer") # High-value name for testing + G.add_node("C", name="DC1$", type="computer") G.add_node("G", name="Group", type="GROUP") G.add_edge("U", "C", label="ADMinto") - # Test that get_high_value_targets finds the computer targets = bloodbash_globals['get_high_value_targets'](G) self.assertTrue(any("computer" in t[2].lower() for t in targets)) - # Test path formatting preserves label case path = ["U", "C"] formatted = bloodbash_globals['format_path'](G, path) self.assertIn("ADMinto", formatted) - + def test_performance_fast_mode_and_limits(self): - """Test that fast mode skips heavy computations and respects limits.""" - # Create a large mock graph with a high-value target G = nx.MultiDiGraph() - G.add_node("T", name="DC1$", type="Computer") # High-value to trigger logic + G.add_node("T", name="DC1$", type="Computer") for i in range(100): G.add_node(f"N{i}", name=f"Node{i}", type="User" if i % 2 == 0 else "Computer") if i > 0: G.add_edge(f"N{i-1}", f"N{i}", label="MemberOf") - # Fast mode should skip path computation output = self._capture_output(bloodbash_globals['print_shortest_paths'], G, fast=True, max_paths=5) self.assertIn("Fast mode enabled", output) self.assertNotIn("Length:", output) - # Test normal mode with limits (should not hang, limit to 5 paths) output = self._capture_output(bloodbash_globals['print_shortest_paths'], G, max_paths=5) path_count = output.count("Length:") self.assertLessEqual(path_count, 5) - + def test_code_duplication_roastable_checks(self): - """Test shared logic for roastable checks.""" G = nx.MultiDiGraph() G.add_node("K", name="KerbUser", type="User", props={"hasspn": True, "sensitive": False, "enabled": True}) G.add_node("A", name="AsRepUser", type="User", props={"dontreqpreauth": True, "sensitive": False, "enabled": True}) @@ -311,31 +300,24 @@ def test_code_duplication_roastable_checks(self): self.assertIn("KerbUser", kerb_output) self.assertIn("AsRepUser", asrep_output) self.assertNotIn("AsRepUser", kerb_output) - + def test_bugs_placeholder_nodes_and_missing_data(self): - """Test graph building handles placeholders and missing OIDs.""" - # Mock nodes with relationship and a high-value target to avoid early exit nodes = { "rel1": {"start": "UserA", "end": "GroupB", "label": "MemberOf"}, "UserA": {"ObjectIdentifier": "UserA", "Properties": {"name": "UserA"}, "ObjectType": "User"}, - "T": {"ObjectIdentifier": "T", "Properties": {"name": "DC1$"}, "ObjectType": "Computer"} # High-value + "T": {"ObjectIdentifier": "T", "Properties": {"name": "DC1$"}, "ObjectType": "Computer"} } G, _ = bloodbash_globals['build_graph'](nodes) self.assertIn("UserA", G.nodes) - # Check if placeholder node for "GroupB" was created groupb_node = next((n for n in G.nodes if G.nodes[n].get('name') == "GroupB"), None) - self.assertIsNotNone(groupb_node, "Placeholder node for 'GroupB' should exist") - # Then check the edge exists + self.assertIsNotNone(groupb_node) self.assertTrue(G.has_edge("UserA", groupb_node)) - + def test_security_input_validation_and_escaping(self): - """Test input validation and HTML escaping.""" with patch.object(bloodbash_globals['console'], 'print') as mock_print: nodes = bloodbash_globals['load_json_dir']("/nonexistent") - # Now it should handle gracefully without crashing mock_print.assert_called_with("[yellow]Warning: Directory '/nonexistent' not found. Skipping.[/yellow]") - self.assertEqual(len(nodes), 0) # Should return empty - # Test HTML escaping (unchanged) + self.assertEqual(len(nodes), 0) G = nx.MultiDiGraph() G.add_node("T", name="", type="User") bloodbash_globals['add_finding']("Test", "Injected") @@ -474,149 +439,104 @@ def test_export_html_with_findings(self): with open(f"{export_path}.html", 'r') as f: content = f.read() self.assertIn("Prioritized Findings", content) - self.assertNotIn("