# -*- coding: utf-8 -*-
#
# Copyright 2018-2019 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent a Git commit."""
import os
import uuid
from collections import OrderedDict
from pathlib import Path
import attr
from git import NULL_TREE
from renku.core.models import jsonld
from renku.core.models.cwl import WORKFLOW_STEP_RUN_TYPES
from renku.core.models.cwl.ascwl import CWLClass
from renku.core.models.cwl.types import PATH_OBJECTS
from renku.core.models.refs import LinkReference
from .agents import Person, renku_agent
from .entities import Collection, CommitMixin, Entity, Process, Workflow
from .qualified import Association, Generation, Usage
def _nodes(output, parent=None):
    """Yield the graph nodes reachable from ``output``.

    ``output`` is either an entity or an object exposing one through an
    ``entity`` attribute. When that entity is a ``Collection``, the nodes of
    each member are yielded first (recursively) and ``output`` itself last;
    for anything else only ``output`` is yielded.

    :param output: an entity, or a wrapper with an ``entity`` attribute.
    :param parent: when not ``None``, every direct member is replaced by a
        copy (``attr.evolve``) whose ``parent`` is this value before it is
        descended into. The value is not forwarded to the recursive call.
    """
    # NOTE refactor so all outputs behave the same
    entity = getattr(output, 'entity', output)

    if isinstance(entity, Collection):
        for child in entity.members:
            if parent is None:
                node = child
            else:
                node = attr.evolve(child, parent=parent)
            yield from _nodes(node)

    # Collections and plain entities alike finish with the output itself.
    yield output
@jsonld.s(
    type='prov:Activity',
    context={
        'prov': 'http://www.w3.org/ns/prov#',
        'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
        'schema': 'http://schema.org/'
    },
    cmp=False,
)
class Activity(CommitMixin):
    """Represent an activity in the repository.

    Every default below is derived from ``self.commit`` and ``self.client``
    (presumably provided by ``CommitMixin`` -- confirm there). The attribute
    declaration order is significant: several defaults read attributes that
    are declared earlier (e.g. ``generated`` reads ``outputs`` and fills
    ``_collections``; ``influenced`` reads ``_collections``).
    """

    _id = jsonld.ib(context='@id', kw_only=True)
    _message = jsonld.ib(context='rdfs:comment', kw_only=True)
    _was_informed_by = jsonld.ib(
        context='prov:wasInformedBy',
        kw_only=True,
    )

    part_of = attr.ib(default=None, kw_only=True)

    process = attr.ib(default=None, kw_only=True)
    # Mapping of output path -> role (see ``default_generated``).
    outputs = attr.ib(kw_only=True)

    # Cache of ``Collection`` objects keyed by directory path string; it is
    # populated as a side effect of ``default_generated``.
    _collections = attr.ib(
        default=attr.Factory(OrderedDict), init=False, kw_only=True
    )
    generated = jsonld.ib(
        context={
            '@reverse': 'prov:activity',
        },
        kw_only=True,
        hash=False
    )

    influenced = jsonld.ib(
        context='prov:influenced',
        kw_only=True,
    )

    started_at_time = jsonld.ib(
        context={
            '@id': 'prov:startedAtTime',
            '@type': 'http://www.w3.org/2001/XMLSchema#dateTime',
        },
        kw_only=True,
    )

    ended_at_time = jsonld.ib(
        context={
            '@id': 'prov:endedAtTime',
            '@type': 'http://www.w3.org/2001/XMLSchema#dateTime',
        },
        kw_only=True,
    )

    agent = jsonld.ib(context='prov:agent', kw_only=True, default=renku_agent)
    person_agent = jsonld.ib(context='prov:agent', kw_only=True)

    @generated.default
    def default_generated(self):
        """Build one ``Generation`` per entry of ``self.outputs``.

        For each output path the chain of parent directories is turned into
        nested ``Collection`` objects (reused through ``self._collections``),
        the output entity is created and attached to its closest collection,
        and a ``Generation`` linking this activity, the entity and its role
        is appended to the result list.
        """
        generations = []
        for path, role in self.outputs.items():
            client, commit, path = self.client.resolve_in_submodules(
                self.commit,
                path,
            )

            output_path = client.path / path
            relative = output_path.relative_to(client.path)
            ancestors = list(relative.parents)

            collection = None
            members = []
            # ``parents`` is ordered closest-first; drop its last item and
            # walk from the outermost directory inwards.
            for directory in reversed(ancestors[:-1]):
                key = str(directory)
                if key not in self._collections:
                    collection = Collection(
                        client=client,
                        commit=commit,
                        path=key,
                        members=[],
                        parent=collection,
                    )
                    members.append(collection)
                    self._collections[key] = collection
                else:
                    collection = self._collections[key]

                members = collection.members

            is_directory = (self.client.path / path).is_dir()
            entity_cls = Collection if is_directory else Entity

            # TODO: use a factory method to generate the entity
            datasets_root = os.path.join(client.renku_home, client.DATASETS)
            if str(path).startswith(datasets_root):
                entity = client.get_dataset(Path(path), commit=commit)
            else:
                entity = entity_cls(
                    commit=commit,
                    client=client,
                    path=str(path),
                    parent=collection,
                )

            if collection:
                collection.members.append(entity)

            generation = Generation(
                activity=self,
                entity=entity,
                role=role,
            )
            generations.append(generation)

        return generations

    @influenced.default
    def default_influenced(self):
        """Return the collections gathered in ``self._collections``."""
        return [*self._collections.values()]

    @property
    def parents(self):
        """Return the parent commits of ``self.commit`` as a list."""
        return [*self.commit.parents]

    @property
    def paths(self):
        """Return the set of paths (as strings) touched by the commit.

        The commit is diffed against its parents, or against ``NULL_TREE``
        when it has none. A path that contains ``client.DATASETS``, does not
        contain ``LinkReference.REFS`` and does not exist is replaced by the
        dataset metadata path built from the UUID found in the name of its
        parent directory (``uuid.UUID`` raises ``ValueError`` if that name is
        not a valid UUID).
        """
        changed = set()
        base = self.commit.parents or NULL_TREE
        for change in self.commit.diff(base):
            path_ = Path(change.a_path)
            text = str(path_)

            is_dataset = self.client.DATASETS in text
            not_refs = LinkReference.REFS not in text
            is_missing = not path_.exists()

            if is_dataset and not_refs and is_missing:
                uid = uuid.UUID(path_.parent.name)
                datasets_dir = (
                    Path(self.client.renku_home) / self.client.DATASETS
                )
                path_ = datasets_dir / str(uid) / self.client.METADATA

            changed.add(str(path_))

        return changed

    @classmethod
    def generate_id(cls, commit):
        """Return the activity ID derived from the commit's ``hexsha``."""
        template = 'commit/{commit.hexsha}'
        return template.format(commit=commit)

    @_id.default
    def default_id(self):
        """Use ``generate_id`` on ``self.commit`` as the default ID."""
        return self.generate_id(self.commit)

    @_message.default
    def default_message(self):
        """Use the commit message as the default message."""
        return self.commit.message

    @outputs.default
    def default_outputs(self):
        """Map every path from ``self.paths`` to a ``None`` role."""
        return dict.fromkeys(self.paths)

    @started_at_time.default
    def default_started_at_time(self):
        """Use the commit's authored datetime in ISO format."""
        authored = self.commit.authored_datetime
        return authored.isoformat()

    @ended_at_time.default
    def default_ended_at_time(self):
        """Use the commit's committed datetime in ISO format."""
        committed = self.commit.committed_datetime
        return committed.isoformat()

    @person_agent.default
    def default_person_agent(self):
        """Use the commit author as person agent, or ``None`` w/o commit."""
        if not self.commit:
            return None
        return Person.from_commit(self.commit)

    @property
    def nodes(self):
        """Yield topologically sorted nodes.

        For each generation the nodes produced by ``_nodes`` are yielded;
        the parent chain of every generated entity is recorded along the way
        (keyed by ``path``) and yielded at the end in reverse insertion
        order.
        """
        seen = OrderedDict()

        def _ancestors(node):
            """Return ancestors of ``node`` from outermost to closest."""
            chain = []
            current = node.parent
            while current:
                chain.append(current)
                current = current.parent
            chain.reverse()
            return chain

        for output in self.generated:
            for ancestor in _ancestors(output.entity):
                seen[ancestor.path] = ancestor

            yield from _nodes(output)

        for collection in reversed(seen.values()):
            yield collection
@jsonld.s(
    type='wfprov:ProcessRun',
    context={
        'wfprov': 'http://purl.org/wf4ever/wfprov#',
    },
    cmp=False,
)
class ProcessRun(Activity):
    """A process run is a particular execution of a Process description."""

    __association_cls__ = Process

    # Mapping of input path -> usage object (read through ``.items()`` and
    # ``.values()`` below).
    inputs = attr.ib(kw_only=True)
    # Mapping of output path -> output id (see ``default_outputs``).
    outputs = attr.ib(kw_only=True)

    generated = jsonld.ib(
        context={
            '@reverse': 'prov:activity',
        },
        kw_only=True,
        hash=False,
    )

    association = jsonld.ib(
        context='prov:qualifiedAssociation',
        default=None,
        kw_only=True,
    )

    qualified_usage = jsonld.ib(context='prov:qualifiedUsage', kw_only=True)

    @generated.default
    def default_generated(self):
        """Delegate to the parent class implementation."""
        return super().default_generated()

    def __attrs_post_init__(self):
        """Fill in the association and, for path-less runs, one output.

        When no association was supplied it is created from this activity.
        When ``self.path`` is ``None`` the first output file whose id is not
        yet among ``self.outputs`` values is registered under the path of the
        first input with role ``'input_directory'`` joined with the output
        path.
        """
        if self.association is None:
            self.association = Association.from_activity(self)

        if self.path is not None:
            return

        # FIXME only works for linking directory to file
        known_ids = set(self.outputs.values())
        for output_id, output_path in self.iter_output_files():
            if output_id in known_ids:
                continue

            directory = next(
                path for path, usage in self.inputs.items()
                if usage.role == 'input_directory'
            )
            self.outputs[os.path.join(directory, output_path)] = output_id
            break

    @qualified_usage.default
    def default_qualified_usage(self):
        """Return the values of ``self.inputs`` as a list."""
        return [*self.inputs.values()]

    def iter_output_files(self, commit=None):
        """Yield ``(output id, path)`` tuples for the process outputs.

        ``stdout``/``stderr`` outputs yield the corresponding stream
        attribute of the process when it is truthy. Outputs whose type is in
        ``PATH_OBJECTS`` yield their ``outputBinding.glob``, except that a
        glob of the form ``$(inputs.<id>)`` yields the ``default`` of the
        first process input with that id (nothing if there is none).

        :param commit: not used by this implementation.
        """
        process = self.process
        expression_prefix = '$(inputs.'

        for output in process.outputs:
            kind = output.type

            if kind in {'stdout', 'stderr'}:
                stream = getattr(process, kind)
                if stream:
                    yield output.id, stream
                continue

            if kind not in PATH_OBJECTS:
                continue

            glob = output.outputBinding.glob
            # TODO better support for Expression
            if not glob.startswith(expression_prefix):
                yield output.id, glob
                continue

            # Strip the prefix and the trailing character of the expression.
            input_id = glob[len(expression_prefix):-1]
            for input_ in process.inputs:
                if input_.id == input_id:
                    yield output.id, input_.default
                    break  # out from process.inputs

    @outputs.default
    def default_outputs(self):
        """Guess default outputs from a process.

        Returns an empty mapping when ``self.path`` is ``None``; otherwise a
        mapping of output path -> output id built from
        ``iter_output_files``.
        """
        guessed = {}
        if self.path is None:
            return guessed

        for output_id, output_path in self.iter_output_files():
            guessed[output_path] = output_id
        return guessed

    @property
    def parents(self):
        """Return commits of all used entities followed by parent commits."""
        commits = []
        for usage in self.qualified_usage:
            for member in usage.entity.entities:
                commits.append(member.commit)

        return commits + super().parents

    @property
    def nodes(self):
        """Yield topologically sorted nodes."""
        # Outputs go first
        yield from super().nodes

        # Activity itself
        yield self.association.plan
@jsonld.s(
    type='wfprov:WorkflowRun',
    context={
        'wfprov': 'http://purl.org/wf4ever/wfprov#',
    },
    cmp=False,
)
class WorkflowRun(ProcessRun):
    """A workflow run typically contains several subprocesses.

    The declaration order of the attributes below matters:
    ``default_subprocesses`` reads ``self.children`` and appends to
    ``self._processes``, and ``default_generated`` reads both
    ``self.children`` and ``self.subprocesses``.
    NOTE(review): ``outputs`` and ``generated`` are presumably re-declared
    here so that their defaults run after ``children``/``subprocesses`` --
    confirm against attrs' attribute ordering rules.
    """

    __association_cls__ = Workflow

    # @reverse wfprov:wasPartOfWorkflowRun
    # Mapping of step id -> loaded step definition (``default_children``).
    children = attr.ib(kw_only=True)

    # Subprocess runs in creation order; filled by ``default_subprocesses``.
    _processes = jsonld.ib(
        context={
            '@reverse': 'wfprov:wasPartOfWorkflowRun',
        },
        default=attr.Factory(list),
        kw_only=True,
    )
    # Mapping of step id -> subprocess run (``default_subprocesses``).
    subprocesses = attr.ib(kw_only=True)

    outputs = attr.ib(kw_only=True)
    generated = jsonld.ib(
        context={
            '@reverse': 'prov:activity',
        },
        kw_only=True,
        hash=False,
    )

    @children.default
    def default_children(self):
        """Load the definition of every workflow step.

        :returns: mapping of step id -> step definition. A step whose
            ``run`` is already one of ``WORKFLOW_STEP_RUN_TYPES`` is used
            as-is; otherwise it is parsed from the commit tree (when a
            commit is set) or loaded with ``CWLClass.from_yaml``.
        """
        basedir = os.path.dirname(self.path) if self.path is not None else None

        def _load(step):
            """Load step definition."""
            if isinstance(step.run, WORKFLOW_STEP_RUN_TYPES):
                return step.run

            if self.commit:
                import yaml
                # Read the step file as stored in the commit, relative to
                # the directory of the workflow file.
                data = (self.commit.tree / basedir /
                        step.run).data_stream.read()
                return CWLClass.from_cwl(yaml.safe_load(data))

            return CWLClass.from_yaml(step.run)

        return {step.id: _load(step) for step in self.process.steps}

    @subprocesses.default
    def default_subprocesses(self):
        """Create a run for every workflow step.

        Steps are visited in reversed ``topological_steps`` order. For each
        step the inputs are resolved either from the workflow inputs or from
        outputs of already visited steps, a run is created with
        ``process.create_run`` and its association is rebuilt using the
        commit returned by ``client.find_previous_commit``. Each run is also
        appended to ``self._processes``.

        :returns: mapping of step id -> subprocess run.
        """
        basedir = os.path.dirname(self.path)
        # Revision expression built from the commit followed by ``^``.
        revision = '{0}^'.format(self.commit)

        # Workflow-level usages indexed by their role.
        ins = {
            dependency.role: dependency
            for path, dependency in self.inputs.items()
            if isinstance(dependency, Usage)
        }

        entities = {}  # path -> entity generated by a visited step
        outs = {}  # '<step id>/<output id>' -> output path
        subprocesses = {}

        for step in reversed(self.process.topological_steps):
            if isinstance(step.run, WORKFLOW_STEP_RUN_TYPES):
                path = None
                process = step.run
            else:
                path = os.path.join(basedir, step.run)
                process = self.children[step.id]

            subprocess_id = self._id + '/steps/' + step.id

            inputs = {}
            for alias, source in step.in_.items():
                usage_id = subprocess_id + '/inputs/' + alias
                if source in ins:
                    # Workflow input: reuse the usage under the step alias.
                    dependency = ins[source]
                    inputs[dependency.path] = attr.evolve(
                        dependency,
                        role=alias,
                        id=usage_id,
                    )
                elif source in outs:
                    # Output of an already visited step.
                    input_path = outs[source]
                    inputs[input_path] = Usage(
                        entity=entities[input_path],
                        role=alias,
                        id=usage_id,
                    )
                else:
                    # TODO check that it is not Path or Directory
                    pass

            subprocess_entity_commit = self.client.find_previous_commit(
                path, revision=revision
            )
            subprocess = process.create_run(
                commit=self.commit,
                client=self.client,
                part_of=self,
                process=process,
                path=path,
                inputs=inputs,
                id=subprocess_id,
            )

            subprocess.association = Association.from_activity(
                subprocess,
                commit=subprocess_entity_commit,
            )

            # Keep the first path registered for each '<step>/<output>'.
            for output_path, source in subprocess.outputs.items():
                outs.setdefault(step.id + '/' + source, output_path)

            for generation in subprocess.generated:
                entity = generation.entity
                entities[entity.path] = entity

                if isinstance(entity, Collection):
                    entities.update(
                        **{member.path: member
                           for member in entity.members}
                    )

            subprocesses[step.id] = subprocess
            self._processes.append(subprocess)

        return subprocesses

    def iter_output_files(self, commit=None):
        """Yield ``(output id, path)`` tuples for the workflow outputs.

        The step definitions are reloaded and stored on ``self.children``
        first. Only outputs whose type is in ``PATH_OBJECTS`` are
        considered. An output with an ``outputSource`` is looked up among
        the outputs of the referenced step's run; an output with only an
        ``outputBinding`` yields its glob, or, for a glob of the form
        ``$(inputs.<id>)``, the ``default`` of the first workflow input with
        that id.

        :param commit: accepted for interface compatibility; not used.
        """
        self.children = self.default_children()

        for output in self.process.outputs:
            if output.type not in PATH_OBJECTS:
                continue

            if output.outputSource:
                step_id = output.outputSource.partition('/')[0]
                subprocess = self.subprocesses[step_id]
                # NOTE(review): this compares against the workflow-level
                # ``output.id`` while ``default_generated`` matches on the
                # step-level part of ``outputSource`` -- confirm which one
                # is intended.
                for glob, output_id in subprocess.outputs.items():
                    if output.id == output_id:
                        yield output.id, glob
                        break
            elif output.outputBinding:
                glob = output.outputBinding.glob
                # TODO better support for Expression
                if glob.startswith('$(inputs.'):
                    input_id = glob[len('$(inputs.'):-1]
                    # ``self.inputs`` is a mapping keyed by path, so
                    # iterating it yields keys without ``id``/``default``;
                    # the input definitions live on the process, as in
                    # ``ProcessRun.iter_output_files``.
                    for input_ in self.process.inputs:
                        if input_.id == input_id:
                            yield output.id, input_.default
                            break  # out from process.inputs
                else:
                    yield output.id, glob

    @outputs.default
    def default_outputs(self):
        """Guess default outputs from a workflow."""
        return super().default_outputs()

    @generated.default
    def default_generated(self):
        """Re-attribute the generations of the steps to this workflow run.

        For every workflow output the generation of the referenced step
        whose role equals the step-level output name is copied with the
        workflow output id as role and this run as activity.

        :raises KeyError: when the referenced step has no generation with
            the expected role.
        """
        results = []
        for output in self.process.outputs:
            step_id, _, source = output.outputSource.partition('/')
            assert step_id in self.children

            for generated in self.subprocesses[step_id].generated:
                if generated.role == source:
                    results.append(
                        attr.evolve(
                            generated,
                            role=output.id,
                            activity=self,
                        )
                    )
                    break
            else:
                # No generation of the step matched the output source.
                raise KeyError(output)

        return results

    @property
    def nodes(self):
        """Yield all graph nodes of the subprocesses, last created first."""
        for subprocess in reversed(self._processes):
            if subprocess.path is None:
                # skip nodes connecting directory to file
                continue

            yield from subprocess.nodes