Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
981 changes: 38 additions & 943 deletions schema/__init__.py

Large diffs are not rendered by default.

409 changes: 409 additions & 0 deletions schema/_schema_core.py

Large diffs are not rendered by default.

69 changes: 69 additions & 0 deletions schema/_schema_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Schema exception classes."""

from typing import Iterable, List, Set, Union


class SchemaError(Exception):
"""Error during Schema validation."""

def __init__(
self,
autos: Union[Iterable[Union[str, None]], None],
errors: Union[List, str, None] = None,
) -> None:
self.autos = autos if isinstance(autos, List) else [autos]
self.errors = errors if isinstance(errors, List) else [errors]
Exception.__init__(self, self.code)

@property
def code(self) -> str:
"""Remove duplicates in autos and errors list and combine them into a single message."""

def uniq(seq: Iterable[Union[str, None]]) -> List[str]:
"""Utility function to remove duplicates while preserving the order."""
seen: Set[str] = set()
unique_list: List[str] = []
for x in seq:
if x is not None and x not in seen:
seen.add(x)
unique_list.append(x)
return unique_list

data_set = uniq(self.autos)
error_list = uniq(self.errors)

return "\n".join(error_list if error_list else data_set)


class SchemaWrongKeyError(SchemaError):
"""Error Should be raised when an unexpected key is detected within the
data set being."""

pass


class SchemaMissingKeyError(SchemaError):
"""Error should be raised when a mandatory key is not found within the
data set being validated"""

pass


class SchemaOnlyOneAllowedError(SchemaError):
"""Error should be raised when an only_one Or key has multiple matching candidates"""

pass


class SchemaForbiddenKeyError(SchemaError):
"""Error should be raised when a forbidden key is found within the
data set being validated, and its value matches the value that was specified"""

pass


class SchemaUnexpectedTypeError(SchemaError):
"""Error should be raised when a type mismatch is detected within the
data set being validated."""

pass
266 changes: 266 additions & 0 deletions schema/_schema_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
"""JSON Schema generation logic."""

import re
from typing import TYPE_CHECKING, Any, Dict, List, Type, Union, cast

from schema._schema_utils import COMPARABLE, DICT, ITERABLE, TYPE, VALIDATOR, _priority
from schema._schema_validators import Literal
from schema._schema_core import Optional

if TYPE_CHECKING:
from schema._schema_core import Schema


def generate_json_schema(
schema: "Schema",
schema_id: str,
use_refs: bool = False,
**kwargs: Any,
) -> Dict[str, Any]:
"""Generate a draft-07 JSON schema dict representing the Schema.

:param schema: The Schema instance to convert
:param schema_id: The value of the $id on the main schema
:param use_refs: Enable reusing object references in the resulting JSON schema.
:param kwargs: Additional keyword arguments for default value evaluation
"""
from schema._schema_core import Schema
from schema._schema_validators import Or, And
from schema._schema_utils import _invoke_with_optional_kwargs

seen: Dict[int, Dict[str, Any]] = {}
definitions_by_name: Dict[str, Dict[str, Any]] = {}

def _json_schema(
schema_obj: Schema,
is_main_schema: bool = True,
title: Union[str, None] = None,
description: Union[str, None] = None,
allow_reference: bool = True,
) -> Dict[str, Any]:
def _create_or_use_ref(return_dict: Dict[str, Any]) -> Dict[str, Any]:
if not use_refs or is_main_schema:
return return_schema

hashed = hash(repr(sorted(return_dict.items())))
if hashed not in seen:
seen[hashed] = return_dict
return return_dict
else:
id_str = "#" + str(hashed)
seen[hashed]["$id"] = id_str
return {"$ref": id_str}

def _get_type_name(python_type: Type) -> str:
if python_type == str:
return "string"
elif python_type == int:
return "integer"
elif python_type == float:
return "number"
elif python_type == bool:
return "boolean"
elif python_type == list:
return "array"
elif python_type == dict:
return "object"
return "string"

def _to_json_type(value: Any) -> Any:
if value is None or type(value) in (str, int, float, bool, list, dict):
return value

if type(value) in (tuple, set, frozenset):
return list(value)

if isinstance(value, Literal):
return value.schema

return str(value)

def _to_schema(s: Any, ignore_extra_keys: bool) -> Schema:
if not isinstance(s, Schema):
return Schema(s, ignore_extra_keys=ignore_extra_keys)
return s

s: Any = schema_obj.schema
i: bool = schema_obj.ignore_extra_keys
flavor = _priority(s)

return_schema: Dict[str, Any] = {}

return_description: Union[str, None] = description or schema_obj.description
if return_description:
return_schema["description"] = return_description
if title:
return_schema["title"] = title

if allow_reference and schema_obj.as_reference:
if schema_obj.name not in definitions_by_name:
definitions_by_name[cast(str, schema_obj.name)] = {}
definitions_by_name[cast(str, schema_obj.name)] = _json_schema(
schema_obj, is_main_schema=False, allow_reference=False
)

return_schema["$ref"] = "#/definitions/" + cast(str, schema_obj.name)
else:
if schema_obj.name and not title:
return_schema["title"] = schema_obj.name

if flavor == TYPE:
return_schema["type"] = _get_type_name(s)
elif flavor == ITERABLE:
return_schema["type"] = "array"
if len(s) == 1:
return_schema["items"] = _json_schema(
_to_schema(s[0], i), is_main_schema=False
)
elif len(s) > 1:
return_schema["items"] = _json_schema(
Schema(Or(*s)), is_main_schema=False
)
elif isinstance(s, Or):
if all(
priority == COMPARABLE
for priority in [_priority(value) for value in s.args]
):
or_values = [
str(val) if isinstance(val, Literal) else val for val in s.args
]
if len(or_values) == 1:
or_value = or_values[0]
if or_value is None:
return_schema["type"] = "null"
else:
return_schema["const"] = _to_json_type(or_value)
return return_schema
return_schema["enum"] = or_values
else:
any_of_values: List[Dict[str, Any]] = []
for or_key in s.args:
new_value = _json_schema(
_to_schema(or_key, i), is_main_schema=False
)
if new_value != {} and new_value not in any_of_values:
any_of_values.append(new_value)
if len(any_of_values) == 1:
return_schema.update(any_of_values[0])
else:
return_schema["anyOf"] = any_of_values
elif isinstance(s, And):
all_of_values: List[Dict[str, Any]] = []
for and_key in s.args:
new_value = _json_schema(
_to_schema(and_key, i), is_main_schema=False
)
if new_value != {} and new_value not in all_of_values:
all_of_values.append(new_value)
if len(all_of_values) == 1:
return_schema.update(all_of_values[0])
else:
return_schema["allOf"] = all_of_values
elif flavor == COMPARABLE:
if s is None:
return_schema["type"] = "null"
else:
return_schema["const"] = _to_json_type(s)
elif flavor == VALIDATOR and type(s).__name__ == "Regex":
return_schema["type"] = "string"
return_schema["pattern"] = re.sub(
r"\(\?P<[a-z\d_]+>", "(", s.pattern_str
).replace("/", r"\/")
else:
if flavor != DICT:
return return_schema

required_keys: List[str] = []
expanded_schema: Dict[str, Any] = {}
additional_properties = i

def _key_allows_additional_properties(key: Any) -> bool:
if isinstance(key, Optional):
return _key_allows_additional_properties(key.schema)
return key == str or key == object

def _get_key_title(key: Any) -> Union[str, None]:
if isinstance(key, Optional):
return _get_key_title(key.schema)
if isinstance(key, Literal):
return key.title
return None

def _get_key_description(key: Any) -> Union[str, None]:
if isinstance(key, Optional):
return _get_key_description(key.schema)
if isinstance(key, Literal):
return key.description
return None

def _get_key_name(key: Any) -> Any:
if isinstance(key, Optional):
return _get_key_name(key.schema)
if isinstance(key, Literal):
return key.schema
return key

for key in s:
if hasattr(key, "handler"):
continue

additional_properties = (
additional_properties
or _key_allows_additional_properties(key)
)
sub_schema = _to_schema(s[key], ignore_extra_keys=i)
key_name = _get_key_name(key)

if isinstance(key_name, str):
if not isinstance(key, Optional):
required_keys.append(key_name)
expanded_schema[key_name] = _json_schema(
sub_schema,
is_main_schema=False,
title=_get_key_title(key),
description=_get_key_description(key),
)
if isinstance(key, Optional) and hasattr(key, "default"):
expanded_schema[key_name]["default"] = _to_json_type(
_invoke_with_optional_kwargs(key.default, **kwargs)
if callable(key.default)
else key.default
)
elif isinstance(key_name, Or):
for or_key in key_name.args:
expanded_schema[_get_key_name(or_key)] = _json_schema(
sub_schema,
is_main_schema=False,
description=_get_key_description(or_key),
)

return_schema.update(
{
"type": "object",
"properties": expanded_schema,
"required": required_keys,
"additionalProperties": additional_properties,
}
)

if is_main_schema:
return_schema.update(
{
"$id": schema_id,
"$schema": "http://json-schema.org/draft-07/schema#",
}
)
if schema_obj.name:
return_schema["title"] = schema_obj.name

if definitions_by_name:
return_schema["definitions"] = {}
for definition_name, definition in definitions_by_name.items():
return_schema["definitions"][definition_name] = definition

return _create_or_use_ref(return_schema)

return _json_schema(schema, True)
Loading