Files
openapi-generator/modules/openapi-generator/src/main/resources/python-experimental/schemas.handlebars
Justin Black 3239f28e09 [python-experimental] code quality improvements (#13221)
* Changes http to https

* Refactors method to be simpler, removes unused json encoder

* Samples regen, test fixed, consolidated redundant return

* Samples regenerated

* Updates composed model property names

* Samples regenerated using composed schema name update

* Samples regen, removed unused arg in __get_oneof_class
2022-08-18 09:41:50 -07:00

2100 lines
79 KiB
Handlebars

# coding: utf-8
{{>partial_header}}
from collections import defaultdict
from datetime import date, datetime, timedelta # noqa: F401
import functools
import decimal
import io
import os
import re
import tempfile
import typing
import uuid
from dateutil.parser.isoparser import isoparser, _takes_ascii
from frozendict import frozendict
from {{packageName}}.exceptions import (
ApiTypeError,
ApiValueError,
)
from {{packageName}}.configuration import (
Configuration,
)
class Unset:
    """
    Marker type meaning "no value was supplied"

    An instance of this class is set as the default value for object type(dict)
    properties that are optional. When a property has an unset value, that
    property will not be assigned in the dict.
    """
# module-level Unset instance, used as the "not supplied" default for optional properties
unset = Unset()
# the type of None, so that None can take part in isinstance checks
none_type = type(None)
# alias for io.IOBase
file_type = io.IOBase
class FileIO(io.FileIO):
    """
    A class for storing files
    Note: this class is not immutable
    """

    def __new__(cls, arg: typing.Union[io.FileIO, io.BufferedReader]):
        """
        Builds a new instance from an already open file

        The passed in file is closed and then opened again by name through
        io.FileIO, so the returned instance owns its own file handle

        Args:
            arg: the open file to wrap

        Raises:
            ApiValueError: when arg is not an io.FileIO/io.BufferedReader or when arg is closed
        """
        if not isinstance(arg, (io.FileIO, io.BufferedReader)):
            raise ApiValueError('FileIO must be passed arg which contains the open file')
        if arg.closed:
            raise ApiValueError('Invalid file state; file is closed and must be open')
        arg.close()
        file_name = arg.name
        inst = super().__new__(cls, file_name)
        # the file is opened here because __init__ below is intentionally a no-op
        io.FileIO.__init__(inst, file_name)
        return inst

    def __init__(self, arg: typing.Union[io.FileIO, io.BufferedReader]):
        # all initialization already happened in __new__
        pass
def update(d: dict, u: dict):
    """
    Adds u to d in place
    Where each dict is defaultdict(set)

    Keys that are missing from d are assigned the value from u, keys that are
    already present get the union (|) of both values. Empty values in u are skipped.

    Args:
        d: the dict that is updated in place
        u: the dict whose values are merged into d, may be empty or None

    Returns:
        d, on every code path
    """
    if not u:
        return d
    for k, v in u.items():
        if not v:
            continue
        if k in d:
            d[k] = d[k] | v
        else:
            d[k] = v
    # return d here too so that the empty and the non-empty path agree
    return d
class ValidationMetadata(frozendict):
    """
    A class storing metadata that is needed to validate OpenApi Schema payloads
    """

    def __new__(
        cls,
        path_to_item: typing.Tuple[typing.Union[str, int], ...] = ('args[0]',),
        from_server: bool = False,
        configuration: typing.Optional[Configuration] = None,
        seen_classes: typing.FrozenSet[typing.Type] = frozenset(),
        validated_path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set['Schema']] = frozendict()
    ):
        """
        Args:
            path_to_item: the path to the current data being instantiated.
                For {'a': [1]} if the code is handling, 1, then the path is ('args[0]', 'a', 0)
                This changes from location to location
            from_server: whether or not this data came from the server
                True when receiving server data
                False when instantiating model with client side data not from the server
                This does not change from location to location
            configuration: the Configuration instance to use
                This is needed because in Configuration:
                - one can disable validation checking
                This does not change from location to location
            seen_classes: when deserializing data that matches multiple schemas, this is used to store
                the schemas that have been traversed. This is used to stop processing when a cycle is seen.
                This changes from location to location
            validated_path_to_schemas: stores the already validated schema classes for a given path location
                This does not change from location to location
        """
        return super().__new__(
            cls,
            path_to_item=path_to_item,
            from_server=from_server,
            configuration=configuration,
            seen_classes=seen_classes,
            validated_path_to_schemas=validated_path_to_schemas
        )

    def validation_ran_earlier(self, cls: type) -> bool:
        """
        True when cls is already stored in validated_path_to_schemas for the
        current path_to_item, or when cls is in seen_classes
        """
        schemas_at_path = self.validated_path_to_schemas.get(self.path_to_item, set())
        if schemas_at_path and cls in schemas_at_path:
            return True
        return cls in self.seen_classes

    @property
    def path_to_item(self) -> typing.Tuple[typing.Union[str, int], ...]:
        """the stored path_to_item value"""
        return self.get('path_to_item')

    @property
    def from_server(self) -> bool:
        """the stored from_server value"""
        return self.get('from_server')

    @property
    def configuration(self) -> typing.Optional[Configuration]:
        """the stored configuration value"""
        return self.get('configuration')

    @property
    def seen_classes(self) -> typing.FrozenSet[typing.Type]:
        """the stored seen_classes value"""
        return self.get('seen_classes')

    @property
    def validated_path_to_schemas(self) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set['Schema']]:
        """the stored validated_path_to_schemas value"""
        return self.get('validated_path_to_schemas')
class ValidatorBase:
    """
    Holds the JSON schema keyword checks that are shared by schema validators

    _check_validations_for_types picks the checks to run from the python type
    of the payload (str, tuple, frozendict or decimal.Decimal). A check is
    skipped when its keyword is listed in
    configuration._disabled_client_side_validations
    """

    @staticmethod
    def __is_json_validation_enabled(schema_keyword, configuration=None):
        """Returns true if JSON schema validation is enabled for the specified
        validation keyword. This can be used to skip JSON schema structural validation
        as requested in the configuration.
        Args:
            schema_keyword (string): the name of a JSON schema validation keyword.
            configuration (Configuration): the configuration class.
        """
        return (configuration is None or
            not hasattr(configuration, '_disabled_client_side_validations') or
            schema_keyword not in configuration._disabled_client_side_validations)

    @staticmethod
    def __raise_validation_error_message(value, constraint_msg, constraint_value, path_to_item, additional_txt=""):
        """
        Raises the ApiValueError that describes a failed constraint

        Args:
            value: the payload that failed the check
            constraint_msg (str): the description of the constraint
            constraint_value: the limit that the payload was checked against
            path_to_item (tuple): the location of the payload
            additional_txt (str): optional text appended after the constraint value
        Raises:
            ApiValueError: always
        """
        raise ApiValueError(
            "Invalid value `{value}`, {constraint_msg} `{constraint_value}`{additional_txt} at {path_to_item}".format(
                value=value,
                constraint_msg=constraint_msg,
                constraint_value=constraint_value,
                additional_txt=additional_txt,
                path_to_item=path_to_item,
            )
        )

    @classmethod
    def __check_str_validations(cls,
                                validations, input_values,
                                validation_metadata: ValidationMetadata):
        """
        Checks max_length, min_length and regex for a str payload
        Raises:
            ApiValueError: when an enabled check fails
        """
        configuration = validation_metadata.configuration
        if (cls.__is_json_validation_enabled('maxLength', configuration) and
                'max_length' in validations and
                len(input_values) > validations['max_length']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="length must be less than or equal to",
                constraint_value=validations['max_length'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('minLength', configuration) and
                'min_length' in validations and
                len(input_values) < validations['min_length']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="length must be greater than or equal to",
                constraint_value=validations['min_length'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('pattern', configuration) and
                'regex' in validations):
            for regex_dict in validations['regex']:
                flags = regex_dict.get('flags', 0)
                if re.search(regex_dict['pattern'], input_values, flags=flags):
                    continue
                # Don't print the regex flags if the flags are not
                # specified in the OAS document.
                additional_txt = " with flags=`{}`".format(flags) if flags != 0 else ""
                cls.__raise_validation_error_message(
                    value=input_values,
                    constraint_msg="must match regular expression",
                    constraint_value=regex_dict['pattern'],
                    path_to_item=validation_metadata.path_to_item,
                    additional_txt=additional_txt
                )

    @classmethod
    def __check_tuple_validations(
            cls, validations, input_values,
            validation_metadata: ValidationMetadata):
        """
        Checks max_items, min_items and unique_items for a tuple payload
        Raises:
            ApiValueError: when an enabled check fails
        """
        configuration = validation_metadata.configuration
        if (cls.__is_json_validation_enabled('maxItems', configuration) and
                'max_items' in validations and
                len(input_values) > validations['max_items']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="number of items must be less than or equal to",
                constraint_value=validations['max_items'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('minItems', configuration) and
                'min_items' in validations and
                len(input_values) < validations['min_items']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="number of items must be greater than or equal to",
                constraint_value=validations['min_items'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('uniqueItems', configuration) and
                'unique_items' in validations and validations['unique_items'] and input_values):
            unique_items = set(input_values)
            if len(input_values) > len(unique_items):
                cls.__raise_validation_error_message(
                    value=input_values,
                    constraint_msg="duplicate items were found, and the tuple must not contain duplicates because",
                    constraint_value='unique_items==True',
                    path_to_item=validation_metadata.path_to_item
                )

    @classmethod
    def __check_dict_validations(
            cls, validations, input_values,
            validation_metadata: ValidationMetadata):
        """
        Checks max_properties and min_properties for a frozendict payload
        Raises:
            ApiValueError: when an enabled check fails
        """
        configuration = validation_metadata.configuration
        if (cls.__is_json_validation_enabled('maxProperties', configuration) and
                'max_properties' in validations and
                len(input_values) > validations['max_properties']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="number of properties must be less than or equal to",
                constraint_value=validations['max_properties'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('minProperties', configuration) and
                'min_properties' in validations and
                len(input_values) < validations['min_properties']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="number of properties must be greater than or equal to",
                constraint_value=validations['min_properties'],
                path_to_item=validation_metadata.path_to_item
            )

    @classmethod
    def __check_numeric_validations(
            cls, validations, input_values,
            validation_metadata: ValidationMetadata):
        """
        Checks multiple_of and the inclusive/exclusive maximum and minimum
        for a decimal.Decimal payload
        Raises:
            ApiValueError: when an enabled check fails
        """
        configuration = validation_metadata.configuration
        if cls.__is_json_validation_enabled('multipleOf', configuration) and 'multiple_of' in validations:
            multiple_of_value = validations['multiple_of']
            if (isinstance(input_values, decimal.Decimal) and
                    not (float(input_values) / multiple_of_value).is_integer()
            ):
                # Note 'multipleOf' will be as good as the floating point arithmetic.
                cls.__raise_validation_error_message(
                    value=input_values,
                    constraint_msg="value must be a multiple of",
                    constraint_value=multiple_of_value,
                    path_to_item=validation_metadata.path_to_item
                )

        range_keywords = {'exclusive_maximum', 'inclusive_maximum', 'exclusive_minimum', 'inclusive_minimum'}
        if range_keywords.isdisjoint(validations):
            # no maximum or minimum was defined
            return

        if (cls.__is_json_validation_enabled('exclusiveMaximum', configuration) and
                'exclusive_maximum' in validations and
                input_values >= validations['exclusive_maximum']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="must be a value less than",
                constraint_value=validations['exclusive_maximum'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('maximum', configuration) and
                'inclusive_maximum' in validations and
                input_values > validations['inclusive_maximum']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="must be a value less than or equal to",
                constraint_value=validations['inclusive_maximum'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('exclusiveMinimum', configuration) and
                'exclusive_minimum' in validations and
                input_values <= validations['exclusive_minimum']):
            # report the bound that was violated, which is exclusive_minimum;
            # exclusive_maximum may not even be present in validations
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="must be a value greater than",
                constraint_value=validations['exclusive_minimum'],
                path_to_item=validation_metadata.path_to_item
            )

        if (cls.__is_json_validation_enabled('minimum', configuration) and
                'inclusive_minimum' in validations and
                input_values < validations['inclusive_minimum']):
            cls.__raise_validation_error_message(
                value=input_values,
                constraint_msg="must be a value greater than or equal to",
                constraint_value=validations['inclusive_minimum'],
                path_to_item=validation_metadata.path_to_item
            )

    @classmethod
    def _check_validations_for_types(
        cls,
        validations,
        input_values,
        validation_metadata: ValidationMetadata
    ):
        """
        Runs the checks that belong to the python type of input_values
        Payloads of any other type are not checked

        Args:
            validations (dict): the constraint name to constraint value mapping
            input_values: the payload to check
            validation_metadata: supplies the configuration and the path_to_item
        Raises:
            ApiValueError: when an enabled check fails
        """
        if isinstance(input_values, str):
            cls.__check_str_validations(validations, input_values, validation_metadata)
        elif isinstance(input_values, tuple):
            cls.__check_tuple_validations(validations, input_values, validation_metadata)
        elif isinstance(input_values, frozendict):
            cls.__check_dict_validations(validations, input_values, validation_metadata)
        elif isinstance(input_values, decimal.Decimal):
            cls.__check_numeric_validations(validations, input_values, validation_metadata)
class Singleton:
    """
    Enums and singletons are the same
    The same instance is returned for a given key of (cls, arg)
    """
    # cache of the created instances, keyed by (cls, arg, str(arg))
    _instances = {}

    def __new__(cls, arg: typing.Any, **kwargs):
        """
        cls base classes: BoolClass, NoneClass, str, decimal.Decimal
        The 3rd key is used in the tuple below for a corner case where an enum contains integer 1
        However 1.0 can also be ingested into that enum schema because 1.0 == 1 and
        Decimal('1.0') == Decimal('1')
        But if we omitted the 3rd value in the key, then Decimal('1.0') would be stored as Decimal('1')
        and json serializing that instance would be '1' rather than the expected '1.0'
        Adding the 3rd value, the str of arg ensures that 1.0 -> Decimal('1.0') which is serialized as 1.0
        """
        key = (cls, arg, str(arg))
        cache = cls._instances
        if key in cache:
            return cache[key]
        if isinstance(arg, (none_type, bool, BoolClass, NoneClass)):
            # for None and bool values the parent constructor is called without arg
            inst = super().__new__(cls)
        else:
            inst = super().__new__(cls, arg)
        cache[key] = inst
        return inst

    def __repr__(self):
        class_name = self.__class__.__name__
        if isinstance(self, NoneClass):
            return f'<{class_name}: None>'
        if isinstance(self, BoolClass):
            shown = 'True' if bool(self) else 'False'
            return f'<{class_name}: {shown}>'
        return f'<{class_name}: {super().__repr__()}>'
class NoneClass(Singleton):
    """
    The Singleton that stands in for None
    """

    @classmethod
    @property
    def NONE(cls):
        """the instance of cls that was created from None"""
        return cls(None)

    def __bool__(self) -> bool:
        # instances are always falsy
        return False
class BoolClass(Singleton):
    """
    The Singleton that stands in for True and False
    """

    @classmethod
    @property
    def TRUE(cls):
        """the instance of cls that was created from True"""
        return cls(True)

    @classmethod
    @property
    def FALSE(cls):
        """the instance of cls that was created from False"""
        return cls(False)

    @functools.cache
    def __bool__(self) -> bool:
        """
        Looks this instance up in _instances and returns the truth value of
        the arg that it was created from

        Raises:
            ValueError: when this instance is not stored in _instances
        """
        for (_cls, raw_value, _text), stored in self._instances.items():
            if stored is self:
                return bool(raw_value)
        raise ValueError('Unable to find the boolean value of this instance')
class Validator(typing.Protocol):
    """
    The protocol of classes that offer a _validate classmethod
    """

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]]:
        """
        Validates arg, the return annotation is a mapping of path to a set of classes
        This placeholder does nothing
        """
def _SchemaValidator(**validations: typing.Union[str, bool, None, int, float, list[dict[str, typing.Union[str, int, float]]]]) -> Validator:
    """
    Builds a class whose _validate checks its arg against the passed in validations

    Args:
        validations: the constraint name to constraint value pairs, they are
            handed to ValidatorBase._check_validations_for_types unchanged
    Returns:
        the new SchemaValidator class
    """

    class SchemaValidator(ValidatorBase):
        @classmethod
        def _validate(
            cls,
            arg,
            validation_metadata: ValidationMetadata,
        ) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]]:
            """
            SchemaValidator _validate
            Validates that validations pass
            """
            # the constraints come first, the next class in the mro runs afterwards
            cls._check_validations_for_types(validations, arg, validation_metadata)
            return super()._validate(arg, validation_metadata)

    return SchemaValidator
def _SchemaTypeChecker(union_type_cls: typing.Union[typing.Any]) -> Validator:
    """
    Builds a class whose _validate accepts only args whose exact type is allowed

    Args:
        union_type_cls: one class or a typing.Union of the allowed classes
    Returns:
        the new SchemaTypeChecker class
    """
    if typing.get_origin(union_type_cls) is typing.Union:
        union_classes = typing.get_args(union_type_cls)
    else:
        # note: when a union of a single class is passed in, the union disappears
        union_classes = (union_type_cls,)
    # I want the type hint... union_type_cls
    # and to use it as a base class but when I do, I get
    # TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases

    class SchemaTypeChecker:
        @staticmethod
        def __get_valid_classes_phrase(input_classes):
            """Returns a string phrase describing what types are allowed"""
            class_names = sorted(input_cls.__name__ for input_cls in input_classes)
            if len(class_names) == 1:
                return "is {0}".format(class_names[0])
            return "is one of [{0}]".format(", ".join(class_names))

        @classmethod
        def __type_error_message(
            cls, var_value=None, var_name=None, valid_classes=None, key_type=None
        ):
            """
            Keyword Args:
                var_value (any): the variable which has the type_error
                var_name (str): the name of the variable which has the type error,
                    it is not part of the message
                valid_classes (tuple): the accepted classes for current_item's
                                      value
                key_type (bool): False if our value is a value in a dict
                                 True if it is a key in a dict
                                 False if our item is an item in a tuple
            """
            key_or_value = "key" if key_type else "value"
            valid_classes_phrase = cls.__get_valid_classes_phrase(valid_classes)
            passed_type_name = type(var_value).__name__
            return f"Invalid type. Required {key_or_value} type {valid_classes_phrase} and passed type was {passed_type_name}"

        @classmethod
        def _get_type_error(cls, var_value, path_to_item, valid_classes, key_type=False):
            """Returns (does not raise) the ApiTypeError for var_value"""
            error_msg = cls.__type_error_message(
                var_name=path_to_item[-1],
                var_value=var_value,
                valid_classes=valid_classes,
                key_type=key_type,
            )
            return ApiTypeError(
                error_msg,
                path_to_item=path_to_item,
                valid_classes=valid_classes,
                key_type=key_type,
            )

        @classmethod
        def _validate(
            cls,
            arg,
            validation_metadata: ValidationMetadata,
        ) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]]:
            """
            SchemaTypeChecker _validate
            Validates arg's type
            """
            # the exact type is compared, subclasses of the allowed classes do not pass
            if type(arg) not in union_classes:
                raise cls._get_type_error(
                    arg,
                    validation_metadata.path_to_item,
                    union_classes,
                    key_type=False,
                )
            return super()._validate(arg, validation_metadata)

    return SchemaTypeChecker
class EnumMakerBase:
    """
    Empty base class of the classes that _SchemaEnumMaker builds
    """
class EnumMakerInterface(Validator):
    """
    A Validator that also offers the _enum_value_to_name class property
    """

    @classmethod
    @property
    def _enum_value_to_name(
        cls
    ) -> typing.Dict[typing.Union[str, decimal.Decimal, bool, none_type], str]:
        """
        Annotated as a mapping of enum value to enum name
        This placeholder does nothing
        """
def _SchemaEnumMaker(enum_value_to_name: typing.Dict[typing.Union[str, decimal.Decimal, bool, none_type], str]) -> EnumMakerInterface:
    """
    Builds a class whose _validate accepts only the keys of enum_value_to_name

    Args:
        enum_value_to_name: the mapping of allowed value to enum name
    Returns:
        the new SchemaEnumMaker class
    """

    class SchemaEnumMaker(EnumMakerBase):
        @classmethod
        @property
        def _enum_value_to_name(
            cls
        ) -> typing.Dict[typing.Union[str, decimal.Decimal, bool, none_type], str]:
            """
            The allowed values: enum_value_to_name when no class further up the
            mro defines _enum_value_to_name, otherwise only the items that both
            mappings share
            """
            try:
                inherited = super()._enum_value_to_name
            except AttributeError:
                return enum_value_to_name
            return dict(enum_value_to_name.items() & inherited.items())

        @classmethod
        def _validate(
            cls,
            arg,
            validation_metadata: ValidationMetadata,
        ) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]]:
            """
            SchemaEnumMaker _validate
            Validates that arg is in the enum's allowed values
            """
            try:
                # the lookup is only done to find out whether arg is an allowed value
                cls._enum_value_to_name[arg]
            except KeyError:
                raise ApiValueError("Invalid value {} passed in to {}, {}".format(arg, cls, cls._enum_value_to_name))
            return super()._validate(arg, validation_metadata)

    return SchemaEnumMaker
class BoolBase:
    """
    Mixin with replacements for the `is True` and `is False` checks
    """

    def is_true(self) -> bool:
        """
        A replacement for x is True
        True if the instance is a BoolClass True Singleton
        """
        if issubclass(self.__class__, BoolClass):
            return bool(self)
        return False

    def is_false(self) -> bool:
        """
        A replacement for x is False
        True if the instance is a BoolClass False Singleton
        """
        if issubclass(self.__class__, BoolClass):
            return bool(self) is False
        return False
class NoneBase:
    """
    Mixin with a replacement for the `is None` check
    """

    def is_none(self) -> bool:
        """
        A replacement for x is None
        True if the instance is a NoneClass None Singleton
        """
        return issubclass(self.__class__, NoneClass)
class StrBase:
    """
    Base of the string schemas

    as_str returns the instance itself, every other conversion raises here
    and is overridden by the format specific subclasses
    """

    @property
    def as_str(self) -> str:
        """the instance itself"""
        return self

    @property
    def as_date(self) -> date:
        """not available on this base class, always raises"""
        raise Exception('not implemented')

    @property
    def as_datetime(self) -> datetime:
        """not available on this base class, always raises"""
        raise Exception('not implemented')

    @property
    def as_decimal(self) -> decimal.Decimal:
        """not available on this base class, always raises"""
        raise Exception('not implemented')

    @property
    def as_uuid(self) -> uuid.UUID:
        """not available on this base class, always raises"""
        raise Exception('not implemented')
class UUIDBase(StrBase):
    """
    Base of the string schemas that hold a uuid
    """

    @property
    @functools.cache
    def as_uuid(self) -> uuid.UUID:
        """this string parsed by uuid.UUID, the result is cached"""
        return uuid.UUID(self)

    @classmethod
    def _validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
        """
        Checks that a str arg can be parsed by uuid.UUID

        Returns:
            True for a str that parses, None when arg is not a str
        Raises:
            ApiValueError: when a str arg can not be parsed
        """
        if not isinstance(arg, str):
            return None
        try:
            uuid.UUID(arg)
        except ValueError:
            raise ApiValueError(
                "Invalid value '{}' for type UUID at {}".format(arg, validation_metadata.path_to_item)
            )
        return True

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: typing.Optional[ValidationMetadata] = None,
    ):
        """
        UUIDBase _validate
        """
        # the format is checked first, the next class in the mro runs afterwards
        cls._validate_format(arg, validation_metadata=validation_metadata)
        return super()._validate(arg, validation_metadata=validation_metadata)
class CustomIsoparser(isoparser):
    """
    An isoparser whose parse_isodatetime rejects values without time
    components and whose parse_isodate rejects values with time components
    """

    @_takes_ascii
    def parse_isodatetime(self, dt_str):
        """
        Parses dt_str into a datetime

        Raises:
            ValueError: when text follows the date without the expected
                separator, or when no time components were parsed
        """
        parts, date_end = self._parse_isodate(dt_str)
        if len(dt_str) > date_end:
            separator_matches = self._sep is None or dt_str[date_end:date_end + 1] == self._sep
            if not separator_matches:
                raise ValueError('String contains unknown ISO components')
            parts += self._parse_isotime(dt_str[date_end + 1:])

        if len(parts) <= 3:
            raise ValueError('Value is not a datetime')

        if parts[3] == 24:
            # hour 24 becomes hour 0 of the next day
            parts[3] = 0
            return datetime(*parts) + timedelta(days=1)

        return datetime(*parts)

    @_takes_ascii
    def parse_isodate(self, datestr):
        """
        Parses datestr into a date

        Raises:
            ValueError: when text follows the date or more than three components were parsed
        """
        parts, date_end = self._parse_isodate(datestr)
        if len(datestr) > date_end or len(parts) > 3:
            raise ValueError('String contains invalid time components')
        return date(*parts)
# module-level CustomIsoparser instance, used by DateBase and DateTimeBase
DEFAULT_ISOPARSER = CustomIsoparser()
class DateBase(StrBase):
    """
    Base of the string schemas that hold an ISO-8601 date
    """

    @property
    @functools.cache
    def as_date(self) -> date:
        """this string parsed by DEFAULT_ISOPARSER.parse_isodate, the result is cached"""
        return DEFAULT_ISOPARSER.parse_isodate(self)

    @classmethod
    def _validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
        """
        Checks that a str arg can be parsed as a date

        Returns:
            True for a str that parses, None when arg is not a str
        Raises:
            ApiValueError: when a str arg can not be parsed
        """
        if not isinstance(arg, str):
            return None
        try:
            DEFAULT_ISOPARSER.parse_isodate(arg)
        except ValueError:
            raise ApiValueError(
                "Value does not conform to the required ISO-8601 date format. "
                "Invalid value '{}' for type date at {}".format(arg, validation_metadata.path_to_item)
            )
        return True

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: typing.Optional[ValidationMetadata] = None,
    ):
        """
        DateBase _validate
        """
        # the format is checked first, the next class in the mro runs afterwards
        cls._validate_format(arg, validation_metadata=validation_metadata)
        return super()._validate(arg, validation_metadata=validation_metadata)
class DateTimeBase:
    """
    Base of the string schemas that hold an ISO-8601 datetime
    """

    @property
    @functools.cache
    def as_datetime(self) -> datetime:
        """this string parsed by DEFAULT_ISOPARSER.parse_isodatetime, the result is cached"""
        return DEFAULT_ISOPARSER.parse_isodatetime(self)

    @classmethod
    def _validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
        """
        Checks that a str arg can be parsed as a datetime

        Returns:
            True for a str that parses, None when arg is not a str
        Raises:
            ApiValueError: when a str arg can not be parsed
        """
        if not isinstance(arg, str):
            return None
        try:
            DEFAULT_ISOPARSER.parse_isodatetime(arg)
        except ValueError:
            raise ApiValueError(
                "Value does not conform to the required ISO-8601 datetime format. "
                "Invalid value '{}' for type datetime at {}".format(arg, validation_metadata.path_to_item)
            )
        return True

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ):
        """
        DateTimeBase _validate
        """
        # the format is checked first, the next class in the mro runs afterwards
        cls._validate_format(arg, validation_metadata=validation_metadata)
        return super()._validate(arg, validation_metadata=validation_metadata)
class DecimalBase(StrBase):
    """
    A class for storing decimals that are sent over the wire as strings
    These schemas must remain based on StrBase rather than NumberBase
    because picking base classes must be deterministic
    """

    @property
    @functools.cache
    def as_decimal(self) -> decimal.Decimal:
        """this string converted by decimal.Decimal, the result is cached"""
        return decimal.Decimal(self)

    @classmethod
    def _validate_format(cls, arg: typing.Optional[str], validation_metadata: ValidationMetadata):
        """
        Checks that a str arg can be converted by decimal.Decimal

        Returns:
            True for a str that converts, None when arg is not a str
        Raises:
            ApiValueError: when a str arg can not be converted
        """
        if not isinstance(arg, str):
            return None
        try:
            decimal.Decimal(arg)
        except decimal.InvalidOperation:
            raise ApiValueError(
                "Value cannot be converted to a decimal. "
                "Invalid value '{}' for type decimal at {}".format(arg, validation_metadata.path_to_item)
            )
        return True

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ):
        """
        DecimalBase _validate
        """
        # the format is checked first, the next class in the mro runs afterwards
        cls._validate_format(arg, validation_metadata=validation_metadata)
        return super()._validate(arg, validation_metadata=validation_metadata)
class NumberBase:
    """
    Mixin with int and float conversions for number instances

    The sign of the exponent of as_tuple() decides which conversion is
    allowed, the converted value is stored on the instance after the first access
    """

    @property
    def as_int(self) -> int:
        """
        This number as an int

        Raises:
            ApiValueError: when the exponent of the number is negative
        """
        try:
            return self._as_int
        except AttributeError:
            # Note: for some numbers like 9.0 they could be represented as an
            # integer but our code chooses to store them as
            # >>> Decimal('9.0').as_tuple()
            # DecimalTuple(sign=0, digits=(9, 0), exponent=-1)
            # so we can tell that the value came from a float and convert it back to a float
            # during later serialization
            if self.as_tuple().exponent < 0:
                # this could be represented as an integer but should be represented as a float
                # because that's what it was serialized from
                raise ApiValueError(f'{self} is not an integer')
            int_value = int(self)
            self._as_int = int_value
            return int_value

    @property
    def as_float(self) -> float:
        """
        This number as a float

        Raises:
            ApiValueError: when the exponent of the number is not negative
        """
        try:
            return self._as_float
        except AttributeError:
            if self.as_tuple().exponent >= 0:
                raise ApiValueError(f'{self} is not an float')
            float_value = float(self)
            self._as_float = float_value
            return float_value
class ListBase:
    """
    Mixin with the validation and the item casting of list (tuple) payloads
    """

    @classmethod
    def _validate_items(cls, list_items, validation_metadata: ValidationMetadata):
        """
        Ensures that:
        - values passed in for items are valid
        Exceptions will be raised if:
        - invalid arguments were passed in

        Args:
            list_items: the input list of items
            validation_metadata: the metadata of the list, every item gets its
                own copy whose path_to_item has the item index appended

        Returns:
            the merged path to schemas mapping of all validated items

        Raises:
            ApiTypeError - for missing required arguments, or for invalid properties
        """
        # if we have definitions for an items schema, use it
        # otherwise accept anything
        item_cls = getattr(cls, '_items', AnyTypeSchema)
        path_to_schemas = {}
        for index, item in enumerate(list_items):
            item_path = validation_metadata.path_to_item + (index,)
            item_metadata = ValidationMetadata(
                path_to_item=item_path,
                from_server=validation_metadata.from_server,
                configuration=validation_metadata.configuration,
                validated_path_to_schemas=validation_metadata.validated_path_to_schemas
            )
            if item_metadata.validation_ran_earlier(item_cls):
                # this item was already validated against item_cls
                continue
            item_path_to_schemas = item_cls._validate(item, validation_metadata=item_metadata)
            update(path_to_schemas, item_path_to_schemas)
        return path_to_schemas

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ):
        """
        ListBase _validate
        Runs the validation of the next class in the mro and then, for tuple
        args only, validates every item

        Returns:
            the path to schemas mapping of the next class in the mro, for
            tuple args the mappings of the items are merged into it
        Raises:
            ApiValueError: when a string can't be converted into a date or datetime and it must be one of those classes
            ApiTypeError: when the input type is not in the list of allowed spec types
        """
        _path_to_schemas = super()._validate(arg, validation_metadata=validation_metadata)
        if not isinstance(arg, tuple):
            return _path_to_schemas
        # cls is added to seen_classes before the items are validated
        items_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item,
            from_server=validation_metadata.from_server,
            configuration=validation_metadata.configuration,
            seen_classes=validation_metadata.seen_classes | frozenset({cls}),
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        items_path_to_schemas = cls._validate_items(arg, validation_metadata=items_metadata)
        update(_path_to_schemas, items_path_to_schemas)
        return _path_to_schemas

    @classmethod
    def _get_items(
        cls: 'Schema',
        arg: typing.List[typing.Any],
        path_to_item: typing.Tuple[typing.Union[str, int], ...],
        path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Type['Schema']]
    ):
        '''
        ListBase _get_items
        Returns the list of the items of arg, every item that is not yet an
        instance of its class is replaced by a new instance of that class
        '''
        # if we have definitions for an items schema, use it
        # otherwise accept anything
        fallback_item_cls = getattr(cls, '_items', AnyTypeSchema)
        cast_items = []
        for index, item in enumerate(arg):
            item_path = path_to_item + (index,)
            # the class stored for the path of the item wins over the fallback
            item_cls = path_to_schemas.get(item_path)
            if item_cls is None:
                item_cls = fallback_item_cls
            if isinstance(item, item_cls):
                cast_items.append(item)
            else:
                cast_items.append(
                    item_cls._get_new_instance_without_conversion(
                        item,
                        item_path,
                        path_to_schemas
                    )
                )
        return cast_items
class Discriminable:
    """
    Mixin with the discriminator helpers
    """

    @classmethod
    def _ensure_discriminator_value_present(cls, disc_property_name: str, validation_metadata: ValidationMetadata, *args):
        """
        Checks that the first positional payload contains disc_property_name

        Raises:
            ApiValueError: when no payload was passed in or when the payload
                does not contain the discriminator property
        """
        if args and disc_property_name in args[0]:
            return
        # The input data does not contain the discriminator property
        raise ApiValueError(
            "Cannot deserialize input data due to missing discriminator. "
            "The discriminator property '{}' is missing at path: {}".format(disc_property_name, validation_metadata.path_to_item)
        )

    @classmethod
    def _get_discriminated_class(cls, disc_property_name: str, disc_payload_value: str):
        """
        Used in schemas with discriminators
        Looks the payload value up in the _discriminator of cls first and then,
        in this order, in the allOf, oneOf and anyOf classes of _composed_schemas

        Returns:
            the first class that was found, otherwise None
        """
        if not hasattr(cls, '_discriminator'):
            return None
        discriminator = cls._discriminator
        if disc_property_name not in discriminator:
            return None
        found_cls = discriminator[disc_property_name].get(disc_payload_value)
        if found_cls is not None:
            return found_cls
        if not hasattr(cls, '_composed_schemas'):
            return None
        # TODO stop traveling if a cycle is hit
        for keyword in ('allOf', 'oneOf', 'anyOf'):
            for composed_cls in cls._composed_schemas[keyword]:
                found_cls = composed_cls._get_discriminated_class(
                    disc_property_name=disc_property_name, disc_payload_value=disc_payload_value)
                if found_cls is not None:
                    return found_cls
        return None
class DictBase(Discriminable):
# subclass properties
_required_property_names = set()
@classmethod
def _validate_arg_presence(cls, arg):
"""
Ensures that:
- all required arguments are passed in
- the input variable names are valid
- present in properties or
- accepted because additionalProperties exists
Exceptions will be raised if:
- invalid arguments were passed in
- a var_name is invalid if additionProperties == None and var_name not in _properties
- required properties were not passed in
Args:
arg: the input dict
Raises:
ApiTypeError - for missing required arguments, or for invalid properties
"""
seen_required_properties = set()
invalid_arguments = []
for property_name in arg:
if property_name in cls._required_property_names:
seen_required_properties.add(property_name)
elif property_name in cls._property_names:
continue
elif cls._additional_properties:
continue
else:
invalid_arguments.append(property_name)
missing_required_arguments = list(cls._required_property_names - seen_required_properties)
if missing_required_arguments:
missing_required_arguments.sort()
raise ApiTypeError(
"{} is missing {} required argument{}: {}".format(
cls.__name__,
len(missing_required_arguments),
"s" if len(missing_required_arguments) > 1 else "",
missing_required_arguments
)
)
if invalid_arguments:
invalid_arguments.sort()
raise ApiTypeError(
"{} was passed {} invalid argument{}: {}".format(
cls.__name__,
len(invalid_arguments),
"s" if len(invalid_arguments) > 1 else "",
invalid_arguments
)
)
@classmethod
def _validate_args(cls, arg, validation_metadata: ValidationMetadata):
"""
Ensures that:
- values passed in for properties are valid
Exceptions will be raised if:
- invalid arguments were passed in
Args:
arg: the input dict
Raises:
ApiTypeError - for missing required arguments, or for invalid properties
"""
path_to_schemas = {}
for property_name, value in arg.items():
if property_name in cls._property_names:
schema = getattr(cls, property_name)
elif cls._additional_properties:
schema = cls._additional_properties
else:
raise ApiTypeError('Unable to find schema for value={} in class={} at path_to_item={}'.format(
value, cls, validation_metadata.path_to_item+(property_name,)
))
arg_validation_metadata = ValidationMetadata(
from_server=validation_metadata.from_server,
configuration=validation_metadata.configuration,
path_to_item=validation_metadata.path_to_item+(property_name,),
validated_path_to_schemas=validation_metadata.validated_path_to_schemas
)
if arg_validation_metadata.validation_ran_earlier(schema):
continue
other_path_to_schemas = schema._validate(value, validation_metadata=arg_validation_metadata)
update(path_to_schemas, other_path_to_schemas)
return path_to_schemas
@classmethod
def _validate(
    cls,
    arg,
    validation_metadata: ValidationMetadata,
):
    """
    DictBase _validate
    Runs the inherited validation and, for frozendict payloads only:
    - checks the presence of the passed properties with _validate_arg_presence
    - validates every property value with _validate_args
    - when cls defines a _discriminator, validates the payload against the discriminated class

    Returns:
        path_to_schemas: a map of path to the schema classes collected for that path
    Raises:
        ApiValueError: when the discriminator value does not map to a class
        ApiTypeError: for missing required or invalid properties
    """
    collected = super()._validate(arg, validation_metadata=validation_metadata)
    if not isinstance(arg, frozendict):
        return collected
    cls._validate_arg_presence(arg)
    update(collected, cls._validate_args(arg, validation_metadata=validation_metadata))
    try:
        discriminator = cls._discriminator
    except AttributeError:
        # no discriminator is defined, the property validation above is all that is needed
        return collected
    # discriminator exists
    disc_prop_name = list(discriminator.keys())[0]
    cls._ensure_discriminator_value_present(disc_prop_name, validation_metadata, arg)
    discriminated_cls = cls._get_discriminated_class(
        disc_property_name=disc_prop_name,
        disc_payload_value=arg[disc_prop_name]
    )
    if discriminated_cls is None:
        allowed_values = list(discriminator[disc_prop_name].keys())
        disc_path = validation_metadata.path_to_item + (disc_prop_name,)
        raise ApiValueError(
            "Invalid discriminator value was passed in to {}.{} Only the values {} are allowed at {}".format(
                cls.__name__,
                disc_prop_name,
                allowed_values,
                disc_path
            )
        )
    # same metadata, with cls added to the classes that have been seen
    discriminated_vm = ValidationMetadata(
        configuration=validation_metadata.configuration,
        from_server=validation_metadata.from_server,
        path_to_item=validation_metadata.path_to_item,
        seen_classes=validation_metadata.seen_classes | frozenset({cls}),
        validated_path_to_schemas=validation_metadata.validated_path_to_schemas
    )
    if not discriminated_vm.validation_ran_earlier(discriminated_cls):
        update(collected, discriminated_cls._validate(arg, validation_metadata=discriminated_vm))
    return collected
@classmethod
@property
def _additional_properties(cls):
    """
    The schema used for dict keys that are not named properties of cls
    This default accepts any type
    """
    return AnyTypeSchema
@classmethod
@property
@functools.cache
def _property_names(cls):
    """
    The sorted tuple of names in cls.__dict__ that define properties
    A name is a property when its value is a classmethod (referenced models)
    or a plain class that subclasses Schema, _additional_properties is never included
    """
    def defines_property(attr_name, attr_value):
        # referenced models are classmethods
        if type(attr_value) is classmethod:
            return True
        if type(attr_value) is not type:
            return False
        return issubclass(attr_value, Schema) and attr_name != '_additional_properties'

    return tuple(sorted(
        attr_name
        for attr_name, attr_value in cls.__dict__.items()
        if defines_property(attr_name, attr_value)
    ))
@classmethod
def _get_properties(
    cls,
    arg: typing.Dict[str, typing.Any],
    path_to_item: typing.Tuple[typing.Union[str, int], ...],
    path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Type['Schema']]
):
    """
    DictBase _get_properties, builds the dict of property name to property instance
    The values in arg already passed validation

    The class of each value is the one stored in path_to_schemas for the path of the value,
    falling back to the attribute of that name on cls, and then to cls._additional_properties
    Values that already are instances of that class are used unchanged
    """
    converted = {}
    for key, raw_value in arg.items():
        item_path = path_to_item + (key,)
        declared_cls = getattr(cls, key, cls._additional_properties)
        # a class manufactured for this exact path wins over the declared one
        value_cls = path_to_schemas.get(item_path) or declared_cls
        if isinstance(raw_value, value_cls):
            converted[key] = raw_value
        else:
            converted[key] = value_cls._get_new_instance_without_conversion(
                raw_value,
                item_path,
                path_to_schemas
            )
    return converted
def __setattr__(self, name, value):
    """
    Blocks attribute assignment on immutable instances

    FileIO based instances are the only mutable ones, for them the assignment is
    forwarded to the next __setattr__ in the MRO. Previously the assignment was silently
    discarded, which also dropped the attributes that io.FileIO.__init__ assigns (the file name)

    Raises:
        AttributeError: when self is not a FileIO instance
    """
    if not isinstance(self, FileIO):
        raise AttributeError('property setting not supported on immutable instances')
    super().__setattr__(name, value)
def __getattr__(self, name):
    """
    Invoked only when normal attribute lookup fails
    For frozendict based instances the dict keys are exposed as attributes

    Raises:
        AttributeError: when name is not a key of the dict, or when self is not a frozendict
            and no class later in the MRO provides a __getattr__
    """
    if isinstance(self, frozendict):
        # if an attribute does not exist
        try:
            return self[name]
        except KeyError as ex:
            # AttributeError keeps hasattr/getattr with a default working for missing keys
            raise AttributeError(str(ex)) from ex
    # super() is already bound to self, passing self again shifted the arguments by one
    return super().__getattr__(name)
def __getattribute__(self, name):
    """
    Item lookup is attempted before attribute lookup, so a key stored in the instance
    is returned even when an attribute of that name exists
    (for example as a class property but not as an instance method)
    """
    try:
        stored_value = self[name]
    except (KeyError, TypeError):
        # name is not a stored key, or self does not support lookup by str key
        return super().__getattribute__(name)
    return stored_value
# The primitive classes that can appear among the classes collected for a path.
# Schema.__get_new_cls intersects each path's classes with this set and appends
# the matches after the schema classes when it builds the bases of a dynamic class.
inheritable_primitive_types_set = {decimal.Decimal, str, tuple, frozendict, FileIO, bytes, BoolClass, NoneClass}
class Schema:
    """
    The base class of all swagger/openapi schemas/models
    It ensures that the payload:
    - passes the required validations
    - is of an allowed type
    - is an allowed enum value
    """

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]]:
        """
        Schema _validate
        The end of the _validate chain: records cls and the type of arg as the classes
        that apply at validation_metadata.path_to_item
        The collected classes later become the bases of a dynamic class, which makes it so:
        - the returned instance is always a subclass of our defining schema
            - this allows us to check type based on whether an instance is a subclass of a schema
        - the returned instance is a serializable type (except for None, True, and False) which are enums

        Use cases:
        1. inheritable type: string/decimal.Decimal/frozendict/tuple
        2. singletons: bool/None -> uses the base classes BoolClass/NoneClass

        Returns:
            path_to_schemas: a map of path to schemas
        """
        return {validation_metadata.path_to_item: {cls, type(arg)}}

    @staticmethod
    def __process_schema_classes(
        schema_classes: typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]
    ):
        """
        Processes and mutates schema_classes
        When one of the classes in schema_type_classes (for example DictSchema) is present together
        with a subclass of it, the type class is removed because the subclass already includes it
        """
        if len(schema_classes) < 2:
            return
        type_schemas = schema_type_classes & schema_classes
        if not type_schemas:
            return
        type_schema = type_schemas.pop()
        is_subclassed = any(
            issubclass(other_cls, type_schema)
            for other_cls in schema_classes
            if other_cls is not type_schema
        )
        if is_subclassed:
            # needed to not have a mro error in get_new_class
            schema_classes.remove(type_schema)

    @classmethod
    def __get_new_cls(
        cls,
        arg,
        validation_metadata: ValidationMetadata
    ) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], 'Schema']:
        """
        Validates arg and manufactures one dynamic class per validated path
        An instance of cls is never made directly, instead a new class is made
        whose bases are the classes that validated the payload, including cls

        Dict property + List Item Assignment Use cases:
        1. value is NOT an instance of the required schema class
            the value is validated by _validate
            _validate returns a key value pair
            where the key is the path to the item, and the value will be the required manufactured class
            made out of the matching schemas
        2. value is an instance of the the correct schema type
            the value is NOT validated by _validate, _validate only checks that the instance is of the correct schema type
            for this value, _validate does NOT return an entry for it in _path_to_schemas
            and in list/dict _get_items,_get_properties the value will be directly assigned
            because value is of the correct type, and validation was run earlier when the instance was created

        Returns:
            a map of path to the class manufactured for that path
        """
        validated = {}
        if validation_metadata.validated_path_to_schemas:
            update(validated, validation_metadata.validated_path_to_schemas)
        if not validation_metadata.validation_ran_earlier(cls):
            update(validated, cls._validate(arg, validation_metadata=validation_metadata))
        # loop through it make a new class for each entry
        # do not modify the returned result because it is cached and we would be modifying the cached value
        manufactured = {}
        for path, schema_classes in validated.items():
            # Use cases
            # 1. N number of schema classes + enum + type != bool/None,
            #    classes in path_to_schemas: tuple/frozendict/str/Decimal/bytes/FileIo
            #    needs Singleton added
            # 2. N number of schema classes + enum + type == bool/None,
            #    classes in path_to_schemas: BoolClass/NoneClass
            #    Singleton already added
            # 3. N number of schema classes,
            #    classes in path_to_schemas: BoolClass/NoneClass/tuple/frozendict/str/Decimal/bytes/FileIo
            cls.__process_schema_classes(schema_classes)
            enum_schema = any(
                hasattr(this_cls, '_enum_value_to_name') for this_cls in schema_classes
            )
            primitive_classes = schema_classes.intersection(inheritable_primitive_types_set)
            suffix = tuple(primitive_classes)
            if enum_schema and suffix[0] not in {NoneClass, BoolClass}:
                suffix = (Singleton,) + suffix
            # schema classes are sorted by name and the primitive classes always come last
            ordered_schemas = sorted(
                schema_classes - primitive_classes,
                key=lambda a_cls: a_cls.__name__
            )
            manufactured[path] = get_new_class(
                class_name='DynamicSchema',
                bases=tuple(ordered_schemas) + suffix
            )
        return manufactured

    @classmethod
    def _get_new_instance_without_conversion(
        cls: 'Schema',
        arg: typing.Any,
        path_to_item: typing.Tuple[typing.Union[str, int], ...],
        path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Type['Schema']]
    ):
        """
        Makes an instance of cls, which is a dynamic class, out of the already validated arg
        frozendict and tuple based classes first build their properties/items, for
        all other classes arg is passed as is:
        str = openapi str, date, and datetime
        decimal.Decimal = openapi int and float
        FileIO = openapi binary type and the user inputs a file
        bytes = openapi binary type and the user inputs bytes
        """
        if issubclass(cls, frozendict):
            payload = cls._get_properties(arg, path_to_item, path_to_schemas)
        elif issubclass(cls, tuple):
            payload = cls._get_items(arg, path_to_item, path_to_schemas)
        else:
            payload = arg
        # Schema.__new__ is skipped, the __new__ after Schema in the MRO builds the value
        return super(Schema, cls).__new__(cls, payload)

    @classmethod
    def _from_openapi_data(
        cls,
        arg: typing.Union[
            str,
            date,
            datetime,
            int,
            float,
            decimal.Decimal,
            bool,
            None,
            'Schema',
            dict,
            frozendict,
            tuple,
            list,
            io.FileIO,
            io.BufferedReader,
            bytes
        ],
        _configuration: typing.Optional[Configuration]
    ):
        """
        Schema _from_openapi_data
        Builds an instance out of arg, which is handled as data that came from the server
        """
        from_server = True
        validated_path_to_schemas = {}
        cast_arg = cast_to_allowed_types(arg, from_server, validated_path_to_schemas)
        validation_metadata = ValidationMetadata(
            from_server=from_server,
            configuration=_configuration,
            validated_path_to_schemas=validated_path_to_schemas
        )
        path_to_schemas = cls.__get_new_cls(cast_arg, validation_metadata)
        root_path = validation_metadata.path_to_item
        return path_to_schemas[root_path]._get_new_instance_without_conversion(
            cast_arg,
            root_path,
            path_to_schemas
        )

    @staticmethod
    def __get_input_dict(*args, **kwargs) -> frozendict:
        """
        Merges the first positional arg, when it is a dict or frozendict, with kwargs
        Values in kwargs win over values in the positional dict
        """
        first_arg = args[0] if args else None
        merged = dict(first_arg) if isinstance(first_arg, (dict, frozendict)) else {}
        merged.update(kwargs)
        return frozendict(merged)

    @staticmethod
    def __remove_unsets(kwargs):
        """
        Returns a copy of kwargs without the entries whose value is unset
        """
        kept = {}
        for key, val in kwargs.items():
            if val is not unset:
                kept[key] = val
        return kept

    def __new__(
        cls,
        *args: typing.Union[dict, frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema'],
        _configuration: typing.Optional[Configuration] = None,
        **kwargs: typing.Union[dict, frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema', Unset]
    ):
        """
        Schema __new__
        Builds an instance out of data given by the user (not data from the server)

        Args:
            args (int/float/decimal.Decimal/str/list/tuple/dict/frozendict/bool/None): the value
            kwargs (str, int/float/decimal.Decimal/str/list/tuple/dict/frozendict/bool/None): dict values
            _configuration: contains the Configuration that enables json schema validation keywords
                like minItems, minLength etc
        Raises:
            TypeError: when neither args nor kwargs (other than unset values) are given
        """
        kwargs = cls.__remove_unsets(kwargs)
        if not args and not kwargs:
            raise TypeError(
                'No input given. args or kwargs must be given.'
            )
        # a single non dict positional value is the payload, everything else becomes a dict
        value_passed_directly = bool(args) and not kwargs and not isinstance(args[0], dict)
        if value_passed_directly:
            arg = args[0]
        else:
            arg = cls.__get_input_dict(*args, **kwargs)
        from_server = False
        validated_path_to_schemas = {}
        arg = cast_to_allowed_types(arg, from_server, validated_path_to_schemas)
        validation_metadata = ValidationMetadata(
            configuration=_configuration,
            from_server=from_server,
            validated_path_to_schemas=validated_path_to_schemas
        )
        path_to_schemas = cls.__get_new_cls(arg, validation_metadata)
        root_path = validation_metadata.path_to_item
        return path_to_schemas[root_path]._get_new_instance_without_conversion(
            arg,
            root_path,
            path_to_schemas
        )

    def __init__(
        self,
        *args: typing.Union[
            dict, frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema'],
        _configuration: typing.Optional[Configuration] = None,
        **kwargs: typing.Union[
            dict, frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, 'Schema', Unset
        ]
    ):
        """
        this is needed to fix 'Unexpected argument' warning in pycharm
        this code does nothing because all Schema instances are immutable
        all input data is consumed in __new__, and after the new instance is made
        no new attributes are assigned, so init has nothing to do
        """
def cast_to_allowed_types(
    arg: typing.Union[str, date, datetime, uuid.UUID, decimal.Decimal, int, float, None, dict, frozendict, list, tuple, bytes, Schema],
    from_server: bool,
    validated_path_to_schemas: typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]],
    path_to_item: typing.Tuple[typing.Union[str, int], ...] = tuple(['args[0]']),
) -> typing.Union[frozendict, tuple, decimal.Decimal, str, bytes, BoolClass, NoneClass, FileIO]:
    """
    Casts the input payload arg into the allowed types, recursing into dicts, lists and tuples
    The input validated_path_to_schemas is mutated by running this function

    When from_server is False then
    - date/datetime is cast to str
    - uuid.UUID is cast to str
    int/float is always cast to Decimal

    If a Schema instance is passed in it is converted back to a primitive instance because
    One may need to validate that data to the original Schema class AND additional different classes
    those additional classes will need to be added to the new manufactured class for that payload
    If the code didn't do this and kept the payload as a Schema instance it would fail to validate to other
    Schema classes and the code wouldn't be able to mfg a new class that includes all valid schemas
    TODO: store the validated schema classes in validation_metadata

    Args:
        arg: the payload
        from_server: whether this payload came from the server or not
        validated_path_to_schemas: a dict that stores the validated classes at any path location in the payload
        path_to_item: the path of arg in the whole payload
    Raises:
        ValueError: when arg is of a type that is not handled
    """
    if isinstance(arg, Schema):
        # store the already run validations, the bases of the instance's class except Singleton
        validated_path_to_schemas[path_to_item] = {
            base_cls for base_cls in arg.__class__.__bases__ if base_cls is not Singleton
        }

    if isinstance(arg, str):
        return str(arg)
    if isinstance(arg, (dict, frozendict)):
        return frozendict({
            key: cast_to_allowed_types(val, from_server, validated_path_to_schemas, path_to_item + (key,))
            for key, val in arg.items()
        })
    if isinstance(arg, (bool, BoolClass)):
        # this check must come before isinstance(arg, (int, float))
        # because isinstance(True, int) is True
        return BoolClass.TRUE if arg else BoolClass.FALSE
    if isinstance(arg, int):
        return decimal.Decimal(arg)
    if isinstance(arg, float):
        decimal_from_float = decimal.Decimal(arg)
        _, denominator = decimal_from_float.as_integer_ratio()
        if denominator == 1:
            # 9.0 -> Decimal('9.0')
            # 3.4028234663852886e+38 -> Decimal('340282346638528859811704183484516925440.0')
            return decimal.Decimal(str(decimal_from_float)+'.0')
        return decimal_from_float
    if isinstance(arg, (tuple, list)):
        return tuple(
            cast_to_allowed_types(item, from_server, validated_path_to_schemas, path_to_item + (i,))
            for i, item in enumerate(arg)
        )
    if isinstance(arg, (none_type, NoneClass)):
        return NoneClass.NONE
    if isinstance(arg, (date, datetime)):
        if not from_server:
            return arg.isoformat()
        # ApiTypeError will be thrown later by _validate_type
        return arg
    if isinstance(arg, uuid.UUID):
        if not from_server:
            return str(arg)
        # ApiTypeError will be thrown later by _validate_type
        return arg
    if isinstance(arg, decimal.Decimal):
        return decimal.Decimal(arg)
    if isinstance(arg, bytes):
        return bytes(arg)
    if isinstance(arg, (io.FileIO, io.BufferedReader)):
        return FileIO(arg)
    raise ValueError('Invalid type passed in got input={} type={}'.format(arg, type(arg)))
class ComposedBase(Discriminable):
    """
    Validation of the composition keywords stored in cls._composed_schemas:
    allOf, oneOf, anyOf and not
    """

    @classmethod
    def __get_allof_classes(cls, arg, validation_metadata: ValidationMetadata):
        """
        Validates arg against every allOf schema that was not validated earlier
        and returns the merged path_to_schemas, any validation exception propagates
        """
        collected = defaultdict(set)
        for allof_cls in cls._composed_schemas['allOf']:
            if not validation_metadata.validation_ran_earlier(allof_cls):
                update(collected, allof_cls._validate(arg, validation_metadata=validation_metadata))
        return collected

    @classmethod
    def __get_oneof_class(
        cls,
        arg,
        discriminated_cls,
        validation_metadata: ValidationMetadata,
    ):
        """
        Validates arg against the oneOf schemas, exactly one of them must match

        Returns:
            the path_to_schemas returned by the last oneOf schema that validated arg
        Raises:
            ApiValueError: when no oneOf schema or more than one oneOf schema matched
            ApiValueError/ApiTypeError: the validation error of discriminated_cls when
                it is the oneOf schema that failed
        """
        matched_classes = []
        path_to_schemas = defaultdict(set)
        for oneof_cls in cls._composed_schemas['oneOf']:
            # a class that is already known to apply counts as a match without validating again
            already_matched = (
                oneof_cls in path_to_schemas[validation_metadata.path_to_item]
                or validation_metadata.validation_ran_earlier(oneof_cls)
            )
            if already_matched:
                matched_classes.append(oneof_cls)
                continue
            try:
                path_to_schemas = oneof_cls._validate(arg, validation_metadata=validation_metadata)
            except (ApiValueError, ApiTypeError) as ex:
                if discriminated_cls is not None and oneof_cls is discriminated_cls:
                    raise ex
            else:
                matched_classes.append(oneof_cls)
        match_count = len(matched_classes)
        if match_count == 0:
            raise ApiValueError(
                "Invalid inputs given to generate an instance of {}. None "
                "of the oneOf schemas matched the input data.".format(cls)
            )
        if match_count > 1:
            raise ApiValueError(
                "Invalid inputs given to generate an instance of {}. Multiple "
                "oneOf schemas {} matched the inputs, but a max of one is allowed.".format(cls, matched_classes)
            )
        return path_to_schemas

    @classmethod
    def __get_anyof_classes(
        cls,
        arg,
        discriminated_cls,
        validation_metadata: ValidationMetadata
    ):
        """
        Validates arg against the anyOf schemas, at least one of them must match

        Returns:
            the merged path_to_schemas of every anyOf schema that validated arg
        Raises:
            ApiValueError: when no anyOf schema matched
            ApiValueError/ApiTypeError: the validation error of discriminated_cls when
                it is the anyOf schema that failed
        """
        matched_classes = []
        collected = defaultdict(set)
        for anyof_cls in cls._composed_schemas['anyOf']:
            if validation_metadata.validation_ran_earlier(anyof_cls):
                matched_classes.append(anyof_cls)
                continue
            try:
                anyof_path_to_schemas = anyof_cls._validate(arg, validation_metadata=validation_metadata)
            except (ApiValueError, ApiTypeError) as ex:
                if discriminated_cls is not None and anyof_cls is discriminated_cls:
                    raise ex
            else:
                matched_classes.append(anyof_cls)
                update(collected, anyof_path_to_schemas)
        if not matched_classes:
            raise ApiValueError(
                "Invalid inputs given to generate an instance of {}. None "
                "of the anyOf schemas matched the input data.".format(cls)
            )
        return collected

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ) -> typing.Dict[typing.Tuple[typing.Union[str, int], ...], typing.Set[typing.Union['Schema', str, decimal.Decimal, BoolClass, NoneClass, frozendict, tuple]]]:
        """
        ComposedBase _validate
        Runs the inherited validation and then, in this order, the allOf, oneOf, anyOf and not
        validations, with cls added to the seen classes of the validation metadata
        The collected classes later become the bases of a dynamic class, which makes it so:
        - the returned instance is always a subclass of our defining schema
            - this allows us to check type based on whether an instance is a subclass of a schema
        - the returned instance is a serializable type (except for None, True, and False) which are enums

        Returns:
            path_to_schemas: a map of path to the schema classes collected for that path
        Raises:
            ApiValueError: when the discriminator value is invalid, when the oneOf/anyOf
                requirements are not met or when arg is valid for the not schema
            ApiTypeError: when the input type is not in the list of allowed spec types
        """
        # validation checking on types, validations, and enums
        path_to_schemas = super()._validate(arg, validation_metadata=validation_metadata)

        updated_vm = ValidationMetadata(
            configuration=validation_metadata.configuration,
            from_server=validation_metadata.from_server,
            path_to_item=validation_metadata.path_to_item,
            seen_classes=validation_metadata.seen_classes | frozenset({cls}),
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )

        # process composed schema
        discriminator = getattr(cls, '_discriminator', None)
        discriminated_cls = None
        if discriminator and arg and isinstance(arg, frozendict):
            disc_property_name = list(discriminator.keys())[0]
            cls._ensure_discriminator_value_present(disc_property_name, updated_vm, arg)
            # get discriminated_cls by looking at the dict in the current class
            disc_value = arg[disc_property_name]
            discriminated_cls = cls._get_discriminated_class(
                disc_property_name=disc_property_name,
                disc_payload_value=disc_value
            )
            if discriminated_cls is None:
                raise ApiValueError(
                    "Invalid discriminator value '{}' was passed in to {}.{} Only the values {} are allowed at {}".format(
                        arg[disc_property_name],
                        cls.__name__,
                        disc_property_name,
                        list(discriminator[disc_property_name].keys()),
                        updated_vm.path_to_item + (disc_property_name,)
                    )
                )

        if cls._composed_schemas['allOf']:
            update(path_to_schemas, cls.__get_allof_classes(arg, validation_metadata=updated_vm))
        if cls._composed_schemas['oneOf']:
            oneof_path_to_schemas = cls.__get_oneof_class(
                arg,
                discriminated_cls=discriminated_cls,
                validation_metadata=updated_vm
            )
            update(path_to_schemas, oneof_path_to_schemas)
        if cls._composed_schemas['anyOf']:
            anyof_path_to_schemas = cls.__get_anyof_classes(
                arg,
                discriminated_cls=discriminated_cls,
                validation_metadata=updated_vm
            )
            update(path_to_schemas, anyof_path_to_schemas)

        not_cls = cls._composed_schemas['not']
        if not_cls:
            not_exception = ApiValueError(
                "Invalid value '{}' was passed in to {}. Value is invalid because it is disallowed by {}".format(
                    arg,
                    cls.__name__,
                    not_cls.__name__,
                )
            )
            if updated_vm.validation_ran_earlier(not_cls):
                raise not_exception
            # arg is only acceptable when it fails to validate against not_cls
            not_path_to_schemas = None
            try:
                not_path_to_schemas = not_cls._validate(arg, validation_metadata=updated_vm)
            except (ApiValueError, ApiTypeError):
                pass
            if not_path_to_schemas:
                raise not_exception

        if discriminated_cls is not None and not updated_vm.validation_ran_earlier(discriminated_cls):
            # TODO use an exception from this package here
            assert discriminated_cls in path_to_schemas[updated_vm.path_to_item]
        return path_to_schemas
# DictBase, ListBase, NumberBase, StrBase, BoolBase, NoneBase
class ComposedSchema(
    _SchemaTypeChecker(typing.Union[NoneClass, str, decimal.Decimal, BoolClass, tuple, frozendict]),
    ComposedBase,
    DictBase,
    ListBase,
    NumberBase,
    StrBase,
    BoolBase,
    NoneBase,
    Schema
):
    """
    Schema for composed (allOf/oneOf/anyOf/not) definitions, its payload can be
    None, str, decimal.Decimal, BoolClass, tuple or frozendict
    """

    # subclass properties
    _composed_schemas = {}

    @classmethod
    def _from_openapi_data(cls, *args: typing.Any, _configuration: typing.Optional[Configuration] = None, **kwargs):
        """
        Builds an instance from the first positional arg, or from kwargs as a dict
        when no positional arg is given

        Raises:
            ApiTypeError: when neither args nor kwargs are given
        """
        if args:
            payload = args[0]
        elif kwargs:
            payload = kwargs
        else:
            raise ApiTypeError('{} is missing required input data in args or kwargs'.format(cls.__name__))
        return super()._from_openapi_data(payload, _configuration=_configuration)
class ListSchema(
    _SchemaTypeChecker(typing.Union[tuple]),
    ListBase,
    Schema
):
    """
    Schema for list payloads, its type checker is given tuple as the only type
    """

    @classmethod
    def _from_openapi_data(cls, arg: typing.List[typing.Any], _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from a list payload, delegates to the parent implementation"""
        return super()._from_openapi_data(arg, _configuration=_configuration)

    def __new__(cls, arg: typing.Union[list, tuple], **kwargs: ValidationMetadata):
        """Builds an instance from a list or tuple, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class NoneSchema(
    _SchemaTypeChecker(typing.Union[NoneClass]),
    NoneBase,
    Schema
):
    """
    Schema for None payloads, its type checker is given NoneClass as the only type
    """

    @classmethod
    def _from_openapi_data(cls, arg: None, _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from a None payload, delegates to the parent implementation"""
        return super()._from_openapi_data(arg, _configuration=_configuration)

    def __new__(cls, arg: None, **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from None, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class NumberSchema(
    _SchemaTypeChecker(typing.Union[decimal.Decimal]),
    NumberBase,
    Schema
):
    """
    The schema for type: number when no format is given
    Integers as well as floats are accepted
    """

    @classmethod
    def _from_openapi_data(cls, arg: typing.Union[int, float, decimal.Decimal], _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from a numeric payload, delegates to the parent implementation"""
        return super()._from_openapi_data(arg, _configuration=_configuration)

    def __new__(cls, arg: typing.Union[decimal.Decimal, int, float], **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from a number, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class IntBase(NumberBase):
    """
    Adds the integer format check to number validation
    """

    @property
    def as_int(self) -> int:
        """The value as an int, computed on first access and stored in _as_int"""
        try:
            stored = self._as_int
        except AttributeError:
            self._as_int = int(self)
            stored = self._as_int
        return stored

    @classmethod
    def _validate_format(cls, arg: typing.Optional[decimal.Decimal], validation_metadata: ValidationMetadata):
        """
        Checks that a Decimal arg is a whole number, other types are not checked

        Raises:
            ApiValueError: when the Decimal has a fractional part
        """
        if not isinstance(arg, decimal.Decimal):
            return
        # a whole number is the only value whose reduced ratio has the denominator 1
        if arg.as_integer_ratio()[-1] != 1:
            raise ApiValueError(
                "Invalid value '{}' for type integer at {}".format(arg, validation_metadata.path_to_item)
            )

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ):
        """
        IntBase _validate
        Runs the integer format check before the inherited validation
        TODO what about types = (int, number) -> IntBase, NumberBase? We could drop int and keep number only
        """
        cls._validate_format(arg, validation_metadata=validation_metadata)
        return super()._validate(arg, validation_metadata=validation_metadata)
class IntSchema(IntBase, NumberSchema):
    """
    NumberSchema combined with the integer format check of IntBase
    """

    @classmethod
    def _from_openapi_data(cls, arg: int, _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from an int payload, delegates to the parent implementation"""
        return super()._from_openapi_data(arg, _configuration=_configuration)

    def __new__(cls, arg: typing.Union[decimal.Decimal, int], **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from an integer, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class Int32Base(
    _SchemaValidator(
        inclusive_minimum=decimal.Decimal(-2**31),
        inclusive_maximum=decimal.Decimal(2**31 - 1)
    ),
):
    """
    Validator base whose inclusive bounds are the signed 32 bit integer range
    -2147483648 to 2147483647
    """
class Int32Schema(
    Int32Base,
    IntSchema
):
    """
    IntSchema combined with the bounds of Int32Base
    """
class Int64Base(
    _SchemaValidator(
        inclusive_minimum=decimal.Decimal(-2**63),
        inclusive_maximum=decimal.Decimal(2**63 - 1)
    ),
):
    """
    Validator base whose inclusive bounds are the signed 64 bit integer range
    -9223372036854775808 to 9223372036854775807
    """
class Int64Schema(
    Int64Base,
    IntSchema
):
    """
    IntSchema combined with the bounds of Int64Base
    """
class Float32Base(
    _SchemaValidator(
        inclusive_minimum=decimal.Decimal(-3.4028234663852886e+38),
        inclusive_maximum=decimal.Decimal(3.4028234663852886e+38)
    ),
):
    """
    Validator base whose inclusive bounds are -3.4028234663852886e+38 and 3.4028234663852886e+38
    """
class Float32Schema(
    Float32Base,
    NumberSchema
):
    """
    NumberSchema combined with the bounds of Float32Base
    """

    @classmethod
    def _from_openapi_data(cls, arg: typing.Union[float, decimal.Decimal], _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from a float payload, delegates to the parent implementation"""
        # todo check format
        return super()._from_openapi_data(arg, _configuration=_configuration)
class Float64Base(
    _SchemaValidator(
        inclusive_minimum=decimal.Decimal(-1.7976931348623157E+308),
        inclusive_maximum=decimal.Decimal(1.7976931348623157E+308)
    ),
):
    """
    Validator base whose inclusive bounds are -1.7976931348623157E+308 and 1.7976931348623157E+308
    """
class Float64Schema(
    Float64Base,
    NumberSchema
):
    """
    NumberSchema combined with the bounds of Float64Base
    """

    @classmethod
    def _from_openapi_data(cls, arg: typing.Union[float, decimal.Decimal], _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from a float payload, delegates to the parent implementation"""
        # todo check format
        return super()._from_openapi_data(arg, _configuration=_configuration)
class StrSchema(
    _SchemaTypeChecker(typing.Union[str]),
    StrBase,
    Schema
):
    """
    The schema for type: string
    The date + datetime string types must inherit from this class
    because a str payload can be validated as both:
    - type: string (format unset)
    - type: string, format: date
    """

    @classmethod
    def _from_openapi_data(cls, arg: typing.Union[str], _configuration: typing.Optional[Configuration] = None) -> 'StrSchema':
        """Builds an instance from a str payload, delegates to the parent implementation"""
        return super()._from_openapi_data(arg, _configuration=_configuration)

    def __new__(cls, arg: typing.Union[str, date, datetime, uuid.UUID], **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from the given value, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class UUIDSchema(UUIDBase, StrSchema):
    """
    StrSchema combined with UUIDBase
    """

    def __new__(cls, arg: typing.Union[str, uuid.UUID], **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from a str or uuid.UUID, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class DateSchema(DateBase, StrSchema):
    """
    StrSchema combined with DateBase
    """

    def __new__(cls, arg: typing.Union[str, date], **kwargs: typing.Union[ValidationMetadata]):
        """
        Builds an instance from a str or a date, kwargs are forwarded to the parent __new__

        arg is annotated with date rather than datetime: date is the type this schema
        is meant for, and because datetime subclasses date the annotation still
        covers every value the previous one allowed
        """
        return super().__new__(cls, arg, **kwargs)
class DateTimeSchema(DateTimeBase, StrSchema):
    """
    StrSchema combined with DateTimeBase
    """

    def __new__(cls, arg: typing.Union[str, datetime], **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from a str or datetime, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class DecimalSchema(DecimalBase, StrSchema):
    """
    StrSchema combined with DecimalBase
    """

    def __new__(cls, arg: typing.Union[str], **kwargs: typing.Union[ValidationMetadata]):
        """
        Builds an instance from a str, kwargs are forwarded to the parent __new__

        Note: Decimals may not be passed in because cast_to_allowed_types is only invoked once for payloads
        which can be simple (str) or complex (dicts or lists with nested values)
        The cast runs once, recursively, over all values before validation. If DecimalSchema accepted
        a client side Decimal, the cast could not know whether that Decimal is meant for a StrSchema
        (where it should be cast to str) or for a NumberSchema (where it should stay as Decimal).
        """
        return super().__new__(cls, arg, **kwargs)
class BytesSchema(
    _SchemaTypeChecker(typing.Union[bytes]),
    Schema,
):
    """
    this class will subclass bytes and is immutable
    """

    def __new__(cls, arg: typing.Union[bytes], **kwargs: typing.Union[ValidationMetadata]):
        """
        Builds the instance with the __new__ that follows Schema in the MRO,
        Schema.__new__ is skipped and kwargs are not used
        """
        return super(Schema, cls).__new__(cls, arg)
class FileSchema(
    _SchemaTypeChecker(typing.Union[FileIO]),
    Schema,
):
    """
    This class is NOT immutable
    Dynamic classes are built using it for example when AnyType allows in binary data
    All other schema classes ARE immutable

    This class was kept as mutable:
    - to allow file reading and writing to disk
    - to be able to preserve file name info
    The developer is responsible for closing this file and deleting it

    If one wanted to make this immutable one could make this a DictSchema with required properties:
    - data = BytesSchema (which would be an immutable bytes based schema)
    - file_name = StrSchema
    and cast_to_allowed_types would convert bytes and file instances into dicts containing data + file_name
    The downside would be that data would be stored in memory which one may not want to do for very large files
    """

    def __new__(cls, arg: typing.Union[io.FileIO, io.BufferedReader], **kwargs: typing.Union[ValidationMetadata]):
        """
        Builds the instance with the __new__ that follows Schema in the MRO,
        Schema.__new__ is skipped and kwargs are not used
        """
        return super(Schema, cls).__new__(cls, arg)
class BinaryBase:
    """
    Marker base class of BinarySchema, it defines no behavior
    """
class BinarySchema(
    _SchemaTypeChecker(typing.Union[bytes, FileIO]),
    ComposedBase,
    BinaryBase,
    Schema,
):
    """
    Composed schema for binary payloads, its oneOf schemas are BytesSchema and FileSchema
    """

    @classmethod
    @property
    def _composed_schemas(cls):
        """The composed schema definition, built on every access"""
        # we need this here to make our import statements work
        # we must store _composed_schemas in here so the code is only run
        # when we invoke this method. If we kept this at the class
        # level we would get an error because the class level
        # code would be run when this module is imported, and these composed
        # classes don't exist yet because their module has not finished
        # loading
        one_of_schemas = [
            BytesSchema,
            FileSchema,
        ]
        return {
            'allOf': [],
            'oneOf': one_of_schemas,
            'anyOf': [],
            'not': None
        }

    def __new__(cls, arg: typing.Union[io.FileIO, io.BufferedReader, bytes], **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from a file or bytes, kwargs are not forwarded to the parent __new__"""
        return super().__new__(cls, arg)
class BoolSchema(
    _SchemaTypeChecker(typing.Union[BoolClass]),
    BoolBase,
    Schema
):
    """
    Schema for bool payloads, its type checker is given BoolClass as the only type
    """

    @classmethod
    def _from_openapi_data(cls, arg: bool, _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from a bool payload, delegates to the parent implementation"""
        return super()._from_openapi_data(arg, _configuration=_configuration)

    def __new__(cls, arg: bool, **kwargs: typing.Union[ValidationMetadata]):
        """Builds an instance from a bool, kwargs are forwarded to the parent __new__"""
        return super().__new__(cls, arg, **kwargs)
class AnyTypeSchema(
    _SchemaTypeChecker(
        typing.Union[frozendict, tuple, decimal.Decimal, str, BoolClass, NoneClass, bytes, FileIO]
    ),
    DictBase,
    ListBase,
    NumberBase,
    StrBase,
    BoolBase,
    NoneBase,
    Schema
):
    """
    Schema without a type restriction, its type checker is given every primitive type:
    frozendict, tuple, decimal.Decimal, str, BoolClass, NoneClass, bytes and FileIO
    """
class DictSchema(
    _SchemaTypeChecker(typing.Union[frozendict]),
    DictBase,
    Schema
):
    """
    Schema for dict payloads, its type checker is given frozendict as the only type
    """

    @classmethod
    def _from_openapi_data(cls, arg: typing.Dict[str, typing.Any], _configuration: typing.Optional[Configuration] = None):
        """Builds an instance from a dict payload, delegates to the parent implementation"""
        return super()._from_openapi_data(arg, _configuration=_configuration)

    def __new__(cls, *args: typing.Union[dict, frozendict], **kwargs: typing.Union[dict, frozendict, list, tuple, decimal.Decimal, float, int, str, date, datetime, bool, None, bytes, Schema, Unset, ValidationMetadata]):
        """
        Builds an instance from a positional dict and/or keyword arguments,
        both are forwarded to the parent __new__
        """
        return super().__new__(cls, *args, **kwargs)
# The schema classes that Schema.__process_schema_classes drops from a set of
# classes when a subclass of them is also in that set
schema_type_classes = {NoneSchema, DictSchema, ListSchema, NumberSchema, StrSchema, BoolSchema}
@functools.cache
def get_new_class(
    class_name: str,
    bases: typing.Tuple[typing.Type[typing.Union[Schema, typing.Any]], ...]
) -> typing.Type[Schema]:
    """
    Makes a new class named class_name that subclasses bases
    The result is cached, so the same name and bases always give back the same class
    """
    empty_namespace = {}
    return type(class_name, bases, empty_namespace)
# When True, log_cache_usage prints the cache_info() of the function passed to it
LOG_CACHE_USAGE = False
def log_cache_usage(cache_fn):
    """
    Prints the name and the cache statistics of cache_fn
    Nothing is printed unless LOG_CACHE_USAGE is enabled

    Args:
        cache_fn: a cached function that exposes cache_info()
    """
    if not LOG_CACHE_USAGE:
        return
    print(cache_fn.__name__, cache_fn.cache_info())