# Source code for renku.core.models.provenance.agents

# -*- coding: utf-8 -*-
# Copyright 2018-2020- Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent provenance agents."""

import os
import pathlib
import re
import urllib
import uuid
from urllib.parse import quote

import attr
from attr.validators import instance_of
from calamus.schema import JsonLDSchema
from marshmallow import EXCLUDE

from renku.core.models.calamus import StringList, fields, prov, rdfs, schema, wfprov
from renku.core.models.git import get_user_info
from renku.version import __version__, version_url

@attr.s(slots=True)
class Person:
    """Represent a person.

    All attributes are keyword-only. ``name`` must be a ``str``; ``email``,
    ``affiliation``, ``alternate_name`` and ``id`` are optional. When no
    usable id is supplied, one is derived via ``generate_person_id``; when
    ``label`` is not supplied, it defaults to ``default_label()``.
    """

    # Only read when deriving the default id (see ``default_id``).
    client = attr.ib(default=None, kw_only=True)

    name = attr.ib(kw_only=True, validator=instance_of(str))
    email = attr.ib(default=None, kw_only=True)
    label = attr.ib(kw_only=True)
    affiliation = attr.ib(default=None, kw_only=True)
    alternate_name = attr.ib(default=None, kw_only=True)
    _id = attr.ib(default=None, kw_only=True)

    def default_id(self):
        """Return the default id computed by ``generate_person_id``."""
        return generate_person_id(email=self.email, client=self.client, full_identity=self.full_identity)

    @email.validator
    def check_email(self, attribute, value):
        """Check that the email is valid.

        An empty/``None`` email is accepted; anything else must be a string
        of the form ``<local>@<domain>.<tld>``.

        Raises:
            ValueError: if a non-empty email does not match that form.
        """
        if value and not (isinstance(value, str) and re.match(r"[^@]+@[^@]+\.[^@]+", value)):
            raise ValueError("Email address is invalid.")

    @label.default
    def default_label(self):
        """Return the default label."""
        # NOTE(review): the returned expression was missing from the
        # extracted source; the person's name is assumed -- confirm.
        return self.name

    @classmethod
    def from_commit(cls, commit):
        """Create an instance from a Git commit."""
        # NOTE(review): the constructor arguments were missing from the
        # extracted source; the commit author's name and email are assumed
        # -- confirm against upstream.
        return cls(name=commit.author.name, email=commit.author.email)

    @property
    def short_name(self):
        """Give the full name in short form, e.g. ``"J.R.Doe"``.

        A name consisting of a single word (or no word at all) is returned
        unchanged.
        """
        names = self.name.split()
        # ``<= 1`` also covers an empty/whitespace-only name, for which
        # ``names[-1]`` below would raise IndexError.
        if len(names) <= 1:
            return self.name

        *given_names, last_name = names
        initials = [given[0] for given in given_names]
        return "{0}.{1}".format(".".join(initials), last_name)

    @property
    def full_identity(self):
        """Return name, email, and affiliation as ``Name <email> [affiliation]``.

        The email and affiliation parts are omitted when they are not set.
        """
        email = f" <{self.email}>" if self.email else ""
        affiliation = f" [{self.affiliation}]" if self.affiliation else ""
        return f"{self.name}{email}{affiliation}"

    @classmethod
    def from_git(cls, git):
        """Create an instance from a Git repo."""
        name, email = get_user_info(git)
        return cls(name=name, email=email)

    @classmethod
    def from_string(cls, string):
        """Create an instance from a 'Name <email>' string.

        An optional trailing ``[affiliation]`` part is parsed as well.
        """
        regex_pattern = (
            r"([^<>\[\]]*)"
            r"(?:<{1}\s*(\S+@\S+\.\S+){0,1}\s*>{1}){0,1}\s*"
            r"(?:\[{1}(.*)\]{1}){0,1}"
        )
        # Every group in the pattern is optional, so a match is always found.
        name, email, affiliation = re.search(regex_pattern, string).groups()
        if name:
            name = name.strip()
        if affiliation:
            affiliation = affiliation.strip()
        # Normalize an empty affiliation ("[]" or "[  ]") to None.
        affiliation = affiliation or None

        return cls(name=name, email=email, affiliation=affiliation)

    @classmethod
    def from_dict(cls, obj):
        """Create an instance from a dictionary of constructor keywords."""
        return cls(**obj)

    @classmethod
    def from_jsonld(cls, data):
        """Create an instance from JSON-LD data.

        An existing instance is returned as is.

        Raises:
            ValueError: if ``data`` is neither an instance nor a ``dict``.
        """
        if isinstance(data, cls):
            return data
        if not isinstance(data, dict):
            raise ValueError(data)

        return PersonSchema().load(data)

    def __attrs_post_init__(self):
        """Finish object initialization."""
        # handle the case where ids were improperly set
        if self._id == "mailto:None" or not self._id or self._id.startswith("_:"):
            self._id = self.default_id()

        if self.label is None:
            self.label = self.default_label()
class PersonSchema(JsonLDSchema):
    """Person schema.

    Maps ``Person`` attributes to JSON-LD properties; unknown properties in
    the input are excluded on load.
    """

    class Meta:
        """Meta class."""

        rdf_type = [prov.Person, schema.Person]
        model = Person
        unknown = EXCLUDE

    # NOTE(review): the property IRIs of ``name`` and ``email`` were missing
    # from the extracted source; ``schema.name`` / ``schema.email`` are
    # assumed by analogy with the ``schema.*`` fields below -- confirm.
    name = StringList(schema.name, missing=None)
    email = fields.String(schema.email, missing=None)
    label = StringList(rdfs.label, missing=None)
    affiliation = StringList(schema.affiliation, missing=None)
    alternate_name = StringList(schema.alternateName, missing=None)
    _id = fields.Id(init_name="id")
@attr.s(frozen=True, slots=True)
class SoftwareAgent:
    """Represent executed software.

    Instances are immutable (``frozen=True``); both ``label`` and ``id`` are
    required keyword-only arguments.
    """

    label = attr.ib(kw_only=True)
    _id = attr.ib(kw_only=True)

    @classmethod
    def from_commit(cls, commit):
        """Create an instance from a Git commit.

        Returns the commit author as a ``Person`` unless the committer
        differs from the author, in which case a ``SoftwareAgent`` is
        returned instead.
        """
        author = Person.from_commit(commit)
        # NOTE(review): the left operand of the comparison and the
        # constructor arguments were missing from the extracted source;
        # ``commit.author`` and the committer's name/email are assumed
        # -- confirm against upstream.
        if commit.author != commit.committer:
            return cls(label=commit.committer.name, id=commit.committer.email)
        return author

    @classmethod
    def from_jsonld(cls, data):
        """Create an instance from JSON-LD data.

        An existing instance is returned as is.

        Raises:
            ValueError: if ``data`` is neither an instance nor a ``dict``.
        """
        if isinstance(data, cls):
            return data
        if not isinstance(data, dict):
            raise ValueError(data)

        return SoftwareAgentSchema().load(data)

    def as_jsonld(self):
        """Create JSON-LD."""
        return SoftwareAgentSchema().dump(self)
# set up the default agent
renku_agent = SoftwareAgent(label="renku {0}".format(__version__), id=version_url)


def generate_person_id(client, email, full_identity):
    """Generate Person default id.

    Args:
        client: Optional object whose ``remote`` mapping may provide a
            ``"host"`` entry; ignored when falsy.
        email: Email address; when truthy it alone determines the id.
        full_identity: Identity string used as the last path segment when
            there is no email; a random UUID4 is used when it is falsy.

    Returns:
        ``mailto:<email>`` when an email is given, otherwise
        ``https://<host>/persons/<percent-encoded identity>``.
    """
    if email:
        return "mailto:{email}".format(email=email)

    # Host precedence: RENKU_DOMAIN env var, then the client's remote host,
    # then "localhost".
    host = "localhost"
    if client:
        host = client.remote.get("host") or host
    host = os.environ.get("RENKU_DOMAIN") or host

    id_ = full_identity or str(uuid.uuid4())

    # ``safe=""`` also encodes "/", so the identity is always a single path
    # segment. Plain concatenation replaces ``pathlib.posixpath.join``:
    # ``posixpath`` is an implementation detail of ``pathlib``, not a public
    # attribute, and the joined result is the same here.
    person_path = "/persons/" + quote(id_, safe="")
    return urllib.parse.urljoin("https://{host}".format(host=host), person_path)


class SoftwareAgentSchema(JsonLDSchema):
    """SoftwareAgent schema.

    Maps ``SoftwareAgent`` attributes to JSON-LD properties; unknown
    properties in the input are excluded on load.
    """

    class Meta:
        """Meta class."""

        rdf_type = [prov.SoftwareAgent, wfprov.WorkflowEngine]
        model = SoftwareAgent
        unknown = EXCLUDE

    label = fields.String(rdfs.label)
    _id = fields.Id(init_name="id")