diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5113f96 --- /dev/null +++ b/.gitignore @@ -0,0 +1,71 @@ +``` +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environment +.env +.env.local +*.env.* + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# Logs +*.log + +# Temp files +*.tmp + +# Coverage +.coverage +coverage/ +htmlcov/ + +# Distribution / packaging +.Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST +``` \ No newline at end of file diff --git a/vuln_monitor.py b/vuln_monitor.py new file mode 100644 index 0000000..3e1950d --- /dev/null +++ b/vuln_monitor.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +""" +Linux Kernel Vulnerability Monitor +Fetches vulnerability information from linux-security-vulns repository +and formats output showing vulnerability ID, introduced version, mainline fix, +and fixes for specific stable branches. +""" + +import os +import sys +import json +import argparse +import subprocess +from datetime import datetime +from pathlib import Path + + +class VulnMonitor: + def __init__(self, repo_url="https://github.com/analogdevicesinc/linux-security-vulns"): + self.repo_url = repo_url + self.local_repo_path = "/tmp/linux-security-vulns" + + def clone_or_update_repo(self): + """Clone or update the security vulnerabilities repository""" + if os.path.exists(self.local_repo_path): + print(f"Updating existing repository at {self.local_repo_path}") + subprocess.run(["git", "-C", self.local_repo_path, "fetch"], check=True) + subprocess.run(["git", "-C", self.local_repo_path, "pull"], check=True) + else: + print(f"Cloning repository from {self.repo_url}") + subprocess.run(["git", "clone", self.repo_url, self.local_repo_path], check=True) + + def parse_vulnerability_data(self): + """Parse vulnerability data from the repository""" + cve_dir = os.path.join(self.local_repo_path, "cve", "published") + if not os.path.exists(cve_dir): + print(f"CVE directory not found at {cve_dir}") + return [] + + vulnerabilities = [] + + # Walk through all subdirectories to find JSON files + for root, dirs, files in os.walk(cve_dir): + for filename in files: + if filename.endswith('.json'): + filepath = os.path.join(root, filename) + try: + with open(filepath, 'r', encoding='utf-8') as f: + data = json.load(f) + vulnerabilities.append(data) + except Exception as e: + print(f"Error reading {filepath}: {e}") + + return vulnerabilities + + def extract_fix_info(self, vuln_data, stable_branches): + """Extract fix information for specific stable branches""" + fixes = {} + + # Extract CVE ID for reference + cve_id = vuln_data.get("cveMetadata", {}).get("cveID", "N/A") + + # Check for mainline fix in references or affected versions + mainline_fix = "N/A" + if "containers" in vuln_data and "cna" in vuln_data["containers"]: + container = vuln_data["containers"]["cna"] + + # Look in references for mainline commits + if "references" in container: + for ref in container["references"]: + url = ref.get("url", "") + if "git.kernel.org" in url and "mainline" in url: + # Extract commit hash or version from URL + parts = url.split('/') + if len(parts) >= 2: + mainline_fix = parts[-1][:12] + "..." # Short commit hash + break + + # Also check in affected versions for fixed versions + if "affected" in container: + for affected in container["affected"]: + if "versions" in affected: + for version_info in affected["versions"]: + status = version_info.get("status", "") + if status == "unaffected": + # This might indicate a fix version + if "version" in version_info: + version = version_info["version"] + # Only consider this a mainline fix if it's not a wildcard + if "*" not in version and "original_commit_for_fix" not in str(version_info.get("versionType", "")): + mainline_fix = version + break + + fixes['mainline'] = mainline_fix + + # Check for specific stable branch fixes + for branch in stable_branches: + branch_fix = "N/A" + + if "affected" in container: + for affected in container["affected"]: + if "versions" in affected: + for version_info in affected["versions"]: + status = version_info.get("status", "") + if status == "unaffected": + version = version_info.get("version", "N/A") + # Check if this version applies to our target branch + if version != "N/A" and "*" not in version: + # Simple pattern matching for stable branches + if branch.replace("stable-", "") in version or \ + (branch.replace("stable-", "").split('.')[0] in version and len(version.split('.')) >= 2): + branch_fix = version + break + + fixes[branch] = branch_fix + + return fixes + + def display_vulnerabilities(self, vulnerabilities, stable_branches=['stable-4.19', 'stable-5.10', 'stable-6.6']): + """Display vulnerabilities in formatted table""" + print("\nLinux Kernel Vulnerability Report") + print("=" * 120) + print(f"{'CVE/Vuln ID':<15} {'Introduced':<12} {'Mainline Fix':<15} {'Stable-4.19':<12} {'Stable-5.10':<12} {'Stable-6.6':<12}") + print("-" * 120) + + count = 0 + for vuln in vulnerabilities: + if count >= 50: # Limit to first 50 vulnerabilities to avoid too much output + print("... (truncated for readability)") + break + + # Extract basic info + vuln_id = vuln.get("cveMetadata", {}).get("cveID", "N/A") + + # Get introduced version - look for vulnerable versions in the data + introduced = "N/A" + if "containers" in vuln and "cna" in vuln["containers"]: + container = vuln["containers"]["cna"] + if "affected" in container: + for affected in container["affected"]: + # First check ranges for introduced version + if "ranges" in affected: + for range_entry in affected["ranges"]: + if range_entry.get("type") in ["VERSION", "GIT"]: + for event in range_entry.get("events", []): + if "introduced" in event: + introduced = event["introduced"] + break + elif "from" in event and event["from"] != "0": + # Some ranges use "from" instead of "introduced" + introduced = event["from"] + break + if introduced != "N/A": + break + + # If not found in ranges, check specific versions + if introduced == "N/A" and "versions" in affected: + for version_info in affected["versions"]: + status = version_info.get("status", "") + if status == "affected": + # Check if this is the introduced version + if "version" in version_info: + version = version_info["version"] + if version and "*" not in version and "original_commit_for_fix" not in str(version_info.get("versionType", "")): + # Only set if it looks like a proper version number (fallback to basic check) + if self.is_basic_kernel_version(version): + introduced = version + break + if introduced != "N/A": + break + + # Get fix information + fixes = self.extract_fix_info(vuln, stable_branches) + + print(f"{vuln_id:<15} {introduced:<12} {fixes['mainline']:<15} {fixes['stable-4.19']:<12} {fixes['stable-5.10']:<12} {fixes['stable-6.6']:<12}") + count += 1 + + def is_basic_kernel_version(self, version): + """Basic check if a version string looks like a valid kernel version (fallback)""" + if not version or version == "N/A": + return False + + # Remove any extra qualifiers like -rc, -next, etc. + base_version = version.split('-')[0] + + try: + parts = base_version.split('.') + if len(parts) >= 2: + # Check if major and minor parts are numeric + major = int(parts[0]) + minor = int(parts[1]) + + # Basic sanity check: major version shouldn't be extremely large + if major > 0 and major < 10 and len(str(major)) <= 2: + return True + except ValueError: + pass + + return False + + + def run(self): + """Main execution method""" + try: + # Clone or update the repository + self.clone_or_update_repo() + + # Parse vulnerability data + vulnerabilities = self.parse_vulnerability_data() + + # Display vulnerabilities in formatted output + self.display_vulnerabilities(vulnerabilities) + + print(f"\nTotal vulnerabilities found: {len(vulnerabilities)}") + print(f"Report generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + except subprocess.CalledProcessError as e: + print(f"Git operation failed: {e}") + sys.exit(1) + except Exception as e: + print(f"An error occurred: {e}") + sys.exit(1) + + +def main(): + parser = argparse.ArgumentParser(description='Linux Kernel Vulnerability Monitor') + parser.add_argument('--repo', default='https://github.com/analogdevicesinc/linux-security-vulns', + help='URL of the vulnerability repository (default: %(default)s)') + parser.add_argument('--branches', nargs='+', + default=['stable-4.19', 'stable-5.10', 'stable-6.6'], + help='List of stable branches to check for fixes (default: %(default)s)') + + args = parser.parse_args() + + monitor = VulnMonitor(repo_url=args.repo) + monitor.run() + + +if __name__ == "__main__": + main() \ No newline at end of file