Source code for renku.core.util.os

# Copyright Swiss Data Science Center (SDSC). A partnership between
# École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OS utility functions."""

import fnmatch
import glob
import hashlib
import io
import os
import re
import shutil
import subprocess
from pathlib import Path
from typing import Any, BinaryIO, Dict, Generator, List, Optional, Sequence, Union

from renku.core import errors

# NOTE: Number of bytes requested per ``read`` call when hashing file content in ``hash_file_descriptor``.
BLOCK_SIZE = 4096


def get_relative_path_to_cwd(path: Union[Path, str]) -> str:
    """Return ``path`` expressed relative to the current working directory.

    Args:
        path(Union[Path, str]): Path to convert; it is made absolute (and normalized) first.

    Returns:
        str: The path relative to the current working directory.
    """
    return os.path.relpath(os.path.abspath(path), start=os.getcwd())
def get_expanded_user_path(path: Union[Path, str]) -> str:
    """Expand a leading ``~`` in the path.

    Args:
        path(Union[Path, str]): Path to expand.

    Returns:
        str: The expanded path, or an empty string when ``path`` is falsy (e.g. ``""`` or ``None``).
    """
    if path:
        return os.path.expanduser(path)
    return ""
def get_absolute_path(
    path: Union[Path, str],
    base: Optional[Union[Path, str]] = None,
    resolve_symlinks: bool = False,
    expand: bool = True,
) -> str:
    """Return absolute normalized path.

    Args:
        path(Union[Path, str]): Path to get its absolute.
        base(Union[Path, str]): Optional base path that ``path`` is joined onto before normalization.
        resolve_symlinks(bool): Whether to resolve symlinks (``True``) or keep them (``False``).
        expand(bool): Whether to expand ``~`` in ``path`` and ``base`` or not (Default value = True)

    Returns:
        str: Absolute path.
    """
    target = get_expanded_user_path(path) if expand else path

    if base is not None:
        root = get_expanded_user_path(base) if expand else base
        root = Path(root).resolve() if resolve_symlinks else os.path.abspath(root)
        # NOTE: ``os.path.join`` discards ``root`` when ``target`` is already absolute
        target = os.path.join(root, target)

    # NOTE: ``os.path.abspath`` only normalizes the path; ``os.path.realpath`` additionally resolves symlinks
    normalize = os.path.realpath if resolve_symlinks else os.path.abspath
    return normalize(target)
def get_safe_relative_path(path: Union[Path, str], base: Union[Path, str]) -> Path:
    """Return ``path`` relative to ``base`` after resolving all symlinks, ensuring it stays inside ``base``.

    NOTE: This is used to prevent path traversal attack.

    Args:
        path(Union[Path, str]): Path to check; it is joined onto the resolved base.
        base(Union[Path, str]): Directory the path must stay within.

    Returns:
        Path: The resolved path relative to the resolved base.

    Raises:
        ValueError: If the resolved path is not located within the resolved base.
    """
    try:
        base = Path(base).resolve()
        return (base / path).resolve().relative_to(base)
    except ValueError:
        raise ValueError(f"Path '{path}' is not with base directory '{base}'")
def get_relative_path(path: Union[Path, str], base: Union[Path, str], strict: bool = False) -> Optional[str]:
    """Return a relative path to the base if path is within base without resolving symlinks.

    Args:
        path(Union[Path, str]): Path to make relative; relative paths are interpreted against ``base``.
        base(Union[Path, str]): Base path the result is relative to.
        strict(bool): Whether to raise instead of returning ``None`` when ``path`` is outside ``base``
            (Default value = False)

    Returns:
        Optional[str]: Relative path, or ``None`` if ``path`` is not within ``base`` and ``strict`` is not set.

    Raises:
        errors.ParameterError: If ``strict`` is set and ``path`` is not within ``base``.
    """
    try:
        absolute_path = get_absolute_path(path=path, base=base)
        # NOTE: The base must be normalized the same way as the path; comparing an absolute path against a
        # relative, ``~``-prefixed or non-normalized base would always fail
        absolute_base = get_absolute_path(path=base)
        return str(Path(absolute_path).relative_to(absolute_base))
    except ValueError:
        if strict:
            raise errors.ParameterError(f"File {path} is not within path {base}")

        return None
def is_subpath(path: Union[Path, str], base: Union[Path, str]) -> bool:
    """Check whether ``path`` is located within ``base`` or is the same as ``base``.

    Both paths are made absolute first.

    Args:
        path(Union[Path, str]): Path to check.
        base(Union[Path, str]): Candidate parent path.

    Returns:
        bool: ``True`` if ``path`` is within or equal to ``base``, ``False`` otherwise.
    """
    candidate = Path(get_absolute_path(path=path))
    root = get_absolute_path(path=base)

    try:
        candidate.relative_to(root)
    except ValueError:
        return False

    return True
def get_relative_paths(paths: Sequence[Union[Path, str]], base: Union[Path, str]) -> List[str]:
    """Return a list of paths relative to a base path.

    Args:
        paths(Sequence[Union[Path, str]]): Paths to convert.
        base(Union[Path, str]): Base path that all paths must be within.

    Returns:
        List[str]: Relative paths, in the same order as ``paths``.

    Raises:
        errors.ParameterError: If any of the paths is not within ``base``.
    """

    def relative_or_fail(path: Union[Path, str]) -> str:
        relative = get_relative_path(path=path, base=base)
        if relative is None:
            raise errors.ParameterError(f"Path '{path}' is not within base path '{base}'")
        return relative

    return [relative_or_fail(path) for path in paths]
def get_files(path: Path) -> Generator[Path, None, None]:
    """Yield every non-directory entry found under a starting file/directory.

    Args:
        path(Path): Starting point; if it is not a directory it is yielded as-is.

    Returns:
        Generator[Path, None, None]: The path itself, or all non-directory entries found recursively below it.
    """
    if path.is_dir():
        yield from (entry for entry in path.rglob("*") if not entry.is_dir())
    else:
        yield path
def are_paths_equal(a: Union[Path, str], b: Union[Path, str]) -> bool:
    """Check whether two paths point to the same location once made absolute.

    Args:
        a(Union[Path, str]): First path.
        b(Union[Path, str]): Second path.

    Returns:
        bool: ``True`` if both absolute paths are identical.
    """
    # NOTE: Only exact equality counts; a path that is a sub-path of the other is not considered equal
    first, second = get_absolute_path(a), get_absolute_path(b)
    return first == second
def is_path_empty(path: Union[Path, str]) -> bool:
    """Check whether a path has no direct children.

    Args:
        path(Union[Path, str]): Target path.

    Returns:
        bool: ``True`` if globbing ``*`` under ``path`` yields nothing, ``False`` otherwise.
    """
    first_child = next(iter(Path(path).glob("*")), None)
    return first_child is None
def delete_path(path: Union[Path, str]) -> None:
    """Delete a file/directory/symlink.

    A missing path is silently ignored. If the path cannot be removed as a file, it is removed as a directory
    tree and any error during that removal is ignored.

    Args:
        path(Union[Path, str]): Path to delete.
    """
    try:
        os.remove(path)
    except FileNotFoundError:
        return
    except OSError:
        # NOTE: ``PermissionError`` and ``IsADirectoryError`` are ``OSError`` subclasses, so they end up here too
        shutil.rmtree(path, ignore_errors=True)
def unmount_path(path: Union[Path, str]) -> None:
    """Unmount the given path and ignore all errors.

    Args:
        path(Union[Path, str]): Mount point (or a symlink to a mount point) to unmount.
    """

    def run_quietly(*command: str) -> bool:
        """Run a command with captured output; return whether it could be started and exited successfully."""
        try:
            subprocess.run(command, check=True, text=True, capture_output=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False
        return True

    path = str(path)

    # NOTE: A symlink means that the path is not mounted itself but it's a link to a mount-point; just delete the link.
    if os.path.islink(path):
        os.remove(path)
        return

    # NOTE: ``fusermount`` is available on linux and ``umount`` is for macOS
    unmounted = bool(shutil.which("fusermount")) and run_quietly("fusermount", "-u", "-z", path)
    if not unmounted:
        run_quietly("umount", path)
def is_ascii(data):
    """Check if provided string contains only ascii characters.

    Args:
        data: String to check.

    Returns:
        bool: ``True`` if the string is empty or all its characters are ASCII, ``False`` otherwise.
    """
    # NOTE: ``isascii`` avoids encoding the whole string and, unlike ``encode``, does not raise on lone surrogates
    return data.isascii()
def get_file_size(path: Union[Path, str], follow_symlinks: bool = True) -> Optional[int]:
    """Return size of a file in bytes.

    Args:
        path(Union[Path, str]): Path to get its size.
        follow_symlinks(bool): Whether to report the size of a symlink's target (``True``) or of the symlink
            itself (``False``) (Default value = True)

    Returns:
        Optional[int]: Size in bytes, or ``None`` if the path cannot be stat'ed (e.g. it doesn't exist).
    """
    path = Path(path)

    try:
        # NOTE: ``stat`` always follows symlinks; ``lstat`` is needed to get the size of the link itself
        stat_result = path.stat() if follow_symlinks else path.lstat()
    except OSError:
        return None

    return stat_result.st_size
def normalize_to_ascii(input_string, sep="-"):
    """Convert a string to only contain ASCII characters, with non-ASCII substring replaced with ``sep``.

    Args:
        input_string: String to normalize.
        sep: Separator used to join the remaining components (Default value = "-")

    Returns:
        The lower-cased components of ``input_string`` joined by ``sep``, with ``sep`` stripped from both ends.
    """
    # NOTE: Existing separators are turned into spaces so that they delimit components like any other character
    # that gets dropped below
    for separator in (sep, "_", "."):
        input_string = input_string.replace(separator, " ")

    cleaned = re.sub(r"[^a-zA-Z0-9_.-]+", " ", input_string)
    components = [component for component in cleaned.split(" ") if component and is_ascii(component)]

    return sep.join(components).lower().strip(sep)
def delete_dataset_file(filepath: Union[Path, str], ignore_errors: bool = True, follow_symlinks: bool = False):
    """Remove a file/symlink and its pointer file (for external files).

    Args:
        filepath(Union[Path, str]): File or symlink to remove.
        ignore_errors(bool): Whether to swallow errors raised while reading/removing ``filepath``
            (Default value = True)
        follow_symlinks(bool): Whether to also remove the file a symlink points to (Default value = False)
    """
    file_path = Path(filepath)
    pointer = None

    try:
        # NOTE: The link target is interpreted relative to the directory that contains the link
        pointer = file_path.parent / os.readlink(file_path)
    except FileNotFoundError:
        if ignore_errors:
            return
        raise
    except OSError:
        # NOTE: ``readlink`` fails for anything that isn't a symlink; treat the path as a normal file
        pass

    try:
        os.remove(file_path)
    except OSError:
        if not ignore_errors:
            raise

    if not (follow_symlinks and pointer):
        return

    try:
        os.remove(pointer)
    except FileNotFoundError:
        pass
def hash_file(path: Union[Path, str], hash_type: str = "sha256") -> Optional[str]:
    """Calculate the hash of a file's content.

    Args:
        path(Union[Path, str]): File to hash.
        hash_type(str): Name of the hash algorithm, passed on to ``hash_file_descriptor``
            (Default value = "sha256")

    Returns:
        Optional[str]: Hex digest of the content, or ``None`` if the path doesn't exist.
    """
    if os.path.exists(path):
        with open(path, "rb") as stream:
            return hash_file_descriptor(stream, hash_type)

    return None
def hash_string(content: str, hash_type: str = "sha256") -> str:
    """Hash the UTF-8 encoding of a string.

    Args:
        content(str): String to hash.
        hash_type(str): Name of the hash algorithm, passed on to ``hash_file_descriptor``
            (Default value = "sha256")

    Returns:
        str: Hex digest of the encoded string.
    """
    # NOTE: Wrap the bytes in an in-memory stream so that the same code path as for files is used
    return hash_file_descriptor(io.BytesIO(content.encode("utf-8")), hash_type)
def hash_file_descriptor(file: BinaryIO, hash_type: str = "sha256") -> str:
    """Hash content of a file descriptor.

    Args:
        file(BinaryIO): Binary stream to read from; it is consumed from its current position until the end.
        hash_type(str): Case-insensitive name of the hash algorithm; either ``sha256`` or ``md5``
            (Default value = "sha256")

    Returns:
        str: Hex digest of the content.

    Raises:
        ValueError: If ``hash_type`` is not one of the supported algorithms.
    """
    hash_type = hash_type.lower()

    # NOTE: Validate explicitly instead of using ``assert``: assertions are removed when Python runs with ``-O``,
    # in which case an unsupported type would silently be hashed with MD5
    if hash_type == "sha256":
        hash_value = hashlib.sha256()
    elif hash_type == "md5":
        hash_value = hashlib.md5()  # nosec
    else:
        raise ValueError(f"Unsupported hash type '{hash_type}'. Valid types are: sha256, md5")

    # NOTE: Read in fixed-size blocks to avoid loading the whole content into memory
    for byte_block in iter(lambda: file.read(BLOCK_SIZE), b""):
        hash_value.update(byte_block)

    return hash_value.hexdigest()
def safe_read_yaml(path: Union[Path, str]) -> Dict[str, Any]:
    """Parse a YAML file.

    Args:
        path(Union[Path, str]): Path of the YAML file.

    Returns:
        In case of success a dictionary of the YAML's content, otherwise raises a ParameterError exception.

    Raises:
        errors.ParameterError: If importing the YAML helper or reading the file raises any exception.
    """
    try:
        # NOTE: The import is part of the guarded section so that its failures are reported the same way
        from renku.core.util import yaml

        content = yaml.read_yaml(path)
    except Exception as error:
        raise errors.ParameterError(error)

    return content
def matches(path: Union[Path, str], pattern: str) -> bool:
    """Check if a path, or one of its parent directories, matches a given pattern.

    Args:
        path(Union[Path, str]): Path to check.
        pattern(str): ``fnmatch``-style pattern; trailing path separators are ignored.

    Returns:
        bool: ``True`` if the path itself or any of its parents (except the top-most one) matches.
    """
    normalized_pattern = pattern.rstrip(os.sep)
    target = Path(path)

    # NOTE: The last entry of ``parents`` is the root/anchor (``/`` or ``.``) and is never matched
    candidates = [target, *list(target.parents)[:-1]]

    return any(fnmatch.fnmatch(str(candidate), normalized_pattern) for candidate in candidates)
def expand_directories(paths):
    """Expand glob patterns and directories into the paths they contain, yielding each path at most once.

    Args:
        paths: Paths or glob patterns (``**`` is supported) to expand.

    Returns:
        Generator of path strings: every matched non-directory path, plus every entry found recursively under
        each matched directory (the matched directory itself is not yielded).
    """
    processed_paths = set()

    for path in paths:
        for matched_path in glob.iglob(str(path), recursive=True):
            if matched_path in processed_paths:
                continue

            path_ = Path(matched_path)
            if path_.is_dir():
                candidates = map(str, path_.rglob("*"))
            else:
                candidates = (matched_path,)

            for candidate in candidates:
                # NOTE: Entries of a directory may have been yielded already, either explicitly or through
                # another (parent) directory; skip them to avoid duplicates
                if candidate in processed_paths:
                    continue
                processed_paths.add(candidate)
                yield candidate
# NOTE: Multipliers to convert a size in the given (lower-case) unit to bytes. The insertion order is significant
# because the keys are listed in that order in error messages.
UNITS = {
    # Decimal units with a ``b`` suffix
    "b": 1,
    "kb": 1000,
    "mb": 1000**2,
    "gb": 1000**3,
    "tb": 1000**4,
    # Decimal units without a suffix
    "m": 1000**2,
    "g": 1000**3,
    "t": 1000**4,
    "p": 1000**5,
    "e": 1000**6,
    "z": 1000**7,
    "y": 1000**8,
    # Binary units
    "ki": 1024,
    "mi": 1024**2,
    "gi": 1024**3,
    "ti": 1024**4,
    "pi": 1024**5,
    "ei": 1024**6,
    "zi": 1024**7,
    "yi": 1024**8,
}
def parse_file_size(size_str):
    """Parse a human readable file size to bytes.

    Args:
        size_str: Size made of a number directly followed by a unit from ``UNITS`` (case-insensitive).

    Returns:
        int: Size in bytes, truncated to an integer.

    Raises:
        ValueError: If no number followed by a known unit is found, or the number cannot be parsed.
    """
    # NOTE: Only the first one or two letters after the number are considered as the unit
    match = re.search(r"([0-9.]+)([a-zA-Z]{1,2})", size_str)
    unit_name = match.group(2).lower() if match else None

    if unit_name not in UNITS:
        raise ValueError(
            "Supplied file size does not contain a unit. " "Valid units are: {}".format(", ".join(UNITS.keys()))
        )

    return int(float(match.group(1)) * UNITS[unit_name])
def bytes_to_unit(size_in_bytes, unit: str) -> Optional[float]:
    """Return size in the provided unit.

    Args:
        size_in_bytes: Size in bytes; may be ``None``.
        unit(str): Target unit; one of the keys of ``UNITS`` (case-insensitive).

    Returns:
        Optional[float]: Size converted to ``unit``, or ``None`` if ``size_in_bytes`` is ``None``.

    Raises:
        ValueError: If ``unit`` is not a known unit.
    """
    unit = unit.lower()
    if unit not in UNITS:
        raise ValueError(f"Invalid unit '{unit}'. Valid units are: [{', '.join(UNITS)}]")

    if size_in_bytes is None:
        return None

    return size_in_bytes / UNITS[unit]