Source code for renku.models._jsonld

# -*- coding: utf-8 -*-
#
# Copyright 2017-2018 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Support JSON-LD context in models."""

import json
import weakref
from copy import deepcopy

import attr
from attr._compat import iteritems
from attr._funcs import has
from attr._make import Factory, fields
from pyld import jsonld as ld

from renku._compat import Path

KEY = '__json_ld'
KEY_CLS = '__json_ld_cls'

DOC_TPL = (
    "{cls.__doc__}\n\n"
    "**Type:**\n\n"
    ".. code-block:: json\n\n"
    "    {type}\n\n"
    "**Context:**\n\n"
    ".. code-block:: json\n\n"
    "{context}\n"
)

make_type = type


def attrs(
    maybe_cls=None, type=None, context=None, translate=None, **attrs_kwargs
):
    """Wrap an attr enabled class."""
    if isinstance(type, (list, tuple, set)):
        types = list(type)
    else:
        types = [type] if type is not None else []
    context = context or {}
    translate = translate or {}

    def wrap(cls):
        """Decorate an attr enabled class."""
        jsonld_cls = attr.s(cls, **attrs_kwargs)

        if not issubclass(jsonld_cls, JSONLDMixin):
            jsonld_cls = make_type(cls.__name__, (jsonld_cls, JSONLDMixin), {})

        # Merge types
        for subcls in jsonld_cls.mro():
            subtype = getattr(subcls, '_jsonld_type', None)
            if subtype:
                if isinstance(subtype, (tuple, list)):
                    types.extend(subtype)
                else:
                    types.append(subtype)

            for key, value in getattr(subcls, '_jsonld_context', {}).items():
                if key in context and context[key] != value:
                    raise TypeError()
                context.setdefault(key, value)

        for a in attr.fields(jsonld_cls):
            key = a.name
            ctx = a.metadata.get(KEY)
            if ctx is None:
                continue

            if ':' in ctx:
                prefix, _ = ctx.split(':', 1)
                if prefix in context:
                    context[key] = ctx
                    continue

            if isinstance(ctx, dict) or ctx not in context:
                context[key] = ctx

            if KEY_CLS in a.metadata:
                merge_ctx = a.metadata[KEY_CLS]._jsonld_context
                for ctx_key, ctx_value in merge_ctx.items():
                    context.setdefault(ctx_key, ctx_value)
                    if context[ctx_key] != ctx_value:
                        raise TypeError(
                            'Can not merge {0} and {1} because of {2}'.format(
                                jsonld_cls, a.metadata[KEY_CLS], ctx_key
                            )
                        )

        jsonld_cls.__module__ = cls.__module__
        jsonld_cls._jsonld_type = types[0] if len(types) == 1 else list(
            sorted(set(types))
        )
        jsonld_cls._jsonld_context = context
        jsonld_cls._jsonld_translate = translate
        jsonld_cls._jsonld_fields = {
            a.name
            for a in attr.fields(jsonld_cls) if KEY in a.metadata
        }

        context_doc = '\n'.join(
            '   ' + line for line in json.dumps(context, indent=2).split('\n')
        )
        jsonld_cls.__doc__ = DOC_TPL.format(
            cls=cls,
            type=json.dumps(jsonld_cls._jsonld_type),
            context=context_doc,
        )

        # Register class for given JSON-LD @type
        try:
            type_ = ld.expand({
                '@type': jsonld_cls._jsonld_type,
                '@context': context
            })[0]['@type']
            if isinstance(type_, list):
                type_ = tuple(sorted(type_))
        except Exception:
            # FIXME make sure all classes have @id defined
            return jsonld_cls

        if type_ in jsonld_cls.__type_registry__:
            raise TypeError(
                'Type {0!r} is already registered for class {1!r}.'.format(
                    jsonld_cls._jsonld_type,
                    jsonld_cls.__type_registry__[jsonld_cls._jsonld_type],
                )
            )
        jsonld_cls.__type_registry__[type_] = jsonld_cls
        return jsonld_cls

    if maybe_cls is None:
        return wrap
    return wrap(maybe_cls)


def attrib(context=None, **kwargs):
    """Create a new attribute with context."""
    kwargs.setdefault('metadata', {})
    kwargs['metadata'][KEY] = context
    return attr.ib(**kwargs)


_container_types = (
    ('list', list, lambda type, value: [type.from_jsonld(v) for v in value]),
    ('set', set, lambda type, value: {type.from_jsonld(v)
                                      for v in value}),
    (
        'index', dict,
        lambda type, value: {k: type.from_jsonld(v)
                             for k, v in value.items()}
    ),
)


def _container_attrib_builder(name, container, mapper):
    """Builder for container attributes."""
    context = {'@container': '@{0}'.format(name)}

    def _attrib(type, **kwargs):
        """Define a container attribute."""
        kwargs.setdefault('metadata', {})
        kwargs['metadata'][KEY_CLS] = type
        kwargs['default'] = Factory(container)

        def _converter(value):
            """Convert value to the given type."""
            if isinstance(value, container):
                return mapper(type, value)
            elif value is None:
                return value
            raise ValueError(value)

        kwargs.setdefault('converter', _converter)
        return attrib(context=context, **kwargs)

    return _attrib


container = type(
    'Container', (object, ), {
        name: staticmethod(_container_attrib_builder(name, container, mapper))
        for name, container, mapper in _container_types
    }
)


def asjsonld(
    inst,
    recurse=True,
    filter=None,
    dict_factory=dict,
    retain_collection_types=False,
    export_context=True,
):
    """Dump a JSON-LD class to the JSON with generated ``@context`` field."""
    jsonld_fields = inst.__class__._jsonld_fields
    attrs = tuple(
        field
        for field in fields(inst.__class__) if field.name in jsonld_fields
    )
    rv = dict_factory()

    def convert_value(v):
        """Convert special types."""
        if isinstance(v, Path):
            return str(v)
        return v

    for a in attrs:
        v = getattr(inst, a.name)

        # skip proxies
        if isinstance(v, weakref.ReferenceType):
            continue

        # do not export context for containers
        ec = export_context and KEY_CLS not in a.metadata

        if filter is not None and not filter(a, v):
            continue
        if recurse is True:
            if has(v.__class__):
                rv[a.name] = asjsonld(
                    v, recurse=True, filter=filter, dict_factory=dict_factory
                )
            elif isinstance(v, (tuple, list, set)):
                cf = v.__class__ if retain_collection_types is True else list
                rv[a.name] = cf([
                    asjsonld(
                        i,
                        recurse=True,
                        filter=filter,
                        dict_factory=dict_factory,
                        export_context=ec,
                    ) if has(i.__class__) else i for i in v
                ])
            elif isinstance(v, dict):
                df = dict_factory
                rv[a.name] = df((
                    asjsonld(kk, dict_factory=df) if has(kk.__class__) else kk,
                    asjsonld(vv, dict_factory=df, export_context=ec)
                    if has(vv.__class__) else vv
                ) for kk, vv in iteritems(v))
            else:
                rv[a.name] = convert_value(v)
        else:
            rv[a.name] = convert_value(v)

    inst_cls = type(inst)

    if export_context:
        rv['@context'] = deepcopy(inst_cls._jsonld_context)

    if inst_cls._jsonld_type:
        rv['@type'] = inst_cls._jsonld_type
    return rv


class JSONLDMixin(object):
    """Mixin for loading a JSON-LD data."""

    __type_registry__ = {}

    @classmethod
    def from_jsonld(cls, data):
        """Instantiate a JSON-LD class from data."""
        if isinstance(data, cls):
            return data

        if not isinstance(data, dict):
            raise ValueError(data)

        if '@type' in data:
            type_ = tuple(sorted(data['@type']))
            if type_ in cls.__type_registry__ and getattr(
                cls, '_jsonld_type', None
            ) != type_:
                new_cls = cls.__type_registry__[type_]
                if cls != new_cls:
                    return new_cls.from_jsonld(data)

        if cls._jsonld_translate:
            data = ld.compact(data, {'@context': cls._jsonld_translate})
            data.pop('@context', None)

        data.setdefault('@context', cls._jsonld_context)

        if data['@context'] != cls._jsonld_context:
            compacted = ld.compact(data, {'@context': cls._jsonld_context})
        else:
            compacted = data

        # assert compacted['@type'] == cls._jsonld_type, '@type must be equal'
        # TODO update self(not cls)._jsonld_context with data['@context']
        fields = cls._jsonld_fields
        return cls(
            **{k.lstrip('_'): v
               for k, v in compacted.items() if k in fields}
        )


s = attrs
ib = attrib