1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import os
17import threading
18from collections.abc import Iterator
19from typing import Any, Optional, Union
20from urllib.parse import ParseResult, urlparse
21
22from .client import StorageClient
23from .config import DEFAULT_POSIX_PROFILE_NAME, SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS, StorageClientConfig
24from .file import ObjectFile, PosixFile
25from .telemetry import Telemetry
26from .types import MSC_PROTOCOL, ObjectMetadata
27
# Telemetry instance passed to storage clients created by the shortcut functions; ``None`` until one is set.
_TELEMETRY: Optional[Telemetry] = None
# Held while ``_TELEMETRY`` is reassigned.
_TELEMETRY_LOCK = threading.Lock()
# Storage clients created by the shortcut functions, keyed by profile name.
_STORAGE_CLIENT_CACHE: dict[str, StorageClient] = {}
# Held while a new client is created and inserted into ``_STORAGE_CLIENT_CACHE``.
_STORAGE_CLIENT_CACHE_LOCK = threading.Lock()
32
33
[docs]
def get_telemetry() -> Optional[Telemetry]:
    """
    Return the :py:class:`Telemetry` instance used for storage clients created by shortcuts.

    :return: The currently configured telemetry instance, or ``None`` if none has been set.
    """
    # Reading a module-level name does not require a ``global`` declaration.
    return _TELEMETRY
43
44
[docs]
def set_telemetry(telemetry: Optional[Telemetry]) -> None:
    """
    Replace the :py:class:`Telemetry` instance used for storage clients created by shortcuts.

    :param telemetry: The telemetry instance to use from now on, or ``None`` to clear it.
    """
    # Only the name being rebound needs a ``global`` declaration; the lock is merely read.
    global _TELEMETRY

    with _TELEMETRY_LOCK:
        _TELEMETRY = telemetry
56
57
58def _build_full_path(pr: ParseResult) -> str:
59 """
60 Helper function to construct the full path from a parsed URL, including query and fragment.
61
62 :param pr: The parsed URL result from urlparse
63 :return: The complete path including query and fragment if present
64 """
65 path = pr.path
66 if pr.query:
67 path += "?" + pr.query
68 if pr.fragment:
69 path += "#" + pr.fragment
70 return path
71
72
def _resolve_msc_url(url: str) -> tuple[str, str]:
    """
    Split an MSC URL into its profile name and path.

    :param url: The MSC URL to resolve (msc://profile/path)
    :return: A tuple of (profile_name, path); the path includes any query/fragment and has one
        leading ``/`` removed
    """
    parsed = urlparse(url)
    # The network-location component of the URL is the profile name.
    relative_path = _build_full_path(parsed).removeprefix("/")
    return parsed.netloc, relative_path
86
87
def _resolve_non_msc_url(url: str) -> tuple[str, str]:
    """
    Resolve a non-MSC URL or POSIX path to a profile name and path.

    Resolution order:

    1. If ``StorageClientConfig.read_path_mapping()`` returns a path mapping and it has a match
       for ``url``, that match is returned as-is.
    2. ``file://`` URLs, absolute POSIX paths and scheme-less (relative) paths resolve to the
       default POSIX profile.
    3. Any other URL must use a supported protocol and name a bucket; it resolves to an implicit
       profile named ``_protocol-bucket``.

    :param url: The non-MSC URL or POSIX path to resolve
    :return: A tuple of (profile_name, path)
    :raises ValueError: If the URL protocol is not supported, or the URL has no bucket name.
    """
    # An explicit path mapping takes precedence over every implicit rule below.
    path_mapping = StorageClientConfig.read_path_mapping()
    if path_mapping:
        possible_mapping = path_mapping.find_mapping(url)
        if possible_mapping:
            return possible_mapping  # (profile name, path)

    # Local files use the default POSIX profile.
    if url.startswith("file://"):
        return DEFAULT_POSIX_PROFILE_NAME, _build_full_path(urlparse(url))
    if url.startswith("/"):
        return DEFAULT_POSIX_PROFILE_NAME, os.path.normpath(url)

    pr = urlparse(url)
    protocol = pr.scheme.lower()

    # No scheme at all: treat the input as a relative path and make it absolute.
    if not protocol:
        return DEFAULT_POSIX_PROFILE_NAME, os.path.realpath(url)

    # Validate the protocol is supported
    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
        supported_protocols = ", ".join(f"{p}://" for p in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS)
        raise ValueError(
            f'Unknown URL "{url}", expecting "{MSC_PROTOCOL}" or a supported protocol ({supported_protocols}) or a POSIX path'
        )

    # Build the implicit profile name using the format _protocol-bucket
    bucket = pr.netloc
    if not bucket:
        raise ValueError(f'Invalid URL "{url}", bucket name is required for {protocol}:// URLs')

    profile_name = f"_{protocol}-{bucket}"

    # Rebuild the path with its query and fragment so that keys containing "?" or "#" are not
    # truncated, matching how msc:// and file:// URLs are resolved. Then drop one leading slash.
    path = _build_full_path(pr)
    if path.startswith("/"):
        path = path[1:]

    return profile_name, path
145
146
[docs]
def resolve_storage_client(url: str) -> tuple[StorageClient, str]:
    """
    Build and return a :py:class:`multistorageclient.StorageClient` instance based on the provided URL or path.

    This function parses the given URL or path and determines the appropriate storage profile and path.
    It supports URLs with the protocol ``msc://``, as well as POSIX paths or ``file://`` URLs for local file
    system access. If the profile has already been instantiated, it returns the cached client. Otherwise,
    it creates a new :py:class:`StorageClient` and caches it.

    The function also supports implicit profiles for non-MSC URLs. When a non-MSC URL is provided (like s3://,
    gs://, ais://, file://), MSC will infer the storage provider based on the URL protocol and create an implicit
    profile with the naming convention "_protocol-bucket" (e.g., "_s3-bucket1", "_gs-bucket1").

    Path mappings defined in the MSC configuration are also applied before creating implicit profiles.
    This allows for explicit mappings between source paths and destination MSC profiles.

    :param url: The storage location, which can be:
        - A URL in the format ``msc://profile/path`` for object storage.
        - A local file system path (absolute POSIX path) or a ``file://`` URL.
        - A non-MSC URL with a supported protocol (s3://, gs://, ais://).

    :return: A tuple containing the :py:class:`multistorageclient.StorageClient` instance and the parsed path.

    :raises ValueError: If the URL's protocol is neither ``msc`` nor a valid local file system path
        or a supported non-MSC protocol.
    """
    # URLs that passed through pathlib.Path arrive as "msc:/..."; restore the double slash.
    # Only the leading prefix is rewritten (count=1) so an "msc:/" substring further along
    # the path is left untouched.
    if url.startswith("msc:/") and not url.startswith("msc://"):
        url = url.replace("msc:/", "msc://", 1)

    # Resolve the URL to a profile name and path
    profile, path = _resolve_msc_url(url) if url.startswith(MSC_PROTOCOL) else _resolve_non_msc_url(url)

    # Fast path: return an already-built client without taking the lock.
    client = _STORAGE_CLIENT_CACHE.get(profile)
    if client is not None:
        return client, path

    # Slow path: re-check under the lock so only one client is created per profile.
    with _STORAGE_CLIENT_CACHE_LOCK:
        client = _STORAGE_CLIENT_CACHE.get(profile)
        if client is None:
            client = StorageClient(config=StorageClientConfig.from_file(profile=profile, telemetry=get_telemetry()))
            _STORAGE_CLIENT_CACHE[profile] = client

    return client, path
196
197
[docs]
def open(url: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
    """
    Open the file that ``url`` points to.

    The URL is resolved with :py:func:`resolve_storage_client` and the call is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param url: The URL of the file to open. (example: ``msc://profile/prefix/dataset.tar``)
    :param mode: The file mode to open the file in. Defaults to ``"rb"``.
    :param kwargs: Additional keyword arguments forwarded unchanged to the client's ``open`` method.

    :return: The file-like object returned by the client.

    :raises ValueError: Propagated from :py:func:`resolve_storage_client` when ``url`` cannot be resolved.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.open(object_path, mode, **kwargs)
214
215
[docs]
def glob(pattern: str) -> list[str]:
    """
    Return the files that match a glob-style pattern.

    The pattern is resolved with :py:func:`resolve_storage_client` and matching is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param pattern: The glob-style pattern to match files. (example: ``msc://profile/prefix/**/*.tar``)

    :return: A list of file paths matching the pattern.

    :raises ValueError: Propagated from :py:func:`resolve_storage_client` when ``pattern`` cannot be resolved.
    """
    storage_client, glob_path = resolve_storage_client(pattern)
    # The URL prefix is omitted only when a non-msc:// pattern resolved to the default POSIX profile.
    is_plain_posix = (
        not pattern.startswith(MSC_PROTOCOL) and storage_client.profile == DEFAULT_POSIX_PROFILE_NAME
    )
    return storage_client.glob(glob_path, include_url_prefix=not is_plain_posix)
235
236
[docs]
def upload_file(url: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Upload a local file to the location given by ``url``.

    The URL is resolved with :py:func:`resolve_storage_client` and the upload is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param url: The URL of the file. (example: ``msc://profile/prefix/dataset.tar``)
    :param local_path: The local path of the file.
    :param attributes: Optional string-to-string attributes forwarded to the client's ``upload_file``.

    :raises ValueError: Propagated from :py:func:`resolve_storage_client` when ``url`` cannot be resolved.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.upload_file(remote_path=remote_path, local_path=local_path, attributes=attributes)
252
253
[docs]
def download_file(url: str, local_path: str) -> None:
    """
    Download the file at ``url`` to a local path.

    The URL is resolved with :py:func:`resolve_storage_client` and the download is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param url: The URL of the file to download. (example: ``msc://profile/prefix/dataset.tar``)
    :param local_path: The local path where the file should be downloaded.

    :raises ValueError: Propagated from :py:func:`resolve_storage_client` when ``url`` cannot be resolved.
    """
    storage_client, remote_path = resolve_storage_client(url)
    return storage_client.download_file(remote_path=remote_path, local_path=local_path)
269
270
[docs]
def is_empty(url: str) -> bool:
    """
    Report whether there are no objects under the specified URL.

    :param url: The URL to check, typically pointing to a storage location.
    :return: ``True`` if there are no objects/files under this URL, ``False`` otherwise.

    :raises ValueError: Propagated from :py:func:`resolve_storage_client` when ``url`` cannot be resolved.
    """
    storage_client, location = resolve_storage_client(url)
    return storage_client.is_empty(location)
282
283
[docs]
def is_file(url: str) -> bool:
    """
    Report whether the specified URL points to a file (rather than a directory or folder).

    The URL is resolved with :py:func:`resolve_storage_client` and the check is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param url: The URL to check the existence of a file. (example: ``msc://profile/prefix/dataset.tar``)
    :return: The result of the client's ``is_file`` check for the resolved path.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.is_file(path=object_path)
296
297
[docs]
def sync(source_url: str, target_url: str, delete_unmatched_files: bool = False) -> None:
    """
    Sync files from the source storage to the target storage.

    Both URLs are resolved with :py:func:`resolve_storage_client`; the target client then pulls from
    the source client via ``sync_from``.

    :param source_url: The URL for the source storage.
    :param target_url: The URL for the target storage.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    """
    # Resolve the source first, then the target, so resolution errors surface in that order.
    src_client, src_path = resolve_storage_client(source_url)
    dst_client, dst_path = resolve_storage_client(target_url)
    dst_client.sync_from(src_client, src_path, dst_path, delete_unmatched_files)
309
310
[docs]
def list(
    url: str,
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
) -> Iterator[ObjectMetadata]:
    """
    List the contents found under the specified URL prefix.

    The URL is resolved with :py:func:`resolve_storage_client` and the listing is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`, always requesting URL-prefixed keys
    (``include_url_prefix=True``).

    :param url: The prefix to list objects under.
    :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
    :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.

    :return: An iterator of :py:class:`ObjectMetadata` objects representing the files (and optionally directories)
        accessible under the specified URL prefix.
    """
    storage_client, listing_prefix = resolve_storage_client(url)
    listing_options = {
        "prefix": listing_prefix,
        "start_after": start_after,
        "end_at": end_at,
        "include_directories": include_directories,
        "include_url_prefix": True,
    }
    return storage_client.list(**listing_options)
339
340
[docs]
def write(url: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Write an object to the storage provider at the specified URL.

    The URL is resolved with :py:func:`resolve_storage_client` and the write is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param url: The path where the object should be written.
    :param body: The content to write to the object.
    :param attributes: Optional string-to-string attributes forwarded to the client's ``write``.
    """
    storage_client, object_path = resolve_storage_client(url)
    storage_client.write(path=object_path, body=body, attributes=attributes)
350
351
[docs]
def delete(url: str, recursive: bool = False) -> None:
    """
    Delete the object(s) at the specified URL from the storage provider.

    The URL is resolved with :py:func:`resolve_storage_client` and the deletion is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param url: The URL of the object to delete. (example: ``msc://profile/prefix/file.txt``)
    :param recursive: Whether to delete objects in the path recursively.
    """
    storage_client, target_path = resolve_storage_client(url)
    storage_client.delete(target_path, recursive=recursive)
364
365
[docs]
def info(url: str) -> ObjectMetadata:
    """
    Retrieve metadata about the object stored at the specified URL.

    The URL is resolved with :py:func:`resolve_storage_client` and the lookup is delegated to the
    resulting :py:class:`multistorageclient.StorageClient`.

    :param url: The URL of the object to retrieve information about. (example: ``msc://profile/prefix/file.txt``)

    :return: An :py:class:`ObjectMetadata` object representing the object's metadata.
    """
    storage_client, object_path = resolve_storage_client(url)
    return storage_client.info(object_path)
376
377