# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zenodo API integration."""
import json
import os
import posixpath
import urllib
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlparse
from renku.core import errors
from renku.core.dataset.providers.api import (
ExporterApi,
ExportProviderInterface,
ImportProviderInterface,
ProviderApi,
ProviderPriority,
)
from renku.core.dataset.providers.repository import RepositoryImporter, make_request
from renku.core.util import communication
from renku.core.util.doi import is_doi
from renku.core.util.requests import get_redirect_url
from renku.core.util.urls import remove_credentials
from renku.domain_model.project_context import project_context
if TYPE_CHECKING:
from renku.core.dataset.providers.models import ProviderDataset, ProviderParameter
from renku.domain_model.dataset import Dataset, DatasetTag
# Base URLs of the production and sandbox Zenodo instances. The sandbox is selected when
# the ``ZENODO_USE_SANDBOX`` environment variable is set (export) or when the requested URI
# contains ``sandbox.zenodo.org`` (import).
ZENODO_BASE_URL = "https://zenodo.org"
ZENODO_SANDBOX_URL = "https://sandbox.zenodo.org/"

# Path segments that are joined onto one of the base URLs above.
ZENODO_API_PATH = "api"
ZENODO_DEPOSIT_PATH = "deposit"
ZENODO_PUBLISH_PATH = "record"

# Endpoints relative to ``<base>/api/deposit/``; ``{0}`` is formatted with the deposition id.
ZENODO_PUBLISH_ACTION_PATH = "depositions/{0}/actions/publish"
ZENODO_METADATA_URL = "depositions/{0}"
ZENODO_FILES_URL = "depositions/{0}/files"
ZENODO_NEW_DEPOSIT_URL = "depositions"
class ZenodoProvider(ProviderApi, ExportProviderInterface, ImportProviderInterface):
    """Dataset provider for the Zenodo registry, usable for both import and export."""

    priority = ProviderPriority.HIGH
    name = "Zenodo"
    is_remote = True

    def __init__(self, uri: str, is_doi: bool = False):
        super().__init__(uri=uri)
        self.is_doi = is_doi
        self._publish: bool = False

    @staticmethod
    def supports(uri):
        """Return ``True`` when ``uri`` contains ``zenodo`` (case-insensitive)."""
        return "zenodo" in uri.lower()

    @staticmethod
    def get_record_id(uri):
        """Return the last all-digit segment of the URI's path.

        Raises:
            IndexError: If the path has no all-digit segment.
        """
        numeric_segments = [segment for segment in urlparse(uri).path.split("/") if segment.isdigit()]
        return numeric_segments[-1]

    @staticmethod
    def get_export_parameters() -> List["ProviderParameter"]:
        """List the parameters that can be passed when exporting to Zenodo."""
        from renku.core.dataset.providers.models import ProviderParameter

        publish_flag = ProviderParameter("publish", help="Publish the exported dataset.", is_flag=True)
        return [publish_flag]

    def get_importer(self, **kwargs) -> "ZenodoImporter":
        """Fetch the record behind this provider's URI and wrap it in an importer."""
        from renku.core.dataset.providers.doi import DOIProvider

        record_uri = self.uri
        if self.is_doi:
            # NOTE: A DOI is resolved first; the record is then requested from the resolved URI
            record_uri = DOIProvider(uri=record_uri).get_importer().uri

        record = _make_request(record_uri)

        return ZenodoImporter(uri=record_uri, original_uri=self.uri, json=record.json())

    def get_exporter(
        self, dataset: "Dataset", *, tag: Optional["DatasetTag"], publish: bool = False, **kwargs
    ) -> "ZenodoExporter":
        """Remember the ``publish`` choice and build an exporter for ``dataset``."""
        self._publish = publish

        return ZenodoExporter(dataset=dataset, publish=self._publish, tag=tag)
class ZenodoImporter(RepositoryImporter):
    """Importer for a single Zenodo record, built from the record's JSON description."""

    def __init__(self, *, uri: str, original_uri, json: Dict[str, Any]):
        super().__init__(uri=uri, original_uri=original_uri)

        self._jsonld: Optional[dict] = None
        self._json = json

        raw_metadata = self._json.pop("metadata", {})
        if raw_metadata is None:
            self._json["metadata"] = None
        else:
            self._json["metadata"] = ZenodoMetadataSerializer.from_metadata(raw_metadata)

        # NOTE: The id may be stored under either ``record_id`` or ``recid``; it is kept as a string
        identifier = self._json.pop("record_id", None) or self._json.pop("recid", None)
        self._json["record_id"] = None if identifier is None else str(identifier)

        # NOTE: Make sure that these properties have a default value
        for key, default in (("links", {}), ("files", [])):
            self._json[key] = self._json.pop(key, default)

    @property
    def version(self):
        """Version taken from the record's metadata."""
        return self._json["metadata"].version

    @property
    def latest_uri(self):
        """Redirect target of the record's ``latest`` link."""
        latest_link = self._json["links"].get("latest")
        return get_redirect_url(latest_link)

    def is_latest_version(self):
        """Tell whether the ``latest`` link points at this very record."""
        latest_record_id = ZenodoProvider.get_record_id(self.latest_uri)
        return latest_record_id == self._json["record_id"]

    def get_jsonld(self):
        """Request the record as JSON-LD, cache it and return it.

        A plain-string ``image`` entry is expanded into an ``ImageObject`` dictionary.
        """
        payload = _make_request(self._uri, accept="application/ld+json").json()
        self._jsonld = payload

        if payload is None or "image" not in payload:
            return payload

        image = payload["image"]
        if isinstance(image, str):
            payload["image"] = {
                "@id": image,
                "@type": "ImageObject",
                "position": 1,
                "contentUrl": image,
            }

        return payload

    def get_files(self):
        """Wrap every file entry of the record in a ``ZenodoFileSerializer``.

        Raises:
            LookupError: If the record lists no files.
        """
        file_entries = self._json["files"]
        if not file_entries:
            raise LookupError("no files have been found - deposit is empty or protected")

        serialized = []
        for entry in file_entries:
            serialized.append(ZenodoFileSerializer(**entry))
        return serialized

    def fetch_provider_dataset(self) -> "ProviderDataset":
        """Build a ``ProviderDataset`` (and its file list) from the record's JSON-LD."""
        from marshmallow import pre_load

        from renku.command.schema.agent import PersonSchema
        from renku.core.dataset.providers.models import ProviderDataset, ProviderDatasetFile, ProviderDatasetSchema
        from renku.domain_model.dataset import Url, generate_default_slug

        class ZenodoDatasetSchema(ProviderDatasetSchema):
            """Schema for Zenodo datasets."""

            @pre_load
            def fix_data(self, data, **_):
                """Normalize the JSON-LD received from Zenodo before it is loaded."""
                # A string context becomes a mapping; https://schema.org/ is rewritten to http
                raw_context = data.get("@context")
                if isinstance(raw_context, str) and raw_context:
                    normalized = raw_context if raw_context.endswith("/") else f"{raw_context}/"
                    if normalized == "https://schema.org/":
                        normalized = "http://schema.org/"
                    data["@context"] = {"@base": normalized, "@vocab": normalized}

                # Every creator gets the person type(s) assigned
                for creator in data.get("creator", []):
                    creator["@type"] = list(map(str, PersonSchema.opts.rdf_type))

                # A license given as a dictionary is reduced to its URL
                license_info = data.get("license")
                if isinstance(license_info, dict) and license_info:
                    data["license"] = license_info.get("url", "")

                # A comma-separated keyword string is split into a list
                raw_keywords = data.get("keywords")
                if isinstance(raw_keywords, str) and raw_keywords:
                    data["keywords"] = [keyword.strip() for keyword in raw_keywords.split(",")]

                # Drop relations that are present in the incoming data
                for obsolete_key in ("isPartOf", "sameAs"):
                    data.pop(obsolete_key, None)

                return data

        # NOTE: Files are checked first so that an empty record fails before the JSON-LD request
        record_files = self.get_files()
        jsonld = self.get_jsonld()

        provider_dataset = ProviderDataset.from_jsonld(jsonld, schema_class=ZenodoDatasetSchema)
        provider_dataset.slug = generate_default_slug(
            name=provider_dataset.name or "", version=provider_dataset.version
        )

        provider_dataset.same_as = Url(url_id=remove_credentials(self.original_uri))
        if is_doi(provider_dataset.identifier):
            doi_url = urllib.parse.urljoin("https://doi.org", provider_dataset.identifier)
            provider_dataset.same_as = Url(url_str=doi_url)

        self._provider_dataset_files = [
            ProviderDatasetFile(
                source=record_file.remote_url.geturl(),
                filename=Path(record_file.filename).name,
                checksum=record_file.checksum,
                filesize=record_file.filesize,
                filetype=record_file.type,
                path="",
            )
            for record_file in record_files
        ]

        self._provider_dataset = provider_dataset
        return self._provider_dataset
class ZenodoFileSerializer:
    """Metadata of a single file attached to a Zenodo record.

    Args:
        id: Identifier of the file.
        checksum: Checksum of the file.
        links: Mapping of link names to URLs; ``links["self"]`` is the file's own URL.
        key: Name of the file; stored as ``filename``.
        size: Size of the file; stored as ``filesize``.
        **_: Any other keys of the file entry are accepted and ignored.
    """

    def __init__(self, *, id=None, checksum=None, links=None, key=None, size=None, **_):
        self.id = id
        self.checksum = checksum
        self.links = links
        self.filename = key
        self.filesize = size

    @property
    def remote_url(self):
        """Get remote URL as ``urllib.parse.ParseResult``."""
        return urllib.parse.urlparse(self.links["self"])

    @property
    def type(self):
        """Get file type, i.e. the text after the last dot of the file's base name.

        Returns:
            str: The extension without the dot, or an empty string when there is no file name
                or the base name contains no dot.
        """
        if not self.filename:
            return ""

        # Only the last path component counts, so a dot in a directory name is not an extension
        basename = self.filename.rsplit("/", 1)[-1]
        _, dot, extension = basename.rpartition(".")

        return extension if dot else ""
class ZenodoExporter(ExporterApi):
    """Zenodo export manager.

    Args:
        dataset: The dataset to export.
        publish: Whether the deposition is published once all files are uploaded.
        tag: Optional dataset tag that is passed along when metadata is attached.
    """

    # NOTE: The Referer is computed once, when the class is created, from ``RENKU_DOMAIN``
    HEADERS = {
        "Content-Type": "application/json",
        "Referer": f"https://{os.environ.get('RENKU_DOMAIN', 'zenodo.org')}",
    }

    def __init__(self, dataset, publish, tag):
        super().__init__(dataset)
        self._access_token = None
        self._publish = publish
        self._tag = tag

    @property
    def zenodo_url(self):
        """Returns correct Zenodo URL based on environment."""
        return ZENODO_SANDBOX_URL if "ZENODO_USE_SANDBOX" in os.environ else ZENODO_BASE_URL

    def set_access_token(self, access_token):
        """Set access token."""
        self._access_token = access_token

    def get_access_token_url(self):
        """Endpoint for creation of access token.

        The URL is derived from ``zenodo_url`` so that it points at the same Zenodo instance
        (production or sandbox) that the export talks to.
        """
        return urllib.parse.urljoin(self.zenodo_url, "account/settings/applications/tokens/new/")

    @property
    def default_params(self):
        """Create request default parameters."""
        return {"access_token": self._access_token}

    def dataset_to_request(self):
        """Prepare dataset metadata for request."""
        from renku.command.schema.dataset import dump_dataset_as_jsonld

        jsonld = dump_dataset_as_jsonld(self.dataset)
        jsonld["upload_type"] = "dataset"
        return jsonld

    def export(self, **kwargs):
        """Execute entire export process.

        Returns:
            str: URL of the published record when publishing was requested, otherwise the URL
                of the deposit.
        """
        # Step 1. Create new deposition
        deposition = ZenodoDeposition(exporter=self)

        # Step 2. Attach metadata to deposition
        deposition.attach_metadata(self.dataset, self._tag)

        # Step 3. Upload all files to created deposition
        with communication.progress("Uploading files ...", total=len(self.dataset.files)) as progressbar:
            for file in self.dataset.files:
                filepath = project_context.repository.copy_content_to_file(
                    path=file.entity.path, checksum=file.entity.checksum
                )
                deposition.upload_file(filepath, path_in_repo=file.entity.path)
                progressbar.update()

        # Step 4. Publish newly created deposition
        if self._publish:
            deposition.publish_deposition()
            return deposition.published_at

        return deposition.deposit_at
class ZenodoDeposition:
    """Zenodo record for a deposit; the deposition is created when the object is instantiated.

    Args:
        exporter: Object that provides ``zenodo_url``, ``default_params`` and ``HEADERS``.
        id: Initial identifier; it is always replaced by the id returned for the new deposition.
    """

    def __init__(self, exporter, id=None):
        self.exporter = exporter
        self.id = id

        response = self.new_deposition()
        self.id = response.json()["id"]

    def _deposit_api_url(self, endpoint):
        """Return the absolute URL of ``endpoint`` below ``<zenodo_url>/api/deposit/``."""
        return urllib.parse.urljoin(
            self.exporter.zenodo_url, posixpath.join(ZENODO_API_PATH, ZENODO_DEPOSIT_PATH, endpoint)
        )

    @property
    def publish_url(self):
        """Returns publish URL."""
        return self._deposit_api_url(ZENODO_PUBLISH_ACTION_PATH.format(self.id))

    @property
    def attach_metadata_url(self):
        """Return URL for attaching metadata."""
        return self._deposit_api_url(ZENODO_METADATA_URL.format(self.id))

    @property
    def upload_file_url(self):
        """Return URL for uploading file."""
        return self._deposit_api_url(ZENODO_FILES_URL.format(self.id))

    @property
    def new_deposit_url(self):
        """Return URL for creating new deposit."""
        return self._deposit_api_url(ZENODO_NEW_DEPOSIT_URL)

    @property
    def published_at(self):
        """Return published at URL."""
        return urllib.parse.urljoin(self.exporter.zenodo_url, posixpath.join(ZENODO_PUBLISH_PATH, str(self.id)))

    @property
    def deposit_at(self):
        """Return deposit at URL."""
        return urllib.parse.urljoin(self.exporter.zenodo_url, posixpath.join(ZENODO_DEPOSIT_PATH, str(self.id)))

    def new_deposition(self):
        """Create new deposition on Zenodo.

        Raises:
            errors.ExportError: If the request is rejected.
        """
        from renku.core.util import requests

        response = requests.post(
            url=self.new_deposit_url, params=self.exporter.default_params, json={}, headers=self.exporter.HEADERS
        )
        self._check_response(response)

        return response

    def attach_metadata(self, dataset, tag):
        """Attach metadata to the deposition on Zenodo.

        Args:
            dataset: Dataset whose name, description and creators are sent.
            tag: Optional dataset tag; when given, its name is used as the version and its
                description is the fallback for a missing dataset description.

        Raises:
            errors.ExportError: If the request is rejected.
        """
        import json

        from renku.core.util import requests

        # NOTE(review): assumes ``dataset`` exposes ``name``, ``description``, ``version`` and
        # ``creators`` (each with ``name``/``affiliation``) and ``tag`` exposes ``name`` and
        # ``description`` - confirm against the domain model.
        description = dataset.description or (tag.description if tag else None)
        metadata = {
            "title": dataset.name,
            "upload_type": "dataset",
            "description": description,
            "creators": [
                {"name": creator.name, "affiliation": creator.affiliation or None} for creator in dataset.creators
            ],
        }

        version = tag.name if tag else dataset.version
        if version:
            metadata["version"] = version

        response = requests.put(
            url=self.attach_metadata_url,
            params=self.exporter.default_params,
            data=json.dumps({"metadata": metadata}),
            headers=self.exporter.HEADERS,
        )
        self._check_response(response)

        return response

    def upload_file(self, filepath, path_in_repo):
        """Upload and attach a file to existing deposition on Zenodo.

        Args:
            filepath: Location of the content to upload.
            path_in_repo: Path of the file in the repository; only its last component is used
                as the name of the uploaded file.

        Raises:
            errors.ExportError: If the request is rejected.
        """
        from renku.core.util import requests

        filename = Path(path_in_repo).name

        # The handle is closed as soon as the request is done, whether or not it succeeded
        with open(str(filepath), "rb") as stream:
            response = requests.post(
                url=self.upload_file_url,
                params=self.exporter.default_params,
                data={"filename": filename},
                files={"file": (filename, stream)},
            )
        self._check_response(response)

        return response

    def publish_deposition(self):
        """Publish existing deposition.

        Raises:
            errors.ExportError: If the request is rejected.
        """
        from renku.core.util import requests

        response = requests.post(
            url=self.publish_url, params=self.exporter.default_params, headers=self.exporter.HEADERS
        )
        self._check_response(response)

        return response

    @staticmethod
    def _check_response(response):
        """Turn a failed response into an ``errors.ExportError``.

        For status 400 the error details of the response body are put into the message; any
        other failure is reported with the raw response content.
        """
        from renku.core.util import requests

        try:
            requests.check_response(response=response)
        except errors.RequestError as error:
            if response.status_code != 400:
                raise errors.ExportError(response.content) from error

            try:
                err_response = response.json()
            except ValueError:
                # The body is not JSON; fall back to the plain text below
                err_response = None

            if isinstance(err_response, dict) and "errors" in err_response:
                messages = [
                    '"{}" failed with "{}"'.format(err["field"], ", ".join(err["messages"]))
                    for err in err_response["errors"]
                ]
            elif isinstance(err_response, dict) and "message" in err_response:
                messages = [err_response["message"]]
            else:
                # ``text`` is a property of the response, not a method
                messages = [response.text]

            raise errors.ExportError(
                "\n" + "\n".join(messages) + "\nSee `renku dataset edit -h` for details on how to edit" " metadata"
            ) from error
def _make_request(uri, accept: str = "application/json"):
    """Request the Zenodo record that ``uri`` refers to.

    Args:
        uri: URI from which the record id is extracted.
        accept(str): Value passed on as the accepted content type (Default value = "application/json").

    Returns:
        The response of the request to the record's API URL.
    """
    records_url = make_records_url(ZenodoProvider.get_record_id(uri), uri=uri)

    return make_request(url=records_url, accept=accept)
def make_records_url(record_id, uri: str):
    """Create URL to access record by ID.

    Args:
        record_id: The id of the record; it is converted to a string before it is joined.
        uri(str): URI of the record; the sandbox instance is used when it contains
            ``sandbox.zenodo.org`` (case-insensitive), the production instance otherwise.

    Returns:
        str: Full URL for the record.
    """
    base_url = ZENODO_SANDBOX_URL if "sandbox.zenodo.org" in uri.lower() else ZENODO_BASE_URL

    # ``posixpath.join`` only accepts strings, so numeric ids are converted first
    return urllib.parse.urljoin(base_url, posixpath.join(ZENODO_API_PATH, "records", str(record_id)))