# Source code for renku.core.models.jsonld

# -*- coding: utf-8 -*-
#
# Copyright 2017-2019 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Support JSON-LD context in models."""

import json
import os
import weakref
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path

import attr
import yaml
from attr._compat import iteritems
from attr._funcs import has
from attr._make import Factory, fields
from pyld import jsonld as ld

from renku.core.models.locals import ReferenceMixin, with_reference
from renku.core.models.migrations import JSONLD_MIGRATIONS

# Attribute-metadata key under which ``attrib`` stores the JSON-LD context.
KEY = '__json_ld'
# Attribute-metadata key under which container attributes store the class
# of their items.
KEY_CLS = '__json_ld_cls'

# Template for the docstring generated for every decorated class; it is
# filled with ``cls``, ``type`` and ``context`` via ``str.format``.
DOC_TPL = '\n'.join((
    '{cls.__doc__}',
    '',
    '**Type:**',
    '',
    '.. code-block:: json',
    '',
    '    {type}',
    '',
    '**Context:**',
    '',
    '.. code-block:: json',
    '',
    '{context}',
    '',
))

# Alias for the builtin ``type``: the ``attrs`` decorator below has a
# parameter called ``type`` that shadows the builtin inside its body.
make_type = type


# Adapted from a Stack Overflow answer:
# https://stackoverflow.com/questions/34667108/ignore-dates-and-times-while-parsing-yaml
# Loading YAML without date parsing lets the data be used as JSON later on.
class NoDatesSafeLoader(yaml.SafeLoader):
    """Safe YAML loader that leaves datetime strings as plain strings."""

    @classmethod
    def remove_implicit_resolver(cls, tag_to_remove):
        """Drop all implicit resolvers registered for ``tag_to_remove``.

        The resolver table is copied onto this class first when it is only
        inherited, so the resolvers of parent loaders are left untouched.

        Datetimes have to stay strings because the data is serialised as
        JSON afterwards, and JSON has no equivalent of YAML's timestamp
        type.
        """
        if 'yaml_implicit_resolvers' not in vars(cls):
            cls.yaml_implicit_resolvers = cls.yaml_implicit_resolvers.copy()

        resolvers = cls.yaml_implicit_resolvers
        # Only existing keys are reassigned, so iterating a snapshot of the
        # keys and rebuilding each list keeps the table shape unchanged.
        for first_letter in list(resolvers):
            resolvers[first_letter] = [
                (tag, pattern)
                for tag, pattern in resolvers[first_letter]
                if tag != tag_to_remove
            ]


NoDatesSafeLoader.remove_implicit_resolver('tag:yaml.org,2002:timestamp')


def attrs(
    maybe_cls=None, type=None, context=None, translate=None, **attrs_kwargs
):
    """Wrap an attr enabled class and attach JSON-LD type and context.

    Usable both as a bare decorator (``@attrs``) and with arguments
    (``@attrs(type=..., context=...)``).

    :param maybe_cls: Class to decorate when used without arguments.
    :param type: JSON-LD ``@type`` of the class; a single value or a
        list/tuple/set of values.
    :param context: Initial JSON-LD ``@context`` mapping.  A non-empty
        mapping passed here is extended in place with the context
        collected from base classes and attributes.
    :param translate: Context stored on the class as ``_jsonld_translate``.
    :param attrs_kwargs: Extra keyword arguments forwarded to ``attr.s``.
    :returns: The decorated class, or the decorator when ``maybe_cls`` is
        ``None``.
    :raises TypeError: If context definitions conflict or the expanded
        type is already present in the type registry.
    """
    if isinstance(type, (list, tuple, set)):
        types = list(type)
    else:
        types = [type] if type is not None else []
    context = context or {}
    translate = translate or {}

    def wrap(cls):
        """Decorate an attr enabled class."""
        jsonld_cls = attr.s(cls, **attrs_kwargs)

        # Make sure the resulting class provides the JSON-LD loading API.
        if not issubclass(jsonld_cls, JSONLDMixin):
            jsonld_cls = attr.s(
                make_type(cls.__name__, (jsonld_cls, JSONLDMixin), {}),
                **attrs_kwargs
            )

        # Merge types and contexts declared on the classes in the MRO.
        for subcls in jsonld_cls.mro():
            subtype = getattr(subcls, '_jsonld_type', None)
            if subtype:
                if isinstance(subtype, (tuple, list)):
                    types.extend(subtype)
                else:
                    types.append(subtype)

            for key, value in getattr(subcls, '_jsonld_context', {}).items():
                if key in context and context[key] != value:
                    raise TypeError(
                        'Can not merge {0} and {1} because of {2}'.format(
                            jsonld_cls, subcls, key
                        )
                    )
                context.setdefault(key, value)

        # Collect the context declared on the individual attributes.
        for a in attr.fields(jsonld_cls):
            key = a.name
            ctx = a.metadata.get(KEY)
            if ctx is None:
                continue

            if ':' in ctx:
                # A prefixed term whose prefix is already known to the
                # context is stored as-is.
                prefix, _ = ctx.split(':', 1)
                if prefix in context:
                    context[key] = ctx
                    continue

            if isinstance(ctx, dict) or ctx not in context:
                context[key] = ctx

            if KEY_CLS in a.metadata:
                # Container attributes pull in the context of their items.
                merge_ctx = a.metadata[KEY_CLS]._jsonld_context
                for ctx_key, ctx_value in merge_ctx.items():
                    context.setdefault(ctx_key, ctx_value)

                    if context[ctx_key] != ctx_value:
                        raise TypeError(
                            'Can not merge {0} and {1} because of {2}'.format(
                                jsonld_cls, a.metadata[KEY_CLS], ctx_key
                            )
                        )

        jsonld_cls.__module__ = cls.__module__
        jsonld_cls._jsonld_type = types[0] if len(types) == 1 else list(
            sorted(set(types))
        )
        jsonld_cls._jsonld_context = context
        jsonld_cls._jsonld_translate = translate
        jsonld_cls._jsonld_fields = {
            a.name
            for a in attr.fields(jsonld_cls) if KEY in a.metadata
        }

        context_doc = '\n'.join(
            '   ' + line for line in json.dumps(context, indent=2).split('\n')
        )
        jsonld_cls.__doc__ = DOC_TPL.format(
            cls=cls,
            type=json.dumps(jsonld_cls._jsonld_type),
            context=context_doc,
        )

        # Register class for given JSON-LD @type
        try:
            type_ = ld.expand({
                '@type': jsonld_cls._jsonld_type,
                '@context': context
            })[0]['@type']
            if isinstance(type_, list):
                type_ = tuple(sorted(type_))
        except Exception:
            # FIXME make sure all classes have @id defined
            # Classes whose type can not be expanded are returned
            # without being registered.
            return jsonld_cls

        if type_ in jsonld_cls.__type_registry__:
            # The registry is keyed by the expanded type, so the existing
            # class must be looked up with ``type_`` and not with the
            # (possibly compact or list-valued) ``_jsonld_type``.
            raise TypeError(
                'Type {0!r} is already registered for class {1!r}.'.format(
                    jsonld_cls._jsonld_type,
                    jsonld_cls.__type_registry__[type_],
                )
            )
        jsonld_cls.__type_registry__[type_] = jsonld_cls
        return jsonld_cls

    if maybe_cls is None:
        return wrap
    return wrap(maybe_cls)


def attrib(context=None, **kwargs):
    """Define an ``attr.ib`` attribute that carries a JSON-LD context.

    The context is stored in the attribute metadata under ``KEY``; all
    other keyword arguments are passed to ``attr.ib`` unchanged.
    """
    metadata = kwargs.setdefault('metadata', {})
    metadata[KEY] = context
    return attr.ib(**kwargs)


# Supported container kinds as ``(name, python type, mapper)`` triples.
# Each mapper takes the item class and a raw container value and returns a
# container of the same kind with every item passed through
# ``from_jsonld`` of the item class.
_container_types = (
    (
        'list', list,
        lambda cls, items: list(cls.from_jsonld(item) for item in items)
    ),
    (
        'set', set,
        lambda cls, items: set(cls.from_jsonld(item) for item in items)
    ),
    (
        'index', dict,
        lambda cls, items: dict(
            (key, cls.from_jsonld(item)) for key, item in items.items()
        )
    ),
)


def _container_attrib_builder(name, container, mapper):
    """Create the attribute factory for one kind of container.

    :param name: Name of the container kind (not used by the factory).
    :param container: Python type of the container; also used as the
        default factory of the attribute.
    :param mapper: Callable ``(type, value)`` converting the items of a
        raw container value.
    :returns: Function defining a container attribute for an item type.
    """
    default_factory = Factory(container)

    def _attrib(type, **kwargs):
        """Define a container attribute."""

        def _converter(value):
            """Convert value to the given type."""
            if value is None:
                return value
            if not isinstance(value, container):
                raise ValueError(value)
            return mapper(type, value)

        metadata = kwargs.setdefault('metadata', {})
        metadata[KEY_CLS] = type
        # The default is always an empty container of the right kind.
        kwargs['default'] = default_factory
        # An explicitly supplied converter takes precedence.
        kwargs.setdefault('converter', _converter)

        return attrib(**kwargs)

    return _attrib


# Namespace class exposing one static attribute factory per container kind
# listed in ``_container_types``.
container = type(
    'Container',
    (object, ),
    dict(
        (label, staticmethod(_container_attrib_builder(label, kind, build)))
        for label, kind, build in _container_types
    ),
)


def asjsonld(
    inst,
    recurse=True,
    filter=None,
    dict_factory=dict,
    retain_collection_types=False,
    export_context=True,
    basedir=None,
):
    """Dump a JSON-LD class to the JSON with generated ``@context`` field.

    :param inst: Instance of a class decorated with the JSON-LD ``attrs``.
    :param recurse: When ``True``, nested attrs instances, lists, tuples,
        sets and dicts are converted recursively.
    :param filter: Optional callable ``(attribute, value)``; attributes
        for which it returns a false value are skipped.
    :param dict_factory: Callable creating the resulting mappings.
    :param retain_collection_types: Keep the original list/tuple/set class
        of a collection value instead of converting it to ``list``.  The
        flag is not forwarded to nested instances.
    :param export_context: Add the class ``@context`` to the result.
        Items of container attributes are exported without a context.
    :param basedir: When given, ``Path`` values are made relative to it.
    :returns: Mapping with the JSON-LD fields plus ``@context``/``@type``.
    """
    jsonld_fields = inst.__class__._jsonld_fields
    attrs = tuple(
        field
        for field in fields(inst.__class__) if field.name in jsonld_fields
    )
    rv = dict_factory()

    def convert_value(value):
        """Convert non-serializable types."""
        if isinstance(value, Path):
            result = str(value)
            if basedir:
                result = os.path.relpath(result, str(basedir))
            return result

        if isinstance(value, datetime):
            if not value.tzinfo:
                # set timezone to local timezone
                tz = datetime.now(timezone.utc).astimezone().tzinfo
                value = value.replace(tzinfo=tz)
            return value.isoformat()

        return value

    for a in attrs:
        v = getattr(inst, a.name)

        # skip proxies
        if isinstance(v, weakref.ReferenceType):
            continue

        # do not export context for containers
        ec = export_context and KEY_CLS not in a.metadata

        if filter is not None and not filter(a, v):
            continue
        if recurse is True:
            if has(v.__class__):
                rv[a.name] = asjsonld(
                    v,
                    recurse=True,
                    filter=filter,
                    dict_factory=dict_factory,
                    basedir=basedir,
                )
            elif isinstance(v, (tuple, list, set)):
                cf = v.__class__ if retain_collection_types is True else list
                # Plain items go through ``convert_value`` as well so that
                # paths and datetimes inside collections are serialised
                # the same way as scalar fields.
                rv[a.name] = cf([
                    asjsonld(
                        i,
                        recurse=True,
                        filter=filter,
                        dict_factory=dict_factory,
                        export_context=ec,
                        basedir=basedir,
                    ) if has(i.__class__) else convert_value(i) for i in v
                ])
            elif isinstance(v, dict):
                df = dict_factory
                # Keys and values are both converted; plain values are
                # treated like plain keys.
                rv[a.name] = df((
                    asjsonld(
                        kk,
                        dict_factory=df,
                        basedir=basedir,
                    ) if has(kk.__class__) else convert_value(kk),
                    asjsonld(
                        vv,
                        dict_factory=df,
                        export_context=ec,
                        basedir=basedir,
                    ) if has(vv.__class__) else convert_value(vv)
                ) for kk, vv in v.items())
            else:
                rv[a.name] = convert_value(v)
        else:
            rv[a.name] = convert_value(v)

    inst_cls = type(inst)

    if export_context:
        # Copy the context so callers can not modify the class-level one.
        rv['@context'] = deepcopy(inst_cls._jsonld_context)

    if inst_cls._jsonld_type:
        rv['@type'] = inst_cls._jsonld_type
    return rv


class JSONLDMixin(ReferenceMixin):
    """Mixin for loading a JSON-LD data."""

    # Maps an expanded JSON-LD type (a sorted tuple when expansion yields a
    # list) to the class registered for it by the ``attrs`` decorator.
    __type_registry__ = {}

    @classmethod
    def from_jsonld(
        cls,
        data,
        client=None,
        commit=None,
        __reference__=None,
        __source__=None,
    ):
        """Instantiate a JSON-LD class from data.

        :param data: Instance of ``cls`` (returned unchanged) or a dict
            with JSON-LD data.  The dict gets a default ``@context``.
        :param client: Optional value passed to the class as ``client``.
        :param commit: Optional value passed to the class as ``commit``.
        :param __reference__: Optional reference made active (via
            ``with_reference``) while the instance is created.
        :param __source__: Optional original source data; it is migrated
            alongside ``data`` and stored on the instance.
        :raises ValueError: If ``data`` is neither an instance of ``cls``
            nor a dict.
        """
        if isinstance(data, cls):
            return data

        if not isinstance(data, dict):
            raise ValueError(data)

        if '@type' in data:
            # @type could be a string or a list - make sure it is a list
            type_ = data['@type']
            if not isinstance(type_, list):
                type_ = [type_]
            # If a json-ld class has multiple types, they are in a
            # sorted tuple. This is used as the key for the class
            # registry, so we have to match it here.
            type_ = tuple(sorted(type_))
            if type_ in cls.__type_registry__ and getattr(
                cls, '_jsonld_type', None
            ) != type_:
                new_cls = cls.__type_registry__[type_]
                if cls != new_cls:
                    # Keep the reference and source when delegating to the
                    # registered class; they are passed only when set so
                    # that the call is otherwise unchanged.
                    extra = {}
                    if __reference__:
                        extra['__reference__'] = __reference__
                    if __source__:
                        extra['__source__'] = __source__
                    return new_cls.from_jsonld(
                        data, client=client, commit=commit, **extra
                    )

        if cls._jsonld_translate:
            # perform the translation
            data = ld.compact(data, cls._jsonld_translate)
            # compact using the class json-ld context
            data.pop('@context', None)
            data = ld.compact(data, cls._jsonld_context)

        data.setdefault('@context', cls._jsonld_context)

        schema_type = data.get('@type')
        migrations = []

        if isinstance(schema_type, list):
            for schema in schema_type:
                mig_ = JSONLD_MIGRATIONS.get(schema)
                if mig_:
                    migrations += mig_

        if isinstance(schema_type, str) and not migrations:
            migrations += JSONLD_MIGRATIONS.get(schema_type, [])

        # Apply every migration once, in the order it was collected; a
        # plain ``set`` would make the order arbitrary.
        unique_migrations = []
        for migration in migrations:
            if migration not in unique_migrations:
                unique_migrations.append(migration)

        for migration in unique_migrations:
            data = migration(data)
            if __source__:
                __source__ = migration(__source__)

        if data['@context'] != cls._jsonld_context:
            try:
                compacted = ld.compact(data, cls._jsonld_context)
            except Exception:
                # Fall back to the data as given when compaction fails.
                compacted = data
        else:
            compacted = data

        fields = cls._jsonld_fields

        data_ = {}
        # `client` and `commit` are passed in optionally for some classes
        # They might be unset if the metadata is used to instantiate
        # an object outside of a repo/client context.
        if client:
            data_['client'] = client
        if commit:
            data_['commit'] = commit

        for k, v in compacted.items():
            if k in fields:
                # Leading underscores of the field name are stripped for
                # the keyword argument passed to the class.
                data_[k.lstrip('_')] = v

        if __reference__:
            with with_reference(__reference__):
                self = cls(**data_)
        else:
            self = cls(**data_)

        if __source__:
            setattr(self, '__source__', __source__)

        return self

    @classmethod
    def from_yaml(cls, path, client=None, commit=None):
        """Return an instance from a YAML file.

        :param path: Object with an ``open`` method (such as a ``Path``)
            pointing to the YAML file; it becomes the reference.
        :param client: Forwarded to ``from_jsonld``.
        :param commit: Forwarded to ``from_jsonld``.
        """
        with path.open(mode='r') as fp:
            source = yaml.load(fp, Loader=NoDatesSafeLoader) or {}

        return cls.from_jsonld(
            source,
            client=client,
            commit=commit,
            __reference__=path,
            __source__=deepcopy(source)
        )

    def asjsonld(self):
        """Create JSON-LD with the original source data.

        Keys of the original source are kept and overridden by the values
        exported from the instance.
        """
        source = {}
        # Instances might not have a source attached.
        original = getattr(self, '__source__', None)
        if original:
            source.update(original)
        source.update(asjsonld(self))
        return source

    def to_yaml(self):
        """Store an instance to the referenced YAML file."""

        class _NoAliasDumper(yaml.dumper.Dumper):
            """Dumper writing repeated objects in full instead of aliases.

            A subclass is used so that the shared ``yaml`` dumper class is
            not modified for every other user of the library.
            """

            def ignore_aliases(self, data):
                """Never emit anchors or aliases."""
                return True

        # Serialise before opening the file so that a failure does not
        # leave a truncated file behind.
        jsonld_ = self.asjsonld()

        with self.__reference__.open('w') as fp:
            yaml.dump(
                jsonld_, fp, default_flow_style=False, Dumper=_NoAliasDumper
            )

s = attrs
ib = attrib