Source code for renku.core.models.cwl.command_line_tool

# -*- coding: utf-8 -*-
#
# Copyright 2018-2020- Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent a ``CommandLineToolFactory`` for tracking workflows."""

import os
import re
import shlex
import time
from contextlib import contextmanager
from pathlib import Path

import attr
import click
from git import Actor

from renku.core import errors
from renku.core.commands.echo import INFO
from renku.version import __version__, version_url

from ...management.config import RENKU_HOME
from ..datastructures import DirectoryTree
from .parameter import CommandInputParameter, CommandLineBinding, CommandOutputParameter
from .types import PATH_OBJECTS, Directory, File

STARTED_AT = int(time.time() * 1000)

RENKU_TMP_DIR = os.path.join(RENKU_HOME, "tmp")
RENKU_FILELIST_PATH = os.getenv("RENKU_FILELIST_PATH", RENKU_TMP_DIR)
INDIRECT_INPUTS_LIST = os.path.join(RENKU_FILELIST_PATH, "inputs.txt")
INDIRECT_OUTPUTS_LIST = os.path.join(RENKU_FILELIST_PATH, "outputs.txt")


[docs]@attr.s class CommandLineToolFactory(object): """Command Line Tool Factory.""" _RE_SUBCOMMAND = re.compile(r"^[A-Za-z]+(-[A-Za-z]+)?$") command_line = attr.ib(converter=lambda cmd: list(cmd) if isinstance(cmd, (list, tuple)) else shlex.split(cmd),) explicit_inputs = attr.ib(factory=list, converter=lambda paths: [Path(os.path.abspath(p)) for p in paths]) explicit_outputs = attr.ib(factory=list, converter=lambda paths: [Path(os.path.abspath(p)) for p in paths]) no_input_detection = attr.ib(default=False) no_output_detection = attr.ib(default=False) directory = attr.ib(default=".", converter=lambda path: Path(path).resolve(),) working_dir = attr.ib(default=".", converter=lambda path: Path(path).resolve(),) stdin = attr.ib(default=None) # null, str, Expression stderr = attr.ib(default=None) # null, str, Expression stdout = attr.ib(default=None) # null, str, Expression baseCommand = attr.ib(init=False) arguments = attr.ib(init=False) inputs = attr.ib(init=False) outputs = attr.ib(init=False) successCodes = attr.ib(default=attr.Factory(list)) # list(int) annotations = attr.ib(default=None) _had_changes = False existing_directories = set() messages = attr.ib(default=None) warnings = attr.ib(default=None) def __attrs_post_init__(self): """Derive basic information.""" self.baseCommand, detect = self.split_command_and_args() self.arguments = [] self.inputs = [] self.outputs = [] if self.stdin: input_ = next(self.guess_inputs(str(self.working_dir / self.stdin))) assert input_.type == "File" input_ = attr.evolve(input_, id="input_stdin", inputBinding=None,) # do not include in tool arguments self.inputs.append(input_) self.stdin = "$(inputs.{0}.path)".format(input_.id) for stream_name in ("stdout", "stderr"): stream = getattr(self, stream_name) if stream and self.is_existing_path(self.working_dir / stream): self.outputs.append(CommandOutputParameter(id="output_{0}".format(stream_name), type=stream_name,)) for input_ in self.guess_inputs(*detect): if isinstance(input_, CommandLineBinding): self.arguments.append(input_) elif ( not self.no_input_detection or input_.type not in PATH_OBJECTS or input_.default.path in self.explicit_inputs ): self.inputs.append(input_) if self.explicit_inputs: for input in self.find_explicit_inputs(): self.inputs.append(input)
[docs] def generate_process_run(self, client, commit, path): """Return an instance of ``ProcessRun``.""" from ..provenance.activities import ProcessRun from ..workflow.run import Run run = Run.from_factory(factory=self, client=client, commit=commit, path=path,) process_run = ProcessRun.from_run(run, client, path, commit) if not self._had_changes: process_run.invalidated = [] if hasattr(self, "annotations") and self.annotations: process_run.add_annotations(self.annotations) return process_run
[docs] def iter_input_files(self, basedir): """Yield tuples with input id and path.""" stdin = getattr(self, "stdin", None) if stdin and stdin[0] != "$": # pragma: no cover raise NotImplementedError(self.stdin) for input_ in self.inputs: if input_.type in PATH_OBJECTS and input_.default: yield (input_.id, os.path.normpath(os.path.join(basedir, str(input_.default.path))))
[docs] @contextmanager def watch(self, client, no_output=False): """Watch a Renku repository for changes to detect outputs.""" client.check_external_storage() repo = client.repo # Remove indirect files list if any self.delete_indirect_files_list() from renku.core.plugins.pluginmanager import get_plugin_manager pm = get_plugin_manager() pm.hook.pre_run(tool=self) self.existing_directories = {str(p.relative_to(client.path)) for p in client.path.glob("**/")} yield self if repo: # Include indirect inputs and outputs before further processing self.add_indirect_inputs() self.add_indirect_outputs() # Remove indirect files list if any self.delete_indirect_files_list() # List of all output paths. output_paths = [] inputs = {input.id: input for input in self.inputs} outputs = list(self.outputs) # Keep track of unmodified output files. unmodified = set() candidates = set() if not self.no_output_detection: # Calculate possible output paths. # Capture newly created files through redirects. candidates |= {file_ for file_ in repo.untracked_files} # Capture modified files through redirects. candidates |= {o.a_path for o in repo.index.diff(None) if not o.deleted_file} # Include explicit outputs candidates |= {str(path.relative_to(self.working_dir)) for path in self.explicit_outputs} from renku.core.commands.graph import _safe_path candidates = {path for path in candidates if _safe_path(path)} for output, input, path in self.guess_outputs(candidates): outputs.append(output) output_paths.append(path) if input is not None: if input.id not in inputs: # pragma: no cover raise RuntimeError("Inconsistent input name.") inputs[input.id] = input for stream_name in ("stdout", "stderr"): stream = getattr(self, stream_name) if stream and stream not in candidates and Path(os.path.abspath(stream)) not in self.explicit_outputs: unmodified.add(stream) elif stream: output_paths.append(stream) if unmodified: raise errors.UnmodifiedOutputs(repo, unmodified) if not no_output and not output_paths: raise errors.OutputsNotFound(repo, inputs.values()) if client.check_external_storage(): lfs_paths = client.track_paths_in_storage(*output_paths) show_message = client.get_value("renku", "show_lfs_message") if lfs_paths and (show_message is None or show_message == "True"): self.messages = ( INFO + "Adding these files to Git LFS:\n" + "\t{}".format("\n\t".join(lfs_paths)) + "\nTo disable this message in the future, run:" + "\n\trenku config show_lfs_message False" ) repo.git.add(*output_paths) if repo.is_dirty(): commit_msg = ("renku run: " "committing {} newly added files").format(len(output_paths)) committer = Actor("renku {0}".format(__version__), version_url) repo.index.commit( commit_msg, committer=committer, skip_hooks=True, ) self._had_changes = True self.inputs = list(inputs.values()) self.outputs = outputs results = pm.hook.cmdline_tool_annotations(tool=self) self.annotations = [a for r in results for a in r]
[docs] @command_line.validator def validate_command_line(self, attribute, value): """Check the command line structure.""" if not value: raise errors.UsageError("Command line can not be empty.")
[docs] @directory.validator def validate_path(self, attribute, value): """Path must exists.""" if not value.exists(): raise errors.UsageError("Directory must exist.")
[docs] def is_existing_path(self, candidate, ignore=None): """Return a path instance if it exists in current directory.""" if ignore and candidate in ignore: return candidate = Path(candidate) if not candidate.is_absolute(): candidate = self.directory / candidate if candidate.exists() or candidate.is_symlink(): try: path = candidate.resolve() path.relative_to(self.directory) except ValueError: # An external file return Path(os.path.abspath(candidate)) else: return path
[docs] def split_command_and_args(self): """Return tuple with command and args from command line arguments.""" if self.is_existing_path(self.command_line[0]): return [], list(self.command_line) cmd = [self.command_line[0]] args = list(self.command_line[1:]) if len(args) < 2: # only guess subcommand for more arguments return cmd, args while args and re.match(self._RE_SUBCOMMAND, args[0]) and not self.is_existing_path(args[0]): cmd.append(args.pop(0)) return cmd, args
[docs] def guess_type(self, value, ignore_filenames=None): """Return new value and CWL parameter type.""" candidate = self.is_existing_path(value, ignore=ignore_filenames) if candidate: try: if candidate.is_dir(): return Directory(path=candidate), "Directory", None return File(path=candidate), "File", None except ValueError: # The candidate points to a file outside the working # directory # TODO suggest that the file should be imported to the repo pass try: value = int(value) return value, "int", None except ValueError: pass if len(value) > 1 and "," in value: return value.split(","), "string[]", "," return value, "string", None
[docs] def guess_inputs(self, *arguments): """Yield command input parameters and command line bindings.""" position = 0 prefix = None output_streams = {getattr(self, stream_name) for stream_name in ("stdout", "stderr")} for index, argument in enumerate(arguments): itemSeparator = None if prefix: if argument.startswith("-"): position += 1 yield CommandLineBinding( position=position, valueFrom=prefix, ) prefix = None if argument.startswith("--"): if "=" in argument: prefix, default = argument.split("=", 1) prefix += "=" default, type, itemSeparator = self.guess_type(default, ignore_filenames=output_streams) # TODO can be output position += 1 yield CommandInputParameter( id="input_{0}".format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, separate=False, ), ) prefix = None else: prefix = argument elif argument.startswith("-"): if len(argument) > 2: if "=" in argument: prefix, default = argument.split("=", 1) prefix += "=" default, type, itemSeparator = self.guess_type(default, ignore_filenames=output_streams) else: # possibly a flag with value prefix = argument[0:2] default, type, itemSeparator = self.guess_type(argument[2:], ignore_filenames=output_streams) position += 1 yield CommandInputParameter( id="input_{0}".format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, separate=not bool(argument[2:]), ), ) prefix = None else: prefix = argument else: default, type, itemSeparator = self.guess_type(argument, ignore_filenames=output_streams) # TODO can be output # TODO there might be an array position += 1 yield CommandInputParameter( id="input_{0}".format(position), type=type, default=default, inputBinding=dict(position=position, itemSeparator=itemSeparator, prefix=prefix,), ) prefix = None if prefix: position += 1 yield CommandLineBinding( position=position, valueFrom=prefix, )
[docs] def guess_outputs(self, candidates): """Yield detected output and changed command input parameter.""" # TODO what to do with duplicate paths & inputs with same defaults candidates = list(candidates) tree = DirectoryTree.from_list(candidates) input_candidates = {} conflicting_paths = {} for index, input in enumerate(self.inputs): # Convert input defaults to paths relative to working directory. if input.type not in PATH_OBJECTS: if self.no_input_detection: continue try: path = self.directory / str(input.default) input_path = Path(os.path.abspath(path)).relative_to(self.working_dir) except FileNotFoundError: continue else: input_path = input.default.path.relative_to(self.working_dir) if input_path.is_dir() and tree.get(input_path): # The directory might exist before running the script subpaths = {str(input_path / path) for path in tree.get(input_path, default=[])} absolute_path = os.path.abspath(input_path) if Path(absolute_path) not in self.explicit_outputs: content = { str(path) for path in input_path.rglob("*") if not path.is_dir() and path.name != ".gitkeep" } preexisting_paths = content - subpaths if preexisting_paths: raise errors.InvalidOutputPath( 'The output directory "{0}" is not empty. \n\n' "Delete existing files before running the " "command:" '\n (use "git rm <file>..." to remove them ' "first)" "\n\n".format(input_path) + "\n".join("\t" + click.style(path, fg="yellow") for path in preexisting_paths) + "\n\n" "Once you have removed files that should be used " "as outputs,\n" "you can safely rerun the previous command." ) # Remove files from the input directory candidates[:] = (path for path in candidates if path not in subpaths) # Include input path in the candidates to check candidates.append(str(input_path)) input_candidates[str(input_path)] = input elif input.type not in PATH_OBJECTS: # Input need to be changed if an output is detected input_candidates[str(input_path)] = input else: # Names that can not be outputs because they are already inputs conflicting_paths[str(input_path)] = input streams = {path for path in (getattr(self, name) for name in ("stdout", "stderr")) if path is not None} # TODO group by a common prefix for position, path in enumerate(candidates): candidate = self.is_existing_path(self.working_dir / path) if candidate is None: raise errors.UsageError('Path "{0}" does not exist.'.format(path)) glob = str(candidate.relative_to(self.working_dir)) if glob in streams: continue new_input = None if glob in conflicting_paths: # it means that it is rewriting a file input = conflicting_paths[glob] new_input = attr.evolve(input, type="string", default=glob) input_candidates[glob] = new_input del conflicting_paths[glob] # TODO add warning ('Output already exists in inputs.') candidate_type = "Directory" if candidate.is_dir() else "File" if glob in input_candidates: input = input_candidates[glob] if new_input is None: new_input = input_candidates[glob] = attr.evolve(input, type="string", default=glob) yield ( CommandOutputParameter( id="output_{0}".format(position), type=candidate_type, outputBinding=dict(glob="$(inputs.{0})".format(input.id),), ), new_input, glob, ) else: yield ( CommandOutputParameter( id="output_{0}".format(position), type=candidate_type, outputBinding=dict(glob=glob,), ), None, glob, )
[docs] def find_explicit_inputs(self): """Yield explicit inputs and command line input bindings if any.""" input_paths = [input.default.path for input in self.inputs if input.type in PATH_OBJECTS] input_id = len(self.inputs) + len(self.arguments) for explicit_input in self.explicit_inputs: if explicit_input in input_paths: continue try: explicit_input.relative_to(self.working_dir) except ValueError: raise errors.UsageError( "The input file or directory is not in the repository." "\n\n\t" + click.style(str(explicit_input), fg="yellow") + "\n\n" ) if self.is_existing_path(explicit_input) is None: raise errors.UsageError( "The input file or directory does not exist." "\n\n\t" + click.style(str(explicit_input), fg="yellow") + "\n\n" ) input_id += 1 default, type, _ = self.guess_type(explicit_input) # Explicit inputs are either File or Directory assert type in PATH_OBJECTS # The inputBinging is None because these inputs won't # appear on command-line yield CommandInputParameter(id="input_{0}".format(input_id), type=type, default=default, inputBinding=None)
[docs] def delete_indirect_files_list(self): """Remove indirect inputs and outputs list.""" try: path = str(self.working_dir / INDIRECT_INPUTS_LIST) os.remove(path) except FileNotFoundError: pass try: path = str(self.working_dir / INDIRECT_OUTPUTS_LIST) os.remove(path) except FileNotFoundError: pass
[docs] def add_indirect_inputs(self): """Read indirect inputs list and add them to explicit inputs.""" for indirect_input in self.read_files_list(INDIRECT_INPUTS_LIST): # treat indirect inputs like explicit inputs self.explicit_inputs.append(indirect_input) # add new explicit inputs (if any) to inputs for input in self.find_explicit_inputs(): self.inputs.append(input)
[docs] def add_indirect_outputs(self): """Read indirect outputs list and add them to explicit outputs.""" for indirect_output in self.read_files_list(INDIRECT_OUTPUTS_LIST): # treat indirect outputs like explicit outputs self.explicit_outputs.append(indirect_output)
[docs] def read_files_list(self, files_list): """Read files list where each line is a filepath.""" try: path = str(files_list) with open(path, "r") as f: for line in f: line = line.strip() if line: yield Path(os.path.abspath(line)) except FileNotFoundError: return