Source code for multistorageclient.signers.cloudfront

  1# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations
 17
 18import base64
 19import json
 20from datetime import datetime, timedelta, timezone
 21from typing import Any
 22
 23try:
 24    from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
 25    from cryptography.hazmat.primitives.hashes import SHA1
 26    from cryptography.hazmat.primitives.serialization import load_pem_private_key
 27except ImportError:
 28    PKCS1v15 = None  # type: ignore[assignment, misc]
 29    SHA1 = None  # type: ignore[assignment, misc]
 30    load_pem_private_key = None  # type: ignore[assignment]
 31
 32from .base import URLSigner
 33
 34DEFAULT_CLOUDFRONT_EXPIRES_IN = 3600
 35
 36
[docs] 37class CloudFrontURLSigner(URLSigner): 38 """ 39 Generates CloudFront signed URLs using an RSA key pair. 40 41 Implements the CloudFront canned-policy signing spec directly so that it 42 has no dependency on ``botocore`` — only the ``cryptography`` package is 43 required (RSA-SHA1 / PKCS1v15). 44 """ 45 46 _key_pair_id: str 47 _private_key_path: str 48 _domain: str 49 _expires_in: int 50 _origin_path: str 51 _origin_prefix: str 52 53 def __init__( 54 self, 55 *, 56 key_pair_id: str, 57 private_key_path: str, 58 domain: str, 59 expires_in: int = DEFAULT_CLOUDFRONT_EXPIRES_IN, 60 origin_path: str = "", 61 **_kwargs: Any, 62 ) -> None: 63 if load_pem_private_key is None: 64 raise ImportError( 65 "The 'cryptography' package is required for CloudFront URL signing. " 66 "Install it with: pip install 'multi-storage-client[cloudfront]'" 67 ) 68 self._key_pair_id = key_pair_id 69 self._private_key_path = private_key_path 70 self._domain = domain.rstrip("/") 71 self._expires_in = expires_in 72 self._origin_path = origin_path.strip("/") 73 self._origin_prefix = self._origin_path + "/" if self._origin_path else "" 74 self._private_key: Any = None 75 76 def _get_private_key(self) -> Any: 77 if self._private_key is None: 78 assert load_pem_private_key is not None 79 with open(self._private_key_path, "rb") as f: 80 self._private_key = load_pem_private_key(f.read(), password=None) 81 return self._private_key 82
[docs] 83 def generate_presigned_url(self, path: str, *, method: str = "GET") -> str: 84 private_key = self._get_private_key() 85 86 effective = path.lstrip("/") 87 if self._origin_prefix: 88 if effective.startswith(self._origin_prefix): 89 effective = effective[len(self._origin_prefix) :] 90 else: 91 raise ValueError( 92 f"Object path {path} does not start with CloudFront origin path {self._origin_path}. " 93 "Ensure the 'origin_path' option matches the distribution's configured origin path." 94 ) 95 96 url = f"https://{self._domain}/{effective}" 97 expiry = datetime.now(timezone.utc) + timedelta(seconds=self._expires_in) 98 epoch = int(expiry.timestamp()) 99 100 policy = json.dumps( 101 {"Statement": [{"Resource": url, "Condition": {"DateLessThan": {"AWS:EpochTime": epoch}}}]}, 102 separators=(",", ":"), 103 ) 104 105 signature = private_key.sign(policy.encode("utf-8"), PKCS1v15(), SHA1()) # type: ignore[union-attr] 106 107 encoded_sig = _cf_b64encode(signature) 108 separator = "&" if "?" in url else "?" 109 return f"{url}{separator}Expires={epoch}&Signature={encoded_sig}&Key-Pair-Id={self._key_pair_id}"
110 111 112def _cf_b64encode(data: bytes) -> str: 113 """CloudFront URL-safe base64: ``+`` → ``-``, ``=`` → ``_``, ``/`` → ``~``. 114 115 Standard base64 characters ``+``, ``=``, and ``/`` are reserved in URLs; 116 CloudFront requires this substitution for signed URL query parameters. 117 See https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-canned-policy.html 118 """ 119 return base64.b64encode(data).decode("ascii").replace("+", "-").replace("=", "_").replace("/", "~")