# -*- coding: utf-8 -*-
#
# Copyright 2018-2020 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Converter for workflows to cwl."""
import os
import tempfile
from collections import defaultdict
from pathlib import Path
from uuid import uuid4
import cwlgen
from renku.core.models.entities import Collection
from renku.core.models.workflow.parameters import CommandOutput
class CommandLineTool(cwlgen.CommandLineTool):
"""CommandLineTool that supports empty outputs."""
def get_dict(self):
"""Set outputs to empty if not set."""
d = super(CommandLineTool, self).get_dict()
if "outputs" not in d:
d["outputs"] = []
return d
class WorkflowStep(cwlgen.WorkflowStep):
"""WorkflowStep that supports empty outputs."""
def get_dict(self):
"""Set out to empty if not set."""
d = super(WorkflowStep, self).get_dict()
if "out" not in d:
d["out"] = []
return d
def _get_argument_type(value):
"""Get the type of a command line argument."""
type_ = "string"
try:
value = int(value)
type_ = "int"
except ValueError:
pass
return value, type_
def _recurse_subprocesses(run, index):
"""Recursively get actual steps (not ``WorkflowRun``) for execution."""
if not run.subprocesses:
return [(index, run)], index + 1
processes = []
for s in sorted(run.subprocesses, key=lambda x: x.index):
result, index = _recurse_subprocesses(s.process, index)
processes.extend(result)
return processes, index
[docs]class CWLConverter(object):
"""Converts a ``Run`` to cwl file(s)."""
[docs] @staticmethod
def convert(run, client, path=None):
"""Convert the workflow to one ore more .cwl files."""
basedir = client.path
filename = None
if path:
if os.path.isdir(path):
tmpdir = path
else:
tmpdir = path.parent
filename = path
else:
tmpdir = tempfile.mkdtemp()
if hasattr(run, "subprocesses") and run.subprocesses:
return CWLConverter._convert_composite(run, tmpdir, basedir, filename=filename)
else:
return CWLConverter._convert_step(run, tmpdir, basedir, filename=filename)
@staticmethod
def _convert_composite(run, tmpdir, basedir, filename=None):
"""Converts a workflow made up of several steps."""
inputs = {}
arguments = {}
outputs = {}
consumed_outputs = set()
steps = []
input_index = 1
argument_index = 1
subprocesses, _ = _recurse_subprocesses(run, 1)
# preprocess to add dummy outputs in case of output directories
previous_output_dirs = defaultdict(list)
for _, subprocess in subprocesses:
for input in subprocess.inputs:
entity = input.consumes
key = (entity.commit.hexsha, entity.path)
if key not in previous_output_dirs:
continue
for previous_process in previous_output_dirs[key]:
previous_process.outputs.append(CommandOutput(produces=entity, create_folder=False))
for output in subprocess.outputs:
entity = output.produces
if not isinstance(entity, Collection):
continue
for e in entity.entities:
if e.commit.hexsha != entity.commit.hexsha:
continue
key = (e.commit.hexsha, e.path)
previous_output_dirs[key].append(subprocess)
# Convert workflow steps
for i, subprocess in subprocesses:
tool, path = CWLConverter._convert_step(subprocess, tmpdir, basedir)
step = WorkflowStep("step_{}".format(i), path)
for input in subprocess.inputs:
input_path = input.consumes.path
sanitized_id = input.sanitized_id.replace("/", "_")
if input_path in inputs:
# already used as top-level input elsewhere, reuse
step.inputs.append(cwlgen.WorkflowStepInput(sanitized_id, source=inputs[input_path]))
elif input_path in outputs:
# output of a previous step, refer to it
consumed_outputs.add(outputs[input_path][0])
step.inputs.append(
cwlgen.WorkflowStepInput(
sanitized_id, source="{}/{}".format(outputs[input_path][1], outputs[input_path][0])
)
)
else:
# input isn't output and doesn't exist yet, add new
inputs[input_path] = "input_{}".format(input_index)
step.inputs.append(cwlgen.WorkflowStepInput(sanitized_id, source=inputs[input_path]))
input_index += 1
for argument in subprocess.arguments:
argument_id = "argument_{}".format(argument_index)
arguments[argument_id] = argument.value
step.inputs.append(
cwlgen.WorkflowStepInput(argument.sanitized_id.replace("/", "_"), source=argument_id)
)
argument_index += 1
for output in subprocess.outputs:
sanitized_id = output.sanitized_id.replace("/", "_")
if output.mapped_to:
sanitized_id = "output_{}".format(output.mapped_to.stream_type)
outputs[output.produces.path] = (sanitized_id, step.id)
step.out.append(cwlgen.WorkflowStepOutput(sanitized_id))
steps.append(step)
workflow_object = cwlgen.Workflow(str(uuid4()), cwl_version="v1.0")
workflow_object.hints = []
workflow_object.requirements = []
# check types of paths and add as top level inputs/outputs
for path, id_ in inputs.items():
type_ = "Directory" if os.path.isdir(path) else "File"
workflow_object.inputs.append(
cwlgen.InputParameter(
id_,
param_type=type_,
default={"path": os.path.abspath(os.path.join(basedir, path)), "class": type_},
)
)
for id_, value in arguments.items():
value, type_ = _get_argument_type(value)
workflow_object.inputs.append(cwlgen.InputParameter(id_, param_type=type_, default=value))
for index, (path, (id_, step_id)) in enumerate(outputs.items(), 1):
type_ = "Directory" if os.path.isdir(path) else "File"
workflow_object.outputs.append(
cwlgen.WorkflowOutputParameter(
"output_{}".format(index), output_source="{}/{}".format(step_id, id_), param_type=type_
)
)
workflow_object.steps.extend(steps)
if not filename:
filename = "parent_{}.cwl".format(uuid4())
path = os.path.join(tmpdir, filename)
workflow_object.export(path)
return workflow_object, path
@staticmethod
def _convert_step(step, tmpdir, basedir, filename=None):
"""Converts a single workflow step to a cwl file."""
stdin, stdout, stderr = None, None, None
inputs = list(step.inputs)
for output in step.outputs:
if not output.mapped_to:
continue
if output.mapped_to.stream_type == "stderr":
stderr = output.produces.path
if output.mapped_to.stream_type == "stdout":
stdout = output.produces.path
tool_object = CommandLineTool(
tool_id=str(uuid4()),
base_command=step.command.split(" "),
stdin=stdin,
stderr=stderr,
stdout=stdout,
cwl_version="v1.0",
)
workdir_req = cwlgen.InitialWorkDirRequirement([])
jsrequirement = False
dirents = []
for output in step.outputs:
path = output.produces.path
if not os.path.isdir(path):
path = str(Path(path).parent)
if path != "." and path not in dirents and output.create_folder:
# workflow needs to create subdirectory for output file,
# if the directory was not already present
workdir_req.listing.append(
cwlgen.InitialWorkDirRequirement.Dirent(
entry='$({"listing": [], "class": "Directory"})', entryname=path, writable=True
)
)
dirents.append(path)
jsrequirement = True
outp, arg = CWLConverter._convert_output(output)
tool_object.outputs.append(outp)
if arg:
tool_object.inputs.append(arg)
for input_ in inputs:
tool_input = CWLConverter._convert_input(input_, basedir)
workdir_req.listing.append(
cwlgen.InitialWorkDirRequirement.Dirent(
entry="$(inputs.{})".format(tool_input.id), entryname=input_.consumes.path, writable=False
)
)
tool_object.inputs.append(tool_input)
if input_.mapped_to:
tool_object.stdin = "$(inputs.{}.path)".format(tool_input.id)
jsrequirement = True
for argument in step.arguments:
tool_object.inputs.append(CWLConverter._convert_argument(argument))
if workdir_req.listing:
tool_object.requirements.append(workdir_req)
if jsrequirement:
tool_object.requirements.append(cwlgen.InlineJavascriptRequirement())
if not filename:
filename = "{}.cwl".format(uuid4())
path = os.path.join(tmpdir, filename)
tool_object.export(path)
return tool_object, path
@staticmethod
def _convert_input(input, basedir):
"""Converts an input to a CWL input."""
entity = input.consumes
type_ = "Directory" if isinstance(entity, Collection) else "File"
sanitized_id = input.sanitized_id
sanitized_id = sanitized_id.replace("/", "_")
if input.mapped_to:
sanitized_id = "input_stdin"
separate = None
prefix = None
if input.prefix:
prefix = input.prefix
separate = False
if prefix.endswith(" "):
prefix = prefix[:-1]
separate = True
return cwlgen.CommandInputParameter(
sanitized_id,
param_type=type_,
input_binding=cwlgen.CommandLineBinding(position=input.position, prefix=prefix, separate=separate),
default={"path": os.path.abspath(os.path.join(basedir, entity.path)), "class": type_},
)
@staticmethod
def _convert_output(output):
"""Converts an output to a CWL output."""
if output.mapped_to:
return (
cwlgen.CommandOutputParameter(
"output_{}".format(output.mapped_to.stream_type),
param_type=output.mapped_to.stream_type,
streamable=False,
),
None,
)
entity = output.produces
type_ = "Directory" if isinstance(entity, Collection) else "File"
sanitized_id = output.sanitized_id.replace("/", "_")
if output.position:
# output is specified as a parameter, create an input as well
separate = None
prefix = None
if output.prefix:
prefix = output.prefix
separate = False
if prefix.endswith(" "):
prefix = prefix[:-1]
separate = True
arg = cwlgen.CommandInputParameter(
"{}_arg".format(sanitized_id),
param_type="string",
input_binding=cwlgen.CommandLineBinding(position=output.position, prefix=prefix, separate=separate),
default=entity.path,
)
outp = cwlgen.CommandOutputParameter(
sanitized_id,
param_type=type_,
output_binding=cwlgen.CommandOutputBinding(glob="$(inputs.{})".format(arg.id)),
)
return outp, arg
return (
cwlgen.CommandOutputParameter(
sanitized_id, param_type=type_, output_binding=cwlgen.CommandOutputBinding(glob=entity.path)
),
None,
)
@staticmethod
def _convert_argument(argument):
"""Converts an argument to a CWL input."""
value, type_ = _get_argument_type(argument.value)
separate = None
prefix = None
if argument.prefix:
prefix = argument.prefix
separate = False
if prefix.endswith(" "):
prefix = prefix[:-1]
separate = True
return cwlgen.CommandInputParameter(
argument.sanitized_id.replace("/", "_"),
param_type=type_,
input_binding=cwlgen.CommandLineBinding(position=argument.position, prefix=prefix, separate=separate),
default=value,
)