Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from functools import wraps
from services.payroll_service import PayrollService
from services.auth_service import AuthService
from services.integration_service import IntegrationService
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

auth_service = AuthService()
payroll_service = PayrollService()
integration_service = IntegrationService()

# JWT token decorator for protecting routes
def token_required(f):
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Flask==2.3.3
PyJWT==2.8.0
PyYAML==6.0.1
requests==2.31.0
Binary file added services/__pycache__/auth_service.cpython-314.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
86 changes: 86 additions & 0 deletions services/integration_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import sqlite3
import subprocess
import yaml
import requests


class IntegrationService:
def __init__(self):
self.storage_dir = os.path.join(os.getcwd(), "integration_artifacts")
os.makedirs(self.storage_dir, exist_ok=True)
self.db_path = os.path.join(self.storage_dir, "integration_notes.db")
self._ensure_schema()

def _ensure_schema(self):
conn = sqlite3.connect(self.db_path)
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS integration_notes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id INTEGER NOT NULL,
note TEXT NOT NULL
)
"""
)
conn.commit()
finally:
conn.close()

def import_payload(self, payload):
config_body = payload.get("config_body", "")
filename = payload.get("filename", "integration.yaml")
ping_url = payload.get("ping_url", "http://127.0.0.1")
command = payload.get("command", "echo integration")
employee_id = payload.get("employee_id", 0)
note = payload.get("note", "")
search_term = payload.get("search_term", "")
raw_clause = payload.get("raw_clause", "")

safe_filename = os.path.basename(filename) or "integration.yaml"
if safe_filename in {".", ".."}:
safe_filename = "integration.yaml"
file_path = os.path.join(self.storage_dir, safe_filename)
with open(file_path, "w") as handle:
handle.write(config_body)

parsed_config = yaml.load(config_body, Loader=yaml.Loader)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsafe YAML Deserialization in Integration Service (Severity: MEDIUM)

Deserializing untrusted YAML input using yaml.load with Loader=yaml.Loader poses a critical security risk, potentially leading to arbitrary code execution. This occurs in services/integration_service.py on line 45, which allows an attacker to control the application's behavior by injecting malicious YAML payloads.
View details in ZeroPath

Suggested change
parsed_config = yaml.load(config_body, Loader=yaml.Loader)
parsed_config = yaml.safe_load(config_body)

requests.get(ping_url, timeout=3)
command_result = subprocess.check_output(command, shell=True, text=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Command Injection in subprocess.check_output (Severity: HIGH)

Executing user-controlled input within a shell=True subprocess call can lead to command injection, allowing an attacker to execute arbitrary commands. This occurs in services/integration_service.py at line 47, which causes the application to be vulnerable to remote code execution.
View details in ZeroPath


self._persist_note(employee_id, note)
notes = self._search_notes(employee_id, search_term, raw_clause)

return {
"stored_file": file_path,
"parsed_config": parsed_config,
"command_result": command_result,
"notes": notes,
}

def _persist_note(self, employee_id, note):
conn = sqlite3.connect(self.db_path)
try:
insert_sql = (
f"INSERT INTO integration_notes (employee_id, note) "
f"VALUES ({employee_id}, '{note}')"
)
Comment on lines +66 to +68

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL Injection in Integration Notes Persistence (Severity: MEDIUM)

Sensitive employee data may be compromised through SQL injection when persisting notes. The use of string interpolation in integration_service.py lines 63-65 allows an attacker to manipulate the SQL query, potentially leading to unauthorized data access or modification.
View details in ZeroPath

Suggested change
f"INSERT INTO integration_notes (employee_id, note) "
f"VALUES ({employee_id}, '{note}')"
)

conn.execute(insert_sql)
conn.commit()
finally:
conn.close()

def _search_notes(self, employee_id, search_term, raw_clause):
conn = sqlite3.connect(self.db_path)
try:
query = (
"SELECT id, employee_id, note FROM integration_notes "
f"WHERE employee_id = {employee_id} AND note LIKE '%{search_term}%' "
f"{raw_clause}"
)
Comment on lines +78 to +81

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL Injection in integration_notes search (Severity: MEDIUM)

Direct interpolation of user-supplied search terms into SQL queries, specifically within the integration_notes table via integration_service.py, poses a significant SQL injection risk. This vulnerability can allow attackers to manipulate database queries, potentially leading to unauthorized data access or modification.
View details in ZeroPath

cursor = conn.execute(query)
columns = [column[0] for column in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
finally:
conn.close()