Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ Support the developer of this open-source project
```

**1. Clone and run:**
The new version of INTERCEPT includes PEPPER as a security enhancement, so all users, both new and existing, must follow the steps below to use the platform. SAVE your PEPPER for future updates or you will not be able to log in with your usual user account.

Generate a secure PEPPER with "openssl rand -hex 32" and save it in your computer. This is necessary if you are deploying with requirements or UV. In case of ./setup.sh the PEPPER will be ask you to generate during the installation.

macOS:

echo 'export INTERCEPT_PEPPER="your_generated_secret_here"' >> ~/.zshrc
source ~/.zshrc

Linux:
echo 'export INTERCEPT_PEPPER="your_generated_secret_here"' >> ~/.bashrc
source ~/.bashrc

```bash
git clone https://github.com/smittix/intercept.git
cd intercept
Expand All @@ -56,7 +69,8 @@ sudo -E venv/bin/python intercept.py
```

### Docker (Alternative)

Generate a secure PEPPER with "openssl rand -hex 32".
Modify on the docker-compose the INTERCEPT_PEPPER=your_generated_secret_here
```bash
git clone https://github.com/smittix/intercept.git
cd intercept
Expand Down
21 changes: 6 additions & 15 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
import site

from utils.database import get_db
from utils.database import verify_user

# Ensure user site-packages is available (may be disabled when running as root/sudo)
if not site.ENABLE_USER_SITE:
Expand Down Expand Up @@ -241,26 +241,17 @@ def logout():
return redirect(url_for('login'))

@app.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute") # Limit to 5 login attempts per minute per IP
@limiter.limit("5 per minute")
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')

# Connect to DB and find user
with get_db() as conn:
cursor = conn.execute(
'SELECT password_hash, role FROM users WHERE username = ?',
(username,)
)
user = cursor.fetchone()
password = request.form.get('password') or ''
user_data = verify_user(username, password)

# Verify user exists and password is correct
if user and check_password_hash(user['password_hash'], password):
# Store data in session
if user_data:
session['logged_in'] = True
session['username'] = username
session['role'] = user['role']
session['role'] = user_data['role']

logger.info(f"User '{username}' logged in successfully.")
return redirect(url_for('index'))
Expand Down
35 changes: 19 additions & 16 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,27 +136,27 @@ def _get_env_bool(key: str, default: bool) -> bool:
BT_SCAN_TIMEOUT = _get_env_int('BT_SCAN_TIMEOUT', 10)
BT_UPDATE_INTERVAL = _get_env_float('BT_UPDATE_INTERVAL', 2.0)

# ADS-B settings
ADSB_SBS_PORT = _get_env_int('ADSB_SBS_PORT', 30003)
ADSB_UPDATE_INTERVAL = _get_env_float('ADSB_UPDATE_INTERVAL', 1.0)
ADSB_AUTO_START = _get_env_bool('ADSB_AUTO_START', False)
ADSB_HISTORY_ENABLED = _get_env_bool('ADSB_HISTORY_ENABLED', False)
# ADS-B settings
ADSB_SBS_PORT = _get_env_int('ADSB_SBS_PORT', 30003)
ADSB_UPDATE_INTERVAL = _get_env_float('ADSB_UPDATE_INTERVAL', 1.0)
ADSB_AUTO_START = _get_env_bool('ADSB_AUTO_START', False)
ADSB_HISTORY_ENABLED = _get_env_bool('ADSB_HISTORY_ENABLED', False)
ADSB_DB_HOST = _get_env('ADSB_DB_HOST', 'localhost')
ADSB_DB_PORT = _get_env_int('ADSB_DB_PORT', 5432)
ADSB_DB_NAME = _get_env('ADSB_DB_NAME', 'intercept_adsb')
ADSB_DB_USER = _get_env('ADSB_DB_USER', 'intercept')
ADSB_DB_PASSWORD = _get_env('ADSB_DB_PASSWORD', 'intercept')
ADSB_HISTORY_BATCH_SIZE = _get_env_int('ADSB_HISTORY_BATCH_SIZE', 500)
ADSB_HISTORY_FLUSH_INTERVAL = _get_env_float('ADSB_HISTORY_FLUSH_INTERVAL', 1.0)
ADSB_HISTORY_QUEUE_SIZE = _get_env_int('ADSB_HISTORY_QUEUE_SIZE', 50000)

# Observer location settings
SHARED_OBSERVER_LOCATION_ENABLED = _get_env_bool('SHARED_OBSERVER_LOCATION', True)

# Satellite settings
SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)
ADSB_HISTORY_BATCH_SIZE = _get_env_int('ADSB_HISTORY_BATCH_SIZE', 500)
ADSB_HISTORY_FLUSH_INTERVAL = _get_env_float('ADSB_HISTORY_FLUSH_INTERVAL', 1.0)
ADSB_HISTORY_QUEUE_SIZE = _get_env_int('ADSB_HISTORY_QUEUE_SIZE', 50000)
# Observer location settings
SHARED_OBSERVER_LOCATION_ENABLED = _get_env_bool('SHARED_OBSERVER_LOCATION', True)
# Satellite settings
SATELLITE_UPDATE_INTERVAL = _get_env_int('SATELLITE_UPDATE_INTERVAL', 30)
SATELLITE_TRAJECTORY_POINTS = _get_env_int('SATELLITE_TRAJECTORY_POINTS', 30)
SATELLITE_ORBIT_MINUTES = _get_env_int('SATELLITE_ORBIT_MINUTES', 45)

# Update checking
GITHUB_REPO = _get_env('GITHUB_REPO', 'smittix/intercept')
Expand All @@ -166,6 +166,9 @@ def _get_env_bool(key: str, default: bool) -> bool:
# Admin credentials
ADMIN_USERNAME = _get_env('ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = _get_env('ADMIN_PASSWORD', 'admin')
PEPPER = os.environ.get('INTERCEPT_PEPPER')
if PEPPER is None:
raise RuntimeError("INTERCEPT_PEPPER environment variable must be set. See README for setup instructions.")

def configure_logging() -> None:
"""Configure application logging."""
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ services:
- INTERCEPT_HOST=0.0.0.0
- INTERCEPT_PORT=5050
- INTERCEPT_LOG_LEVEL=INFO
- INTERCEPT_PEPPER=your_generated_secret_here
# ADS-B history is disabled by default
# To enable, use: docker compose --profile history up -d
# - INTERCEPT_ADSB_HISTORY_ENABLED=true
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ dependencies = [
"bleak>=0.21.0",
"flask-sock",
"requests>=2.28.0",
'psycopg2-binary>=2.9.9',
'numpy>=1.24.0',
'scipy>=1.10.0'
]

[project.urls]
Expand Down
33 changes: 33 additions & 0 deletions setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1013,11 +1013,44 @@ install_debian_packages() {
fi
}

# ----------------------------
# Security / Pepper Check
# ----------------------------
check_pepper() {
echo
info "Checking Security Configuration..."

if [[ -z "${INTERCEPT_PEPPER:-}" ]]; then
warn "INTERCEPT_PEPPER is not set in your environment."
echo -e "For security, you must generate a unique Pepper for password hashing."

if ask_yes_no "Would you like me to generate one and show you how to save it?"; then
local NEW_PEPPER
NEW_PEPPER=$(openssl rand -hex 32)
ok "Generated Pepper: ${NEW_PEPPER}"
echo
echo "To make this permanent, run the following command:"
if [[ "$OS" == "macos" ]]; then
echo -e "${YELLOW}echo 'export INTERCEPT_PEPPER=\"$NEW_PEPPER\"' >> ~/.zshrc && source ~/.zshrc${NC}"
else
echo -e "${YELLOW}echo 'export INTERCEPT_PEPPER=\"$NEW_PEPPER\"' >> ~/.bashrc && source ~/.bashrc${NC}"
fi
echo
warn "IMPORTANT: Save this secret! If you lose it, you will be locked out of your database."
else
fail "Warning: INTERCEPT will fail to start without INTERCEPT_PEPPER."
fi
else
ok "INTERCEPT_PEPPER is already set."
fi
}

# ----------------------------
# Final summary / hard fail
# ----------------------------
final_summary_and_hard_fail() {
check_tools
check_pepper

echo "============================================"
echo
Expand Down
98 changes: 98 additions & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
import pytest
from pathlib import Path
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from utils.database import verify_user, generate_password_hash, check_password_hash

# Mock configuration to ensure the PEPPER is consistent during tests
MOCK_PEPPER = "secret_pepper_123"

# Need to patch DB_PATH before importing database module
@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -254,3 +259,96 @@ def test_correlation_upsert(self, temp_db):
if c['wifi_mac'] == 'AA:AA:AA:AA:AA:AA']
assert len(matching) == 1
assert matching[0]['confidence'] == 0.9

######
# Tests for user verification and password hash migration
######

@pytest.fixture
def mock_db_user():
"""Simulates a database response for a user."""
def _create_user(id_val, pw_hash, role="admin"):
return {"id": id_val, "password_hash": pw_hash, "role": role}
return _create_user

### 1. Test: Successful Login with New Hash (Peppered)
@patch('utils.database.PEPPER', MOCK_PEPPER)
@patch('utils.database.get_db')
def test_verify_user_success_new_hash(mock_get_db, mock_db_user):
# Generate a hash that ALREADY includes the pepper
password = "my_secure_password"
peppered_hash = generate_password_hash(f"{password}{MOCK_PEPPER}")

# Configure the DB mock
mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = mock_db_user(1, peppered_hash)
mock_get_db.return_value.__enter__.return_value = mock_conn

result = verify_user("test_user", password)

assert result is not None
assert result["role"] == "admin"

### 2. Test: Legacy Hash Detection and Automatic Migration
@patch('utils.database.PEPPER', MOCK_PEPPER)
@patch('utils.database.get_db')
def test_verify_user_legacy_migration(mock_get_db, mock_db_user):
password = "old_password"
# Create a hash WITHOUT the pepper (simulating old data)
legacy_hash = generate_password_hash(password)

mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = mock_db_user(2, legacy_hash)
mock_get_db.return_value.__enter__.return_value = mock_conn

# Act: Verify the user
result = verify_user("legacy_user", password)

# ASSERTIONS
# 1. Access must be granted (Fallback worked)
assert result is not None
assert result["role"] == "admin"

# 2. Verify the UPDATE logic was triggered
update_calls = [
call for call in mock_conn.execute.call_args_list
if 'UPDATE users SET password_hash' in call[0][0]
]
assert len(update_calls) == 1, "The database was not updated with the new hash"

# 3. CRITICAL: Verify the updated hash now includes the PEPPER
# We extract the 'new_hash' argument from the execute(query, params) call
new_hash_in_db = update_calls[0][0][1][0]

# It must fail WITHOUT the pepper now
assert check_password_hash(new_hash_in_db, password) is False
# It must succeed WITH the pepper
assert check_password_hash(new_hash_in_db, f"{password}{MOCK_PEPPER}") is True

print("✓ Migration successful: User granted access and hash upgraded with Pepper.")

### 3. Test: Login Failure (Incorrect Password)
@patch('utils.database.PEPPER', MOCK_PEPPER)
@patch('utils.database.get_db')
def test_verify_user_wrong_password(mock_get_db, mock_db_user):
correct_password = "real_password"
peppered_hash = generate_password_hash(f"{correct_password}{MOCK_PEPPER}")

mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = mock_db_user(3, peppered_hash)
mock_get_db.return_value.__enter__.return_value = mock_conn

# Attempt login with a typo/wrong password
result = verify_user("test_user", "wrong_password")

assert result is None

### 4. Test: User Does Not Exist
@patch('utils.database.get_db')
def test_verify_user_not_found(mock_get_db):
mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = None
mock_get_db.return_value.__enter__.return_value = mock_conn

result = verify_user("ghost_user", "1234")
assert result is None
56 changes: 48 additions & 8 deletions utils/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from datetime import datetime
from pathlib import Path
from typing import Any
from werkzeug.security import generate_password_hash
from config import ADMIN_USERNAME, ADMIN_PASSWORD
from werkzeug.security import generate_password_hash, check_password_hash
from config import ADMIN_USERNAME, ADMIN_PASSWORD, PEPPER

logger = logging.getLogger('intercept.database')

Expand All @@ -24,6 +24,50 @@
# Thread-local storage for connections
_local = threading.local()

def verify_user(username: str, password: str) -> dict | None:
"""
Verifies user credentials. If a legacy hash is found, it migrates
the user to the new PEPPER-based hashing automatically.
Returns user data (role) if successful, None otherwise.
"""
with get_db() as conn:
cursor = conn.execute(
'SELECT id, password_hash, role FROM users WHERE username = ?',
(username,)
)
user = cursor.fetchone()

if not user:
return None

user_id = user['id']
current_hash = user['password_hash']
user_role = user['role']

# 1. New verification (Password + Pepper)
if check_password_hash(current_hash, f"{password}{PEPPER}"):
return {"role": user_role}

# 2. Legacy fallback (Password only)
if check_password_hash(current_hash, password):
logger.info(f"Legacy hash detected for user '{username}'. Migrating...")

# Upgrade the hash to include the Pepper
new_hash = generate_password_hash(f"{password}{PEPPER}")

try:
with get_db() as conn:
conn.execute(
'UPDATE users SET password_hash = ? WHERE id = ?',
(new_hash, user_id)
)
return {"role": user_role}
except Exception as e:
logger.error(f"Migration failed for {username}, but granting access: {e}")
return {"role": user_role}

return None


def get_db_path() -> Path:
"""Get the database file path, creating directory if needed."""
Expand Down Expand Up @@ -115,12 +159,7 @@ def init_db() -> None:

cursor = conn.execute('SELECT COUNT(*) FROM users')
if cursor.fetchone()[0] == 0:
from config import ADMIN_USERNAME, ADMIN_PASSWORD

logger.info(f"Creating default admin user: {ADMIN_USERNAME}")

# Password hashing
hashed_pw = generate_password_hash(ADMIN_PASSWORD)
hashed_pw = generate_password_hash(f"{ADMIN_PASSWORD}{PEPPER}")

conn.execute('''
INSERT INTO users (username, password_hash, role)
Expand Down Expand Up @@ -1955,3 +1994,4 @@ def cleanup_old_payloads(max_age_hours: int = 24) -> int:
WHERE received_at < datetime('now', ?)
''', (f'-{max_age_hours} hours',))
return cursor.rowcount