Source code for multistorageclient.contrib.async_fs

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import asyncio
 17import os
 18from concurrent.futures import ThreadPoolExecutor
 19from functools import partial
 20from typing import Any, Callable, Dict, List, Tuple, Union
 21
 22from fsspec.asyn import AsyncFileSystem
 23
 24from ..client import StorageClient
 25from ..file import ObjectFile, PosixFile
 26from ..shortcuts import resolve_storage_client
 27from ..types import MSC_PROTOCOL_NAME
 28
 29_global_thread_pool = ThreadPoolExecutor(max_workers=int(os.getenv("MSC_MAX_WORKERS", "8")))
 30
 31
 32# pyright: reportIncompatibleMethodOverride=false
class MultiAsyncFileSystem(AsyncFileSystem):
    """
    Custom :py:class:`fsspec.asyn.AsyncFileSystem` implementation for MSC protocol (``msc://``).
    Uses :py:class:`multistorageclient.StorageClient` for backend operations.

    Each operation is written as a blocking method (``ls``, ``info``, ...). The matching
    underscore-prefixed coroutine (``_ls``, ``_info``, ...) submits that blocking method to
    the module's shared thread pool through :py:meth:`asynchronize_sync` and awaits it.
    """

    protocol = MSC_PROTOCOL_NAME

    def __init__(self, **kwargs: Any) -> None:
        """
        Initializes the :py:class:`MultiAsyncFileSystem`.

        :param kwargs: Additional arguments for the :py:class:`fsspec.asyn.AsyncFileSystem`.
        """
        super().__init__(**kwargs)

    def resolve_path_and_storage_client(self, path: str) -> Tuple[StorageClient, str]:
        """
        Resolves the path and retrieves the associated :py:class:`multistorageclient.StorageClient`.

        :param path: The file path to resolve, with or without the ``msc://`` prefix.

        :return: A tuple containing the :py:class:`multistorageclient.StorageClient` and the resolved path.
        """
        # Leading slashes are dropped first; unstrip_protocol then prepends our 'msc://'
        # protocol only if it wasn't already given in "path".
        return resolve_storage_client(self.unstrip_protocol(path.lstrip("/")))

    @staticmethod
    def asynchronize_sync(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """
        Schedules a synchronous function on the shared thread pool.

        :param func: The synchronous function to be executed in a worker thread.
        :param args: Positional arguments to pass to the function.
        :param kwargs: Keyword arguments to pass to the function.

        :return: The awaitable produced by ``loop.run_in_executor``; awaiting it yields the
            return value of ``func`` (or re-raises what ``func`` raised).
        """
        loop = asyncio.get_event_loop()
        # run_in_executor only forwards positional arguments, so the keyword arguments
        # are bound up front with functools.partial.
        return loop.run_in_executor(_global_thread_pool, partial(func, *args, **kwargs))

    @staticmethod
    def _metadata_to_info(profile: Any, metadata: Any) -> Dict[str, Any]:
        """
        Builds the info dictionary shared by :py:meth:`ls` and :py:meth:`info`.

        :param profile: The profile of the storage client that produced ``metadata``.
        :param metadata: The object metadata returned by the storage client.

        :return: A dictionary with the keys ``name``, ``ETag``, ``LastModified``, ``size``,
            ``ContentType`` and ``type``.
        """
        return {
            "name": os.path.join(profile, metadata.key),
            "ETag": metadata.etag,
            "LastModified": metadata.last_modified,
            "size": metadata.content_length,
            "ContentType": metadata.content_type,
            "type": metadata.type,
        }

    def ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[List[Dict[str, Any]], List[str]]:
        """
        Lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Additional arguments for list functionality (currently unused).

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        storage_client, dir_path = self.resolve_path_and_storage_client(path)

        # A non-empty path is listed as a directory prefix, i.e. with a trailing slash.
        if dir_path and not dir_path.endswith("/"):
            dir_path += "/"

        objects = storage_client.list(dir_path, include_directories=True)

        if detail:
            return [self._metadata_to_info(storage_client.profile, obj) for obj in objects]
        return [os.path.join(storage_client.profile, obj.key) for obj in objects]

    async def _ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[List[Dict[str, Any]], List[str]]:
        """
        Asynchronously lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Additional arguments for list functionality.

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        return await self.asynchronize_sync(self.ls, path, detail, **kwargs)

    def info(self, path: str, **kwargs: Any) -> Dict[str, Any]:
        """
        Retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Additional arguments for info functionality (currently unused).

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        metadata = storage_client.info(file_path)
        return self._metadata_to_info(storage_client.profile, metadata)

    async def _info(self, path: str, **kwargs: Any) -> Dict[str, Any]:
        """
        Asynchronously retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Additional arguments for info functionality.

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        return await self.asynchronize_sync(self.info, path, **kwargs)

    def rm_file(self, path: str, **kwargs: Any):
        """
        Removes a file.

        :param path: The path to delete through the storage client.
        :param kwargs: Additional arguments for remove functionality (currently unused).
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        storage_client.delete(file_path)

    async def _rm_file(self, path: str, **kwargs: Any):
        """
        Asynchronously removes a file.

        :param path: The path to delete through the storage client.
        :param kwargs: Additional arguments for remove functionality.
        """
        return await self.asynchronize_sync(self.rm_file, path, **kwargs)

    def cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Additional arguments for copy functionality (currently unused).

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        src_storage_client, src_path = self.resolve_path_and_storage_client(path1)
        dest_storage_client, dest_path = self.resolve_path_and_storage_client(path2)

        # The copy is delegated to a single storage client, so both paths must resolve
        # to the same one.
        if src_storage_client != dest_storage_client:
            raise AttributeError(
                f"Cannot copy file from '{path1}' to '{path2}' because the source and destination paths are associated with different profiles. Cross-profile file operations are not supported."
            )

        src_storage_client.copy(src_path, dest_path)

    async def _cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Asynchronously copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Additional arguments for copy functionality.

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        await self.asynchronize_sync(self.cp_file, path1, path2, **kwargs)

    def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Additional arguments for file retrieval functionality (currently unused).
        """
        storage_client, rpath = self.resolve_path_and_storage_client(rpath)
        storage_client.download_file(rpath, lpath)

    async def _get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Asynchronously downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Additional arguments for file retrieval functionality.
        """
        await self.asynchronize_sync(self.get_file, rpath, lpath, **kwargs)

    def put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Additional arguments for file upload functionality (currently unused).
        """
        storage_client, rpath = self.resolve_path_and_storage_client(rpath)
        # Note the argument order: the storage client takes the remote path first.
        storage_client.upload_file(rpath, lpath)

    async def _put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Asynchronously uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Additional arguments for file upload functionality.
        """
        await self.asynchronize_sync(self.put_file, lpath, rpath, **kwargs)

    def open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Additional arguments for file opening (currently unused).

        :return: The file object returned by the storage client's ``open``.
        """
        storage_client, path = self.resolve_path_and_storage_client(path)
        return storage_client.open(path, mode)

    async def _open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Asynchronously opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Additional arguments for file opening.

        :return: The file object returned by the storage client's ``open``.
        """
        return await self.asynchronize_sync(self.open, path, mode, **kwargs)

    def pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Additional arguments for writing functionality (currently unused).
        """
        storage_client, path = self.resolve_path_and_storage_client(path)
        storage_client.write(path, value)

    async def _pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Asynchronously writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Additional arguments for writing functionality.
        """
        await self.asynchronize_sync(self.pipe_file, path, value, **kwargs)

    def cat_file(
        self,
        path: str,
        start: Union[int, None] = None,
        end: Union[int, None] = None,
        **kwargs: Any,
    ) -> bytes:
        """
        Reads the contents of a file at the given path, optionally limited to a byte range.

        :param path: The file path to read from.
        :param start: Offset of the first byte to return. ``None`` means the start of the
            file; a negative value counts back from the end, like a Python slice.
        :param end: Offset one past the last byte to return. ``None`` means the end of the
            file; a negative value counts back from the end, like a Python slice.
        :param kwargs: Additional arguments for file reading functionality (currently unused).

        :return: The requested bytes of the file.
        """
        storage_client, path = self.resolve_path_and_storage_client(path)
        data = storage_client.read(path)

        # No range requested: hand back exactly what the storage client returned.
        if start is None and end is None:
            return data

        # fsspec callers (e.g. cat_ranges) pass start/end and expect only that range.
        # The full object is read and then sliced, so the range limits what is returned
        # but not what is fetched.
        return data[start:end]

    async def _cat_file(
        self,
        path: str,
        start: Union[int, None] = None,
        end: Union[int, None] = None,
        **kwargs: Any,
    ) -> bytes:
        """
        Asynchronously reads the contents of a file at the given path.

        :param path: The file path to read from.
        :param start: Offset of the first byte to return, or ``None`` for the start of the file.
        :param end: Offset one past the last byte to return, or ``None`` for the end of the file.
        :param kwargs: Additional arguments for file reading functionality.

        :return: The requested bytes of the file.
        """
        return await self.asynchronize_sync(self.cat_file, path, start, end, **kwargs)