Source code for renku.core.dataset.providers.dataverse

# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dataverse API integration."""

import json
import posixpath
import re
import urllib
from pathlib import Path
from string import Template
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib import parse as urlparse

from renku.core import errors
from renku.core.config import get_value, set_value
from renku.core.dataset.providers.api import (
    ExporterApi,
    ExportProviderInterface,
    ImportProviderInterface,
    ProviderApi,
    ProviderPriority,
)
from renku.core.dataset.providers.dataverse_metadata_templates import (
    AUTHOR_METADATA_TEMPLATE,
    CONTACT_METADATA_TEMPLATE,
    DATASET_METADATA_TEMPLATE,
    KEYWORDS_METADATA_TEMPLATE,
)
from renku.core.dataset.providers.doi import DOIProvider
from renku.core.dataset.providers.repository import RepositoryImporter, make_request
from renku.core.util import communication
from renku.core.util.datetime8601 import fix_datetime
from renku.core.util.doi import extract_doi, get_doi_url, is_doi
from renku.core.util.urls import remove_credentials
from renku.domain_model.project_context import project_context

if TYPE_CHECKING:
    from renku.core.dataset.providers.models import ProviderDataset, ProviderParameter
    from renku.domain_model.dataset import Dataset, DatasetTag

DATAVERSE_API_PATH = "api/v1"

DATAVERSE_VERSION_API = "info/version"
DATAVERSE_METADATA_API = "datasets/export"
DATAVERSE_VERSIONS_API = "datasets/:persistentId/versions"
DATAVERSE_FILE_API = "access/datafile/:persistentId/"
DATAVERSE_EXPORTER = "schema.org"
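
# For orientation: the endpoint constants above are joined onto a server's
# base URL under ``DATAVERSE_API_PATH``. For a hypothetical installation at
# ``https://demo.dataverse.example``, the resulting request URLs look like:
#
#     https://demo.dataverse.example/api/v1/info/version
#     https://demo.dataverse.example/api/v1/datasets/export?exporter=schema.org&persistentId=<id>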

DATAVERSE_SUBJECTS = [
    "Agricultural Sciences",
    "Arts and Humanities",
    "Astronomy and Astrophysics",
    "Business and Management",
    "Chemistry",
    "Computer and Information Science",
    "Earth and Environmental Sciences",
    "Engineering",
    "Law",
    "Mathematical Sciences",
    "Medicine, Health and Life Sciences",
    "Physics",
    "Social Sciences",
    "Other",
]


class DataverseProvider(ProviderApi, ExportProviderInterface, ImportProviderInterface):
    """Dataverse API provider."""

    priority = ProviderPriority.HIGH
    name = "Dataverse"
    is_remote = True

    def __init__(self, uri: str, is_doi: bool = False):
        super().__init__(uri=uri)

        self.is_doi = is_doi
        self._server_url = None
        self._dataverse_name = None
        self._publish: bool = False

    @staticmethod
    def supports(uri):
        """Check if provider supports a given URI."""
        is_doi_ = is_doi(uri)

        is_dataverse_uri = is_doi_ is None and check_dataverse_uri(uri)
        is_dataverse_doi = is_doi_ and check_dataverse_doi(is_doi_.group(0))

        return is_dataverse_uri or is_dataverse_doi

    @staticmethod
    def get_export_parameters() -> List["ProviderParameter"]:
        """Returns parameters that can be set for export."""
        from renku.core.dataset.providers.models import ProviderParameter

        return [
            ProviderParameter("dataverse-server", help="Dataverse server URL.", type=str),
            ProviderParameter("dataverse-name", help="Dataverse name to export to.", type=str),
            ProviderParameter("publish", help="Publish the exported dataset.", is_flag=True),
        ]

    @staticmethod
    def record_id(uri):
        """Extract record id from URI."""
        parsed = urlparse.urlparse(uri)
        return urlparse.parse_qs(parsed.query)["persistentId"][0]
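
    # Illustrative example (hypothetical host and DOI): given a Dataverse
    # landing-page URI, ``record_id`` returns the ``persistentId`` query
    # parameter:
    #
    #     >>> DataverseProvider.record_id(
    #     ...     "https://demo.dataverse.example/dataset.xhtml?persistentId=doi:10.1234/AB2CDE"
    #     ... )
    #     'doi:10.1234/AB2CDE'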

    def get_importer(self, **kwargs) -> "DataverseImporter":
        """Get importer for a record from Dataverse.

        Returns:
            DataverseImporter: The found record
        """

        def get_export_uri(uri):
            """Gets a dataverse api export URI from a dataverse entry."""
            record_id = DataverseProvider.record_id(uri)
            return make_records_url(record_id, uri)

        uri = self.uri
        if self.is_doi:
            doi = DOIProvider(uri=uri).get_importer()
            uri = doi.uri

        uri = get_export_uri(uri)
        response = make_request(uri)

        return DataverseImporter(json=response.json(), uri=uri, original_uri=self.uri)

    def get_exporter(
        self,
        dataset: "Dataset",
        *,
        tag: Optional["DatasetTag"],
        dataverse_server: Optional[str] = None,
        dataverse_name: Optional[str] = None,
        publish: bool = False,
        **kwargs,
    ) -> "ExporterApi":
        """Create export manager for given dataset."""

        def set_export_parameters():
            """Set and validate required parameters for exporting for a provider."""
            server = dataverse_server
            config_base_url = "server_url"
            if not server:
                server = get_value("dataverse", config_base_url)
            else:
                set_value("dataverse", config_base_url, server, global_only=True)

            if not server:
                raise errors.ParameterError("Dataverse server URL is required.")

            if not dataverse_name:
                raise errors.ParameterError("Dataverse name is required.")

            self._server_url = server  # type: ignore
            self._dataverse_name = dataverse_name  # type: ignore
            self._publish = publish

        set_export_parameters()
        # Pass the publish flag through so ``DataverseExporter.export`` can act on it
        return DataverseExporter(
            dataset=dataset,
            server_url=self._server_url,
            dataverse_name=self._dataverse_name,
            publish=self._publish,
        )
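
    # Usage sketch (illustrative values): the parameters declared in
    # ``get_export_parameters`` surface as CLI options; a missing
    # ``--dataverse-server`` falls back to the stored "dataverse" config
    # value, while an explicit one is persisted globally for later exports:
    #
    #     renku dataset export my-dataset dataverse \
    #         --dataverse-server https://demo.dataverse.example \
    #         --dataverse-name my-dataverse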


class DataverseImporter(RepositoryImporter):
    """Dataverse record serializer."""

    def __init__(self, uri: str, original_uri: str, json: Dict[str, Any]):
        super().__init__(uri=uri, original_uri=original_uri)
        self._json: Dict[str, Any] = json

    def is_latest_version(self):
        """Check if record is at last possible version."""
        return True

    @staticmethod
    def _convert_json_property_name(property_name):
        """Removes '@' and converts names to snake_case."""
        property_name = property_name.strip("@")
        property_name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", property_name)
        return re.sub("([a-z0-9])([A-Z])", r"\1_\2", property_name).lower()

    @property
    def version(self):
        """Get the major and minor version of this dataset."""
        uri = make_versions_url(DataverseProvider.record_id(self._uri), self._uri)
        response = make_request(uri).json()
        newest_version = response["data"][0]
        return "{}.{}".format(newest_version["versionNumber"], newest_version["versionMinorNumber"])

    @property
    def latest_uri(self):
        """Get URI of latest version."""
        return self._uri
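
    # For illustration, ``_convert_json_property_name`` maps Schema.org-style
    # JSON keys to snake_case constructor arguments:
    #
    #     >>> DataverseImporter._convert_json_property_name("contentSize")
    #     'content_size'
    #     >>> DataverseImporter._convert_json_property_name("@type")
    #     'type'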

    def get_files(self):
        """Get Dataverse files metadata as ``DataverseFileSerializer``."""
        files = []
        for f in self._json["distribution"]:
            mapped_file = {self._convert_json_property_name(k): v for k, v in f.items()}
            mapped_file["parent_url"] = self._uri
            files.append(mapped_file)

        if not files:
            raise LookupError("no files have been found - deposit is empty or protected")

        return [DataverseFileSerializer(**file) for file in files]

    def fetch_provider_dataset(self) -> "ProviderDataset":
        """Deserialize a ``Dataset``."""
        from marshmallow import post_load, pre_load

        from renku.command.schema.agent import PersonSchema
        from renku.core.dataset.providers.models import ProviderDataset, ProviderDatasetFile, ProviderDatasetSchema
        from renku.domain_model.dataset import Url, generate_default_slug

        class DataverseDatasetSchema(ProviderDatasetSchema):
            """Schema for Dataverse datasets."""

            @pre_load
            def fix_data(self, data, **kwargs):
                """Fix data that is received from Dataverse."""
                # Fix context
                context = data.get("@context")
                if context and isinstance(context, str):
                    if context == "http://schema.org":
                        context = "http://schema.org/"
                    data["@context"] = {"@base": context, "@vocab": context}

                # Add type to creators
                creators = data.get("creator", [])
                for c in creators:
                    c["@type"] = [str(t) for t in PersonSchema.opts.rdf_type]

                # Fix license to be a string
                license = data.get("license")
                if license and isinstance(license, dict):
                    data["license"] = license.get("url", "")

                return data

            @post_load
            def fix_timezone(self, obj, **kwargs):
                """Add timezone to datetime object."""
                if obj.get("date_modified"):
                    obj["date_modified"] = fix_datetime(obj["date_modified"])
                if obj.get("date_published"):
                    obj["date_published"] = fix_datetime(obj["date_published"])
                return obj

        files = self.get_files()

        dataset = ProviderDataset.from_jsonld(data=self._json, schema_class=DataverseDatasetSchema)
        dataset.version = self.version
        dataset.slug = generate_default_slug(name=dataset.name or "", version=dataset.version)
        dataset.same_as = (
            Url(url_str=get_doi_url(dataset.identifier))
            if is_doi(dataset.identifier)
            else Url(url_id=remove_credentials(self.original_uri))
        )
        if dataset.description and not dataset.description.strip():
            dataset.description = None
        for creator in dataset.creators:
            if creator.affiliation == "":
                creator.affiliation = None

        self._provider_dataset_files = [
            ProviderDatasetFile(
                source=file.remote_url.geturl(),
                filename=Path(file.name).name,
                checksum="",
                filesize=file.content_size,
                filetype=file.file_format,
                path="",
            )
            for file in files
        ]

        self._provider_dataset = dataset
        return self._provider_dataset


class DataverseFileSerializer:
    """Dataverse record file."""

    def __init__(
        self,
        *,
        content_size=None,
        content_url=None,
        description=None,
        file_format=None,
        id=None,
        identifier=None,
        name=None,
        parent_url=None,
        type=None,
        encoding_format=None,
    ):
        self.content_size = content_size
        self.content_url = content_url
        self.description = description
        self.file_format = file_format
        self.id = id
        self.identifier = identifier
        self.name = name
        self.parent_url = parent_url
        self.type = type
        self.encoding_format = encoding_format

    @property
    def remote_url(self):
        """Get remote URL as ``urllib.ParseResult``."""
        if self.content_url is not None:
            return urllib.parse.urlparse(self.content_url)

        if self.identifier is None:
            return None

        doi = extract_doi(self.identifier)
        if doi is None:
            return None

        file_url = make_file_url("doi:" + doi, self.parent_url)
        return urllib.parse.urlparse(file_url)


class DataverseExporter(ExporterApi):
    """Dataverse export manager."""

    def __init__(self, *, dataset, server_url=None, dataverse_name=None, publish=False):
        super().__init__(dataset)

        self._access_token = None
        self._server_url = server_url
        self._dataverse_name = dataverse_name
        self._publish = publish

    def set_access_token(self, access_token):
        """Set access token."""
        self._access_token = access_token

    def get_access_token_url(self):
        """Endpoint for creation of access token."""
        return urllib.parse.urljoin(self._server_url, "/dataverseuser.xhtml?selectTab=apiTokenTab")

    def export(self, **kwargs):
        """Execute export process."""
        from renku.domain_model.dataset import get_file_path_in_dataset

        deposition = _DataverseDeposition(server_url=self._server_url, access_token=self._access_token)
        metadata = self._get_dataset_metadata()
        response = deposition.create_dataset(dataverse_name=self._dataverse_name, metadata=metadata)
        dataset_pid = response.json()["data"]["persistentId"]

        repository = project_context.repository

        with communication.progress("Uploading files ...", total=len(self.dataset.files)) as progressbar:
            for file in self.dataset.files:
                filepath = repository.copy_content_to_file(path=file.entity.path, checksum=file.entity.checksum)
                path_in_dataset = get_file_path_in_dataset(dataset=self.dataset, dataset_file=file)
                deposition.upload_file(full_path=filepath, path_in_dataset=path_in_dataset)
                progressbar.update()

        if self._publish:
            deposition.publish_dataset()

        return dataset_pid

    def _get_dataset_metadata(self):
        authors, contacts = self._get_creators()
        subject = self._get_subject()
        keywords = self._get_keywords()
        metadata_template = Template(DATASET_METADATA_TEMPLATE)
        metadata = metadata_template.substitute(
            name=_escape_json_string(self.dataset.name),
            authors=json.dumps(authors),
            contacts=json.dumps(contacts),
            description=_escape_json_string(self.dataset.description),
            subject=subject,
            keywords=json.dumps(keywords),
        )
        return json.loads(metadata)

    @staticmethod
    def _get_subject():
        text_prompt = "Subject of this dataset: \n\n"
        text_prompt += "\n".join(f"{s}\t[{i}]" for i, s in enumerate(DATAVERSE_SUBJECTS, start=1))
        text_prompt += "\n\nSubject"

        selection = communication.prompt(text_prompt, type=int, default=len(DATAVERSE_SUBJECTS)) or 0
        return DATAVERSE_SUBJECTS[selection - 1]

    def _get_creators(self):
        authors = []
        contacts = []

        for creator in self.dataset.creators:
            name = creator.name or ""
            affiliation = creator.affiliation or ""
            email = creator.email or ""

            author_template = Template(AUTHOR_METADATA_TEMPLATE)
            author = author_template.substitute(
                name=_escape_json_string(name), affiliation=_escape_json_string(affiliation)
            )
            authors.append(json.loads(author))

            contact_template = Template(CONTACT_METADATA_TEMPLATE)
            contact = contact_template.substitute(name=_escape_json_string(name), email=email)
            contacts.append(json.loads(contact))

        return authors, contacts

    def _get_keywords(self):
        keywords = []
        for keyword in self.dataset.keywords:
            keyword_template = Template(KEYWORDS_METADATA_TEMPLATE)
            keyword_str = keyword_template.substitute(keyword=_escape_json_string(keyword))
            keywords.append(json.loads(keyword_str))

        return keywords


class _DataverseDeposition:
    """Dataverse record for deposit."""

    DATASET_CREATE_PATH = "dataverses/{dataverseName}/datasets"
    FILE_UPLOAD_PATH = "datasets/:persistentId/add"
    DATASET_PUBLISH_PATH = "datasets/:persistentId/actions/:publish"

    def __init__(self, *, access_token, server_url, dataset_pid=None):
        self.access_token = access_token
        self.server_url = server_url
        self.dataset_pid = dataset_pid

    def create_dataset(self, dataverse_name, metadata):
        """Create a dataset in a given dataverse."""
        api_path = self.DATASET_CREATE_PATH.format(dataverseName=dataverse_name)
        url = self._make_url(api_path=api_path)
        response = self._post(url=url, json=metadata)
        self._check_response(response)

        self.dataset_pid = response.json()["data"]["persistentId"]
        return response

    def upload_file(self, full_path, path_in_dataset):
        """Upload a file to a previously-created dataset."""
        if self.dataset_pid is None:
            raise errors.ExportError("Dataset not created.")

        url = self._make_url(self.FILE_UPLOAD_PATH, persistentId=self.dataset_pid)

        params = {"directoryLabel": str(path_in_dataset.parent)}
        data = dict(jsonData=json.dumps(params))
        # Keep the file handle open only for the duration of the upload
        with open(full_path, "rb") as file_stream:
            files = {"file": (path_in_dataset.name, file_stream)}
            response = self._post(url=url, data=data, files=files)

        self._check_response(response)
        return response

    def publish_dataset(self):
        """Publish a previously-created dataset."""
        if self.dataset_pid is None:
            raise errors.ExportError("Dataset not created.")

        url = self._make_url(self.DATASET_PUBLISH_PATH, persistentId=self.dataset_pid, type="major")
        response = self._post(url=url)
        self._check_response(response)
        return response

    def _make_url(self, api_path, **query_params):
        """Create a Dataverse API URL for the given path and query parameters."""
        url_parts = urlparse.urlparse(self.server_url)
        path = posixpath.join(DATAVERSE_API_PATH, api_path)
        query_params_str = urllib.parse.urlencode(query_params)
        url_parts = url_parts._replace(path=path, query=query_params_str)
        return urllib.parse.urlunparse(url_parts)

    def _post(self, url, json=None, data=None, files=None):
        from renku.core.util import requests

        headers = {"X-Dataverse-key": self.access_token}
        try:
            return requests.post(url=url, json=json, data=data, files=files, headers=headers)
        except errors.RequestError as e:
            raise errors.ExportError("Cannot POST to remote server.") from e

    @staticmethod
    def _check_response(response):
        from renku.core.util import requests

        try:
            requests.check_response(response=response)
        except errors.RequestError:
            json_res = response.json()
            raise errors.ExportError(
                "HTTP {} - Cannot export dataset: {}".format(
                    response.status_code,
                    json_res["message"] if "message" in json_res else json_res["status"],
                )
            )


def _escape_json_string(value):
    """Create a JSON-safe string."""
    if isinstance(value, str):
        return json.dumps(value)[1:-1]
    return value
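
# For illustration, ``_DataverseDeposition._make_url`` composes endpoint URLs
# from the server base, the API path, and the encoded query parameters
# (hypothetical host and DOI):
#
#     deposition = _DataverseDeposition(access_token="...", server_url="https://demo.dataverse.example")
#     deposition._make_url("datasets/:persistentId/add", persistentId="doi:10.1234/AB2CDE")
#     # -> 'https://demo.dataverse.example/api/v1/datasets/:persistentId/add?persistentId=doi%3A10.1234%2FAB2CDE'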


def check_dataverse_uri(url):
    """Check if a URL points to a Dataverse instance."""
    from renku.core.util import requests

    url_parts = list(urlparse.urlparse(url))

    url_parts[2] = posixpath.join(DATAVERSE_API_PATH, DATAVERSE_VERSION_API)
    url_parts[3:6] = [""] * 3
    version_url = urlparse.urlunparse(url_parts)

    response = requests.get(version_url)
    if response.status_code != 200:
        return False

    version_data = response.json()
    if "status" not in version_data or "data" not in version_data:
        return False

    version_info = version_data["data"]
    if "version" not in version_info or "build" not in version_info:
        return False

    return True
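
# A reachable Dataverse instance answers the version probe above with JSON of
# the shape (values illustrative):
#
#     {"status": "OK", "data": {"version": "5.13", "build": "..."}}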


def check_dataverse_doi(doi):
    """Check if a DOI points to a dataverse dataset."""
    try:
        doi = DOIProvider(uri=doi).get_importer()
    except LookupError:
        return False

    return check_dataverse_uri(doi.uri)


def make_records_url(record_id, base_url):
    """Create URL to access record by ID."""
    url_parts = list(urlparse.urlparse(base_url))
    url_parts[2] = posixpath.join(DATAVERSE_API_PATH, DATAVERSE_METADATA_API)
    args_dict = {"exporter": DATAVERSE_EXPORTER, "persistentId": record_id}
    url_parts[4] = urllib.parse.urlencode(args_dict)
    return urllib.parse.urlunparse(url_parts)
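
# For example (hypothetical host and DOI):
#
#     >>> make_records_url(
#     ...     "doi:10.1234/AB2CDE",
#     ...     "https://demo.dataverse.example/dataset.xhtml?persistentId=doi:10.1234/AB2CDE",
#     ... )
#     'https://demo.dataverse.example/api/v1/datasets/export?exporter=schema.org&persistentId=doi%3A10.1234%2FAB2CDE'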


def make_versions_url(record_id, base_url):
    """Create URL to access the versions of a record."""
    url_parts = list(urlparse.urlparse(base_url))
    url_parts[2] = posixpath.join(DATAVERSE_API_PATH, DATAVERSE_VERSIONS_API)
    args_dict = {"exporter": DATAVERSE_EXPORTER, "persistentId": record_id}
    url_parts[4] = urllib.parse.urlencode(args_dict)
    return urllib.parse.urlunparse(url_parts)


def make_file_url(file_id, base_url):
    """Create URL to access a data file by ID."""
    url_parts = list(urlparse.urlparse(base_url))
    url_parts[2] = posixpath.join(DATAVERSE_API_PATH, DATAVERSE_FILE_API)
    args_dict = {"persistentId": file_id}
    url_parts[4] = urllib.parse.urlencode(args_dict)
    return urllib.parse.urlunparse(url_parts)
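
# End-to-end import sketch (illustrative URI; in practice the Renku dataset
# machinery drives these calls):
#
#     uri = "https://demo.dataverse.example/dataset.xhtml?persistentId=doi:10.1234/AB2CDE"
#     if DataverseProvider.supports(uri):
#         importer = DataverseProvider(uri=uri).get_importer()
#         dataset = importer.fetch_provider_dataset()
#         files = importer.get_files()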