Source code for calamus.fields

# -*- coding: utf-8 -*-
#
# Copyright 2017-2020- Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Marshmallow fields for use with JSON-LD."""

import copy
import logging
import types
import typing
from functools import total_ordering

import marshmallow.fields as fields
from marshmallow import class_registry, utils
from marshmallow.base import SchemaABC
from marshmallow.exceptions import ValidationError

from calamus.schema import JsonLDSchema
from calamus.utils import ONTOLOGY_QUERY, Proxy, normalize_type, normalize_value

logger = logging.getLogger("calamus")


@total_ordering
class IRIReference(object):
    """Represent an IRI in a namespace.

    Args:
        namespace (Namespace): The ``Namespace`` this IRI is part of.
        name (str): The property name of this IRI.
    """

    def __init__(self, namespace, name):
        self.namespace = namespace
        self.name = name

    def __str__(self):
        """Return expanded string for IRI."""
        return "{namespace}{name}".format(namespace=self.namespace, name=self.name)

    def __repr__(self):
        """Representation of IRI."""
        return 'IRIReference(namespace="{namespace}", name="{name}")'.format(namespace=self.namespace, name=self.name)

    def __eq__(self, other):
        """Check equality between this and another IRIReference."""
        expanded = str(self)
        if isinstance(other, IRIReference):
            other = str(other)
        return expanded == other

    def __lt__(self, other):
        """Compare this with another IRI."""
        return str(self) < str(other)

    def __hash__(self):
        return str(self).__hash__()

class Namespace(object):
    """Represents a namespace/ontology.

    Args:
        namespace (str): The base namespace URI for this namespace.
        ontology (str): Path to an ontology (OWL) file for this namespace.
    """

    def __init__(self, namespace, ontology=None):
        self.namespace = namespace
        self.ontology = None

        if ontology:
            from rdflib.graph import Graph

            g = Graph()
            self.ontology = g.parse(ontology)

    def __getattr__(self, name):
        reference = IRIReference(self, name)

        if self.ontology:
            from rdflib.term import URIRef

            p = URIRef(str(reference))
            qres = self.ontology.query(ONTOLOGY_QUERY, initBindings={"property": p})
            if not next(iter(qres), False):
                raise ValueError(f"Property {name} does not exist in namespace {self.namespace}")

        return reference

    def __str__(self):
        return self.namespace

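# Example (editor's illustrative sketch, not part of the calamus source): declaring a namespace
# and accessing attributes on it to get expanded ``IRIReference`` objects. The ``schema.org``
# namespace and property names are used only for illustration.
#
#     import calamus.fields as fields
#
#     schema = fields.Namespace("http://schema.org/")
#
#     schema.name                                # IRIReference(namespace="http://schema.org/", name="name")
#     str(schema.name)                           # "http://schema.org/name"
#     schema.name == "http://schema.org/name"    # True; IRIReferences compare equal to their expanded string
#
# If the namespace was created with an ``ontology`` file, accessing a property that is not
# defined in that ontology raises a ``ValueError`` instead of returning a reference.
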
class _JsonLDField(fields.Field):
    """Internal class that enables marshmallow fields to be serialized with a JSON-LD field name.

    Args:
        field_name (IRIReference): The JSON-LD field name.
        reverse (bool): Whether this is a reverse relation (via the ``@reverse`` keyword).
        init_name (str): Name of this parameter in the ``__init__`` method, if it differs from the name on the class.
        add_value_types (bool): Whether to add xsd value type information when serializing.
    """

    def __init__(self, field_name=None, *args, **kwargs):
        if "default" in kwargs:
            kwargs["load_default"] = kwargs.get("default")

        filtered_kwargs = {
            key: value
            for key, value in kwargs.items()
            if key not in ["reverse", "init_name", "add_value_types", "default"]
        }
        super().__init__(*args, **filtered_kwargs)
        self.field_name = field_name

        self.reverse = kwargs.get("reverse", False)
        self.init_name = kwargs.get("init_name", None)
        self.add_value_types = kwargs.get("add_value_types", False)

    @property
    def data_key(self):
        """Return the (expanded) JSON-LD field name."""
        if self.field_name is None:
            raise ValueError("field_name was not set for {} in schema {}".format(self.name, self.root.__class__))
        return str(self.field_name)

    @data_key.setter
    def data_key(self, value):
        pass

    def _deserialize(self, value, attr, data, **kwargs):
        value = normalize_value(value)
        return super()._deserialize(value, attr, data, **kwargs)

    def _reversed_fields(self):
        """Get fields that are reversed in type hierarchy."""
        return {}

class Id(_JsonLDField, fields.String):
    """A node identifier."""

    def __init__(self, *args, **kwargs):
        super().__init__(field_name="@id", *args, **kwargs)

class BlankNodeId(_JsonLDField, fields.String):
    """A blank/anonymous node identifier."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        return f"_:{value}"

    def _deserialize(self, value, attr, data, **kwargs):
        if isinstance(value, str) and value.startswith("_:"):
            value = value[2:]
        return super()._deserialize(value, attr, data, **kwargs)

    @property
    def data_key(self):
        """Return the (expanded) JSON-LD field name."""
        return "@id"

    @data_key.setter
    def data_key(self, value):
        pass

class String(_JsonLDField, fields.String):
    """A string field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#string"}
        return value

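# Example (editor's illustrative sketch, not part of the calamus source): a minimal model and
# schema using ``Id`` and ``String``, continuing the ``schema`` namespace from the sketch above.
# The ``Book`` model and the exact output shape are illustrative assumptions; see the calamus
# documentation for authoritative usage.
#
#     from calamus.schema import JsonLDSchema
#
#     class Book:
#         def __init__(self, _id, name):
#             self._id = _id
#             self.name = name
#
#     class BookSchema(JsonLDSchema):
#         _id = fields.Id()
#         name = fields.String(schema.name)
#
#         class Meta:
#             rdf_type = schema.Book
#             model = Book
#
#     book = Book("http://example.com/books/1", "Le Petit Prince")
#     jsonld = BookSchema().dump(book)
#     # roughly: {"@id": "http://example.com/books/1",
#     #           "@type": ["http://schema.org/Book"],
#     #           "http://schema.org/name": "Le Petit Prince"}
#     BookSchema().load(jsonld)    # back to a Book instance
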
class IRI(String):
    """An external IRI reference."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        if self.parent.opts.add_value_types or self.add_value_types:
            return {"@id": value}
        value = super()._serialize(value, attr, obj, **kwargs)
        if value:
            return {"@id": value}

    def _deserialize(self, value, attr, data, **kwargs):
        if "@id" in value:
            value = value["@id"]
        return super()._deserialize(value, attr, data, **kwargs)

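# Example (editor's illustrative sketch): an ``IRI`` field serializes a plain string into a
# JSON-LD IRI reference instead of a literal. The property name is an assumption.
#
#     url = fields.IRI(schema.url)
#
#     # an attribute value of "http://example.com" serializes to {"@id": "http://example.com"},
#     # and loading accepts either the {"@id": ...} form or a plain string.
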
class Integer(_JsonLDField, fields.Integer):
    """An integer field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#integer"}
        return value

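# Example (editor's illustrative sketch): ``add_value_types=True`` wraps the serialized literal
# in a typed ``@value`` object; as the code above shows, the same behaviour can be switched on
# for a whole schema via ``self.parent.opts.add_value_types``. The property name is an assumption.
#
#     pages = fields.Integer(schema.numberOfPages, add_value_types=True)
#
#     # a value of 320 serializes to
#     #     {"@value": 320, "@type": "http://www.w3.org/2001/XMLSchema#integer"}
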
class Float(_JsonLDField, fields.Float):
    """A float field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#float"}
        return value

class Boolean(_JsonLDField, fields.Boolean):
    """A Boolean field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#boolean"}
        return value

class DateTime(_JsonLDField, fields.DateTime):
    """A date/time field."""

    def __init__(self, *args, extra_formats=("%Y-%m-%d",), **kwargs):
        super().__init__(*args, **kwargs)
        self._extra_formats = extra_formats

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#dateTime"}
        return value

    def _deserialize(self, value, attr, data, **kwargs):
        try:
            return super()._deserialize(value, attr, data, **kwargs)
        except ValidationError:
            pass

        # Try with extra formats
        for extra_format in self._extra_formats:
            try:
                original_format = self.format
                self.format = extra_format
                return super()._deserialize(value, attr, data, **kwargs)
            except ValidationError:
                pass
            finally:
                self.format = original_format

        raise self.make_error("invalid", input=value, obj_type=self.OBJ_TYPE)

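# Example (editor's illustrative sketch): ``DateTime`` first tries the regular marshmallow
# parsing (ISO 8601 by default) and then falls back to the formats in ``extra_formats``, so a
# plain date string can still be loaded. The property name is an assumption.
#
#     created = fields.DateTime(schema.dateCreated, extra_formats=("%Y-%m-%d",))
#
#     # "2021-06-01T10:00:00" loads via the default ISO parser,
#     # "2021-06-01" loads via the "%Y-%m-%d" fallback format.
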
class NaiveDateTime(_JsonLDField, fields.NaiveDateTime):
    """A naive date/time field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#dateTime"}
        return value

class AwareDateTime(_JsonLDField, fields.AwareDateTime):
    """A timezone-aware date/time field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#dateTime"}
        return value

class Time(_JsonLDField, fields.Time):
    """A time field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#time"}
        return value

class Date(_JsonLDField, fields.Date):
    """A date field."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        if self.parent.opts.add_value_types or self.add_value_types:
            value = {"@value": value, "@type": "http://www.w3.org/2001/XMLSchema#date"}
        return value

class Dict(_JsonLDField, fields.Dict):
    """A dict field."""

    pass

class Raw(_JsonLDField, fields.Raw):
    """A raw field."""

    pass

class RawJsonLD(_JsonLDField, fields.Raw):
    """A raw JSON-LD field."""

    def _dereference_single_id(self, value, attr, **kwargs):
        """Dereference a single id."""
        data = kwargs["_all_objects"].get(value, None)
        if not data:
            raise ValueError("Couldn't dereference id {id}".format(id=value))
        try:
            data = dict(data)
        except (TypeError, ValueError):
            raise ValueError(f"Couldn't convert value '{data}' to dictionary.")

        if self.reverse:
            # we need to remove the property from the child when handling reverse nesting
            del data[attr]
        return data

    def _dereference_flattened(self, value, attr, **kwargs):
        """Dereference an id or a list of ids."""
        if isinstance(value, list) or isinstance(value, types.GeneratorType):
            return [self._dereference_flattened(i, attr, **kwargs) for i in value]

        if isinstance(value, str):
            return self._dereference_single_id(value, attr, **kwargs)
        elif isinstance(value, dict):
            if len(value.keys()) == 1 and "@id" in value:
                value = self._dereference_single_id(value["@id"], attr, **kwargs)
                for k, v in value.items():
                    if not k.startswith("@"):
                        value[k] = self._dereference_flattened(v, attr, **kwargs)
                return value
            else:
                return value
        else:
            raise ValueError("Nested field needs to be a dict or an id entry/list, got {value}".format(value=value))

    def _deserialize(self, value, attr, data, **kwargs):
        """Deserialize object."""
        if kwargs.get("flattened", False):
            # could be id references, dereference them to continue deserialization
            value = self._dereference_flattened(value, attr, **kwargs)
        return super()._deserialize(value, attr, data, **kwargs)

class Nested(_JsonLDField, fields.Nested):
    """A reference to one or more nested classes."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if not isinstance(self.nested, list):
            self.nested = [self.nested]

        nested = []
        for n in self.nested:
            if isinstance(n, str) or issubclass(n, JsonLDSchema):
                nested.append(n)
            else:
                if hasattr(n, "__calamus_schema__"):
                    nested.append(n.__calamus_schema__)
                else:
                    raise ValueError(f"Only calamus schema is allowed in nested fields, not {n}")
        self.nested = nested

    @property
    def schema(self):
        """The nested Schema object.

        This method was copied from marshmallow and modified to support multiple different nested schemas.
        """
        if not self._schema:
            # Inherit context from parent.
            context = getattr(self.parent, "context", {})
            self._schema = {"from": {}, "to": {}}
            for nest in self.nested:
                if isinstance(nest, SchemaABC):
                    rdf_type = str(normalize_type(nest.opts.rdf_type))
                    model = nest.opts.model
                    if not rdf_type or not model:
                        raise ValueError("Both rdf_type and model need to be set on the schema for nested to work")

                    _schema = copy.copy(nest)
                    _schema.context.update(context)
                    # Respect only and exclude passed from parent and re-initialize fields
                    set_class = _schema.set_class
                    if self.only is not None:
                        if _schema.only is not None:
                            original = _schema.only
                        else:  # only=None -> all fields
                            original = _schema.fields.keys()
                        _schema.only = set_class(self.only).intersection(original)
                    if self.exclude:
                        original = _schema.exclude
                        _schema.exclude = set_class(self.exclude).union(original)
                    _schema._init_fields()
                    _schema._visited = self.root._visited
                    self._schema["from"][rdf_type] = _schema
                    self._schema["to"][model] = _schema
                else:
                    if isinstance(nest, type) and issubclass(nest, SchemaABC):
                        schema_class = nest
                    elif not isinstance(nest, (str, bytes)):
                        raise ValueError("Nested fields must be passed a Schema, not {}.".format(nest.__class__))
                    elif nest == "self":
                        ret = self
                        while not isinstance(ret, SchemaABC):
                            ret = ret.parent
                        schema_class = ret.__class__
                    else:
                        schema_class = class_registry.get_class(nest)

                    rdf_type = str(normalize_type(schema_class.opts.rdf_type))
                    model = schema_class.opts.model
                    if not rdf_type or not model:
                        raise ValueError("Both rdf_type and model need to be set on the schema for nested to work")

                    self._schema["from"][rdf_type] = schema_class(
                        many=False,
                        only=self.only,
                        exclude=self.exclude,
                        context=context,
                        load_only=self._nested_normalized_option("load_only"),
                        dump_only=self._nested_normalized_option("dump_only"),
                        lazy=self.root.lazy,
                        flattened=self.root.flattened,
                        _visited=self.root._visited,
                        _top_level=False,
                    )
                    self._schema["to"][model] = self._schema["from"][rdf_type]
        return self._schema

    def _serialize_single_obj(self, obj, **kwargs):
        """Serialize a single (possibly proxied) object."""
        if isinstance(obj, Proxy):
            proxy_schema = obj.__proxy_schema__
            matching_schema = next((s for s in self.schema["to"].values() if isinstance(proxy_schema, type(s))), None)

            if (
                not obj.__proxy_initialized__
                and matching_schema
                and proxy_schema.flattened == matching_schema.flattened
            ):
                # if proxy was not accessed and we use the same schema, return original data
                return obj.__proxy_original_data__

            # resolve Proxy object
            obj = obj.__wrapped__

        if type(obj) not in self.schema["to"]:
            raise ValueError("Type {} not found in field {}.{}".format(type(obj), type(self.parent), self.name))
        schema = self.schema["to"][type(obj)]
        schema._top_level = False
        return schema.dump(obj)

    def _serialize(self, nested_obj, attr, obj, many=False, **kwargs):
        """Serialize a nested field with one or many values."""
        if nested_obj is None:
            return None

        if isinstance(nested_obj, Proxy):
            proxy_schema = nested_obj.__proxy_schema__
            matching_schema = next((s for s in self.schema["to"].values() if isinstance(proxy_schema, type(s))), None)

            if (
                not nested_obj.__proxy_initialized__
                and matching_schema
                and proxy_schema.flattened == matching_schema.flattened
            ):
                # if proxy was not accessed and we use the same schema, return original data
                return nested_obj.__proxy_original_data__

            # resolve Proxy object
            nested_obj = nested_obj.__wrapped__

        many = self.many or many
        if many:
            result = []
            for obj in nested_obj:
                result.append(self._serialize_single_obj(obj, **kwargs))
            return result
        else:
            if utils.is_collection(nested_obj):
                raise ValueError("Expected single value for field {} but got a collection".format(self.name))
            return self._serialize_single_obj(nested_obj, **kwargs)

    def _test_collection(self, value, many=False):
        return  # getting a non-list for a list field is valid in JSON-LD

    def load_single_entry(self, value, partial):
        """Load a single nested entry from its schema."""
        type_ = normalize_type(value["@type"])

        schema = self.schema["from"][str(type_)]
        if not schema:
            raise ValueError("Type {} not found in {}.{}".format(value["@type"], type(self.parent), self.data_key))

        if not schema._all_objects and self.root._all_objects:
            schema._all_objects = self.root._all_objects
            schema._reversed_properties = self.root._reversed_properties

        if schema.lazy:
            return Proxy(lambda: schema.load(value, unknown=self.unknown, partial=partial), schema, value)

        return schema.load(value, unknown=self.unknown, partial=partial)

    def _load(self, value, data, partial=None, many=False):
        many = self.many or many

        try:
            if many:
                if not utils.is_collection(value):
                    value = [value]
                valid_data = []
                for val in value:
                    valid_data.append(self.load_single_entry(val, partial))
            else:
                if utils.is_collection(value):
                    # single values can be single element lists in jsonld
                    if len(value) > 1:
                        raise ValueError(
                            "Got multiple values for nested field {name} but many is not set.".format(name=self.name)
                        )
                    else:
                        value = value[0]
                valid_data = self.load_single_entry(value, partial)
        except ValidationError as error:
            raise ValidationError(error.messages, valid_data=error.valid_data) from error
        return valid_data

    def _dereference_single_id(self, value, attr, **kwargs):
        """Dereference a single id."""
        data = kwargs["_all_objects"].get(value, None)
        if not data:
            raise ValueError("Couldn't dereference id {id}".format(id=value))
        try:
            data = dict(data)
        except (TypeError, ValueError):
            raise ValueError(f"Couldn't convert value '{data}' to dictionary.")

        if self.reverse:
            # we need to remove the property from the child when handling reverse nesting
            del data[attr]
        return data

    def _dereference_flattened(self, value, attr, **kwargs):
        """Dereference an id or a list of ids."""
        if isinstance(value, list) or isinstance(value, types.GeneratorType):
            return [self._dereference_flattened(i, attr, **kwargs) for i in value]

        if isinstance(value, str):
            return self._dereference_single_id(value, attr, **kwargs)
        elif isinstance(value, dict):
            if len(value.keys()) == 1 and "@id" in value:
                return self._dereference_single_id(value["@id"], attr, **kwargs)
            else:
                return value
        else:
            raise ValueError("Nested field needs to be a dict or an id entry/list, got {value}".format(value=value))

    def _deserialize(self, value, attr, data, **kwargs):
        """Deserialize nested object."""
        if kwargs.get("flattened", False):
            # could be id references, dereference them to continue deserialization
            value = self._dereference_flattened(value, attr, **kwargs)
        return super()._deserialize(value, attr, data, **kwargs)

    def _reversed_fields(self):
        """Get fields that are reversed in type hierarchy."""
        fields = {}
        if self.reverse:
            fields = {self.data_key: {type(s) for s in self.schema["from"].values()}}

        for schema in self.schema["from"].values():
            for k, v in schema._reversed_properties.items():
                if k not in fields:
                    fields[k] = set()
                fields[k].update(v)
        return fields

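# Example (editor's illustrative sketch, not part of the calamus source): nesting one schema
# inside another, reusing the ``schema`` namespace and ``Book``/``BookSchema`` from the sketches
# above. Model and property names are assumptions; ``many`` and ``reverse`` are the keyword
# arguments handled by ``_JsonLDField``.
#
#     class Author:
#         def __init__(self, _id, name):
#             self._id = _id
#             self.name = name
#
#     class AuthorSchema(JsonLDSchema):
#         _id = fields.Id()
#         name = fields.String(schema.name)
#
#         class Meta:
#             rdf_type = schema.Person
#             model = Author
#
#     class BookWithAuthorSchema(JsonLDSchema):
#         _id = fields.Id()
#         name = fields.String(schema.name)
#         author = fields.Nested(schema.author, AuthorSchema, many=True)
#
#         class Meta:
#             rdf_type = schema.Book
#             model = Book
#
# Dumping with a flattened schema (``BookWithAuthorSchema(flattened=True)``) replaces nested
# objects with ``@id`` references plus a flat list of node objects; on load,
# ``_dereference_flattened`` above resolves those references back into nested data before
# deserialization.
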
class List(_JsonLDField, fields.List):
    """A potentially ordered list using the ``@list`` keyword.

    Args:
        ordered (bool): Whether this is an ordered (via the ``@list`` keyword) list.

    Warning:
        The JSON-LD flattening algorithm does not combine ``@list`` entries when merging nodes.
        So if you use ``ordered=True``, flatten the output and have the node containing the list
        appear in multiple places in the graph, the node will get merged but its lists won't get
        merged (you get a list of lists instead), which means that the output can't be deserialized
        back to python objects.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ordered = kwargs.get("ordered", False)

    def _serialize(self, value, attr, obj, **kwargs):
        value = super()._serialize(value, attr, obj, **kwargs)
        return {"@list": value} if self.ordered else value

    def _deserialize(self, value, attr, data, **kwargs) -> typing.List[typing.Any]:
        if isinstance(value, dict):
            # an ordered list
            value = value["@list"]
        return super(_JsonLDField, self)._deserialize(value, attr, data, **kwargs)

    @property
    def opts(self):
        """Return parent's opts."""
        return self.parent.opts

    def _reversed_fields(self):
        """Get fields that are reversed in type hierarchy."""
        return self.inner._reversed_fields()

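# Example (editor's illustrative sketch): an ordered list field, continuing the ``schema``
# namespace from the sketches above. The inner field and property name are assumptions.
#
#     keywords = fields.List(schema.keywords, fields.String(schema.keywords), ordered=True)
#
#     # ["a", "b"] serializes to {"@list": ["a", "b"]}; without ``ordered=True`` it stays a
#     # plain JSON array. See the Warning above before combining ``ordered=True`` with
#     # flattened output.
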