Source code for renku.core.models.jsonld

# -*- coding: utf-8 -*-
#
# Copyright 2017-2020 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Support JSON-LD context in models."""

import json
import os
import weakref
from copy import deepcopy
from datetime import datetime, timezone
from functools import partial
from importlib import import_module
from pathlib import Path

import attr
import yaml
from attr._compat import iteritems
from attr._funcs import has
from attr._make import Factory, fields

from renku.core.compat import pyld
from renku.core.models.locals import ReferenceMixin, with_reference
from renku.core.models.migrations import JSONLD_MIGRATIONS

KEY = '__json_ld'
KEY_CLS = '__json_ld_cls'

DOC_TPL = (
    '{cls.__doc__}\n\n'
    '**Type:**\n\n'
    '.. code-block:: json\n\n'
    '    {type}\n\n'
    '**Context:**\n\n'
    '.. code-block:: json\n\n'
    '{context}\n'
)

make_type = type


# Shamelessly copy/pasting from SO:
# https://stackoverflow.com/questions/34667108/ignore-dates-and-times-while-parsing-yaml
# This is needed to allow us to load from yaml and use json down the line.
class NoDatesSafeLoader(yaml.SafeLoader):
    """Used to safely load basic python objects but ignore datetime strings."""

    @classmethod
    def remove_implicit_resolver(cls, tag_to_remove):
        """
        Remove implicit resolvers for a particular tag.

        Takes care not to modify resolvers in super classes.

        We want to load datetimes as strings, not dates, because we
        go on to serialise as json which doesn't have the advanced types
        of yaml, and leads to incompatibilities down the track.
        """
        if 'yaml_implicit_resolvers' not in cls.__dict__:
            cls.yaml_implicit_resolvers = cls.yaml_implicit_resolvers.copy()

        for first_letter, mappings in cls.yaml_implicit_resolvers.items():
            cls.yaml_implicit_resolvers[first_letter] = [
                (tag, regexp)
                for tag, regexp in mappings if tag != tag_to_remove
            ]


NoDatesSafeLoader.remove_implicit_resolver('tag:yaml.org,2002:timestamp')


def attrs(
    maybe_cls=None, type=None, context=None, translate=None, **attrs_kwargs
):
    """Wrap an attr enabled class."""
    if isinstance(type, (list, tuple, set)):
        types = list(type)
    else:
        types = [type] if type is not None else []
    context = context or {}
    translate = translate or {}

    if '@version' not in context:
        context['@version'] = 1.1

    def wrap(cls):
        """Decorate an attr enabled class."""
        jsonld_cls = attr.s(cls, **attrs_kwargs)

        if not issubclass(jsonld_cls, JSONLDMixin):
            jsonld_cls = attr.s(
                make_type(cls.__name__, (jsonld_cls, JSONLDMixin), {}),
                **attrs_kwargs
            )
        # Merge types
        for subcls in jsonld_cls.mro():
            subtype = getattr(subcls, '_jsonld_type', None)
            if subtype:
                if isinstance(subtype, (tuple, list)):
                    types.extend(subtype)
                else:
                    types.append(subtype)

            for key, value in getattr(subcls, '_jsonld_context', {}).items():
                if key in context and context[key] != value:
                    raise TypeError()
                context.setdefault(key, value)

        property_context, scoped_properties = _add_class_property_contexts(
            jsonld_cls, context
        )

        context.update(property_context)

        jsonld_cls.__module__ = cls.__module__
        jsonld_cls._jsonld_type = types[0] if len(types) == 1 else list(
            sorted(set(types))
        )
        jsonld_cls._scoped_properties = scoped_properties
        jsonld_cls._renku_type = fullname(cls)

        jsonld_cls._jsonld_context = context
        jsonld_cls._jsonld_translate = translate
        jsonld_cls._jsonld_fields = {
            a.name
            for a in attr.fields(jsonld_cls) if KEY in a.metadata
        }

        context_doc = '\n'.join(
            '   ' + line for line in json.dumps(context, indent=2).split('\n')
        )
        jsonld_cls.__doc__ = DOC_TPL.format(
            cls=cls,
            type=json.dumps(jsonld_cls._jsonld_type),
            context=context_doc,
        )

        # Register class for given JSON-LD @type
        try:
            type_ = pyld.jsonld.expand({
                '@type': jsonld_cls._jsonld_type,
                '@context': context
            })[0]['@type']
            if isinstance(type_, list):
                type_ = tuple(sorted(type_))
        except Exception:
            # FIXME make sure all classes have @id defined
            return jsonld_cls

        if (
            type_ in jsonld_cls.__type_registry__ and
            jsonld_cls.__type_registry__[type_] != jsonld_cls
        ):
            raise TypeError(
                'Type {0!r} in {1!r} is already registered for {2!r}.'.format(
                    jsonld_cls._jsonld_type,
                    jsonld_cls,
                    jsonld_cls.__type_registry__[type_],
                )
            )

        jsonld_cls.__type_registry__[type_] = jsonld_cls
        return jsonld_cls

    if maybe_cls is None:
        return wrap
    return wrap(maybe_cls)


def _add_class_property_contexts(jsonld_cls, context):
    """Adds ``@context`` of a class' properties to the class' ``@context``."""
    scoped_properties = []

    property_context = {}

    for a in attr.fields(jsonld_cls):
        key = a.name
        ctx = a.metadata.get(KEY)
        if ctx is None:
            continue

        current_context = None
        if isinstance(ctx, str) and ':' in ctx:
            prefix, _ = ctx.split(':', 1)
            if prefix in context:
                current_context = ctx

        elif isinstance(ctx, dict) or ctx not in context:
            current_context = ctx

        if KEY_CLS in a.metadata:
            t = a.metadata[KEY_CLS]
            current_context, is_scoped = _propagate_reference_contexts(
                t, current_context, ctx
            )

            if is_scoped:
                scoped_properties.append(key)

        if current_context:
            property_context[key] = current_context

    return property_context, scoped_properties


def _propagate_reference_contexts(
    type_references, current_context, parent_context
):
    """Get JSON-LD contexts for all types of a reference and propagate them."""
    if not isinstance(type_references, (list, set, tuple)):
        type_references = [type_references]
    classes = [import_class_from_string(c) for c in type_references]
    classes = [c for c in classes if hasattr(c, '_jsonld_context')]
    scoped_properties = False

    if len(classes) == 1:
        merge_ctx = classes[0]._jsonld_context

        if not current_context:
            current_context = {'@id': parent_context}
        elif not isinstance(current_context, dict):
            current_context = {'@id': current_context}

        current_context['@context'] = merge_ctx
    else:
        scoped_properties = True

        for cls in classes:
            merge_ctx = cls._jsonld_context

            if not current_context:
                current_context = {'@id': parent_context}
            elif not isinstance(current_context, dict):
                current_context = {'@id': current_context}

            if '@context' not in current_context:
                current_context['@context'] = []

            subtypes = cls._jsonld_type

            if not isinstance(subtypes, (tuple, list)):
                subtypes = [subtypes]

            for subtype in subtypes:
                # Use nested, type scoped contexts for each semantic type
                # of a reference, to uniquely bind a context to a type.
                # We need to expand the subtype, as type scoped contexts
                # behave weirdly

                expanded_subtype = subtype

                prefix, suffix = subtype.split(':', 1)

                if prefix in merge_ctx:
                    expanded_subtype = '{}{}'.format(merge_ctx[prefix], suffix)

                current_context['@context'].append({
                    fullname(cls) + '_' + subtype: {
                        '@id': expanded_subtype,
                        '@context': merge_ctx
                    }
                })

    return current_context, scoped_properties


def _default_converter(cls, value):
    """A default converter method that tries to deserialize objects."""
    if isinstance(value, dict):
        return cls.from_jsonld(value)

    return value


def attrib(context=None, type=None, **kwargs):
    """Create a new attribute with context."""
    kwargs.setdefault('metadata', {})
    kwargs['metadata'][KEY] = context
    if type:
        kwargs['metadata'][KEY_CLS] = type

        if 'converter' not in kwargs and hasattr(type, 'from_jsonld'):
            kwargs['converter'] = partial(_default_converter, type)
    return attr.ib(**kwargs)


_container_types = (
    ('list', list, lambda type, value: [type.from_jsonld(v) for v in value]),
    ('set', set, lambda type, value: {type.from_jsonld(v)
                                      for v in value}),
    (
        'index', dict,
        lambda type, value: {k: type.from_jsonld(v)
                             for k, v in value.items()}
    ),
)


def _container_attrib_builder(name, container, mapper):
    """Builder for container attributes."""
    factory = Factory(container)

    def _attrib(type, **kwargs):
        """Define a container attribute."""
        kwargs.setdefault('metadata', {})
        kwargs['metadata'][KEY_CLS] = type
        kwargs['default'] = factory

        def _converter(value):
            """Convert value to the given type."""
            if isinstance(value, container):
                return mapper(type, value)
            elif value is None:
                return value

            raise ValueError(value)

        kwargs.setdefault('converter', _converter)

        return attrib(**kwargs)

    return _attrib


container = type(
    'Container', (object, ), {
        name: staticmethod(_container_attrib_builder(name, container, mapper))
        for name, container, mapper in _container_types
    }
)


def asjsonld(
    inst,
    recurse=True,
    filter=None,
    dict_factory=dict,
    retain_collection_types=False,
    add_context=True,
    use_scoped_type_form=False,
    basedir=None,
):
    """Dump a JSON-LD class to the JSON with generated ``@context`` field."""
    jsonld_fields = inst.__class__._jsonld_fields
    attrs = tuple(
        field
        for field in fields(inst.__class__) if field.name in jsonld_fields
    )
    rv = dict_factory()

    def convert_value(value):
        """Convert non-serializable types."""
        if isinstance(value, Path):
            result = str(value)
            if basedir:
                result = os.path.relpath(result, str(basedir))
            return result

        if isinstance(value, datetime):
            if not value.tzinfo:
                # set timezone to local timezone
                tz = datetime.now(timezone.utc).astimezone().tzinfo
                value = value.replace(tzinfo=tz)
            return value.isoformat()

        return value

    inst_cls = type(inst)

    for a in attrs:
        v = getattr(inst, a.name)
        scoped = a.name in inst_cls._scoped_properties

        # skip proxies
        if isinstance(v, weakref.ReferenceType):
            continue

        if filter is not None and not filter(a, v):
            continue
        if recurse is True:
            if has(v.__class__):
                rv[a.name] = asjsonld(
                    v,
                    recurse=True,
                    filter=filter,
                    dict_factory=dict_factory,
                    add_context=False,
                    use_scoped_type_form=scoped,
                    basedir=basedir,
                )
            elif isinstance(v, (tuple, list, set)):
                cf = v.__class__ if retain_collection_types is True else list
                rv[a.name] = cf([
                    asjsonld(
                        i,
                        recurse=True,
                        filter=filter,
                        dict_factory=dict_factory,
                        add_context=False,
                        use_scoped_type_form=scoped,
                        basedir=basedir,
                    ) if has(i.__class__) else i for i in v
                ])
            elif isinstance(v, dict):
                df = dict_factory
                rv[a.name] = df((
                    asjsonld(
                        kk,
                        dict_factory=df,
                        add_context=False,
                        basedir=basedir,
                    ) if has(kk.__class__) else convert_value(kk),
                    asjsonld(
                        vv,
                        dict_factory=df,
                        add_context=False,
                        basedir=basedir,
                    ) if has(vv.__class__) else vv
                ) for kk, vv in iteritems(v))
            else:
                rv[a.name] = convert_value(v)
        else:
            rv[a.name] = convert_value(v)

    if add_context:
        rv['@context'] = deepcopy(inst_cls._jsonld_context)

    rv_type = []
    if inst_cls._jsonld_type:
        if isinstance(inst_cls._jsonld_type, (list, tuple, set)):
            rv_type.extend(inst_cls._jsonld_type)
        else:
            rv_type.append(inst_cls._jsonld_type)

        if use_scoped_type_form:
            rv_type = [
                '{}_{}'.format(inst_cls._renku_type, t) for t in rv_type
            ]

        rv['@type'] = rv_type[0] if len(rv_type) == 1 else rv_type

    return rv


class JSONLDMixin(ReferenceMixin):
    """Mixin for loading a JSON-LD data."""

    __type_registry__ = {}

    @classmethod
    def from_jsonld(
        cls,
        data,
        client=None,
        commit=None,
        __reference__=None,
        __source__=None,
    ):
        """Instantiate a JSON-LD class from data."""
        if isinstance(data, cls):
            return data

        if not isinstance(data, dict):
            raise ValueError(data)

        if '@type' in data:
            # @type could be a string or a list - make sure it is a list
            type_ = data['@type']
            if not isinstance(type_, list):
                type_ = [type_]
            # If a json-ld class has multiple types, they are in a
            # sorted tuple. This is used as the key for the class
            # registry, so we have to match it here.
            type_ = tuple(sorted(type_))
            if type_ in cls.__type_registry__ and getattr(
                cls, '_jsonld_type', None
            ) != type_:
                new_cls = cls.__type_registry__[type_]
                if cls != new_cls:
                    return new_cls.from_jsonld(
                        data, client=client, commit=commit
                    )

        if cls._jsonld_translate:
            # perform the translation
            data = pyld.jsonld.compact(data, cls._jsonld_translate)
            # compact using the class json-ld context
            data.pop('@context', None)
            data = pyld.jsonld.compact(data, cls._jsonld_context)

        data.setdefault('@context', cls._jsonld_context)

        schema_type = data.get('@type')
        migrations = []

        if isinstance(schema_type, list):
            for schema in schema_type:
                mig_ = JSONLD_MIGRATIONS.get(schema)
                if mig_:
                    migrations += mig_

        if isinstance(schema_type, str) and not migrations:
            migrations += JSONLD_MIGRATIONS.get(schema_type, [])

        for migration in set(migrations):
            data = migration(data, client)
            if __source__:
                __source__ = migration(__source__, client)

        if data['@context'] != cls._jsonld_context:
            # merge new context into old context to prevent properties
            # getting lost in jsonld expansion
            if isinstance(data['@context'], str):
                data['@context'] = {'@base': data['@context']}
            data['@context'].update(cls._jsonld_context)
            try:
                compacted = pyld.jsonld.compact(data, cls._jsonld_context)
            except Exception:
                compacted = data
        else:
            compacted = data

        fields = cls._jsonld_fields

        data_ = {}
        # `client` and `commit` are passed in optionally for some classes
        # They might be unset if the metadata is used to instantiate
        # an object outside of a repo/client context.
        if client:
            data_['client'] = client
        if commit:
            data_['commit'] = commit

        for k, v in compacted.items():
            if k in fields:
                no_value_context = isinstance(v, dict) and '@context' not in v
                has_nested_context = (
                    k in compacted['@context'] and
                    '@context' in compacted['@context'][k]
                )
                if no_value_context and has_nested_context:
                    # Propagate down context
                    v['@context'] = compacted['@context'][k]['@context']

                data_[k.lstrip('_')] = v

        if __reference__:
            with with_reference(__reference__):
                self = cls(**data_)
        else:
            self = cls(**data_)

        if __source__:
            setattr(self, '__source__', __source__)

        return self

    @classmethod
    def from_yaml(cls, path, client=None, commit=None):
        """Return an instance from a YAML file."""
        import yaml

        with path.open(mode='r') as fp:
            source = yaml.load(fp, Loader=NoDatesSafeLoader) or {}
            self = cls.from_jsonld(
                source,
                client=client,
                commit=commit,
                __reference__=path,
                __source__=deepcopy(source)
            )
        return self

    def asjsonld(self):
        """Create JSON-LD with the original source data."""
        source = {}
        if self.__source__:
            source.update(self.__source__)
        source.update(asjsonld(self))
        return source

    def to_yaml(self):
        """Store an instance to the referenced YAML file."""
        dumper = yaml.dumper.Dumper
        dumper.ignore_aliases = lambda _, data: True

        with self.__reference__.open('w') as fp:
            jsonld_ = self.asjsonld()
            yaml.dump(jsonld_, fp, default_flow_style=False, Dumper=dumper)


def fullname(cls):
    """Gets the fully qualified type name of this class."""
    if type(cls) != type:
        cls = type(cls)
    module = cls.__module__
    if module is None or module == str.__class__.__module__:
        return cls.__name__  # Avoid reporting __builtin__
    else:
        return '.'.join([module, cls.__name__])


def import_class_from_string(dotted_path):
    """Imports a fully qualified class string."""
    if not isinstance(dotted_path, str):
        return dotted_path
    module_path, class_name = dotted_path.rsplit('.', 1)
    module = import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError:
        return None


s = attrs
ib = attrib