Source code for multistorageclient.contrib.async_fs

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import asyncio
 17import os
 18from collections.abc import Callable
 19from concurrent.futures import ThreadPoolExecutor
 20from functools import partial
 21from typing import Any, Union
 22
 23from fsspec.asyn import AsyncFileSystem
 24
 25from ..client import StorageClient
 26from ..file import ObjectFile, PosixFile
 27from ..shortcuts import resolve_storage_client
 28from ..types import MSC_PROTOCOL_NAME
 29
 30_global_thread_pool = ThreadPoolExecutor(max_workers=int(os.getenv("MSC_MAX_WORKERS", "8")))
 31
 32
 33# pyright: reportIncompatibleMethodOverride=false
class MultiStorageAsyncFileSystem(AsyncFileSystem):
    """
    Custom :py:class:`fsspec.asyn.AsyncFileSystem` implementation for MSC protocol (``msc://``).
    Uses :py:class:`multistorageclient.StorageClient` for backend operations.

    Each public synchronous method resolves its path to a
    :py:class:`multistorageclient.StorageClient` and delegates to it. The matching
    ``_``-prefixed coroutine runs that synchronous method on the module-level shared
    thread pool, so the blocking backend call does not run on the event loop thread.
    """

    protocol = MSC_PROTOCOL_NAME

    def __init__(self, **kwargs: Any) -> None:
        """
        Initializes the :py:class:`MultiStorageAsyncFileSystem`.

        :param kwargs: Additional arguments for the :py:class:`fsspec.asyn.AsyncFileSystem`.
        """
        super().__init__(**kwargs)

    def resolve_path_and_storage_client(self, path: Union[str, os.PathLike]) -> tuple[StorageClient, str]:
        """
        Resolves the path and retrieves the associated :py:class:`multistorageclient.StorageClient`.

        :param path: The file path to resolve, with or without the ``msc://`` prefix.

        :return: A tuple containing the :py:class:`multistorageclient.StorageClient` and the resolved path.
        """
        # Use unstrip_protocol to prepend our 'msc://' protocol only if it wasn't given in "path".
        return resolve_storage_client(self.unstrip_protocol(str(path).lstrip("/")))

    @staticmethod
    def asynchronize_sync(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """
        Schedules a synchronous function on the shared thread pool.

        :param func: The synchronous function to execute.
        :param args: Positional arguments to pass to the function.
        :param kwargs: Keyword arguments to pass to the function.

        :return: An awaitable (:py:class:`asyncio.Future`) that resolves to ``func(*args, **kwargs)``.
        """
        try:
            # Inside a coroutine this is always the loop that is actually running the caller.
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop (called from synchronous code): keep the previous behavior.
            loop = asyncio.get_event_loop()
        return loop.run_in_executor(_global_thread_pool, partial(func, *args, **kwargs))

    @staticmethod
    def _object_name(profile: str, key: str) -> str:
        """
        Builds the name reported for an object: ``<profile>/<key>``.

        :param profile: The storage client profile name.
        :param key: The object key within the profile.

        :return: The joined name, always using ``/`` as separator.
        """
        # These are URL-style names, so they must use '/' on every host OS; os.path.join
        # would insert '\\' on Windows. On POSIX, os.path.join *is* posixpath.join, so
        # results there are unchanged.
        import posixpath

        return posixpath.join(profile, key)

    @classmethod
    def _metadata_to_dict(cls, profile: str, metadata: Any) -> dict[str, Any]:
        """
        Converts an object-metadata record into the dictionary returned by :py:meth:`ls` and :py:meth:`info`.

        :param profile: The storage client profile name, used to build the ``name`` entry.
        :param metadata: A metadata record exposing ``key``, ``etag``, ``last_modified``,
            ``content_length``, ``content_type`` and ``type`` attributes.

        :return: A dictionary with ``name``, ``ETag``, ``LastModified``, ``size``, ``ContentType`` and ``type``.
        """
        return {
            "name": cls._object_name(profile, metadata.key),
            "ETag": metadata.etag,
            "LastModified": metadata.last_modified,
            "size": metadata.content_length,
            "ContentType": metadata.content_type,
            "type": metadata.type,
        }

    def ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[list[dict[str, Any]], list[str]]:
        """
        Lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        storage_client, dir_path = self.resolve_path_and_storage_client(path)

        # List under the directory prefix ("a/b" -> "a/b/") rather than a bare key prefix.
        if dir_path and not dir_path.endswith("/"):
            dir_path += "/"

        objects = storage_client.list(dir_path, include_directories=True)
        profile = storage_client.profile

        if detail:
            return [self._metadata_to_dict(profile, obj) for obj in objects]
        return [self._object_name(profile, obj.key) for obj in objects]

    async def _ls(self, path: str, detail: bool = True, **kwargs: Any) -> Union[list[dict[str, Any]], list[str]]:
        """
        Asynchronously lists the contents of a directory.

        :param path: The directory path to list.
        :param detail: Whether to return detailed information for each file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :return: A list of file names or detailed information depending on the 'detail' argument.
        """
        return await self.asynchronize_sync(self.ls, path, detail=detail, **kwargs)

    def info(self, path: str, **kwargs: Any) -> dict[str, Any]:
        """
        Retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        metadata = storage_client.info(file_path)
        return self._metadata_to_dict(storage_client.profile, metadata)

    async def _info(self, path: str, **kwargs: Any) -> dict[str, Any]:
        """
        Asynchronously retrieves metadata information for a file.

        :param path: The file path to retrieve information for.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :return: A dictionary containing file metadata such as ETag, last modified, and size.
        """
        return await self.asynchronize_sync(self.info, path, **kwargs)

    def rm_file(self, path: str, **kwargs: Any):
        """
        Removes a file by delegating to :py:meth:`multistorageclient.StorageClient.delete`.

        :param path: The path to remove.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        storage_client.delete(file_path)

    async def _rm_file(self, path: str, **kwargs: Any):
        """
        Asynchronously removes a file.

        :param path: The path to remove.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        return await self.asynchronize_sync(self.rm_file, path, **kwargs)

    def cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        src_storage_client, src_path = self.resolve_path_and_storage_client(path1)
        dest_storage_client, dest_path = self.resolve_path_and_storage_client(path2)

        if src_storage_client != dest_storage_client:
            raise AttributeError(
                f"Cannot copy file from '{path1}' to '{path2}' because the source and destination paths are associated with different profiles. Cross-profile file operations are not supported."
            )

        src_storage_client.copy(src_path, dest_path)

    async def _cp_file(self, path1: str, path2: str, **kwargs: Any):
        """
        Asynchronously copies a file from the source path to the destination path.

        :param path1: The source file path.
        :param path2: The destination file path.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :raises AttributeError: If the source and destination paths are associated with different profiles.
        """
        await self.asynchronize_sync(self.cp_file, path1, path2, **kwargs)

    def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        storage_client, remote_path = self.resolve_path_and_storage_client(rpath)
        storage_client.download_file(remote_path, lpath)

    async def _get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
        """
        Asynchronously downloads a file from the remote path to the local path.

        :param rpath: The remote path of the file to download.
        :param lpath: The local path to store the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        await self.asynchronize_sync(self.get_file, rpath, lpath, **kwargs)

    def put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        storage_client, remote_path = self.resolve_path_and_storage_client(rpath)
        storage_client.upload_file(remote_path, lpath)

    async def _put_file(self, lpath: str, rpath: str, **kwargs: Any) -> None:
        """
        Asynchronously uploads a local file to the remote path.

        :param lpath: The local path of the file to upload.
        :param rpath: The remote path to store the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        await self.asynchronize_sync(self.put_file, lpath, rpath, **kwargs)

    def open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored (only ``mode`` is forwarded).

        :return: The :py:class:`PosixFile` or :py:class:`ObjectFile` returned by the storage client.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        return storage_client.open(file_path, mode)

    async def _open(self, path: str, mode: str = "rb", **kwargs: Any) -> Union[PosixFile, ObjectFile]:
        """
        Asynchronously opens a file at the given path.

        :param path: The file path to open.
        :param mode: The mode in which to open the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :return: The :py:class:`PosixFile` or :py:class:`ObjectFile` returned by the storage client.
        """
        return await self.asynchronize_sync(self.open, path, mode, **kwargs)

    def pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        storage_client.write(file_path, value)

    async def _pipe_file(self, path: str, value: bytes, **kwargs: Any) -> None:
        """
        Asynchronously writes a value (bytes) directly to a file at the given path.

        :param path: The file path to write the value to.
        :param value: The bytes to write to the file.
        :param kwargs: Accepted for fsspec compatibility; currently ignored.
        """
        await self.asynchronize_sync(self.pipe_file, path, value, **kwargs)

    def cat_file(
        self, path: str, start: Union[int, None] = None, end: Union[int, None] = None, **kwargs: Any
    ) -> bytes:
        """
        Reads the contents of a file at the given path, optionally restricted to a byte range.

        :param path: The file path to read from.
        :param start: First byte offset to return (``None`` for the beginning; negative counts from the end).
        :param end: Byte offset to stop before (``None`` for the end; negative counts from the end).
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :return: The requested contents of the file as bytes.
        """
        storage_client, file_path = self.resolve_path_and_storage_client(path)
        data = storage_client.read(file_path)
        if start is None and end is None:
            return data
        # fsspec (e.g. cat_ranges) passes start/end with Python-slice semantics. Previously they
        # were swallowed by **kwargs and the whole object was returned.
        # NOTE(review): the full object is read and then sliced; a backend ranged read would be cheaper.
        return data[start:end]

    async def _cat_file(
        self, path: str, start: Union[int, None] = None, end: Union[int, None] = None, **kwargs: Any
    ) -> bytes:
        """
        Asynchronously reads the contents of a file at the given path, optionally restricted to a byte range.

        :param path: The file path to read from.
        :param start: First byte offset to return (``None`` for the beginning; negative counts from the end).
        :param end: Byte offset to stop before (``None`` for the end; negative counts from the end).
        :param kwargs: Accepted for fsspec compatibility; currently ignored.

        :return: The requested contents of the file as bytes.
        """
        return await self.asynchronize_sync(self.cat_file, path, start=start, end=end, **kwargs)