Single Variable Perturbation Method#

Intermediate ensemble inference using a custom perturbation method.

This example demonstrates how to run an ensemble inference workflow with a custom perturbation method that only applies noise to a specific variable.

In this example you will learn:

  • How to extend an existing perturbation method

  • How to instantiate a built-in prognostic model

  • How to create a data source and IO object

  • How to run a simple built-in workflow

  • How to extend a built-in method with custom code

  • How to post-process results

Set Up#

All workflows inside Earth2Studio require constructed components to be handed to them. In this example, we will use the built-in ensemble workflow earth2studio.run.ensemble().

def ensemble(
    time: list[str] | list[datetime] | list[np.datetime64],
    nsteps: int,
    nensemble: int,
    prognostic: PrognosticModel,
    data: DataSource,
    io: IOBackend,
    perturbation: Perturbation,
    batch_size: int | None = None,
    output_coords: CoordSystem = OrderedDict({}),
    device: torch.device | None = None,
) -> IOBackend:
    """Built in ensemble workflow.

    Parameters
    ----------
    time : list[str] | list[datetime] | list[np.datetime64]
        List of string, datetimes or np.datetime64
    nsteps : int
        Number of forecast steps
    nensemble : int
        Number of ensemble members to run inference for.
    prognostic : PrognosticModel
        Prognostic models
    data : DataSource
        Data source
    io : IOBackend
        IO object
    perturbation : Perturbation
        Method to perturb the initial condition to create an ensemble.
    batch_size: int, optional
        Number of ensemble members to run in a single batch,
        by default None.
    output_coords: CoordSystem, optional
        IO output coordinate system override, by default OrderedDict({})
    device : torch.device, optional
        Device to run inference on, by default None

    Returns
    -------
    IOBackend
        Output IO object
    """

We need the following:

import os

os.makedirs("outputs", exist_ok=True)
from dotenv import load_dotenv

load_dotenv()  # TODO: make common example prep function

import numpy as np
import torch

from earth2studio.data import GFS
from earth2studio.io import ZarrBackend
from earth2studio.models.px import DLWP
from earth2studio.perturbation import Perturbation, SphericalGaussian
from earth2studio.run import ensemble
from earth2studio.utils.type import CoordSystem

# Load the default model package which downloads the checkpoint from NGC
package = DLWP.load_default_package()
model = DLWP.load_model(package)

# Create the data source
data = GFS()
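
Before building the perturbation, it can help to confirm that "t2m" is indeed one of the variables the prognostic model works with. A minimal sketch, assuming the model exposes an input_coords() method returning its input coordinate system:

# Sketch (assumes PrognosticModel.input_coords() is available): print the
# variables DLWP expects, so we can confirm "t2m" is among them.
print(model.input_coords()["variable"])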

The perturbation method used in Running Ensemble Inference is naive because it applies the same noise amplitude to every variable. Instead, we can create a custom wrapper that applies the perturbation method to only a particular variable.

class ApplyToVariable:
    """Apply a perturbation to only a particular variable."""

    def __init__(self, pm: Perturbation, variable: str | list[str]):
        self.pm = pm
        if isinstance(variable, str):
            variable = [variable]
        self.variable = variable

    @torch.inference_mode()
    def __call__(
        self,
        x: torch.Tensor,
        coords: CoordSystem,
    ) -> tuple[torch.Tensor, CoordSystem]:
        # Apply perturbation
        xp, _ = self.pm(x, coords)
        # Add perturbed slice back into original tensor
        ind = np.isin(coords["variable"], self.variable)
        x[..., ind, :, :] = xp[..., ind, :, :]
        return x, coords


# Create a perturbation method that targets only 't2m' with a 1 K noise amplitude
avsg = ApplyToVariable(SphericalGaussian(noise_amplitude=1.0), "t2m")

# Create the IO handler, store in memory
chunks = {"ensemble": 1, "time": 1, "lead_time": 1}
io = ZarrBackend(
    file_name="outputs/05_ensemble_avsg.zarr",
    chunks=chunks,
    backend_kwargs={"overwrite": True},
)
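
Before handing the wrapper to the workflow, it is worth a quick check that it really leaves the other variables untouched. A minimal sketch using a dummy zero field and the simpler Gaussian perturbation (assumed to be available in earth2studio.perturbation as plain additive noise):

from collections import OrderedDict

from earth2studio.perturbation import Gaussian  # assumption: additive Gaussian noise

# Dummy two-variable state on a small grid
check_coords = OrderedDict(
    {
        "variable": np.array(["t2m", "tcwv"]),
        "lat": np.linspace(90, -90, 19),
        "lon": np.linspace(0, 360, 36, endpoint=False),
    }
)
x0 = torch.zeros(2, 19, 36)
check = ApplyToVariable(Gaussian(noise_amplitude=1.0), "t2m")
xp, _ = check(x0.clone(), check_coords)

assert not torch.allclose(xp[0], torch.zeros_like(xp[0]))  # t2m was perturbed
assert torch.allclose(xp[1], torch.zeros_like(xp[1]))  # tcwv is untouched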

Execute the Workflow#

With all components initialized, running the workflow is a single line of Python code. The workflow returns the provided IO object back to the user, which can then be used for post-processing. Some IO backends have additional APIs that can be handy for post-processing or saving to file; check the API docs for more information.

For the forecast we will predict for 10 steps (for DLWP, with its 6 hour time step, this is 60 hours) with 8 ensemble members, which will be run in 2 batches with a batch size of 4.

nsteps = 10
nensemble = 8
batch_size = 4
io = ensemble(
    ["2024-01-01"],
    nsteps,
    nensemble,
    model,
    data,
    io,
    avsg,
    batch_size=batch_size,
    output_coords={"variable": np.array(["t2m", "tcwv"])},
)
2025-01-23 04:41:14.782 | INFO     | earth2studio.run:ensemble:315 - Running ensemble inference!
2025-01-23 04:41:14.782 | INFO     | earth2studio.run:ensemble:323 - Inference device: cuda
2025-01-23 04:41:14.799 | DEBUG    | earth2studio.data.gfs:_fetch_gfs_dataarray:209 - Fetching GFS index file: 2023-12-31 18:00:00 lead 0:00:00
Fetching GFS for 2023-12-31 18:00:00: 100%|██████████| 7/7 [00:00<00:00, 37.65it/s]
2025-01-23 04:41:15.004 | DEBUG    | earth2studio.data.gfs:_fetch_gfs_dataarray:209 - Fetching GFS index file: 2024-01-01 00:00:00 lead 0:00:00
Fetching GFS for 2024-01-01 00:00:00: 100%|██████████| 7/7 [00:00<00:00, 37.44it/s]
2025-01-23 04:41:15.252 | SUCCESS  | earth2studio.run:ensemble:345 - Fetched data from GFS
2025-01-23 04:41:15.261 | INFO     | earth2studio.run:ensemble:367 - Starting 8 Member Ensemble Inference with 2 number of batches.
Running batch 0 inference: 100%|██████████| 11/11 [00:04<00:00,  2.58it/s]
Running batch 4 inference: 100%|██████████| 11/11 [00:04<00:00,  2.54it/s]
Total Ensemble Batches: 100%|██████████| 2/2 [00:18<00:00,  9.31s/it]
2025-01-23 04:41:33.890 | SUCCESS  | earth2studio.run:ensemble:412 - Inference complete

Post Processing#

The last step is to post-process our results. Let's plot both the perturbed t2m field and the unperturbed tcwv field. First, to confirm the perturbation method works as expected, the initial state is plotted.

Notice that the Zarr IO backend has additional APIs to interact with the stored data.
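
For instance, a quick way to inspect what was written is a sketch like the one below; it assumes the dict-style access used throughout this example and that coordinate arrays are stored alongside the data arrays.

# Sketch: inspect the arrays written by the workflow. Dimensions are
# (ensemble, time, lead_time, lat, lon).
print(io["t2m"].shape)
print(io["lead_time"][:])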

import matplotlib.pyplot as plt

forecast = "2024-01-01"


def plot_(axi, data, title, cmap):
    """Simple plot util function"""
    im = axi.imshow(data, cmap=cmap)
    plt.colorbar(im, ax=axi, shrink=0.5, pad=0.04)
    axi.set_title(title)


step = 0  # lead time = 0 hrs
plt.close("all")

# Create a figure and axes with the specified projection
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(10, 6))
plot_(
    ax[0, 0],
    np.mean(io["t2m"][:, 0, step], axis=0),
    f"{forecast} - t2m - Lead time: {6*step}hrs - Mean",
    "coolwarm",
)
plot_(
    ax[0, 1],
    np.std(io["t2m"][:, 0, step], axis=0),
    f"{forecast} - t2m - Lead time: {6*step}hrs - Std",
    "coolwarm",
)
plot_(
    ax[1, 0],
    np.mean(io["tcwv"][:, 0, step], axis=0),
    f"{forecast} - tcwv - Lead time: {6*step}hrs - Mean",
    "Blues",
)
plot_(
    ax[1, 1],
    np.std(io["tcwv"][:, 0, step], axis=0),
    f"{forecast} - tcwv - Lead time: {6*step}hrs - Std",
    "Blues",
)

plt.savefig(f"outputs/05_{forecast}_{step}_ensemble.jpg")
[Figure: 2024-01-01, lead time 0 hrs: t2m mean, t2m std, tcwv mean, tcwv std]

Due to the intrinsic coupling between all fields, we should expect every variable to show some uncertainty at later lead times. Here the total column water vapor is plotted at a lead time of 24 hours; note the variance across the members despite only the temperature field having been perturbed.

step = 4  # lead time = 24 hrs
plt.close("all")

# Create a figure and axes with the specified projection
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(10, 3))
plot_(
    ax[0],
    np.mean(io["tcwv"][:, 0, step], axis=0),
    f"{forecast} - tcwv - Lead time: {6*step}hrs - Mean",
    "Blues",
)
plot_(
    ax[1],
    np.std(io["tcwv"][:, 0, step], axis=0),
    f"{forecast} - tcwv - Lead time: {6*step}hrs - Std",
    "Blues",
)

plt.savefig(f"outputs/05_{forecast}_{step}_ensemble.jpg")
[Figure: 2024-01-01, lead time 24 hrs: tcwv mean, tcwv std]
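
The same arrays can also be reduced to show how the ensemble spread grows with lead time. A minimal sketch (area weighting of the global mean is omitted for brevity):

# Globally averaged ensemble standard deviation of tcwv versus lead time
spread = [np.std(io["tcwv"][:, 0, s], axis=0).mean() for s in range(nsteps + 1)]

fig, ax = plt.subplots(figsize=(6, 3))
ax.plot(6 * np.arange(nsteps + 1), spread)
ax.set_xlabel("Lead time [hrs]")
ax.set_ylabel("Mean ensemble std of tcwv")
plt.savefig("outputs/05_tcwv_spread_growth.jpg")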

Total running time of the script: (0 minutes 23.301 seconds)

Gallery generated by Sphinx-Gallery