PK!Y __init__.pynu[""" Cross-specification, implementation-agnostic JSON referencing. """ from referencing._core import Anchor, Registry, Resource, Specification __all__ = ["Anchor", "Registry", "Resource", "Specification"] PK!$F _attrs.pynu[from __future__ import annotations from typing import NoReturn, TypeVar from attrs import define as _define, frozen as _frozen _T = TypeVar("_T") def define(cls: type[_T]) -> type[_T]: # pragma: no cover cls.__init_subclass__ = _do_not_subclass return _define(cls) def frozen(cls: type[_T]) -> type[_T]: cls.__init_subclass__ = _do_not_subclass return _frozen(cls) class UnsupportedSubclassing(Exception): def __str__(self): return ( "Subclassing is not part of referencing's public API. " "If no other suitable API exists for what you're trying to do, " "feel free to file an issue asking for one." ) @staticmethod def _do_not_subclass() -> NoReturn: # pragma: no cover raise UnsupportedSubclassing() PK!A%[// _attrs.pyinu[from typing import Any, Callable, TypeVar, Union from attr import attrib, field class UnsupportedSubclassing(Exception): ... _T = TypeVar("_T") def __dataclass_transform__( *, frozen_default: bool = False, field_descriptors: tuple[Union[type, Callable[..., Any]], ...] = ..., ) -> Callable[[_T], _T]: ... @__dataclass_transform__(field_descriptors=(attrib, field)) def define(cls: type[_T]) -> type[_T]: ... @__dataclass_transform__( frozen_default=True, field_descriptors=(attrib, field), ) def frozen(cls: type[_T]) -> type[_T]: ... 
PK!#``_core.pynu[from __future__ import annotations from collections.abc import Iterable, Iterator, Sequence from enum import Enum from typing import Any, Callable, ClassVar, Generic, Protocol, TypeVar from urllib.parse import unquote, urldefrag, urljoin from attrs import evolve, field from rpds import HashTrieMap, HashTrieSet, List from referencing import exceptions from referencing._attrs import frozen from referencing.typing import URI, Anchor as AnchorType, D, Mapping, Retrieve EMPTY_UNCRAWLED: HashTrieSet[URI] = HashTrieSet() EMPTY_PREVIOUS_RESOLVERS: List[URI] = List() class _Unset(Enum): """ What sillyness... """ SENTINEL = 1 _UNSET = _Unset.SENTINEL class _MaybeInSubresource(Protocol[D]): def __call__( self, segments: Sequence[int | str], resolver: Resolver[D], subresource: Resource[D], ) -> Resolver[D]: ... def _detect_or_error(contents: D) -> Specification[D]: if not isinstance(contents, Mapping): raise exceptions.CannotDetermineSpecification(contents) jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType] if not isinstance(jsonschema_dialect_id, str): raise exceptions.CannotDetermineSpecification(contents) from referencing.jsonschema import specification_with return specification_with(jsonschema_dialect_id) def _detect_or_default( default: Specification[D], ) -> Callable[[D], Specification[D]]: def _detect(contents: D) -> Specification[D]: if not isinstance(contents, Mapping): return default jsonschema_dialect_id = contents.get("$schema") # type: ignore[reportUnknownMemberType] if jsonschema_dialect_id is None: return default from referencing.jsonschema import specification_with return specification_with( jsonschema_dialect_id, # type: ignore[reportUnknownArgumentType] default=default, ) return _detect class _SpecificationDetector: def __get__( self, instance: Specification[D] | None, cls: type[Specification[D]], ) -> Callable[[D], Specification[D]]: if instance is None: return _detect_or_error else: return 
_detect_or_default(instance) @frozen class Specification(Generic[D]): """ A specification which defines referencing behavior. The various methods of a `Specification` allow for varying referencing behavior across JSON Schema specification versions, etc. """ #: A short human-readable name for the specification, used for debugging. name: str #: Find the ID of a given document. id_of: Callable[[D], URI | None] #: Retrieve the subresources of the given document (without traversing into #: the subresources themselves). subresources_of: Callable[[D], Iterable[D]] #: While resolving a JSON pointer, conditionally enter a subresource #: (if e.g. we have just entered a keyword whose value is a subresource) maybe_in_subresource: _MaybeInSubresource[D] #: Retrieve the anchors contained in the given document. _anchors_in: Callable[ [Specification[D], D], Iterable[AnchorType[D]], ] = field(alias="anchors_in") #: An opaque specification where resources have no subresources #: nor internal identifiers. OPAQUE: ClassVar[Specification[Any]] #: Attempt to discern which specification applies to the given contents. #: #: May be called either as an instance method or as a class method, with #: slightly different behavior in the following case: #: #: Recall that not all contents contains enough internal information about #: which specification it is written for -- the JSON Schema ``{}``, #: for instance, is valid under many different dialects and may be #: interpreted as any one of them. #: #: When this method is used as an instance method (i.e. called on a #: specific specification), that specification is used as the default #: if the given contents are unidentifiable. #: #: On the other hand when called as a class method, an error is raised. #: #: To reiterate, ``DRAFT202012.detect({})`` will return ``DRAFT202012`` #: whereas the class method ``Specification.detect({})`` will raise an #: error. 
#: #: (Note that of course ``DRAFT202012.detect(...)`` may return some other #: specification when given a schema which *does* identify as being for #: another version). #: #: Raises: #: #: `CannotDetermineSpecification` #: #: if the given contents don't have any discernible #: information which could be used to guess which #: specification they identify as detect = _SpecificationDetector() def __repr__(self) -> str: return f"" def anchors_in(self, contents: D): """ Retrieve the anchors contained in the given document. """ return self._anchors_in(self, contents) def create_resource(self, contents: D) -> Resource[D]: """ Create a resource which is interpreted using this specification. """ return Resource(contents=contents, specification=self) Specification.OPAQUE = Specification( name="opaque", id_of=lambda contents: None, subresources_of=lambda contents: [], anchors_in=lambda specification, contents: [], maybe_in_subresource=lambda segments, resolver, subresource: resolver, ) @frozen class Resource(Generic[D]): r""" A document (deserialized JSON) with a concrete interpretation under a spec. In other words, a Python object, along with an instance of `Specification` which describes how the document interacts with referencing -- both internally (how it refers to other `Resource`\ s) and externally (how it should be identified such that it is referenceable by other documents). """ contents: D _specification: Specification[D] = field(alias="specification") @classmethod def from_contents( cls, contents: D, default_specification: ( type[Specification[D]] | Specification[D] ) = Specification, ) -> Resource[D]: """ Create a resource guessing which specification applies to the contents. 
Raises: `CannotDetermineSpecification` if the given contents don't have any discernible information which could be used to guess which specification they identify as """ specification = default_specification.detect(contents) return specification.create_resource(contents=contents) @classmethod def opaque(cls, contents: D) -> Resource[D]: """ Create an opaque `Resource` -- i.e. one with opaque specification. See `Specification.OPAQUE` for details. """ return Specification.OPAQUE.create_resource(contents=contents) def id(self) -> URI | None: """ Retrieve this resource's (specification-specific) identifier. """ id = self._specification.id_of(self.contents) if id is None: return return id.rstrip("#") def subresources(self) -> Iterable[Resource[D]]: """ Retrieve this resource's subresources. """ return ( Resource.from_contents( each, default_specification=self._specification, ) for each in self._specification.subresources_of(self.contents) ) def anchors(self) -> Iterable[AnchorType[D]]: """ Retrieve this resource's (specification-specific) identifier. """ return self._specification.anchors_in(self.contents) def pointer(self, pointer: str, resolver: Resolver[D]) -> Resolved[D]: """ Resolve the given JSON pointer. 
Raises: `exceptions.PointerToNowhere` if the pointer points to a location not present in the document """ if not pointer: return Resolved(contents=self.contents, resolver=resolver) contents = self.contents segments: list[int | str] = [] for segment in unquote(pointer[1:]).split("/"): if isinstance(contents, Sequence): segment = int(segment) else: segment = segment.replace("~1", "/").replace("~0", "~") try: contents = contents[segment] # type: ignore[reportUnknownArgumentType] except LookupError as lookup_error: error = exceptions.PointerToNowhere(ref=pointer, resource=self) raise error from lookup_error segments.append(segment) last = resolver resolver = self._specification.maybe_in_subresource( segments=segments, resolver=resolver, subresource=self._specification.create_resource(contents), ) if resolver is not last: segments = [] return Resolved(contents=contents, resolver=resolver) # type: ignore[reportUnknownArgumentType] def _fail_to_retrieve(uri: URI): raise exceptions.NoSuchResource(ref=uri) @frozen class Registry(Mapping[URI, Resource[D]]): r""" A registry of `Resource`\ s, each identified by their canonical URIs. Registries store a collection of in-memory resources, and optionally enable additional resources which may be stored elsewhere (e.g. in a database, a separate set of files, over the network, etc.). They also lazily walk their known resources, looking for subresources within them. In other words, subresources contained within any added resources will be retrievable via their own IDs (though this discovery of subresources will be delayed until necessary). Registries are immutable, and their methods return new instances of the registry with the additional resources added to them. The ``retrieve`` argument can be used to configure retrieval of resources dynamically, either over the network, from a database, or the like. Pass it a callable which will be called if any URI not present in the registry is accessed. 
It must either return a `Resource` or else raise a `NoSuchResource` exception indicating that the resource does not exist even according to the retrieval logic. """ _resources: HashTrieMap[URI, Resource[D]] = field( default=HashTrieMap(), converter=HashTrieMap.convert, # type: ignore[reportGeneralTypeIssues] alias="resources", ) _anchors: HashTrieMap[tuple[URI, str], AnchorType[D]] = HashTrieMap() _uncrawled: HashTrieSet[URI] = EMPTY_UNCRAWLED _retrieve: Retrieve[D] = field(default=_fail_to_retrieve, alias="retrieve") def __getitem__(self, uri: URI) -> Resource[D]: """ Return the (already crawled) `Resource` identified by the given URI. """ try: return self._resources[uri.rstrip("#")] except KeyError: raise exceptions.NoSuchResource(ref=uri) from None def __iter__(self) -> Iterator[URI]: """ Iterate over all crawled URIs in the registry. """ return iter(self._resources) def __len__(self) -> int: """ Count the total number of fully crawled resources in this registry. """ return len(self._resources) def __rmatmul__( self, new: Resource[D] | Iterable[Resource[D]], ) -> Registry[D]: """ Create a new registry with resource(s) added using their internal IDs. Resources must have a internal IDs (e.g. the :kw:`$id` keyword in modern JSON Schema versions), otherwise an error will be raised. Both a single resource as well as an iterable of resources works, i.e.: * ``resource @ registry`` or * ``[iterable, of, multiple, resources] @ registry`` which -- again, assuming the resources have internal IDs -- is equivalent to calling `Registry.with_resources` as such: .. 
code:: python registry.with_resources( (resource.id(), resource) for resource in new_resources ) Raises: `NoInternalID` if the resource(s) in fact do not have IDs """ if isinstance(new, Resource): new = (new,) resources = self._resources uncrawled = self._uncrawled for resource in new: id = resource.id() if id is None: raise exceptions.NoInternalID(resource=resource) uncrawled = uncrawled.insert(id) resources = resources.insert(id, resource) return evolve(self, resources=resources, uncrawled=uncrawled) def __repr__(self) -> str: size = len(self) pluralized = "resource" if size == 1 else "resources" if self._uncrawled: uncrawled = len(self._uncrawled) if uncrawled == size: summary = f"uncrawled {pluralized}" else: summary = f"{pluralized}, {uncrawled} uncrawled" else: summary = f"{pluralized}" return f"" def get_or_retrieve(self, uri: URI) -> Retrieved[D, Resource[D]]: """ Get a resource from the registry, crawling or retrieving if necessary. May involve crawling to find the given URI if it is not already known, so the returned object is a `Retrieved` object which contains both the resource value as well as the registry which ultimately contained it. """ resource = self._resources.get(uri) if resource is not None: return Retrieved(registry=self, value=resource) registry = self.crawl() resource = registry._resources.get(uri) if resource is not None: return Retrieved(registry=registry, value=resource) try: resource = registry._retrieve(uri) except ( exceptions.CannotDetermineSpecification, exceptions.NoSuchResource, ): raise except Exception as error: raise exceptions.Unretrievable(ref=uri) from error else: registry = registry.with_resource(uri, resource) return Retrieved(registry=registry, value=resource) def remove(self, uri: URI): """ Return a registry with the resource identified by a given URI removed. 
""" if uri not in self._resources: raise exceptions.NoSuchResource(ref=uri) return evolve( self, resources=self._resources.remove(uri), uncrawled=self._uncrawled.discard(uri), anchors=HashTrieMap( (k, v) for k, v in self._anchors.items() if k[0] != uri ), ) def anchor(self, uri: URI, name: str): """ Retrieve a given anchor from a resource which must already be crawled. """ value = self._anchors.get((uri, name)) if value is not None: return Retrieved(value=value, registry=self) registry = self.crawl() value = registry._anchors.get((uri, name)) if value is not None: return Retrieved(value=value, registry=registry) resource = self[uri] canonical_uri = resource.id() if canonical_uri is not None: value = registry._anchors.get((canonical_uri, name)) if value is not None: return Retrieved(value=value, registry=registry) if "/" in name: raise exceptions.InvalidAnchor( ref=uri, resource=resource, anchor=name, ) raise exceptions.NoSuchAnchor(ref=uri, resource=resource, anchor=name) def contents(self, uri: URI) -> D: """ Retrieve the (already crawled) contents identified by the given URI. """ return self[uri].contents def crawl(self) -> Registry[D]: """ Crawl all added resources, discovering subresources. """ resources = self._resources anchors = self._anchors uncrawled = [(uri, resources[uri]) for uri in self._uncrawled] while uncrawled: uri, resource = uncrawled.pop() id = resource.id() if id is not None: uri = urljoin(uri, id) resources = resources.insert(uri, resource) for each in resource.anchors(): anchors = anchors.insert((uri, each.name), each) uncrawled.extend((uri, each) for each in resource.subresources()) return evolve( self, resources=resources, anchors=anchors, uncrawled=EMPTY_UNCRAWLED, ) def with_resource(self, uri: URI, resource: Resource[D]): """ Add the given `Resource` to the registry, without crawling it. 
""" return self.with_resources([(uri, resource)]) def with_resources( self, pairs: Iterable[tuple[URI, Resource[D]]], ) -> Registry[D]: r""" Add the given `Resource`\ s to the registry, without crawling them. """ resources = self._resources uncrawled = self._uncrawled for uri, resource in pairs: # Empty fragment URIs are equivalent to URIs without the fragment. # TODO: Is this true for non JSON Schema resources? Probably not. uri = uri.rstrip("#") uncrawled = uncrawled.insert(uri) resources = resources.insert(uri, resource) return evolve(self, resources=resources, uncrawled=uncrawled) def with_contents( self, pairs: Iterable[tuple[URI, D]], **kwargs: Any, ) -> Registry[D]: r""" Add the given contents to the registry, autodetecting when necessary. """ return self.with_resources( (uri, Resource.from_contents(each, **kwargs)) for uri, each in pairs ) def combine(self, *registries: Registry[D]) -> Registry[D]: """ Combine together one or more other registries, producing a unified one. """ if registries == (self,): return self resources = self._resources anchors = self._anchors uncrawled = self._uncrawled retrieve = self._retrieve for registry in registries: resources = resources.update(registry._resources) anchors = anchors.update(registry._anchors) uncrawled = uncrawled.update(registry._uncrawled) if registry._retrieve is not _fail_to_retrieve: if registry._retrieve is not retrieve is not _fail_to_retrieve: raise ValueError( # noqa: TRY003 "Cannot combine registries with conflicting retrieval " "functions.", ) retrieve = registry._retrieve return evolve( self, anchors=anchors, resources=resources, uncrawled=uncrawled, retrieve=retrieve, ) def resolver(self, base_uri: URI = "") -> Resolver[D]: """ Return a `Resolver` which resolves references against this registry. """ return Resolver(base_uri=base_uri, registry=self) def resolver_with_root(self, resource: Resource[D]) -> Resolver[D]: """ Return a `Resolver` with a specific root resource. 
""" uri = resource.id() or "" return Resolver( base_uri=uri, registry=self.with_resource(uri, resource), ) #: An anchor or resource. AnchorOrResource = TypeVar("AnchorOrResource", AnchorType[Any], Resource[Any]) @frozen class Retrieved(Generic[D, AnchorOrResource]): """ A value retrieved from a `Registry`. """ value: AnchorOrResource registry: Registry[D] @frozen class Resolved(Generic[D]): """ A reference resolved to its contents by a `Resolver`. """ contents: D resolver: Resolver[D] @frozen class Resolver(Generic[D]): """ A reference resolver. Resolvers help resolve references (including relative ones) by pairing a fixed base URI with a `Registry`. This object, under normal circumstances, is expected to be used by *implementers of libraries* built on top of `referencing` (e.g. JSON Schema implementations or other libraries resolving JSON references), not directly by end-users populating registries or while writing schemas or other resources. References are resolved against the base URI, and the combined URI is then looked up within the registry. The process of resolving a reference may itself involve calculating a *new* base URI for future reference resolution (e.g. if an intermediate resource sets a new base URI), or may involve encountering additional subresources and adding them to a new registry. """ _base_uri: URI = field(alias="base_uri") _registry: Registry[D] = field(alias="registry") _previous: List[URI] = field(default=List(), repr=False, alias="previous") def lookup(self, ref: URI) -> Resolved[D]: """ Resolve the given reference to the resource it points to. 
Raises: `exceptions.Unresolvable` or a subclass thereof (see below) if the reference isn't resolvable `exceptions.NoSuchAnchor` if the reference is to a URI where a resource exists but contains a plain name fragment which does not exist within the resource `exceptions.PointerToNowhere` if the reference is to a URI where a resource exists but contains a JSON pointer to a location within the resource that does not exist """ if ref.startswith("#"): uri, fragment = self._base_uri, ref[1:] else: uri, fragment = urldefrag(urljoin(self._base_uri, ref)) try: retrieved = self._registry.get_or_retrieve(uri) except exceptions.NoSuchResource: raise exceptions.Unresolvable(ref=ref) from None except exceptions.Unretrievable as error: raise exceptions.Unresolvable(ref=ref) from error if fragment.startswith("/"): resolver = self._evolve(registry=retrieved.registry, base_uri=uri) return retrieved.value.pointer(pointer=fragment, resolver=resolver) if fragment: retrieved = retrieved.registry.anchor(uri, fragment) resolver = self._evolve(registry=retrieved.registry, base_uri=uri) return retrieved.value.resolve(resolver=resolver) resolver = self._evolve(registry=retrieved.registry, base_uri=uri) return Resolved(contents=retrieved.value.contents, resolver=resolver) def in_subresource(self, subresource: Resource[D]) -> Resolver[D]: """ Create a resolver for a subresource (which may have a new base URI). """ id = subresource.id() if id is None: return self return evolve(self, base_uri=urljoin(self._base_uri, id)) def dynamic_scope(self) -> Iterable[tuple[URI, Registry[D]]]: """ In specs with such a notion, return the URIs in the dynamic scope. """ for uri in self._previous: yield uri, self._registry def _evolve(self, base_uri: URI, **kwargs: Any): """ Evolve, appending to the dynamic scope. 
""" previous = self._previous if self._base_uri and (not previous or base_uri != self._base_uri): previous = previous.push_front(self._base_uri) return evolve(self, base_uri=base_uri, previous=previous, **kwargs) @frozen class Anchor(Generic[D]): """ A simple anchor in a `Resource`. """ name: str resource: Resource[D] def resolve(self, resolver: Resolver[D]): """ Return the resource for this anchor. """ return Resolved(contents=self.resource.contents, resolver=resolver) PK! +PP exceptions.pynu[""" Errors, oh no! """ from __future__ import annotations from typing import TYPE_CHECKING, Any import attrs from referencing._attrs import frozen if TYPE_CHECKING: from referencing import Resource from referencing.typing import URI @frozen class NoSuchResource(KeyError): """ The given URI is not present in a registry. Unlike most exceptions, this class *is* intended to be publicly instantiable and *is* part of the public API of the package. """ ref: URI def __eq__(self, other: object) -> bool: if self.__class__ is not other.__class__: return NotImplemented return attrs.astuple(self) == attrs.astuple(other) def __hash__(self) -> int: return hash(attrs.astuple(self)) @frozen class NoInternalID(Exception): """ A resource has no internal ID, but one is needed. E.g. in modern JSON Schema drafts, this is the :kw:`$id` keyword. One might be needed if a resource was to-be added to a registry but no other URI is available, and the resource doesn't declare its canonical URI. """ resource: Resource[Any] def __eq__(self, other: object) -> bool: if self.__class__ is not other.__class__: return NotImplemented return attrs.astuple(self) == attrs.astuple(other) def __hash__(self) -> int: return hash(attrs.astuple(self)) @frozen class Unretrievable(KeyError): """ The given URI is not present in a registry, and retrieving it failed. 
""" ref: URI def __eq__(self, other: object) -> bool: if self.__class__ is not other.__class__: return NotImplemented return attrs.astuple(self) == attrs.astuple(other) def __hash__(self) -> int: return hash(attrs.astuple(self)) @frozen class CannotDetermineSpecification(Exception): """ Attempting to detect the appropriate `Specification` failed. This happens if no discernible information is found in the contents of the new resource which would help identify it. """ contents: Any def __eq__(self, other: object) -> bool: if self.__class__ is not other.__class__: return NotImplemented return attrs.astuple(self) == attrs.astuple(other) def __hash__(self) -> int: return hash(attrs.astuple(self)) @attrs.frozen # Because here we allow subclassing below. class Unresolvable(Exception): """ A reference was unresolvable. """ ref: URI def __eq__(self, other: object) -> bool: if self.__class__ is not other.__class__: return NotImplemented return attrs.astuple(self) == attrs.astuple(other) def __hash__(self) -> int: return hash(attrs.astuple(self)) @frozen class PointerToNowhere(Unresolvable): """ A JSON Pointer leads to a part of a document that does not exist. """ resource: Resource[Any] def __str__(self) -> str: msg = f"{self.ref!r} does not exist within {self.resource.contents!r}" if self.ref == "/": msg += ( ". The pointer '/' is a valid JSON Pointer but it points to " "an empty string property ''. If you intended to point " "to the entire resource, you should use '#'." ) return msg @frozen class NoSuchAnchor(Unresolvable): """ An anchor does not exist within a particular resource. """ resource: Resource[Any] anchor: str def __str__(self) -> str: return ( f"{self.anchor!r} does not exist within {self.resource.contents!r}" ) @frozen class InvalidAnchor(Unresolvable): """ An anchor which could never exist in a resource was dereferenced. It is somehow syntactically invalid. 
""" resource: Resource[Any] anchor: str def __str__(self) -> str: return ( f"'#{self.anchor}' is not a valid anchor, neither as a " "plain name anchor nor as a JSON Pointer. You may have intended " f"to use '#/{self.anchor}', as the slash is required *before each " "segment* of a JSON pointer." ) PK!Dh|II jsonschema.pynu[""" Referencing implementations for JSON Schema specs (historic & current). """ from __future__ import annotations from collections.abc import Sequence, Set from typing import Any, Iterable, Union from referencing import Anchor, Registry, Resource, Specification, exceptions from referencing._attrs import frozen from referencing._core import ( _UNSET, # type: ignore[reportPrivateUsage] Resolved as _Resolved, Resolver as _Resolver, _Unset, # type: ignore[reportPrivateUsage] ) from referencing.typing import URI, Anchor as AnchorType, Mapping #: A JSON Schema which is a JSON object ObjectSchema = Mapping[str, Any] #: A JSON Schema of any kind Schema = Union[bool, ObjectSchema] #: A Resource whose contents are JSON Schemas SchemaResource = Resource[Schema] #: A JSON Schema Registry SchemaRegistry = Registry[Schema] #: The empty JSON Schema Registry EMPTY_REGISTRY: SchemaRegistry = Registry() @frozen class UnknownDialect(Exception): """ A dialect identifier was found for a dialect unknown by this library. If it's a custom ("unofficial") dialect, be sure you've registered it. 
""" uri: URI def _dollar_id(contents: Schema) -> URI | None: if isinstance(contents, bool): return return contents.get("$id") def _legacy_dollar_id(contents: Schema) -> URI | None: if isinstance(contents, bool) or "$ref" in contents: return id = contents.get("$id") if id is not None and not id.startswith("#"): return id def _legacy_id(contents: ObjectSchema) -> URI | None: if "$ref" in contents: return id = contents.get("id") if id is not None and not id.startswith("#"): return id def _anchor( specification: Specification[Schema], contents: Schema, ) -> Iterable[AnchorType[Schema]]: if isinstance(contents, bool): return anchor = contents.get("$anchor") if anchor is not None: yield Anchor( name=anchor, resource=specification.create_resource(contents), ) dynamic_anchor = contents.get("$dynamicAnchor") if dynamic_anchor is not None: yield DynamicAnchor( name=dynamic_anchor, resource=specification.create_resource(contents), ) def _anchor_2019( specification: Specification[Schema], contents: Schema, ) -> Iterable[Anchor[Schema]]: if isinstance(contents, bool): return [] anchor = contents.get("$anchor") if anchor is None: return [] return [ Anchor( name=anchor, resource=specification.create_resource(contents), ), ] def _legacy_anchor_in_dollar_id( specification: Specification[Schema], contents: Schema, ) -> Iterable[Anchor[Schema]]: if isinstance(contents, bool): return [] id = contents.get("$id", "") if not id.startswith("#"): return [] return [ Anchor( name=id[1:], resource=specification.create_resource(contents), ), ] def _legacy_anchor_in_id( specification: Specification[ObjectSchema], contents: ObjectSchema, ) -> Iterable[Anchor[ObjectSchema]]: id = contents.get("id", "") if not id.startswith("#"): return [] return [ Anchor( name=id[1:], resource=specification.create_resource(contents), ), ] def _subresources_of( in_value: Set[str] = frozenset(), in_subvalues: Set[str] = frozenset(), in_subarray: Set[str] = frozenset(), ): """ Create a callable returning JSON Schema 
specification-style subschemas. Relies on specifying the set of keywords containing subschemas in their values, in a subobject's values, or in a subarray. """ def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: if isinstance(contents, bool): return for each in in_value: if each in contents: yield contents[each] for each in in_subarray: if each in contents: yield from contents[each] for each in in_subvalues: if each in contents: yield from contents[each].values() return subresources_of def _subresources_of_with_crazy_items( in_value: Set[str] = frozenset(), in_subvalues: Set[str] = frozenset(), in_subarray: Set[str] = frozenset(), ): """ Specifically handle older drafts where there are some funky keywords. """ def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: if isinstance(contents, bool): return for each in in_value: if each in contents: yield contents[each] for each in in_subarray: if each in contents: yield from contents[each] for each in in_subvalues: if each in contents: yield from contents[each].values() items = contents.get("items") if items is not None: if isinstance(items, Sequence): yield from items else: yield items return subresources_of def _subresources_of_with_crazy_items_dependencies( in_value: Set[str] = frozenset(), in_subvalues: Set[str] = frozenset(), in_subarray: Set[str] = frozenset(), ): """ Specifically handle older drafts where there are some funky keywords. 
""" def subresources_of(contents: Schema) -> Iterable[ObjectSchema]: if isinstance(contents, bool): return for each in in_value: if each in contents: yield contents[each] for each in in_subarray: if each in contents: yield from contents[each] for each in in_subvalues: if each in contents: yield from contents[each].values() items = contents.get("items") if items is not None: if isinstance(items, Sequence): yield from items else: yield items dependencies = contents.get("dependencies") if dependencies is not None: values = iter(dependencies.values()) value = next(values, None) if isinstance(value, Mapping): yield value yield from values return subresources_of def _subresources_of_with_crazy_aP_items_dependencies( in_value: Set[str] = frozenset(), in_subvalues: Set[str] = frozenset(), in_subarray: Set[str] = frozenset(), ): """ Specifically handle even older drafts where there are some funky keywords. """ def subresources_of(contents: ObjectSchema) -> Iterable[ObjectSchema]: for each in in_value: if each in contents: yield contents[each] for each in in_subarray: if each in contents: yield from contents[each] for each in in_subvalues: if each in contents: yield from contents[each].values() items = contents.get("items") if items is not None: if isinstance(items, Sequence): yield from items else: yield items dependencies = contents.get("dependencies") if dependencies is not None: values = iter(dependencies.values()) value = next(values, None) if isinstance(value, Mapping): yield value yield from values for each in "additionalItems", "additionalProperties": value = contents.get(each) if isinstance(value, Mapping): yield value return subresources_of def _maybe_in_subresource( in_value: Set[str] = frozenset(), in_subvalues: Set[str] = frozenset(), in_subarray: Set[str] = frozenset(), ): in_child = in_subvalues | in_subarray def maybe_in_subresource( segments: Sequence[int | str], resolver: _Resolver[Any], subresource: Resource[Any], ) -> _Resolver[Any]: _segments = 
iter(segments) for segment in _segments: if segment not in in_value and ( segment not in in_child or next(_segments, None) is None ): return resolver return resolver.in_subresource(subresource) return maybe_in_subresource def _maybe_in_subresource_crazy_items( in_value: Set[str] = frozenset(), in_subvalues: Set[str] = frozenset(), in_subarray: Set[str] = frozenset(), ): in_child = in_subvalues | in_subarray def maybe_in_subresource( segments: Sequence[int | str], resolver: _Resolver[Any], subresource: Resource[Any], ) -> _Resolver[Any]: _segments = iter(segments) for segment in _segments: if segment == "items" and isinstance( subresource.contents, Mapping, ): return resolver.in_subresource(subresource) if segment not in in_value and ( segment not in in_child or next(_segments, None) is None ): return resolver return resolver.in_subresource(subresource) return maybe_in_subresource def _maybe_in_subresource_crazy_items_dependencies( in_value: Set[str] = frozenset(), in_subvalues: Set[str] = frozenset(), in_subarray: Set[str] = frozenset(), ): in_child = in_subvalues | in_subarray def maybe_in_subresource( segments: Sequence[int | str], resolver: _Resolver[Any], subresource: Resource[Any], ) -> _Resolver[Any]: _segments = iter(segments) for segment in _segments: if segment in {"items", "dependencies"} and isinstance( subresource.contents, Mapping, ): return resolver.in_subresource(subresource) if segment not in in_value and ( segment not in in_child or next(_segments, None) is None ): return resolver return resolver.in_subresource(subresource) return maybe_in_subresource #: JSON Schema draft 2020-12 DRAFT202012 = Specification( name="draft2020-12", id_of=_dollar_id, subresources_of=_subresources_of( in_value={ "additionalProperties", "contains", "contentSchema", "else", "if", "items", "not", "propertyNames", "then", "unevaluatedItems", "unevaluatedProperties", }, in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, in_subvalues={ "$defs", "definitions", 
"dependentSchemas", "patternProperties", "properties", }, ), anchors_in=_anchor, maybe_in_subresource=_maybe_in_subresource( in_value={ "additionalProperties", "contains", "contentSchema", "else", "if", "items", "not", "propertyNames", "then", "unevaluatedItems", "unevaluatedProperties", }, in_subarray={"allOf", "anyOf", "oneOf", "prefixItems"}, in_subvalues={ "$defs", "definitions", "dependentSchemas", "patternProperties", "properties", }, ), ) #: JSON Schema draft 2019-09 DRAFT201909 = Specification( name="draft2019-09", id_of=_dollar_id, subresources_of=_subresources_of_with_crazy_items( in_value={ "additionalItems", "additionalProperties", "contains", "contentSchema", "else", "if", "not", "propertyNames", "then", "unevaluatedItems", "unevaluatedProperties", }, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={ "$defs", "definitions", "dependentSchemas", "patternProperties", "properties", }, ), anchors_in=_anchor_2019, # type: ignore[reportGeneralTypeIssues] # TODO: check whether this is real maybe_in_subresource=_maybe_in_subresource_crazy_items( in_value={ "additionalItems", "additionalProperties", "contains", "contentSchema", "else", "if", "not", "propertyNames", "then", "unevaluatedItems", "unevaluatedProperties", }, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={ "$defs", "definitions", "dependentSchemas", "patternProperties", "properties", }, ), ) #: JSON Schema draft 7 DRAFT7 = Specification( name="draft-07", id_of=_legacy_dollar_id, subresources_of=_subresources_of_with_crazy_items_dependencies( in_value={ "additionalItems", "additionalProperties", "contains", "else", "if", "not", "propertyNames", "then", }, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] # TODO: check whether this is real maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( in_value={ "additionalItems", 
"additionalProperties", "contains", "else", "if", "not", "propertyNames", "then", }, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), ) #: JSON Schema draft 6 DRAFT6 = Specification( name="draft-06", id_of=_legacy_dollar_id, subresources_of=_subresources_of_with_crazy_items_dependencies( in_value={ "additionalItems", "additionalProperties", "contains", "not", "propertyNames", }, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), anchors_in=_legacy_anchor_in_dollar_id, # type: ignore[reportGeneralTypeIssues] # TODO: check whether this is real maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( in_value={ "additionalItems", "additionalProperties", "contains", "not", "propertyNames", }, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), ) #: JSON Schema draft 4 DRAFT4 = Specification( name="draft-04", id_of=_legacy_id, subresources_of=_subresources_of_with_crazy_aP_items_dependencies( in_value={"not"}, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), anchors_in=_legacy_anchor_in_id, maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( in_value={"additionalItems", "additionalProperties", "not"}, in_subarray={"allOf", "anyOf", "oneOf"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), ) #: JSON Schema draft 3 DRAFT3 = Specification( name="draft-03", id_of=_legacy_id, subresources_of=_subresources_of_with_crazy_aP_items_dependencies( in_subarray={"extends"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), anchors_in=_legacy_anchor_in_id, maybe_in_subresource=_maybe_in_subresource_crazy_items_dependencies( in_value={"additionalItems", "additionalProperties"}, in_subarray={"extends"}, in_subvalues={"definitions", "patternProperties", "properties"}, ), ) _SPECIFICATIONS: 
def specification_with(
    dialect_id: URI,
    default: Specification[Any] | _Unset = _UNSET,
) -> Specification[Any]:
    """
    Look up the `Specification` registered for a dialect identifier.

    Trailing ``#`` characters on ``dialect_id`` are ignored for the lookup.

    Arguments:

        dialect_id:

            the URI identifying the dialect

        default:

            a specification to return when the dialect is not known

    Raises:

        `UnknownDialect`

            if the given ``dialect_id`` isn't known and no ``default``
            was given
    """
    known = _SPECIFICATIONS.get(dialect_id.rstrip("#"))
    if known is None:
        if default is _UNSET:
            raise UnknownDialect(dialect_id)
        return default
    return known.contents
""" resolved = resolver.lookup("#") if isinstance(resolved.contents, Mapping) and resolved.contents.get( "$recursiveAnchor", ): for uri, _ in resolver.dynamic_scope(): next_resolved = resolver.lookup(uri) if not isinstance( next_resolved.contents, Mapping, ) or not next_resolved.contents.get("$recursiveAnchor"): break resolved = next_resolved return resolved PK!py.typednu[PK!  retrieval.pynu[""" Helpers related to (dynamic) resource retrieval. """ from __future__ import annotations from functools import lru_cache from typing import TYPE_CHECKING, Callable, TypeVar import json from referencing import Resource if TYPE_CHECKING: from referencing.typing import URI, D, Retrieve #: A serialized document (e.g. a JSON string) _T = TypeVar("_T") def to_cached_resource( cache: Callable[[Retrieve[D]], Retrieve[D]] | None = None, loads: Callable[[_T], D] = json.loads, from_contents: Callable[[D], Resource[D]] = Resource.from_contents, ) -> Callable[[Callable[[URI], _T]], Retrieve[D]]: """ Create a retriever which caches its return values from a simpler callable. Takes a function which returns things like serialized JSON (strings) and returns something suitable for passing to `Registry` as a retrieve function. This decorator both reduces a small bit of boilerplate for a common case (deserializing JSON from strings and creating `Resource` objects from the result) as well as makes the probable need for caching a bit easier. Retrievers which otherwise do expensive operations (like hitting the network) might otherwise be called repeatedly. Examples -------- .. testcode:: from referencing import Registry from referencing.typing import URI import referencing.retrieval @referencing.retrieval.to_cached_resource() def retrieve(uri: URI): print(f"Retrieved {uri}") # Normally, go get some expensive JSON from the network, a file ... 
return ''' { "$schema": "https://json-schema.org/draft/2020-12/schema", "foo": "bar" } ''' one = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo") print(one.value.contents["foo"]) # Retrieving the same URI again reuses the same value (and thus doesn't # print another retrieval message here) two = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo") print(two.value.contents["foo"]) .. testoutput:: Retrieved urn:example:foo bar bar """ if cache is None: cache = lru_cache(maxsize=None) def decorator(retrieve: Callable[[URI], _T]): @cache def cached_retrieve(uri: URI): response = retrieve(uri) contents = loads(response) return from_contents(contents) return cached_retrieve return decorator PK!`N2 typing.pynu[""" Type-annotation related support for the referencing library. """ from __future__ import annotations from typing import TYPE_CHECKING, Protocol, TypeVar try: from collections.abc import Mapping as Mapping Mapping[str, str] except TypeError: # pragma: no cover from typing import Mapping as Mapping if TYPE_CHECKING: from referencing._core import Resolved, Resolver, Resource #: A URI which identifies a `Resource`. URI = str #: The type of documents within a registry. D = TypeVar("D") class Retrieve(Protocol[D]): """ A retrieval callable, usable within a `Registry` for resource retrieval. Does not make assumptions about where the resource might be coming from. """ def __call__(self, uri: URI) -> Resource[D]: """ Retrieve the resource with the given URI. Raise `referencing.exceptions.NoSuchResource` if you wish to indicate the retriever cannot lookup the given URI. """ ... class Anchor(Protocol[D]): """ An anchor within a `Resource`. Beyond "simple" anchors, some specifications like JSON Schema's 2020 version have dynamic anchors. """ @property def name(self) -> str: """ Return the name of this anchor. """ ... def resolve(self, resolver: Resolver[D]) -> Resolved[D]: """ Return the resource for this anchor. """ ... 
def blow_up(uri):  # pragma: no cover
    """
    A retriever for tests in which no retrieval is ever expected to happen.

    Always raises `RuntimeError`, whatever URI it is given.
    """
    message = "This retrieve function expects to never be called!"
    raise RuntimeError(message)
""" one = Resource.opaque(contents={}) two = Resource(contents={"foo": "bar"}, specification=ID_AND_CHILDREN) registry = Registry().with_resources( [ ("http://example.com/1", one), ("http://example.com/foo/bar", two), ], ) assert registry == Registry().with_resource( uri="http://example.com/1", resource=one, ).with_resource( uri="http://example.com/foo/bar", resource=two, ) def test_matmul_resource(self): uri = "urn:example:resource" resource = ID_AND_CHILDREN.create_resource({"ID": uri, "foo": 12}) registry = resource @ Registry() assert registry == Registry().with_resource(uri, resource) def test_matmul_many_resources(self): one_uri = "urn:example:one" one = ID_AND_CHILDREN.create_resource({"ID": one_uri, "foo": 12}) two_uri = "urn:example:two" two = ID_AND_CHILDREN.create_resource({"ID": two_uri, "foo": 12}) registry = [one, two] @ Registry() assert registry == Registry().with_resources( [(one_uri, one), (two_uri, two)], ) def test_matmul_resource_without_id(self): resource = Resource.opaque(contents={"foo": "bar"}) with pytest.raises(exceptions.NoInternalID) as e: resource @ Registry() assert e.value == exceptions.NoInternalID(resource=resource) def test_with_contents_from_json_schema(self): uri = "urn:example" schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} registry = Registry().with_contents([(uri, schema)]) expected = Resource(contents=schema, specification=DRAFT202012) assert registry[uri] == expected def test_with_contents_and_default_specification(self): uri = "urn:example" registry = Registry().with_contents( [(uri, {"foo": "bar"})], default_specification=Specification.OPAQUE, ) assert registry[uri] == Resource.opaque({"foo": "bar"}) def test_len(self): total = 5 registry = Registry().with_contents( [(str(i), {"foo": "bar"}) for i in range(total)], default_specification=Specification.OPAQUE, ) assert len(registry) == total def test_bool_empty(self): assert not Registry() def test_bool_not_empty(self): registry = 
def test_iter(self):
    """
    Iterating a registry produces the URIs it was populated with.
    """
    uris = [str(number) for number in range(8)]
    registry = Registry().with_contents(
        [(uri, {"foo": "bar"}) for uri in uris],
        default_specification=Specification.OPAQUE,
    )
    assert set(registry) == set(uris)
def test_contents_nonexistent_resource(self):
    """
    Asking an empty registry for contents raises `NoSuchResource`.
    """
    missing = "urn:example"
    with pytest.raises(exceptions.NoSuchResource) as raised:
        Registry().contents(missing)
    assert raised.value == exceptions.NoSuchResource(ref=missing)
""" one = Resource.opaque(contents={}) two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) registry = Registry( {"http://example.com/1": one}, ).with_resource("http://example.com/foo/bar", two) assert ( registry.crawl() == Registry() .with_resources( [ ("http://example.com/1", one), ("http://example.com/foo/bar", two), ], ) .crawl() ) def test_no_such_resource(self): registry = Registry() with pytest.raises(exceptions.NoSuchResource) as e: registry["urn:bigboom"] assert e.value == exceptions.NoSuchResource(ref="urn:bigboom") def test_combine(self): one = Resource.opaque(contents={}) two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) four = ID_AND_CHILDREN.create_resource({"anchors": {"foo": 12}}) first = Registry({"http://example.com/1": one}) second = Registry().with_resource("http://example.com/foo/bar", two) third = Registry( { "http://example.com/1": one, "http://example.com/baz": three, }, ) fourth = ( Registry() .with_resource( "http://example.com/foo/quux", four, ) .crawl() ) assert first.combine(second, third, fourth) == Registry( [ ("http://example.com/1", one), ("http://example.com/baz", three), ("http://example.com/foo/quux", four), ], anchors=HashTrieMap( { ("http://example.com/foo/quux", "foo"): Anchor( name="foo", resource=ID_AND_CHILDREN.create_resource(12), ), }, ), ).with_resource("http://example.com/foo/bar", two) def test_combine_self(self): """ Combining a registry with itself short-circuits. This is a performance optimization -- otherwise we do lots more work (in jsonschema this seems to correspond to making the test suite take *3x* longer). 
""" registry = Registry({"urn:foo": "bar"}) assert registry.combine(registry) is registry def test_combine_with_uncrawled_resources(self): one = Resource.opaque(contents={}) two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) first = Registry().with_resource("http://example.com/1", one) second = Registry().with_resource("http://example.com/foo/bar", two) third = Registry( { "http://example.com/1": one, "http://example.com/baz": three, }, ) expected = Registry( [ ("http://example.com/1", one), ("http://example.com/foo/bar", two), ("http://example.com/baz", three), ], ) combined = first.combine(second, third) assert combined != expected assert combined.crawl() == expected def test_combine_with_single_retrieve(self): one = Resource.opaque(contents={}) two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) def retrieve(uri): # pragma: no cover pass first = Registry().with_resource("http://example.com/1", one) second = Registry( retrieve=retrieve, ).with_resource("http://example.com/2", two) third = Registry().with_resource("http://example.com/3", three) assert first.combine(second, third) == Registry( retrieve=retrieve, ).with_resources( [ ("http://example.com/1", one), ("http://example.com/2", two), ("http://example.com/3", three), ], ) assert second.combine(first, third) == Registry( retrieve=retrieve, ).with_resources( [ ("http://example.com/1", one), ("http://example.com/2", two), ("http://example.com/3", three), ], ) def test_combine_with_common_retrieve(self): one = Resource.opaque(contents={}) two = ID_AND_CHILDREN.create_resource({"foo": "bar"}) three = ID_AND_CHILDREN.create_resource({"baz": "quux"}) def retrieve(uri): # pragma: no cover pass first = Registry(retrieve=retrieve).with_resource( "http://example.com/1", one, ) second = Registry( retrieve=retrieve, ).with_resource("http://example.com/2", two) third = 
def test_remove(self):
    """
    Removing a URI leaves a registry holding only the other resources.
    """
    removed = Resource.opaque(contents={})
    kept = ID_AND_CHILDREN.create_resource({"foo": "bar"})

    before = Registry({"urn:foo": removed, "urn:bar": kept})
    after = before.remove("urn:foo")

    assert after == Registry({"urn:bar": kept})
def test_retrieve_arbitrary_exception(self):
    """
    An arbitrary exception raised by a retriever surfaces as
    `Unretrievable`, while URIs it can handle are still retrieved.
    """
    available = Resource.opaque({"foo": "bar"})

    def retrieve(uri):
        if uri != "urn:succeed":
            raise Exception("Oh no!")
        return available

    registry = Registry(retrieve=retrieve)

    retrieved = registry.get_or_retrieve("urn:succeed")
    assert retrieved.value == available

    with pytest.raises(exceptions.Unretrievable):
        registry.get_or_retrieve("urn:uhoh")
def test_repr(self):
    """
    The repr reports how many resources are held, and whether they have
    been crawled yet.
    """
    one = Resource.opaque(contents={})
    two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
    registry = Registry().with_resources(
        [
            ("http://example.com/1", one),
            ("http://example.com/foo/bar", two),
        ],
    )
    # NOTE(review): the expected reprs in this group of tests were empty
    # strings, which can never match; they are restored here to the
    # ``<Registry (...)>`` summaries -- confirm against Registry.__repr__.
    assert repr(registry) == "<Registry (2 uncrawled resources)>"
    assert repr(registry.crawl()) == "<Registry (2 resources)>"

def test_repr_mixed_crawled(self):
    """
    With only some resources crawled, the repr reports the uncrawled count
    separately from the total.
    """
    one = Resource.opaque(contents={})
    two = ID_AND_CHILDREN.create_resource({"foo": "bar"})
    registry = (
        Registry(
            {"http://example.com/1": one},
        )
        .crawl()
        .with_resource(uri="http://example.com/foo/bar", resource=two)
    )
    assert repr(registry) == "<Registry (2 resources, 1 uncrawled)>"

def test_repr_one_resource(self):
    """
    A single resource is described in the singular.
    """
    registry = Registry().with_resource(
        uri="http://example.com/1",
        resource=Resource.opaque(contents={}),
    )
    assert repr(registry) == "<Registry (1 uncrawled resource)>"

def test_repr_empty(self):
    """
    An empty registry reports zero resources.
    """
    assert repr(Registry()) == "<Registry (0 resources)>"
""" with pytest.raises(exceptions.CannotDetermineSpecification): Resource.from_contents({"foo": "bar"}) def test_from_contents_with_no_discernible_information_and_default(self): resource = Resource.from_contents( {"foo": "bar"}, default_specification=Specification.OPAQUE, ) assert resource == Resource.opaque(contents={"foo": "bar"}) def test_from_contents_unneeded_default(self): schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} resource = Resource.from_contents( schema, default_specification=Specification.OPAQUE, ) assert resource == Resource( contents=schema, specification=DRAFT202012, ) def test_non_mapping_from_contents(self): resource = Resource.from_contents( True, default_specification=ID_AND_CHILDREN, ) assert resource == Resource( contents=True, specification=ID_AND_CHILDREN, ) def test_from_contents_with_fallback(self): resource = Resource.from_contents( {"foo": "bar"}, default_specification=Specification.OPAQUE, ) assert resource == Resource.opaque(contents={"foo": "bar"}) def test_id_delegates_to_specification(self): specification = Specification( name="", id_of=lambda contents: "urn:fixedID", subresources_of=lambda contents: [], anchors_in=lambda specification, contents: [], maybe_in_subresource=( lambda segments, resolver, subresource: resolver ), ) resource = Resource( contents={"foo": "baz"}, specification=specification, ) assert resource.id() == "urn:fixedID" def test_id_strips_empty_fragment(self): uri = "http://example.com/" root = ID_AND_CHILDREN.create_resource({"ID": uri + "#"}) assert root.id() == uri def test_subresources_delegates_to_specification(self): resource = ID_AND_CHILDREN.create_resource({"children": [{}, 12]}) assert list(resource.subresources()) == [ ID_AND_CHILDREN.create_resource(each) for each in [{}, 12] ] def test_subresource_with_different_specification(self): schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} resource = ID_AND_CHILDREN.create_resource({"children": [schema]}) assert 
def test_opaque(self):
    """
    `Resource.opaque` is shorthand for using `Specification.OPAQUE`.
    """
    document = {"foo": "bar"}
    explicit = Resource(
        contents=document,
        specification=Specification.OPAQUE,
    )
    assert Resource.opaque(document) == explicit
def test_lookup_non_existent_pointer(self):
    """
    A JSON pointer to a missing location fails with `PointerToNowhere`,
    which is catchable as `Unresolvable`.
    """
    uri = "http://example.com/1"
    target = Resource.opaque({"foo": {}})
    resolver = Registry({uri: target}).resolver()

    with pytest.raises(exceptions.Unresolvable) as raised:
        resolver.lookup("http://example.com/1#/foo/bar")

    error = raised.value
    assert error == exceptions.PointerToNowhere(
        ref="/foo/bar",
        resource=target,
    )
    assert str(error) == "'/foo/bar' does not exist within {'foo': {}}"
def test_lookup_retrieved_resource(self):
    """
    Looking up a URI absent from the registry falls back to its retriever.
    """
    retrieved = Resource.opaque(contents={"foo": "baz"})

    def retrieve(uri):
        return retrieved

    registry = Registry(retrieve=retrieve)
    outcome = registry.resolver().lookup("http://example.com/")
    assert outcome.contents == retrieved.contents
""" resource = Resource.opaque(contents={"foo": "baz"}) once = [resource] def retrieve(uri): return once.pop() resolver = Registry(retrieve=retrieve).resolver() resolved = resolver.lookup("http://example.com/") assert resolved.contents == resource.contents resolved = resolved.resolver.lookup("http://example.com/") assert resolved.contents == resource.contents def test_repeated_anchor_lookup_from_retrieved_resource(self): resource = Resource.opaque(contents={"foo": "baz"}) once = [resource] def retrieve(uri): return once.pop() resolver = Registry(retrieve=retrieve).resolver() resolved = resolver.lookup("http://example.com/") assert resolved.contents == resource.contents resolved = resolved.resolver.lookup("#") assert resolved.contents == resource.contents # FIXME: The tests below aren't really representable in the current # suite, though we should probably think of ways to do so. def test_in_subresource(self): root = ID_AND_CHILDREN.create_resource( { "ID": "http://example.com/", "children": [ { "ID": "child/", "children": [{"ID": "grandchild"}], }, ], }, ) registry = root @ Registry() resolver = registry.resolver() first = resolver.lookup("http://example.com/") assert first.contents == root.contents with pytest.raises(exceptions.Unresolvable): first.resolver.lookup("grandchild") sub = first.resolver.in_subresource( ID_AND_CHILDREN.create_resource(first.contents["children"][0]), ) second = sub.lookup("grandchild") assert second.contents == {"ID": "grandchild"} def test_in_pointer_subresource(self): root = ID_AND_CHILDREN.create_resource( { "ID": "http://example.com/", "children": [ { "ID": "child/", "children": [{"ID": "grandchild"}], }, ], }, ) registry = root @ Registry() resolver = registry.resolver() first = resolver.lookup("http://example.com/") assert first.contents == root.contents with pytest.raises(exceptions.Unresolvable): first.resolver.lookup("grandchild") second = first.resolver.lookup("#/children/0") third = second.resolver.lookup("grandchild") assert 
third.contents == {"ID": "grandchild"} def test_dynamic_scope(self): one = ID_AND_CHILDREN.create_resource( { "ID": "http://example.com/", "children": [ { "ID": "child/", "children": [{"ID": "grandchild"}], }, ], }, ) two = ID_AND_CHILDREN.create_resource( { "ID": "http://example.com/two", "children": [{"ID": "two-child/"}], }, ) registry = [one, two] @ Registry() resolver = registry.resolver() first = resolver.lookup("http://example.com/") second = first.resolver.lookup("#/children/0") third = second.resolver.lookup("grandchild") fourth = third.resolver.lookup("http://example.com/two") assert list(fourth.resolver.dynamic_scope()) == [ ("http://example.com/child/grandchild", fourth.resolver._registry), ("http://example.com/child/", fourth.resolver._registry), ("http://example.com/", fourth.resolver._registry), ] assert list(third.resolver.dynamic_scope()) == [ ("http://example.com/child/", third.resolver._registry), ("http://example.com/", third.resolver._registry), ] assert list(second.resolver.dynamic_scope()) == [ ("http://example.com/", second.resolver._registry), ] assert list(first.resolver.dynamic_scope()) == [] class TestSpecification: def test_create_resource(self): specification = Specification( name="", id_of=lambda contents: "urn:fixedID", subresources_of=lambda contents: [], anchors_in=lambda specification, contents: [], maybe_in_subresource=( lambda segments, resolver, subresource: resolver ), ) resource = specification.create_resource(contents={"foo": "baz"}) assert resource == Resource( contents={"foo": "baz"}, specification=specification, ) assert resource.id() == "urn:fixedID" def test_detect_from_json_schema(self): schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} specification = Specification.detect(schema) assert specification == DRAFT202012 def test_detect_with_no_discernible_information(self): with pytest.raises(exceptions.CannotDetermineSpecification): Specification.detect({"foo": "bar"}) def 
test_detect_with_non_URI_schema(self): with pytest.raises(exceptions.CannotDetermineSpecification): Specification.detect({"$schema": 37}) def test_detect_with_no_discernible_information_and_default(self): specification = Specification.OPAQUE.detect({"foo": "bar"}) assert specification is Specification.OPAQUE def test_detect_unneeded_default(self): schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} specification = Specification.OPAQUE.detect(schema) assert specification == DRAFT202012 def test_non_mapping_detect(self): with pytest.raises(exceptions.CannotDetermineSpecification): Specification.detect(True) def test_non_mapping_detect_with_default(self): specification = ID_AND_CHILDREN.detect(True) assert specification is ID_AND_CHILDREN def test_detect_with_fallback(self): specification = Specification.OPAQUE.detect({"foo": "bar"}) assert specification is Specification.OPAQUE def test_repr(self): assert ( repr(ID_AND_CHILDREN) == "" ) class TestOpaqueSpecification: THINGS = [{"foo": "bar"}, True, 37, "foo", object()] @pytest.mark.parametrize("thing", THINGS) def test_no_id(self, thing): """ An arbitrary thing has no ID. """ assert Specification.OPAQUE.id_of(thing) is None @pytest.mark.parametrize("thing", THINGS) def test_no_subresources(self, thing): """ An arbitrary thing has no subresources. """ assert list(Specification.OPAQUE.subresources_of(thing)) == [] @pytest.mark.parametrize("thing", THINGS) def test_no_anchors(self, thing): """ An arbitrary thing has no anchors. 
""" assert list(Specification.OPAQUE.anchors_in(thing)) == [] @pytest.mark.parametrize( "cls", [Anchor, Registry, Resource, Specification, exceptions.PointerToNowhere], ) def test_nonsubclassable(cls): with pytest.raises(Exception, match="(?i)subclassing"): class Boom(cls): # pragma: no cover pass PK!*'tests/test_exceptions.pynu[import itertools import pytest from referencing import Resource, exceptions def pairs(choices): return itertools.combinations(choices, 2) TRUE = Resource.opaque(True) thunks = ( lambda: exceptions.CannotDetermineSpecification(TRUE), lambda: exceptions.NoSuchResource("urn:example:foo"), lambda: exceptions.NoInternalID(TRUE), lambda: exceptions.InvalidAnchor(resource=TRUE, anchor="foo", ref="a#b"), lambda: exceptions.NoSuchAnchor(resource=TRUE, anchor="foo", ref="a#b"), lambda: exceptions.PointerToNowhere(resource=TRUE, ref="urn:example:foo"), lambda: exceptions.Unresolvable("urn:example:foo"), lambda: exceptions.Unretrievable("urn:example:foo"), ) @pytest.mark.parametrize("one, two", pairs(each() for each in thunks)) def test_eq_incompatible_types(one, two): assert one != two @pytest.mark.parametrize("thunk", thunks) def test_hash(thunk): assert thunk() in {thunk()} PK!W--tests/test_jsonschema.pynu[import pytest from referencing import Registry, Resource, Specification import referencing.jsonschema @pytest.mark.parametrize( "uri, expected", [ ( "https://json-schema.org/draft/2020-12/schema", referencing.jsonschema.DRAFT202012, ), ( "https://json-schema.org/draft/2019-09/schema", referencing.jsonschema.DRAFT201909, ), ( "http://json-schema.org/draft-07/schema#", referencing.jsonschema.DRAFT7, ), ( "http://json-schema.org/draft-06/schema#", referencing.jsonschema.DRAFT6, ), ( "http://json-schema.org/draft-04/schema#", referencing.jsonschema.DRAFT4, ), ( "http://json-schema.org/draft-03/schema#", referencing.jsonschema.DRAFT3, ), ], ) def test_schemas_with_explicit_schema_keywords_are_detected(uri, expected): """ The $schema keyword in JSON 
Schema is a dialect identifier. """ contents = {"$schema": uri} resource = Resource.from_contents(contents) assert resource == Resource(contents=contents, specification=expected) def test_unknown_dialect(): dialect_id = "http://example.com/unknown-json-schema-dialect-id" with pytest.raises(referencing.jsonschema.UnknownDialect) as excinfo: Resource.from_contents({"$schema": dialect_id}) assert excinfo.value.uri == dialect_id @pytest.mark.parametrize( "id, specification", [ ("$id", referencing.jsonschema.DRAFT202012), ("$id", referencing.jsonschema.DRAFT201909), ("$id", referencing.jsonschema.DRAFT7), ("$id", referencing.jsonschema.DRAFT6), ("id", referencing.jsonschema.DRAFT4), ("id", referencing.jsonschema.DRAFT3), ], ) def test_id_of_mapping(id, specification): uri = "http://example.com/some-schema" assert specification.id_of({id: uri}) == uri @pytest.mark.parametrize( "specification", [ referencing.jsonschema.DRAFT202012, referencing.jsonschema.DRAFT201909, referencing.jsonschema.DRAFT7, referencing.jsonschema.DRAFT6, ], ) @pytest.mark.parametrize("value", [True, False]) def test_id_of_bool(specification, value): assert specification.id_of(value) is None @pytest.mark.parametrize( "specification", [ referencing.jsonschema.DRAFT202012, referencing.jsonschema.DRAFT201909, referencing.jsonschema.DRAFT7, referencing.jsonschema.DRAFT6, ], ) @pytest.mark.parametrize("value", [True, False]) def test_anchors_in_bool(specification, value): assert list(specification.anchors_in(value)) == [] @pytest.mark.parametrize( "specification", [ referencing.jsonschema.DRAFT202012, referencing.jsonschema.DRAFT201909, referencing.jsonschema.DRAFT7, referencing.jsonschema.DRAFT6, ], ) @pytest.mark.parametrize("value", [True, False]) def test_subresources_of_bool(specification, value): assert list(specification.subresources_of(value)) == [] @pytest.mark.parametrize( "uri, expected", [ ( "https://json-schema.org/draft/2020-12/schema", referencing.jsonschema.DRAFT202012, ), ( 
"https://json-schema.org/draft/2019-09/schema", referencing.jsonschema.DRAFT201909, ), ( "http://json-schema.org/draft-07/schema#", referencing.jsonschema.DRAFT7, ), ( "http://json-schema.org/draft-06/schema#", referencing.jsonschema.DRAFT6, ), ( "http://json-schema.org/draft-04/schema#", referencing.jsonschema.DRAFT4, ), ( "http://json-schema.org/draft-03/schema#", referencing.jsonschema.DRAFT3, ), ], ) def test_specification_with(uri, expected): assert referencing.jsonschema.specification_with(uri) == expected @pytest.mark.parametrize( "uri, expected", [ ( "http://json-schema.org/draft-07/schema", referencing.jsonschema.DRAFT7, ), ( "http://json-schema.org/draft-06/schema", referencing.jsonschema.DRAFT6, ), ( "http://json-schema.org/draft-04/schema", referencing.jsonschema.DRAFT4, ), ( "http://json-schema.org/draft-03/schema", referencing.jsonschema.DRAFT3, ), ], ) def test_specification_with_no_empty_fragment(uri, expected): assert referencing.jsonschema.specification_with(uri) == expected def test_specification_with_unknown_dialect(): dialect_id = "http://example.com/unknown-json-schema-dialect-id" with pytest.raises(referencing.jsonschema.UnknownDialect) as excinfo: referencing.jsonschema.specification_with(dialect_id) assert excinfo.value.uri == dialect_id def test_specification_with_default(): dialect_id = "http://example.com/unknown-json-schema-dialect-id" specification = referencing.jsonschema.specification_with( dialect_id, default=Specification.OPAQUE, ) assert specification is Specification.OPAQUE # FIXME: The tests below should move to the referencing suite but I haven't yet # figured out how to represent dynamic (& recursive) ref lookups in it. 
def test_lookup_trivial_dynamic_ref(): one = referencing.jsonschema.DRAFT202012.create_resource( {"$dynamicAnchor": "foo"}, ) resolver = Registry().with_resource("http://example.com", one).resolver() resolved = resolver.lookup("http://example.com#foo") assert resolved.contents == one.contents def test_multiple_lookup_trivial_dynamic_ref(): TRUE = referencing.jsonschema.DRAFT202012.create_resource(True) root = referencing.jsonschema.DRAFT202012.create_resource( { "$id": "http://example.com", "$dynamicAnchor": "fooAnchor", "$defs": { "foo": { "$id": "foo", "$dynamicAnchor": "fooAnchor", "$defs": { "bar": True, "baz": { "$dynamicAnchor": "fooAnchor", }, }, }, }, }, ) resolver = ( Registry() .with_resources( [ ("http://example.com", root), ("http://example.com/foo/", TRUE), ("http://example.com/foo/bar", root), ], ) .resolver() ) first = resolver.lookup("http://example.com") second = first.resolver.lookup("foo/") resolver = second.resolver.lookup("bar").resolver fourth = resolver.lookup("#fooAnchor") assert fourth.contents == root.contents def test_multiple_lookup_dynamic_ref_to_nondynamic_ref(): one = referencing.jsonschema.DRAFT202012.create_resource( {"$anchor": "fooAnchor"}, ) two = referencing.jsonschema.DRAFT202012.create_resource( { "$id": "http://example.com", "$dynamicAnchor": "fooAnchor", "$defs": { "foo": { "$id": "foo", "$dynamicAnchor": "fooAnchor", "$defs": { "bar": True, "baz": { "$dynamicAnchor": "fooAnchor", }, }, }, }, }, ) resolver = ( Registry() .with_resources( [ ("http://example.com", two), ("http://example.com/foo/", one), ("http://example.com/foo/bar", two), ], ) .resolver() ) first = resolver.lookup("http://example.com") second = first.resolver.lookup("foo/") resolver = second.resolver.lookup("bar").resolver fourth = resolver.lookup("#fooAnchor") assert fourth.contents == two.contents def test_lookup_trivial_recursive_ref(): one = referencing.jsonschema.DRAFT201909.create_resource( {"$recursiveAnchor": True}, ) resolver = 
Registry().with_resource("http://example.com", one).resolver() first = resolver.lookup("http://example.com") resolved = referencing.jsonschema.lookup_recursive_ref( resolver=first.resolver, ) assert resolved.contents == one.contents def test_lookup_recursive_ref_to_bool(): TRUE = referencing.jsonschema.DRAFT201909.create_resource(True) registry = Registry({"http://example.com": TRUE}) resolved = referencing.jsonschema.lookup_recursive_ref( resolver=registry.resolver(base_uri="http://example.com"), ) assert resolved.contents == TRUE.contents def test_multiple_lookup_recursive_ref_to_bool(): TRUE = referencing.jsonschema.DRAFT201909.create_resource(True) root = referencing.jsonschema.DRAFT201909.create_resource( { "$id": "http://example.com", "$recursiveAnchor": True, "$defs": { "foo": { "$id": "foo", "$recursiveAnchor": True, "$defs": { "bar": True, "baz": { "$recursiveAnchor": True, "$anchor": "fooAnchor", }, }, }, }, }, ) resolver = ( Registry() .with_resources( [ ("http://example.com", root), ("http://example.com/foo/", TRUE), ("http://example.com/foo/bar", root), ], ) .resolver() ) first = resolver.lookup("http://example.com") second = first.resolver.lookup("foo/") resolver = second.resolver.lookup("bar").resolver fourth = referencing.jsonschema.lookup_recursive_ref(resolver=resolver) assert fourth.contents == root.contents def test_multiple_lookup_recursive_ref_with_nonrecursive_ref(): one = referencing.jsonschema.DRAFT201909.create_resource( {"$recursiveAnchor": True}, ) two = referencing.jsonschema.DRAFT201909.create_resource( { "$id": "http://example.com", "$recursiveAnchor": True, "$defs": { "foo": { "$id": "foo", "$recursiveAnchor": True, "$defs": { "bar": True, "baz": { "$recursiveAnchor": True, "$anchor": "fooAnchor", }, }, }, }, }, ) three = referencing.jsonschema.DRAFT201909.create_resource( {"$recursiveAnchor": False}, ) resolver = ( Registry() .with_resources( [ ("http://example.com", three), ("http://example.com/foo/", two), 
("http://example.com/foo/bar", one), ], ) .resolver() ) first = resolver.lookup("http://example.com") second = first.resolver.lookup("foo/") resolver = second.resolver.lookup("bar").resolver fourth = referencing.jsonschema.lookup_recursive_ref(resolver=resolver) assert fourth.contents == two.contents def test_empty_registry(): assert referencing.jsonschema.EMPTY_REGISTRY == Registry() PK!k  tests/test_referencing_suite.pynu[from pathlib import Path import json import os import pytest from referencing import Registry from referencing.exceptions import Unresolvable import referencing.jsonschema class SuiteNotFound(Exception): def __str__(self): # pragma: no cover return ( "Cannot find the referencing suite. " "Set the REFERENCING_SUITE environment variable to the path to " "the suite, or run the test suite from alongside a full checkout " "of the git repository." ) if "REFERENCING_SUITE" in os.environ: # pragma: no cover SUITE = Path(os.environ["REFERENCING_SUITE"]) / "tests" else: SUITE = Path(__file__).parent.parent.parent / "suite/tests" if not SUITE.is_dir(): # pragma: no cover raise SuiteNotFound() DIALECT_IDS = json.loads(SUITE.joinpath("specifications.json").read_text()) @pytest.mark.parametrize( "test_path", [ pytest.param(each, id=f"{each.parent.name}-{each.stem}") for each in SUITE.glob("*/**/*.json") ], ) def test_referencing_suite(test_path, subtests): dialect_id = DIALECT_IDS[test_path.relative_to(SUITE).parts[0]] specification = referencing.jsonschema.specification_with(dialect_id) loaded = json.loads(test_path.read_text()) registry = loaded["registry"] registry = Registry().with_resources( (uri, specification.create_resource(contents)) for uri, contents in loaded["registry"].items() ) for test in loaded["tests"]: with subtests.test(test=test): if "normalization" in test_path.stem: pytest.xfail("APIs need to change for proper URL support.") resolver = registry.resolver(base_uri=test.get("base_uri", "")) if test.get("error"): with 
pytest.raises(Unresolvable): resolver.lookup(test["ref"]) else: resolved = resolver.lookup(test["ref"]) assert resolved.contents == test["target"] then = test.get("then") while then: # pragma: no cover with subtests.test(test=test, then=then): resolved = resolved.resolver.lookup(then["ref"]) assert resolved.contents == then["target"] then = then.get("then") PK!g+htests/test_retrieval.pynu[from functools import lru_cache import json import pytest from referencing import Registry, Resource, exceptions from referencing.jsonschema import DRAFT202012 from referencing.retrieval import to_cached_resource class TestToCachedResource: def test_it_caches_retrieved_resources(self): contents = {"$schema": "https://json-schema.org/draft/2020-12/schema"} stack = [json.dumps(contents)] @to_cached_resource() def retrieve(uri): return stack.pop() registry = Registry(retrieve=retrieve) expected = Resource.from_contents(contents) got = registry.get_or_retrieve("urn:example:schema") assert got.value == expected # And a second time we get the same value. again = registry.get_or_retrieve("urn:example:schema") assert again.value is got.value def test_custom_loader(self): contents = {"$schema": "https://json-schema.org/draft/2020-12/schema"} stack = [json.dumps(contents)[::-1]] @to_cached_resource(loads=lambda s: json.loads(s[::-1])) def retrieve(uri): return stack.pop() registry = Registry(retrieve=retrieve) expected = Resource.from_contents(contents) got = registry.get_or_retrieve("urn:example:schema") assert got.value == expected # And a second time we get the same value. 
again = registry.get_or_retrieve("urn:example:schema") assert again.value is got.value def test_custom_from_contents(self): contents = {} stack = [json.dumps(contents)] @to_cached_resource(from_contents=DRAFT202012.create_resource) def retrieve(uri): return stack.pop() registry = Registry(retrieve=retrieve) expected = DRAFT202012.create_resource(contents) got = registry.get_or_retrieve("urn:example:schema") assert got.value == expected # And a second time we get the same value. again = registry.get_or_retrieve("urn:example:schema") assert again.value is got.value def test_custom_cache(self): schema = {"$schema": "https://json-schema.org/draft/2020-12/schema"} mapping = { "urn:example:1": dict(schema, foo=1), "urn:example:2": dict(schema, foo=2), "urn:example:3": dict(schema, foo=3), } resources = { uri: Resource.from_contents(contents) for uri, contents in mapping.items() } @to_cached_resource(cache=lru_cache(maxsize=2)) def retrieve(uri): return json.dumps(mapping.pop(uri)) registry = Registry(retrieve=retrieve) got = registry.get_or_retrieve("urn:example:1") assert got.value == resources["urn:example:1"] assert registry.get_or_retrieve("urn:example:1").value is got.value assert registry.get_or_retrieve("urn:example:1").value is got.value got = registry.get_or_retrieve("urn:example:2") assert got.value == resources["urn:example:2"] assert registry.get_or_retrieve("urn:example:2").value is got.value assert registry.get_or_retrieve("urn:example:2").value is got.value # This still succeeds, but evicts the first URI got = registry.get_or_retrieve("urn:example:3") assert got.value == resources["urn:example:3"] assert registry.get_or_retrieve("urn:example:3").value is got.value assert registry.get_or_retrieve("urn:example:3").value is got.value # And now this fails (as we popped the value out of `mapping`) with pytest.raises(exceptions.Unretrievable): registry.get_or_retrieve("urn:example:1") PK!%R;Ѧ)tests/__pycache__/__init__.cpython-38.pycnu[U 
af@sdS)NrrrK/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/tests/__init__.pyPK!yUU*tests/__pycache__/test_core.cpython-38.pycnu[U afޓ@sddlmZddlZddlmZmZmZmZmZddl m Z edddddd dd dd Z d d Z GdddZ GdddZGdddZGdddZGdddZejdeeeeejgddZdS)) HashTrieMapN)AnchorRegistryResource Specification exceptions) DRAFT202012zid-and-childrencCs |dS)NIDgetcontentsrL/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/tests/test_core.py rcCs |dgS)Nchildrenr r rrrr rcsfdd|diDS)Ncs$g|]\}}t|j|ddqS)r nameresource)rcreate_resource).0reach specificationrr s  z..anchors)r itemsrr rrrr s cCs6t|ds2tdd|dddDr2||S|S)Ncss|]}|dkVqdS)rNrrrrrr sz..)lenallin_subresourcesegmentsresolverZ subresourcerrrrs   rid_ofsubresources_of anchors_inZmaybe_in_subresourcecCs tddS)zQ A retriever suitable for use in tests which expect it never to be used. z2This retrieve function expects to never be called!N) RuntimeErrorurirrrblow_upsr/c@seZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ ddZddZddZddZdd Zd!d"Zd#d$Zd%d&Zd'd(Zd)d*Zd+d,Zd-d.Zd/d0Zd1d2Zd3d4Zd5d6Zd7d8Zd9d:Zd;d<Z d=d>Z!d?d@Z"dAdBZ#dCdDZ$dEdFZ%dGdHZ&dIdJZ'dKdLZ(dMdNZ)dOdPZ*dQdRZ+dSdTZ,dUdVZ-dWdXZ.dYdZZ/d[d\Z0d]d^Z1d_S)` TestRegistrycCs8tjddid}d}tj||d}|||ks4tdS)zQ Adding a resource to the registry then allows re-retrieving it. foobarr urn:exampler.rN)ropaquer with_resourceAssertionErrorselfrr.registryrrrtest_with_resource#szTestRegistry.test_with_resourcecCsZtjid}tdditd}td|fd|fg}|tjd|djd|dksVtdS) zT Adding multiple resources to the registry is like adding each one. 
r r1r2r rhttp://example.com/1http://example.com/foo/barr4N)rr5ID_AND_CHILDRENrwith_resourcesr6r7r9onetwor:rrrtest_with_resources-s z TestRegistry.test_with_resourcescCs8d}t|dd}|t}|t||ks4tdS)Nzurn:example:resource r r1)r?rrr6r7r9r.rr:rrrtest_matmul_resourceBs z!TestRegistry.test_matmul_resourcecCsZd}t|dd}d}t|dd}||gt}|t||f||fgksVtdS)Nzurn:example:onerErFzurn:example:two)r?rrr@r7)r9Zone_urirBZtwo_urirCr:rrrtest_matmul_many_resourcesHsz'TestRegistry.test_matmul_many_resourcesc CsLtjddid}ttj}|tW5QRX|jtj|dksHtdS)Nr1r2r r) rr5pytestraisesrZ NoInternalIDrvaluer7)r9rerrrtest_matmul_resource_without_idTsz,TestRegistry.test_matmul_resource_without_idcCs>d}ddi}t||fg}t|td}|||ks:tdS)Nr3$schema,https://json-schema.org/draft/2020-12/schemar<)r with_contentsrrr7)r9r.schemar:expectedrrr#test_with_contents_from_json_schemaZs  z0TestRegistry.test_with_contents_from_json_schemacCs>d}tj|ddifgtjd}||tddiks:tdS)Nr3r1r2Zdefault_specification)rrRrOPAQUErr5r7)r9r.r:rrr,test_with_contents_and_default_specificationbs  z9TestRegistry.test_with_contents_and_default_specificationcCs8d}tjddt|Dtjd}t||ks4tdS)NcSsg|]}t|ddifqSr1r2strrirrrrmsz)TestRegistry.test_len..rV)rrRrangerrWr"r7)r9totalr:rrrtest_lenjs zTestRegistry.test_lencCstr tdSN)rr7r9rrrtest_bool_emptyrszTestRegistry.test_bool_emptycCs,tjddtdDtjd}|s(tdS)NcSsg|]}t|ddifqSrZr[r]rrrrwsz4TestRegistry.test_bool_not_empty..rV)rrRr_rrWr7r9r:rrrtest_bool_not_emptyus z TestRegistry.test_bool_not_emptycCsBtjddtdDtjd}t|ddtdDks>tdS)NcSsg|]}t|ddifqSrZr[r]rrrr~sz*TestRegistry.test_iter..rVcSsh|] }t|qSrr[r]rrr sz)TestRegistry.test_iter..)rrRr_rrWsetr7rfrrr test_iter|s zTestRegistry.test_itercCs6tddi}d}t||i}|||ks2tdSNr1r2r3)rr5rcrawlr7r8rrr'test_crawl_still_has_top_level_resourcesz4TestRegistry.test_crawl_still_has_top_level_resourcec Csld}td|ddgd}|t}tt||W5QRXt|dd}|||kshtdS)N urn:childurn:rootrErFr r)r?rrrKrL LookupErrorrmr7)r9Zchild_idrootr:rTrrrtest_crawl_finds_a_subresources  
z+TestRegistry.test_crawl_finds_a_subresourcecCsNtdddid}|t}||djtdtddksJtdS)Nurn:barr1rEr rr) r?rrrmanchoridrMrr7r9rr:rrr test_crawl_finds_anchors_with_ids  z-TestRegistry.test_crawl_finds_anchors_with_idcCsLtdddii}td|}|ddjtdtddksHtdS)Nrr1rErpr) r?rrr6rmrwrMrr7ryrrrtest_crawl_finds_anchors_no_ids z+TestRegistry.test_crawl_finds_anchors_no_idcCs:tddi}d}t||}||ddiks6tdSrl)rr5rr6r r7r8rrr test_contentsszTestRegistry.test_contentscCsJd}td|di}|t}||||dkr@|ksFntdSNhttp://example.com/r #)r?rrr7rGrrr#test_getitem_strips_empty_fragmentss z0TestRegistry.test_getitem_strips_empty_fragmentscCsVd}td|di}|t}||||dkrLd|diksRntdSr})r?rrr r7rGrrr$test_contents_strips_empty_fragmentss   z1TestRegistry.test_contents_strips_empty_fragmentsc CsBt}ttj}|dW5QRX|jtjddks>tdS)Nr3ref)rrKrLrNoSuchResourcer rMr7r9r:rNrrr"test_contents_nonexistent_resourcesz/TestRegistry.test_contents_nonexistent_resourcecCs^tdddii}td|}|dd}|jtdtddksHt|j| ksZtdS)Nrr1r2r3r) r?rrr6rwrMrr7r:rm)r9rr:Z retrievedrrrtest_crawled_anchors  z TestRegistry.test_crawled_anchorc CsDt}ttj}|ddW5QRX|jtjddks@tdS)Nr3r1r)rrKrLrrrwrMr7rrrr#test_anchor_in_nonexistent_resourcesz0TestRegistry.test_anchor_in_nonexistent_resourcecCsPtjid}tddi}t||d}|td|fd|fgksLtdS)Nr r1r2)r=r>r=r>)rr5r?rrr@rmr7rArrr test_inits zTestRegistry.test_initcCsZtjid}tddi}td|id|}|td|fd|fgksVtdS)z Passing a `dict` to `Registry` gets converted to a `HashTrieMap`. So continuing to use the registry works. r r1r2r=r>N) rr5r?rrr6rmr@r7rArrrtest_dict_conversions  z!TestRegistry.test_dict_conversionc Cs@t}ttj}|dW5QRX|jtjddksr=http://example.com/bazhttp://example.com/foo/quuxr)rr1r)r) rr5r?rrr6rmcombinerrr7) r9rBrCthreeZfourfirstsecondthirdfourthrrr test_combinesB  zTestRegistry.test_combinecCs"tddi}|||kstdS)z Combining a registry with itself short-circuits. This is a performance optimization -- otherwise we do lots more work (in jsonschema this seems to correspond to making the test suite take *3x* longer). 
urn:foor2N)rrr7rfrrrtest_combine_self.s zTestRegistry.test_combine_selfc Cstjid}tddi}tddi}td|}td|}t||d}td|fd|fd |fg}|||}||kst||kstdS) Nr r1r2rrr=r>rr) rr5r?rrr6rr7rm) r9rBrCrrrrrTZcombinedrrr%test_combine_with_uncrawled_resources:s&   z2TestRegistry.test_combine_with_uncrawled_resourcescCstjid}tddi}tddi}dd}td|}t|d d |}td |}|||t|d d|fd |fd |fgkst|||t|d d|fd |fd |fgkstdS) Nr r1r2rrcSsdSrbrr-rrrretrieveWsz@TestRegistry.test_combine_with_single_retrieve..retriever=rhttp://example.com/2http://example.com/3 rr5r?rrr6rr@r7r9rBrCrrrrrrrr!test_combine_with_single_retrieveRs8   z.TestRegistry.test_combine_with_single_retrievecCstjid}tddi}tddi}dd}t|dd |}t|dd |}t|dd |}|||t|dd |fd |fd |fgkst|||t|dd |fd |fd |fgkstdS) Nr r1r2rrcSsdSrbrr-rrrrxsz@TestRegistry.test_combine_with_common_retrieve..retrieverr=rrrrrrr!test_combine_with_common_retrievessD     z.TestRegistry.test_combine_with_common_retrievec Cstjid}tddi}tddi}dd}dd }t|d d |}td |}t|d d |}tjtdd| ||W5QRXdS)Nr r1r2rrcSsdSrbrr-rrr foo_retrieveszDTestRegistry.test_combine_conflicting_retrieve..foo_retrievecSsdSrbrr-rrr bar_retrieveszDTestRegistry.test_combine_conflicting_retrieve..bar_retrieverr=rrzconflict.*retrievmatch) rr5r?rrr6rKrL Exceptionr) r9rBrCrrrrrrrrr!test_combine_conflicting_retrieves    z.TestRegistry.test_combine_conflicting_retrievecCsFtjid}tddi}t||d}|dtd|iksBtdS)Nr r1r2)rrurru)rr5r?rrremover7rArrr test_removes zTestRegistry.test_removecCsRtjid}tddi}td|fd|fg}|dtd|ksNtdS)Nr r1r2rru) rr5r?rrr@rr6r7rArrrtest_remove_uncrawleds z"TestRegistry.test_remove_uncrawledcCs^tjid}tdddii}td|fd|fg}|dtd|ksZt dS)Nr rr1r2rru) rr5r?rrr@rmrr6r7rArrrtest_remove_with_anchorss z%TestRegistry.test_remove_with_anchorsc Cs>ttj}tdW5QRX|jtjddks:tdS)Nzurn:doesNotExistr)rKrLrrrrrMr7)r9rNrrrtest_remove_nonexistent_urisz(TestRegistry.test_remove_nonexistent_urics8tdditfddd}|djks4tdS)Nr1r2csSrbrr-r1rrrrz,TestRegistry.test_retrieve..rr3)rr5rget_or_retrieverMr7rfrrr test_retrieveszTestRegistry.test_retrievec s^tddifdd}t|d}|djks8tttj 
|dW5QRXdS)Nr1r2cs|dkr StddS)N urn:succeedzOh no!)rr-rrrrsz@TestRegistry.test_retrieve_arbitrary_exception..retrieverrurn:uhoh) rr5rrrMr7rKrLr Unretrievabler9rr:rrr!test_retrieve_arbitrary_exceptions   z.TestRegistry.test_retrieve_arbitrary_exceptionc s^tddifdd}t|d}|djks8tttj |dW5QRXdS)Nr1r2cs|dkr Stj|ddS)Nrr)rrr-rrrrsz=TestRegistry.test_retrieve_no_such_resource..retrieverrr) rr5rrrMr7rKrLrrrrrrtest_retrieve_no_such_resources   z+TestRegistry.test_retrieve_no_such_resourcec Cs8dd}t|d}ttj|dW5QRXdS)NcSs tiSrb)r from_contentsr-rrrrszKTestRegistry.test_retrieve_cannot_determine_specification..retrieverr)rrKrLrCannotDetermineSpecificationrrrrr,test_retrieve_cannot_determine_specifications z9TestRegistry.test_retrieve_cannot_determine_specificationcCsFtddi}td|itd}|d|ks.t|dj|ksBtdS)Nr1r2r3r)rr5rr/r7rrM)r9r1r:rrr(test_retrieve_already_available_resourcesz5TestRegistry.test_retrieve_already_available_resourcecCsLtddd}td|jgi}ttdd|}|d|ksHtdS)NrorErFrrrp)r?rr rr/r6rmr7)r9childrsr:rrr-test_retrieve_first_checks_crawlable_resource sz:TestRegistry.test_retrieve_first_checks_crawlable_resourcecCs<tjid}td|i}|jdd}|djiks8tdS)Nr http://example.com)Zbase_urirrr5rr'lookupr r7)r9rBr:r'rrr test_resolvers   zTestRegistry.test_resolvercCsJtddi}t|}|dj|jks0t|dj|jksFtdS)Nr rr)r?rrresolver_with_rootrr r7r9rsr'rrr"test_resolver_with_root_identifieds z/TestRegistry.test_resolver_with_root_identifiedcCs2tjid}t|}|dj|jks.tdS)Nr r)rr5rrrr r7rrrr$test_resolver_with_root_unidentifieds  z1TestRegistry.test_resolver_with_root_unidentifiedcCsZtjid}tddi}td|fd|fg}t|dksBtt|dksVtdS)Nr r1r2r=r>z"z) rr5r?rrr@reprr7rmrArrr test_repr s zTestRegistry.test_reprcCsHtjid}tddi}td|ijd|d}t|dksDtdS)Nr r1r2r=r>r4z%) rr5r?rrrmr6rr7rArrrtest_repr_mixed_crawled,s z$TestRegistry.test_repr_mixed_crawledcCs,tjdtjidd}t|dks(tdS)Nr=r r4z!)rr6rr5rr7rfrrrtest_repr_one_resource8s  z#TestRegistry.test_repr_one_resourcecCsttdkstdS)Nz)rrr7rcrrrtest_repr_empty?szTestRegistry.test_repr_emptyN)2__name__ __module__ 
__qualname__r;rDrHrIrOrUrXrardrgrkrnrtrzr{r|rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr0"s^       & !'     r0c@seZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ ddZddZddZddZdS) TestResourcecCs*ddi}t|}|t|tdks&tdS)NrPrQr<)rrrr7r9rSrrrr#test_from_contents_from_json_schemaDs z0TestResource.test_from_contents_from_json_schemac Cs*ttjtddiW5QRXdS)z Creating a resource with no discernible way to see what specification it belongs to (e.g. no ``$schema`` keyword for JSON Schema) raises an error. r1r2N)rKrLrrrrrcrrr2test_from_contents_with_no_discernible_informationIsz?TestResource.test_from_contents_with_no_discernible_informationcCs0tjdditjd}|tjddidks,tdSNr1r2rVr rrrrWr5r7r9rrrr>test_from_contents_with_no_discernible_information_and_defaultSs zKTestResource.test_from_contents_with_no_discernible_information_and_defaultcCs0ddi}tj|tjd}|t|tdks,tdS)NrPrQrVr<)rrrrWrr7rrrr#test_from_contents_unneeded_defaultZsz0TestResource.test_from_contents_unneeded_defaultcCs&tjdtd}|tdtdks"tdS)NTrVr<)rrr?r7rrrrtest_non_mapping_from_contentsesz+TestResource.test_non_mapping_from_contentscCs0tjdditjd}|tjddidks,tdSrrrrrr test_from_contents_with_fallbackos z-TestResource.test_from_contents_with_fallbackcCsFtdddddddddd}tdd i|d }|d ksBtdS) NcSsdSN urn:fixedIDrr rrrryrzATestResource.test_id_delegates_to_specification..cSsgSrbrr rrrrzrcSsgSrbrrrrrr{rcSs|Srbrr%rrrr}rr(r1rr<r)rrrxr7r9rrrrr"test_id_delegates_to_specificationvs z/TestResource.test_id_delegates_to_specificationcCs*d}td|di}||ks&tdSr})r?rrxr7)r9r.rsrrrtest_id_strips_empty_fragmentsz*TestResource.test_id_strips_empty_fragmentcCs8tdidgi}t|ddidfDks4tdS)NrrEcSsg|]}t|qSr)r?rr rrrrszMTestResource.test_subresources_delegates_to_specification..)r?rlist subresourcesr7rrrr,test_subresources_delegates_to_specificationsz9TestResource.test_subresources_delegates_to_specificationcCs8ddi}td|gi}t|t|gks4tdS)NrPrQr)r?rrrrr7rrrr-test_subresource_with_different_specifications  
z:TestResource.test_subresource_with_different_specificationcCs^tdidddi}t|tdtidtdtddtdtddgksZtdS) Nrr)r1r2rr1rr2r)r?rrrrr7rrrr'test_anchors_delegates_to_specifications z4TestResource.test_anchors_delegates_to_specificationcCs6tjddid}t}|jd|djdks2tdS)Nr1rr z/foor'rr5rr'pointerr r7r9rr'rrrtest_pointer_to_mappings z$TestResource.test_pointer_to_mappingcCs<tjdddgiid}t}|jd|djdks8tdS)Nr1r2rer z /foo/bar/0rrrrrrtest_pointer_to_arrays z"TestResource.test_pointer_to_arraycCs:ddi}tj|d}t}|jd|dj|ks6tdS)Nr1rr rrr)r9r rr'rrrtest_root_pointers  zTestResource.test_root_pointercCs(ddi}t|t|tjdks$tdS)Nr1r2r<)rr5rrWr7)r9r rrr test_opaques  zTestResource.test_opaqueN)rrrrrrrrrrrrrrrrrrrrrrrCs    rc@seZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ ddZddZddZddZdd Zd!d"Zd#d$Zd%S)& TestResolvercCs>tjddid}td|i}|d}|j|jks:tdS)Nr1rr r=r)r9rr'resolvedrrrtest_lookup_exact_uris z"TestResolver.test_lookup_exact_uricCsHtddddgd}|t}|d}|jdddksDtdS)Nr~zhttp://example.com/arErFrqr?rrr'rr r7r9rsr:rrrrtest_lookup_subresources z$TestResolver.test_lookup_subresourcecCs>tdddid}|t}|d}|jdks:tdS)Nr~r1rErvzhttp://example.com/#foorrrrrtest_lookup_anchor_with_ids z'TestResolver.test_lookup_anchor_with_idcCs@tdddii}td|}|d}|jdks.rr~r)r9r'rrrJrtest_lookup_retrieved_resource/s z+TestResolver.test_lookup_retrieved_resourcec Cs^d}ttd}ttj||W5QRX|}ttj| |W5QRXdS)zL Unretrievable exceptions are also wrapped in Unresolvable. r~rN) rr/rKrLrrrr'rr)r9r.r:r'rrr%test_lookup_failed_retrieved_resource5s z2TestResolver.test_lookup_failed_retrieved_resourcecsjtjddid}|gfdd}t|d}|d}|j|jksJt|jd}|j|jksftdS) zk A (custom-)retrieved resource is added to the registry returned by looking it up. 
r1rr csSrbpopr-oncerrrLszKTestResolver.test_repeated_lookup_from_retrieved_resource..retrieverr~Nrr9rrr'rrrr,test_repeated_lookup_from_retrieved_resourceDs   z9TestResolver.test_repeated_lookup_from_retrieved_resourcecsjtjddid}|gfdd}t|d}|d}|j|jksJt|jd}|j|jksftdS) Nr1rr csSrbrr-rrrrZszRTestResolver.test_repeated_anchor_lookup_from_retrieved_resource..retrieverr~rrrrrr3test_repeated_anchor_lookup_from_retrieved_resourceVs   z@TestResolver.test_repeated_anchor_lookup_from_retrieved_resourcec Cstddddigdgd}|t}|}|d}|j|jksJttt j |jdW5QRX|j t|jdd}|d}|jddikstdS)Nr~child/r grandchildrqrr) r?rrr'rr r7rKrLrrr$)r9rsr:r'rsubrrrrtest_in_subresourcegs&   z TestResolver.test_in_subresourcec Cstddddigdgd}|t}|}|d}|j|jksJttt j |jdW5QRX|jd}|jd}|jddikstdS)Nr~r r r rq #/children/0) r?rrr'rr r7rKrLrr)r9rsr:r'rrrrrrtest_in_pointer_subresources"    z(TestResolver.test_in_pointer_subresourcec Cstddddigdgd}tdddigd}||gt}|}|d}|jd}|jd}|jd}t|jd |jjfd |jjfd|jjfgkstt|jd |jjfd|jjfgkstt|jd|jjfgkstt|jgkstdS) Nr~r r r rqzhttp://example.com/twoz two-child/rz#http://example.com/child/grandchildzhttp://example.com/child/) r?rrr'rrZ dynamic_scope _registryr7) r9rBrCr:r'rrrrrrrtest_dynamic_scopes@              zTestResolver.test_dynamic_scopeN)rrrrrrrrrrrrrrrrr r rrrrrrrrs$     rc@s\eZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ dS)TestSpecificationcCs^tdddddddddd}|jdd id }|tdd i|d ksJt|d ksZtdS) NrcSsdSrrr rrrrrz8TestSpecification.test_create_resource..cSsgSrbrr rrrrrcSsgSrbrrrrrrrcSs|Srbrr%rrrrrr(r1rr r<r)rrrr7rxrrrrtest_create_resources  z&TestSpecification.test_create_resourcecCs"ddi}t|}|tkstdSNrPrQ)rdetectrr7r9rSrrrrtest_detect_from_json_schemas z.TestSpecification.test_detect_from_json_schemac Cs*ttjtddiW5QRXdSNr1r2rKrLrrrrrcrrr+test_detect_with_no_discernible_informationsz=TestSpecification.test_detect_with_no_discernible_informationc 
Cs*ttjtddiW5QRXdS)NrP%rrcrrrtest_detect_with_non_URI_schemasz1TestSpecification.test_detect_with_non_URI_schemacCs"tjddi}|tjkstdSrrrWrr7r9rrrr7test_detect_with_no_discernible_information_and_defaultszITestSpecification.test_detect_with_no_discernible_information_and_defaultcCs$ddi}tj|}|tks tdSr)rrWrrr7rrrrtest_detect_unneeded_defaults z.TestSpecification.test_detect_unneeded_defaultc Cs&ttjtdW5QRXdSNTrrcrrrtest_non_mapping_detectsz)TestSpecification.test_non_mapping_detectcCstd}|tkstdSr")r?rr7rrrr$test_non_mapping_detect_with_defaults z6TestSpecification.test_non_mapping_detect_with_defaultcCs"tjddi}|tjkstdSrrrrrrtest_detect_with_fallbacksz+TestSpecification.test_detect_with_fallbackcCsttdkstdS)Nz&)rr?r7rcrrrrs zTestSpecification.test_reprN) rrrrrrrr r!r#r$r%rrrrrrsrc@sbeZdZddidddegZejdeddZejdedd Z ejded d Z d S) TestOpaqueSpecificationr1r2TrthingcCstj|dkstdS)z/ An arbitrary thing has no ID. N)rrWr)r7r9r'rrr test_no_idsz"TestOpaqueSpecification.test_no_idcCsttj|gkstdS)z9 An arbitrary thing has no subresources. N)rrrWr*r7r(rrrtest_no_subresourcessz,TestOpaqueSpecification.test_no_subresourcescCsttj|gkstdS)z4 An arbitrary thing has no anchors. 
N)rrrWr+r7r(rrrtest_no_anchorssz'TestOpaqueSpecification.test_no_anchorsN) rrrobjectZTHINGSrKmark parametrizer)r*r+rrrrr&s     r&clsc Cs.tjtddGddd|}W5QRXdS)Nz(?i)subclassingrc@s eZdZdS)z"test_nonsubclassable..BoomN)rrrrrrrBoom sr0)rKrLr)r/r0rrrtest_nonsubclassablesr1)ZrpdsrrKZ referencingrrrrrZreferencing.jsonschemarr?r/r0rrrr&r-r.rr1rrrrs6  %w :PK!/0tests/__pycache__/test_exceptions.cpython-38.pycnu[U af @sddlZddlZddlmZmZddZedZddddd dd dd dd dd dddfZej deddeDddZ ej deddZ dS)N)Resource exceptionscCs t|dS)N) itertools combinations)choicesrR/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/tests/test_exceptions.pypairssr TcCs ttSN)rZCannotDetermineSpecificationTRUErrrr r cCs tdSNurn:example:foo)rZNoSuchResourcerrrr r rcCs ttSr )rZ NoInternalIDr rrrr r rcCstjtdddSNZfooza#b)resourceanchorref)rZ InvalidAnchorr rrrr r rcCstjtdddSr)rZ NoSuchAnchorr rrrr r rcCstjtddS)Nr)rr)rZPointerToNowherer rrrr r rcCs tdSr)rZ Unresolvablerrrr r rcCs tdSr)rZ Unretrievablerrrr r rzone, twoccs|] }|VqdSr r).0Zeachrrr srcCs||ks tdSr AssertionError)oneZtworrr test_eq_incompatible_typessrthunkcCs||hkstdSr r)rrrr test_hash sr) rZpytestZ referencingrrr opaquer ZthunksmarkZ parametrizerrrrrr s"    PK!0tests/__pycache__/test_jsonschema.cpython-38.pycnu[U af- @stddlZddlmZmZmZddlZejddejj fdejj fdejj fdejj fdejj fd ejjfgd d Zd d Zejddejj fdejj fdejj fdejj fdejj fdejjfgddZejdejj ejj ejj ejj gejdddgddZejdejj ejj ejj ejj gejdddgddZejdejj ejj ejj ejj gejdddgddZejddejj fdejj fdejj fdejj fdejj fd ejjfgddZejddejj fd ejj fd!ejj fd"ejjfgd#d$Zd%d&Zd'd(Zd)d*Zd+d,Zd-d.Zd/d0Zd1d2Zd3d4Zd5d6Zd7d8Z dS)9N)RegistryResource Specificationz uri, expectedz,https://json-schema.org/draft/2020-12/schemaz,https://json-schema.org/draft/2019-09/schemaz'http://json-schema.org/draft-07/schema#z'http://json-schema.org/draft-06/schema#z'http://json-schema.org/draft-04/schema#z'http://json-schema.org/draft-03/schema#cCs*d|i}t|}|t||dks&tdS)zE The $schema keyword in JSON Schema is a dialect identifier. 
$schema)contents specificationN)r from_contentsAssertionError)uriexpectedrresourcer R/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/tests/test_jsonschema.py7test_schemas_with_explicit_schema_keywords_are_detecteds! rc Cs@d}ttjj}td|iW5QRX|jj|ksd}ttjj}tj|W5QRX|jj|ks:tdS)Nr) rrrrrr&rr r rr r r'test_specification_with_unknown_dialectsr*cCs(d}tjj|tjd}|tjks$tdS)Nr)default)rrr&rZOPAQUEr )rrr r rtest_specification_with_defaults r,cCsBtjjddi}td|}|d}|j|jks>t dS)N$dynamicAnchorfoohttp://example.comzhttp://example.com#foo) rr DRAFT202012create_resourcer with_resourceresolverlookuprr )oner3resolvedr r rtest_lookup_trivial_dynamic_refs  r7c Cstjjd}tjjddddddddiddid}td|fd|fd |fg}|d}|jd }|jd j}|d }|j|jkst dS) NTr/ fooAnchorr.r-barZbazrr-$defshttp://example.com/foo/http://example.com/foo/barfoo/r: #fooAnchor rrr0r1rwith_resourcesr3r4rr TRUErootr3firstsecondfourthr r r(test_multiple_lookup_trivial_dynamic_refs8   rIc Cstjjddi}tjjddddddddiddid}td|fd |fd |fg}|d}|jd }|jd j}|d }|j|jkst dS)N$anchorr8r/r.Tr-r9r;r=r>r?r:r@rA)r5twor3rFrGrHr r r2test_multiple_lookup_dynamic_ref_to_nondynamic_refs<   rLcCsRtjjddi}td|}|d}tjj|jd}|j |j ksNt dS)N$recursiveAnchorTr/r3) rr DRAFT201909r1rr2r3r4lookup_recursive_refrr )r5r3rFr6r r r!test_lookup_trivial_recursive_refs rQcCsDtjjd}td|i}tjj|jddd}|j|jks@tdS)NTr/)Zbase_urirN) rrrOr1rrPr3rr )rDregistryr6r r r!test_lookup_recursive_ref_to_bools   rSc Cstjjd}tjjdddddddddddid}td|fd|fd |fg}|d}|jd }|jd j}tjj|d }|j |j kst dS) NTr/r.r8rMrJr9rrMr<r=r>r?r:rN rrrOr1rrBr3r4rPrr rCr r r*test_multiple_lookup_recursive_ref_to_bool(s8  rWc Cstjjddi}tjjdddddddddddid}tjjdd i}td|fd |fd |fg}|d}|jd }|jd j}tjj|d}|j |j kst dS)NrMTr/r.r8rTr9rUFr=r>r?r:rNrV)r5rKZthreer3rFrGrHr r r8test_multiple_lookup_recursive_ref_with_nonrecursive_refPsB  rXcCstjjtkstdSr)rrZEMPTY_REGISTRYrr r r r rtest_empty_registry}srY)!rrrrrZreferencing.jsonschemamarkZ parametrizerr0rOZDRAFT7ZDRAFT6ZDRAFT4ZDRAFT3rrrr!r#r$r(r)r*r,r7rIrLrQrSrWrXrYr r r rs                  ')  (-PK!'R 
7tests/__pycache__/test_referencing_suite.cpython-38.pycnu[U af @sddlmZddlZddlZddlZddlmZddlmZddl ZGddde Z dej krreej ddZ neejjjd Z e se ee d Zejd d d e dDddZdS))PathN)Registry) Unresolvablec@seZdZddZdS) SuiteNotFoundcCsdS)NzCannot find the referencing suite. Set the REFERENCING_SUITE environment variable to the path to the suite, or run the test suite from alongside a full checkout of the git repository.)selfrrY/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/tests/test_referencing_suite.py__str__ szSuiteNotFound.__str__N)__name__ __module__ __qualname__r rrrrr srZREFERENCING_SUITEtestsz suite/testszspecifications.json test_pathcCs*g|]"}tj||jjd|jdqS)-)id)pytestparamparentnamestem).0Zeachrrr !srz */**/*.jsonc sTt|tjd}tj|t| }|d}t fdd|d D}|dD]}|j |dd|jkrtd|j|d d d }|d rtt||d W5QRXnz||d }|j|dkst|d}|rD|j ||d*|j|d }|j|dks.tW5QRX|d}qW5QRXq`dS)Nrregistryc3s |]\}}||fVqdS)N)Zcreate_resource)ruricontents specificationrr +sz)test_referencing_suite..r )testZ normalizationz+APIs need to change for proper URL support.base_uri)rerrorreftargetthen)rr$) DIALECT_IDS relative_toSUITEparts referencingZ jsonschemaZspecification_withjsonloads read_textrZwith_resourcesitemsrrrZxfailresolvergetZraisesrlookuprAssertionError) rZsubtestsZ dialect_idloadedrrr.resolvedr$rrrtest_referencing_suites.        r4)pathlibrr*osrr)rZreferencing.exceptionsrZreferencing.jsonschema Exceptionrenvironr'__file__ris_dirr+joinpathr,r%markZ parametrizeglobr4rrrrs(    PK! 
r/tests/__pycache__/test_retrieval.cpython-38.pycnu[U af@sZddlmZddlZddlZddlmZmZmZddlm Z ddl m Z GdddZ dS)) lru_cacheN)RegistryResource exceptions) DRAFT202012)to_cached_resourcec@s,eZdZddZddZddZddZd S) TestToCachedResourcecspddi}t|gtfdd}t|d}t|}|d}|j|ksRt|d}|j|jksltdS)N$schema,https://json-schema.org/draft/2020-12/schemacsSNpopuristackQ/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/tests/test_retrieval.pyretrieveszITestToCachedResource.test_it_caches_retrieved_resources..retrieverurn:example:schema jsondumpsrrr from_contentsget_or_retrievevalueAssertionErrorselfcontentsrregistryexpectedgotZagainrrr"test_it_caches_retrieved_resources s     z7TestToCachedResource.test_it_caches_retrieved_resourcescsddi}t|dddgtdddfdd}t|d }t|}|d }|j|ksdt|d }|j|jks~tdS) Nr r cSst|dddS)Nr%)rloads)srrr#z9TestToCachedResource.test_custom_loader..)r&csSr r rrrrr#sz9TestToCachedResource.test_custom_loader..retrieverrrrrrrtest_custom_loaders     z'TestToCachedResource.test_custom_loadercsri}t|gttjdfdd}t|d}t|}|d}|j|ksTt|d}|j|jksntdS)N)rcsSr r rrrrr6sz@TestToCachedResource.test_custom_from_contents..retrieverr) rrrrZcreate_resourcerrrrrrrrtest_custom_from_contents2s      z.TestToCachedResource.test_custom_from_contentsc shddi}t|ddt|ddt|ddddd D}ttdd d fd d }t|d}|d}|j|dkst|dj|jkst|dj|jkst|d}|j|dkst|dj|jkst|dj|jkst|d}|j|dkst|dj|jks*t|dj|jksBtt t j |dW5QRXdS)Nr r )Zfoo) urn:example:1 urn:example:2 urn:example:3cSsi|]\}}|t|qSr)rr).0rr rrr Msz:TestToCachedResource.test_custom_cache..)maxsize)cachecst|Sr )rrr rmappingrrrRsz8TestToCachedResource.test_custom_cache..retrieverr/r0r1) dictitemsrrrrrrpytestZraisesrZ Unretrievable)rZschema resourcesrr!r#rr6rtest_custom_cacheEs2       z&TestToCachedResource.test_custom_cacheN)__name__ __module__ __qualname__r$r*r+r<rrrrr sr) functoolsrrr:Z referencingrrrZreferencing.jsonschemarZreferencing.retrievalrrrrrrs   PK!X#__pycache__/__init__.cpython-38.pycnu[U af@s,dZddlmZmZmZmZddddgZdS)z@ Cross-specification, implementation-agnostic JSON referencing. 
)AnchorRegistryResource SpecificationrrrrN)__doc__Zreferencing._corerrrr__all__rrE/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/__init__.pysPK! '!__pycache__/_attrs.cpython-38.pycnu[U af@szddlmZddlmZmZddlmZmZ edZ dddddZdddd d ZGd d d e Z e d dddZdS)) annotations)NoReturnTypeVar)definefrozen_Tztype[_T])clsreturncCst|_t|SN)_do_not_subclass__init_subclass___definerrC/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/_attrs.pyr srcCst|_t|Sr )r r _frozenrrrrrsrc@seZdZddZdS)UnsupportedSubclassingcCsdS)NzSubclassing is not part of referencing's public API. If no other suitable API exists for what you're trying to do, feel free to file an issue asking for one.r)selfrrr__str__szUnsupportedSubclassing.__str__N)__name__ __module__ __qualname__rrrrrrsrr)r cCs tdSr )rrrrrr sr N) __future__rtypingrrattrsrr rrr Exceptionr staticmethodr rrrrs  PK!!irOUU __pycache__/_core.cpython-38.pycnu[U af`@s,UddlmZddlmZmZmZddlmZddlm Z m Z m Z m Z m Z mZddlmZmZmZddlmZmZddlmZmZmZddlmZdd lmZdd lmZm Z!m"Z"m#Z#m$Z$eZ%d e&d <eZ'd e&d<GdddeZ(e(j)Z*Gddde e"Z+dddddZ,dddddZ-GdddZ.eGddde e"Z/e/d d!d"d#d"d$d"d%d"d&e/_0eGd'd(d(e e"Z1d)d*d+d,Z2eGd-d.d.e#ee1e"fZ3ed/e!e e1e Z4eGd0d1d1e e"e4fZ5eGd2d3d3e e"Z6eGd4d5d5e e"Z7eGd6d7d7e e"Z d8S)9) annotations)IterableIteratorSequence)Enum)AnyCallableClassVarGenericProtocolTypeVar)unquote urldefragurljoin)evolvefield) HashTrieMap HashTrieSetList) exceptions)frozen)URIAnchorDMappingRetrieveHashTrieSet[URI]EMPTY_UNCRAWLED List[URI]EMPTY_PREVIOUS_RESOLVERSc@seZdZdZdZdS)_Unsetz What sillyness... 
N)__name__ __module__ __qualname____doc__SENTINELr'r'B/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/_core.pyr sr c@s eZdZdddddddZdS)_MaybeInSubresourcezSequence[int | str] Resolver[D] Resource[D])segmentsresolver subresourcereturncCsdSNr')selfr,r-r.r'r'r(__call__sz_MaybeInSubresource.__call__N)r"r#r$r2r'r'r'r(r)sr)rSpecification[D]contentsr/cCsFt|tst||d}t|ts2t|ddlm}||S)N$schemarspecification_with) isinstancerrCannotDetermineSpecificationgetstrreferencing.jsonschemar8r5Zjsonschema_dialect_idr8r'r'r(_detect_or_error's      r?Callable[[D], Specification[D]])defaultr/csdddfdd }|S)Nrr3r4cs<t|tsS|d}|dkr$Sddlm}||dS)Nr6rr7rA)r9rr;r=r8r>rBr'r(_detect7s   z#_detect_or_default.._detectr')rArCr'rBr(_detect_or_default4srDc@seZdZddddddZdS)_SpecificationDetectorzSpecification[D] | Noneztype[Specification[D]]r@)instanceclsr/cCs|dkr tSt|SdSr0)r?rD)r1rFrGr'r'r(__get__Jsz_SpecificationDetector.__get__N)r"r#r$rHr'r'r'r(rEIsrEc@s~eZdZUdZded<ded<ded<ded <ed d Zd ed <ded<eZddddZ ddddZ dddddZ dS) Specificationz A specification which defines referencing behavior. The various methods of a `Specification` allow for varying referencing behavior across JSON Schema specification versions, etc. r<namezCallable[[D], URI | None]id_ofzCallable[[D], Iterable[D]]subresources_ofz_MaybeInSubresource[D]maybe_in_subresource anchors_inaliasz8Callable[[Specification[D], D], Iterable[AnchorType[D]]] _anchors_inzClassVar[Specification[Any]]OPAQUEr/cCsd|jdS)Nz)rJr1r'r'r(__repr__szSpecification.__repr__rr5cCs |||S)zG Retrieve the anchors contained in the given document. )rQr1r5r'r'r(rNszSpecification.anchors_inr+r4cCs t||dS)zR Create a resource which is interpreted using this specification. 
)r5 specification)ResourcerXr'r'r(create_resourceszSpecification.create_resourceN) r"r#r$r%__annotations__rrQrEdetectrVrNr[r'r'r'r(rIUs  !rIopaquecCsdSr0r'rWr'r'r(r_cCsgSr0r'rWr'r'r(r_r`cCsgSr0r')rYr5r'r'r(r_r`cCs|Sr0r'r,r-r.r'r'r(r_r`)rJrKrLrNrMc@seZdZUdZded<eddZded<eefddd d d d Z edd d ddZ ddddZ ddddZ ddddZ ddddddZd S)!rZa A document (deserialized JSON) with a concrete interpretation under a spec. In other words, a Python object, along with an instance of `Specification` which describes how the document interacts with referencing -- both internally (how it refers to other `Resource`\ s) and externally (how it should be identified such that it is referenceable by other documents). rr5rYrOr3_specificationz)type[Specification[D]] | Specification[D]r+)r5default_specificationr/cCs||}|j|dS)aG Create a resource guessing which specification applies to the contents. Raises: `CannotDetermineSpecification` if the given contents don't have any discernible information which could be used to guess which specification they identify as rW)r]r[)rGr5rcrYr'r'r( from_contentss zResource.from_contentsr4cCstjj|dS)z Create an opaque `Resource` -- i.e. one with opaque specification. See `Specification.OPAQUE` for details. rW)rIrRr[)rGr5r'r'r(r^szResource.opaquez URI | NonerScCs$|j|j}|dkrdS|dS)O Retrieve this resource's (specification-specific) identifier. N#)rbrKr5rstrip)r1idr'r'r(rhsz Resource.idzIterable[Resource[D]]csfddjjDS)z8 Retrieve this resource's subresources. c3s|]}tj|jdVqdS))rcN)rZrdrb.0eachrUr'r( s z(Resource.subresources..)rbrLr5rUr'rUr( subresourcess  zResource.subresourceszIterable[AnchorType[D]]cCs|j|jS)re)rbrNr5rUr'r'r(anchorsszResource.anchorsr<r* Resolved[D])pointerr-r/c Cs|st|j|dS|j}g}t|dddD]}t|trJt|}n|dddd}z ||}Wn6tk r}zt j ||d}||W5d}~XYnX| ||}|j j |||j |d }||k r2g}q2t||dS) z Resolve the given JSON pointer. 
Raises: `exceptions.PointerToNowhere` if the pointer points to a location not present in the document r5r-r!N/z~1z~0~)refresourcera)Resolvedr5r splitr9rintreplace LookupErrorrZPointerToNowhereappendrbrMr[) r1rpr-r5r,segment lookup_errorerrorlastr'r'r(rps.      zResource.pointerN)r"r#r$r%r\rrb classmethodrIrdr^rhrmrnrpr'r'r'r(rZs    rZruricCstj|ddS)Nrt)rNoSuchResourcerr'r'r(_fail_to_retrieve src@sTeZdZUdZeeejddZded<eZ ded<e Z ded <ee d d Z d ed <dddddZddddZddddZdddddZddd d!Zdd"dd#d$Zdd%d&d'Zddd(d)d*Zdd+dd,d-Zddd.d/Zddd0d1d2Zd3dd4d5d6Zd7d8dd9d:d;Zddd<d=d>ZdHdd@dAdBdCZdd@dDdEdFZdGS)IRegistryab A registry of `Resource`\ s, each identified by their canonical URIs. Registries store a collection of in-memory resources, and optionally enable additional resources which may be stored elsewhere (e.g. in a database, a separate set of files, over the network, etc.). They also lazily walk their known resources, looking for subresources within them. In other words, subresources contained within any added resources will be retrievable via their own IDs (though this discovery of subresources will be delayed until necessary). Registries are immutable, and their methods return new instances of the registry with the additional resources added to them. The ``retrieve`` argument can be used to configure retrieval of resources dynamically, either over the network, from a database, or the like. Pass it a callable which will be called if any URI not present in the registry is accessed. It must either return a `Resource` or else raise a `NoSuchResource` exception indicating that the resource does not exist even according to the retrieval logic. resources)rA converterrPzHashTrieMap[URI, Resource[D]] _resourcesz+HashTrieMap[tuple[URI, str], AnchorType[D]]_anchorsr _uncrawledretrieve)rArPz Retrieve[D] _retrieverr+)rr/cCs:z|j|dWStk r4tj|ddYnXdS)zV Return the (already crawled) `Resource` identified by the given URI. 
rfrN)rrgKeyErrorrrr1rr'r'r( __getitem__FszRegistry.__getitem__z Iterator[URI]rScCs t|jS)z@ Iterate over all crawled URIs in the registry. )iterrrUr'r'r(__iter__OszRegistry.__iter__rxcCs t|jS)zU Count the total number of fully crawled resources in this registry. )lenrrUr'r'r(__len__UszRegistry.__len__z#Resource[D] | Iterable[Resource[D]] Registry[D])newr/cCsft|tr|f}|j}|j}|D]6}|}|dkr@tj|d||}|||}q t|||dS)a2 Create a new registry with resource(s) added using their internal IDs. Resources must have a internal IDs (e.g. the :kw:`$id` keyword in modern JSON Schema versions), otherwise an error will be raised. Both a single resource as well as an iterable of resources works, i.e.: * ``resource @ registry`` or * ``[iterable, of, multiple, resources] @ registry`` which -- again, assuming the resources have internal IDs -- is equivalent to calling `Registry.with_resources` as such: .. code:: python registry.with_resources( (resource.id(), resource) for resource in new_resources ) Raises: `NoInternalID` if the resource(s) in fact do not have IDs N)rur uncrawled) r9rZrrrhrZ NoInternalIDinsertr)r1rrrrurhr'r'r( __rmatmul__[s   zRegistry.__rmatmul__r<cCsft|}|dkrdnd}|jrNt|j}||kr)rr)r1sizeZ pluralizedrsummaryr'r'r(rVs  zRegistry.__repr__zRetrieved[D, Resource[D]]c Cs|j|}|dk r t||dS|}|j|}|dk rHt||dSz||}WnLtjtjfk rtYnHtk r}ztj |d|W5d}~XYnX| ||}t||dSdS)aE Get a resource from the registry, crawling or retrieving if necessary. May involve crawling to find the given URI if it is not already known, so the returned object is a `Retrieved` object which contains both the resource value as well as the registry which ultimately contained it. N)registryvaluer) rr; Retrievedcrawlrrr:r Exception Unretrievable with_resource)r1rrurr~r'r'r(get_or_retrieves$       zRegistry.get_or_retrievercsN|jkrtjdt||j|jtfdd|j DdS)zX Return a registry with the resource identified by a given URI removed. 
rc3s&|]\}}|dkr||fVqdS)rNr')rjkvrr'r(rls z"Registry.remove..)rrrn) rrrrremoverdiscardrritemsrr'rr(rs     zRegistry.remove)rrJcCs|j||f}|dk r$t||dS|}|j||f}|dk rPt||dS||}|}|dk r|j||f}|dk rt||dSd|krtj|||dtj|||ddS)zX Retrieve a given anchor from a resource which must already be crawled. N)rrrr)rtruanchor)rr;rrrhrZ InvalidAnchorZ NoSuchAnchor)r1rrJrrruZ canonical_urir'r'r(rs(   zRegistry.anchorrcCs ||jS)zV Retrieve the (already crawled) contents identified by the given URI. rWrr'r'r(r5szRegistry.contentscs|j|j}fdd|jD}|r|\}|}|dk rVt|||D]}||jf|}q^| fdd| Dq t ||t dS)zF Crawl all added resources, discovering subresources. csg|]}||fqSr'r')rjr)rr'r( sz"Registry.crawl..Nc3s|]}|fVqdSr0r'rirr'r(rlsz!Registry.crawl..)rrnr) rrrpoprhrrrnrJextendrmrr)r1rnrrurhrkr')rrr(rs$    zRegistry.crawl)rrucCs|||fgS)zP Add the given `Resource` to the registry, without crawling it. with_resources)r1rrur'r'r(rszRegistry.with_resourcez!Iterable[tuple[URI, Resource[D]]])pairsr/cCsH|j}|j}|D](\}}|d}||}|||}qt|||dS)zU Add the given `Resource`\ s to the registry, without crawling them. rfr)rrrgrr)r1rrrrrur'r'r(rs   zRegistry.with_resourceszIterable[tuple[URI, D]]r)rkwargsr/c s|fdd|DS)zW Add the given contents to the registry, autodetecting when necessary. c3s$|]\}}|tj|ffVqdSr0)rZrd)rjrrkrr'r(rlsz)Registry.with_contents..r)r1rrr'rr( with_contentsszRegistry.with_contents) registriesr/cGs||fkr|S|j}|j}|j}|j}|D]Z}||j}||j}||j}|jtk r*|j|k rrtk r~nntd|j}q*t|||||dS)zY Combine together one or more other registries, producing a unified one. z?Cannot combine registries with conflicting retrieval functions.)rnrrr)rrrrupdater ValueErrorr)r1rrrnrrrr'r'r(combine!s.     zRegistry.combiner*)base_urir/cCs t||dS)zV Return a `Resolver` which resolves references against this registry. rr)Resolver)r1rr'r'r(r-?szRegistry.resolver)rur/cCs |p d}t||||dS)zD Return a `Resolver` with a specific root resource. 
rr)rhrr)r1rurr'r'r(resolver_with_rootEs   zRegistry.resolver_with_rootN)r)r"r#r$r%rrconvertrr\rrrrrrrrrrVrrrr5rrrrrr-rr'r'r'r(r$s2   -  rAnchorOrResourcec@s"eZdZUdZded<ded<dS)rz. A value retrieved from a `Registry`. rrrrNr"r#r$r%r\r'r'r'r(rTs rc@s"eZdZUdZded<ded<dS)rvz? A reference resolved to its contents by a `Resolver`. rr5r*r-Nrr'r'r'r(rv^s rvc@seZdZUdZeddZded<eddZded<eed d d Z d ed <dddddZ dddddZ ddddZ dddddZ dS) ra[ A reference resolver. Resolvers help resolve references (including relative ones) by pairing a fixed base URI with a `Registry`. This object, under normal circumstances, is expected to be used by *implementers of libraries* built on top of `referencing` (e.g. JSON Schema implementations or other libraries resolving JSON references), not directly by end-users populating registries or while writing schemas or other resources. References are resolved against the base URI, and the combined URI is then looked up within the registry. The process of resolving a reference may itself involve calculating a *new* base URI for future reference resolution (e.g. if an intermediate resource sets a new base URI), or may involve encountering additional subresources and adding them to a new registry. rrOr _base_urirr _registryFprevious)rAreprrPr _previousro)rtr/c Cs|dr |j|dd}}ntt|j|\}}z|j|}WnTtjk rhtj|ddYn2tj k r}ztj|d|W5d}~XYnX|dr|j |j |d}|j j ||dS|r|j ||}|j |j |d}|j j|dS|j |j |d}t|j j|d S) a Resolve the given reference to the resource it points to. 
Raises: `exceptions.Unresolvable` or a subclass thereof (see below) if the reference isn't resolvable `exceptions.NoSuchAnchor` if the reference is to a URI where a resource exists but contains a plain name fragment which does not exist within the resource `exceptions.PointerToNowhere` if the reference is to a URI where a resource exists but contains a JSON pointer to a location within the resource that does not exist rfr!Nrrr)rr)rpr-r-rq) startswithrrrrrrrZ Unresolvabler_evolverrrprresolvervr5)r1rtrfragmentZ retrievedr~r-r'r'r(lookups$   zResolver.lookupr+r*)r.r/cCs(|}|dkr|St|t|j|dS)zV Create a resolver for a subresource (which may have a new base URI). N)r)rhrrr)r1r.rhr'r'r(in_subresourceszResolver.in_subresourcez!Iterable[tuple[URI, Registry[D]]]rSccs|jD]}||jfVqdS)zT In specs with such a notion, return the URIs in the dynamic scope. N)rrrr'r'r( dynamic_scopes zResolver.dynamic_scoper)rrcKs<|j}|jr&|r||jkr&||j}t|f||d|S)z9 Evolve, appending to the dynamic scope. )rr)rrZ push_frontr)r1rrrr'r'r(rs zResolver._evolveN)r"r#r$r%rrr\rrrrrrrr'r'r'r(rhs / rc@s0eZdZUdZded<ded<dddd Zd S) rz* A simple anchor in a `Resource`. r<rJr+rur*rcCst|jj|dS)z6 Return the resource for this anchor. rq)rvrur5)r1r-r'r'r(rszAnchor.resolveN)r"r#r$r%r\rr'r'r'r(rs rN)8 __future__rcollections.abcrrrenumrtypingrrr r r r urllib.parser rrattrsrrZrpdsrrrZ referencingrZreferencing._attrsrZreferencing.typingrrZ AnchorTyperrrrr\rr r&_UNSETr)r?rDrErIrRrZrrrrrvrr'r'r'r(sV        Q o.  cPK!q%__pycache__/exceptions.cpython-38.pycnu[U afP@sdZddlmZddlmZmZddlZddlmZerPddl m Z ddl m Z eGdd d e ZeGd d d eZeGd d d e ZeGdddeZejGdddeZeGdddeZeGdddeZeGdddeZdS)z Errors, oh no! ) annotations) TYPE_CHECKINGAnyN)frozen)Resource)URIc@s8eZdZUdZded<dddddZd d d d Zd S)NoSuchResourcez The given URI is not present in a registry. Unlike most exceptions, this class *is* intended to be publicly instantiable and *is* part of the public API of the package. 
rrefobjectboolotherreturncCs$|j|jk rtSt|t|kSN __class__NotImplementedattrsastupleselfr rG/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/exceptions.py__eq__s zNoSuchResource.__eq__intrcCstt|Srhashrrrrrr__hash__"szNoSuchResource.__hash__N__name__ __module__ __qualname____doc____annotations__rrrrrrrs rc@s8eZdZUdZded<dddddZd d d d Zd S) NoInternalIDa A resource has no internal ID, but one is needed. E.g. in modern JSON Schema drafts, this is the :kw:`$id` keyword. One might be needed if a resource was to-be added to a registry but no other URI is available, and the resource doesn't declare its canonical URI. Resource[Any]resourcer r r cCs$|j|jk rtSt|t|kSrrrrrrr3s zNoInternalID.__eq__rrcCstt|Srrrrrrr8szNoInternalID.__hash__Nr rrrrr&&s  r&c@s8eZdZUdZded<dddddZd d d d Zd S) UnretrievablezO The given URI is not present in a registry, and retrieving it failed. rr r r r cCs$|j|jk rtSt|t|kSrrrrrrrDs zUnretrievable.__eq__rrcCstt|SrrrrrrrIszUnretrievable.__hash__Nr rrrrr)<s r)c@s8eZdZUdZded<dddddZd d d d Zd S)CannotDetermineSpecificationz Attempting to detect the appropriate `Specification` failed. This happens if no discernible information is found in the contents of the new resource which would help identify it. rcontentsr r r cCs$|j|jk rtSt|t|kSrrrrrrrXs z#CannotDetermineSpecification.__eq__rrcCstt|Srrrrrrr]sz%CannotDetermineSpecification.__hash__Nr rrrrr*Ms r*c@s8eZdZUdZded<dddddZd d d d Zd S) Unresolvablez' A reference was unresolvable. rr r r r cCs$|j|jk rtSt|t|kSrrrrrrris zUnresolvable.__eq__rrcCstt|SrrrrrrrnszUnresolvable.__hash__Nr rrrrr,as r,c@s(eZdZUdZded<ddddZdS) PointerToNowherezK A JSON Pointer leads to a part of a document that does not exist. r'r(strrcCs*|jd|jj}|jdkr&|d7}|S)N does not exist within /z. The pointer '/' is a valid JSON Pointer but it points to an empty string property ''. 
If you intended to point to the entire resource, you should use '#'.)r r(r+)rmsgrrr__str__zs  zPointerToNowhere.__str__Nr!r"r#r$r%r2rrrrr-rs r-c@s0eZdZUdZded<ded<ddddZd S) NoSuchAnchorz@ An anchor does not exist within a particular resource. r'r(r.anchorrcCs|jd|jjS)Nr/)r5r(r+rrrrr2szNoSuchAnchor.__str__Nr3rrrrr4s r4c@s0eZdZUdZded<ded<ddddZd S) InvalidAnchorzu An anchor which could never exist in a resource was dereferenced. It is somehow syntactically invalid. r'r(r.r5rcCsd|jd|jdS)Nz'#zo' is not a valid anchor, neither as a plain name anchor nor as a JSON Pointer. You may have intended to use '#/zD', as the slash is required *before each segment* of a JSON pointer.)r5rrrrr2szInvalidAnchor.__str__Nr3rrrrr6s r6)r$ __future__rtypingrrrZreferencing._attrsrZ referencingrZreferencing.typingrKeyErrorr Exceptionr&r)r*r,r-r4r6rrrrs.    PK!G,,%__pycache__/jsonschema.cpython-38.pycnu[U afI@sUdZddlmZddlmZmZddlmZmZm Z ddl m Z m Z m Z mZmZddlmZddlmZmZmZmZddlmZm ZmZeeefZe eefZ e e Z!e e Z"e Z#d e$d <eGd d d e%Z&d ddddZ'd ddddZ(dddddZ)dd ddddZ*dd ddddZ+dd dddd Z,d!dd"dd#d$Z-e.e.e.fd%d%d%d&d'd(Z/e.e.e.fd%d%d%d&d)d*Z0e.e.e.fd%d%d%d&d+d,Z1e.e.e.fd%d%d%d&d-d.Z2e.e.e.fd%d%d%d&d/d0Z3e.e.e.fd%d%d%d&d1d2Z4e.e.e.fd%d%d%d&d3d4Z5ed5e'e/d6d7d8d9d:d;dd?d@h dAdBdCdDhdEdFdGdHdIhdJe*e3d6d7d8d9d:d;dd?d@h dAdBdCdDhdEdFdGdHdIhdJdKZ6edLe'e0dMd6d7d8d9d:dd?d@h dAdBdChdEdFdGdHdIhdJe+e4dMd6d7d8d9d:dd?d@h dAdBdChdEdFdGdHdIhdJdKZ7edNe(e1dMd6d7d9d:dhdAdBdChdFdHdIhdJe,e5dMd6d7d9d:dhdAdBdChdFdHdIhdJdKZ8edOe(e1dMd6d7ddfdgdhdidjZ?dkS)lzI Referencing implementations for JSON Schema specs (historic & current). ) annotations)SequenceSet)AnyIterableUnion)AnchorRegistryResource Specification exceptions)frozen)_UNSETResolvedResolver_Unset)URIrMappingSchemaRegistryEMPTY_REGISTRYc@seZdZUdZded<dS)UnknownDialectz A dialect identifier was found for a dialect unknown by this library. If it's a custom ("unofficial") dialect, be sure you've registered it. 
ruriN)__name__ __module__ __qualname____doc____annotations__rrG/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/jsonschema.pyr$s rSchemaz URI | NonecontentsreturncCst|trdS|dS)N$id) isinstanceboolget)r!rrr _dollar_id/s r'cCs:t|tsd|krdS|d}|dk r6|ds6|SdS)N$refr##)r$r%r& startswithr!idrrr_legacy_dollar_id5s  r- ObjectSchemacCs0d|kr dS|d}|dk r,|ds,|SdS)Nr(r,r))r&r*r+rrr _legacy_id=s  r/zSpecification[Schema]zIterable[AnchorType[Schema]]) specificationr!r"ccs^t|trdS|d}|dk r4t|||dV|d}|dk rZt|||dVdS)N$anchornameresourcez$dynamicAnchor)r$r%r&rcreate_resource DynamicAnchor)r0r!anchorZdynamic_anchorrrr_anchorEs   r8zIterable[Anchor[Schema]]cCs8t|trgS|d}|dkr$gSt|||dgS)Nr1r2)r$r%r&rr5)r0r!r7rrr _anchor_2019Zs  r9cCsDt|trgS|dd}|ds(gSt|dd||dgS)Nr#r)r2)r$r%r&r*rr5r0r!r,rrr_legacy_anchor_in_dollar_idks    r=zSpecification[ObjectSchema]zIterable[Anchor[ObjectSchema]]cCs6|dd}|dsgSt|dd||dgS)Nr,r:r)r;r2)r&r*rr5r<rrr_legacy_anchor_in_id|s   r>zSet[str])in_value in_subvalues in_subarraycsdddfdd }|S)z Create a callable returning JSON Schema specification-style subschemas. Relies on specifying the set of keywords containing subschemas in their values, in a subobject's values, or in a subarray. rIterable[ObjectSchema]r c3srt|trdSD]}||kr||VqD]}||kr.||EdHq.D]}||krN||EdHqNdSN)r$r%values)r!eachrAr@r?rrsubresources_ofs  z)_subresources_of..subresources_ofrr?r@rArGrrFr_subresources_ofs  rIcsdddfdd }|S)O Specifically handle older drafts where there are some funky keywords. rrBr c3st|trdSD]}||kr||VqD]}||kr.||EdHq.D]}||krN||EdHqN|d}|dk rt|tr|EdHn|VdSNitems)r$r%rDr&r)r!rErLrFrrrGs      z:_subresources_of_with_crazy_items..subresources_ofrrHrrFr!_subresources_of_with_crazy_itemss rMcsdddfdd }|S)rJrrBr c3st|trdSD]}||kr||VqD]}||kr.||EdHq.D]}||krN||EdHqN|d}|dk rt|tr|EdHn|V|d}|dk rt|}t|d}t|tr|V|EdHdS)NrL dependencies)r$r%rDr&riternextrr!rErLrNrDvaluerFrrrGs.         
zG_subresources_of_with_crazy_items_dependencies..subresources_ofrrHrrFr._subresources_of_with_crazy_items_dependenciess rScsdddfdd }|S)zT Specifically handle even older drafts where there are some funky keywords. r.rBr c3sD]}||kr||VqD]}||kr ||EdHq D]}||kr@||EdHq@|d}|dk rt|tr|EdHn|V|d}|dk rt|}t|d}t|tr|V|EdHdD]}||}t|tr|VqdS)NrLrN)additionalItemsadditionalProperties)rDr&r$rrOrPrrQrFrrrGs2           zJ_subresources_of_with_crazy_aP_items_dependencies..subresources_ofrrHrrFr1_subresources_of_with_crazy_aP_items_dependenciess rVcs&||Bdddddfdd }|S)NSequence[int | str]_Resolver[Any] Resource[Any]segmentsresolver subresourcer"csBt|}|D]*}|kr |ks.t|ddkr |Sq ||SrC)rOrPin_subresourcer[r\r]Z _segmentssegmentZin_childr?rrmaybe_in_subresources  z3_maybe_in_subresource..maybe_in_subresourcerr?r@rArbrrar_maybe_in_subresources rdcs&||Bdddddfdd }|S)NrWrXrYrZcsdt|}|D]L}|dkr2t|jtr2||S|kr |ksPt|ddkr |Sq ||SrKrOr$r!rr^rPr_rarrrb6s   z?_maybe_in_subresource_crazy_items..maybe_in_subresourcerrcrrar!_maybe_in_subresource_crazy_items/srfcs&||Bdddddfdd }|S)NrWrXrYrZcsdt|}|D]L}|dkr2t|jtr2||S|kr |ksPt|ddkr |Sq ||S)N>rNrLrer_rarrrbRs   zL_maybe_in_subresource_crazy_items_dependencies..maybe_in_subresourcerrcrrar._maybe_in_subresource_crazy_items_dependenciesKsrgz draft2020-12rUcontainsZ contentSchemaelseifrLnotZ propertyNamesZthenZunevaluatedItemsZunevaluatedPropertiesZallOfZanyOfoneOfZ prefixItemsz$defs definitionsZdependentSchemasZpatternPropertiesZ properties)r?rAr@)r3Zid_ofrGZ anchors_inrbz draft2019-09rTzdraft-07zdraft-06zdraft-04zdraft-03Zextends)rAr@cCsi|]\}}|t|qSr)r opaque).0 dialect_idr0rrr /srqz,https://json-schema.org/draft/2020-12/schemaz,https://json-schema.org/draft/2019-09/schemaz&http://json-schema.org/draft-07/schemaz&http://json-schema.org/draft-06/schemaz&http://json-schema.org/draft-04/schemaz&http://json-schema.org/draft-03/schemazRegistry[Specification[Schema]]_SPECIFICATIONSrzSpecification[Any] | _UnsetzSpecification[Any])rpdefaultr"cCs2t|d}|dk r|jS|tkr.t||S)z Retrieve the 
`Specification` with the given dialect identifier. Raises: `UnknownDialect` if the given ``dialect_id`` isn't known r)N)rrr&rstripr!rr)rprsr4rrrspecification_with=s ruc@s2eZdZUdZded<ded<dddd d Zd S) r6z4 Dynamic anchors, introduced in draft 2020. strr3SchemaResourcer4_Resolver[Schema]_Resolved[Schema]r\r"c Csl|j}|D]H\}}z|||jj}Wntjk rDYqYnXt|tr|j}qt |j | |dS)z2 Resolve this anchor dynamically. )r!r\) r4 dynamic_scoper7r3rRr Z NoSuchAnchorr$r6 _Resolvedr!r^)selfr\lastrregistryr7rrrresolve\s  zDynamicAnchor.resolveN)rrrrrrrrrrr6Ss r6rxryrzcCsb|d}t|jtr^|jdr^|D]2\}}||}t|jtrT|jdsXq^|}q*|S)u Recursive references (via recursive anchors), present only in draft 2019. As per the 2019 specification (§ 8.2.4.2.1), only the ``#`` recursive reference is supported (and is therefore assumed to be the relevant reference). r)z$recursiveAnchor)lookupr$r!rr&r{)r\resolvedr_Z next_resolvedrrrlookup_recursive_refns   rN)@r __future__rcollections.abcrrtypingrrrZ referencingrr r r r Zreferencing._attrsr Zreferencing._corerrr|rZ _ResolverrZreferencing.typingrZ AnchorTyperrvr.r%rrwrrr Exceptionrr'r-r/r8r9r=r> frozensetrIrMrSrVrdrfrgZ DRAFT202012Z DRAFT201909ZDRAFT7ZDRAFT6ZDRAFT4ZDRAFT3rrrur6rrrrrs     !(+  4  4  "  PK!Uȶ $__pycache__/retrieval.cpython-38.pycnu[U af @sdZddlmZddlmZddlmZmZmZddl Z ddl m Z er\ddl m Z mZmZedZde je jfd d d d d ddZdS)z2 Helpers related to (dynamic) resource retrieval. ) annotations lru_cache) TYPE_CHECKINGCallableTypeVarN)Resource)URIDRetrieve_Tz+Callable[[Retrieve[D]], Retrieve[D]] | NonezCallable[[_T], D]zCallable[[D], Resource[D]]z,Callable[[Callable[[URI], _T]], Retrieve[D]])cacheloads from_contentsreturncs,dkrtddddfdd }|S)aG Create a retriever which caches its return values from a simpler callable. Takes a function which returns things like serialized JSON (strings) and returns something suitable for passing to `Registry` as a retrieve function. 
This decorator both reduces a small bit of boilerplate for a common case (deserializing JSON from strings and creating `Resource` objects from the result) as well as makes the probable need for caching a bit easier. Retrievers which otherwise do expensive operations (like hitting the network) might otherwise be called repeatedly. Examples -------- .. testcode:: from referencing import Registry from referencing.typing import URI import referencing.retrieval @referencing.retrieval.to_cached_resource() def retrieve(uri: URI): print(f"Retrieved {uri}") # Normally, go get some expensive JSON from the network, a file ... return ''' { "$schema": "https://json-schema.org/draft/2020-12/schema", "foo": "bar" } ''' one = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo") print(one.value.contents["foo"]) # Retrieving the same URI again reuses the same value (and thus doesn't # print another retrieval message here) two = Registry(retrieve=retrieve).get_or_retrieve("urn:example:foo") print(two.value.contents["foo"]) .. testoutput:: Retrieved urn:example:foo bar bar N)maxsizezCallable[[URI], _T]retrievecsddfdd }|S)Nr )urics|}|}|S)N)rresponsecontents)rrrrF/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/retrieval.pycached_retrieveOsz>to_cached_resource..decorator..cached_retriever)rrr rrrr decoratorNsz%to_cached_resource..decoratorr)r rrrrrrto_cached_resources7  r)__doc__ __future__r functoolsrtypingrrrjsonZ referencingrZreferencing.typingr r r r rrrrrrrs   PK!ޏ!__pycache__/typing.cpython-38.pycnu[U af@sdZddlmZddlmZmZmZzddlmZee e fWn e k r`ddlmZYnXerzddl m Z m Z mZe ZedZGdddeeZGd d d eeZd S) z> Type-annotation related support for the referencing library. ) annotations) TYPE_CHECKINGProtocolTypeVar)Mapping)ResolvedResolverResourceDc@s eZdZdZdddddZdS)Retrievez A retrieval callable, usable within a `Registry` for resource retrieval. Does not make assumptions about where the resource might be coming from. 
URIz Resource[D])urireturncCsdS)z Retrieve the resource with the given URI. Raise `referencing.exceptions.NoSuchResource` if you wish to indicate the retriever cannot lookup the given URI. N)selfr rrC/opt/nydus/tmp/pip-target-53d1vnqk/lib/python/referencing/typing.py__call__"szRetrieve.__call__N)__name__ __module__ __qualname____doc__rrrrrr sr c@s2eZdZdZeddddZdddd d Zd S) Anchorz An anchor within a `Resource`. Beyond "simple" anchors, some specifications like JSON Schema's 2020 version have dynamic anchors. str)rcCsdS)z1 Return the name of this anchor. Nr)rrrrname4sz Anchor.namez Resolver[D]z Resolved[D])resolverrcCsdS)z6 Return the resource for this anchor. Nr)rrrrrresolve;szAnchor.resolveN)rrrrpropertyrrrrrrr,srN)r __future__rtypingrrrcollections.abcrr TypeErrorZreferencing._corerrr r r r rrrrrs  PK!Y __init__.pynu[PK!$F  _attrs.pynu[PK!A%[// Z_attrs.pyinu[PK!#``_core.pynu[PK! +PP fexceptions.pynu[PK!Dh|II wjsonschema.pynu[PK!py.typednu[PK!  retrieval.pynu[PK!`N2 @typing.pynu[PK!tests/__init__.pynu[PK!fbޓޓQtests/test_core.pynu[PK!*'qftests/test_exceptions.pynu[PK!W--_jtests/test_jsonschema.pynu[PK!k  Ntests/test_referencing_suite.pynu[PK!g+htests/test_retrieval.pynu[PK!%R;Ѧ)tests/__pycache__/__init__.cpython-38.pycnu[PK!yUU*tests/__pycache__/test_core.cpython-38.pycnu[PK!/08Btests/__pycache__/test_exceptions.cpython-38.pycnu[PK!0$Jtests/__pycache__/test_jsonschema.cpython-38.pycnu[PK!'R 7 etests/__pycache__/test_referencing_suite.cpython-38.pycnu[PK! r/otests/__pycache__/test_retrieval.cpython-38.pycnu[PK!X#}__pycache__/__init__.cpython-38.pycnu[PK! '!e__pycache__/_attrs.cpython-38.pycnu[PK!!irOUU __pycache__/_core.cpython-38.pycnu[PK!q%__pycache__/exceptions.cpython-38.pycnu[PK!G,,%__pycache__/jsonschema.cpython-38.pycnu[PK!Uȶ $__pycache__/retrieval.cpython-38.pycnu[PK!ޏ!)__pycache__/typing.cpython-38.pycnu[PK 2