Source code for renku.core.models.cwl.command_line_tool

# -*- coding: utf-8 -*-
#
# Copyright 2018-2020- Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent a ``CommandLineTool`` from the Common Workflow Language."""

import fnmatch
import os
import re
import shlex
import time
from contextlib import contextmanager
from pathlib import Path

import attr
import click

from renku.core import errors

from ...management.config import RENKU_HOME
from ..datastructures import DirectoryTree
from .annotation import Annotation
from .ascwl import CWLClass, mapped
from .parameter import CommandInputParameter, CommandLineBinding, \
    CommandOutputParameter
from .process import Process
from .types import PATH_OBJECTS, Directory, File

STARTED_AT = int(time.time() * 1000)

RENKU_TMP_DIR = os.path.join(RENKU_HOME, 'tmp')
RENKU_FILELIST_PATH = os.getenv('RENKU_FILELIST_PATH', RENKU_TMP_DIR)
INDIRECT_INPUTS_LIST = os.path.join(RENKU_FILELIST_PATH, 'inputs.txt')
INDIRECT_OUTPUTS_LIST = os.path.join(RENKU_FILELIST_PATH, 'outputs.txt')


[docs]def convert_arguments(value): """Convert arguments from various input formats.""" if isinstance(value, (list, tuple)): return [ CommandLineBinding(**item) if isinstance(item, dict) else item for item in value ] return shlex.split(value)
[docs]@attr.s class CommandLineTool(Process, CWLClass): """Represent a command line tool.""" STD_STREAMS_REPR = { 'stdin': '<', 'stdout': '>', 'stderr': '2>', } """Format streams for a shell command representation.""" # specialize inputs and outputs with Command{Input,Output}Parameter baseCommand = attr.ib( default='', validator=lambda self, attr, cmd: bool(cmd), ) # str or list(str) -> shutil.split() arguments = attr.ib( default=attr.Factory(list), converter=convert_arguments, ) # list(string, Expression, CommandLineBinding) stdin = attr.ib(default=None) stdout = attr.ib(default=None) stderr = attr.ib(default=None) inputs = mapped(CommandInputParameter) outputs = mapped(CommandOutputParameter) successCodes = attr.ib(default=attr.Factory(list)) # list(int) temporaryFailCodes = attr.ib(default=attr.Factory(list)) # list(int) permanentFailCodes = attr.ib(default=attr.Factory(list)) # list(int) annotations = attr.ib( metadata={ 'cwl_metadata': { 'namespace': 'http://www.w3.org/ns/oa#', 'prefix': 'oa', 'property': 'oa:hasTarget', 'reverse': True, 'type': Annotation } }, default=None ) def _std_streams(self, basedir=None): """Return mapped standard streams.""" streams = {} if self.stdin: assert self.stdin.startswith('$(inputs.') input_id = self.stdin.split('.')[1] for input_ in self.inputs: if input_id == input_.id: streams['stdin'] = os.path.relpath( str(Path(basedir or '.') / str(input_.default)) ) break if self.stdout: streams['stdout'] = self.stdout if self.stderr: streams['stderr'] = self.stderr return streams def __str__(self): """Generate a simple representation.""" return ' '.join(self.to_argv()) + ' ' + ' '.join( self.STD_STREAMS_REPR[key] + ' ' + str(path) for key, path in self._std_streams().items() )
[docs] def create_run(self, **kwargs): """Return an instance of process run.""" from renku.core.models.provenance.activities import ProcessRun return ProcessRun(**kwargs)
[docs] def get_output_id(self, path): # pragma: no cover """Return an id of the matching path from default values.""" for output in self.outputs: if output.type in {'stdout', 'stderr'}: stream = getattr(self, output.type) if stream == path: return output.id elif output.type in PATH_OBJECTS: glob = output.outputBinding.glob # TODO better support for Expression if glob.startswith('$(inputs.'): input_id = glob[len('$(inputs.'):-1] for input_ in self.inputs: if input_.id == input_id and input_.default == path: return output.id elif fnmatch.fnmatch(path, glob): return output.id
[docs] def to_argv(self, job=None): """Generate arguments for system call.""" if not isinstance(self.baseCommand, list): argv = [self.baseCommand] else: argv = list(self.baseCommand) args = [(a.position, a) for a in self.arguments] for i in self.inputs: if i.inputBinding: args.append((i.inputBinding.position, i)) for _, v in sorted(args): argv.extend(v.to_argv()) return argv
[docs]@attr.s class CommandLineToolFactory(object): """Command Line Tool Factory.""" _RE_SUBCOMMAND = re.compile(r'^[A-Za-z]+(-[A-Za-z]+)?$') command_line = attr.ib( converter=lambda cmd: list(cmd) if isinstance(cmd, (list, tuple)) else shlex.split(cmd), ) explicit_inputs = attr.ib( default=[], converter=lambda paths: [Path(path).resolve() for path in paths] ) explicit_outputs = attr.ib( default=[], converter=lambda paths: [Path(path).resolve() for path in paths] ) directory = attr.ib( default='.', converter=lambda path: Path(path).resolve(), ) working_dir = attr.ib( default='.', converter=lambda path: Path(path).resolve(), ) stdin = attr.ib(default=None) # null, str, Expression stderr = attr.ib(default=None) # null, str, Expression stdout = attr.ib(default=None) # null, str, Expression baseCommand = attr.ib(init=False) arguments = attr.ib(init=False) inputs = attr.ib(init=False) outputs = attr.ib(init=False) successCodes = attr.ib(default=attr.Factory(list)) # list(int) def __attrs_post_init__(self): """Derive basic information.""" self.baseCommand, detect = self.split_command_and_args() self.arguments = [] self.inputs = [] self.outputs = [] if self.stdin: input_ = next( self.guess_inputs(str(self.working_dir / self.stdin)) ) assert input_.type == 'File' input_ = attr.evolve( input_, id='input_stdin', inputBinding=None, # do not include in tool arguments ) self.inputs.append(input_) self.stdin = '$(inputs.{0}.path)'.format(input_.id) for stream_name in ('stdout', 'stderr'): stream = getattr(self, stream_name) if stream and self.file_candidate(self.working_dir / stream): self.outputs.append( CommandOutputParameter( id='output_{0}'.format(stream_name), type=stream_name, ) ) for input_ in self.guess_inputs(*detect): if isinstance(input_, CommandLineBinding): self.arguments.append(input_) else: self.inputs.append(input_) if self.explicit_inputs: for input in self.find_explicit_inputs(): self.inputs.append(input)
[docs] def generate_tool(self): """Return an instance of command line tool.""" return CommandLineTool( stdin=self.stdin, stderr=self.stderr, stdout=self.stdout, baseCommand=self.baseCommand, arguments=self.arguments, inputs=self.inputs, outputs=self.outputs, successCodes=self.successCodes, )
[docs] @contextmanager def watch(self, client, no_output=False): """Watch a Renku repository for changes to detect outputs.""" tool = self.generate_tool() repo = client.repo # Remove indirect files list if any self.delete_indirect_files_list() # NOTE consider to use git index instead existing_directories = { str(p.relative_to(client.path)) for p in client.path.glob('**/') } from renku.core.plugins.pluginmanager import get_plugin_manager pm = get_plugin_manager() pm.hook.pre_run(tool=tool) yield tool if repo: # Include indirect inputs and outputs before further processing self.add_indirect_inputs() self.add_indirect_outputs() # Remove indirect files list if any self.delete_indirect_files_list() # List of all output paths. paths = [] inputs = {input.id: input for input in self.inputs} outputs = list(tool.outputs) # Keep track of unmodified output files. unmodified = set() # Calculate possible output paths. # Capture newly created files through redirects. candidates = {file_ for file_ in repo.untracked_files} # Capture modified files through redirects. candidates |= { o.a_path for o in repo.index.diff(None) if not o.deleted_file } from renku.core.commands.graph import _safe_path candidates = {path for path in candidates if _safe_path(path)} for output, input, path in self.guess_outputs(candidates): outputs.append(output) paths.append(path) if input is not None: if input.id not in inputs: # pragma: no cover raise RuntimeError('Inconsistent input name.') inputs[input.id] = input for stream_name in ('stdout', 'stderr'): stream = getattr(self, stream_name) if ( stream and stream not in candidates and Path(stream).resolve() not in self.explicit_outputs ): unmodified.add(stream) elif stream: paths.append(stream) if self.explicit_outputs: last_output_id = len(outputs) for output, input, path in self.find_explicit_outputs( last_output_id ): outputs.append(output) paths.append(path) if input is not None: if input.id not in inputs: # pragma: no cover raise RuntimeError('Inconsistent input name.') inputs[input.id] = input if unmodified: raise errors.UnmodifiedOutputs(repo, unmodified) if not no_output and not paths: raise errors.OutputsNotFound(repo, inputs.values()) if client.has_external_storage: client.track_paths_in_storage(*paths) tool.inputs = list(inputs.values()) tool.outputs = outputs # Requirement detection can be done anytime. from .process_requirements import InitialWorkDirRequirement, \ InlineJavascriptRequirement initial_work_dir_requirement = InitialWorkDirRequirement.from_tool( tool, existing_directories=existing_directories, working_dir=self.working_dir ) if initial_work_dir_requirement: tool.requirements.extend([ InlineJavascriptRequirement(), initial_work_dir_requirement, ]) results = pm.hook.cmdline_tool_annotations(tool=tool) tool.annotations = [a for r in results for a in r]
[docs] @command_line.validator def validate_command_line(self, attribute, value): """Check the command line structure.""" if not value: raise errors.UsageError('Command line can not be empty.')
[docs] @directory.validator def validate_path(self, attribute, value): """Path must exists.""" if not value.exists(): raise errors.UsageError('Directory must exist.')
[docs] def file_candidate(self, candidate, ignore=None): """Return a path instance if it exists in current directory.""" if ignore and candidate in ignore: return candidate = Path(candidate) if not candidate.is_absolute(): candidate = self.directory / candidate if candidate.exists(): return candidate.resolve()
[docs] def split_command_and_args(self): """Return tuple with command and args from command line arguments.""" if self.file_candidate(self.command_line[0]): return [], list(self.command_line) cmd = [self.command_line[0]] args = list(self.command_line[1:]) if len(args) < 2: # only guess subcommand for more arguments return cmd, args while args and re.match(self._RE_SUBCOMMAND, args[0]) \ and not self.file_candidate(args[0]): cmd.append(args.pop(0)) return cmd, args
[docs] def guess_type(self, value, ignore_filenames=None): """Return new value and CWL parameter type.""" candidate = self.file_candidate(value, ignore=ignore_filenames) if candidate: try: if candidate.is_dir(): return Directory(path=candidate), 'Directory', None return File(path=candidate), 'File', None except ValueError: # The candidate points to a file outside the working # directory # TODO suggest that the file should be imported to the repo pass try: value = int(value) return value, 'int', None except ValueError: pass if len(value) > 1 and ',' in value: return value.split(','), 'string[]', ',' return value, 'string', None
[docs] def guess_inputs(self, *arguments): """Yield command input parameters and command line bindings.""" position = 0 prefix = None output_streams = { getattr(self, stream_name) for stream_name in ('stdout', 'stderr') } for index, argument in enumerate(arguments): itemSeparator = None if prefix: if argument.startswith('-'): position += 1 yield CommandLineBinding( position=position, valueFrom=prefix, ) prefix = None if argument.startswith('--'): if '=' in argument: prefix, default = argument.split('=', 1) prefix += '=' default, type, itemSeparator = self.guess_type( default, ignore_filenames=output_streams ) # TODO can be output position += 1 yield CommandInputParameter( id='input_{0}'.format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, separate=False, ) ) prefix = None else: prefix = argument elif argument.startswith('-'): if len(argument) > 2: if '=' in argument: prefix, default = argument.split('=', 1) prefix += '=' default, type, itemSeparator = self.guess_type( default, ignore_filenames=output_streams ) else: # possibly a flag with value prefix = argument[0:2] default, type, itemSeparator = self.guess_type( argument[2:], ignore_filenames=output_streams ) position += 1 yield CommandInputParameter( id='input_{0}'.format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, separate=not bool(argument[2:]), ) ) prefix = None else: prefix = argument else: default, type, itemSeparator = self.guess_type( argument, ignore_filenames=output_streams ) # TODO can be output # TODO there might be an array position += 1 yield CommandInputParameter( id='input_{0}'.format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, ) ) prefix = None if prefix: position += 1 yield CommandLineBinding( position=position, valueFrom=prefix, )
[docs] def guess_outputs(self, paths): """Yield detected output and changed command input parameter.""" # TODO what to do with duplicate paths & inputs with same defauts paths = list(paths) tree = DirectoryTree.from_list(paths) input_candidates = {} conflicting_paths = {} for index, input in enumerate(self.inputs): # Convert input defaults to paths relative to working directory. if input.type not in PATH_OBJECTS: try: path = self.directory / str(input.default) input_path = path.resolve().relative_to(self.working_dir) except FileNotFoundError: continue else: input_path = input.default.path.relative_to(self.working_dir) if input_path.is_dir() and tree.get(input_path): # The directory might exist before running the script subpaths = { str(input_path / path) for path in tree.get(input_path, default=[]) } if input_path.resolve() not in self.explicit_outputs: content = { str(path) for path in input_path.rglob('*') if not path.is_dir() and path.name != '.gitkeep' } extra_paths = content - subpaths if extra_paths: raise errors.InvalidOutputPath( 'The output directory "{0}" is not empty. \n\n' 'Delete existing files before running the ' 'command:' '\n (use "git rm <file>..." to remove them ' 'first)' '\n\n'.format(input_path) + '\n'.join( '\t' + click.style(path, fg='yellow') for path in extra_paths ) + '\n\n' 'Once you have removed files that should be used ' 'as outputs,\n' 'you can safely rerun the previous command.' ) # Remove files from the input directory paths = [path for path in paths if path not in subpaths] # Include input path in the paths to check paths.append(str(input_path)) input_candidates[str(input_path)] = input elif input.type not in PATH_OBJECTS: # Input need to be changed if an output is detected input_candidates[str(input_path)] = input else: # Names that can not be outputs because they are already inputs conflicting_paths[str(input_path)] = (index, input) streams = { path for path in (getattr(self, name) for name in ('stdout', 'stderr')) if path is not None } # TODO group by a common prefix for position, path in enumerate(paths): if Path(path).resolve() in self.explicit_outputs: del paths[position] for position, path in enumerate(paths): candidate = self.file_candidate(self.working_dir / path) if candidate is None: raise ValueError('Path "{0}" does not exist.'.format(path)) glob = str(candidate.relative_to(self.working_dir)) if glob in streams: continue new_input = None if glob in conflicting_paths: # it means that it is rewriting a file index, input = conflicting_paths[glob] new_input = attr.evolve(input, type='string', default=glob) input_candidates[glob] = new_input del conflicting_paths[glob] # TODO add warning ('Output already exists in inputs.') candidate_type = 'Directory' if candidate.is_dir() else 'File' if glob in input_candidates: input = input_candidates[glob] if new_input is None: new_input = input_candidates[glob] = attr.evolve( input, type='string', default=glob ) yield ( CommandOutputParameter( id='output_{0}'.format(position), type=candidate_type, outputBinding=dict( glob='$(inputs.{0})'.format(input.id), ), ), new_input, glob ) else: yield ( CommandOutputParameter( id='output_{0}'.format(position), type=candidate_type, outputBinding=dict(glob=glob, ), ), None, glob )
[docs] def find_explicit_inputs(self): """Yield explicit inputs and command line input bindings if any.""" input_paths = [ input.default.path for input in self.inputs if input.type in PATH_OBJECTS ] input_id = len(self.inputs) + len(self.arguments) for explicit_input in self.explicit_inputs: if explicit_input in input_paths: continue try: explicit_input.relative_to(self.working_dir) except ValueError: raise errors.InvalidInputPath( 'The input file or directory is not in the repository.' '\n\n\t' + click.style(str(explicit_input), fg='yellow') + '\n\n' ) if self.file_candidate(explicit_input) is None: raise errors.InvalidInputPath( 'The input file or directory does not exist.' '\n\n\t' + click.style(str(explicit_input), fg='yellow') + '\n\n' ) input_id += 1 default, type, _ = self.guess_type(explicit_input) # Explicit inputs are either File or Directory assert type in PATH_OBJECTS # The inputBinging is None because these inputs won't # appear on command-line yield CommandInputParameter( id='input_{0}'.format(input_id), type=type, default=default, inputBinding=None )
[docs] def find_explicit_outputs(self, starting_output_id): """Yield explicit output and changed command input parameter.""" inputs = { str(i.default.path.relative_to(self.working_dir)): i for i in self.inputs if i.type in PATH_OBJECTS } output_id = starting_output_id for path in self.explicit_outputs: if self.file_candidate(path) is None: raise errors.InvalidOutputPath( 'The output file or directory does not exist.' '\n\n\t' + click.style(str(path), fg='yellow') + '\n\n' ) output_path = str(path.relative_to(self.working_dir)) type = 'Directory' if path.is_dir() else 'File' if output_path in inputs: # change input type to note that it is also an output input = inputs[output_path] input = attr.evolve(input, type='string', default=output_path) yield ( CommandOutputParameter( id='output_{0}'.format(output_id), type=type, outputBinding=dict( glob='$(inputs.{0})'.format(input.id) ) ), input, output_path ) else: yield ( CommandOutputParameter( id='output_{0}'.format(output_id), type=type, outputBinding=dict(glob=str(output_path)) ), None, output_path ) output_id += 1
[docs] def delete_indirect_files_list(self): """Remove indirect inputs and outputs list.""" try: path = str(self.working_dir / INDIRECT_INPUTS_LIST) os.remove(path) except FileNotFoundError: pass try: path = str(self.working_dir / INDIRECT_OUTPUTS_LIST) os.remove(path) except FileNotFoundError: pass
[docs] def add_indirect_inputs(self): """Read indirect inputs list and add them to explicit inputs.""" for indirect_input in self.read_files_list(INDIRECT_INPUTS_LIST): # treat indirect inputs like explicit inputs self.explicit_inputs.append(indirect_input) # add new explicit inputs (if any) to inputs for input in self.find_explicit_inputs(): self.inputs.append(input)
[docs] def add_indirect_outputs(self): """Read indirect outputs list and add them to explicit outputs.""" for indirect_output in self.read_files_list(INDIRECT_OUTPUTS_LIST): # treat indirect outputs like explicit outputs self.explicit_outputs.append(indirect_output)
[docs] def read_files_list(self, files_list): """Read files list where each line is a filepath.""" try: path = str(files_list) with open(path, 'r') as f: for line in f: line = line.strip() if line: yield Path(line).resolve() except FileNotFoundError: return