diff --git a/.gitignore b/.gitignore index 06be2f8..60455b1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,165 @@ -/openaudit/__pycache__ -*.pyc +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# with no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary dependencies to ensure reproducible builds. +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. 
+#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and others +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# IDEs +.idea/ +.vscode/ + +# OpenAuditKit local configs +.openaudit_consent +.openaudit_config.yaml +report.json +test.env +/dist + +# Misc +.DS_Store +memory_bank.md diff --git a/AI_ETHICS.md b/AI_ETHICS.md new file mode 100644 index 0000000..7095741 --- /dev/null +++ b/AI_ETHICS.md @@ -0,0 +1,28 @@ +# 🛡 AI Ethics & Privacy in OpenAuditKit + +OpenAuditKit integrates AI capabilities with a "Safety-First" approach. We believe security tools should not compromise the privacy of the code they analyze. + +## 1. Opt-In by Default +AI features are **strictly opt-in**. +- You must explicitly pass the `--ai` flag to enable them. +- On the first run, you will be asked to grant consent interactively. +- For CI/CD, you must explicitly enable consent (e.g., via `openaudit consent --grant`). + +## 2. Data Redaction +Before any code snippet is sent to an LLM (Large Language Model): +- **Secrets are Redacted**: We use our static analysis engine to detect and mask secrets (API keys, passwords, tokens) with `[REDACTED]`. +- **Anonymization**: We aim to strip PII where possible, though code context is preserved for analysis. + +## 3. Advisory Nature +AI is non-deterministic. +- All AI-generated findings are tagged as **Advisory**. 
+- They should be reviewed by a human. +- They do not block builds by default unless configured otherwise. + +## 4. Local vs External +- We support local LLMs (e.g., via Ollama) for users who want zero data egress. +- External providers (e.g., OpenAI, Anthropic) are optional and require your own API keys. We do not proxy your code through our servers. + +## 5. Transparency +- We explain *why* an AI finding was generated. +- We show the prompt context (in debug mode) so you know exactly what was sent. diff --git a/README.md b/README.md index f78c1fe..1d8f601 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,56 @@ OpenAuditKit is an open-source CLI security audit tool designed to scan your cod - **Config Scanning**: Identifies misconfigurations in deployment files (e.g., .env, Dockerfile). - **Secure**: Secrets are masked in outputs; offline-first design. - **Backend Ready**: Feature-based architecture with Pydantic models for easy integration into dashboards or APIs. -- **Customizable**: Add your own rules! See [Rule Documentation](rules/README.md). +- **Customizable**: Add your own rules! See [Rule Documentation](openaudit/rules/README.md). + +## 🛡️ Why OpenAuditKit? + + +## 🎥 Usage Demo + +![OpenAuditKit Demo](path/to/demo.gif) +*(Replace this with your actual usage GIF)* + +## Usage + +### Basic Scan +```bash +openaudit scan . +``` + +### 🧠 AI-Powered Analysis +Unlock advanced capabilities by configuring your OpenAI API key: + +```bash +# 1. Configure API Key +openaudit config set-key sk-your-key-here + +# 2. Run Scan with AI Agents +openaudit scan . --ai + +# 3. Explain a specific file +openaudit explain openaudit/main.py +``` + +**AI Agents:** +- **Architecture Agent**: Reviews modularity and dependencies. +- **Cross-File Agent**: Traces dangerous data flows across modules. +- **Explain Agent**: Provides detailed code explanations. +- **Secret Agent**: Validates if found secrets are likely real or test data.
+- **Threat Model Agent**: Generates a STRIDE threat model for your project structure. + +### JSON Output +```bash +openaudit scan . --format json --output report.json +``` + +## 🛠 Features + +- **Secret Scanning**: Detects API keys and secrets using regex and entropy checks. +- **Config Scanning**: Identifies misconfigurations in deployment files (e.g., .env, Dockerfile). +- **Secure**: Secrets are masked in outputs; offline-first design (unless AI is enabled). +- **Backend Ready**: Feature-based architecture with Pydantic models for easy integration into dashboards or APIs. +- **Customizable**: Add your own rules! See [Rule Documentation](openaudit/rules/README.md). ## 🛡️ Why OpenAuditKit? @@ -18,49 +67,27 @@ Often, security tools are either too simple (grep) or too complex (enterprise SA | **Secret Scanning** | ✅ | ✅ | ✅ | | **Config Scanning** | ✅ | ❌ | ❌ | | **Offline First** | ✅ | ✅ | ❌ (Often requires API) | +| **AI Analysis** | ✅ (Optional) | ❌ | ❌ | | **Custom Rules** | ✅ (YAML) | ✅ (TOML) | ✅ (Detectors) | | **Backend Integration** | ✅ (Pydantic Models) | ❌ | ❌ | -| **Configuration Check** | ✅ (.env, Docker) | ❌ | ❌ | ### Security Philosophy -1. **Offline First**: No data leaves your machine. Your code is yours. +1. **Offline First**: No data leaves your machine unless you explicitly enable AI features. 2. **Confidence > Noise**: We use entropy checks and specific regexes to minimize false positives. 3. **Actionable**: Every finding comes with a remediation step. ## Installation + ```bash -# From PyPI (Coming Real Soon!) +# From PyPI pip install openaudit -# Or from source -git clone https://github.com/StartUp-Agency/OpenAuditKit.git +# From Source +git clone https://github.com/neuralforgeone/OpenAuditKit.git cd OpenAuditKit pip install . ``` -## Usage -```bash -# Basic Scan -python -m openaudit.main . - -# With specific rules -python -m openaudit.main . --rules-path ./my-rules - -# JSON Output -python -m openaudit.main . 
--format json --output report.json -``` - -**Ignoring Files:** -Create a `.oaignore` or `.openauditignore` file in your root directory to exclude files/folders from the scan (uses .gitignore syntax). - -Example `.oaignore`: -```text -node_modules/ -dist/ -tests/ -*.log -``` - ## 🚀 CI/CD Integration OpenAuditKit is designed to run in CI/CD pipelines. Use the `--ci` flag to enable CI mode (exit code 1 on failure, no interactive elements). @@ -82,7 +109,7 @@ jobs: with: python-version: '3.10' - run: pip install openaudit - - run: openaudit . --ci --fail-on high + - run: openaudit scan . --ci --fail-on high ``` ### Exit Codes @@ -93,7 +120,8 @@ jobs: Run the test suite with coverage: ```bash -python -m pytest tests --cov=openaudit +pip install -e .[dev] +pytest tests --cov=openaudit ``` We enforce a 90% test coverage threshold. diff --git a/assets/logo1.png b/assets/logo1.png new file mode 100644 index 0000000..d3caf72 Binary files /dev/null and b/assets/logo1.png differ diff --git a/build/lib/openaudit/__init__.py b/build/lib/openaudit/__init__.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/build/lib/openaudit/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/build/lib/openaudit/main.py b/build/lib/openaudit/main.py new file mode 100644 index 0000000..67a93c9 --- /dev/null +++ b/build/lib/openaudit/main.py @@ -0,0 +1,9 @@ +from openaudit.interface.cli.app import app + +import sys +def main(): + print(f"DEBUG: sys.argv = {sys.argv}") + app() + +if __name__ == "__main__": + main() diff --git a/build/lib/openaudit/rules/config.yaml b/build/lib/openaudit/rules/config.yaml new file mode 100644 index 0000000..923eba0 --- /dev/null +++ b/build/lib/openaudit/rules/config.yaml @@ -0,0 +1,67 @@ +rules: + # .env Rules + - id: "CONF_DEBUG_ENABLED" + description: "Debug mode enabled in configuration" + regex: "(?i)^\\s*DEBUG\\s*=\\s*(true|1|yes)" + severity: "high" + confidence: "high" + category: "config" + remediation: "Set DEBUG=False in 
production environments." + + - id: "CONF_DATABASE_URL_UNENCRYPTED" + description: "Plaintext database URL detected" + regex: "^\\s*DATABASE_URL\\s*=\\s*(postgres|mysql|mongodb)://" + severity: "high" + confidence: "high" + category: "config" + remediation: "Use encrypted secrets management or mask credentials." + + - id: "CONF_ENV_DEV_IN_PROD" + description: "Development environment setting detected" + regex: "(?i)^\\s*ENV\\s*=\\s*(dev|development)" + severity: "medium" + confidence: "high" + category: "config" + remediation: "Ensure this is not a production environment." + + # Dockerfile Rules + - id: "DOCKER_USER_ROOT" + description: "Container running as root" + regex: "^\\s*USER\\s+root" + severity: "high" + confidence: "high" + category: "infrastructure" + remediation: "Create and switch to a non-root user." + + - id: "DOCKER_EXPOSE_ALL" + description: "Exposing service on all interfaces (0.0.0.0)" + regex: "^\\s*EXPOSE\\s+.*0\\.0\\.0\\.0" + severity: "medium" + confidence: "high" + category: "infrastructure" + remediation: "Bind to specific interfaces if possible." + + - id: "DOCKER_ADD_COPY_ALL" + description: "Broad COPY instruction (COPY . /)" + regex: "^\\s*COPY\\s+\\.\\s+/" + severity: "low" + confidence: "medium" + category: "infrastructure" + remediation: "Use .dockerignore and copy only necessary files." + + # Docker Compose Rules (Regex approximation for simple detection, can be refined with yaml parsing) + - id: "COMPOSE_RESTART_ALWAYS" + description: "Restart policy set to always" + regex: "restart:\\s*always" + severity: "low" + confidence: "high" + category: "infrastructure" + remediation: "Consider 'on-failure' or specific restart policies." + + - id: "COMPOSE_PORT_EXPOSURE" + description: "Port exposed to host (broad range)" + regex: "\\s*-\\s*[\"']?0\\.0\\.0\\.0:" + severity: "medium" + confidence: "high" + category: "infrastructure" + remediation: "Bind ports to localhost (127.0.0.1) if external access is not required." 
diff --git a/build/lib/openaudit/rules/secrets.yaml b/build/lib/openaudit/rules/secrets.yaml new file mode 100644 index 0000000..e349700 --- /dev/null +++ b/build/lib/openaudit/rules/secrets.yaml @@ -0,0 +1,18 @@ +rules: + - id: "AWS_ACCESS_KEY_ID" + description: "AWS Access Key ID" + regex: "(?:A3T[A-Z0-9]|AKIA|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}" + entropy_check: false + severity: "critical" + confidence: "high" + category: "secret" + remediation: "Revoke the key immediately and rotate credentials." + + - id: "GENERIC_API_KEY" + description: "Potential High Entropy Key" + regex: "api_key['\"]?\\s*[:=]\\s*['\"]?([A-Za-z0-9_\\-]{32,})" + entropy_check: true + severity: "high" + confidence: "medium" + category: "secret" + remediation: "Verify if this is a real secret and move to environment variables." diff --git a/dist/openaudit-0.1.0-py3-none-any.whl b/dist/openaudit-0.1.0-py3-none-any.whl index 548c94a..b4b3fcc 100644 Binary files a/dist/openaudit-0.1.0-py3-none-any.whl and b/dist/openaudit-0.1.0-py3-none-any.whl differ diff --git a/dist/openaudit-0.1.0.tar.gz b/dist/openaudit-0.1.0.tar.gz index bdecc1e..8ed2dd7 100644 Binary files a/dist/openaudit-0.1.0.tar.gz and b/dist/openaudit-0.1.0.tar.gz differ diff --git a/openaudit.egg-info/PKG-INFO b/openaudit.egg-info/PKG-INFO index b0dfeee..8ce9edd 100644 --- a/openaudit.egg-info/PKG-INFO +++ b/openaudit.egg-info/PKG-INFO @@ -4,6 +4,8 @@ Version: 0.1.0 Summary: Offline-first security audit tool (secrets & config scanning) for local codebases. 
Author-email: OpenAuditKit Team License: MIT +Project-URL: Repository, https://github.com/neuralforgeone/OpenAuditKit +Project-URL: Issues, https://github.com/neuralforgeone/OpenAuditKit/issues Classifier: Programming Language :: Python :: 3 Classifier: License :: OSI Approved :: MIT License Classifier: Operating System :: OS Independent @@ -15,6 +17,7 @@ Requires-Dist: pyyaml>=6.0 Requires-Dist: rich>=13.0.0 Requires-Dist: pydantic>=2.0.0 Requires-Dist: pathspec>=0.11.0 +Requires-Dist: openai>=1.0.0 Dynamic: license-file # OpenAuditKit @@ -26,7 +29,56 @@ OpenAuditKit is an open-source CLI security audit tool designed to scan your cod - **Config Scanning**: Identifies misconfigurations in deployment files (e.g., .env, Dockerfile). - **Secure**: Secrets are masked in outputs; offline-first design. - **Backend Ready**: Feature-based architecture with Pydantic models for easy integration into dashboards or APIs. -- **Customizable**: Add your own rules! See [Rule Documentation](rules/README.md). +- **Customizable**: Add your own rules! See [Rule Documentation](openopenaudit/rules/README.md). + +## 🛡️ Why OpenAuditKit? + + +## 🎥 Usage Demo + +![OpenAuditKit Demo](path/to/demo.gif) +*(Replace this with your actual usage GIF)* + +## Usage + +### Basic Scan +```bash +openaudit scan . +``` + +### 🧠 AI-Powered Analysis +Unlock advanced capabilities by configuring your OpenAI API key: + +```bash +# 1. Configure API Key +openaudit config set-key sk-your-key-here + +# 2. Run Scan with AI Agents +openaudit scan . --ai + +# 3. Explain a specific file +openaudit explain openaudit/main.py +``` + +**AI Agents:** +- **Architecture Agent**: Reviews modularity and dependencies. +- **Cross-File Agent**: Traces dangerous data flows across modules. +- **Explain Agent**: Provides detailed code explanations. +- **Secret Agent**: Validates if found secrets are likely real or test data. +- **Threat Model Agent**: Generates a STRIDE threat model for your project structure. 
+ +### JSON Output +```bash +openaudit scan . --format json --output report.json +``` + +## 🛠 Features + +- **Secret Scanning**: Detects API keys and secrets using regex and entropy checks. +- **Config Scanning**: Identifies misconfigurations in deployment files (e.g., .env, Dockerfile). +- **Secure**: Secrets are masked in outputs; offline-first design (unless AI is enabled). +- **Backend Ready**: Feature-based architecture with Pydantic models for easy integration into dashboards or APIs. +- **Customizable**: Add your own rules! See [Rule Documentation](openaudit/rules/README.md). ## 🛡️ Why OpenAuditKit? @@ -37,41 +89,25 @@ Often, security tools are either too simple (grep) or too complex (enterprise SA | **Secret Scanning** | ✅ | ✅ | ✅ | | **Config Scanning** | ✅ | ❌ | ❌ | | **Offline First** | ✅ | ✅ | ❌ (Often requires API) | +| **AI Analysis** | ✅ (Optional) | ❌ | ❌ | | **Custom Rules** | ✅ (YAML) | ✅ (TOML) | ✅ (Detectors) | | **Backend Integration** | ✅ (Pydantic Models) | ❌ | ❌ | -| **Configuration Check** | ✅ (.env, Docker) | ❌ | ❌ | ### Security Philosophy -1. **Offline First**: No data leaves your machine. Your code is yours. +1. **Offline First**: No data leaves your machine unless you explicitly enable AI features. 2. **Confidence > Noise**: We use entropy checks and specific regexes to minimize false positives. 3. **Actionable**: Every finding comes with a remediation step. ## Installation -```bash -pip install -r requirements.txt -``` -## Usage ```bash -# Basic Scan -python -m openaudit.main . - -# With specific rules -python -m openaudit.main . --rules-path ./my-rules - -# JSON Output -python -m openaudit.main . --format json --output report.json -``` - -**Ignoring Files:** -Create a `.oaignore` or `.openauditignore` file in your root directory to exclude files/folders from the scan (uses .gitignore syntax). 
+# From PyPI +pip install openaudit -Example `.oaignore`: -```text -node_modules/ -dist/ -tests/ -*.log +# From Source +git clone https://github.com/neuralforgeone/OpenAuditKit.git +cd OpenAuditKit +pip install . ``` ## 🚀 CI/CD Integration @@ -95,7 +131,7 @@ jobs: with: python-version: '3.10' - run: pip install openaudit - - run: openaudit . --ci --fail-on high + - run: openaudit scan . --ci --fail-on high ``` ### Exit Codes @@ -106,7 +142,8 @@ jobs: Run the test suite with coverage: ```bash -python -m pytest tests --cov=openaudit +pip install -e .[dev] +pytest tests --cov=openaudit ``` We enforce a 90% test coverage threshold. diff --git a/openaudit.egg-info/SOURCES.txt b/openaudit.egg-info/SOURCES.txt index c0478fd..ef06330 100644 --- a/openaudit.egg-info/SOURCES.txt +++ b/openaudit.egg-info/SOURCES.txt @@ -4,6 +4,7 @@ README.md pyproject.toml requirements.txt openaudit/__init__.py +openaudit/__main__.py openaudit/main.py openaudit.egg-info/PKG-INFO openaudit.egg-info/SOURCES.txt diff --git a/openaudit.egg-info/requires.txt b/openaudit.egg-info/requires.txt index 5629cba..29d10d7 100644 --- a/openaudit.egg-info/requires.txt +++ b/openaudit.egg-info/requires.txt @@ -3,3 +3,4 @@ pyyaml>=6.0 rich>=13.0.0 pydantic>=2.0.0 pathspec>=0.11.0 +openai>=1.0.0 diff --git a/openaudit/__main__.py b/openaudit/__main__.py new file mode 100644 index 0000000..40e2b01 --- /dev/null +++ b/openaudit/__main__.py @@ -0,0 +1,4 @@ +from .main import main + +if __name__ == "__main__": + main() diff --git a/openaudit/ai/__init__.py b/openaudit/ai/__init__.py new file mode 100644 index 0000000..f404c4c --- /dev/null +++ b/openaudit/ai/__init__.py @@ -0,0 +1,5 @@ +from .models import PromptContext, AIResult +from .protocol import AgentProtocol +from .ethics import Redactor, ConsentManager + +__all__ = ["PromptContext", "AIResult", "AgentProtocol", "Redactor", "ConsentManager"] diff --git a/openaudit/ai/engine.py b/openaudit/ai/engine.py new file mode 100644 index 0000000..2b0f782 --- 
/dev/null +++ b/openaudit/ai/engine.py @@ -0,0 +1,50 @@ +from typing import Optional, List +import openai +from openaudit.core.config import ConfigManager +from openaudit.core.domain import Severity, Confidence +from openaudit.ai.models import AIResult +from openai import OpenAI, OpenAIError + +class AIEngine: + """ + Centralized engine for AI model interactions. + """ + + def __init__(self): + self.config_manager = ConfigManager() + self.client: Optional[OpenAI] = None + self._initialize_client() + + def _initialize_client(self): + api_key = self.config_manager.get_api_key() + if api_key: + self.client = OpenAI(api_key=api_key) + + def is_available(self) -> bool: + return self.client is not None + + def chat_completion(self, system_prompt: str, user_prompt: str, model: str = "gpt-4o") -> Optional[str]: + """ + Executes a chat completion request. + """ + if not self.client: + # Try re-initializing in case config changed + self._initialize_client() + if not self.client: + raise RuntimeError("OpenAI API key not configured. Run 'openaudit config set-key ' or set OPENAI_API_KEY env var.") + + try: + response = self.client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ], + temperature=0.2 + ) + return response.choices[0].message.content + except OpenAIError as e: + # creating a dummy result on error or re-raising? + # For now, let's log and re-raise to be handled by caller or CLI + raise RuntimeError(f"OpenAI API Error: {str(e)}") + diff --git a/openaudit/ai/ethics.py b/openaudit/ai/ethics.py new file mode 100644 index 0000000..27227ba --- /dev/null +++ b/openaudit/ai/ethics.py @@ -0,0 +1,58 @@ +import re +from typing import List +from pathlib import Path + +# Placeholder for consent storage file +CONSENT_FILE = Path(".openaudit_consent") + +class Redactor: + """ + Utility to redaction secrets from text before sending to an LLM. 
+ Uses basic patterns to identify potential secrets. + """ + + # Simple regex for common secrets (placeholder, ideally reuse SecretScanner patterns) + # This is a safety net; specific scanners should also redact. + SENSITIVE_PATTERNS = [ + r"(?i)(api[_-]?key|secret|token|password|passwd|pwd)['\"]?\s*[:=]\s*['\"]?([a-zA-Z0-9_\-]{8,})['\"]?", + r"(?i)private[_-]?key", + ] + + @classmethod + def redact(cls, text: str) -> str: + """ + Replace sensitive patterns with [REDACTED]. + """ + redacted_text = text + for pattern in cls.SENSITIVE_PATTERNS: + redacted_text = re.sub(pattern, lambda m: m.group(0).replace(m.group(2), "[REDACTED]"), redacted_text) + return redacted_text + +class ConsentManager: + """ + Manages user consent for AI features. + """ + + @staticmethod + def has_consented() -> bool: + """ + Check if the user has explicitly consented to AI usage. + For now, we check for a specific marker file or env var. + """ + # In a real impl, this might check a global config file in user home + return CONSENT_FILE.exists() + + @staticmethod + def grant_consent(): + """ + Grant consent creates the marker. + """ + CONSENT_FILE.touch() + + @staticmethod + def revoke_consent(): + """ + Revoke consent removes the marker. + """ + if CONSENT_FILE.exists(): + CONSENT_FILE.unlink() diff --git a/openaudit/ai/models.py b/openaudit/ai/models.py new file mode 100644 index 0000000..ad60d3d --- /dev/null +++ b/openaudit/ai/models.py @@ -0,0 +1,28 @@ +from pydantic import BaseModel, Field +from typing import Optional, List, Dict +from openaudit.core.domain import Severity, Confidence + +class PromptContext(BaseModel): + """ + Context to be passed to an AI Agent. + Contains the code to analyze, metadata, and potentially previous findings. 
+ """ + file_path: str + code_snippet: str + line_number: Optional[int] = None + surrounding_lines: int = 5 + metadata: Dict[str, str] = Field(default_factory=dict) + + # Optional: If analyzing an existing finding + finding_id: Optional[str] = None + +class AIResult(BaseModel): + """ + Structured response from an AI Agent. + """ + analysis: str = Field(..., description="Explanation of the analysis") + risk_score: float = Field(..., ge=0.0, le=1.0, description="0.0 to 1.0 risk score") + severity: Severity + confidence: Confidence + suggestion: Optional[str] = None + is_advisory: bool = True # AI findings are advisory by default diff --git a/openaudit/ai/protocol.py b/openaudit/ai/protocol.py new file mode 100644 index 0000000..ebe5cd8 --- /dev/null +++ b/openaudit/ai/protocol.py @@ -0,0 +1,16 @@ +from typing import Protocol, runtime_checkable +from openaudit.ai.models import PromptContext, AIResult + +@runtime_checkable +class AgentProtocol(Protocol): + """ + Interface that all AI Agents must fulfill. + """ + name: str + description: str + + def run(self, context: PromptContext) -> AIResult: + """ + Execute the agent on the given context. + """ + ... diff --git a/openaudit/core/config.py b/openaudit/core/config.py new file mode 100644 index 0000000..a78eea6 --- /dev/null +++ b/openaudit/core/config.py @@ -0,0 +1,52 @@ +import os +import yaml +from pathlib import Path +from typing import Optional, Dict + +class ConfigManager: + """ + Manages persistent configuration for OpenAuditKit. 
+ """ + CONFIG_FILE_NAME = ".openaudit_config.yaml" + + def __init__(self, config_path: Optional[str] = None): + if config_path: + self.config_path = Path(config_path) + else: + # Default to user home directory + self.config_path = Path.home() / self.CONFIG_FILE_NAME + + def _load_config(self) -> Dict: + if not self.config_path.exists(): + return {} + try: + with open(self.config_path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + except Exception: + return {} + + def _save_config(self, config: Dict): + with open(self.config_path, "w", encoding="utf-8") as f: + yaml.dump(config, f) + + def get_api_key(self) -> Optional[str]: + """ + Retrieves the OpenAI API key from environment variable or config file. + Priority: Env Var > Config File + """ + # 1. Check Environment Variable + env_key = os.environ.get("OPENAI_API_KEY") + if env_key: + return env_key + + # 2. Check Config File + config = self._load_config() + return config.get("openai_api_key") + + def set_api_key(self, api_key: str): + """ + Saves the OpenAI API key to the config file. + """ + config = self._load_config() + config["openai_api_key"] = api_key + self._save_config(config) diff --git a/openaudit/core/domain.py b/openaudit/core/domain.py index e824cc3..7c18799 100644 --- a/openaudit/core/domain.py +++ b/openaudit/core/domain.py @@ -70,6 +70,7 @@ class Finding(BaseModel): confidence: Confidence = Confidence.MEDIUM category: str = "secret" remediation: str = "No remediation provided." 
+ is_ai_generated: bool = Field(default=False, description="Whether this finding was generated/enriched by AI") def __str__(self): return f"[{self.severity.upper()}] {self.rule_id} in {self.file_path}:{self.line_number}" diff --git a/openaudit/features/architecture/__init__.py b/openaudit/features/architecture/__init__.py new file mode 100644 index 0000000..b97bca7 --- /dev/null +++ b/openaudit/features/architecture/__init__.py @@ -0,0 +1,5 @@ +from .models import ModuleNode, ProjectStructure +from .scanner import ArchitectureScanner +from .agent import ArchitectureAgent + +__all__ = ["ModuleNode", "ProjectStructure", "ArchitectureScanner", "ArchitectureAgent"] diff --git a/openaudit/features/architecture/agent.py b/openaudit/features/architecture/agent.py new file mode 100644 index 0000000..e18d894 --- /dev/null +++ b/openaudit/features/architecture/agent.py @@ -0,0 +1,61 @@ +from openaudit.ai.models import PromptContext, AIResult +from openaudit.ai.protocol import AgentProtocol +from openaudit.core.domain import Severity, Confidence +from .models import ProjectStructure +import json + +class ArchitectureAgent: + """ + AI Agent that reviews the project structure. + """ + name = "architecture-agent" + description = "Analyzes module headers and dependencies to identify architectural issues." + + def run_on_structure(self, structure: ProjectStructure) -> AIResult: + """ + Specialized run method that takes the structured object directly. + """ + from openaudit.ai.engine import AIEngine + engine = AIEngine() + + if not engine.is_available(): + return None + + # Prepare Prompt + system_prompt = "You are a senior software architect. Analyze the project structure for modularity, circular dependencies, and architectural risks. Return a JSON response with analysis, risk_score (0-1), and suggestion." 
+ + # Simplify structure for prompt to save tokens + modules_summary = [f"{m.path} imports {m.imports}" for m in structure.modules] + user_prompt = f"Project Structure:\n{json.dumps(modules_summary, indent=2)}\n\nAnalyze this structure." + + try: + response = engine.chat_completion(system_prompt, user_prompt) + # Parse response (assuming text for now, but ideal agents verify JSON) + # For robustness, we'll wrap the text in AIResult + return AIResult( + analysis=response, + risk_score=0.5, # Placeholder, ideally parsed from response + severity=Severity.MEDIUM, + confidence=Confidence.MEDIUM, + suggestion="Review AI detailed analysis.", + is_advisory=True + ) + except Exception as e: + return AIResult( + analysis=f"AI Analysis failed: {str(e)}", + risk_score=0.0, + severity=Severity.LOW, + confidence=Confidence.LOW, + is_advisory=True + ) + + def run(self, context: PromptContext) -> AIResult: + # Standard protocol entry point + # We expect 'metadata' to contain the structure or we parse the code_snippet as JSON + # This might need adapter logic. + return AIResult( + analysis="Architecture analysis not applicable on single file context via generic run.", + risk_score=0.0, + severity=Severity.LOW, + confidence=Confidence.LOW + ) diff --git a/openaudit/features/architecture/models.py b/openaudit/features/architecture/models.py new file mode 100644 index 0000000..22429c0 --- /dev/null +++ b/openaudit/features/architecture/models.py @@ -0,0 +1,25 @@ +from pydantic import BaseModel, Field +from typing import List, Dict, Set, Optional + +class ModuleNode(BaseModel): + """ + Represents a file or directory in the codebase. 
+ """ + name: str + path: str + type: str = Field(..., description="file or directory") + imports: List[str] = Field(default_factory=list) + children: List['ModuleNode'] = Field(default_factory=list) + + class Config: + # Needed for recursive models + arbitrary_types_allowed = True + +class ProjectStructure(BaseModel): + """ + Represents the entire project structure and dependency graph. + """ + root_path: str + modules: List[ModuleNode] + # Simple adjacency list: "module_a" -> ["module_b", "module_c"] + dependency_graph: Dict[str, List[str]] = Field(default_factory=dict) diff --git a/openaudit/features/architecture/scanner.py b/openaudit/features/architecture/scanner.py new file mode 100644 index 0000000..ccad932 --- /dev/null +++ b/openaudit/features/architecture/scanner.py @@ -0,0 +1,82 @@ +import ast +import os +from pathlib import Path +from typing import List, Dict, Set +from .models import ModuleNode, ProjectStructure +from openaudit.core.domain import ScanContext + +class ArchitectureScanner: + """ + Statically analyzes the codebase to build a module tree and import graph. 
+ """ + + def scan(self, context: ScanContext) -> ProjectStructure: + root_path = Path(context.target_path) + modules = [] + dependency_graph = {} + + # Walk the directory + for root, dirs, files in os.walk(root_path): + # Apply ignore rules (rudimentary check here, ideally use IgnoreManager) + # Modifying dirs in-place to prune traversal + dirs[:] = [d for d in dirs if not d.startswith(".") and d != "__pycache__"] + if context.ignore_manager: + dirs[:] = [d for d in dirs if not context.ignore_manager.is_ignored(Path(root) / d)] + + for file in files: + if not file.endswith(".py"): + continue + + full_path = Path(root) / file + rel_path = full_path.relative_to(root_path) + + if context.ignore_manager and context.ignore_manager.is_ignored(full_path): + continue + + imports = self._extract_imports(full_path) + + # Add to graph + module_name = str(rel_path).replace(os.sep, ".").replace(".py", "") + dependency_graph[module_name] = imports + + node = ModuleNode( + name=file, + path=str(rel_path), + type="file", + imports=imports + ) + modules.append(node) + + # TODO: Ideally maintain tree structure in 'modules', currently a flat list for simplicity + # but the Model supports nesting. For the AI summary, a flat list with paths is often enough. + + return ProjectStructure( + root_path=str(root_path), + modules=modules, + dependency_graph=dependency_graph + ) + + def _extract_imports(self, file_path: Path) -> List[str]: + """ + Parse file with AST and extract imported names. + """ + imports = [] + try: + with open(file_path, "r", encoding="utf-8") as f: + tree = ast.parse(f.read(), filename=str(file_path)) + + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + imports.append(alias.name) + elif isinstance(node, ast.ImportFrom): + module = node.module or "" + # Handle relative imports (e.g., from . import utils) + if node.level > 0: + module = "." 
* node.level + module + imports.append(module) + except Exception: + # If parsing fails, just ignore (could be syntax error or non-utf8) + pass + + return imports diff --git a/openaudit/features/dataflow/__init__.py b/openaudit/features/dataflow/__init__.py new file mode 100644 index 0000000..cc07fc2 --- /dev/null +++ b/openaudit/features/dataflow/__init__.py @@ -0,0 +1,3 @@ +from .models import DataFlowGraph, FlowNode, FlowEdge +from .scanner import DataFlowScanner +from .agent import CrossFileAgent diff --git a/openaudit/features/dataflow/agent.py b/openaudit/features/dataflow/agent.py new file mode 100644 index 0000000..356db9e --- /dev/null +++ b/openaudit/features/dataflow/agent.py @@ -0,0 +1,85 @@ +from typing import List, Dict +from openaudit.ai.models import PromptContext, AIResult +from openaudit.core.domain import Severity, Confidence +from .models import DataFlowGraph, FlowNode, FlowEdge + +class CrossFileAgent: + """ + AI Agent that analyzes data flow graphs for cross-file vulnerabilities. + """ + name = "cross-file-agent" + description = "Analyzes data flow across modules to detect risky paths." + + def run_on_graph(self, graph: DataFlowGraph) -> List[AIResult]: + results = [] + + # 1. Algorithmic Path Finding (Source -> Sink) + # Simple BFS for demonstration + for source_id in graph.sources: + paths = self._bfs_paths(graph, source_id, graph.sinks) + for path in paths: + # 2. Analyze Path + result = self._analyze_path(graph, path) + if result: + results.append(result) + + if not results: + # Just a summary if no specific vulns found + results.append(AIResult( + analysis=f"Scanned {len(graph.nodes)} functions and {len(graph.edges)} calls. 
No critical paths to sinks found.", + risk_score=0.0, + severity=Severity.LOW, + confidence=Confidence.LOW, + suggestion="Maintain loose coupling.", + is_advisory=True + )) + + return results + + def _bfs_paths(self, graph: DataFlowGraph, start: str, goals: List[str]) -> List[List[str]]: + queue = [(start, [start])] + paths = [] + visited = set() # Avoid cycles + + while queue: + (vertex, path) = queue.pop(0) + if len(path) > 5: # Limit depth + continue + + for edge in graph.edges: + if edge.source_id == vertex: + next_node = edge.target_id + if next_node in goals: + paths.append(path + [next_node]) + elif next_node not in path: # precise cycle check for current path + queue.append((next_node, path + [next_node])) + return paths + + def _analyze_path(self, graph: DataFlowGraph, path: List[str]) -> AIResult: + from openaudit.ai.engine import AIEngine + engine = AIEngine() + + if not engine.is_available(): + return None + + path_names = [graph.nodes[nid].name for nid in path if nid in graph.nodes] + path_str = " -> ".join(path_names) + + system_prompt = "You are a specific security analyzer for data flow. Analyze if the path allows tainted user input to reach sensitive sinks. Return analysis." + user_prompt = f"Path: {path_str}\n\nAnalyze for taint flow." 
+ + try: + response = engine.chat_completion(system_prompt, user_prompt) + if "taint" in response.lower() or "risk" in response.lower(): + return AIResult( + analysis=response, + risk_score=0.9, + severity=Severity.HIGH, + confidence=Confidence.MEDIUM, + suggestion="Validate input at source.", + is_advisory=True + ) + except Exception: + pass + + return None diff --git a/openaudit/features/dataflow/models.py b/openaudit/features/dataflow/models.py new file mode 100644 index 0000000..62412c3 --- /dev/null +++ b/openaudit/features/dataflow/models.py @@ -0,0 +1,30 @@ +from pydantic import BaseModel, Field +from typing import List, Dict, Optional, Set + +class FlowNode(BaseModel): + """ + Represents a function, method, or file in the data flow graph. + """ + id: str # unique identifier, e.g., "module.function" + name: str # display name, e.g., "get_user_data" + file_path: str + type: str = "function" # function, class, file, entrypoint + line_number: int = 0 + +class FlowEdge(BaseModel): + """ + Represents a call or data dependency between two nodes. + """ + source_id: str + target_id: str + relation: str = "calls" # calls, imports, inherits + description: Optional[str] = None + +class DataFlowGraph(BaseModel): + """ + The graph representing the data flow across the project. + """ + nodes: Dict[str, FlowNode] = Field(default_factory=dict) + edges: List[FlowEdge] = Field(default_factory=list) + sinks: List[str] = Field(default_factory=list, description="IDs of sensitive sinks (e.g. db execution)") + sources: List[str] = Field(default_factory=list, description="IDs of entry points (e.g. 
api handlers)") diff --git a/openaudit/features/dataflow/scanner.py b/openaudit/features/dataflow/scanner.py new file mode 100644 index 0000000..28a94d6 --- /dev/null +++ b/openaudit/features/dataflow/scanner.py @@ -0,0 +1,173 @@ +import ast +import os +from pathlib import Path +from typing import List, Dict, Set, Optional +from openaudit.core.domain import ScanContext +from openaudit.features.architecture.models import ProjectStructure +from .models import DataFlowGraph, FlowNode, FlowEdge + +class DataFlowScanner: + """ + Builds a simplified data flow graph by analyzing python files. + """ + + def scan(self, context: ScanContext, structure: ProjectStructure) -> DataFlowGraph: + graph = DataFlowGraph() + # We need to process files to find definitions first, then usages. + # Ideally, we leverage the structure from architecture scanner, but we need ASTs again. + + # 1. First Pass: Collect all function/class definitions + definitions: Dict[str, FlowNode] = {} # id -> node + + # Map file paths to module names for resolution + file_map: Dict[str, str] = {} # absolute_path -> module.name + + target_path = Path(context.target_path) + + for module in structure.modules: + # Re-parse (or caching ASTs in structure would be better optimization later) + file_path = Path(context.target_path) / module.path + if not file_path.exists(): + # Handle case where file_path might be absolute or relative differently + # Depending on how architecture scanner stores it. + # Assuming module.path is relative to root. 
+ pass + + # Construct logical module name + module_name = module.path.replace(os.sep, ".").replace(".py", "") + file_map[str(file_path.absolute())] = module_name + + try: + with open(file_path, "r", encoding="utf-8") as f: + tree = ast.parse(f.read(), filename=str(file_path)) + + # Walk for definitions + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + func_id = f"{module_name}.{node.name}" + flow_node = FlowNode( + id=func_id, + name=node.name, + file_path=str(file_path), + type="function", + line_number=node.lineno + ) + definitions[func_id] = flow_node + graph.nodes[func_id] = flow_node + + # Heuristic: Identify potential sources/sinks + if "handler" in node.name or "route" in node.name: + graph.sources.append(func_id) + + if "execute" in node.name and ("sql" in node.name or "db" in node.name or "query" in node.name): + graph.sinks.append(func_id) + + except Exception: + pass + + + # 2. Second Pass: Find calls (Edges) + # This is complex because of aliasing. + # For MVP, we'll try to resolve direct calls and simple imports. 
+ + for module in structure.modules: + file_path = Path(context.target_path) / module.path + module_name = module.path.replace(os.sep, ".").replace(".py", "") + + try: + with open(file_path, "r", encoding="utf-8") as f: + tree = ast.parse(f.read()) + + # Track local imports: alias -> full_name + imports: Dict[str, str] = {} + + # Visitor to find imports and calls + class CallVisitor(ast.NodeVisitor): + def __init__(self, current_function: Optional[str] = None): + self.current_function = current_function + + def visit_Import(self, node): + for alias in node.names: + name = alias.name + asname = alias.asname or name + imports[asname] = name + self.generic_visit(node) + + def visit_ImportFrom(self, node): + module = node.module or "" + # relative import handling simplified + if node.level > 0: + # very rough approximation for MVP + count = node.level + parts = module_name.split(".") + if len(parts) >= count: + module = ".".join(parts[:-count]) + ("." + module if module else "") + + for alias in node.names: + name = alias.name + asname = alias.asname or name + full_name = f"{module}.{name}" if module else name + imports[asname] = full_name + self.generic_visit(node) + + def visit_FunctionDef(self, node): + # Enter function context + previous = self.current_function + self.current_function = f"{module_name}.{node.name}" + self.generic_visit(node) + self.current_function = previous + + def visit_Call(self, node): + if not self.current_function: + return + + # Try to resolve call + called_name = "" + if isinstance(node.func, ast.Name): + # Direct call: func() + called_name = node.func.id + elif isinstance(node.func, ast.Attribute): + # Attribute call: module.func() or obj.method() + # simplified: only handling module.func where module is imported + if isinstance(node.func.value, ast.Name): + base = node.func.value.id + if base in imports: + # It's an imported module + called_name = f"{imports[base]}.{node.func.attr}" + + # Resolution + target_id = None + + # 1. 
Check if it's imported as full name + if called_name in imports: + target_id = imports[called_name] + # 2. Check if it is the called_name logic above + elif called_name: + # Check if this matches a known definition + if called_name in definitions: + target_id = called_name + + # Try resolving aliases in called_name + # e.g. defined func=sql.execute, imports sql=db.sql + # called_name=sql.execute -> db.sql.execute + parts = called_name.split(".") + if parts[0] in imports: + resolved_base = imports[parts[0]] + potential_id = f"{resolved_base}.{'.'.join(parts[1:])}" + if potential_id in definitions: + target_id = potential_id + + if target_id and target_id in definitions: + graph.edges.append(FlowEdge( + source_id=self.current_function, + target_id=target_id, + relation="calls" + )) + + self.generic_visit(node) + + CallVisitor().visit(tree) + except Exception: + pass + + return graph diff --git a/openaudit/features/explain/__init__.py b/openaudit/features/explain/__init__.py new file mode 100644 index 0000000..9657042 --- /dev/null +++ b/openaudit/features/explain/__init__.py @@ -0,0 +1 @@ +from .agent import ExplainAgent diff --git a/openaudit/features/explain/agent.py b/openaudit/features/explain/agent.py new file mode 100644 index 0000000..ab841da --- /dev/null +++ b/openaudit/features/explain/agent.py @@ -0,0 +1,46 @@ +from typing import List, Dict, Optional +from openaudit.ai.models import PromptContext, AIResult +from openaudit.ai.protocol import AgentProtocol +from openaudit.core.domain import Severity, Confidence +import random + +class ExplainAgent(AgentProtocol): + """ + AI Agent that explains code functionality and security implications. + """ + name = "explain-agent" + description = "Generates human-readable explanations and security insights for code." + + def run(self, context: PromptContext) -> AIResult: + from openaudit.ai.engine import AIEngine + engine = AIEngine() + + if not engine.is_available(): + return AIResult( + analysis="AI not configured. 
def run(self, context: "PromptContext") -> "Optional[AIResult]":
    """
    Classify a potential secret finding as a TEST/MOCK or REAL secret.

    Returns None when the AI engine is not configured; otherwise an
    advisory AIResult with adjusted risk/confidence.
    """
    from openaudit.ai.engine import AIEngine
    # BUGFIX: 'engine' was referenced without ever being instantiated,
    # raising NameError on every call.
    engine = AIEngine()

    if not engine.is_available():
        # No fallback, return None to indicate no analysis possible
        return None

    snippet = context.code_snippet
    system_prompt = "You are a secret scanning expert. Analyze the context of a potential secret. Determine if it is a TEST/MOCK secret or a REAL production secret."
    user_prompt = f"Code Context:\n{snippet}\n\nIs this a real secret? Answer with JSON: {{'is_test': bool, 'reason': str}}"

    try:
        response = engine.chat_completion(system_prompt, user_prompt)
        # Naive parsing for now: look for the is_test flag in raw text.
        is_test = "true" in response.lower() and "is_test" in response.lower()

        if is_test:
            return AIResult(
                analysis="AI identified this as a likely TEST/MOCK secret.",
                risk_score=0.1,
                severity=Severity.LOW,
                confidence=Confidence.HIGH,
                suggestion="Mark as safe.",
                is_advisory=True,
            )
        return AIResult(
            analysis="AI identified this as a likely REAL secret.",
            risk_score=0.9,
            severity=Severity.HIGH,
            confidence=Confidence.HIGH,
            suggestion="Rotate immediately.",
            is_advisory=True,
        )
    except Exception as e:
        return AIResult(
            analysis=f"Error: {str(e)}",
            risk_score=0.5,
            severity=Severity.MEDIUM,
            confidence=Confidence.LOW,
            is_advisory=True,
        )
+ """ + path = Path(file_path) + if not path.exists() or not path.is_file(): + return "" + + try: + with open(path, "r", encoding="utf-8", errors="ignore") as f: + lines = f.readlines() + + start = max(0, line_number - 1 - window) + end = min(len(lines), line_number + window) + + return "".join(lines[start:end]) + except Exception: + return "" diff --git a/openaudit/features/threat_model/__init__.py b/openaudit/features/threat_model/__init__.py new file mode 100644 index 0000000..e2a3163 --- /dev/null +++ b/openaudit/features/threat_model/__init__.py @@ -0,0 +1 @@ +from .agent import ThreatModelingAgent diff --git a/openaudit/features/threat_model/agent.py b/openaudit/features/threat_model/agent.py new file mode 100644 index 0000000..2fc7e76 --- /dev/null +++ b/openaudit/features/threat_model/agent.py @@ -0,0 +1,135 @@ +from typing import List, Dict, Set +from openaudit.ai.models import PromptContext, AIResult +from openaudit.ai.protocol import AgentProtocol +from openaudit.core.domain import Severity, Confidence +from openaudit.features.architecture.models import ProjectStructure, ModuleNode + +class ThreatModelingAgent(AgentProtocol): + """ + AI Agent that generates a high-level threat model based on project structure. + """ + name = "threat-modeling-agent" + description = "Generates a STRIDE-based threat model for key components." + + def run(self, context: PromptContext) -> AIResult: + # Not used directly, as this agent needs structure. + # We will add a custom run_on_structure method. 
def run_on_structure(self, structure: "ProjectStructure") -> "List[AIResult]":
    """
    Generate a STRIDE threat model for the project structure via the AI
    engine and map each reported threat to an advisory AIResult.

    Returns an empty list when the engine is unavailable or the call fails.
    """
    from openaudit.ai.engine import AIEngine
    import json
    engine = AIEngine()

    if not engine.is_available():
        return []

    # Compact one-line-per-module summary keeps the prompt small.
    modules_summary = [f"{m.path} (imports: {m.imports})" for m in structure.modules]

    # BUGFIX: the instruction was garbled ("You are a security architect.
    # specific STRIDE threat model based on...") — restored to a
    # grammatical sentence so the model receives a coherent task.
    system_prompt = "You are a security architect. Generate a specific STRIDE threat model based on the project structure. Identify key components (Auth, DB, API, etc.) and list specific threats. Return a JSON object with a key 'threats' containing a list of objects with 'component', 'threat', 'risk_score' (0-1), and 'mitigation'."
    user_prompt = f"Project Structure:\n{json.dumps(modules_summary, indent=2)}\n\nGenerate STRIDE threat model."

    results = []
    try:
        response = engine.chat_completion(system_prompt, user_prompt)
        # Naive extraction of the first {...} span; malformed payloads
        # raise inside json.loads and are swallowed below.
        if "{" in response:
            start = response.find("{")
            end = response.rfind("}") + 1
            data = json.loads(response[start:end])

            for item in data.get("threats", []):
                results.append(AIResult(
                    analysis=f"Threat ({item.get('component', 'General')}): {item.get('threat')}",
                    risk_score=item.get("risk_score", 0.7),
                    severity=Severity.HIGH,
                    confidence=Confidence.MEDIUM,
                    suggestion=f"Mitigation: {item.get('mitigation')}",
                    is_advisory=True,
                ))
        else:
            # No JSON in the reply: surface a truncated advisory summary.
            results.append(AIResult(
                analysis=response[:200] + "...",
                risk_score=0.5,
                severity=Severity.MEDIUM,
                confidence=Confidence.LOW,
                suggestion="Review full AI analysis.",
                is_advisory=True,
            ))
    except Exception:
        pass

    return results
+ Returns: {component_name: component_type} + """ + components = {} + for module in structure.modules: + path_lower = module.path.lower() + if "auth" in path_lower or "login" in path_lower or "user" in path_lower: + components[module.name] = "Authentication" + elif "db" in path_lower or "database" in path_lower or "sql" in path_lower or "model" in path_lower: + components[module.name] = "Database" + elif "api" in path_lower or "route" in path_lower or "controller" in path_lower: + components[module.name] = "API Gateway" + elif "payment" in path_lower or "billing" in path_lower: + components[module.name] = "Payments" + + # Deduplication/Grouping logic could go here (e.g. grouping all auth.* modules) + # For now, just taking unique identified modules + return components + + def _generate_stride_threats(self, component_name: str, component_type: str) -> List[Dict[str, str]]: + """ + Generates standard STRIDE threats based on component type. + """ + threats = [] + + if component_type == "Authentication": + threats.append({ + "threat": "Spoofing Identity: Attackers may attempt to impersonate users.", + "mitigation": "Enforce strong MFA and robust session management." + }) + threats.append({ + "threat": "Information Disclosure: Leakage of user credentials.", + "mitigation": "Ensure proper hashing (Argon2/bcrypt) and secure logs." + }) + + elif component_type == "Database": + threats.append({ + "threat": "Tampering with Data: SQL Injection or unauthorized modification.", + "mitigation": "Use parameterized queries/ORM and strict input validation." + }) + threats.append({ + "threat": "Information Disclosure: Exposure of sensitive records.", + "mitigation": "Encrypt data at rest and implement strict RBAC." + }) + + elif component_type == "API Gateway": + threats.append({ + "threat": "Denial of Service: Flooding API resources.", + "mitigation": "Implement rate limiting and request throttling." 
+ }) + threats.append({ + "threat": "Tampering: Parameter pollution or replay attacks.", + "mitigation": "Validate all inputs and use TLS." + }) + + elif component_type == "Payments": + threats.append({ + "threat": "Tampering: Manipulation of transaction amounts.", + "mitigation": "Validate transaction integrity on server-side and use signing." + }) + + return threats diff --git a/openaudit/interface/cli/app.py b/openaudit/interface/cli/app.py index 0d10bb0..8e698ec 100644 --- a/openaudit/interface/cli/app.py +++ b/openaudit/interface/cli/app.py @@ -1,5 +1,5 @@ import typer -from .commands import scan_command +from .commands import scan_command, explain_command, config_app app = typer.Typer( name="OpenAuditKit", @@ -7,4 +7,14 @@ add_completion=False ) +@app.callback() +def main_callback(): + """ + OpenAuditKit CLI + """ + pass + app.command(name="scan")(scan_command) +app.command(name="explain")(explain_command) +app.add_typer(config_app, name="config") +print(f"DEBUG: app in module {__name__} type: {type(app)}") diff --git a/openaudit/interface/cli/commands.py b/openaudit/interface/cli/commands.py index 3186d2a..8c812f6 100644 --- a/openaudit/interface/cli/commands.py +++ b/openaudit/interface/cli/commands.py @@ -1,7 +1,7 @@ import typer import os from pathlib import Path -from openaudit.core.domain import ScanContext, Severity +from openaudit.core.domain import ScanContext, Severity, Confidence from openaudit.core.rules_engine import RulesEngine from openaudit.core.ignore_manager import IgnoreManager import time @@ -11,6 +11,20 @@ from openaudit.reporters.json_reporter import JSONReporter from typing import Optional from enum import Enum +from openaudit.ai.ethics import ConsentManager +from openaudit.features.architecture.scanner import ArchitectureScanner +from openaudit.features.architecture.agent import ArchitectureAgent +from openaudit.ai.models import PromptContext +from openaudit.features.secrets.context import SecretContextExtractor +from 
openaudit.features.secrets.agent import SecretConfidenceAgent +from openaudit.features.secrets.agent import SecretConfidenceAgent +from openaudit.ai.ethics import Redactor +from openaudit.core.domain import Finding +from openaudit.features.dataflow.scanner import DataFlowScanner +from openaudit.features.dataflow.agent import CrossFileAgent +from openaudit.features.threat_model.agent import ThreatModelingAgent +from openaudit.features.explain.agent import ExplainAgent + class OutputFormat(str, Enum): RICH = "rich" @@ -22,7 +36,8 @@ def scan_command( format: OutputFormat = typer.Option(OutputFormat.RICH, case_sensitive=False, help="Output format"), output: Optional[str] = typer.Option(None, help="Output file path (for JSON)"), ci: bool = typer.Option(False, help="Run in CI mode (no progress bar, exit code 1 on failure)"), - fail_on: Severity = typer.Option(Severity.HIGH, help="Severity threshold to fail the scan") + fail_on: Severity = typer.Option(Severity.HIGH, help="Severity threshold to fail the scan"), + ai: bool = typer.Option(False, help="Enable AI-powered advisory agents (requires consent)") ): """ Scan the target directory for security issues. @@ -33,6 +48,22 @@ def scan_command( typer.echo(f"Error: Target path {target} does not exist.") raise typer.Exit(code=1) + # 1.1 Check AI Consent + if ai: + if not ConsentManager.has_consented(): + if ci: + typer.echo("Error: CI mode requires explicit AI consent. Run 'openaudit consent --grant' locally first or set environment variable.") + raise typer.Exit(code=1) + + # Interactive prompt + confirm = typer.confirm("AI features require sending anonymized code snippets to an LLM. Do you consent?", default=False) + if confirm: + ConsentManager.grant_consent() + typer.echo("Consent granted.") + else: + typer.echo("Consent denied. Disabling AI features.") + ai = False + # 1. 
Setup Context & Ignore Manager ignore_manager = IgnoreManager(root_path=target_path) context = ScanContext(target_path=str(target_path), ignore_manager=ignore_manager) @@ -72,6 +103,109 @@ def scan_command( with typer.progressbar(scanners, label="Running Scanners") as progress: for scanner in progress: all_findings.extend(scanner.scan(context)) + + # 4.1 Run AI Agents if enabled + if ai: + typer.echo("Running AI Agents...") + # Architecture Agent + arch_scanner = ArchitectureScanner() + structure = arch_scanner.scan(context) + + arch_agent = ArchitectureAgent() + # In a real scenario, we might use a proper AIEngine to look this up + result = arch_agent.run_on_structure(structure) + + if result and result.is_advisory: + # Convert AIResult to Finding + ai_finding = Finding( + rule_id=f"AI-{arch_agent.name.upper()}", + description=f"{result.analysis} Suggested: {result.suggestion}", + file_path="PROJECT_ROOT", + line_number=0, + secret_hash="", + severity=result.severity, + confidence=result.confidence, + category="architecture", + remediation=result.suggestion or "Review architecture.", + is_ai_generated=True + ) + all_findings.append(ai_finding) + + # Cross-File Agent + df_scanner = DataFlowScanner() + df_graph = df_scanner.scan(context, structure) + + cross_agent = CrossFileAgent() + df_results = cross_agent.run_on_graph(df_graph) + + for res in df_results: + if res.is_advisory: + df_finding = Finding( + rule_id=f"AI-{cross_agent.name.upper()}", + description=f"{res.analysis} Suggested: {res.suggestion}", + file_path="PROJECT_ROOT", + line_number=0, + secret_hash="", + severity=res.severity, + confidence=res.confidence, + category="architecture", + remediation=res.suggestion or "Secure data flow.", + is_ai_generated=True + ) + all_findings.append(df_finding) + + # Threat Modeling Agent + threat_agent = ThreatModelingAgent() + tm_results = threat_agent.run_on_structure(structure) + for res in tm_results: + if res.is_advisory: + tm_finding = Finding( + 
rule_id=f"AI-THREAT-{res.analysis.split(':')[0]}", # Crude ID generation + description=f"{res.analysis} {res.suggestion}", + file_path="PROJECT_ROOT", + line_number=0, + secret_hash="", + severity=res.severity, + confidence=res.confidence, + category="architecture", + remediation=res.suggestion or "Mitigate threat.", + is_ai_generated=True + ) + all_findings.append(tm_finding) + + # Secret Confidence Agent + secret_agent = SecretConfidenceAgent() + for finding in all_findings: + if finding.category == "secret": + # Extract context + code_context = SecretContextExtractor.get_context(finding.file_path, finding.line_number) + if not code_context: + continue + + # Redact + redacted_context = Redactor.redact(code_context) + + # Analyze + ctx = PromptContext( + file_path=finding.file_path, + code_snippet=redacted_context, + line_number=finding.line_number + ) + + ai_result = secret_agent.run(ctx) + + if ai_result: + # Enrich Finding + finding.description += f" [AI: {ai_result.analysis}]" + finding.is_ai_generated = True # Tag enriched findings too + + # If agent is very confident it's a false positive (test), downgrade + if ai_result.confidence == Confidence.LOW and ai_result.severity == Severity.LOW: + finding.confidence = Confidence.LOW + finding.severity = Severity.LOW + finding.description = f"[ADVISORY] {finding.description}" + + duration = time.time() - start_time # 5. Report @@ -99,3 +233,69 @@ def scan_command( if not format == OutputFormat.JSON: typer.echo(f"Failure: Found issues with severity >= {fail_on.value}") raise typer.Exit(code=1) + +def explain_command( + path: str = typer.Argument(..., help="Path to the file to explain"), + ai: bool = typer.Option(True, help="Enable AI features (implied true for this command)") +): + """ + Explain the code in a specific file using AI. 
+ """ + target_path = Path(path).absolute() + if not target_path.exists() or not target_path.is_file(): + typer.echo(f"Error: path {path} does not exist or is not a file.") + raise typer.Exit(code=1) + + # Check Consent + if not ConsentManager.has_consented(): + confirm = typer.confirm("This feature sends code to an AI. Do you consent?", default=False) + if confirm: + ConsentManager.grant_consent() + else: + typer.echo("Consent refused. Exiting.") + raise typer.Exit(code=1) + + # Read Content + content = target_path.read_text(encoding="utf-8", errors="ignore") + + # Redact + redacted_content = Redactor.redact(content) + + # Run Agent + agent = ExplainAgent() + context = PromptContext(code_snippet=redacted_content, file_path=str(target_path)) + result = agent.run(context) + + # Output + typer.echo("") + typer.echo(f"🔍 Analysis for {target_path.name}") + typer.echo("=========================================") + typer.echo(result.analysis) + typer.echo("=========================================") + + +# Config Commands +config_app = typer.Typer(help="Manage OpenAuditKit configuration.") + +@config_app.command("set-key") +def set_key(key: str = typer.Argument(..., help="OpenAI API Key")): + """ + Set the OpenAI API key in the configuration file. + """ + from openaudit.core.config import ConfigManager + manager = ConfigManager() + manager.set_api_key(key) + typer.echo(f"API key saved to {manager.config_path}") + +@config_app.command("show") +def show_config(): + """ + Show current configuration path and status. 
+ """ + from openaudit.core.config import ConfigManager + manager = ConfigManager() + key = manager.get_api_key() + status = "Set" if key else "Not Set" + typer.echo(f"Config File: {manager.config_path}") + typer.echo(f"API Key Status: {status}") + \ No newline at end of file diff --git a/openaudit/main.py b/openaudit/main.py index 319f4c8..67a93c9 100644 --- a/openaudit/main.py +++ b/openaudit/main.py @@ -1,6 +1,8 @@ from openaudit.interface.cli.app import app +import sys def main(): + print(f"DEBUG: sys.argv = {sys.argv}") app() if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 6976468..6ba8b99 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ license = { text = "MIT" } authors = [ { name = "OpenAuditKit Team", email = "info@openauditkit.org" } ] +urls = { Repository = "https://github.com/neuralforgeone/OpenAuditKit", Issues = "https://github.com/neuralforgeone/OpenAuditKit/issues" } classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", @@ -22,7 +23,8 @@ dependencies = [ "pyyaml>=6.0", "rich>=13.0.0", "pydantic>=2.0.0", - "pathspec>=0.11.0" + "pathspec>=0.11.0", + "openai>=1.0.0" ] [project.scripts] diff --git a/report.json b/report.json deleted file mode 100644 index 11819a2..0000000 --- a/report.json +++ /dev/null @@ -1,111 +0,0 @@ -{ - "summary": { - "total": 10, - "critical": 1, - "high": 4, - "medium": 3, - "low": 2 - }, - "findings": [ - { - "rule_id": "AWS_ACCESS_KEY_ID", - "description": "AWS Access Key ID", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\test_secret.py", - "line_number": 2, - "secret_hash": "AK****************56", - "severity": "critical", - "category": "secret", - "remediation": "No remediation provided." 
- }, - { - "rule_id": "GENERIC_API_KEY", - "description": "Potential High Entropy Key", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\test_secret.py", - "line_number": 4, - "secret_hash": "ap***************************************90", - "severity": "high", - "category": "secret", - "remediation": "No remediation provided." - }, - { - "rule_id": "COMPOSE_RESTART_ALWAYS", - "description": "Restart policy set to always", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\docker-compose.yml", - "line_number": 7, - "secret_hash": "restart: always", - "severity": "low", - "category": "infrastructure", - "remediation": "Consider 'on-failure' or specific restart policies." - }, - { - "rule_id": "COMPOSE_PORT_EXPOSURE", - "description": "Port exposed to host (broad range)", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\docker-compose.yml", - "line_number": 6, - "secret_hash": "- \"0.0.0.0:", - "severity": "medium", - "category": "infrastructure", - "remediation": "Bind ports to localhost (127.0.0.1) if external access is not required." - }, - { - "rule_id": "DOCKER_USER_ROOT", - "description": "Container running as root", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\Dockerfile", - "line_number": 3, - "secret_hash": "USER root", - "severity": "high", - "category": "infrastructure", - "remediation": "Create and switch to a non-root user." - }, - { - "rule_id": "DOCKER_EXPOSE_ALL", - "description": "Exposing service on all interfaces (0.0.0.0)", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\Dockerfile", - "line_number": 4, - "secret_hash": "EXPOSE 0.0.0.0", - "severity": "medium", - "category": "infrastructure", - "remediation": "Bind to specific interfaces if possible." - }, - { - "rule_id": "DOCKER_ADD_COPY_ALL", - "description": "Broad COPY instruction (COPY . 
/)", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\Dockerfile", - "line_number": 5, - "secret_hash": "COPY . /", - "severity": "low", - "category": "infrastructure", - "remediation": "Use .dockerignore and copy only necessary files." - }, - { - "rule_id": "CONF_DOTENV_EXPOSED", - "description": "Dotenv file found. Ensure this is not committed.", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\test.env", - "line_number": 0, - "secret_hash": "N/A", - "severity": "medium", - "category": "config", - "remediation": "Add to .gitignore" - }, - { - "rule_id": "CONF_DEBUG_ENABLED", - "description": "Debug mode enabled in configuration", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\test.env", - "line_number": 1, - "secret_hash": "DEBUG=True", - "severity": "high", - "category": "config", - "remediation": "Set DEBUG=False in production environments." - }, - { - "rule_id": "CONF_DATABASE_URL_UNENCRYPTED", - "description": "Plaintext database URL detected", - "file_path": "C:\\Users\\tunay\\Documents\\GitHub\\OpenAuditKit\\test.env", - "line_number": 2, - "secret_hash": "DATABASE_URL=po*******//", - "severity": "high", - "category": "config", - "remediation": "Use encrypted secrets management or mask credentials." - } - ] -} \ No newline at end of file diff --git a/test.env b/test.env deleted file mode 100644 index ade97b8..0000000 --- a/test.env +++ /dev/null @@ -1,2 +0,0 @@ -DEBUG=True -DATABASE_URL=postgres://user:pass@localhost:5432/db diff --git a/test_secret.py b/test_secret.py deleted file mode 100644 index 4f11280..0000000 --- a/test_secret.py +++ /dev/null @@ -1,4 +0,0 @@ -# This is a test file -AWS_ACCESS_KEY_ID = "AKIA1234567890123456" -# Another secret -api_key = "abcdef1234567890abcdef1234567890" # High entropy