Source code for multistorageclient.contrib.async_fs

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import asyncio
 17import atexit
 18import os
 19from collections.abc import Callable
 20from concurrent.futures import ThreadPoolExecutor
 21from functools import partial
 22from typing import Any, Union
 23
 24from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks
 25
 26from ..client import StorageClient
 27from ..file import ObjectFile, PosixFile
 28from ..shortcuts import resolve_storage_client
 29from ..types import MSC_PROTOCOL_NAME
 30
 31_GLOBAL_THREAD_POOL = ThreadPoolExecutor(max_workers=int(os.getenv("MSC_MAX_WORKERS", "8")))
 32
 33atexit.register(lambda: _GLOBAL_THREAD_POOL.shutdown(wait=False))
 34
 35
 36# pyright: reportIncompatibleMethodOverride=false
class MultiStorageAsyncFileSystem(AsyncFileSystem):
    """
    Custom :py:class:`fsspec.asyn.AsyncFileSystem` implementation for MSC protocol (``msc://``).
    Uses :py:class:`multistorageclient.StorageClient` for backend operations.

    Most async (``_``-prefixed) methods delegate to their synchronous counterpart. That counterpart
    runs on the module-level ``_GLOBAL_THREAD_POOL`` via :py:meth:`asynchronize_sync`.
    """

    protocol = MSC_PROTOCOL_NAME

    def __init__(self, **kwargs: Any) -> None:
        """
        Initializes the :py:class:`MultiStorageAsyncFileSystem`.

        :param kwargs: Additional arguments for the :py:class:`fsspec.asyn.AsyncFileSystem`.
        """
        super().__init__(**kwargs)

    def resolve_path_and_storage_client(self, path: Union[str, os.PathLike]) -> tuple[StorageClient, str]:
        """
        Resolves the path and retrieves the associated :py:class:`multistorageclient.StorageClient`.

        :param path: The file path to resolve.

        :return: A tuple containing the :py:class:`multistorageclient.StorageClient` and the resolved path.
        """
        # Leading slashes are stripped, so "/profile/key" and "profile/key" are treated the same.
        # unstrip_protocol then prepends our 'msc://' protocol only if it wasn't given in "path".
        return resolve_storage_client(self.unstrip_protocol(str(path).lstrip("/")))

    @staticmethod
    def _entry_name(profile: str, key: str) -> str:
        """
        Builds the ``<profile>/<key>`` name reported for an object.

        :param profile: The storage client's profile name.
        :param key: The object key within that profile.

        :return: The joined name, always using ``/`` as the separator.
        """
        # posixpath rather than os.path: os.path.join would use the platform separator on Windows,
        # while object keys are "/"-delimited. On POSIX the two behave identically.
        import posixpath

        return posixpath.join(profile, key)

    @classmethod
    def _metadata_to_dict(cls, profile: str, metadata: Any) -> dict[str, Any]:
        """
        Converts an object-metadata record from the storage client into the info dictionary
        returned by :py:meth:`ls` (``detail=True``) and :py:meth:`info`.

        :param profile: The storage client's profile name, used as the name prefix.
        :param metadata: A metadata record exposing ``key``, ``etag``, ``last_modified``,
            ``content_length``, ``content_type`` and ``type`` attributes.

        :return: A dictionary with ``name``, ``ETag``, ``LastModified``, ``size``, ``ContentType`` and ``type``.
        """
        return {
            "name": cls._entry_name(profile, metadata.key),
            "ETag": metadata.etag,
            "LastModified": metadata.last_modified,
            "size": metadata.content_length,
            "ContentType": metadata.content_type,
            "type": metadata.type,
        }

    @staticmethod
    def asynchronize_sync(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """
        Runs a synchronous function asynchronously on the shared thread pool.

        :param func: The synchronous function to be executed asynchronously.
        :param args: Positional arguments to pass to the function.
        :param kwargs: Keyword arguments to pass to the function.

        :return: An awaitable future that resolves to ``func(*args, **kwargs)``.
        """
        try:
            # Normal case: called from inside a coroutine, so a loop is running.
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Called with no running loop: keep the previous get_event_loop() behaviour.
            loop = asyncio.get_event_loop()
        return loop.run_in_executor(_GLOBAL_THREAD_POOL, partial(func, *args, **kwargs))

    def ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[list[dict[str, Any]], list[str]]:
        """
        Lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Additional arguments for list functionality (currently unused).

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        storage_client, dir_path = self.resolve_path_and_storage_client(path)

        # Append a trailing "/" so the path is listed as a directory rather than a bare name prefix.
        if dir_path and not dir_path.endswith("/"):
            dir_path += "/"

        objects = storage_client.list(path=dir_path, include_directories=True)
        profile = storage_client.profile

        if detail:
            return [self._metadata_to_dict(profile, obj) for obj in objects]
        return [self._entry_name(profile, obj.key) for obj in objects]

    async def _ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[list[dict[str, Any]], list[str]]:
        """
        Asynchronously lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Additional arguments for list functionality.

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        return await self.asynchronize_sync(self.ls, path, detail, **kwargs)

    def info(self, path: str, **kwargs: Any) -> dict[str, Any]:
        """
        Retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Additional arguments for info functionality (currently unused).

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        metadata = storage_client.info(file_path)
        return self._metadata_to_dict(storage_client.profile, metadata)

    async def _info(self, path: str, **kwargs: Any) -> dict[str, Any]:
        """
        Asynchronously retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Additional arguments for info functionality.

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        return await self.asynchronize_sync(self.info, path, **kwargs)

    def rm_file(self, path: str, **kwargs: Any):
        """
        Removes a file, or a directory when ``recursive=True`` is passed.

        :param path: The file or directory path to remove.
        :param kwargs: Additional arguments for remove functionality; only ``recursive`` (default ``False``) is read.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        recursive = kwargs.get("recursive", False)
        storage_client.delete(file_path, recursive=recursive)

    async def _rm_file(self, path: str, **kwargs: Any):
        """
        Asynchronously removes a file.

        :param path: The file or directory path to remove.
        :param kwargs: Additional arguments for remove functionality.
        """
        return await self.asynchronize_sync(self.rm_file, path, **kwargs)

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        """
        Asynchronously removes one or more files or directories.

        Unlike the parent implementation, this does not expand the paths and delete each file in
        parallel. Instead it passes ``recursive`` down and lets
        :py:meth:`multistorageclient.StorageClient.delete` handle directory deletion.

        :param path: The file or directory path to remove, or an iterable of such paths.
        :param recursive: Whether to recursively remove directories.
        :param batch_size: Maximum number of deletions run concurrently. ``None`` (default) disables throttling.
        :param kwargs: Additional arguments for remove functionality.

        :return: The list of per-path results from :py:func:`fsspec.asyn._run_coros_in_chunks`.
        """
        # fsspec's rm() accepts either a single path or a collection of paths.
        if isinstance(path, (str, os.PathLike)):
            paths = [path]
        else:
            paths = list(path)

        return await _run_coros_in_chunks(
            [self._rm_file(p, recursive=recursive, **kwargs) for p in paths],
            # -1 means "no throttling" to fsspec. Keep that as the default, but honour an explicit batch_size.
            batch_size=-1 if batch_size is None else batch_size,
            nofiles=True,
        )

    def cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Additional arguments for copy functionality (currently unused).

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        src_storage_client, src_path = self.resolve_path_and_storage_client(path1)
        dest_storage_client, dest_path = self.resolve_path_and_storage_client(path2)

        if src_storage_client != dest_storage_client:
            raise AttributeError(
                f"Cannot copy file from '{path1}' to '{path2}' because the source and destination paths are associated with different profiles. Cross-profile file operations are not supported."
            )

        src_storage_client.copy(src_path, dest_path)

    async def _cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Asynchronously copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Additional arguments for copy functionality.

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        await self.asynchronize_sync(self.cp_file, path1, path2, **kwargs)

    def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Additional arguments for file retrieval functionality (currently unused).
        """
        storage_client, rpath = self.resolve_path_and_storage_client(rpath)
        storage_client.download_file(rpath, lpath)

    async def _get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Asynchronously downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Additional arguments for file retrieval functionality.
        """
        await self.asynchronize_sync(self.get_file, rpath, lpath, **kwargs)

    def put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Additional arguments for file upload functionality; ``attributes`` (a dict of
            custom metadata, default empty) is forwarded to the upload.
        """
        attributes_dict = kwargs.pop("attributes", {})
        storage_client, rpath = self.resolve_path_and_storage_client(rpath)
        storage_client.upload_file(rpath, lpath, attributes=attributes_dict)

    async def _put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Asynchronously uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Additional arguments for file upload functionality.
        """
        await self.asynchronize_sync(self.put_file, lpath, rpath, **kwargs)

    def open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Additional arguments passed to client.open (e.g., check_source_version, prefetch_file, etc.)

        :return: The opened file, a :py:class:`PosixFile` or :py:class:`ObjectFile`.
        """
        storage_client, path = self.resolve_path_and_storage_client(path)
        return storage_client.open(path, mode, **kwargs)

    async def _open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Asynchronously opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Additional arguments for file opening.

        :return: The opened file, a :py:class:`PosixFile` or :py:class:`ObjectFile`.
        """
        return await self.asynchronize_sync(self.open, path, mode, **kwargs)

    def pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Additional arguments for writing functionality (e.g., attributes dict for custom metadata).
        """
        attributes_dict = kwargs.pop("attributes", {})
        storage_client, path = self.resolve_path_and_storage_client(path)
        storage_client.write(path, value, attributes=attributes_dict)

    async def _pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Asynchronously writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Additional arguments for writing functionality.
        """
        await self.asynchronize_sync(self.pipe_file, path, value, **kwargs)

    def cat_file(
        self,
        path: str,
        start: Union[int, None] = None,
        end: Union[int, None] = None,
        **kwargs: Any,
    ) -> bytes:
        """
        Reads the contents of a file at the given path, optionally restricted to a byte range.

        :param path: The file path to read from.
        :param start: Offset of the first byte to return. A negative value counts back from the end.
            ``None`` means from the beginning.
        :param end: Offset one past the last byte to return. A negative value counts back from the end.
            ``None`` means to the end.
        :param kwargs: Additional arguments for file reading functionality (currently unused).

        :return: The requested contents of the file as bytes.
        """
        storage_client, path = self.resolve_path_and_storage_client(path)
        data = storage_client.read(path)
        if start is None and end is None:
            return data
        # fsspec's start/end follow Python slice semantics (negative = from the end), so slicing is exact.
        # NOTE(review): the full object is still fetched. A ranged read in the client would avoid that.
        return data[start:end]

    async def _cat_file(
        self,
        path: str,
        start: Union[int, None] = None,
        end: Union[int, None] = None,
        **kwargs: Any,
    ) -> bytes:
        """
        Asynchronously reads the contents of a file at the given path, optionally restricted to a byte range.

        :param path: The file path to read from.
        :param start: Offset of the first byte to return (negative counts back from the end).
        :param end: Offset one past the last byte to return (negative counts back from the end).
        :param kwargs: Additional arguments for file reading functionality.

        :return: The requested contents of the file as bytes.
        """
        return await self.asynchronize_sync(self.cat_file, path, start=start, end=end, **kwargs)