Note
Go to the end to download the full example code.
Single Variable Perturbation Method#
Intermediate ensemble inference using a custom perturbation method.
This example will demonstrate how to run a an ensemble inference workflow with a custom perturbation method that only applies noise to a specific variable.
In this example you will learn:
How to extend an existing pertubration method
How to instantiate a built in prognostic model
Creating a data source and IO object
Running a simple built in workflow
Extend a built-in method using custom code.
Post-processing results
Set Up#
All workflows inside Earth2Studio require constructed components to be
handed to them. In this example, we will use the built in ensemble workflow
earth2studio.run.ensemble()
.
def ensemble(
time: list[str] | list[datetime] | list[np.datetime64],
nsteps: int,
nensemble: int,
prognostic: PrognosticModel,
data: DataSource,
io: IOBackend,
perturbation: Perturbation,
batch_size: int | None = None,
output_coords: CoordSystem = OrderedDict({}),
device: torch.device | None = None,
) -> IOBackend:
"""Built in ensemble workflow.
Parameters
----------
time : list[str] | list[datetime] | list[np.datetime64]
List of string, datetimes or np.datetime64
nsteps : int
Number of forecast steps
nensemble : int
Number of ensemble members to run inference for.
prognostic : PrognosticModel
Prognostic models
data : DataSource
Data source
io : IOBackend
IO object
perturbation_method : Perturbation
Method to perturb the initial condition to create an ensemble.
batch_size: int, optional
Number of ensemble members to run in a single batch,
by default None.
output_coords: CoordSystem, optional
IO output coordinate system override, by default OrderedDict({})
device : torch.device, optional
Device to run inference on, by default None
Returns
-------
IOBackend
Output IO object
"""
We need the following:
Prognostic Model: Use the built in DLWP model
earth2studio.models.px.DLWP
.perturbation_method: Extend the Spherical Gaussian Method
earth2studio.perturbation.SphericalGaussian
.Datasource: Pull data from the GFS data api
earth2studio.data.GFS
.IO Backend: Save the outputs into a Zarr store
earth2studio.io.ZarrBackend
.
import os
os.makedirs("outputs", exist_ok=True)
from dotenv import load_dotenv
load_dotenv() # TODO: make common example prep function
import numpy as np
import torch
from earth2studio.data import GFS
from earth2studio.io import ZarrBackend
from earth2studio.models.px import DLWP
from earth2studio.perturbation import Perturbation, SphericalGaussian
from earth2studio.run import ensemble
from earth2studio.utils.type import CoordSystem
# Load the default model package which downloads the check point from NGC
package = DLWP.load_default_package()
model = DLWP.load_model(package)
# Create the data source
data = GFS()
/usr/local/lib/python3.10/dist-packages/modulus/models/module.py:360: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
model_dict = torch.load(
The perturbation method in Running Ensemble Inference is naive because it applies the same noise amplitude to every variable. We can create a custom wrapper that only applies the perturbation method to a particular variable instead.
class ApplyToVariable:
"""Apply a perturbation to only a particular variable."""
def __init__(self, pm: Perturbation, variable: str | list[str]):
self.pm = pm
if isinstance(variable, str):
variable = [variable]
self.variable = variable
@torch.inference_mode()
def __call__(
self,
x: torch.Tensor,
coords: CoordSystem,
) -> tuple[torch.Tensor, CoordSystem]:
# Apply perturbation
xp, _ = self.pm(x, coords)
# Add perturbed slice back into original tensor
ind = np.in1d(coords["variable"], self.variable)
x[..., ind, :, :] = xp[..., ind, :, :]
return x, coords
# Generate a new noise amplitude that specifically targets 't2m' with a 1 K noise amplitude
avsg = ApplyToVariable(SphericalGaussian(noise_amplitude=1.0), "t2m")
# Create the IO handler, store in memory
chunks = {"ensemble": 1, "time": 1, "lead_time": 1}
io = ZarrBackend(
file_name="outputs/05_ensemble_avsg.zarr",
chunks=chunks,
backend_kwargs={"overwrite": True},
)
Execute the Workflow#
With all components initialized, running the workflow is a single line of Python code. Workflow will return the provided IO object back to the user, which can be used to then post process. Some have additional APIs that can be handy for post-processing or saving to file. Check the API docs for more information.
For the forecast we will predict for 10 steps (for FCN, this is 60 hours) with 8 ensemble members which will be ran in 2 batches with batch size 4.
nsteps = 10
nensemble = 8
batch_size = 4
io = ensemble(
["2024-01-01"],
nsteps,
nensemble,
model,
data,
io,
avsg,
batch_size=batch_size,
output_coords={"variable": np.array(["t2m", "tcwv"])},
)
2025-01-23 04:41:14.782 | INFO | earth2studio.run:ensemble:315 - Running ensemble inference!
2025-01-23 04:41:14.782 | INFO | earth2studio.run:ensemble:323 - Inference device: cuda
2025-01-23 04:41:14.799 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:209 - Fetching GFS index file: 2023-12-31 18:00:00 lead 0:00:00
Fetching GFS for 2023-12-31 18:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:14.803 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: t850 at 2023-12-31 18:00:00_0:00:00
Fetching GFS for 2023-12-31 18:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:14.830 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z1000 at 2023-12-31 18:00:00_0:00:00
Fetching GFS for 2023-12-31 18:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:14.857 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z700 at 2023-12-31 18:00:00_0:00:00
Fetching GFS for 2023-12-31 18:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:14.883 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z500 at 2023-12-31 18:00:00_0:00:00
Fetching GFS for 2023-12-31 18:00:00: 0%| | 0/7 [00:00<?, ?it/s]
Fetching GFS for 2023-12-31 18:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.47it/s]
2025-01-23 04:41:14.910 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z300 at 2023-12-31 18:00:00_0:00:00
Fetching GFS for 2023-12-31 18:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.47it/s]
2025-01-23 04:41:14.936 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: tcwv at 2023-12-31 18:00:00_0:00:00
Fetching GFS for 2023-12-31 18:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.47it/s]
2025-01-23 04:41:14.962 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: t2m at 2023-12-31 18:00:00_0:00:00
Fetching GFS for 2023-12-31 18:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.47it/s]
Fetching GFS for 2023-12-31 18:00:00: 100%|██████████| 7/7 [00:00<00:00, 37.65it/s]
2025-01-23 04:41:15.004 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:209 - Fetching GFS index file: 2024-01-01 00:00:00 lead 0:00:00
Fetching GFS for 2024-01-01 00:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:15.007 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: t850 at 2024-01-01 00:00:00_0:00:00
Fetching GFS for 2024-01-01 00:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:15.034 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z1000 at 2024-01-01 00:00:00_0:00:00
Fetching GFS for 2024-01-01 00:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:15.061 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z700 at 2024-01-01 00:00:00_0:00:00
Fetching GFS for 2024-01-01 00:00:00: 0%| | 0/7 [00:00<?, ?it/s]
2025-01-23 04:41:15.088 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z500 at 2024-01-01 00:00:00_0:00:00
Fetching GFS for 2024-01-01 00:00:00: 0%| | 0/7 [00:00<?, ?it/s]
Fetching GFS for 2024-01-01 00:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.15it/s]
2025-01-23 04:41:15.115 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: z300 at 2024-01-01 00:00:00_0:00:00
Fetching GFS for 2024-01-01 00:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.15it/s]
2025-01-23 04:41:15.141 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: tcwv at 2024-01-01 00:00:00_0:00:00
Fetching GFS for 2024-01-01 00:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.15it/s]
2025-01-23 04:41:15.168 | DEBUG | earth2studio.data.gfs:_fetch_gfs_dataarray:255 - Fetching GFS grib file for variable: t2m at 2024-01-01 00:00:00_0:00:00
Fetching GFS for 2024-01-01 00:00:00: 57%|█████▋ | 4/7 [00:00<00:00, 37.15it/s]
Fetching GFS for 2024-01-01 00:00:00: 100%|██████████| 7/7 [00:00<00:00, 37.44it/s]
2025-01-23 04:41:15.252 | SUCCESS | earth2studio.run:ensemble:345 - Fetched data from GFS
2025-01-23 04:41:15.261 | INFO | earth2studio.run:ensemble:367 - Starting 8 Member Ensemble Inference with 2 number of batches.
Total Ensemble Batches: 0%| | 0/2 [00:00<?, ?it/s]
Running batch 0 inference: 0%| | 0/11 [00:00<?, ?it/s]
Running batch 0 inference: 9%|▉ | 1/11 [00:00<00:03, 2.86it/s]
Running batch 0 inference: 18%|█▊ | 2/11 [00:00<00:03, 2.44it/s]
Running batch 0 inference: 27%|██▋ | 3/11 [00:01<00:03, 2.52it/s]
Running batch 0 inference: 36%|███▋ | 4/11 [00:01<00:02, 2.50it/s]
Running batch 0 inference: 45%|████▌ | 5/11 [00:01<00:02, 2.54it/s]
Running batch 0 inference: 55%|█████▍ | 6/11 [00:02<00:01, 2.53it/s]
Running batch 0 inference: 64%|██████▎ | 7/11 [00:02<00:01, 2.56it/s]
Running batch 0 inference: 73%|███████▎ | 8/11 [00:03<00:01, 2.55it/s]
Running batch 0 inference: 82%|████████▏ | 9/11 [00:03<00:00, 2.57it/s]
Running batch 0 inference: 91%|█████████ | 10/11 [00:03<00:00, 2.55it/s]
Running batch 0 inference: 100%|██████████| 11/11 [00:04<00:00, 2.58it/s]
Total Ensemble Batches: 50%|█████ | 1/2 [00:09<00:09, 9.38s/it]
Running batch 4 inference: 0%| | 0/11 [00:00<?, ?it/s]
Running batch 4 inference: 9%|▉ | 1/11 [00:00<00:03, 2.69it/s]
Running batch 4 inference: 18%|█▊ | 2/11 [00:00<00:03, 2.53it/s]
Running batch 4 inference: 27%|██▋ | 3/11 [00:01<00:03, 2.57it/s]
Running batch 4 inference: 36%|███▋ | 4/11 [00:01<00:02, 2.49it/s]
Running batch 4 inference: 45%|████▌ | 5/11 [00:01<00:02, 2.53it/s]
Running batch 4 inference: 55%|█████▍ | 6/11 [00:02<00:01, 2.51it/s]
Running batch 4 inference: 64%|██████▎ | 7/11 [00:02<00:01, 2.54it/s]
Running batch 4 inference: 73%|███████▎ | 8/11 [00:03<00:01, 2.52it/s]
Running batch 4 inference: 82%|████████▏ | 9/11 [00:03<00:00, 2.54it/s]
Running batch 4 inference: 91%|█████████ | 10/11 [00:03<00:00, 2.52it/s]
Running batch 4 inference: 100%|██████████| 11/11 [00:04<00:00, 2.54it/s]
Total Ensemble Batches: 100%|██████████| 2/2 [00:18<00:00, 9.30s/it]
Total Ensemble Batches: 100%|██████████| 2/2 [00:18<00:00, 9.31s/it]
2025-01-23 04:41:33.890 | SUCCESS | earth2studio.run:ensemble:412 - Inference complete
Post Processing#
The last step is to post process our results. Lets plot both the perturbed t2m field and also the unperturbed tcwv field. First to confirm the perturbation method works as expect, the initial state is plotted.
Notice that the Zarr IO function has additional APIs to interact with the stored data.
import matplotlib.pyplot as plt
forecast = "2024-01-01"
def plot_(axi, data, title, cmap):
"""Simple plot util function"""
im = axi.imshow(data, cmap=cmap)
plt.colorbar(im, ax=axi, shrink=0.5, pad=0.04)
axi.set_title(title)
step = 0 # lead time = 24 hrs
plt.close("all")
# Create a figure and axes with the specified projection
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(10, 6))
plot_(
ax[0, 0],
np.mean(io["t2m"][:, 0, step], axis=0),
f"{forecast} - t2m - Lead time: {6*step}hrs - Mean",
"coolwarm",
)
plot_(
ax[0, 1],
np.std(io["t2m"][:, 0, step], axis=0),
f"{forecast} - t2m - Lead time: {6*step}hrs - Std",
"coolwarm",
)
plot_(
ax[1, 0],
np.mean(io["tcwv"][:, 0, step], axis=0),
f"{forecast} - tcwv - Lead time: {6*step}hrs - Mean",
"Blues",
)
plot_(
ax[1, 1],
np.std(io["tcwv"][:, 0, step], axis=0),
f"{forecast} - tcwv - Lead time: {6*step}hrs - Std",
"Blues",
)
plt.savefig(f"outputs/05_{forecast}_{step}_ensemble.jpg")
Due to the intrinsic coupling between all fields, we should expect all variables to have some uncertainty for later lead times. Here the total column water vapor is plotted at a lead time of 24 hours, note the variance in the members despite just perturbing the temperature field.
step = 4 # lead time = 24 hrs
plt.close("all")
# Create a figure and axes with the specified projection
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(10, 3))
plot_(
ax[0],
np.mean(io["tcwv"][:, 0, step], axis=0),
f"{forecast} - tcwv - Lead time: {6*step}hrs - Mean",
"Blues",
)
plot_(
ax[1],
np.std(io["tcwv"][:, 0, step], axis=0),
f"{forecast} - tcwv - Lead time: {6*step}hrs - Std",
"Blues",
)
plt.savefig(f"outputs/05_{forecast}_{step}_ensemble.jpg")
Total running time of the script: (0 minutes 23.301 seconds)