Weather Data Loaders and Transforms#

This tutorial walks through DFM’s weather adapters in nv-dfm-lib-weather: we load data from several sources (ECMWF and GFS), transform it with xarray operations, and visualize the results. It builds on the Basic Pipeline notebook.

Initial setup#

The helpers folder contains Python helpers that mirror the patterns from the Basic Pipeline notebook (imports, pipeline helper, image and run-panel utilities). We use them so we can focus on the weather operations.

# import all the basic classes from the imports file, listing them explicitly for the linter
from helpers.imports import (
    Pipeline,
    Yield,
    PlaceParam,
    Advise,
    Session,
    ConvertToUint8,
    LoadEcmwfEra5Data,
    LoadGfsEra5Data,
    RenderUint8ToImages,
)

# and the pipeline helper class
from helpers.pipeline_helper import PipelineHelper

# and the image helper class
from helpers.image_helper import texturefile_to_image, texturefilelist_to_images

# import the federation runtime to create the session
import weather_fed.fed.runtime.homesite

from datetime import datetime
import ipywidgets as widgets
from IPython.display import display


def create_weather_widgets(discovered_data):
    """
    Build the variable dropdown and the date picker from a discovery result.

    The advice chain is taken from ``discovered_data[0][0][1]`` and followed
    through each node's ``edge`` link. The "variables" advice provides the
    dropdown options; the "selection" advice provides the time range.

    Args:
        discovered_data: The discovery results from the weather data pipeline

    Returns:
        tuple: (var_dropdown, date_picker) - The weather variable dropdown and date picker widgets

    Raises:
        ValueError: If the variables or the first/last date could not be
            found in the advice chain.
    """
    # the advice chain is the second element of the first discovery entry
    node = discovered_data[0][0][1]

    variables = first_date = last_date = frequency = None

    # walk the linked advices until the node without a successor was inspected
    while True:
        field_name = node.field
        if field_name == "selection":
            time_range = node.value[1]["time"]
            first_date = time_range["first_date"]
            last_date = time_range["last_date"]
            frequency = time_range["frequency"]
        elif field_name == "variables":
            # position 0 is the * advice; the concrete names sit at position 1
            variables = node.value[1]

        successor = node.edge
        if successor is None:
            break
        node = successor

    # frequency is informational only, so it is not part of the check
    if any(item is None for item in (variables, first_date, last_date)):
        raise ValueError(
            "Could not extract variables or date range from discovered data"
        )

    print(f"variables: {variables}")
    print(f"start date: {first_date}, end date: {last_date}, frequency: {frequency}")

    date_format = "%Y-%m-%d"
    start_date = datetime.strptime(first_date, date_format).date()
    end_date = datetime.strptime(last_date, date_format).date()

    var_dropdown = widgets.Dropdown(
        options=variables,
        description="Weather Variable:",
    )
    # the picker starts on the first available date and is limited to the range
    date_picker = widgets.DatePicker(
        description="Pick a Date",
        value=start_date,
        min=start_date,
        max=end_date,
    )

    return var_dropdown, date_picker

ECMWF weather data#

The LoadEcmwfEra5Data adapter downloads ERA5 reanalysis data from ECMWF and returns an xarray.Dataset with geospatial coordinates and time (typical dimensions: time, latitude, longitude). That dataset can be passed to downstream operations such as ConvertToUint8, VariableNorm, and RenderUint8ToImages.

  • Historical weather and climate variables from the ERA5 archive

  • Variable filtering and time-based selection

  • Regular lat/lon grid with coordinate metadata preserved

  • Compatible with the xarray-based pipelines in this notebook

LoadEcmwfEra5Data parameters#

Required parameters:

  • variables (list[str]): ERA5 variable names to include, or ['*'] for all supported variables

    • Examples: "10m_u_component_of_wind", "10m_v_component_of_wind", "2m_temperature", "mean_sea_level_pressure", "total_precipitation"

    • Use discovery mode in this notebook to list the supported variables

  • selection (dict[str, Any]): Coordinate selectors, typically time

    • Common usage: { "time": "YYYY-MM-DD" } or { "time": ["YYYY-MM-DD", ...] }

    • Selection is applied with method="nearest" for convenience

Returns: xarray.Dataset with the requested variables and selection; coordinates are preserved.

Notes:

  • If a variables list is specified, only those variables are retained; coordinates are preserved.

  • If selection is omitted, the full available time axis may be loaded. Loading a single date is recommended for performance.

  • The dataset is cached internally using a hash of variables and selection to avoid repeated downloads.

We use discovery to find valid values for variables and selection without reading the adapter source. The next cell defines a small discovery pipeline and runs it.

from nv_dfm_core.api import DISCOVERY_PLACE_NAME


# extend the PipelineHelper class to add a method for discovering parameters
class MyEcmwfPipeline(PipelineHelper):
    """PipelineHelper with an added discovery run for LoadEcmwfEra5Data."""

    def __init__(self, session: Session):
        super().__init__(session)

    def discover_parameters(self):
        """Run LoadEcmwfEra5Data in discovery mode and return the results.

        Returns:
            The callback results collected under DISCOVERY_PLACE_NAME.
        """
        # both parameters are left as Advise() so discovery reports on them
        with Pipeline(mode="discovery") as discovery_pipeline:
            LoadEcmwfEra5Data(variables=Advise(), selection=Advise())

        # discovery needs no input parameters
        self.prepare(discovery_pipeline)
        self.run(input_params={})

        discovered = self.callback_results[DISCOVERY_PLACE_NAME]
        return discovered


# create a session and run the pipeline
# the session is requested for the "local" target and connected before it is
# handed to the helper
e2_session = weather_fed.fed.runtime.homesite.get_session(target="local")
e2_session.connect()
helper_ecmwf = MyEcmwfPipeline(e2_session)
# run discovery once; the result is later passed to create_weather_widgets
discovered_ecmwf_data = helper_ecmwf.discover_parameters()
print(discovered_ecmwf_data)

We use the discovery results to build dropdown and date-picker widgets for variable and date selection.

# build the variable dropdown and date picker from the ECMWF discovery result
variables_dd_ecmwf, date_picker_ecmwf = create_weather_widgets(discovered_ecmwf_data)

Select weather parameters#

# show the widgets; the values printed and used below are read when this cell
# runs, so re-run the cell after changing the selection
display(variables_dd_ecmwf)
display(date_picker_ecmwf)
print(f"selected weather variable: {variables_dd_ecmwf.value}")
print(f"selected date: {date_picker_ecmwf.value}")
# format the picked date as YYYY-MM-DD for the "time" selector
date_str = date_picker_ecmwf.value.strftime("%Y-%m-%d")

# the keys match the PlaceParam place names used in the pipeline below
input_params_ecmwf = {
    "weather_variable": [variables_dd_ecmwf.value],
    "date": {"time": date_str},
}

# minimal pipeline: load the selected variable for the selected date and
# yield the loaded data unchanged
with Pipeline() as ecmwf_pipeline:
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="weather_variable"),
        selection=PlaceParam(place="date"),
    )
    Yield(value=data)

# prepare and run the pipeline, then print what was collected under "yield"
helper_ecmwf.prepare(ecmwf_pipeline)
helper_ecmwf.run(input_params_ecmwf)
print(helper_ecmwf.callback_results["yield"])

Show as image#

We extend the pipeline with ConvertToUint8 and (later) RenderUint8ToImages so the xarray data is converted to uint8 and then to an image.

# setup the pipeline
# every PlaceParam is a named placeholder; its value is supplied through the
# input params dict (same key) when the pipeline is run
with Pipeline() as pipeline_ecmwf:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="weather_variable"),
        selection=PlaceParam(place="date"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # yield the converted data (rendering to an image is added in a later cell)
    Yield(value=convert_to_uint8)

We set the pipeline inputs and run it to obtain the uint8 array (before rendering to an image).

Note: Here min_value_convert and max_value_convert are None. In that case ConvertToUint8 infers min and max from the data and scales to [0, 255]. For multiple frames you may want fixed min/max so brightness is consistent across frames; suitable values depend on the weather variable. The adapter reports the inferred min and max as attributes on the result.

# setup all input parameters for the pipeline
# note how the keys match the names of the PlaceParam objects in the pipeline
# The PlaceParam names have to be unique across the pipeline.
input_params_ecmwf = {
    "weather_variable": [variables_dd_ecmwf.value],
    "date": {"time": date_str},
    "time_dimension_convert": "time",
    "xydims_convert": ["longitude", "latitude"],
    # None for both bounds: min and max are inferred from the data
    "min_value_convert": None,
    "max_value_convert": None,
}

# drop the results of the previous run before preparing and running the new
# pipeline on the same helper
helper_ecmwf.clear_callback_results()
helper_ecmwf.prepare(pipeline_ecmwf)
helper_ecmwf.run(input_params_ecmwf)
print(helper_ecmwf.callback_results["yield"])

We add RenderUint8ToImages to the pipeline so the uint8 array is turned into a PNG and displayed.

Note: The operations take the time dimension and spatial dimensions (xydims) so reduction and rendering happen on the correct axes. They support xarray data with multiple time steps; RenderUint8ToImages returns a list of images (one per frame).

# setup the pipeline
# this rebinds pipeline_ecmwf: same load and convert steps as before, plus a
# final rendering step
with Pipeline() as pipeline_ecmwf:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="weather_variable"),
        selection=PlaceParam(place="date"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format=PlaceParam(place="format_render"),
    )
    Yield(value=render_uint8_to_images)

# setup all input parameters for the pipeline
# note how the keys match the names of the PlaceParam objects in the pipeline
# The PlaceParam names have to be unique across the pipeline.
# The convert and render steps each get their own dimension parameters, hence
# the *_convert and *_render suffixes.
input_params_ecmwf = {
    "weather_variable": [variables_dd_ecmwf.value],
    "date": {"time": date_str},
    "time_dimension_convert": "time",
    "xydims_convert": ["longitude", "latitude"],
    "min_value_convert": None,
    "max_value_convert": None,
    "xydims_render": ["longitude", "latitude"],
    "time_dimension_render": "time",
    # the loader takes a list of variables, the renderer a single variable name
    "variable_render": variables_dd_ecmwf.value,
    "format_render": "PNG",
}

# this dict and the prepared pipeline are reused by the run panel callback
# defined further down
helper_ecmwf.clear_callback_results()
helper_ecmwf.prepare(pipeline_ecmwf)
helper_ecmwf.run(input_params_ecmwf)
print(helper_ecmwf.callback_results["yield"])

We display the rendered image using the same base64-decoding helpers as in the Basic Pipeline notebook.

from helpers.image_helper import texturefilelist_to_images

# decode the first yielded result into images and show the first one
images = texturefilelist_to_images(helper_ecmwf.callback_results["yield"][0])
display(images[0])

We combine the ECMWF pipeline with a small UI: variable and date selectors plus a run button that executes the pipeline and shows the image.

# helper to make a run panel with a run button and an output area
from helpers.make_run_panel import make_run_panel


# callback to run the pipeline
def run_pipeline_ecmwf(controls):
    """Run the prepared ECMWF pipeline for the current widget selection.

    The selection is written into the shared ``input_params_ecmwf`` dict, so
    the other (fixed) parameters defined there are reused as they are.

    Args:
        controls: The (variable dropdown, date picker) widgets, in that order.

    Returns:
        The first entry collected under "yield", or None if nothing was
        yielded.

    Raises:
        ValueError: If the date picker holds no date.
    """
    var_dd, date_pk = controls
    picked_date = date_pk.value
    if picked_date is None:
        # a date picker without a date holds None; report that clearly
        # instead of failing with an AttributeError on strftime, and do so
        # before any shared state is touched
        raise ValueError("No date selected: pick a date before running the pipeline")
    date_str = picked_date.strftime("%Y-%m-%d")
    input_params_ecmwf.update(
        {
            "weather_variable": [var_dd.value],
            "variable_render": var_dd.value,
            "date": {"time": date_str},
        }
    )
    # drop the results of the previous run so only fresh yields are returned
    helper_ecmwf.clear_callback_results()
    helper_ecmwf.run(input_params_ecmwf)
    yields = helper_ecmwf.callback_results.get("yield", [])
    return yields[0] if yields else None


# callback to display the result
def display_result_ecmwf(result, output):
    """Show the first image of an ECMWF pipeline result.

    Args:
        result: The value returned by the run callback; a falsy value means
            there is nothing to show.
        output: Not used by this callback.

    Prints "No images found" when there is no result or no decoded image.
    """
    # only decode when there is something to decode
    decoded = texturefilelist_to_images(result) if result else None
    if not decoded:
        print("No images found")
        return
    display(decoded[0])


# make the run panel
# the order of the controls matters: run_pipeline_ecmwf unpacks them as
# (variable dropdown, date picker)
# NOTE(review): auto_run=True presumably triggers a first run when the panel is
# created — confirm in helpers.make_run_panel
panel = make_run_panel(
    controls=[variables_dd_ecmwf, date_picker_ecmwf],
    run_callback=run_pipeline_ecmwf,
    display_callback=display_result_ecmwf,
    auto_run=True,
)

display(panel)

GFS weather data#

The LoadGfsEra5Data adapter downloads NOAA Global Forecast System (GFS) data and exposes it in an ERA5-like schema: variable names and structure are aligned so the same downstream steps (e.g. ConvertToUint8, RenderUint8ToImages) work as for ECMWF.

  • Short-range forecast data with ERA5-style variable names

  • Requires explicit date/time selection (GFS runs and forecast cycles)

  • Regular lat/lon grid with time, lat, lon coordinates

  • Internal mapping from requested variable names to GFS fields

LoadGfsEra5Data parameters#

Required parameters:

  • variables (list[str]): ERA5‑style variable names to request. These are mapped internally to GFS fields.

    • Examples (from discovery): "u10m", "v10m", "t2m", "msl", "sp", many pressure‑level variants like "u500", "v700", "t850", "z500", etc.

    • Use discovery mode in this notebook to list currently supported names

  • selection (dict[str, Any]): Coordinate selectors. A specific date is required.

    • Required key: "time" with value "YYYY-MM-DD"

    • Example: { "time": "2025-08-15" }

    • The adapter validates that a time is provided and will error if missing

Returns: xarray.Dataset with the requested variables on a lat/lon grid (time, lat, lon). Variable names follow ERA5-style naming for downstream compatibility.

Notes:

  • Data is accessed from NOAA NOMADS OPeNDAP; selecting a single date keeps downloads fast.

  • Variables requested are translated via an internal ERA5↔GFS map and may include level selections (the adapter drops intermediate indexing variables).

  • Results are cached via a hash of variables and selection to avoid repeat downloads.

Query discovery#

The pattern is the same as for ECMWF; we only swap in LoadGfsEra5Data and run discovery to get GFS variables and selection options.

from nv_dfm_core.api import DISCOVERY_PLACE_NAME


# extend the PipelineHelper class to add a method for discovering parameters
class MyGFSHelper(PipelineHelper):
    """PipelineHelper with an added discovery run for LoadGfsEra5Data."""

    def __init__(self, session: Session):
        super().__init__(session)

    def discover_parameters(self):
        """Run LoadGfsEra5Data in discovery mode and return the results.

        Returns:
            The callback results collected under DISCOVERY_PLACE_NAME.
        """
        # both parameters are left as Advise() so discovery reports on them
        with Pipeline(mode="discovery") as discovery_pipeline:
            LoadGfsEra5Data(variables=Advise(), selection=Advise())

        # discovery needs no input parameters
        self.prepare(discovery_pipeline)
        self.run(input_params={})

        discovered = self.callback_results[DISCOVERY_PLACE_NAME]
        return discovered


# create a session and run the pipeline
# note: this rebinds e2_session to a newly requested session; helper_ecmwf
# keeps the session object it was constructed with
e2_session = weather_fed.fed.runtime.homesite.get_session(target="local")
e2_session.connect()
helper_gfs = MyGFSHelper(e2_session)
# run discovery once; the result is later passed to create_weather_widgets
discovered_gfs_data = helper_gfs.discover_parameters()
print(discovered_gfs_data)

We define the GFS pipeline (load → convert → render) and wire it to the discovered parameters. Input param keys match the PlaceParam place names.

# create pipeline for GFS data
# same load -> convert -> render structure as the ECMWF pipeline, with the GFS
# loader as the first step
with Pipeline() as pipeline_gfs:
    # load the data from the GFS dataset
    # NOTE(review): invalidate_cache=True presumably bypasses the adapter's
    # result cache on every run — confirm whether that is intended here
    data = LoadGfsEra5Data(
        variables=PlaceParam(place="weather_variable"),
        selection=PlaceParam(place="date"),
        invalidate_cache=True,
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format=PlaceParam(place="format_render"),
    )
    Yield(value=render_uint8_to_images)

# setup all input parameters for the pipeline
# note how the keys match the names of the PlaceParam objects in the pipeline
# The PlaceParam names have to be unique across the pipeline.
# "weather_variable", "date" and "variable_render" only hold placeholders here;
# run_pipeline_gfs overwrites them with the widget selection before each run.
input_params_gfs = {
    "weather_variable": [],  # placeholder, e.g. ["u10m"]
    "date": {"time": None},
    "time_dimension_convert": "time",
    # the GFS pipeline uses "lon"/"lat" as dimension names, not
    # "longitude"/"latitude" as in the ECMWF cells
    "xydims_convert": ["lon", "lat"],
    "min_value_convert": None,
    "max_value_convert": None,
    "xydims_render": ["lon", "lat"],
    "time_dimension_render": "time",
    "variable_render": None,
    "format_render": "PNG",
}

# prepare the pipeline
# it is only prepared here; the runs are started from the run panel callback
helper_gfs.prepare(pipeline_gfs)

We build the same kind of run panel as for ECMWF: variable and date widgets plus a run button, using the discovered GFS values.

# helper to make a run panel with a run button and an output area
from helpers.make_run_panel import make_run_panel

# create the widgets for the GFS data
# options and date range come from the GFS discovery result
variables_dd_gfs, date_picker_gfs = create_weather_widgets(discovered_gfs_data)


# callback to run the pipeline
def run_pipeline_gfs(controls):
    """Run the prepared GFS pipeline for the current widget selection.

    The selection is written into the shared ``input_params_gfs`` dict, so
    the other (fixed) parameters defined there are reused as they are.

    Args:
        controls: The (variable dropdown, date picker) widgets, in that order.

    Returns:
        The first entry collected under "yield", or None if nothing was
        yielded.

    Raises:
        ValueError: If the date picker holds no date.
    """
    var_dd, date_pk = controls
    picked_date = date_pk.value
    if picked_date is None:
        # a date picker without a date holds None; report that clearly
        # instead of failing with an AttributeError on strftime, and do so
        # before any shared state is touched
        raise ValueError("No date selected: pick a date before running the pipeline")
    date_str = picked_date.strftime("%Y-%m-%d")
    input_params_gfs.update(
        {
            "weather_variable": [var_dd.value],
            "variable_render": var_dd.value,
            "date": {"time": date_str},
        }
    )
    # drop the results of the previous run so only fresh yields are returned
    helper_gfs.clear_callback_results()
    helper_gfs.run(input_params_gfs)
    yields = helper_gfs.callback_results.get("yield", [])
    return yields[0] if yields else None


# callback to display the result
def display_result_gfs(result, output):
    """Show the first image of a GFS pipeline result.

    Args:
        result: The value returned by the run callback; a falsy value means
            there is nothing to show.
        output: Not used by this callback.

    Prints "No images found" when there is no result or no decoded image.
    """
    # only decode when there is something to decode
    decoded = texturefilelist_to_images(result) if result else None
    if not decoded:
        print("No images found")
        return
    display(decoded[0])


# to debug the callback without the UI, it can be called directly:
# run_pipeline_gfs([variables_dd_gfs, date_picker_gfs])
# make the run panel
# the order of the controls matters: run_pipeline_gfs unpacks them as
# (variable dropdown, date picker)
panel_gfs = make_run_panel(
    controls=[variables_dd_gfs, date_picker_gfs],
    run_callback=run_pipeline_gfs,
    display_callback=display_result_gfs,
    auto_run=True,
)

display(panel_gfs)

Xarray operations#

The next section summarizes the main xarray operations used in this notebook and then shows a combined example with VariableNorm.

Three core xarray operations are used in this notebook: they turn raw weather datasets into images via optional normalization, conversion to uint8, and rendering.

1. ConvertToUint8 — data type conversion#

Purpose: Convert floating-point xarray data to 8-bit unsigned integers (0–255) for image processing and visualization.

API Parameters:

  • data (xarray.Dataset): Input dataset from loading operations

  • time_dimension (str): Name of the time dimension (e.g., “time”)

  • xydims (list[str]): Names of spatial dimensions (e.g., [“longitude”, “latitude”])

  • min_value (float, optional): Minimum value for scaling (if None, auto-detected)

  • max_value (float, optional): Maximum value for scaling (if None, auto-detected)

Returns: xarray.Dataset with uint8 data type and scaling metadata

Key Features:

  • Automatic min/max detection when parameters are None

  • Preserves coordinate information

  • Adds data_min and data_max attributes to track scaling

  • Handles multi-temporal datasets

  • Validates input schema and ensures proper dimensionality

Example Usage:

# illustrative snippet: weather_data stands for a dataset coming from a
# loading operation and is not defined in this notebook
convert_to_uint8 = ConvertToUint8(
    data=weather_data,
    time_dimension="time",
    xydims=["longitude", "latitude"],
    min_value=None,  # Auto-detect
    max_value=None   # Auto-detect
)

2. RenderUint8ToImages — rendering#

Purpose: Convert uint8 xarray datasets into image files (PNG, JPEG, TIFF) for visualization and mapping.

API Parameters:

  • data (xarray.Dataset): uint8 dataset from ConvertToUint8

  • variable (str): Name of the variable to render

  • xydims (list[str]): Spatial dimensions for rendering (e.g., [“longitude”, “latitude”])

  • time_dimension (str): Time dimension name (e.g., “time”)

  • format (str): Output image format (“PNG”, “JPEG”, “TIFF”)

  • quality (int, optional): Image quality (1-100, default: 99)

  • additional_meta_data (dict, optional): Extra metadata to include

  • return_meta_data (bool, optional): Whether to return metadata (default: True)

  • return_image_data (bool, optional): Whether to return image data (default: True)

Returns: TextureFileList containing base64-encoded images with metadata

Key Features:

  • Supports multiple image formats with quality control

  • Handles multi-temporal data (returns list of images)

  • Includes geographic metadata (lon_minmax, lat_minmax)

  • Provides timestamp information for each frame

  • Optimized for web mapping applications

  • Validates input schema requirements

Example Usage:

# illustrative snippet: uint8_data stands for the output of ConvertToUint8 and
# is not defined in this notebook
render_uint8_to_images = RenderUint8ToImages(
    data=uint8_data,
    variable="2m_temperature",
    xydims=["longitude", "latitude"],
    time_dimension="time",
    format="PNG",
    quality=95
)

3. VariableNorm — normalization#

Purpose: Compute the p-norm of specified variables in an xarray dataset, e.g. to combine u/v wind components into a single wind-speed field.

API Parameters:

  • data (xarray.Dataset): Input dataset containing variables to normalize

  • variables (list[str]): List of variable names to include in norm calculation

  • p (float, optional): P-value for norm calculation (default: 2.0)

    • Common values: 1 (Manhattan norm), 2 (Euclidean norm), 3 (cubic norm)

  • output_name (str, optional): Name for the output variable (default: “norm”)

Returns: xarray.Dataset with computed norm values and metadata

Key Features:

  • Supports various p-norm calculations (L1, L2, L3 norms)

  • Validates that all specified variables exist in the dataset

  • Preserves coordinate system and dimensions

  • Adds comprehensive metadata including:

    • Description of the norm calculation

    • List of variables used

    • P-value used

    • Data min/max values

  • Handles both single and multi-variable datasets

Example Usage:

# illustrative snippet: weather_data is not defined in this notebook.
# The names in `variables` must exist in the input dataset; the runnable
# example later in this notebook loads "10m_u_component_of_wind" and
# "10m_v_component_of_wind" instead of the names shown here.
variable_norm = VariableNorm(
    data=weather_data,
    variables=["u_component_of_wind", "v_component_of_wind"],
    p=2.0,  # Euclidean norm
    output_name="wind_speed"
)

Pipeline flow#

Typical order: Load → optionally VariableNorm (e.g. wind components → wind speed) → Convert (float → uint8) → Render (uint8 → images). The norm is computed on the original floating-point values, before they are scaled to uint8. Coordinates are preserved so outputs work with mapping libraries like leafmap.

Typical use: weather maps (temperature, pressure, wind), multi-variable norms (e.g. wind speed from u/v), time-series or animated frames, and geographic overlays in web maps.

Example: VariableNorm with ConvertToUint8 and RenderUint8ToImages#

We extend the ECMWF flow with VariableNorm: we load u and v wind components, compute wind speed (e.g. L2 norm), then convert to uint8 and render to an image.

from helpers.imports import (
    Pipeline,
    Yield,
    PlaceParam,
    ConvertToUint8,
    VariableNorm,
    LoadEcmwfEra5Data,
)
from helpers.pipeline_helper import PipelineHelper
from helpers.image_helper import texturefilelist_to_images


# helper to make a run panel with a run button and an output area
from helpers.make_run_panel import make_run_panel

# setup the pipeline
# order of the steps: load -> norm -> convert to uint8 -> render
with Pipeline() as pipeline_variable_norm:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables_load"),
        selection=PlaceParam(place="date"),
    )
    # compute the p-norm of the selected variables; the result variable is
    # named by the "output_name" parameter
    variable_norm = VariableNorm(
        data=data,
        variables=PlaceParam(place="variables_norm"),
        output_name=PlaceParam(place="output_name"),
        p=PlaceParam(place="p"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=variable_norm,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format=PlaceParam(place="format_render"),
    )
    Yield(value=render_uint8_to_images)

# this example gets its own session and a plain PipelineHelper (no discovery
# method is needed here); the pipeline is prepared once and run by the callback
session = weather_fed.fed.runtime.homesite.get_session(target="local")
session.connect()
helper_variable_norm = PipelineHelper(session=session)
helper_variable_norm.prepare(pipeline_variable_norm)


# callback to run the pipeline
def run_pipeline_variable_norm(controls):
    """Run the wind-speed (VariableNorm) pipeline for the picked date.

    Args:
        controls: Widgets of the run panel; only the first one, the date
            picker, is read.

    Returns:
        The first entry collected under "yield", or None if nothing was
        yielded.

    Raises:
        ValueError: If the date picker holds no date.
    """
    date_pk = controls[0]
    picked_date = date_pk.value
    if picked_date is None:
        # a date picker without a date holds None; report that clearly
        # instead of failing with an AttributeError on strftime
        raise ValueError("No date selected: pick a date before running the pipeline")
    date_str = picked_date.strftime("%Y-%m-%d")
    input_params_variable_norm = {
        # the same two wind components are loaded and fed into the norm
        "variables_load": ["10m_u_component_of_wind", "10m_v_component_of_wind"],
        "variables_norm": ["10m_u_component_of_wind", "10m_v_component_of_wind"],
        "date": {"time": date_str},
        "time_dimension_convert": "time",
        "xydims_convert": ["longitude", "latitude"],
        "min_value_convert": None,
        "max_value_convert": None,
        "xydims_render": ["longitude", "latitude"],
        "time_dimension_render": "time",
        # the rendered variable is the norm output, so both names must agree
        "variable_render": "wind_speed",
        "format_render": "PNG",
        "output_name": "wind_speed",
        "p": 2.0,
    }
    # drop the results of the previous run so only fresh yields are returned
    helper_variable_norm.clear_callback_results()
    helper_variable_norm.run(input_params_variable_norm, timeout_s=100.0)
    yields = helper_variable_norm.callback_results.get("yield", [])
    return yields[0] if yields else None


# callback to display the result
def display_result_variable_norm(result, output):
    """Show the first image of a wind-speed pipeline result.

    Args:
        result: The value returned by the run callback; a falsy value means
            there is nothing to show.
        output: Not used by this callback.

    Prints "No images found" when there is no result or no decoded image.
    """
    # only decode when there is something to decode
    decoded = texturefilelist_to_images(result) if result else None
    if not decoded:
        print("No images found")
        return
    display(decoded[0])


# hardcode date limits to what we've discovered in previous examples
# (no discovery run is made in this cell, so the range is not refreshed)
start_date = datetime.strptime("1959-01-01", "%Y-%m-%d").date()
end_date = datetime.strptime("2021-12-31", "%Y-%m-%d").date()
# this date_picker is also used by the leafmap example further down
date_picker = widgets.DatePicker(
    description="Pick a Date", value=start_date, min=start_date, max=end_date
)

# make the run panel
# the date picker is the only control; the callback reads it as controls[0]
panel = make_run_panel(
    controls=[date_picker],
    run_callback=run_pipeline_variable_norm,
    display_callback=display_result_variable_norm,
    auto_run=True,
)

display(panel)

Render with leafmap#

We use leafmap’s velocity layer to visualize the wind vector field (u/v components) as particle streams on a map.

from leafmap import leafmap
from helpers.imports import (
    Pipeline,
    Yield,
    PlaceParam,
    ConvertToUint8,
    VariableNorm,
    LoadEcmwfEra5Data,
)
from helpers.pipeline_helper import PipelineHelper

# helper to make a run panel with a run button and an output area
from helpers.make_run_panel import make_run_panel
from helpers.image_helper import (
    texturefilelist_to_images,
    roll_image_by_degrees,
    image_to_bytes,
)

# setup the pipeline
# only the loader is used here: the loaded data is yielded as is and handed to
# the map's velocity layer by the display callback
with Pipeline() as pipeline_variable_norm_velocity:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables_load"),
        selection=PlaceParam(place="date"),
    )
    Yield(value=data)


# separate session and helper for this example; the pipeline is prepared once
# and run by the callback
session = weather_fed.fed.runtime.homesite.get_session(target="local")
session.connect()
helper_variable_norm_velocity = PipelineHelper(session=session)
helper_variable_norm_velocity.prepare(pipeline_variable_norm_velocity)

# one map instance is created up front and reused by the display callback,
# which swaps its velocity layer on every run
velocity_map = leafmap.Map(layers_control=True)
velocity_map.add_basemap("CartoDB.DarkMatter")


# callback to run the pipeline
def run_pipeline_variable_norm_velocity(controls):
    """Load the u/v wind components for the picked date.

    Args:
        controls: Widgets of the run panel; only the first one, the date
            picker, is read.

    Returns:
        The whole list collected under "yield" (not just its first entry),
        or None if nothing was yielded.

    Raises:
        ValueError: If the date picker holds no date.
    """
    date_pk = controls[0]
    picked_date = date_pk.value
    if picked_date is None:
        # a date picker without a date holds None; report that clearly
        # instead of failing with an AttributeError on strftime
        raise ValueError("No date selected: pick a date before running the pipeline")
    date_str = picked_date.strftime("%Y-%m-%d")
    input_params_variable_norm = {
        "variables_load": ["10m_u_component_of_wind", "10m_v_component_of_wind"],
        "date": {"time": date_str},
    }
    # drop the results of the previous run so only fresh yields are returned
    helper_variable_norm_velocity.clear_callback_results()
    helper_variable_norm_velocity.run(input_params_variable_norm, timeout_s=200.0)
    yields = helper_variable_norm_velocity.callback_results.get("yield", [])
    return yields if yields else None


# callback to display the result
def display_result_variable_norm_velocity(result, output):
    """Show the loaded wind field as a velocity layer on ``velocity_map``.

    Args:
        result: The list returned by the run callback; its first entry is
            passed to the velocity layer. A falsy value means nothing to show.
        output: Not used by this callback.

    Prints "No images found" when there is no result.
    """
    if not result:
        print("No images found")
        return

    # add_velocity does not return the layer it creates, so velocity layers
    # from earlier runs are recognised by their class name and removed
    for layer in velocity_map.layers:
        if layer.__class__.__name__ != "Velocity":
            continue
        velocity_map.remove(layer)

    # colour ramp of the particles
    wind_palette = [
        "rgb(0,0,150)",
        "rgb(0,150,0)",
        "rgb(255,255,0)",
        "rgb(255,165,0)",
        "rgb(150,0,0)",
    ]
    wind_data = result[0]
    # the layer name carries the date currently set in the shared date picker
    layer_name = f"wind_velocity-{date_picker.value.strftime('%Y-%m-%d')}"

    velocity_map.add_velocity(
        wind_data,
        name=layer_name,
        latitude_dimension="latitude",
        longitude_dimension="longitude",
        zonal_speed="10m_u_component_of_wind",
        meridional_speed="10m_v_component_of_wind",
        color_scale=wind_palette,
    )
    display(velocity_map)


# make the run panel
# reuses the date_picker created for the wind-speed example; the display
# callback reads the same widget to name the velocity layer
panel = make_run_panel(
    controls=[date_picker],
    run_callback=run_pipeline_variable_norm_velocity,
    display_callback=display_result_variable_norm_velocity,
    auto_run=True,
)

display(panel)