diff --git a/imednet/endpoints/records.py b/imednet/endpoints/records.py index e4cb7373..0e9e9a83 100644 --- a/imednet/endpoints/records.py +++ b/imednet/endpoints/records.py @@ -7,7 +7,8 @@ from imednet.core.protocols import ParamProcessor from imednet.models.jobs import Job from imednet.models.records import Record -from imednet.validation.cache import SchemaCache, validate_record_data +from imednet.utils.security import validate_header_value +from imednet.validation.cache import SchemaCache, validate_record_entry class RecordsParamProcessor(ParamProcessor): @@ -68,12 +69,7 @@ def _validate_records_if_schema_present( """ if schema is not None: for rec in records_data: - fk = rec.get("formKey") or rec.get("form_key") - if not fk: - fid = rec.get("formId") or rec.get("form_id") or 0 - fk = schema.form_key_from_id(fid) - if fk: - validate_record_data(schema, fk, rec.get("data", {})) + validate_record_entry(schema, rec) def _build_headers(self, email_notify: Union[bool, str, None]) -> Dict[str, str]: """ @@ -91,9 +87,7 @@ def _build_headers(self, email_notify: Union[bool, str, None]) -> Dict[str, str] headers = {} if email_notify is not None: if isinstance(email_notify, str): - # Security: Prevent header injection via newlines - if "\n" in email_notify or "\r" in email_notify: - raise ValueError("email_notify must not contain newlines") + validate_header_value(email_notify) headers[HEADER_EMAIL_NOTIFY] = email_notify else: headers[HEADER_EMAIL_NOTIFY] = str(email_notify).lower() diff --git a/imednet/utils/security.py b/imednet/utils/security.py index f078b7d2..fd83d955 100644 --- a/imednet/utils/security.py +++ b/imednet/utils/security.py @@ -14,3 +14,17 @@ def sanitize_csv_formula(value: Any) -> Any: if isinstance(value, str) and value.lstrip().startswith(("=", "+", "-", "@")): return f"'{value}" return value + + +def validate_header_value(value: str) -> None: + """ + Validate that a header value does not contain newline characters. + + Args: + value: The header value to validate. + + Raises: + ValueError: If the value contains newline characters. + """ + if "\n" in value or "\r" in value: + raise ValueError(f"Header value must not contain newlines: {value!r}") diff --git a/imednet/validation/cache.py b/imednet/validation/cache.py index 15f6abca..7e83a59d 100644 --- a/imednet/validation/cache.py +++ b/imednet/validation/cache.py @@ -163,6 +163,31 @@ def validate_record_data( _check_type(variables[name].variable_type, value) +def validate_record_entry( + schema: BaseSchemaCache[Any], + record: Dict[str, Any], +) -> None: + """ + Validate a single record entry against the schema. + + Resolves the form key from "formKey", "form_key", "formId", or "form_id". + + Args: + schema: The schema cache to use for validation. + record: The record data dictionary. + + Raises: + ValidationError: If the form key is not present in the schema or the data + fails validation checks. + """ + fk = record.get("formKey") or record.get("form_key") + if not fk: + fid = record.get("formId") or record.get("form_id") or 0 + fk = schema.form_key_from_id(fid) + if fk: + validate_record_data(schema, fk, record.get("data", {})) + + class SchemaValidator(_ValidatorMixin): """Validate record payloads using variable metadata from the API.""" diff --git a/tests/unit/endpoints/test_records_endpoint.py b/tests/unit/endpoints/test_records_endpoint.py index 7f742d4b..e8f7d444 100644 --- a/tests/unit/endpoints/test_records_endpoint.py +++ b/tests/unit/endpoints/test_records_endpoint.py @@ -113,3 +113,9 @@ def test_create_validates_data_with_snake_case_keys(dummy_client, context, respo # Even if we use snake_case "form_key" with pytest.raises(ValidationError): ep.create("S1", [{"form_key": "F1", "data": {"bad": 1}}], schema=schema) + + +def test_create_raises_on_header_injection(dummy_client, context): + ep = records.RecordsEndpoint(dummy_client, context) + with pytest.raises(ValueError, match="Header value must not contain newlines"): + ep.create("S1", [{"data": {}}], email_notify="test\n@example.com")