# -*- coding: utf-8 -*-
"""Parameter validation helpers and the ``coerce_or_error`` decorator."""

import functools
import re
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from uuid import UUID

from voluptuous import Schema, Url, MultipleInvalid

from primordial.constants import CANONICAL_TIMESTRING_FORMAT


ValidatorType = Optional[Union[Iterable[Callable], Callable]]
DateTimeType = Union[datetime, str]


class BadParameterException(Exception):
    """To be raised when a validation operation fails."""


def validate(validator: Callable, param_value: Any, coerce_: bool = False) -> Any:
    """Run a validation operation.

    Validate a particular parameter with a particular validator and possibly coerce the value into the
    validator's return type.

    :param validator: The validator to be run
    :param param_value: The value to be validated
    :param coerce_: Whether to return a type coerced value
    :returns: The validator's return value if ``coerce_`` is True, otherwise ``param_value`` unchanged
    :raises BadParameterException: If the validator raised any exception for the value.
    :raises ValueError: If ``validator`` is not callable.
    """
    if not callable(validator):
        # wrap in a 1-tuple so that a tuple-valued "validator" cannot break the % formatting
        raise ValueError("Cannot use a non-callable as a parameter validator: %s" % (validator,))
    try:
        coerced = validator(param_value)
    except Exception as e:
        raise BadParameterException("Parameter %s failed validation (%s)" % (param_value, e)) from e
    return coerced if coerce_ is True else param_value


def validate_param(param_validator: ValidatorType, param_value: Any, coerce_: bool = False) -> Any:
    """Validate a parameter.

    A bare ``None`` validator means "no validation": the value is returned untouched.  An iterable of
    validators is tried in order and the result of the first one that does not raise is returned.

    :param param_validator: The validator (or list of validators) to be run
    :param param_value: The value to be validated
    :param coerce_: Whether to return a type coerced value
    :returns: The validated (and possibly coerced) value
    :raises BadParameterException: If a single (non-iterable) validator rejected the value.
    :raises ValueError: If no validator in an iterable accepted the value, or if a single validator is
        not callable.
    """
    if param_validator is None:
        return param_value

    # Exclusion below is due to Pylint bug https://github.com/PyCQA/pylint/issues/3507
    if not isinstance(param_validator, Iterable):  # pylint: disable=isinstance-second-argument-not-valid-type
        return validate(param_validator, param_value, coerce_=coerce_)

    for validator in param_validator:
        if validator is None:
            # maybe this is a bad semantic choice, but, unlike a bare None as
            # a validator not in a list context, None here doesn't mean skip with
            # no validation, but instead means that the value can be the value None
            # itself.  The reason I think this is OK is that it's nonsense to have
            # a list of validators which includes the global None validator since
            # that would be formally equivalent to just using a bare None -- EA
            if param_value is None:
                return param_value
            # otherwise we keep searching the list
            continue
        try:
            # take first non-excepting value
            return validate(validator, param_value, coerce_=coerce_)
        except Exception:  # pylint: disable=W0703
            # deliberate: a failing validator just means "try the next one"
            pass
    raise ValueError("No validator in list validated %s (%s)" % (param_value, param_validator))


URL_SCHEMA = Schema(Url())  # pylint: disable=E1120


def url(val: str) -> str:
    """Validate that a string looks like a URL.

    url is intended to be used like str or int to be a basic callable that will except on type mismatch or
    non-coercible value.

    :param val: The value to be checked
    :returns: The original, uncoerced value
    :raises ValueError: If the value does not look like a URL.
    """
    # this will raise a voluptuous MultipleInvalid error if it fails to validate
    try:
        URL_SCHEMA(val)
    except MultipleInvalid as e:
        # use a normal valueerror externally
        raise ValueError("Not a url: %s (%s)" % (val, e)) from e
    # return uncoerced value
    return val


def parseable_datetime(val: str) -> bool:
    """Validate that we can parse a datetime from a string.

    Catch exception and return false if strptime doesn't work.

    :param val: The value to be checked
    :returns: True if ``val`` parses with ``CANONICAL_TIMESTRING_FORMAT``, False otherwise
    """
    try:
        datetime.strptime(val, CANONICAL_TIMESTRING_FORMAT)
    except Exception:  # pylint: disable=W0703
        return False
    return True


def is_datetime(val: DateTimeType) -> DateTimeType:
    """Validate that a value represents a datetime.

    :param val: The value to be checked
    :returns: The original value, if it is a datetime or a parseable datetime string
    :raises ValueError: If the value does not represent a datetime.
    """
    if isinstance(val, datetime) or parseable_datetime(val):
        return val
    # 1-tuple keeps the formatting safe when val is itself a tuple
    raise ValueError("Not a datetime: %s" % (val,))


def is_timedelta(val: timedelta) -> timedelta:
    """Validate that a value is a timedelta.

    :param val: The value to be checked
    :returns: The original value
    :raises ValueError: If the value is not a timedelta.
    """
    if isinstance(val, timedelta):
        return val
    # 1-tuple keeps the formatting safe when val is itself a tuple
    raise ValueError("Not a timedelta: %s" % (val,))


def is_non_empty_dict(val: Dict) -> Dict:
    """Validate that a value is a non-empty dictionary.

    :param val: The value to be checked
    :returns: The original value
    :raises ValueError: If the value is not a dictionary, or is empty.
    """
    if isinstance(val, dict) and val:
        return val
    # 1-tuple keeps the formatting safe when val is itself a tuple
    raise ValueError("Not a non-empty dict: %s" % (val,))


POSTGRES_NAME_REGEX = re.compile(r'^[a-z_][a-z0-9_]*$')


def is_postgres_name(val: str) -> str:
    """Validate that argument is a valid Postgres identifier.

    :param val: The value to be checked
    :returns: The original value
    :raises ValueError: If the value is not a valid Postgres identifier.
    """
    # fullmatch rather than match: with match, the trailing `$` would also accept
    # a value that ends in a newline (e.g. "name\n").  Non-strings are rejected
    # with the documented ValueError instead of a TypeError from the regex engine.
    if isinstance(val, str) and POSTGRES_NAME_REGEX.fullmatch(val):
        return val
    raise ValueError("Not a valid Postgres name (%s): %s" % (POSTGRES_NAME_REGEX.pattern, val))


def internal_validate_positionals(positional_args: List[Any],
                                  positional_arg_values: List[Any],
                                  coerce_: bool = False) -> List[Any]:
    """Validate a list of positional arguments.

    If we run out of stated positionals, we simply dump the originals unvalidated (we're saying the
    validators are optional)

    :param positional_args: The validators to be run, in order, against the values
    :param positional_arg_values: The values to be validated
    :param coerce_: Whether to return type coerced values
    :returns: A new list of the validated (and possibly coerced) values, in the original order
    """
    outargs = []
    for index, value in enumerate(positional_arg_values):
        try:
            validator = positional_args[index]
        except IndexError:
            validator = None
        # validate_param passes the value through untouched when validator is None
        outargs.append(validate_param(validator, value, coerce_=coerce_))
    return outargs


def internal_validate_keywords(keyword_args: Dict[str, Any],
                               keyword_arg_values: Dict[str, Any],
                               coerce_: bool = False) -> Dict[str, Any]:
    """Validate a dictionary of keyword arguments.

    If there is no matching validator for a particular keyword, then the original value is passed along
    in the output unvalidated.

    :param keyword_args: The validators to be run against the values
    :param keyword_arg_values: The values to be validated
    :param coerce_: Whether to return type coerced values
    :returns: A new dictionary mapping each keyword to its validated (and possibly coerced) value
    """
    outdict = {}
    for key, value in keyword_arg_values.items():
        try:
            validator = keyword_args[key]
        except KeyError:
            validator = None
        # validate_param passes the value through untouched when validator is None
        outdict[key] = validate_param(validator, value, coerce_=coerce_)
    return outdict


def coerce_or_error(
        positionals: Optional[List[Any]] = None,
        keywords: Optional[Dict[str, Any]] = None,
        coerce_: bool = False) -> Callable:
    """Either coerce the arguments in the suggested way or die with error back to the client.

    The returned decorator validates the wrapped function's arguments before calling it.  A
    ``BadParameterException`` from validation is re-raised as ``ValueError``; a ``ValueError`` raised
    because no validator in a list accepted a value propagates unchanged.

    :param positionals: A list of validators to be run against positional arguments
    :param keywords: A dictionary of validators to be run against keyword arguments
    :param coerce_: Whether to return type coerced values
    :returns: A decorator that validates arguments and then calls the decorated function
    """
    def decorator(function):
        """Inner"""
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            """Actual wrapper"""
            try:
                if positionals is not None:
                    outargs = internal_validate_positionals(positionals, args, coerce_=coerce_)
                else:
                    outargs = args
                if keywords is not None:
                    outkwargs = internal_validate_keywords(keywords, kwargs, coerce_=coerce_)
                else:
                    outkwargs = kwargs
            except BadParameterException as e:
                # NOTE(review): the message interpolates str(e) twice; kept byte-identical
                # so that callers matching on the text are unaffected.
                raise ValueError("Failed to validate: %s, %s" % (str(e), str(e))) from e
            return function(*outargs, **outkwargs)
        return wrapper
    return decorator


def is_uuid(uuid_maybe: str) -> bool:
    """Validate that a value represents a UUID.

    Unlike the other validators in this module, this returns a bool instead of raising, so it never
    rejects a value when passed to ``validate`` (which only treats an exception as failure).

    :param uuid_maybe: The value to be checked
    :returns: True if the value is a ``UUID`` instance or a string ``UUID`` can parse, False otherwise
    """
    if isinstance(uuid_maybe, UUID):
        return True
    try:
        UUID(uuid_maybe)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed string; TypeError / AttributeError: non-string input
        # such as None, bytes or int, which UUID() cannot treat as a hex string
        return False
    return True