Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import os
import platform
import datetime as dt

if platform.system() == 'Darwin':
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
Expand Down Expand Up @@ -82,6 +83,13 @@ class StatusResponse(BaseModel):
loaded_repositories: List[Dict[str, Any]] = Field(default_factory=list)


class SecurityEventIngestRequest(BaseModel):
"""Compatibility payload for OmniLore security-sentinel ingest calls."""

event: Dict[str, Any] = Field(default_factory=dict)
tenant_context: Optional[Dict[str, Any]] = Field(default_factory=dict)


# Initialize FastAPI app

@asynccontextmanager
Expand Down Expand Up @@ -112,6 +120,11 @@ async def lifespan(app: FastAPI):

# Global FastCode instance
fastcode_instance: Optional[FastCode] = None
security_event_buffer: list[Dict[str, Any]] = []
SECURITY_EVENT_BUFFER_LIMIT = max(
10,
int(os.getenv("FASTCODE_SECURITY_EVENT_BUFFER_LIMIT", "500")),
)
Comment on lines +124 to +127
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SECURITY_EVENT_BUFFER_LIMIT is computed with int(os.getenv(...)). If FASTCODE_SECURITY_EVENT_BUFFER_LIMIT is set to a non-integer value, this will raise ValueError at import time and prevent the API from starting. Consider parsing defensively (try/except with a fallback) and optionally clamping to a reasonable maximum to avoid accidental huge memory usage.

Suggested change
SECURITY_EVENT_BUFFER_LIMIT = max(
10,
int(os.getenv("FASTCODE_SECURITY_EVENT_BUFFER_LIMIT", "500")),
)
_raw_security_event_buffer_limit = os.getenv("FASTCODE_SECURITY_EVENT_BUFFER_LIMIT", "500")
try:
_parsed_security_event_buffer_limit = int(_raw_security_event_buffer_limit)
except (TypeError, ValueError):
_parsed_security_event_buffer_limit = 500
# Clamp to avoid accidental huge memory usage
SECURITY_EVENT_BUFFER_LIMIT = max(10, min(_parsed_security_event_buffer_limit, 10_000))

Copilot uses AI. Check for mistakes.

# Setup logging
log_dir = Path("./logs")
Expand Down Expand Up @@ -165,6 +178,53 @@ async def health_check():
"repo_loaded": fastcode_instance.repo_loaded,
"repo_indexed": fastcode_instance.repo_indexed,
"multi_repo_mode": fastcode_instance.multi_repo_mode,
"security_ingest_enabled": True,
"security_event_buffer_size": len(security_event_buffer),
}


@app.post("/ingest")
async def ingest_security_event(request: SecurityEventIngestRequest):
"""
Security Sentinel compatibility endpoint.

OmniLore white-label tooling posts security events here when configured with
OMNILORE_SECURITY_SENTINEL_URL=http://127.0.0.1:8001.
"""
fastcode = _ensure_fastcode_initialized()

event = request.event or {}
tenant_context = request.tenant_context or {}
record = {
"received_at": dt.datetime.now(dt.timezone.utc).isoformat(),
"event": _safe_jsonable(event),
"tenant_context": _safe_jsonable(tenant_context),
}
security_event_buffer.append(record)
if len(security_event_buffer) > SECURITY_EVENT_BUFFER_LIMIT:
security_event_buffer.pop(0)
Comment on lines +203 to +205
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security_event_buffer uses a list and trims with pop(0), which is O(n) due to shifting elements. Since this is a bounded FIFO buffer, consider using collections.deque with maxlen (or popleft) to keep trimming O(1) and simplify the logic.

Copilot uses AI. Check for mistakes.

event_type = (
event.get("type")
or event.get("event")
or event.get("name")
or "unknown"
)
tenant_id = tenant_context.get("tenant_id", "unknown")
logger.warning(
"Security ingest accepted (compat): event_type=%s tenant_id=%s",
event_type,
tenant_id,
)
Comment on lines +214 to +218
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every /ingest call logs at WARNING level. If security events are high-volume, this can quickly flood logs and trigger alerting; consider logging at INFO/DEBUG, adding sampling/rate-limiting, or only warning on malformed/unexpected payloads.

Suggested change
logger.warning(
"Security ingest accepted (compat): event_type=%s tenant_id=%s",
event_type,
tenant_id,
)
log_message = "Security ingest accepted (compat): event_type=%s tenant_id=%s"
if event_type == "unknown" or tenant_id == "unknown":
logger.warning(log_message, event_type, tenant_id)
else:
logger.info(log_message, event_type, tenant_id)

Copilot uses AI. Check for mistakes.

return {
"status": "received",
"mode": "fastcode_compat",
"event_type": event_type,
"tenant_id": tenant_id,
"repo_loaded": fastcode.repo_loaded,
"repo_indexed": fastcode.repo_indexed,
"security_event_buffer_size": len(security_event_buffer),
}


Expand Down