# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16import os
17import threading
18from collections.abc import Iterator
19from typing import Any, Optional, Union
20from urllib.parse import ParseResult, urlparse
21
22from .client import StorageClient
23from .config import DEFAULT_POSIX_PROFILE_NAME, SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS, StorageClientConfig
24from .file import ObjectFile, PosixFile
25from .telemetry import Telemetry
26from .types import MSC_PROTOCOL, ExecutionMode, ObjectMetadata
27
# Telemetry handed to every StorageClient that the shortcut functions construct
# (read via get_telemetry, replaced via set_telemetry).
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()  # serializes writers of _TELEMETRY

# Profile name -> StorageClient, so each profile's client is built once and then reused
# by resolve_storage_client.
_STORAGE_CLIENT_CACHE: dict[str, StorageClient] = {}
_STORAGE_CLIENT_CACHE_LOCK = threading.Lock()  # serializes cache misses
32
33
def get_telemetry() -> Optional[Telemetry]:
    """
    Return the :py:class:`Telemetry` instance used for storage clients created by shortcuts.

    :return: The telemetry instance most recently passed to :py:func:`set_telemetry`, or ``None`` if none was set.
    """
    # Reading a module global needs no ``global`` declaration.
    return _TELEMETRY
43
44
def set_telemetry(telemetry: Optional[Telemetry]) -> None:
    """
    Set the :py:class:`Telemetry` instance used for storage clients created by shortcuts.

    The value is read when :py:func:`resolve_storage_client` constructs a new client. Clients that are
    already cached keep the telemetry they were constructed with.

    :param telemetry: A telemetry instance, or ``None``.
    """
    global _TELEMETRY

    # _TELEMETRY_LOCK is only read here, so it does not need a ``global`` declaration.
    with _TELEMETRY_LOCK:
        _TELEMETRY = telemetry
56
57
def _build_full_path(original_url: str, pr: ParseResult) -> str:
    """
    Reassemble the path portion of a parsed URL, re-attaching its query and fragment.

    :py:func:`urllib.parse.urlparse` splits ``?`` and ``#`` off the path, but in storage paths they are
    ordinary characters (``?`` is also a glob wildcard), so they are restored here. An empty query or
    fragment cannot be told apart from an absent one using the parse result alone. The original URL is
    therefore inspected to decide whether each delimiter was present. For example, ``dir/file?``,
    ``dir/file#`` and ``dir/file?#1`` all round-trip unchanged.

    :param original_url: The original URL before parsing.
    :param pr: The result of ``urlparse(original_url)``.
    :return: The path followed by ``?query`` and/or ``#fragment``, exactly as they appeared in the URL.
    """
    path = pr.path
    # urlsplit cuts the fragment at the first "#" before it looks for "?", so only a "?" that
    # appears before the first "#" starts a query. The scheme and netloc never contain "?" or "#".
    if "?" in original_url.partition("#")[0]:
        path += "?" + pr.query
    if "#" in original_url:
        path += "#" + pr.fragment
    return path
74
75
def _resolve_msc_url(url: str) -> tuple[str, str]:
    """
    Split an MSC URL (``msc://profile/path``) into its profile name and profile-relative path.

    :param url: The MSC URL to resolve.
    :return: A ``(profile_name, path)`` tuple. The profile is the URL's network location, and the path
        keeps any query/fragment text with a single leading ``/`` removed.
    """
    parsed = urlparse(url)
    relative_path = _build_full_path(url, parsed).removeprefix("/")
    return parsed.netloc, relative_path
89
90
def _resolve_non_msc_url(url: str) -> tuple[str, str]:
    """
    Resolve a non-MSC URL or POSIX path to a profile name and path.

    Resolution order:

    1. If the MSC configuration has a path mapping that matches ``url``, use it.
    2. ``file://`` URLs and absolute POSIX paths use the default POSIX profile.
    3. Anything without a URL scheme is treated as a relative path, made absolute, and uses the
       default POSIX profile.
    4. Any other URL must use a supported protocol and resolves to the implicit profile
       ``_<protocol>-<bucket>``.

    :param url: The non-MSC URL or path to resolve.
    :return: A ``(profile_name, path)`` tuple.
    :raises ValueError: If the URL uses an unsupported protocol, or has no bucket name.
    """
    # A configured path mapping takes precedence over every implicit rule below.
    path_mapping = StorageClientConfig.read_path_mapping()
    if path_mapping:
        possible_mapping = path_mapping.find_mapping(url)
        if possible_mapping:
            return possible_mapping  # (profile_name, path)

    # Local file system access goes through the default POSIX profile.
    if url.startswith("file://"):
        pr = urlparse(url)
        return DEFAULT_POSIX_PROFILE_NAME, _build_full_path(url, pr)
    elif url.startswith("/"):
        return DEFAULT_POSIX_PROFILE_NAME, os.path.normpath(url)

    pr = urlparse(url)
    protocol = pr.scheme.lower()

    # No scheme: translate the relative path to an absolute one.
    if not protocol:
        return DEFAULT_POSIX_PROFILE_NAME, os.path.realpath(url)

    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
        supported_protocols = ", ".join([f"{p}://" for p in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS])
        raise ValueError(
            f'Unknown URL "{url}", expecting "{MSC_PROTOCOL}" or a supported protocol ({supported_protocols}) or a POSIX path'
        )

    # Implicit profiles are named "_<protocol>-<bucket>" (e.g. "_s3-bucket1").
    bucket = pr.netloc
    if not bucket:
        raise ValueError(f'Invalid URL "{url}", bucket name is required for {protocol}:// URLs')

    profile_name = f"_{protocol}-{bucket}"

    # urlparse splits "?" and "#" off pr.path, but they are glob wildcards / legal key characters.
    # Re-attach them, as the msc:// and file:// branches do, then make the key relative to the bucket.
    path = _build_full_path(url, pr).removeprefix("/")
    return profile_name, path
148
149
def resolve_storage_client(url: str) -> tuple[StorageClient, str]:
    """
    Build and return a :py:class:`multistorageclient.StorageClient` instance based on the provided URL or path.

    This function parses the given URL or path and determines the appropriate storage profile and path.
    It supports:

    - ``msc://`` URLs, including the ``msc:/`` form produced by :py:class:`pathlib.Path`.
    - POSIX paths.
    - ``file://`` URLs for local file system access.

    If the profile has already been instantiated, the cached client is returned. Otherwise, a new
    :py:class:`StorageClient` is created and cached.

    Non-MSC URLs are resolved as follows:

    - Path mappings defined in the MSC configuration are applied first. They allow explicit mappings
      from source paths to MSC profiles.
    - POSIX paths and ``file://`` URLs use the default POSIX profile. Relative paths are made absolute.
    - URLs with a supported protocol (such as ``s3://``, ``gs://`` or ``ais://``) use an implicit profile
      named ``_<protocol>-<bucket>`` (e.g. ``_s3-bucket1``, ``_gs-bucket1``). The storage provider is
      inferred from the protocol.

    :param url: The storage location, which can be:
        - A URL in the format ``msc://profile/path`` for object storage.
        - A local file system path (absolute or relative POSIX path) or a ``file://`` URL.
        - A non-MSC URL with a supported protocol (``s3://``, ``gs://``, ``ais://``).

    :return: A tuple containing the :py:class:`multistorageclient.StorageClient` instance and the parsed path.

    :raises ValueError: If the URL's protocol is neither ``msc`` nor a supported non-MSC protocol, or if a
        non-MSC URL has no bucket name.
    """
    # pathlib.Path collapses "msc://" into "msc:/"; restore the separator. Only the prefix is rewritten,
    # so any "msc:/" text appearing later in the path is left untouched.
    if url.startswith("msc:/") and not url.startswith("msc://"):
        url = "msc://" + url[len("msc:/"):]

    profile, path = _resolve_msc_url(url) if url.startswith(MSC_PROTOCOL) else _resolve_non_msc_url(url)

    # Fast path: reuse an already-built client without taking the lock.
    client = _STORAGE_CLIENT_CACHE.get(profile)
    if client is not None:
        return client, path

    # Slow path: re-check under the lock so concurrent callers construct a profile's client only once.
    # Telemetry is captured at construction time.
    with _STORAGE_CLIENT_CACHE_LOCK:
        client = _STORAGE_CLIENT_CACHE.get(profile)
        if client is None:
            client = StorageClient(config=StorageClientConfig.from_file(profile=profile, telemetry=get_telemetry()))
            _STORAGE_CLIENT_CACHE[profile] = client

    return client, path
199
200
def open(url: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
    """
    Open the file at ``url`` in the given mode.

    The URL is resolved with :py:func:`resolve_storage_client`. The matching
    :py:class:`multistorageclient.StorageClient` is retrieved or built, and it then opens the resolved path.

    :param url: The URL of the file to open. (example: ``msc://profile/prefix/dataset.tar``)
    :param mode: The file mode to open the file in. Defaults to ``"rb"``.
    :param kwargs: Extra keyword arguments forwarded unchanged to the storage client's ``open``.

    :return: A file-like object that allows interaction with the file.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, resolved_path = resolve_storage_client(url)
    return storage_client.open(resolved_path, mode, **kwargs)
217
218
def glob(pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
    """
    Return a list of files matching a pattern.

    This function supports glob-style patterns for matching multiple files within a storage system. The
    pattern is resolved with :py:func:`resolve_storage_client`, and the associated
    :py:class:`multistorageclient.StorageClient` is used to retrieve the list of matching files.

    Results are requested with their URL prefix (``include_url_prefix=True``), with one exception:
    non-MSC patterns (plain POSIX paths or ``file://`` URLs) that resolve to the default POSIX profile are
    returned without it. ``msc:/`` patterns produced by :py:class:`pathlib.Path` are treated the same as
    ``msc://`` patterns.

    :param pattern: The glob-style pattern to match files. (example: ``msc://profile/prefix/**/*.tar``)
    :param attribute_filter_expression: The attribute filter expression to apply to the result.

    :return: A list of file paths matching the pattern.

    :raises ValueError: If the pattern cannot be resolved to a storage profile.
    """
    client, path = resolve_storage_client(pattern)
    # resolve_storage_client accepts the pathlib-mangled "msc:/" form too. Without recognizing it here,
    # "msc:/<default-posix-profile>/..." would lose its URL prefix while "msc://..." keeps it.
    is_msc_pattern = pattern.startswith((MSC_PROTOCOL, "msc:/"))
    include_url_prefix = is_msc_pattern or client.profile != DEFAULT_POSIX_PROFILE_NAME
    return client.glob(
        path,
        include_url_prefix=include_url_prefix,
        attribute_filter_expression=attribute_filter_expression,
    )
239
240
def upload_file(url: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Upload a local file to the given URL.

    The URL is resolved with :py:func:`resolve_storage_client`. The matching
    :py:class:`multistorageclient.StorageClient` is retrieved or built, and it then uploads the file (object)
    to the resolved path.

    :param url: The destination URL of the file. (example: ``msc://profile/prefix/dataset.tar``)
    :param local_path: The local path of the file to upload.
    :param attributes: Optional string attributes, forwarded unchanged to the storage client's ``upload_file``.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.upload_file(remote_path=remote_path, local_path=local_path, attributes=attributes)
256
257
def download_file(url: str, local_path: str) -> None:
    """
    Download the file at the given URL to a local path.

    The URL is resolved with :py:func:`resolve_storage_client`. The matching
    :py:class:`multistorageclient.StorageClient` is retrieved or built, and it then downloads the file (object)
    at the resolved path.

    :param url: The URL of the file to download. (example: ``msc://profile/prefix/dataset.tar``)
    :param local_path: The local path where the file should be downloaded.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.download_file(remote_path=remote_path, local_path=local_path)
273
274
def is_empty(url: str) -> bool:
    """
    Check whether the specified URL contains any objects.

    :param url: The URL to check, typically pointing to a storage location.
    :return: ``True`` if there are no objects/files under this URL, ``False`` otherwise.

    :raises ValueError: If the URL cannot be resolved to a storage profile.
    """
    storage_client, prefix = resolve_storage_client(url)
    return storage_client.is_empty(prefix)
286
287
def is_file(url: str) -> bool:
    """
    Check whether the specified URL points to a file (rather than a directory or folder).

    The URL is resolved with :py:func:`resolve_storage_client`. The matching
    :py:class:`multistorageclient.StorageClient` is retrieved or built, and it then checks whether a file
    (object) exists at the resolved path.

    :param url: The URL to check the existence of a file. (example: ``msc://profile/prefix/dataset.tar``)
    :return: The storage client's ``is_file`` result for the resolved path.
    """
    storage_client, candidate_path = resolve_storage_client(url)
    return storage_client.is_file(path=candidate_path)
300
301
def sync(
    source_url: str,
    target_url: str,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
) -> None:
    """
    Sync files from the source storage to the target storage.

    The source URL is resolved before the target URL. The target's storage client then pulls the data from
    the source's client.

    :param source_url: The URL for the source storage.
    :param target_url: The URL for the target storage.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    """
    src_client, src_path = resolve_storage_client(source_url)
    dst_client, dst_path = resolve_storage_client(target_url)
    dst_client.sync_from(src_client, src_path, dst_path, delete_unmatched_files, execution_mode=execution_mode)
321
322
def sync_replicas(
    source_url: str,
    replica_indices: Optional[list[int]] = None,
    delete_unmatched_files: bool = False,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
) -> None:
    """
    Sync files from the source storage to its replicas.

    :param source_url: The URL for the source storage.
    :param replica_indices: Indices of the replicas to sync to, starting from 0. If not provided, all replicas are synced.
    :param delete_unmatched_files: Whether to delete files at the replicas that are not present at the source.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    """
    storage_client, replica_source_path = resolve_storage_client(source_url)
    storage_client.sync_replicas(
        replica_source_path,
        execution_mode=execution_mode,
        delete_unmatched_files=delete_unmatched_files,
        replica_indices=replica_indices,
    )
344
345
def list(
    url: str,
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
    attribute_filter_expression: Optional[str] = None,
    show_attributes: bool = False,
) -> Iterator[ObjectMetadata]:
    """
    List the contents of the specified URL prefix.

    The URL is resolved with :py:func:`resolve_storage_client`. The matching
    :py:class:`multistorageclient.StorageClient` then returns an iterator of objects (files or directories)
    stored under the resolved prefix.

    :param url: The prefix to list objects under.
    :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
    :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.
    :param show_attributes: Forwarded unchanged to the storage client's ``list``.

    :return: An iterator of :py:class:`ObjectMetadata` objects representing the files (and optionally directories)
        accessible under the specified URL prefix. Listing is always requested with ``include_url_prefix=True``,
        so the returned keys are prefixed with msc://.
    """
    storage_client, prefix = resolve_storage_client(url)
    return storage_client.list(
        path=prefix,
        include_url_prefix=True,
        start_after=start_after,
        end_at=end_at,
        include_directories=include_directories,
        show_attributes=show_attributes,
        attribute_filter_expression=attribute_filter_expression,
    )
379
380
def write(url: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Write an object to the storage provider at the specified URL.

    :param url: The URL where the object should be written.
    :param body: The content to write to the object.
    :param attributes: Optional string attributes, forwarded unchanged to the storage client's ``write``.
    """
    storage_client, object_path = resolve_storage_client(url)
    storage_client.write(path=object_path, body=body, attributes=attributes)
390
391
def delete(url: str, recursive: bool = False) -> None:
    """
    Delete the specified object(s) from the storage provider.

    The URL is resolved with :py:func:`resolve_storage_client`. The matching
    :py:class:`multistorageclient.StorageClient` then deletes the object(s) at the resolved path.

    :param url: The URL of the object to delete. (example: ``msc://profile/prefix/file.txt``)
    :param recursive: Whether to delete objects in the path recursively.
    """
    storage_client, target_path = resolve_storage_client(url)
    storage_client.delete(target_path, recursive=recursive)
404
405
def info(url: str) -> ObjectMetadata:
    """
    Retrieve metadata about the object stored at the specified URL.

    :param url: The URL of the object to retrieve information about. (example: ``msc://profile/prefix/file.txt``)

    :return: An :py:class:`ObjectMetadata` object representing the object's metadata.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.info(object_path)
416
417