-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalignment_engine.py
More file actions
104 lines (89 loc) · 3.82 KB
/
alignment_engine.py
File metadata and controls
104 lines (89 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
Alignment Engine — Ensures the agent's actions and goals are aligned with human values and tenant constraints.
Includes Mission Guard, value cross-referencing, and RLHF feedback loops.
"""
import json
import logging
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
class AlignmentEngine:
    """Core component for ASI Alignment & Safety.

    Validates requested actions against built-in keyword heuristics and
    tenant-specific constraints, screens prompts for forbidden phrases,
    protects a fixed list of safety-critical files from modification, and
    keeps an in-memory log of every validation (optionally mirrored to a
    database audit sink).
    """

    # Fallback phrases for check_safety(). They mirror the keyword heuristics
    # used by validate_action().
    # NOTE(review): the original code read self.forbidden_phrases without ever
    # defining it; confirm the intended "Omega Protocol" phrase list.
    DEFAULT_FORBIDDEN_PHRASES = ("rm -rf", "delete everything", "exploit", "hack")

    def __init__(self, db=None, forbidden_phrases: Optional[List[str]] = None):
        """Create an engine.

        Args:
            db: Optional object exposing ``audit(tenant_id, event, payload)``.
                When provided, every alignment check is mirrored to it.
            forbidden_phrases: Optional phrases that make ``check_safety``
                reject a prompt. Defaults to ``DEFAULT_FORBIDDEN_PHRASES``.
        """
        self.db = db
        # Immutable core values
        self.core_values = [
            "Human safety is paramount.",
            "Respect user privacy and data sovereignty.",
            "Operate transparently and with accountability.",
            "Avoid malicious or harmful code execution.",
            "Prioritize enterprise stability and reliability."
        ]
        if forbidden_phrases is None:
            forbidden_phrases = self.DEFAULT_FORBIDDEN_PHRASES
        # Copied so later mutation of the caller's list does not leak in.
        self.forbidden_phrases: List[str] = list(forbidden_phrases)
        self.alignment_log: List[Dict] = []

    def validate_action(self, tenant_id: int, action: str, params: str) -> Dict[str, Any]:
        """Verify if an action is aligned with core and tenant values.

        Only ``params`` is scanned; ``action`` is recorded in the result but
        not inspected. All keyword matching is case-insensitive.

        Args:
            tenant_id: Tenant whose constraints apply and under which the
                check is logged.
            action: Name of the requested action (recorded verbatim).
            params: Free-text parameters of the action. ``None`` is treated
                as an empty string for matching purposes.

        Returns:
            A dict with keys ``is_allowed`` (bool), ``violations`` (list of
            human-readable messages), ``action`` and ``params``.
        """
        violations = []
        # Lowercase once; every heuristic below matches against this copy.
        params_lower = (params or "").lower()

        # 1. Check against core values (simulated heuristics)
        if "rm -rf" in params_lower or "delete everything" in params_lower:
            violations.append("Destructive command violation: Attempted large-scale deletion.")
        if "exploit" in params_lower or "hack" in params_lower:
            violations.append("Malicious intent violation: Unauthorized access attempt.")

        # 2. Check against tenant-specific constraints
        for constraint in self._get_tenant_constraints(tenant_id):
            if constraint["keyword"].lower() in params_lower:
                violations.append(f"Tenant constraint violation: {constraint['message']}")

        result = {
            "is_allowed": not violations,
            "violations": violations,
            "action": action,
            "params": params
        }
        self._log_alignment(tenant_id, result)
        return result

    def check_safety(self, prompt: str) -> bool:
        """
        Returns True if prompt is safe, False if it violates Omega Protocol.

        A prompt is unsafe when it contains any entry of
        ``self.forbidden_phrases`` (case-insensitive substring match).
        Empty phrases are ignored, since they would match every prompt.
        """
        prompt_lower = prompt.lower()
        return not any(
            phrase and phrase.lower() in prompt_lower
            for phrase in self.forbidden_phrases
        )

    def check_immutable_files(self, target_file: str) -> bool:
        """
        Prevents modification of critical safety and alignment modules.
        Part of Omega Protocol.

        Args:
            target_file: Path of the file about to be modified; both ``/``
                and ``\\`` are accepted as separators.

        Returns:
            False (and prints a notice) when the file's base name is on the
            protected list, True otherwise. The comparison is case-sensitive.
        """
        immutable_list = [
            "alignment_engine.py",
            "security_engine.py",
            "code_ledger.py",
            "core_ethics.json"
        ]
        # Take the last path component regardless of separator style.
        base = target_file.split("/")[-1].split("\\")[-1]
        if base in immutable_list:
            print(f"OMEGA PROTOCOL: Modification of {base} is STRICTLY FORBIDDEN.")
            return False
        return True

    def get_alignment_report(self, tenant_id: int) -> List[Dict]:
        """Retrieve the 50 most recent alignment checks for a tenant."""
        tenant_entries = [
            entry for entry in self.alignment_log if entry["tenant_id"] == tenant_id
        ]
        return tenant_entries[-50:]

    def _get_tenant_constraints(self, tenant_id: int) -> List[Dict]:
        """Load tenant-specific safety constraints.

        Each constraint is a dict with a ``keyword`` to look for and the
        ``message`` reported when it is found.
        """
        # Mocking for now, in a real scenario this queries the DB knowledge_base
        return [
            {"keyword": "shutdown", "message": "System shutdown is restricted to super-admins."}
        ]

    def _log_alignment(self, tenant_id: int, result: Dict):
        """Record the alignment check in memory and, if configured, in the DB audit log.

        The DB write is best-effort: a failing audit sink is logged but never
        prevents the validation result from being returned to the caller.
        """
        log_entry = {
            "tenant_id": tenant_id,
            # Timezone-aware UTC timestamp of when the check was recorded.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **result
        }
        self.alignment_log.append(log_entry)
        if self.db:
            try:
                self.db.audit(tenant_id, "alignment_check", json.dumps(result))
            except Exception:
                # Keep best-effort semantics, but leave a trace instead of
                # silently discarding the failure.
                logging.getLogger(__name__).exception(
                    "Failed to write alignment audit record for tenant %s", tenant_id
                )