Source code for multistorageclient.shortcuts

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import os
 17import threading
 18from collections.abc import Callable, Iterator
 19from typing import Any, Optional, Union
 20from urllib.parse import ParseResult, urlparse
 21
 22from .client import StorageClient
 23from .config import DEFAULT_POSIX_PROFILE_NAME, SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS, StorageClientConfig
 24from .file import ObjectFile, PosixFile
 25from .telemetry import Telemetry
 26from .types import MSC_PROTOCOL, ExecutionMode, ObjectMetadata, PatternList
 27
 28_TELEMETRY_PROVIDER: Optional[Callable[[], Telemetry]] = None
 29_TELEMETRY_PROVIDER_LOCK = threading.Lock()
 30_STORAGE_CLIENT_CACHE: dict[str, StorageClient] = {}
 31_STORAGE_CLIENT_CACHE_LOCK = threading.Lock()
 32_PROCESS_ID = os.getpid()
 33
 34
 35def _reinitialize_after_fork() -> None:
 36    """
 37    Reinitialize module state after fork to ensure fork-safety.
 38
 39    This function is called automatically after a fork to:
 40    1. Clear the storage client cache (cached clients may have invalid state)
 41    2. Reinitialize locks (parent's lock state must not be inherited)
 42    3. Update process ID tracking
 43
 44    Note: The telemetry provider is intentionally inherited by child processes,
 45    only its lock is reinitialized.
 46    """
 47    global _STORAGE_CLIENT_CACHE, _STORAGE_CLIENT_CACHE_LOCK
 48    global _TELEMETRY_PROVIDER_LOCK
 49    global _PROCESS_ID
 50
 51    _STORAGE_CLIENT_CACHE.clear()
 52    _STORAGE_CLIENT_CACHE_LOCK = threading.Lock()
 53    # we don't need to reset telemetry provider as it supposed to be top level Python function
 54    _TELEMETRY_PROVIDER_LOCK = threading.Lock()
 55    _PROCESS_ID = os.getpid()
 56
 57
 58def _check_and_reinitialize_if_forked() -> None:
 59    """
 60    Check if the current process is a fork and reinitialize if needed.
 61
 62    This provides fork-safety for systems where os.register_at_fork is not available
 63    or as a fallback mechanism.
 64    """
 65    global _PROCESS_ID
 66
 67    current_pid = os.getpid()
 68    if current_pid != _PROCESS_ID:
 69        _reinitialize_after_fork()
 70
 71
 72if hasattr(os, "register_at_fork"):
 73    os.register_at_fork(after_in_child=_reinitialize_after_fork)
 74
 75
def get_telemetry_provider() -> Optional[Callable[[], Telemetry]]:
    """
    Return the factory currently registered for building :py:class:`Telemetry` instances.

    :py:func:`resolve_storage_client` passes this factory to the configuration of every new
    storage client it creates.

    :return: The registered factory, or ``None`` if no provider has been set.
    """
    provider = _TELEMETRY_PROVIDER
    return provider
85 86
def set_telemetry_provider(telemetry_provider: Optional[Callable[[], Telemetry]]) -> None:
    """
    Register the factory used to build :py:class:`Telemetry` instances for clients created by shortcuts.

    The factory is only read when a new storage client is constructed; clients that are already
    cached keep whatever telemetry they were built with.

    :param telemetry_provider: A callable returning a telemetry instance, or ``None`` to clear it.
        It must be defined at the top level of a module so that it can be pickled.
    """
    global _TELEMETRY_PROVIDER

    lock = _TELEMETRY_PROVIDER_LOCK
    with lock:
        _TELEMETRY_PROVIDER = telemetry_provider
def _build_full_path(original_url: str, pr: ParseResult) -> str:
    """
    Rebuild the path of a parsed URL, re-attaching its query and fragment components.

    ``urlparse`` splits ``?`` and ``#`` off the path, but in object keys and glob patterns those
    characters belong to the name (e.g. a ``?`` wildcard). This restores them as they appeared
    in ``original_url``, including *empty* components such as a trailing ``?`` or ``#``, which
    ``urlparse`` reports as empty strings and which would otherwise be lost.

    :param original_url: The URL string that was passed to ``urlparse``.
    :param pr: The result of ``urlparse(original_url)``.
    :return: The path, followed by ``?query`` and/or ``#fragment`` when present in ``original_url``.
    """
    # urlparse splits the fragment off at the first '#' before it looks for '?', so a '?' only
    # starts a query component when it occurs before the first '#'. Checking the original string
    # (instead of only ``url.endswith("?")``) keeps empty components and avoids mistaking a '?'
    # inside the fragment for a query delimiter.
    before_fragment, hash_sep, _ = original_url.partition("#")
    path = pr.path
    if "?" in before_fragment:
        path += "?" + pr.query
    if hash_sep:
        path += "#" + pr.fragment
    return path


def _resolve_msc_url(url: str) -> tuple[str, str]:
    """
    Resolve an MSC URL (``msc://profile/path``) to a profile name and path.

    :param url: The MSC URL to resolve.
    :return: A tuple of ``(profile_name, path)``. The path has its leading slash removed and keeps
        any ``?``/``#`` characters from the original URL.
    """
    pr = urlparse(url)
    path = _build_full_path(url, pr)
    return pr.netloc, path.removeprefix("/")


def _resolve_non_msc_url(url: str) -> tuple[str, str]:
    """
    Resolve a non-MSC URL or path to a profile name and path.

    Resolution order:

    1. If the MSC configuration defines a path mapping matching ``url``, that mapping is used.
    2. ``file://`` URLs and absolute POSIX paths resolve to the default POSIX profile.
    3. Inputs without a URL scheme are treated as relative paths, made absolute with
       ``os.path.realpath`` (which also resolves symlinks), and use the default POSIX profile.
    4. URLs with a supported protocol (e.g. ``s3://bucket/key``) resolve to the implicit profile
       ``_<protocol>-<bucket>``.

    :param url: The non-MSC URL or path to resolve.
    :return: A tuple of ``(profile_name, path)``.
    :raises ValueError: If the protocol is not supported for implicit profiles, or if a URL with a
        supported protocol has no bucket name.
    """
    # Explicit path mappings from the MSC configuration take precedence over implicit resolution.
    path_mapping = StorageClientConfig.read_path_mapping()
    if path_mapping:
        possible_mapping = path_mapping.find_mapping(url)
        if possible_mapping:
            return possible_mapping  # (profile_name, path)

    # Local file system: file:// URLs and absolute POSIX paths use the default POSIX profile.
    # NOTE(review): a host in a file:// URL (e.g. file://host/x) is dropped here; confirm intended.
    if url.startswith("file://"):
        pr = urlparse(url)
        return DEFAULT_POSIX_PROFILE_NAME, _build_full_path(url, pr)
    elif url.startswith("/"):
        return DEFAULT_POSIX_PROFILE_NAME, os.path.normpath(url)

    pr = urlparse(url)
    protocol = pr.scheme.lower()

    # No scheme: treat the input as a relative POSIX path and make it absolute.
    if not protocol:
        return DEFAULT_POSIX_PROFILE_NAME, os.path.realpath(url)

    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
        supported_protocols = ", ".join([f"{p}://" for p in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS])
        raise ValueError(
            f'Unknown URL "{url}", expecting "{MSC_PROTOCOL}" or a supported protocol ({supported_protocols}) or a POSIX path'
        )

    bucket = pr.netloc
    if not bucket:
        raise ValueError(f'Invalid URL "{url}", bucket name is required for {protocol}:// URLs')

    # Implicit profile naming convention: _<protocol>-<bucket>
    profile_name = f"_{protocol}-{bucket}"

    # Keep '?' and '#' (e.g. glob wildcards) as part of the key, consistent with msc:// and
    # file:// URLs; ``pr.path`` alone would silently drop everything after them.
    path = _build_full_path(url, pr)
    return profile_name, path.removeprefix("/")
def resolve_storage_client(url: str) -> tuple[StorageClient, str]:
    """
    Build and return a :py:class:`multistorageclient.StorageClient` instance based on the provided URL or path.

    The URL or path is parsed to determine the storage profile and the path within it. Supported
    inputs are ``msc://`` URLs (the single-slash form ``msc:/`` produced by ``pathlib.Path`` is
    accepted as well), POSIX paths and ``file://`` URLs for local file system access, and URLs with
    a supported non-MSC protocol. If a client for the profile has already been instantiated, the
    cached client is returned; otherwise a new :py:class:`StorageClient` is created and cached.

    For non-MSC URLs (such as ``s3://``, ``gs://``, ``ais://``), MSC infers the storage provider from
    the URL protocol and uses an implicit profile named ``_<protocol>-<bucket>`` (e.g.
    ``_s3-bucket1``, ``_gs-bucket1``). Path mappings defined in the MSC configuration are applied
    before implicit profiles are considered, which allows explicit mappings from source paths to
    MSC profiles.

    This function is fork-safe: after a fork, the cache is cleared and new client instances are
    created in the child process, so stale connections or file descriptors are not shared.

    :param url: The storage location, which can be:
        - A URL in the format ``msc://profile/path`` for object storage.
        - A local file system path (absolute or relative POSIX path) or a ``file://`` URL.
        - A non-MSC URL with a supported protocol (s3://, gs://, ais://).

    :return: A tuple of the :py:class:`multistorageclient.StorageClient` instance and the resolved path.

    :raises ValueError: If the URL's protocol is neither ``msc``, a local file system path, nor a
        supported non-MSC protocol, or if such a URL has no bucket name.
    """
    global _STORAGE_CLIENT_CACHE
    global _STORAGE_CLIENT_CACHE_LOCK

    _check_and_reinitialize_if_forked()

    # pathlib.Path collapses "msc://" to "msc:/"; restore the scheme's double slash. Only the first
    # occurrence is replaced so that a literal "msc:/" further along the path is left untouched.
    if url.startswith("msc:/") and not url.startswith("msc://"):
        url = url.replace("msc:/", "msc://", 1)

    # Resolve the URL to a profile name and path.
    profile, path = _resolve_msc_url(url) if url.startswith(MSC_PROTOCOL) else _resolve_non_msc_url(url)

    # Fast path: reuse an existing client without taking the lock (single dict lookup).
    client = _STORAGE_CLIENT_CACHE.get(profile)
    if client is not None:
        return client, path

    with _STORAGE_CLIENT_CACHE_LOCK:
        # Re-check under the lock: another thread may have created the client in the meantime.
        client = _STORAGE_CLIENT_CACHE.get(profile)
        if client is None:
            client = StorageClient(
                config=StorageClientConfig.from_file(profile=profile, telemetry_provider=get_telemetry_provider())
            )
            _STORAGE_CLIENT_CACHE[profile] = client

    return client, path
248 249
def open(url: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
    """
    Open the file at ``url`` and return a file-like object.

    The URL (or path) is resolved to a cached or newly built
    :py:class:`multistorageclient.StorageClient`, whose ``open`` method is then called with the
    resolved path.

    :param url: The URL or path of the file to open (example: ``msc://profile/prefix/dataset.tar``).
    :param mode: The mode to open the file in. Defaults to ``"rb"``.
    :param kwargs: Extra keyword arguments forwarded unchanged to ``StorageClient.open``.

    :return: A file-like object that allows interaction with the file.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, resolved_path = resolve_storage_client(url)
    return storage_client.open(resolved_path, mode, **kwargs)
266 267
def glob(pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
    """
    Return a list of files matching a glob-style pattern.

    The pattern is resolved to a :py:class:`multistorageclient.StorageClient`, whose ``glob``
    method produces the matches. Plain local paths (no MSC scheme) that resolve to the default
    POSIX profile are returned as plain paths. All other matches are returned with their URL prefix.

    :param pattern: The glob-style pattern to match files (example: ``msc://profile/prefix/**/*.tar``).
    :param attribute_filter_expression: The attribute filter expression to apply to the result.

    :return: A list of file paths matching the pattern.

    :raises ValueError: If ``pattern`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL
        with a supported protocol.
    """
    client, path = resolve_storage_client(pattern)
    # resolve_storage_client also accepts the single-slash "msc:/" form produced by pathlib.Path,
    # so treat it as an MSC URL here too; otherwise "msc:/<posix profile>/..." would return
    # un-prefixed paths while the equivalent "msc://..." pattern returns full URLs.
    is_msc_url = pattern.startswith(MSC_PROTOCOL) or pattern.startswith("msc:/")
    include_url_prefix = is_msc_url or client.profile != DEFAULT_POSIX_PROFILE_NAME
    return client.glob(
        path,
        include_url_prefix=include_url_prefix,
        attribute_filter_expression=attribute_filter_expression,
    )
288 289
def upload_file(url: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Upload the local file at ``local_path`` to ``url``.

    The URL is resolved to a cached or newly built :py:class:`multistorageclient.StorageClient`,
    which performs the upload to the resolved path.

    :param url: The destination URL of the file (example: ``msc://profile/prefix/dataset.tar``).
    :param local_path: The local path of the file to upload.
    :param attributes: Optional string attributes passed through to ``StorageClient.upload_file``.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.upload_file(remote_path=remote_path, local_path=local_path, attributes=attributes)
305 306
def download_file(url: str, local_path: str) -> None:
    """
    Download the file at ``url`` to ``local_path``.

    The URL is resolved to a cached or newly built :py:class:`multistorageclient.StorageClient`,
    which downloads the object at the resolved path.

    :param url: The URL of the file to download (example: ``msc://profile/prefix/dataset.tar``).
    :param local_path: The local path where the file should be written.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.download_file(remote_path=remote_path, local_path=local_path)
322 323
def is_empty(url: str) -> bool:
    """
    Report whether no objects or files exist under ``url``.

    :param url: The URL or path to check, typically pointing to a storage location or prefix.
    :return: ``True`` if there are no objects/files under this URL, ``False`` otherwise.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, resolved_path = resolve_storage_client(url)
    return storage_client.is_empty(resolved_path)
335 336
def is_file(url: str) -> bool:
    """
    Report whether ``url`` points to a file (object) rather than a directory or prefix.

    The URL is resolved to a cached or newly built :py:class:`multistorageclient.StorageClient`,
    which performs the check on the resolved path.

    :param url: The URL to check (example: ``msc://profile/prefix/dataset.tar``).
    :return: The result of ``StorageClient.is_file`` for the resolved path.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, resolved_path = resolve_storage_client(url)
    return storage_client.is_file(path=resolved_path)
349 350
def sync(
    source_url: str,
    target_url: str,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
    preserve_source_attributes: bool = False,
    ignore_hidden: bool = True,
) -> None:
    """
    Synchronize files from the source storage location to the target storage location.

    Both URLs are resolved to storage clients (source first). The target client then copies
    from the source client via ``StorageClient.sync_from``.

    :param source_url: The URL for the source storage.
    :param target_url: The URL for the target storage.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
        When False (default), only file content is copied. When True, custom metadata attributes are also preserved.

        .. warning::
            **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
            request for each object to retrieve attributes, which can significantly impact performance on large-scale
            sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
    :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True.

    :raises ValueError: If either URL is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with
        a supported protocol.
    """
    src_client, src_path = resolve_storage_client(source_url)
    dst_client, dst_path = resolve_storage_client(target_url)
    dst_client.sync_from(
        src_client,
        src_path,
        dst_path,
        delete_unmatched_files,
        execution_mode=execution_mode,
        patterns=patterns,
        preserve_source_attributes=preserve_source_attributes,
        ignore_hidden=ignore_hidden,
    )
389 390
def sync_replicas(
    source_url: str,
    replica_indices: Optional[list[int]] = None,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
    ignore_hidden: bool = True,
) -> None:
    """
    Synchronize files from the source storage location to its replicas.

    The source URL is resolved to a storage client, whose ``sync_replicas`` method does the work.

    :param source_url: The URL for the source storage.
    :param replica_indices: Indices (starting from 0) of the replicas to sync to. If not provided, all replicas are synced.
    :param delete_unmatched_files: Whether to delete files at the replicas that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True.

    :raises ValueError: If ``source_url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL
        with a supported protocol.
    """
    src_client, src_path = resolve_storage_client(source_url)
    src_client.sync_replicas(
        src_path,
        replica_indices=replica_indices,
        delete_unmatched_files=delete_unmatched_files,
        execution_mode=execution_mode,
        patterns=patterns,
        ignore_hidden=ignore_hidden,
    )
418 419
def list(
    url: str,
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
    attribute_filter_expression: Optional[str] = None,
    show_attributes: bool = False,
) -> Iterator[ObjectMetadata]:
    """
    List the objects (and optionally directories) stored under the URL prefix ``url``.

    The URL is resolved to a cached or newly built :py:class:`multistorageclient.StorageClient`,
    and its ``list`` method is called with ``include_url_prefix=True``.

    :param url: The prefix to list objects under.
    :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
    :param include_directories: Whether to include directories in the result alongside objects.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.
    :param show_attributes: Forwarded to ``StorageClient.list``; presumably controls whether custom
        attributes are included in the returned metadata (confirm against ``StorageClient.list``).

    :return: An iterator of :py:class:`ObjectMetadata` objects for the entries under the prefix,
        with keys that include the URL prefix.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, prefix = resolve_storage_client(url)
    return storage_client.list(
        path=prefix,
        start_after=start_after,
        end_at=end_at,
        include_directories=include_directories,
        include_url_prefix=True,
        attribute_filter_expression=attribute_filter_expression,
        show_attributes=show_attributes,
    )
453 454
def write(url: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Write ``body`` as the object stored at ``url``.

    :param url: The URL or path where the object should be written.
    :param body: The content to write to the object.
    :param attributes: Optional string attributes passed through to ``StorageClient.write``.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, object_path = resolve_storage_client(url)
    storage_client.write(path=object_path, body=body, attributes=attributes)
464 465
def delete(url: str, recursive: bool = False) -> None:
    """
    Delete the object(s) at ``url`` from the storage provider.

    The URL is resolved to a cached or newly built :py:class:`multistorageclient.StorageClient`,
    which deletes the resolved path.

    :param url: The URL of the object to delete (example: ``msc://profile/prefix/file.txt``).
    :param recursive: Whether to delete objects under the path recursively.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, target_path = resolve_storage_client(url)
    storage_client.delete(target_path, recursive=recursive)
478 479
def info(url: str) -> ObjectMetadata:
    """
    Return the metadata of the object stored at ``url``.

    :param url: The URL of the object (example: ``msc://profile/prefix/file.txt``).
    :return: An :py:class:`ObjectMetadata` describing the object.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.info(object_path)
490 491
def commit_metadata(url: str) -> None:
    """
    Commit pending metadata updates for the storage profile that ``url`` resolves to.

    :param url: The URL of the path whose metadata should be committed; the resolved path is
        passed to ``StorageClient.commit_metadata`` as its ``prefix``.

    :raises ValueError: If ``url`` is not an MSC URL, a POSIX path / ``file://`` URL, or a URL with a
        supported protocol.
    """
    storage_client, prefix = resolve_storage_client(url)
    storage_client.commit_metadata(prefix=prefix)