# -*- coding: utf-8 -*-
#
# Copyright 2018-2020 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client for handling a local repository."""
import hashlib
import json
import os
import shutil
import subprocess
import uuid
from collections import defaultdict
from contextlib import contextmanager
from subprocess import check_output
import attr
import filelock
import yaml
from jinja2 import Template
from werkzeug.utils import cached_property, secure_filename
from renku.core import errors
from renku.core.compat import Path
from renku.core.management.config import RENKU_HOME
from renku.core.models.projects import Project
from renku.core.models.refs import LinkReference
from .git import GitCore
DEFAULT_DATA_DIR = "data"
[docs]def default_path(path="."):
"""Return default repository path."""
from git import InvalidGitRepositoryError
from renku.core.commands.git import get_git_home
try:
return get_git_home(path=path)
except InvalidGitRepositoryError:
return path
[docs]def path_converter(path):
"""Converter for path in PathMixin."""
return Path(path).resolve()
[docs]@attr.s
class PathMixin:
"""Define a default path attribute."""
path = attr.ib(default=default_path, converter=path_converter,)
@path.validator
def _check_path(self, _, value):
"""Check the path exists and it is a directory."""
if not (value.exists() and value.is_dir()):
raise ValueError("Define an existing directory.")
[docs]@attr.s
class RepositoryApiMixin(GitCore):
"""Client for handling a local repository."""
renku_home = attr.ib(default=RENKU_HOME)
"""Define a name of the Renku folder (default: ``.renku``)."""
renku_path = attr.ib(init=False)
"""Store a ``Path`` instance of the Renku folder."""
parent = attr.ib(default=None)
"""Store a pointer to the parent repository."""
data_dir = attr.ib(
default=DEFAULT_DATA_DIR, kw_only=True, converter=lambda value: str(value) if value else DEFAULT_DATA_DIR
)
"""Define a name of the folder for storing datasets."""
METADATA = "metadata.yml"
"""Default name of Renku config file."""
LOCK_SUFFIX = ".lock"
"""Default suffix for Renku lock file."""
WORKFLOW = "workflow"
"""Directory for storing workflow in Renku."""
ACTIVITY_INDEX = "activity_index.yaml"
"""Caches activities that generated a path."""
DOCKERFILE = "Dockerfile"
"""Name of the Dockerfile in the repo."""
TEMPLATE_CHECKSUMS = "template_checksums.json"
RENKU_PROTECTED_PATHS = [
"\\.renku/.*",
"Dockerfile",
"\\.dockerignore",
"\\.gitignore",
"\\.gitattributes",
"\\.gitlab-ci\\.yml",
"environment\\.yml",
"requirements\\.txt",
]
_commit_activity_cache = attr.ib(factory=dict)
_activity_index = attr.ib(default=None)
_remote_cache = attr.ib(factory=dict)
def __attrs_post_init__(self):
"""Initialize computed attributes."""
#: Configure Renku path.
path = Path(self.renku_home)
if not path.is_absolute():
path = self.path / path
path.relative_to(path)
self.renku_path = path
data_dir = self.get_value("renku", self.DATA_DIR_CONFIG_KEY, local_only=True)
self.data_dir = data_dir or self.data_dir
self._subclients = {}
self._project = None
super().__attrs_post_init__()
# initialize submodules
if self.repo:
try:
check_output(["git", "submodule", "update", "--init", "--recursive"], cwd=str(self.path))
except subprocess.CalledProcessError:
pass
@property
def latest_agent(self):
"""Returns latest agent version used in the repository."""
try:
return self.project.agent_version
except ValueError as e:
if "Project name not set" in str(e):
return None
raise
@property
def lock(self):
"""Create a Renku config lock."""
return filelock.FileLock(str(self.renku_path.with_suffix(self.LOCK_SUFFIX)), timeout=0,)
@property
def renku_metadata_path(self):
"""Return a ``Path`` instance of Renku metadata file."""
return self.renku_path.joinpath(self.METADATA)
@property
def workflow_path(self):
"""Return a ``Path`` instance of the workflow folder."""
return self.renku_path / self.WORKFLOW
@property
def activity_index_path(self):
"""Path to the activity filepath cache."""
return self.renku_path / self.ACTIVITY_INDEX
@property
def docker_path(self):
"""Path to the Dockerfile."""
return self.path / self.DOCKERFILE
@property
def template_checksums(self):
"""Return a ``Path`` instance to the template checksums file."""
return self.renku_path / self.TEMPLATE_CHECKSUMS
[docs] @cached_property
def cwl_prefix(self):
"""Return a CWL prefix."""
self.workflow_path.mkdir(parents=True, exist_ok=True) # for Python 3.5
return str(self.workflow_path.resolve().relative_to(self.path))
@property
def project(self):
"""Return the Project instance."""
if self.renku_metadata_path.exists() and self._project is None:
self._project = Project.from_yaml(self.renku_metadata_path, client=self)
return self._project
@property
def remote(self, remote_name="origin"):
"""Return host, owner and name of the remote if it exists."""
from renku.core.models.git import GitURL
original_remote_name = remote_name
if original_remote_name in self._remote_cache:
return self._remote_cache[original_remote_name]
host = owner = name = None
try:
remote_branch = self.repo.head.reference.tracking_branch()
if remote_branch is not None:
remote_name = remote_branch.remote_name
except TypeError:
pass
try:
url = GitURL.parse(self.repo.remotes[remote_name].url)
# Remove gitlab. unless running on gitlab.com.
hostname_parts = url.hostname.split(".")
if len(hostname_parts) > 2 and hostname_parts[0] == "gitlab":
hostname_parts = hostname_parts[1:]
url = attr.evolve(url, hostname=".".join(hostname_parts))
except IndexError:
url = None
if url:
host = url.hostname
owner = url.owner
name = url.name
remote = {"host": host, "owner": owner, "name": name}
self._remote_cache[original_remote_name] = remote
return remote
[docs] def is_project_set(self):
"""Return if project is set for the client."""
return self._project is not None
[docs] def process_commit(self, commit=None, path=None):
"""Build an :class:`~renku.core.models.provenance.activities.Activity`.
:param commit: Commit to process. (default: ``HEAD``)
:param path: Process a specific CWL file.
"""
from renku.core.models.provenance.activities import Activity
commit = commit or self.repo.head.commit
if len(commit.parents) > 1:
return Activity(commit=commit, client=self)
if path is None:
for file_ in commit.stats.files.keys():
# Find a process (CommandLineTool or Workflow)
if self.is_workflow(file_):
if path is not None:
# Regular activity since it edits multiple CWL files
return Activity(commit=commit, client=self)
path = file_
if not path:
# search for activities a file could have been a part of
activities = self.activities_for_paths(commit.stats.files.keys(), file_commit=commit, revision="HEAD")
if len(activities) > 1:
raise errors.CommitProcessingError(
"Found multiple activities that produced the same entity at commit {}".format(commit)
)
if activities:
return activities[0]
else:
data = (commit.tree / path).data_stream.read()
process = Activity.from_jsonld(yaml.safe_load(data), client=self, commit=commit)
return process
return Activity(commit=commit, client=self)
[docs] def is_workflow(self, path):
"""Check if the path is a valid CWL file."""
return path.startswith(self.cwl_prefix) and path.endswith(".yaml")
[docs] def find_previous_commit(self, paths, revision="HEAD", return_first=False, full=False):
"""Return a previous commit for a given path starting from ``revision``.
:param revision: revision to start from, defaults to ``HEAD``
:param return_first: show the first commit in the history
:param full: return full history
:raises KeyError: if path is not present in the given commit
"""
kwargs = {}
if full:
kwargs["full_history"] = True
if return_first:
file_commits = list(self.repo.iter_commits(revision, paths=paths, **kwargs))
else:
file_commits = list(self.repo.iter_commits(revision, paths=paths, max_count=1, **kwargs))
if not file_commits:
raise KeyError("Could not find a file {0} in range {1}".format(paths, revision))
return file_commits[-1 if return_first else 0]
[docs] @cached_property
def workflow_names(self):
"""Return index of workflow names."""
names = defaultdict(list)
for ref in LinkReference.iter_items(self, common_path="workflows"):
names[str(ref.reference.relative_to(self.path))].append(ref.name)
return names
[docs] @cached_property
def submodules(self):
"""Return list of submodules it belongs to."""
if self.parent:
client, submodule = self.parent
return client.submodules + [submodule.name]
return []
[docs] def subclients(self, parent_commit):
"""Return mapping from submodule to client."""
if parent_commit in self._subclients:
return self._subclients[parent_commit]
try:
from git import Submodule
submodules = [submodule for submodule in Submodule.iter_items(self.repo, parent_commit=parent_commit)]
except (RuntimeError, ValueError):
# There are no submodules associated with the given commit.
submodules = []
subclients = {}
for submodule in submodules:
subpath = (self.path / submodule.path).resolve()
is_renku = subpath / Path(self.renku_home)
if subpath.exists() and is_renku.exists():
subclients[submodule] = self.__class__(path=subpath, parent=(self, submodule),)
return subclients
[docs] def resolve_in_submodules(self, commit, path):
"""Resolve filename in submodules."""
original_path = self.path / path
in_vendor = str(path).startswith(".renku/vendors")
if original_path.is_symlink() or in_vendor:
original_path = Path(os.path.realpath(os.path.abspath(str(original_path))))
for submodule, subclient in self.subclients(commit).items():
if (Path(submodule.path) / Path(".git")).exists():
try:
subpath = original_path.relative_to(subclient.path)
return (
subclient,
subclient.find_previous_commit(subpath, revision=submodule.hexsha),
subpath,
)
except ValueError:
pass
return self, commit, path
[docs] @contextmanager
def with_commit(self, commit):
"""Yield the state of the repo at a specific commit."""
current_branch = None
current_commit = None
try:
current_branch = self.repo.active_branch
except TypeError as e:
# not on a branch, detached head
if "HEAD is a detached" in str(e):
current_commit = self.repo.head.commit
else:
raise ValueError("Couldn't get active branch or commit", e)
self.repo.git.checkout(commit)
try:
yield
finally:
if current_branch:
self.repo.git.checkout(current_branch)
elif current_commit:
self.repo.git.checkout(current_commit)
[docs] @contextmanager
def with_workflow_storage(self):
"""Yield a workflow storage."""
from renku.core.models.cwl.workflow import Workflow
workflow = Workflow()
yield workflow
for step in workflow.steps:
step_name = "{0}_{1}.yaml".format(uuid.uuid4().hex, secure_filename("_".join(step.run.baseCommand)),)
workflow_path = self.workflow_path
if not workflow_path.exists():
workflow_path.mkdir()
path = workflow_path / step_name
run = step.run.generate_process_run(client=self, commit=self.repo.head.commit, path=path,)
run.to_yaml(path=path)
self.add_to_activity_index(run)
[docs] def init_repository(self, force=False, user=None):
"""Initialize an empty Renku repository."""
from git import Repo
from renku.core.models.provenance.agents import Person
# verify if folder is empty
if self.repo is not None and not force:
raise errors.InvalidFileOperation(
"Folder {0} already contains file. Use --force to overwrite".format(self.repo.git_dir)
)
# initialize repo and set user data
path = self.path.absolute()
self.repo = Repo.init(str(path))
if user:
config_writer = self.repo.config_writer()
for key, value in user.items():
config_writer.set_value("user", key, value)
config_writer.release()
# verify if author information is available
Person.from_git(self.repo)
@property
def path_activity_cache(self):
"""Cache of all activities and their generated paths."""
# TODO: this is there for performance reasons. Remove once graph
# is stored as a flat, append-only list (Should be graph query
# in the future)
if self._activity_index:
return self._activity_index
path = self.activity_index_path
if path.exists():
with path.open("r") as stream:
self._activity_index = yaml.safe_load(stream)
else:
self._activity_index = {}
return self._activity_index
[docs] def add_to_activity_index(self, activity):
"""Add an activity and it's generations to the cache."""
for g in activity.generated:
if g.path not in self.path_activity_cache:
self.path_activity_cache[g.path] = {}
hexsha = g.commit.hexsha
if hexsha not in self.path_activity_cache[g.path]:
self.path_activity_cache[g.path][hexsha] = []
if activity.path in self.path_activity_cache[g.path][hexsha]:
continue
self.path_activity_cache[g.path][g.commit.hexsha].append(activity.path)
if self.path_activity_cache:
with self.activity_index_path.open("w") as stream:
yaml.dump(self.path_activity_cache, stream)
[docs] def activities_for_paths(self, paths, file_commit=None, revision="HEAD"):
"""Get all activities involving a path."""
from renku.core.models.provenance.activities import Activity
result = set()
for p in paths:
if p not in self.path_activity_cache:
parent_paths = [a for a in self.path_activity_cache.keys() if p.startswith(a)]
if not parent_paths:
continue
matching_activities = [
a
for k, v in self.path_activity_cache.items()
for ck, cv in v.items()
for a in cv
if k in parent_paths and (not file_commit or ck == file_commit.hexsha)
]
else:
matching = self.path_activity_cache[p]
if file_commit:
if file_commit.hexsha not in matching:
continue
matching_activities = matching[file_commit.hexsha]
else:
matching_activities = [a for v in matching.values() for a in v]
for a in matching_activities:
if (a, revision) in self._commit_activity_cache:
activity = self._commit_activity_cache[(a, revision)]
else:
try:
commit = self.find_previous_commit(a, revision=revision)
except KeyError:
continue
activity = Activity.from_yaml(self.path / a, client=self, commit=commit)
self._commit_activity_cache[(a, revision)] = activity
result.add(activity)
return list(result)
[docs] def import_from_template(self, template_path, metadata, force=False):
"""Render template files from a template directory."""
checksums = {}
for file in template_path.glob("**/*"):
rel_path = file.relative_to(template_path)
destination = self.path / rel_path
try:
# TODO: notify about the expected variables - code stub:
# with file.open() as fr:
# file_content = fr.read()
# # look for the required keys
# env = Environment()
# parsed = env.parse(file_content)
# variables = meta.find_undeclared_variables(parsed)
# parse file and process it
template = Template(file.read_text())
rendered_content = template.render(metadata)
# NOTE: the path could contain template variables, we need to template it
destination = Path(Template(str(destination)).render(metadata))
destination.write_text(rendered_content)
checksums[str(rel_path)] = self._content_hash(destination)
except IsADirectoryError:
destination.mkdir(parents=True, exist_ok=True)
except TypeError:
shutil.copy(file, destination)
self.template_checksums.parent.mkdir(parents=True, exist_ok=True)
with open(self.template_checksums, "w") as checksum_file:
json.dump(checksums, checksum_file)
[docs] def check_immutable_template_files(self, *paths):
"""Check paths and return a list of those that are marked immutable in the project template."""
if not self.project.immutable_template_files:
return []
immutable_template_files = self.project.immutable_template_files or []
return [p for p in paths if str(p) in immutable_template_files]
def _content_hash(self, path):
"""Calculate the sha256 hash of a file."""
sha256_hash = hashlib.sha256()
with open(str(path), "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()