Source code for multistorageclient.signers.cloudfront
1# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import annotations
17
18import base64
19import json
20from datetime import datetime, timedelta, timezone
21from typing import Any
22
23try:
24 from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
25 from cryptography.hazmat.primitives.hashes import SHA1
26 from cryptography.hazmat.primitives.serialization import load_pem_private_key
27except ImportError:
28 PKCS1v15 = None # type: ignore[assignment, misc]
29 SHA1 = None # type: ignore[assignment, misc]
30 load_pem_private_key = None # type: ignore[assignment]
31
32from .base import URLSigner
33
34DEFAULT_CLOUDFRONT_EXPIRES_IN = 3600
35
36
[docs]
37class CloudFrontURLSigner(URLSigner):
38 """
39 Generates CloudFront signed URLs using an RSA key pair.
40
41 Implements the CloudFront canned-policy signing spec directly so that it
42 has no dependency on ``botocore`` — only the ``cryptography`` package is
43 required (RSA-SHA1 / PKCS1v15).
44 """
45
46 _key_pair_id: str
47 _private_key_path: str
48 _domain: str
49 _expires_in: int
50
51 def __init__(
52 self,
53 *,
54 key_pair_id: str,
55 private_key_path: str,
56 domain: str,
57 expires_in: int = DEFAULT_CLOUDFRONT_EXPIRES_IN,
58 **_kwargs: Any,
59 ) -> None:
60 if load_pem_private_key is None:
61 raise ImportError(
62 "The 'cryptography' package is required for CloudFront URL signing. "
63 "Install it with: pip install 'multi-storage-client[cloudfront]'"
64 )
65 self._key_pair_id = key_pair_id
66 self._private_key_path = private_key_path
67 self._domain = domain.rstrip("/")
68 self._expires_in = expires_in
69 self._private_key: Any = None
70
71 def _get_private_key(self) -> Any:
72 if self._private_key is None:
73 assert load_pem_private_key is not None
74 with open(self._private_key_path, "rb") as f:
75 self._private_key = load_pem_private_key(f.read(), password=None)
76 return self._private_key
77
[docs]
78 def generate_presigned_url(self, path: str, *, method: str = "GET") -> str:
79 private_key = self._get_private_key()
80
81 url = f"https://{self._domain}/{path.lstrip('/')}"
82 expiry = datetime.now(timezone.utc) + timedelta(seconds=self._expires_in)
83 epoch = int(expiry.timestamp())
84
85 policy = json.dumps(
86 {"Statement": [{"Resource": url, "Condition": {"DateLessThan": {"AWS:EpochTime": epoch}}}]},
87 separators=(",", ":"),
88 )
89
90 signature = private_key.sign(policy.encode("utf-8"), PKCS1v15(), SHA1()) # type: ignore[union-attr]
91
92 encoded_sig = _cf_b64encode(signature)
93 separator = "&" if "?" in url else "?"
94 return f"{url}{separator}Expires={epoch}&Signature={encoded_sig}&Key-Pair-Id={self._key_pair_id}"
95
96
97def _cf_b64encode(data: bytes) -> str:
98 """CloudFront URL-safe base64: ``+`` → ``-``, ``=`` → ``_``, ``/`` → ``~``.
99
100 Standard base64 characters ``+``, ``=``, and ``/`` are reserved in URLs;
101 CloudFront requires this substitution for signed URL query parameters.
102 See https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-canned-policy.html
103 """
104 return base64.b64encode(data).decode("ascii").replace("+", "-").replace("=", "_").replace("/", "~")