Source code for multistorageclient.shortcuts

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17import os
 18import threading
 19from collections.abc import Callable, Iterator
 20from typing import Any, Optional, Union
 21from urllib.parse import ParseResult, urlparse
 22
 23from .client import StorageClient
 24from .config import RESERVED_POSIX_PROFILE_NAME, SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS, StorageClientConfig
 25from .file import ObjectFile, PosixFile
 26from .telemetry import Telemetry
 27from .types import MSC_PROTOCOL, ExecutionMode, ObjectMetadata, PatternList, SyncResult
 28
# Factory used to build a Telemetry instance for storage clients created by the shortcuts in this
# module; ``None`` when no provider has been set via set_telemetry_provider().
_TELEMETRY_PROVIDER: Optional[Callable[[], Telemetry]] = None
# Held while set_telemetry_provider() writes _TELEMETRY_PROVIDER; recreated in a forked child.
_TELEMETRY_PROVIDER_LOCK = threading.Lock()
# Storage clients created by resolve_storage_client(), keyed by profile name.
_STORAGE_CLIENT_CACHE: dict[str, StorageClient] = {}
# Serializes client creation in resolve_storage_client() so each profile is instantiated once per process.
_STORAGE_CLIENT_CACHE_LOCK = threading.Lock()
# PID of the process that owns the state above; a mismatch with os.getpid() signals a forked child.
_PROCESS_ID = os.getpid()

logger = logging.getLogger(__name__)


 38def _reinitialize_after_fork() -> None:
 39    """
 40    Reinitialize module state after fork to ensure fork-safety.
 41
 42    This function is called automatically after a fork to:
 43    1. Clear the storage client cache (cached clients may have invalid state)
 44    2. Reinitialize locks (parent's lock state must not be inherited)
 45    3. Update process ID tracking
 46
 47    Note: The telemetry provider is intentionally inherited by child processes,
 48    only its lock is reinitialized.
 49    """
 50    global _STORAGE_CLIENT_CACHE, _STORAGE_CLIENT_CACHE_LOCK
 51    global _TELEMETRY_PROVIDER_LOCK
 52    global _PROCESS_ID
 53
 54    _STORAGE_CLIENT_CACHE.clear()
 55    _STORAGE_CLIENT_CACHE_LOCK = threading.Lock()
 56    # we don't need to reset telemetry provider as it supposed to be top level Python function
 57    _TELEMETRY_PROVIDER_LOCK = threading.Lock()
 58    _PROCESS_ID = os.getpid()
 59
 60
 61def _check_and_reinitialize_if_forked() -> None:
 62    """
 63    Check if the current process is a fork and reinitialize if needed.
 64
 65    This provides fork-safety for systems where os.register_at_fork is not available
 66    or as a fallback mechanism.
 67    """
 68    global _PROCESS_ID
 69
 70    current_pid = os.getpid()
 71    if current_pid != _PROCESS_ID:
 72        _reinitialize_after_fork()
 73
 74
# Have the interpreter run _reinitialize_after_fork() in every forked child when os.register_at_fork
# exists (the hasattr guard skips environments without it). resolve_storage_client() additionally
# calls _check_and_reinitialize_if_forked() on every call as a PID-based fallback.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reinitialize_after_fork)


def get_telemetry_provider() -> Optional[Callable[[], Telemetry]]:
    """
    Return the factory that shortcut-created storage clients use to obtain a :py:class:`Telemetry` instance.

    :return: The telemetry factory most recently passed to :py:func:`set_telemetry_provider`, or ``None`` if no
        provider has been set.
    """
    # A plain read of the module global; no ``global`` statement is needed for reading.
    return _TELEMETRY_PROVIDER


def set_telemetry_provider(telemetry_provider: Optional[Callable[[], Telemetry]]) -> None:
    """
    Register the factory that shortcut-created storage clients use to obtain a :py:class:`Telemetry` instance.

    :param telemetry_provider: A zero-argument callable returning a telemetry instance, or ``None`` to clear it.
        It must be defined at the top level of a module so that it can be pickled.
    """
    global _TELEMETRY_PROVIDER

    # The lock global is only read here; it is replaced after fork by _reinitialize_after_fork().
    with _TELEMETRY_PROVIDER_LOCK:
        _TELEMETRY_PROVIDER = telemetry_provider


def _build_full_path(original_url: str, pr: ParseResult) -> str:
    """
    Rebuild the path of a parsed URL, re-attaching its query and fragment.

    :py:func:`urllib.parse.urlparse` splits ``?`` and ``#`` off the path. In storage keys and glob
    patterns these are ordinary characters (``?`` is the single-character wildcard), so they are put
    back. ``urlparse`` reports an empty query or fragment (e.g. a trailing ``?`` or ``#``) as an empty
    string. The original URL is therefore consulted to tell "absent" from "present but empty".

    :param original_url: The URL string that ``pr`` was parsed from.
    :param pr: The result of ``urlparse(original_url)``.
    :return: The path, followed by ``?query`` and/or ``#fragment`` when those delimiters appear in ``original_url``.
    """
    # urlparse cuts the fragment at the first "#" and then the query at the first "?" before it, so the
    # query delimiter was present iff "?" occurs before the first "#".
    before_fragment, fragment_delimiter, _ = original_url.partition("#")

    path = pr.path
    if pr.query or "?" in before_fragment:
        path += "?" + pr.query
    if pr.fragment or fragment_delimiter:
        path += "#" + pr.fragment
    return path


def _resolve_msc_url(url: str) -> tuple[str, str]:
    """
    Resolve an MSC URL to a profile name and path.

    :param url: The MSC URL to resolve (``msc://profile/path``).
    :return: A tuple of ``(profile_name, path)``. The path has no leading slash and keeps any query or fragment.
    """
    pr = urlparse(url)
    return pr.netloc, _build_full_path(url, pr).removeprefix("/")


def _resolve_non_msc_url(url: str) -> tuple[str, str]:
    """
    Resolve a non-MSC URL or path to a profile name and path.

    Resolution order:

    1. If the MSC configuration defines a path mapping matching ``url``, use it.
    2. ``file://`` URLs and absolute POSIX paths map to the reserved POSIX profile (``__filesystem__``).
    3. Scheme-less (relative) paths are made absolute and also map to the reserved POSIX profile.
    4. Otherwise an implicit profile named ``_<protocol>-<bucket>`` is derived from the URL.

    :param url: The non-MSC URL or path to resolve.
    :return: A tuple of ``(profile_name, path)``.
    :raises ValueError: If the protocol is not supported for implicit profiles, or the URL has no bucket name.
    """
    # Explicit path mappings from the MSC configuration take precedence.
    path_mapping = StorageClientConfig.read_path_mapping()
    if path_mapping:
        possible_mapping = path_mapping.find_mapping(url)
        if possible_mapping:
            return possible_mapping  # (profile name, path)

    # Local filesystem access goes through the reserved POSIX profile.
    if url.startswith("file://"):
        pr = urlparse(url)
        return RESERVED_POSIX_PROFILE_NAME, _build_full_path(url, pr)
    if url.startswith("/"):
        return RESERVED_POSIX_PROFILE_NAME, os.path.normpath(url)

    pr = urlparse(url)
    protocol = pr.scheme.lower()

    # No scheme: treat as a relative path and make it absolute (os.path.realpath also resolves symlinks).
    if not protocol:
        return RESERVED_POSIX_PROFILE_NAME, os.path.realpath(url)

    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
        supported_protocols = ", ".join([f"{p}://" for p in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS])
        raise ValueError(
            f'Unknown URL "{url}", expecting "{MSC_PROTOCOL}" or a supported protocol ({supported_protocols}) or a POSIX path'
        )

    # Implicit profile name format: _<protocol>-<bucket>
    bucket = pr.netloc
    if not bucket:
        raise ValueError(f'Invalid URL "{url}", bucket name is required for {protocol}:// URLs')

    profile_name = f"_{protocol}-{bucket}"

    # Keep "?" / "#" in the key (e.g. glob wildcards such as "shard-?.tar") exactly as msc:// URLs do;
    # using pr.path alone would silently truncate the key at the first "?" or "#".
    path = _build_full_path(url, pr).removeprefix("/")

    return profile_name, path


def resolve_storage_client(url: str) -> tuple[StorageClient, str]:
    """
    Build and return a :py:class:`multistorageclient.StorageClient` instance based on the provided URL or path.

    This function parses the given URL or path and determines the appropriate storage profile and path.
    It supports URLs with the protocol ``msc://``, as well as POSIX paths or ``file://`` URLs for local file
    system access. If the profile has already been instantiated, it returns the cached client. Otherwise,
    it creates a new :py:class:`StorageClient` and caches it.

    The function also supports implicit profiles for non-MSC URLs. When a non-MSC URL is provided (like s3://,
    gs://, ais://, file://), MSC infers the storage provider from the URL protocol and creates an implicit
    profile with the naming convention "_protocol-bucket" (e.g., "_s3-bucket1", "_gs-bucket1").

    Path mappings defined in the MSC configuration are applied before implicit profiles are created.
    This allows explicit mappings between source paths and destination MSC profiles.

    This function is fork-safe: after a fork, the cache is cleared and new client instances are created
    in the child process instead of reusing the parent's.

    :param url: The storage location, which can be:

        - A URL in the format ``msc://profile/path`` for object storage.
        - A local file system path (absolute POSIX path) or a ``file://`` URL.
        - A non-MSC URL with a supported protocol (s3://, gs://, ais://).

    :return: A tuple containing the :py:class:`multistorageclient.StorageClient` instance and the parsed path.

    :raises ValueError: If the URL is not an ``msc://`` URL, a local file system path, or a URL with a supported
        non-MSC protocol, or if such a URL has no bucket name.
    """
    _check_and_reinitialize_if_forked()

    # pathlib.Path collapses "msc://" into "msc:/". Restore the double slash on the scheme only;
    # str.replace would also rewrite any "msc:/" occurring inside the object key.
    if url.startswith("msc:/") and not url.startswith("msc://"):
        url = "msc://" + url[len("msc:/") :]

    profile, path = _resolve_msc_url(url) if url.startswith(MSC_PROTOCOL) else _resolve_non_msc_url(url)

    # Fast path: return an already-cached client without taking the lock.
    if profile in _STORAGE_CLIENT_CACHE:
        return _STORAGE_CLIENT_CACHE[profile], path

    with _STORAGE_CLIENT_CACHE_LOCK:
        # Re-check under the lock: another thread may have created this profile's client meanwhile.
        if profile in _STORAGE_CLIENT_CACHE:
            return _STORAGE_CLIENT_CACHE[profile], path

        client = StorageClient(
            config=StorageClientConfig.from_file(profile=profile, telemetry_provider=get_telemetry_provider())
        )
        _STORAGE_CLIENT_CACHE[profile] = client

    return client, path


def open(url: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
    """
    Open the file at ``url`` and return a file-like object for it.

    The URL is resolved to a :py:class:`multistorageclient.StorageClient` (a cached client is reused for a
    profile that was already resolved), and the call is delegated to that client's ``open``.

    :param url: Location of the file, e.g. ``msc://profile/prefix/dataset.tar``, a POSIX path, or a URL with a
        supported protocol.
    :param mode: The file mode to open the file in. Defaults to ``"rb"``.
    :param kwargs: Additional keyword arguments forwarded unchanged to the storage client's ``open``.

    :return: A file-like object that allows interaction with the file.

    :raises ValueError: If ``url`` cannot be resolved to a storage profile (see :py:func:`resolve_storage_client`).
    """
    storage_client, resolved_path = resolve_storage_client(url)
    return storage_client.open(resolved_path, mode, **kwargs)


def glob(pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
    """
    Return the files matching a glob-style pattern.

    The pattern is resolved to a :py:class:`multistorageclient.StorageClient`, whose ``glob`` performs the match.

    :param pattern: The glob-style pattern to match, e.g. ``msc://profile/prefix/**/*.tar``.
    :param attribute_filter_expression: Optional attribute filter expression applied to the results.

    :return: The matching paths. Patterns that are plain filesystem paths (not ``msc://`` and resolved to the
        reserved POSIX profile) yield paths without a URL prefix; all others yield paths with the URL prefix.

    :raises ValueError: If ``pattern`` cannot be resolved to a storage profile (see :py:func:`resolve_storage_client`).
    """
    storage_client, resolved_pattern = resolve_storage_client(pattern)

    # The MSC-protocol test comes first so the client's profile is only inspected for non-MSC patterns.
    is_plain_local_pattern = (
        not pattern.startswith(MSC_PROTOCOL) and storage_client.profile == RESERVED_POSIX_PROFILE_NAME
    )

    return storage_client.glob(
        resolved_pattern,
        include_url_prefix=not is_plain_local_pattern,
        attribute_filter_expression=attribute_filter_expression,
    )


def upload_file(url: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Upload a local file to ``url``.

    The URL is resolved to a :py:class:`multistorageclient.StorageClient`, which performs the upload.

    :param url: Destination of the file, e.g. ``msc://profile/prefix/dataset.tar``.
    :param local_path: Path of the local file to upload.
    :param attributes: Optional metadata attributes forwarded to the storage client's ``upload_file``.

    :raises ValueError: If ``url`` cannot be resolved to a storage profile (see :py:func:`resolve_storage_client`).
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.upload_file(remote_path=remote_path, local_path=local_path, attributes=attributes)


def download_file(url: str, local_path: str) -> None:
    """
    Download the file at ``url`` to a local path.

    The URL is resolved to a :py:class:`multistorageclient.StorageClient`, which performs the download.

    :param url: Location of the file to download, e.g. ``msc://profile/prefix/dataset.tar``.
    :param local_path: Local path where the file should be written.

    :raises ValueError: If ``url`` cannot be resolved to a storage profile (see :py:func:`resolve_storage_client`).
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.download_file(remote_path=remote_path, local_path=local_path)


def is_empty(url: str) -> bool:
    """
    Report whether the storage location at ``url`` contains no objects.

    :param url: The location to check, typically a prefix in a storage system.
    :return: ``True`` if no objects/files exist under ``url``, ``False`` otherwise.

    :raises ValueError: If ``url`` cannot be resolved to a storage profile (see :py:func:`resolve_storage_client`).
    """
    storage_client, prefix = resolve_storage_client(url)
    return storage_client.is_empty(prefix)


def is_file(url: str) -> bool:
    """
    Report whether ``url`` points to a file (object) rather than a directory or folder.

    The URL is resolved to a :py:class:`multistorageclient.StorageClient`, which performs the check.

    :param url: The location to check, e.g. ``msc://profile/prefix/dataset.tar``.
    :return: The result of the storage client's ``is_file`` for the resolved path.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.is_file(path=object_path)


def sync(
    source_url: str,
    target_url: str,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
    preserve_source_attributes: bool = False,
    ignore_hidden: bool = True,
) -> SyncResult:
    """
    Sync files from the source storage location to the target storage location.

    Both URLs are resolved to storage clients (source first, then target). The work is delegated to the
    target client's ``sync_from``.

    :param source_url: The URL for the source storage.
    :param target_url: The URL for the target storage.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
        When False (default), only file content is copied. When True, custom metadata attributes are also preserved.

        .. warning::
            **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
            request for each object to retrieve attributes, which can significantly impact performance on large-scale
            sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
    :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True.
    :return: The :py:class:`SyncResult` returned by the target client's ``sync_from``.
    """
    src_client, src_path = resolve_storage_client(source_url)
    dst_client, dst_path = resolve_storage_client(target_url)

    return dst_client.sync_from(
        src_client,
        src_path,
        dst_path,
        delete_unmatched_files,
        execution_mode=execution_mode,
        patterns=patterns,
        preserve_source_attributes=preserve_source_attributes,
        ignore_hidden=ignore_hidden,
    )


def sync_replicas(
    source_url: str,
    replica_indices: Optional[list[int]] = None,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
    ignore_hidden: bool = True,
) -> None:
    """
    Sync files from the source storage location to its configured replicas.

    :param source_url: The URL for the source storage.
    :param replica_indices: Indices (starting at 0) of the replicas to sync to. If not provided, all replicas are synced.
    :param delete_unmatched_files: Whether to delete files at the replicas that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True.
    """
    src_client, src_path = resolve_storage_client(source_url)

    src_client.sync_replicas(
        src_path,
        replica_indices=replica_indices,
        delete_unmatched_files=delete_unmatched_files,
        execution_mode=execution_mode,
        patterns=patterns,
        ignore_hidden=ignore_hidden,
    )


# NOTE: this public shortcut shadows the builtin ``list`` for any code later in this module.
def list(
    url: str,
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
    attribute_filter_expression: Optional[str] = None,
    show_attributes: bool = False,
    patterns: Optional[PatternList] = None,
) -> Iterator[ObjectMetadata]:
    """
    List the contents under the URL prefix ``url``.

    The URL is resolved to a :py:class:`multistorageclient.StorageClient`. Its ``list`` is called with
    ``include_url_prefix=True``.

    :param url: The prefix to list objects under.
    :param start_after: The key to start after (exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (inclusive). An object with this key doesn't have to exist.
    :param include_directories: Whether to return directories alongside objects.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.
    :param show_attributes: Forwarded to the storage client's ``list`` as ``show_attributes``.
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :return: An iterator of :py:class:`ObjectMetadata` for the files (and optionally directories) under the prefix.
        Returned keys include the URL prefix (e.g. ``msc://``).
    """
    storage_client, prefix = resolve_storage_client(url)

    listing_options = dict(
        start_after=start_after,
        end_at=end_at,
        include_directories=include_directories,
        include_url_prefix=True,
        attribute_filter_expression=attribute_filter_expression,
        show_attributes=show_attributes,
        patterns=patterns,
    )
    return storage_client.list(path=prefix, **listing_options)


def write(url: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Write ``body`` as an object at ``url``.

    :param url: The location where the object should be written.
    :param body: The content to write to the object.
    :param attributes: Optional metadata attributes forwarded to the storage client's ``write``.
    """
    storage_client, object_path = resolve_storage_client(url)
    storage_client.write(path=object_path, body=body, attributes=attributes)


def delete(url: str, recursive: bool = False) -> None:
    """
    Delete the object(s) at ``url`` from the storage provider.

    The URL is resolved to a :py:class:`multistorageclient.StorageClient`, which performs the deletion.

    :param url: The URL of the object to delete, e.g. ``msc://profile/prefix/file.txt``.
    :param recursive: Whether to delete objects under the path recursively.
    """
    storage_client, object_path = resolve_storage_client(url)
    storage_client.delete(object_path, recursive=recursive)


def info(url: str) -> ObjectMetadata:
    """
    Return metadata about the object stored at ``url``.

    :param url: The URL of the object, e.g. ``msc://profile/prefix/file.txt``.
    :return: An :py:class:`ObjectMetadata` describing the object.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.info(object_path)


def commit_metadata(url: str) -> None:
    """
    Commit pending metadata updates for the profile and prefix that ``url`` resolves to.

    :param url: The URL of the path to commit metadata for. The resolved path is passed as ``prefix``.
    """
    storage_client, prefix = resolve_storage_client(url)
    storage_client.commit_metadata(prefix=prefix)