Source code for renku.core.models.provenance.activities

# -*- coding: utf-8 -*-
# Copyright 2018-2019 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent a Git commit."""

import os
import uuid
from collections import OrderedDict
from pathlib import Path

import attr
from git import NULL_TREE

from renku.core.models import jsonld
from renku.core.models.cwl import WORKFLOW_STEP_RUN_TYPES
from renku.core.models.cwl.ascwl import CWLClass
from renku.core.models.cwl.types import PATH_OBJECTS
from renku.core.models.refs import LinkReference

from .agents import Person, renku_agent
from .entities import Collection, CommitMixin, Entity, Process, Workflow
from .qualified import Association, Generation, Usage

def _nodes(output, parent=None):
    """Yield nodes from entities."""
    # NOTE refactor so all outputs behave the same
    entity = getattr(output, 'entity', output)

    if isinstance(entity, Collection):
        for member in entity.members:
            if parent is not None:
                member = attr.evolve(member, parent=parent)
            yield from _nodes(member)
        yield output
        yield output

[docs]@jsonld.s( type='prov:Activity', context={ 'prov': '', 'rdfs': '', 'schema': '' }, cmp=False, ) class Activity(CommitMixin): """Represent an activity in the repository.""" _id = jsonld.ib(context='@id', kw_only=True) _message = jsonld.ib(context='rdfs:comment', kw_only=True) _was_informed_by = jsonld.ib( context='prov:wasInformedBy', kw_only=True, ) part_of = attr.ib(default=None, kw_only=True) process = attr.ib(default=None, kw_only=True) outputs = attr.ib(kw_only=True) _collections = attr.ib( default=attr.Factory(OrderedDict), init=False, kw_only=True ) generated = jsonld.ib( context={ '@reverse': 'prov:activity', }, kw_only=True, hash=False ) influenced = jsonld.ib( context='prov:influenced', kw_only=True, ) started_at_time = jsonld.ib( context={ '@id': 'prov:startedAtTime', '@type': '', }, kw_only=True, ) ended_at_time = jsonld.ib( context={ '@id': 'prov:endedAtTime', '@type': '', }, kw_only=True, ) agent = jsonld.ib(context='prov:agent', kw_only=True, default=renku_agent) person_agent = jsonld.ib(context='prov:agent', kw_only=True)
[docs] @generated.default def default_generated(self): """Entities generated by this Action.""" results = [] for path, role in self.outputs.items(): client, commit, path = self.client.resolve_in_submodules( self.commit, path, ) output_path = client.path / path parents = list(output_path.relative_to(client.path).parents) collection = None members = [] for parent in reversed(parents[:-1]): if str(parent) in self._collections: collection = self._collections[str(parent)] else: collection = Collection( client=client, commit=commit, path=str(parent), members=[], parent=collection, ) members.append(collection) self._collections[str(parent)] = collection members = collection.members entity_cls = Entity if (self.client.path / path).is_dir(): entity_cls = Collection # TODO: use a factory method to generate the entity if str(path).startswith( os.path.join(client.renku_home, client.DATASETS) ): entity = client.get_dataset(Path(path), commit=commit) else: entity = entity_cls( commit=commit, client=client, path=str(path), parent=collection, ) if collection: collection.members.append(entity) results.append( Generation( activity=self, entity=entity, role=role, ) ) return results
[docs] @influenced.default def default_influenced(self): """Calculate default values.""" return list(self._collections.values())
@property def parents(self): """Return parent commits.""" return list(self.commit.parents) @property def paths(self): """Return all paths in the commit.""" index = set() for file_ in self.commit.diff(self.commit.parents or NULL_TREE): path_ = Path(file_.a_path) is_dataset = self.client.DATASETS in str(path_) not_refs = LinkReference.REFS not in str(path_) does_not_exists = not path_.exists() if all([is_dataset, not_refs, does_not_exists]): uid = uuid.UUID( path_ = ( Path(self.client.renku_home) / self.client.DATASETS / str(uid) / self.client.METADATA ) index.add(str(path_)) return index
[docs] @classmethod def generate_id(cls, commit): """Calculate action ID.""" return 'commit/{commit.hexsha}'.format(commit=commit)
[docs] @_id.default def default_id(self): """Configure calculated ID.""" return self.generate_id(self.commit)
[docs] @_message.default def default_message(self): """Generate a default message.""" return self.commit.message
[docs] @_was_informed_by.default def default_was_informed_by(self): """List parent actions.""" return [{ '@id': self.generate_id(parent), } for parent in self.commit.parents]
[docs] @outputs.default def default_outputs(self): """Guess default outputs from a commit.""" return {path: None for path in self.paths}
[docs] @started_at_time.default def default_started_at_time(self): """Configure calculated properties.""" return self.commit.authored_datetime.isoformat()
[docs] @ended_at_time.default def default_ended_at_time(self): """Configure calculated properties.""" return self.commit.committed_datetime.isoformat()
[docs] @person_agent.default def default_person_agent(self): """Set person agent to be the author of the commit.""" if self.commit: return Person.from_commit(self.commit) return None
@property def nodes(self): """Return topologically sorted nodes.""" collections = OrderedDict() def _parents(node): if node.parent: yield from _parents(node.parent) yield node.parent for output in self.generated: for parent in _parents(output.entity): collections[parent.path] = parent yield from _nodes(output) yield from reversed(collections.values())
[docs]@jsonld.s( type='wfprov:ProcessRun', context={ 'wfprov': '', }, cmp=False, ) class ProcessRun(Activity): """A process run is a particular execution of a Process description.""" __association_cls__ = Process inputs = attr.ib(kw_only=True) outputs = attr.ib(kw_only=True) generated = jsonld.ib( context={ '@reverse': 'prov:activity', }, kw_only=True, hash=False, ) association = jsonld.ib( context='prov:qualifiedAssociation', default=None, kw_only=True, ) qualified_usage = jsonld.ib(context='prov:qualifiedUsage', kw_only=True)
[docs] @generated.default def default_generated(self): """Calculate default values.""" return super().default_generated()
def __attrs_post_init__(self): """Calculate properties.""" if self.association is None: self.association = Association.from_activity(self) if self.path is None: # FIXME only works for linking directory to file existing_outputs = set(self.outputs.values()) for output_id, output_path in self.iter_output_files(): if output_id not in existing_outputs: self.outputs[os.path.join( next( path for path, usage in self.inputs.items() if usage.role == 'input_directory' ), output_path )] = output_id break
[docs] @inputs.default def default_inputs(self): """Guess default inputs from a process.""" inputs = {} basedir = os.path.dirname(self.path) commit = self.commit client = self.client process = self.process revision = '{0}^'.format(commit) for input_id, input_path in process.iter_input_files(basedir): try: usage_id = self._id + '/inputs/' + input_id dependency = Usage.from_revision( client=client, path=input_path, role=input_id, revision=revision, id=usage_id, ) inputs[input_path] = dependency except KeyError: continue return inputs
[docs] @qualified_usage.default def default_qualified_usage(self): """Generate list of used artifacts.""" return list(self.inputs.values())
[docs] def iter_output_files(self, commit=None): """Yield tuples with output id and path.""" process = self.process for output in process.outputs: if output.type in {'stdout', 'stderr'}: stream = getattr(process, output.type) if stream: yield, stream elif output.type in PATH_OBJECTS: glob = output.outputBinding.glob # TODO better support for Expression if glob.startswith('$(inputs.'): input_id = glob[len('$(inputs.'):-1] for input_ in process.inputs: if == input_id: yield, input_.default break # out from process.inputs else: yield, glob
[docs] @outputs.default def default_outputs(self): """Guess default outputs from a process.""" if self.path is None: return {} return { output_path: output_id for output_id, output_path in self.iter_output_files() }
@property def parents(self): """Return parent commits.""" return [ member.commit for usage in self.qualified_usage for member in usage.entity.entities ] + super().parents @property def nodes(self): """Return topologically sorted nodes.""" # Outputs go first yield from super().nodes # Activity itself yield self.association.plan
[docs]@jsonld.s( type='wfprov:WorkflowRun', context={ 'wfprov': '', }, cmp=False, ) class WorkflowRun(ProcessRun): """A workflow run typically contains several subprocesses.""" __association_cls__ = Workflow # @reverse wfprov:wasPartOfWorkflowRun children = attr.ib(kw_only=True) _processes = jsonld.ib( context={ '@reverse': 'wfprov:wasPartOfWorkflowRun', }, default=attr.Factory(list), kw_only=True, ) subprocesses = attr.ib(kw_only=True) outputs = attr.ib(kw_only=True) generated = jsonld.ib( context={ '@reverse': 'prov:activity', }, kw_only=True, hash=False, )
[docs] @children.default def default_children(self): """Load children from process.""" basedir = os.path.dirname(self.path) if self.path is not None else None def _load(step): """Load step definition.""" if isinstance(, WORKFLOW_STEP_RUN_TYPES): return if self.commit: import yaml data = (self.commit.tree / basedir / return CWLClass.from_cwl(yaml.safe_load(data)) return CWLClass.from_yaml( return { _load(step) for step in self.process.steps}
[docs] @subprocesses.default def default_subprocesses(self): """Load subprocesses.""" basedir = os.path.dirname(self.path) revision = '{0}^'.format(self.commit) ins = { dependency.role: dependency for path, dependency in self.inputs.items() if isinstance(dependency, Usage) } entities = {} outs = {} subprocesses = {} for step in reversed(self.process.topological_steps): if isinstance(, WORKFLOW_STEP_RUN_TYPES): path = None process = else: path = os.path.join(basedir, process = self.children[] subprocess_id = self._id + '/steps/' + inputs = {} for alias, source in step.in_.items(): usage_id = subprocess_id + '/inputs/' + alias if source in ins: dependency = ins[source] inputs[dependency.path] = attr.evolve( dependency, role=alias, id=usage_id, ) elif source in outs: input_path = outs[source] inputs[input_path] = Usage( entity=entities[input_path], role=alias, id=usage_id, ) else: # TODO check that it is not Path or Directory pass subprocess_entity_commit = self.client.find_previous_commit( path, revision=revision ) subprocess = process.create_run( commit=self.commit, client=self.client, part_of=self, process=process, path=path, inputs=inputs, id=subprocess_id, ) subprocess.association = Association.from_activity( subprocess, commit=subprocess_entity_commit, ) for output_path, source in subprocess.outputs.items(): outs.setdefault( + '/' + source, output_path) for generation in subprocess.generated: entity = generation.entity entities[entity.path] = entity if isinstance(entity, Collection): entities.update( **{member.path: member for member in entity.members} ) subprocesses[] = subprocess self._processes.append(subprocess) return subprocesses
[docs] def iter_output_files(self, commit=None): """Yield tuples with output id and path.""" commit = commit or self.commit tools = self.default_children() setattr(self, 'children', tools) for output in self.process.outputs: if output.type not in PATH_OBJECTS: continue if output.outputSource: step_id, _, source = output.outputSource.partition('/') subprocess = self.subprocesses[step_id] for glob, output_id in subprocess.outputs.items(): if == output_id: yield, glob break elif output.outputBinding: glob = output.outputBinding.glob # TODO better support for Expression if glob.startswith('$(inputs.'): input_id = glob[len('$(inputs.'):-1] for input_ in self.inputs: if == input_id: yield, input_.default else: yield, glob
[docs] @outputs.default def default_outputs(self): """Guess default outputs from a workflow.""" return super().default_outputs()
[docs] @generated.default def default_generated(self): """Calculate default values.""" results = [] for output in self.process.outputs: step_id, _, source = output.outputSource.partition('/') assert step_id in self.children for generated in self.subprocesses[step_id].generated: if generated.role == source: results.append( attr.evolve( generated,, activity=self, ) ) break else: raise KeyError(output) return results
@property def nodes(self): """Yield all graph nodes.""" for subprocess in reversed(self._processes): if subprocess.path is None: # skip nodes connecting directory to file continue yield from subprocess.nodes