Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions imednet/endpoints/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from imednet.core.protocols import ParamProcessor
from imednet.models.jobs import Job
from imednet.models.records import Record
from imednet.validation.cache import SchemaCache, validate_record_data
from imednet.utils.security import validate_header_value
from imednet.validation.cache import SchemaCache, validate_record_entry


class RecordsParamProcessor(ParamProcessor):
Expand Down Expand Up @@ -68,12 +69,7 @@ def _validate_records_if_schema_present(
"""
if schema is not None:
for rec in records_data:
fk = rec.get("formKey") or rec.get("form_key")
if not fk:
fid = rec.get("formId") or rec.get("form_id") or 0
fk = schema.form_key_from_id(fid)
if fk:
validate_record_data(schema, fk, rec.get("data", {}))
validate_record_entry(schema, rec)

def _build_headers(self, email_notify: Union[bool, str, None]) -> Dict[str, str]:
"""
Expand All @@ -91,9 +87,7 @@ def _build_headers(self, email_notify: Union[bool, str, None]) -> Dict[str, str]
headers = {}
if email_notify is not None:
if isinstance(email_notify, str):
# Security: Prevent header injection via newlines
if "\n" in email_notify or "\r" in email_notify:
raise ValueError("email_notify must not contain newlines")
validate_header_value(email_notify)
headers[HEADER_EMAIL_NOTIFY] = email_notify
else:
headers[HEADER_EMAIL_NOTIFY] = str(email_notify).lower()
Expand Down
14 changes: 14 additions & 0 deletions imednet/utils/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,17 @@ def sanitize_csv_formula(value: Any) -> Any:
if isinstance(value, str) and value.lstrip().startswith(("=", "+", "-", "@")):
return f"'{value}"
return value


def validate_header_value(value: str) -> None:
"""
Validate that a header value does not contain newline characters.

Args:
value: The header value to validate.

Raises:
ValueError: If the value contains newline characters.
"""
if "\n" in value or "\r" in value:
raise ValueError(f"Header value must not contain newlines: {value!r}")
25 changes: 25 additions & 0 deletions imednet/validation/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,31 @@ def validate_record_data(
_check_type(variables[name].variable_type, value)


def validate_record_entry(
schema: BaseSchemaCache[Any],
record: Dict[str, Any],
) -> None:
"""
Validate a single record entry against the schema.

Resolves the form key from "formKey", "form_key", "formId", or "form_id".

Args:
schema: The schema cache to use for validation.
record: The record data dictionary.

Raises:
ValidationError: If the form key is not present in the schema or the data
fails validation checks.
"""
fk = record.get("formKey") or record.get("form_key")
if not fk:
fid = record.get("formId") or record.get("form_id") or 0
fk = schema.form_key_from_id(fid)
if fk:
validate_record_data(schema, fk, record.get("data", {}))


class SchemaValidator(_ValidatorMixin):
"""Validate record payloads using variable metadata from the API."""

Expand Down
6 changes: 6 additions & 0 deletions tests/unit/endpoints/test_records_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,9 @@ def test_create_validates_data_with_snake_case_keys(dummy_client, context, respo
# Even if we use snake_case "form_key"
with pytest.raises(ValidationError):
ep.create("S1", [{"form_key": "F1", "data": {"bad": 1}}], schema=schema)


def test_create_raises_on_header_injection(dummy_client, context):
ep = records.RecordsEndpoint(dummy_client, context)
with pytest.raises(ValueError, match="Header value must not contain newlines"):
ep.create("S1", [{"data": {}}], email_notify="test\n@example.com")