# -*- coding: utf-8 -*-
#
# Copyright 2018-2020 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represents a workflow template."""
import os
import pathlib
import urllib.parse
import uuid
import attr
from marshmallow import EXCLUDE
from renku.core.models.calamus import JsonLDSchema, Nested, fields, rdfs, renku
from renku.core.models.entities import CollectionSchema, EntitySchema
@attr.s(cmp=False,)
class MappedIOStream(object):
    """Represents an IO stream (stdin, stdout, stderr).

    All attributes are keyword-only. ``_id`` and ``_label`` are filled in by
    ``__attrs_post_init__`` whenever they are not supplied (or are falsy).
    """

    # Optional object whose ``remote`` mapping may carry a ``"host"`` entry;
    # it is only consulted when building the default id.
    client = attr.ib(default=None, kw_only=True)

    _id = attr.ib(default=None, kw_only=True)

    _label = attr.ib(default=None, kw_only=True)

    # Names of the known streams; not referenced elsewhere in this class.
    STREAMS = ["stdin", "stdout", "stderr"]

    stream_type = attr.ib(type=str, kw_only=True,)

    def default_id(self):
        """Generate an id for a mapped stream.

        The host part is resolved in increasing order of precedence:
        ``"localhost"``, then ``client.remote["host"]`` (when a client is set
        and the entry is truthy), then the ``RENKU_DOMAIN`` environment
        variable (when set and non-empty).

        :returns: ``https://<host>/iostreams/<stream_type>`` as a string.
        """
        # ``pathlib.posixpath`` only worked because ``pathlib`` happened to
        # import ``posixpath`` internally; that attribute is not part of the
        # documented API. Use the real module instead (function-scope import
        # keeps this block self-contained).
        import posixpath

        host = "localhost"
        if self.client:
            host = self.client.remote.get("host") or host
        # The environment variable wins over whatever the client reports.
        host = os.environ.get("RENKU_DOMAIN") or host

        return urllib.parse.urljoin(
            "https://{host}".format(host=host), posixpath.join("/iostreams", self.stream_type)
        )

    def default_label(self):
        """Return the default human-readable label for this stream mapping."""
        return 'Stream mapping for stream "{}"'.format(self.stream_type)

    def __attrs_post_init__(self):
        """Fill in ``_id`` and ``_label`` with defaults when they are falsy."""
        if not self._id:
            self._id = self.default_id()
        if not self._label:
            self._label = self.default_label()

    @classmethod
    def from_jsonld(cls, data):
        """Create an instance from JSON-LD data.

        :param data: an existing instance of this class (returned unchanged)
            or a ``dict`` to be loaded through ``MappedIOStreamSchema``.
        :raises ValueError: if ``data`` is neither an instance nor a ``dict``.
        """
        if isinstance(data, cls):
            return data
        if not isinstance(data, dict):
            raise ValueError(data)

        return MappedIOStreamSchema().load(data)

    def as_jsonld(self):
        """Return this object dumped through ``MappedIOStreamSchema``."""
        return MappedIOStreamSchema().dump(self)
@attr.s(cmp=False,)
class CommandParameter(object):
    """Represents a parameter for an execution template.

    Common base of ``CommandArgument`` and ``CommandOutput``; every attribute
    is keyword-only and defaults to ``None``.
    """

    _id = attr.ib(default=None, kw_only=True)

    _label = attr.ib(default=None, kw_only=True)

    position = attr.ib(default=None, type=int, kw_only=True,)

    prefix = attr.ib(default=None, type=str, kw_only=True,)

    @property
    def sanitized_id(self):
        """Return ``_id`` sanitized for use in non-jsonld contexts.

        Keeps the last four ``/``-separated segments when the id contains
        ``/steps/``, and the last two segments otherwise.
        """
        # Decide how much of the tail to keep before splitting.
        keep = 4 if "/steps/" in self._id else 2
        segments = self._id.split("/")
        return "/".join(segments[-keep:])
@attr.s(cmp=False,)
class CommandArgument(CommandParameter):
    """An argument to a command that is neither input nor output."""

    value = attr.ib(default=None, type=str, kw_only=True,)

    @staticmethod
    def generate_id(run_id, position=None):
        """Generate an id for an argument.

        :param run_id: id the argument id is nested under.
        :param position: when truthy, used as the last id segment; otherwise
            a random UUID4 hex string is used instead.
        :returns: ``<run_id>/arguments/<segment>``.
        """
        suffix = str(position) if position else uuid.uuid4().hex
        return "{}/arguments/{}".format(run_id, suffix)

    def default_label(self):
        """Return the default human-readable label for this argument."""
        return 'Command Argument "{}"'.format(self.value)

    def to_argv(self):
        """Return the argument as a list of command-line tokens.

        A prefix ending in a space yields two tokens (prefix without that
        space, then the value); any other prefix is concatenated with the
        value into a single token; without a prefix only the value is
        returned.
        """
        prefix = self.prefix
        if not prefix:
            return [self.value]
        if prefix.endswith(" "):
            # Trailing space means the prefix is its own token.
            return [prefix[:-1], self.value]
        return ["{}{}".format(prefix, self.value)]

    def __attrs_post_init__(self):
        """Fill in ``_label`` with the default label when it is falsy."""
        if self._label:
            return
        self._label = self.default_label()

    @classmethod
    def from_jsonld(cls, data):
        """Create an instance from JSON-LD data.

        :param data: an existing instance of this class (returned unchanged)
            or a ``dict`` to be loaded through ``CommandArgumentSchema``.
        :raises ValueError: if ``data`` is neither an instance nor a ``dict``.
        """
        if isinstance(data, cls):
            return data
        if isinstance(data, dict):
            return CommandArgumentSchema().load(data)
        raise ValueError(data)

    def as_jsonld(self):
        """Return this object dumped through ``CommandArgumentSchema``."""
        schema = CommandArgumentSchema()
        return schema.dump(self)
@attr.s(cmp=False,)
class CommandOutput(CommandParameter):
    """An output of a command."""

    create_folder = attr.ib(default=False, kw_only=True, type=bool)

    # Required; the methods below read its ``path`` attribute.
    produces = attr.ib(kw_only=True)

    # Optional stream mapping; ``to_stream_repr`` reads its ``stream_type``.
    mapped_to = attr.ib(default=None, kw_only=True)

    @staticmethod
    def generate_id(run_id, position=None):
        """Generate an id for an output.

        :param run_id: id the output id is nested under.
        :param position: when truthy, used as the last id segment; otherwise
            a random UUID4 hex string is used instead.
        :returns: ``<run_id>/outputs/<segment>``.
        """
        suffix = str(position) if position else uuid.uuid4().hex
        return "{}/outputs/{}".format(run_id, suffix)

    def default_label(self):
        """Return the default human-readable label for this output."""
        return 'Command Output "{}"'.format(self.produces.path)

    def to_argv(self):
        """Return the output as a list of command-line tokens.

        A prefix ending in a space yields two tokens (prefix without that
        space, then the produced path); any other prefix is concatenated with
        the path into a single token; without a prefix only the path is
        returned.
        """
        prefix = self.prefix
        if not prefix:
            return [self.produces.path]
        if prefix.endswith(" "):
            # Trailing space means the prefix is its own token.
            return [prefix[:-1], self.produces.path]
        return ["{}{}".format(prefix, self.produces.path)]

    def to_stream_repr(self):
        """Return the output stream redirection for this output.

        Gives an empty string when the output is not mapped to a stream,
        `` > <path>`` when it is mapped to ``stdout``, and `` 2> <path>`` for
        any other mapped stream type.
        """
        stream = self.mapped_to
        if not stream:
            return ""

        template = " > {}" if stream.stream_type == "stdout" else " 2> {}"
        return template.format(self.produces.path)

    def __attrs_post_init__(self):
        """Fill in ``_label`` with the default label when it is falsy."""
        if self._label:
            return
        self._label = self.default_label()

    @classmethod
    def from_jsonld(cls, data):
        """Create an instance from JSON-LD data.

        :param data: an existing instance of this class (returned unchanged)
            or a ``dict`` to be loaded through ``CommandOutputSchema``.
        :raises ValueError: if ``data`` is neither an instance nor a ``dict``.
        """
        if isinstance(data, cls):
            return data
        if isinstance(data, dict):
            return CommandOutputSchema().load(data)
        raise ValueError(data)

    def as_jsonld(self):
        """Return this object dumped through ``CommandOutputSchema``."""
        schema = CommandOutputSchema()
        return schema.dump(self)
class MappedIOStreamSchema(JsonLDSchema):
    """JSON-LD schema used by ``MappedIOStream`` for loading and dumping."""

    # Identifier and label; ``init_name`` presumably names the constructor
    # keyword used for the underscore-prefixed attribute -- confirm in calamus.
    _id = fields.Id(init_name="id")
    _label = fields.String(rdfs.label, init_name="label")

    # Stored under the ``renku.streamType`` property.
    stream_type = fields.String(renku.streamType)
class CommandParameterSchema(JsonLDSchema):
    """JSON-LD schema for the fields shared by all command parameters.

    Base of ``CommandArgumentSchema`` and ``CommandOutputSchema``.
    """

    # Identifier and label; ``init_name`` presumably names the constructor
    # keyword used for the underscore-prefixed attribute -- confirm in calamus.
    _id = fields.Id(init_name="id")
    _label = fields.String(rdfs.label, init_name="label")

    # Both are optional and fall back to ``None`` when absent from the data.
    position = fields.Integer(renku.position, missing=None)
    prefix = fields.String(renku.prefix, missing=None)
class CommandArgumentSchema(CommandParameterSchema):
    """JSON-LD schema used by ``CommandArgument`` for loading and dumping."""

    # The argument's value, stored under the ``renku.value`` property.
    value = fields.String(renku.value)
class CommandOutputSchema(CommandParameterSchema):
    """JSON-LD schema used by ``CommandOutput`` for loading and dumping."""

    create_folder = fields.Boolean(renku.createFolder)

    # The produced item is nested and may match either listed schema.
    produces = Nested(renku.produces, [EntitySchema, CollectionSchema])

    # Optional stream mapping; ``None`` when absent from the data.
    mapped_to = Nested(renku.mappedTo, MappedIOStreamSchema, missing=None)