.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "examples/extend/03_custom_datasource.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note :ref:`Go to the end ` to download the full example code. .. rst-class:: sphx-glr-example-title .. _sphx_glr_examples_extend_03_custom_datasource.py: Extending Data Sources ====================== Implementing a custom data source This example will demonstrate how to extend Earth2Studio by implementing a custom data source to use in a built in workflow. In this example you will learn: - API requirements of data soruces - Implementing a custom data soruce .. GENERATED FROM PYTHON SOURCE LINES 34-45 Custom Data Source ------------------ Earth2Studio defines the required APIs for data sources in :py:class:`earth2studio.data.base.DataSource` which requires just a call function. For this example, we will consider extending an existing remote data source with another atmospheric field we can calculate. The :py:class:`earth2studio.data.ARCO` data source provides the ERA5 dataset in a cloud optimized format, however it only provides specific humidity. This is a problem for models that may use relative humidity as an input. Based on ECMWF documentation we can calculate the relative humidity based on temperature and geo-potential. .. GENERATED FROM PYTHON SOURCE LINES 47-180 .. code-block:: Python import os os.makedirs("outputs", exist_ok=True) from dotenv import load_dotenv load_dotenv() # TODO: make common example prep function from datetime import datetime import numpy as np import xarray as xr from earth2studio.data import ARCO, GFS from earth2studio.data.utils import prep_data_inputs from earth2studio.utils.type import TimeArray, VariableArray class CustomDataSource: """Custom ARCO datasource""" relative_humidity_ids = [ "r50", "r100", "r150", "r200", "r250", "r300", "r400", "r500", "r600", "r700", "r850", "r925", "r1000", ] def __init__(self, cache: bool = True, verbose: bool = True): self.arco = ARCO(cache, verbose) def __call__( self, time: datetime | list[datetime] | TimeArray, variable: str | list[str] | VariableArray, ) -> xr.DataArray: """Function to get data. Parameters ---------- time : datetime | list[datetime] | TimeArray Timestamps to return data for (UTC). variable : str | list[str] | VariableArray String, list of strings or array of strings that refer to variables to return. Must be in IFS lexicon. Returns ------- xr.DataArray """ time, variable = prep_data_inputs(time, variable) # Replace relative humidity with respective temperature # and specifc humidity fields variable_expanded = [] for v in variable: if v in self.relative_humidity_ids: level = int(v[1:]) variable_expanded.extend([f"t{level}", f"q{level}"]) else: variable_expanded.append(v) variable_expanded = list(set(variable_expanded)) # Fetch from ARCO da_exp = self.arco(time, variable_expanded) # Calculate relative humidity when needed arrays = [] for v in variable: if v in self.relative_humidity_ids: level = int(v[1:]) t = da_exp.sel(variable=f"t{level}").values q = da_exp.sel(variable=f"q{level}").values rh = self.calc_relative_humdity(t, q, 100 * level) arrays.append(rh) else: arrays.append(da_exp.sel(variable=v).values) da = xr.DataArray( data=np.stack(arrays, axis=1), dims=["time", "variable", "lat", "lon"], coords=dict( time=da_exp.coords["time"].values, variable=np.array(variable), lat=da_exp.coords["lat"].values, lon=da_exp.coords["lon"].values, ), ) return da def calc_relative_humdity( self, temperature: np.array, specific_humidity: np.array, pressure: float ) -> np.array: """Relative humidity calculation Parameters ---------- temperature : np.array Temperature field (K) specific_humidity : np.array Specific humidity field (g.kg-1) pressure : float Pressure (Pa) Returns ------- np.array """ epsilon = 0.621981 p = pressure q = specific_humidity t = temperature e = (p * q * (1.0 / epsilon)) / (1 + q * (1.0 / (epsilon) - 1)) es_w = 611.21 * np.exp(17.502 * (t - 273.16) / (t - 32.19)) es_i = 611.21 * np.exp(22.587 * (t - 273.16) / (t + 0.7)) alpha = np.clip((t - 250.16) / (273.16 - 250.16), 0, 1.2) ** 2 es = alpha * es_w + (1 - alpha) * es_i rh = 100 * e / es return rh .. GENERATED FROM PYTHON SOURCE LINES 181-188 :py:func:`__call__` API ~~~~~~~~~~~~~~~~~~~~~~~ The call function is the main API of data source which return the Xarray data array with the requested data. For this custom data source we intercept relative humidity variables, replace them with temperature and specific humidity requests then calculate the relative humidity from these fields. Note that the ARCO data source is handling the remote complexity, we are just manipulating Numpy arrays .. GENERATED FROM PYTHON SOURCE LINES 190-198 :py:func:`calc_relative_humdity` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Based on the calculations ECMWF uses in their IFS numerical simulator which accounts for estimating the water vapor and ice present in the atmosphere. .. note:: See reference, equation 7.98 onwards: https://www.ecmwf.int/en/elibrary/81370-ifs-documentation-cy48r1-part-iv-physical-processes .. GENERATED FROM PYTHON SOURCE LINES 200-204 Verification ------------ Before plugging this into our workflow, let's quickly verify our data source is consistent with when GFS provides for relative humidity. .. GENERATED FROM PYTHON SOURCE LINES 206-214 .. code-block:: Python ds = CustomDataSource() da_custom = ds(time=datetime(2022, 1, 1, hour=0), variable=["r500"]) ds_gfs = GFS() da_gfs = ds_gfs(time=datetime(2022, 1, 1, hour=0), variable=["r500"]) print(da_custom) .. GENERATED FROM PYTHON SOURCE LINES 215-244 .. code-block:: Python import cartopy.crs as ccrs import matplotlib.pyplot as plt fig, ax = plt.subplots( 1, 2, figsize=(10, 3), subplot_kw={"projection": ccrs.Mollweide()}, constrained_layout=True, ) ax[0].imshow( da_custom.sel(variable="r500")[0], transform=ccrs.PlateCarree(), vmin=0, vmax=100 ) ax[1].imshow( da_gfs.sel(variable="r500")[0], transform=ccrs.PlateCarree(), vmin=0, vmax=100 ) ax[0].set_title("Custom ARCO") ax[1].set_title("GFS") plt.suptitle("r500", fontsize=24) cbar = plt.cm.ScalarMappable() cbar.set_array(da_custom.sel(variable="r500")[0]) cbar.set_clim(0, 100) cbar = fig.colorbar(cbar, ax=ax[-1], orientation="vertical", shrink=0.8) plt.savefig("outputs/custom_datasource_gfs_versus_custom.jpg") .. GENERATED FROM PYTHON SOURCE LINES 245-256 Execute Workflow ---------------- We will use this custom data source to run deterministic inference with a model that requires relative humidity. :mod:`earth2studio.models.px.FCN` is one such model. Since we are using ARCO, we can run inference for a time quite far back in time. Let's instantiate the components needed. - Prognostic Model: Use the built in FourCastNet Model :py:class:`earth2studio.models.px.FCN`. - Datasource: Custom data source above - IO Backend: Save the outputs into a Zarr store :py:class:`earth2studio.io.ZarrBackend`. .. GENERATED FROM PYTHON SOURCE LINES 258-280 .. code-block:: Python from dotenv import load_dotenv load_dotenv() # TODO: make common example prep function import earth2studio.run as run from earth2studio.io import ZarrBackend from earth2studio.models.px import FCN package = FCN.load_default_package() model = FCN.load_model(package) # Create the data source data = CustomDataSource() # Create the IO handler, store in memory io = ZarrBackend() nsteps = 4 io = run.deterministic(["1993-04-05"], nsteps, model, data, io) print(io.root.tree()) .. GENERATED FROM PYTHON SOURCE LINES 281-285 Post Processing --------------- To confirm that our model is working as expected, we will plot the total column water vapor field for a few time-steps. .. GENERATED FROM PYTHON SOURCE LINES 287-310 .. code-block:: Python forecast = "1993-04-05" variable = "tcwv" plt.close("all") # Create a figure and axes with the specified projection fig, ax = plt.subplots(2, 2, figsize=(6, 4)) # Plot tcwv every 6 hours ax[0, 0].imshow(io[variable][0, 0], vmin=0, vmax=80, cmap="magma") ax[0, 1].imshow(io[variable][0, 1], vmin=0, vmax=80, cmap="magma") ax[1, 0].imshow(io[variable][0, 2], vmin=0, vmax=80, cmap="magma") ax[1, 1].imshow(io[variable][0, 3], vmin=0, vmax=80, cmap="magma") # Set title plt.suptitle(f"{variable} - {forecast}") times = io["lead_time"].astype("timedelta64[h]").astype(int) ax[0, 0].set_title(f"Lead time: {times[0]}hrs") ax[0, 1].set_title(f"Lead time: {times[1]}hrs") ax[1, 0].set_title(f"Lead time: {times[2]}hrs") ax[1, 1].set_title(f"Lead time: {times[3]}hrs") plt.savefig("outputs/custom_datasource_prediction.jpg", bbox_inches="tight") .. _sphx_glr_download_examples_extend_03_custom_datasource.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: 03_custom_datasource.ipynb <03_custom_datasource.ipynb>` .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: 03_custom_datasource.py <03_custom_datasource.py>` .. container:: sphx-glr-download sphx-glr-download-zip :download:`Download zipped: 03_custom_datasource.zip <03_custom_datasource.zip>` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_