1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
import asyncio
import os
import posixpath
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Union

from fsspec.asyn import AsyncFileSystem

from ..client import StorageClient
from ..file import ObjectFile, PosixFile
from ..shortcuts import resolve_storage_client
from ..types import MSC_PROTOCOL_NAME
29
# Process-wide executor used by MultiStorageAsyncFileSystem.asynchronize_sync to run
# blocking calls off the event loop. The worker count is read once, at import time,
# from the MSC_MAX_WORKERS environment variable and defaults to 8.
_global_thread_pool = ThreadPoolExecutor(
    max_workers=int(os.environ.get("MSC_MAX_WORKERS", "8")),
)
31
32
33# pyright: reportIncompatibleMethodOverride=false
[docs]
class MultiStorageAsyncFileSystem(AsyncFileSystem):
    """
    Custom :py:class:`fsspec.asyn.AsyncFileSystem` implementation for MSC protocol (``msc://``).
    Uses :py:class:`multistorageclient.StorageClient` for backend operations.

    Every operation comes in two flavours: a blocking method (for example :py:meth:`ls`) and an
    ``async`` counterpart with a leading underscore (for example ``_ls``) that runs the blocking
    method on the module-level thread pool via :py:meth:`asynchronize_sync`.
    """

    protocol = MSC_PROTOCOL_NAME

    def __init__(self, **kwargs: Any) -> None:
        """
        Initializes the :py:class:`MultiStorageAsyncFileSystem`.

        :param kwargs: Additional arguments for the :py:class:`fsspec.asyn.AsyncFileSystem`.
        """
        super().__init__(**kwargs)

    def resolve_path_and_storage_client(self, path: Union[str, os.PathLike]) -> tuple[StorageClient, str]:
        """
        Resolves the path and retrieves the associated :py:class:`multistorageclient.StorageClient`.

        Leading ``/`` characters are stripped from ``path`` before the protocol is applied.

        :param path: The file path to resolve.

        :return: A tuple containing the :py:class:`multistorageclient.StorageClient` and the resolved path.
        """
        # Use unstrip_protocol to prepend our 'msc://' protocol only if it wasn't given in "path".
        return resolve_storage_client(self.unstrip_protocol(str(path).lstrip("/")))

    @staticmethod
    def asynchronize_sync(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """
        Runs a synchronous function asynchronously using asyncio.

        The call is submitted to the module-level thread pool and is not awaited here; the
        caller awaits the returned object.

        :param func: The synchronous function to be executed asynchronously.
        :param args: Positional arguments to pass to the function.
        :param kwargs: Keyword arguments to pass to the function.

        :return: The awaitable returned by ``loop.run_in_executor`` for the submitted call.
        """
        try:
            # Preferred inside coroutines: never creates a loop implicitly and does not
            # trigger the deprecation warning of asyncio.get_event_loop().
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread; keep the previous behavior for such callers.
            loop = asyncio.get_event_loop()
        return loop.run_in_executor(_global_thread_pool, partial(func, *args, **kwargs))

    @staticmethod
    def _qualified_name(storage_client: StorageClient, key: str) -> str:
        """
        Builds the name reported for an object: the client's profile joined with the object key.

        :param storage_client: The client that owns the object.
        :param key: The object key as reported by the client.

        :return: The joined name.
        """
        # posixpath keeps "/" as the separator on every platform; on POSIX hosts this is
        # exactly what os.path.join produced before.
        return posixpath.join(storage_client.profile, key)

    @staticmethod
    def _metadata_to_info(storage_client: StorageClient, metadata: Any) -> dict[str, Any]:
        """
        Converts an object metadata record into the info dictionary returned by ``ls`` and ``info``.

        :param storage_client: The client that produced the metadata.
        :param metadata: A record exposing ``key``, ``etag``, ``last_modified``, ``content_length``,
            ``content_type`` and ``type`` attributes.

        :return: A dictionary with the keys ``name``, ``ETag``, ``LastModified``, ``size``,
            ``ContentType`` and ``type``.
        """
        return {
            "name": MultiStorageAsyncFileSystem._qualified_name(storage_client, metadata.key),
            "ETag": metadata.etag,
            "LastModified": metadata.last_modified,
            "size": metadata.content_length,
            "ContentType": metadata.content_type,
            "type": metadata.type,
        }

    def ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[list[dict[str, Any]], list[str]]:
        """
        Lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Additional arguments for list functionality (currently not forwarded to the client).

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        storage_client, dir_path = self.resolve_path_and_storage_client(path)

        # A non-empty prefix is listed with a trailing slash; an empty prefix is passed through as is.
        if dir_path and not dir_path.endswith("/"):
            dir_path += "/"

        objects = storage_client.list(dir_path, include_directories=True)

        if detail:
            return [self._metadata_to_info(storage_client, obj) for obj in objects]
        return [self._qualified_name(storage_client, obj.key) for obj in objects]

    async def _ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[list[dict[str, Any]], list[str]]:
        """
        Asynchronously lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Additional arguments for list functionality.

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        return await self.asynchronize_sync(self.ls, path, detail, **kwargs)

    def info(self, path: str, **kwargs: Any) -> dict[str, Any]:
        """
        Retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Additional arguments for info functionality (currently not forwarded to the client).

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        return self._metadata_to_info(storage_client, storage_client.info(file_path))

    async def _info(self, path: str, **kwargs: Any) -> dict[str, Any]:
        """
        Asynchronously retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Additional arguments for info functionality.

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        return await self.asynchronize_sync(self.info, path, **kwargs)

    def rm_file(self, path: str, **kwargs: Any):
        """
        Removes a file.

        :param path: The path to remove; it is passed to the storage client's ``delete``.
        :param kwargs: Additional arguments for remove functionality (currently not forwarded to the client).
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        storage_client.delete(file_path)

    async def _rm_file(self, path: str, **kwargs: Any):
        """
        Asynchronously removes a file.

        :param path: The path to remove; it is passed to the storage client's ``delete``.
        :param kwargs: Additional arguments for remove functionality.
        """
        return await self.asynchronize_sync(self.rm_file, path, **kwargs)

    def cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Additional arguments for copy functionality (currently not forwarded to the client).

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        src_storage_client, src_path = self.resolve_path_and_storage_client(path1)
        dest_storage_client, dest_path = self.resolve_path_and_storage_client(path2)

        # The copy is performed by a single client, so both paths must resolve to the same one.
        if src_storage_client != dest_storage_client:
            raise AttributeError(
                f"Cannot copy file from '{path1}' to '{path2}' because the source and destination paths are associated with different profiles. Cross-profile file operations are not supported."
            )

        src_storage_client.copy(src_path, dest_path)

    async def _cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Asynchronously copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Additional arguments for copy functionality.

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        await self.asynchronize_sync(self.cp_file, path1, path2, **kwargs)

    def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Additional arguments for file retrieval functionality (currently not forwarded to the client).
        """
        storage_client, remote_path = self.resolve_path_and_storage_client(rpath)
        storage_client.download_file(remote_path, lpath)

    async def _get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Asynchronously downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Additional arguments for file retrieval functionality.
        """
        await self.asynchronize_sync(self.get_file, rpath, lpath, **kwargs)

    def put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Additional arguments for file upload functionality (currently not forwarded to the client).
        """
        storage_client, remote_path = self.resolve_path_and_storage_client(rpath)
        # Note the argument order: the client is called with the remote path first, then the local path.
        storage_client.upload_file(remote_path, lpath)

    async def _put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Asynchronously uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Additional arguments for file upload functionality.
        """
        await self.asynchronize_sync(self.put_file, lpath, rpath, **kwargs)

    def open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Additional arguments for file opening (currently not forwarded to the client).

        :return: The file object (``PosixFile`` or ``ObjectFile``) returned by the storage client.
        """
        storage_client, resolved_path = self.resolve_path_and_storage_client(path)
        return storage_client.open(resolved_path, mode)

    async def _open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Asynchronously opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Additional arguments for file opening.

        :return: The file object (``PosixFile`` or ``ObjectFile``) returned by the storage client.
        """
        return await self.asynchronize_sync(self.open, path, mode, **kwargs)

    def pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Additional arguments for writing functionality (currently not forwarded to the client).
        """
        storage_client, resolved_path = self.resolve_path_and_storage_client(path)
        storage_client.write(resolved_path, value)

    async def _pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Asynchronously writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Additional arguments for writing functionality.
        """
        await self.asynchronize_sync(self.pipe_file, path, value, **kwargs)

    def cat_file(self, path: str, **kwargs: Any) -> bytes:
        """
        Reads the contents of a file at the given path.

        :param path: The file path to read from.
        :param kwargs: Additional arguments for file reading functionality (currently not forwarded to the client).

        :return: The contents of the file as bytes.
        """
        storage_client, resolved_path = self.resolve_path_and_storage_client(path)
        return storage_client.read(resolved_path)

    async def _cat_file(self, path: str, **kwargs: Any) -> bytes:
        """
        Asynchronously reads the contents of a file at the given path.

        :param path: The file path to read from.
        :param kwargs: Additional arguments for file reading functionality.

        :return: The contents of the file as bytes.
        """
        return await self.asynchronize_sync(self.cat_file, path, **kwargs)