Source code for multistorageclient.shortcuts

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import os
 17import threading
 18from collections.abc import Callable, Iterator
 19from typing import Any, Optional, Union
 20from urllib.parse import ParseResult, urlparse
 21
 22from .client import StorageClient
 23from .config import DEFAULT_POSIX_PROFILE_NAME, SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS, StorageClientConfig
 24from .file import ObjectFile, PosixFile
 25from .telemetry import Telemetry
 26from .types import MSC_PROTOCOL, ExecutionMode, ObjectMetadata, PatternList
 27
 28_TELEMETRY_PROVIDER: Optional[Callable[[], Telemetry]] = None
 29_TELEMETRY_PROVIDER_LOCK = threading.Lock()
 30_STORAGE_CLIENT_CACHE: dict[str, StorageClient] = {}
 31_STORAGE_CLIENT_CACHE_LOCK = threading.Lock()
 32
 33
def get_telemetry_provider() -> Optional[Callable[[], Telemetry]]:
    """
    Return the factory used to create :py:class:`Telemetry` instances for storage clients created by shortcuts.

    :return: The currently registered telemetry factory, or ``None`` when none has been registered.
    """
    # Plain read of a module-level name; no ``global`` declaration is required for that.
    return _TELEMETRY_PROVIDER
43 44
def set_telemetry_provider(telemetry_provider: Optional[Callable[[], Telemetry]]) -> None:
    """
    Register the factory used to create :py:class:`Telemetry` instances for storage clients created by shortcuts.

    :param telemetry_provider: A zero-argument callable that returns a telemetry instance, or ``None``.
        The function must be defined at the top level of a module to work with pickling.
    """
    # Only the provider is rebound, so only it needs ``global``; the lock is merely read.
    global _TELEMETRY_PROVIDER

    with _TELEMETRY_PROVIDER_LOCK:
        _TELEMETRY_PROVIDER = telemetry_provider
56 57 58def _build_full_path(original_url: str, pr: ParseResult) -> str: 59 """ 60 Helper function to construct the full path from a parsed URL, including query and fragment. 61 62 :param original_url: The original URL before parsing 63 :param pr: The parsed URL result from urlparse 64 :return: The complete path including query and fragment if present 65 """ 66 path = pr.path 67 if pr.query: 68 path += "?" + pr.query 69 elif original_url.endswith("?"): 70 path += "?" # handle the glob pattern that has a trailing question mark 71 if pr.fragment: 72 path += "#" + pr.fragment 73 return path 74 75 76def _resolve_msc_url(url: str) -> tuple[str, str]: 77 """ 78 Resolve an MSC URL to a profile name and path. 79 80 :param url: The MSC URL to resolve (msc://profile/path) 81 :return: A tuple of (profile_name, path) 82 """ 83 pr = urlparse(url) 84 profile = pr.netloc 85 path = _build_full_path(url, pr) 86 if path.startswith("/"): 87 path = path[1:] 88 return profile, path 89 90 91def _resolve_non_msc_url(url: str) -> tuple[str, str]: 92 """ 93 Resolve a non-MSC URL to a profile name and path. 94 95 Resolution process: 96 1. First check if MSC config exists 97 2. If config exists, check for possible path mapping 98 3. 
If no mapping is found, fall back to default POSIX profile 99 for file paths or create an implicit profile based on URL 100 101 :param url: The non-MSC URL to resolve 102 :return: A tuple of (profile_name, path) 103 """ 104 # Check if we have a valid path mapping, if so check if there is a matching mapping 105 path_mapping = StorageClientConfig.read_path_mapping() 106 if path_mapping: 107 # Look for a matching mapping 108 possible_mapping = path_mapping.find_mapping(url) 109 if possible_mapping: 110 return possible_mapping # return the profile name and path 111 112 # For file paths, use the default POSIX profile 113 if url.startswith("file://"): 114 pr = urlparse(url) 115 return DEFAULT_POSIX_PROFILE_NAME, _build_full_path(url, pr) 116 elif url.startswith("/"): 117 url = os.path.normpath(url) 118 return DEFAULT_POSIX_PROFILE_NAME, url 119 120 # For other URL protocol, create an implicit profile name 121 pr = urlparse(url) 122 protocol = pr.scheme.lower() 123 124 # Translate relative paths to absolute paths 125 if not protocol: 126 return DEFAULT_POSIX_PROFILE_NAME, os.path.realpath(url) 127 128 # Validate the protocol is supported 129 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS: 130 supported_protocols = ", ".join([f"{p}://" for p in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS]) 131 raise ValueError( 132 f'Unknown URL "{url}", expecting "{MSC_PROTOCOL}" or a supported protocol ({supported_protocols}) or a POSIX path' 133 ) 134 135 # Build the implicit profile name using the format _protocol-bucket 136 bucket = pr.netloc 137 if not bucket: 138 raise ValueError(f'Invalid URL "{url}", bucket name is required for {protocol}:// URLs') 139 140 profile_name = f"_{protocol}-{bucket}" 141 142 # Return normalized path with leading slash removed 143 path = pr.path 144 if path.startswith("/"): 145 path = path[1:] 146 147 return profile_name, path 148 149
def resolve_storage_client(url: str) -> tuple[StorageClient, str]:
    """
    Build and return a :py:class:`multistorageclient.StorageClient` instance based on the provided URL or path.

    The URL or path is resolved to a storage profile and a path within that profile. If a client for
    the profile has already been created it is returned from the cache; otherwise a new
    :py:class:`StorageClient` is created from the profile's configuration and cached.

    Non-MSC URLs are supported through implicit profiles: for a URL with a supported protocol
    (like s3://, gs://, ais://), the profile name is derived from the protocol and bucket using the
    naming convention "_protocol-bucket" (e.g., "_s3-bucket1", "_gs-bucket1").

    Path mappings defined in the MSC configuration are applied before implicit profiles are
    considered, which allows explicit mappings between source paths and MSC profiles.

    :param url: The storage location, which can be:
        - A URL in the format ``msc://profile/path`` for object storage.
        - A local file system path or a ``file://`` URL.
        - A non-MSC URL with a supported protocol (s3://, gs://, ais://).

    :return: A tuple containing the :py:class:`multistorageclient.StorageClient` instance and the parsed path.

    :raises ValueError: If the URL's protocol is neither ``msc`` nor a valid local file system path
        or a supported non-MSC protocol.
    """
    # pathlib.Path("msc://...") collapses the double slash to "msc:/"; restore it.
    # Only the leading prefix is rewritten, so an "msc:/" that occurs later in the path is untouched.
    if url.startswith("msc:/") and not url.startswith("msc://"):
        url = "msc://" + url[len("msc:/") :]

    # Resolve the URL to a profile name and path.
    if url.startswith(MSC_PROTOCOL):
        profile, path = _resolve_msc_url(url)
    else:
        profile, path = _resolve_non_msc_url(url)

    # Fast path: the profile was instantiated earlier, no lock needed.
    client = _STORAGE_CLIENT_CACHE.get(profile)
    if client is not None:
        return client, path

    with _STORAGE_CLIENT_CACHE_LOCK:
        # Look again under the lock: another thread may have created the client in the meantime.
        client = _STORAGE_CLIENT_CACHE.get(profile)
        if client is None:
            client = StorageClient(
                config=StorageClientConfig.from_file(profile=profile, telemetry_provider=get_telemetry_provider())
            )
            _STORAGE_CLIENT_CACHE[profile] = client

    return client, path
201 202
def open(url: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
    """
    Open the file at the given URL in the requested mode.

    The URL is resolved with :py:func:`resolve_storage_client`, and the resulting
    :py:class:`multistorageclient.StorageClient` opens the resolved path.

    :param url: The URL of the file to open. (example: ``msc://profile/prefix/dataset.tar``)
    :param mode: The file mode to open the file in.
    :param kwargs: Extra keyword arguments, passed through unchanged to the client's ``open``.

    :return: A file-like object that allows interaction with the file.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, resolved_path = resolve_storage_client(url)
    return storage_client.open(resolved_path, mode, **kwargs)
219 220
def glob(pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
    """
    Return a list of files matching a glob-style pattern.

    The pattern is resolved with :py:func:`resolve_storage_client`, and the resulting
    :py:class:`multistorageclient.StorageClient` performs the match.

    :param pattern: The glob-style pattern to match files. (example: ``msc://profile/prefix/**/*.tar``)
    :param attribute_filter_expression: The attribute filter expression to apply to the result.

    :return: A list of file paths matching the pattern.

    :raises ValueError: If the pattern cannot be resolved to a storage profile.
    """
    storage_client, resolved_pattern = resolve_storage_client(pattern)

    # A pattern that was not written as an msc:// URL and resolved to the default POSIX profile
    # is globbed with include_url_prefix=False; every other pattern uses include_url_prefix=True.
    is_plain_posix = not pattern.startswith(MSC_PROTOCOL) and storage_client.profile == DEFAULT_POSIX_PROFILE_NAME
    return storage_client.glob(
        resolved_pattern,
        include_url_prefix=not is_plain_posix,
        attribute_filter_expression=attribute_filter_expression,
    )
241 242
def upload_file(url: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Upload a local file to the given URL.

    The URL is resolved with :py:func:`resolve_storage_client`, and the resulting
    :py:class:`multistorageclient.StorageClient` uploads the file (object) to the resolved path.

    :param url: The URL of the file. (example: ``msc://profile/prefix/dataset.tar``)
    :param local_path: The local path of the file.
    :param attributes: Optional string-to-string attributes, passed through unchanged to the client's ``upload_file``.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.upload_file(remote_path=remote_path, local_path=local_path, attributes=attributes)
258 259
def download_file(url: str, local_path: str) -> None:
    """
    Download the file at the given URL to a local path.

    The URL is resolved with :py:func:`resolve_storage_client`, and the resulting
    :py:class:`multistorageclient.StorageClient` downloads the file (object) at the resolved path.

    :param url: The URL of the file to download. (example: ``msc://profile/prefix/dataset.tar``)
    :param local_path: The local path where the file should be downloaded.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.download_file(remote_path=remote_path, local_path=local_path)
275 276
def is_empty(url: str) -> bool:
    """
    Check whether the specified URL contains any objects.

    :param url: The URL to check, typically pointing to a storage location.
    :return: ``True`` if there are no objects/files under this URL, ``False`` otherwise.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, prefix = resolve_storage_client(url)
    return storage_client.is_empty(prefix)
288 289
def is_file(url: str) -> bool:
    """
    Check whether the specified URL points to a file (rather than a directory or folder).

    The URL is resolved with :py:func:`resolve_storage_client`, and the resulting
    :py:class:`multistorageclient.StorageClient` checks whether a file (object) exists at the resolved path.

    :param url: The URL to check the existence of a file. (example: ``msc://profile/prefix/dataset.tar``)
    :return: The result of the client's ``is_file`` check for the resolved path.
    """
    storage_client, resolved_path = resolve_storage_client(url)
    return storage_client.is_file(path=resolved_path)
302 303
def sync(
    source_url: str,
    target_url: str,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
    preserve_source_attributes: bool = False,
) -> None:
    """
    Sync files from the source storage to the target storage.

    Both URLs are resolved with :py:func:`resolve_storage_client`; the target client then pulls
    from the source client.

    :param source_url: The URL for the source storage.
    :param target_url: The URL for the target storage.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
        When False (default), only file content is copied. When True, custom metadata attributes are also preserved.

        .. warning::
            **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
            request for each object to retrieve attributes, which can significantly impact performance on large-scale
            sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
    """
    src_client, src_path = resolve_storage_client(source_url)
    dst_client, dst_path = resolve_storage_client(target_url)
    dst_client.sync_from(
        src_client,
        src_path,
        dst_path,
        delete_unmatched_files,
        execution_mode=execution_mode,
        patterns=patterns,
        preserve_source_attributes=preserve_source_attributes,
    )
339 340
def sync_replicas(
    source_url: str,
    replica_indices: Optional[list[int]] = None,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
) -> None:
    """
    Sync files from the source storage to its replicas.

    The source URL is resolved with :py:func:`resolve_storage_client`, and the resulting client
    performs the replica sync for the resolved path.

    :param source_url: The URL for the source storage.
    :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
    :param delete_unmatched_files: Whether to delete files at the replicas that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    """
    src_client, src_path = resolve_storage_client(source_url)
    src_client.sync_replicas(
        src_path,
        replica_indices=replica_indices,
        delete_unmatched_files=delete_unmatched_files,
        execution_mode=execution_mode,
        patterns=patterns,
    )
365 366
def list(
    url: str,
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
    attribute_filter_expression: Optional[str] = None,
    show_attributes: bool = False,
) -> Iterator[ObjectMetadata]:
    """
    List the contents of the specified URL prefix.

    The URL is resolved with :py:func:`resolve_storage_client`, and the resulting
    :py:class:`multistorageclient.StorageClient` lists the objects (files or directories) stored
    under the resolved prefix. The listing is always requested with ``include_url_prefix=True``.

    :param url: The prefix to list objects under.
    :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
    :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.
    :param show_attributes: Passed through unchanged to the client's ``list``; presumably controls
        whether object attributes are included in the results (confirm against ``StorageClient.list``).

    :return: An iterator of :py:class:`ObjectMetadata` objects representing the files (and optionally directories)
        accessible under the specified URL prefix. The returned keys will always be prefixed with msc://.
    """
    storage_client, prefix = resolve_storage_client(url)
    listing_options = {
        "start_after": start_after,
        "end_at": end_at,
        "include_directories": include_directories,
        "include_url_prefix": True,
        "attribute_filter_expression": attribute_filter_expression,
        "show_attributes": show_attributes,
    }
    return storage_client.list(path=prefix, **listing_options)
400 401
def write(url: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Write an object to the storage provider at the specified URL.

    :param url: The path where the object should be written.
    :param body: The content to write to the object.
    :param attributes: Optional string-to-string attributes, passed through unchanged to the client's ``write``.
    """
    storage_client, object_path = resolve_storage_client(url)
    storage_client.write(path=object_path, body=body, attributes=attributes)
411 412
def delete(url: str, recursive: bool = False) -> None:
    """
    Delete the specified object(s) from the storage provider.

    The URL is resolved with :py:func:`resolve_storage_client`, and the resulting
    :py:class:`multistorageclient.StorageClient` deletes the object(s) at the resolved path.

    :param url: The URL of the object to delete. (example: ``msc://profile/prefix/file.txt``)
    :param recursive: Whether to delete objects in the path recursively.
    """
    storage_client, target_path = resolve_storage_client(url)
    storage_client.delete(target_path, recursive=recursive)
425 426
def info(url: str) -> ObjectMetadata:
    """
    Retrieve metadata or information about the object stored at the specified URL.

    :param url: The URL of the object to retrieve information about. (example: ``msc://profile/prefix/file.txt``)

    :return: An :py:class:`ObjectMetadata` object representing the object's metadata.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.info(object_path)
437 438
def commit_metadata(url: str) -> None:
    """
    Commit the metadata updates for the storage client profile that the URL resolves to.

    :param url: The URL of the path to commit metadata for; the resolved path is passed to the
        client as the ``prefix``.
    """
    storage_client, prefix = resolve_storage_client(url)
    storage_client.commit_metadata(prefix=prefix)