Source code for renku.models.cwl.command_line_tool

# -*- coding: utf-8 -*-
#
# Copyright 2018 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent a ``CommandLineTool`` from the Common Workflow Language."""

import fnmatch
import re
import shlex
from contextlib import contextmanager

import attr
import click

from renku import errors
from renku._compat import Path

from .._datastructures import DirectoryTree
from ._ascwl import CWLClass, mapped
from .parameter import CommandInputParameter, CommandLineBinding, \
    CommandOutputParameter
from .process import Process
from .types import PATH_OBJECTS, Directory, File


[docs]def convert_arguments(value): """Convert arguments from various input formats.""" if isinstance(value, (list, tuple)): return [ CommandLineBinding(**item) if isinstance(item, dict) else item for item in value ] return shlex.split(value)
[docs]@attr.s class CommandLineTool(Process, CWLClass): """Represent a command line tool.""" # specialize inputs and outputs with Command{Input,Output}Parameter baseCommand = attr.ib( default='', validator=lambda self, attr, cmd: bool(cmd), ) # str or list(str) -> shutil.split() arguments = attr.ib( default=attr.Factory(list), converter=convert_arguments, ) # list(string, Expression, CommandLineBinding) stdin = attr.ib(default=None) stdout = attr.ib(default=None) stderr = attr.ib(default=None) inputs = mapped(CommandInputParameter) outputs = mapped(CommandOutputParameter) successCodes = attr.ib(default=attr.Factory(list)) # list(int) temporaryFailCodes = attr.ib(default=attr.Factory(list)) # list(int) permanentFailCodes = attr.ib(default=attr.Factory(list)) # list(int) def __str__(self): """Generate a simple representation.""" argv = ' '.join(self.to_argv()) if self.stdin: assert self.stdin.startswith('$(inputs.') input_id = self.stdin.split('.')[1] for input_ in self.inputs: if input_id == input_.id: argv += ' < ' + str(input_.default) break if self.stdout: argv += ' > ' + self.stdout if self.stderr: argv += ' 2> ' + self.stderr return argv
[docs] def create_run(self, **kwargs): """Return an instance of process run.""" from renku.models.provenance import ProcessRun return ProcessRun(**kwargs)
[docs] def get_output_id(self, path): # pragma: no cover """Return an id of the matching path from default values.""" for output in self.outputs: if output.type in {'stdout', 'stderr'}: stream = getattr(self, output.type) if stream == path: return output.id elif output.type in PATH_OBJECTS: glob = output.outputBinding.glob # TODO better support for Expression if glob.startswith('$(inputs.'): input_id = glob[len('$(inputs.'):-1] for input_ in self.inputs: if input_.id == input_id and input_.default == path: return output.id elif fnmatch.fnmatch(path, glob): return output.id
[docs] def to_argv(self, job=None): """Generate arguments for system call.""" if not isinstance(self.baseCommand, list): argv = [self.baseCommand] else: argv = list(self.baseCommand) args = [(a.position, a) for a in self.arguments] for i in self.inputs: if i.inputBinding: args.append((i.inputBinding.position, i)) for p, v in sorted(args): argv.extend(v.to_argv()) return argv
[docs]@attr.s class CommandLineToolFactory(object): """Command Line Tool Factory.""" _RE_SUBCOMMAND = re.compile(r'^[A-Za-z]+(-[A-Za-z]+)?$') command_line = attr.ib( converter=lambda cmd: list(cmd) if isinstance(cmd, (list, tuple)) else shlex.split(cmd), ) directory = attr.ib( default='.', converter=lambda path: Path(path).resolve(), ) working_dir = attr.ib( default='.', converter=lambda path: Path(path).resolve(), ) stdin = attr.ib(default=None) # null, str, Expression stderr = attr.ib(default=None) # null, str, Expression stdout = attr.ib(default=None) # null, str, Expression baseCommand = attr.ib(init=False) arguments = attr.ib(init=False) inputs = attr.ib(init=False) outputs = attr.ib(init=False) successCodes = attr.ib(default=attr.Factory(list)) # list(int) def __attrs_post_init__(self): """Derive basic informations.""" self.baseCommand, detect = self.split_command_and_args() self.arguments = [] self.inputs = [] self.outputs = [] if self.stdin: input_ = next( self.guess_inputs(str(self.working_dir / self.stdin)) ) assert input_.type == 'File' input_ = attr.evolve( input_, id='input_stdin', inputBinding=None, # do not include in tool arguments ) self.inputs.append(input_) self.stdin = '$(inputs.{0}.path)'.format(input_.id) for stream_name in ('stdout', 'stderr'): stream = getattr(self, stream_name) if stream and self.file_candidate(self.working_dir / stream): self.outputs.append( CommandOutputParameter( id='output_{0}'.format(stream_name), type=stream_name, ) ) for input_ in self.guess_inputs(*detect): if isinstance(input_, CommandLineBinding): self.arguments.append(input_) else: self.inputs.append(input_)
[docs] def generate_tool(self): """Return an instance of command line tool.""" return CommandLineTool( stdin=self.stdin, stderr=self.stderr, stdout=self.stdout, baseCommand=self.baseCommand, arguments=self.arguments, inputs=self.inputs, outputs=self.outputs, successCodes=self.successCodes, )
[docs] @contextmanager def watch(self, client, no_output=False, outputs=None): """Watch a Renku repository for changes to detect outputs.""" tool = self.generate_tool() repo = client.repo if outputs: directories = [ output for output in outputs if Path(output).is_dir() ] client.repo.git.rm( *outputs, r=True, force=True, ignore_unmatch=True ) client.repo.index.commit('renku: automatic removal of outputs') for directory in directories: Path(directory).mkdir(parents=True, exist_ok=True) # NOTE consider to use git index instead existing_directories = { str(p.relative_to(client.path)) for p in client.path.glob('**/') } yield tool if repo: # List of all output paths. paths = [] # Keep track of unmodified output files. unmodified = set() # Possible output paths. candidates = set(repo.untracked_files) candidates |= { item.a_path for item in repo.index.diff(None) if not item.deleted_file } from renku.cli._graph import _safe_path candidates = {path for path in candidates if _safe_path(path)} inputs = {input.id: input for input in self.inputs} outputs = list(tool.outputs) for output, input, path in self.guess_outputs(candidates): outputs.append(output) paths.append(path) if input is not None: if input.id not in inputs: # pragma: no cover raise RuntimeError('Inconsistent input name.') inputs[input.id] = input for stream_name in ('stdout', 'stderr'): stream = getattr(self, stream_name) if stream and stream not in candidates: unmodified.add(stream) elif stream: paths.append(stream) if unmodified: raise errors.UnmodifiedOutputs(repo, unmodified) if not no_output and not paths: raise errors.OutputsNotFound(repo, inputs.values()) tool.inputs = list(inputs.values()) tool.outputs = outputs client.track_paths_in_storage(*paths) # Requirement detection can be done anytime. from .process_requirements import InitialWorkDirRequirement, \ InlineJavascriptRequirement initial_work_dir_requirement = InitialWorkDirRequirement.from_tool( tool, existing_directories=existing_directories ) if initial_work_dir_requirement: tool.requirements.extend([ InlineJavascriptRequirement(), initial_work_dir_requirement, ])
[docs] @command_line.validator def validate_command_line(self, attribute, value): """Check the command line structure.""" if not value: raise ValueError('Command line can not be empty.')
[docs] @directory.validator def validate_path(self, attribute, value): """Path must exists.""" if not value.exists(): raise ValueError('Directory must exist.')
[docs] def file_candidate(self, candidate, ignore=None): """Return a path instance if it exists in current directory.""" if ignore and candidate in ignore: return candidate = Path(candidate) if not candidate.is_absolute(): candidate = self.directory / candidate if candidate.exists(): return candidate.resolve()
[docs] def split_command_and_args(self): """Return tuple with command and args from command line arguments.""" cmd = [self.command_line[0]] args = list(self.command_line[1:]) if len(args) < 2: # only guess subcommand for more arguments return cmd, args while args and re.match(self._RE_SUBCOMMAND, args[0]) \ and not self.file_candidate(args[0]): cmd.append(args.pop(0)) return cmd, args
[docs] def guess_type(self, value, ignore_filenames=None): """Return new value and CWL parameter type.""" candidate = self.file_candidate(value, ignore=ignore_filenames) if candidate: try: if candidate.is_dir(): return Directory(path=candidate), 'Directory', None return File(path=candidate), 'File', None except ValueError: # The candidate points to a file outside the working # directory # TODO suggest that the file should be imported to the repo pass try: value = int(value) return value, 'int', None except ValueError: pass if len(value) > 1 and ',' in value: return value.split(','), 'string[]', ',' return value, 'string', None
[docs] def guess_inputs(self, *arguments): """Yield command input parameters and command line bindings.""" position = 0 prefix = None output_streams = { getattr(self, stream_name) for stream_name in ('stdout', 'stderr') } for index, argument in enumerate(arguments): itemSeparator = None if prefix: if argument.startswith('-'): position += 1 yield CommandLineBinding( position=position, valueFrom=prefix, ) prefix = None if argument.startswith('--'): if '=' in argument: prefix, default = argument.split('=', 1) prefix += '=' default, type, itemSeparator = self.guess_type( default, ignore_filenames=output_streams ) # TODO can be output position += 1 yield CommandInputParameter( id='input_{0}'.format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, separate=False, ) ) prefix = None else: prefix = argument elif argument.startswith('-'): if len(argument) > 2: if '=' in argument: prefix, default = argument.split('=', 1) prefix += '=' default, type, itemSeparator = self.guess_type( default, ignore_filenames=output_streams ) else: # possibly a flag with value prefix = argument[0:2] default, type, itemSeparator = self.guess_type( argument[2:], ignore_filenames=output_streams ) position += 1 yield CommandInputParameter( id='input_{0}'.format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, separate=not bool(argument[2:]), ) ) prefix = None else: prefix = argument else: default, type, itemSeparator = self.guess_type( argument, ignore_filenames=output_streams ) # TODO can be output # TODO there might be an array position += 1 yield CommandInputParameter( id='input_{0}'.format(position), type=type, default=default, inputBinding=dict( position=position, itemSeparator=itemSeparator, prefix=prefix, ) ) prefix = None if prefix: position += 1 yield CommandLineBinding( position=position, valueFrom=prefix, )
[docs] def guess_outputs(self, paths): """Yield detected output and changed command input parameter.""" # TODO what to do with duplicate paths & inputs with same defauts paths = list(paths) tree = DirectoryTree.from_list(paths) input_candidates = {} conflicting_paths = {} for index, input in enumerate(self.inputs): # Convert input defaults to paths relative to working directory. if input.type not in PATH_OBJECTS: try: input_path = (self.directory / str(input.default)).resolve().relative_to( self.working_dir ) except FileNotFoundError: continue else: input_path = input.default.path.relative_to(self.working_dir) if input_path.is_dir() and tree.get(input_path): # The directory might exist before running the script subpaths = { str(input_path / path) for path in tree.get(input_path, default=[]) } content = { str(path) for path in input_path.rglob('*') if not path.is_dir() and path.name != '.gitkeep' } extra_paths = content - subpaths if extra_paths: raise errors.InvalidOutputPath( 'The output directory "{0}" is not empty. \n\n' 'Delete existing files before running the command:' '\n (use "git rm <file>..." to remove them first)' '\n\n'.format(input_path) + '\n'.join( '\t' + click.style(path, fg='yellow') for path in extra_paths ) + '\n\n' 'Once you have removed files that should be used ' 'as outputs,\n' 'you can safely rerun the previous command.' ) # Remove files from the input directory paths = [path for path in paths if path not in subpaths] # Include input path in the paths to check paths.append(str(input_path)) input_candidates[str(input_path)] = input elif input.type not in PATH_OBJECTS: # Input need to be changed if an output is detected input_candidates[str(input_path)] = input else: # Names that can not be outputs because they are already inputs conflicting_paths[str(input_path)] = (index, input) streams = { path for path in (getattr(self, name) for name in ('stdout', 'stderr')) if path is not None } # TODO group by a common prefix for position, path in enumerate(paths): candidate = self.file_candidate(self.working_dir / path) if candidate is None: raise ValueError('Path "{0}" does not exist.'.format(path)) glob = str(candidate.relative_to(self.working_dir)) if glob in streams: continue new_input = None if glob in conflicting_paths: # it means that it is rewriting a file index, input = conflicting_paths[glob] new_input = attr.evolve(input, type='string', default=glob) input_candidates[glob] = new_input del conflicting_paths[glob] # TODO add warning ('Output already exists in inputs.') candidate_type = 'Directory' if candidate.is_dir() else 'File' if glob in input_candidates: input = input_candidates[glob] if new_input is None: new_input = input_candidates[glob] = attr.evolve( input, type='string', default=glob ) yield ( CommandOutputParameter( id='output_{0}'.format(position), type=candidate_type, outputBinding=dict( glob='$(inputs.{0})'.format(input.id), ), ), new_input, glob ) else: yield ( CommandOutputParameter( id='output_{0}'.format(position), type=candidate_type, outputBinding=dict(glob=glob, ), ), None, glob )