Basic Pipeline Setup#

In this tutorial we walk through building a basic DFM pipeline: we define operations, run them via the Data Federation Mesh (DFM), and read back the results.

Prerequisites#

Before you start, make sure you have:

  • Environment: The DFM core package with tutorial extras and weather adapters installed. See README.md for installation steps.

  • Federation configuration: From the repository root, run:

    dfm fed config set weather_fed \
      --federation-dir tutorials/weather-fed/weather_fed \
      --config-path configs/federation.dfm.yaml \
      --project-path configs/project.yaml
    
  • Generated code: The federation runtime and operation APIs generated:

    dfm fed gen code weather_fed
    

Step 1: DFM API imports#

We start by importing the core DFM classes we’ll use to define and run a Pipeline. As we go, you’ll see:

  • Advise — Marks a parameter for discovery so the DFM can suggest valid values.

  • StopToken — Sent to callbacks when a pipeline run ends.

  • Pipeline — The context manager we use to define a pipeline.

  • Yield — Sends a value from the pipeline back to the client.

  • PlaceParam — Binds an input parameter to a named place in the pipeline.

  • JobStatus — Tells us whether a job is running, finished, etc.

  • SingleFieldAdvice — The type of advice returned by discovery for a single parameter.

from nv_dfm_core.api import Pipeline, Yield, PlaceParam, Advise, ErrorToken, StopToken
from nv_dfm_core.api.discovery import SingleFieldAdvice
from nv_dfm_core.exec import Frame
from nv_dfm_core.session import JobStatus

from typing import Any, List, Optional
from pprint import pprint

Step 2: Pipeline-specific imports#

Next we import the weather federation runtime and the operations we’ll use in our pipeline: the ECMWF data loader and the xarray convert/render steps.

# import the weather_fed runtime to create the session
import weather_fed.fed.runtime.homesite

# import the TextureFile and TextureFileList classes
# to handle the texture file results
from nv_dfm_lib_common.schemas import TextureFile, TextureFileList

# import the operations we will use in the pipeline (from dataloader and xarray API modules)
from weather_fed.fed.api.dataloader import LoadEcmwfEra5Data
from weather_fed.fed.api.xarray import ConvertToUint8, RenderUint8ToImages

Step 3: Building the pipeline#

In the DFM, a pipeline is a graph of operations that you submit for execution. Each node is an operation from the federation’s API.

We’ll build a pipeline that:

  1. Loads weather data from ECMWF ERA5.

  2. Converts the floating-point values to 8-bit integers (0–255).

  3. Renders that data as image files.

  4. Sends the image data back to us via Yield.

# setup the pipeline
with Pipeline() as pipeline:
    # 1. Load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # 2. Convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # 3. Render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    # 4. Yield output image
    Yield(value=render_uint8_to_images)

Step 4: Creating a session#

To run the pipeline we need a session. DFM can run in two ways:

  • local — Everything runs in this process (great for this tutorial).

  • flare — Execution is distributed and orchestrated by NVIDIA Flare (we’ll try this later).

When we call session.prepare(pipeline), the pipeline is translated into a Petri-net-like representation for execution. In that representation, places hold tokens (data items); transitions consume tokens from input places and produce tokens into output places; activations are conditions that must hold before a transition can fire, so that the right tokens are available.

# create the session
session = weather_fed.fed.runtime.homesite.get_session(target="local")
session.connect()

# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)

Step 5: Setting up callbacks#

Results from the pipeline are delivered to callbacks. We register a callback and attach it to a place (here, the default Yield place). When the pipeline yields a value, it will be appended to our messages list.

# setup the callback for the results that we yield at the end of the pipeline
# store all results in the messages list
messages = []


def yield_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    messages.append(data)


def default_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    print("Received data: ", data)

Step 6: Executing the pipeline#

Now we run the pipeline. We pass:

  1. Input parameters — A dict whose keys match the place names we used in PlaceParam in the pipeline.

  2. Place callbacks — We hook yield_callback to the "yield" place so we receive the image data.

You may see a warning about unhandled messages (e.g. _dfm_status_); that’s expected until we register a status callback later.

# setup all input parameters for the pipeline
# note how the keys match the names of the PlaceParam objects in the pipeline
# The PlaceParam names have to be unique across the pipeline.
input_params = {
    "variables": ["total_column_water_vapour"],
    "selection": {"time": "2021-12-31"},
    "time_dimension_convert": "time",
    "xydims_convert": ["longitude", "latitude"],
    "min_value_convert": 0.0,
    "max_value_convert": 80.0,
    "xydims_render": ["longitude", "latitude"],
    "time_dimension_render": "time",
    "variable_render": "total_column_water_vapour",
}

# now we are ready to execute the prepared pipeline with the input parameters
# the results will be received in the yield callback and stored in the messages list
job = session.execute(
    prepared,
    input_params=input_params,
    place_callbacks={"yield": yield_callback},
    default_callback=default_callback,
)

Step 7: Waiting for the job to finish#

We block until the job completes (with a timeout so we don’t wait forever). The assertion checks that the job reached JobStatus.FINISHED. Note: FINISHED means the run completed; it doesn’t guarantee that no errors occurred along the way (e.g. adapter-level issues).

# don't block the application forever
TIMEOUT_S = 70.0
job.wait_until_finished(timeout=TIMEOUT_S)

assert job.get_status() == JobStatus.FINISHED

Step 8: Looking at the results#

The yield callback has been filling the messages list. Each entry is a TextureFileList; inside are TextureFile objects with data source, timestamp, metadata, and base64-encoded image bytes.

messages

Displaying the image#

Let’s decode the base64 image data and display it with Pillow.

import base64
from io import BytesIO
from PIL import Image
from IPython.display import display


def show_images(messages: List[Any]):
    for message in messages:
        if isinstance(message, TextureFileList):
            for item in message.texture_files:
                if isinstance(item, TextureFile):
                    image_data = base64.b64decode(item.base64_image_data)
                    response_image = Image.open(BytesIO(image_data))
                    print(item.timestamp)
                    display(response_image)


show_images(messages)

Step 9: Running with multiple inputs#

So far we used one set of input parameters. We can run the same pipeline over a list of parameter sets to get images for multiple dates. In the next cell we pass two different dates and run once; the results for both will show up in our yield callback.

# clear the lists
messages = []

# create a list of input parameters: everything stays the same except for the date
input_param_list = [input_params.copy(), input_params.copy()]
input_param_list[0]["selection"] = {"time": "2022-12-28"}
input_param_list[1]["selection"] = {"time": "2022-12-31"}
print(input_param_list)

job = session.execute(
    prepared,
    input_params=input_param_list,
    place_callbacks={"yield": yield_callback},
    default_callback=default_callback,
)

# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S * 40)

assert job.get_status() == JobStatus.FINISHED
print(messages)
show_images(messages)

Step 10: Yields and multiple callbacks#

We can route different results to different callbacks. The Yield operation takes an optional place name. If we use several Yields with different places (e.g. "load", "convert", "render"), we register a separate callback for each place. Below we redefine the pipeline to yield at three stages and attach a callback to each so we can inspect intermediate results.

# setup the pipeline
with Pipeline() as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    # yield for each operation's result
    Yield(value=data, place="load")
    Yield(value=convert_to_uint8, place="convert")
    Yield(value=render_uint8_to_images, place="render")

# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)

# setup callbacks for the intermediate results
loaded_data = []
converted_data = []
rendered_data = []


def loaded_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    loaded_data.append(data)


def converted_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    converted_data.append(data)


def rendered_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    rendered_data.append(data)


# execute the pipeline with the same input parameters as before
job = session.execute(
    prepared,
    input_params=input_params,
    place_callbacks={
        "load": loaded_callback,
        "convert": converted_callback,
        "render": rendered_callback,
    },
)

# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)

assert job.get_status() == JobStatus.FINISHED

# print the loaded data
print(loaded_data)

# print the converted data
print(converted_data)

# print the rendered data
print(rendered_data)

Step 11: Error callback and status messages#

To see status and errors from the DFM and adapters, we register a callback for the reserved place _dfm_status_ (use the constant STATUS_PLACE_NAME). In the next cell we do that and then deliberately pass invalid input (e.g. a wrong dimension name) so we can see the error in our callback. The same callback will also receive StopToken when the run ends. Later we’ll see an alternative using a default callback.

from nv_dfm_core.api import STATUS_PLACE_NAME

error_list = []


def error_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    print(f"Error callback for place: {_target_place} called with data: {data}")
    if isinstance(data, ErrorToken):
        error_list.append(data)


# trigger an error by setting a bad input parameter
bad_input_params = input_params.copy()
bad_input_params["time_dimension_render"] = "notime"

# execute the pipeline with the same input parameters as before
job = session.execute(
    prepared,
    input_params=bad_input_params,
    place_callbacks={
        "load": loaded_callback,
        "convert": converted_callback,
        "render": rendered_callback,
        STATUS_PLACE_NAME: error_callback,
    },
)


# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)

assert job.get_status() == JobStatus.FINISHED

print(f"Error list: {error_list}")

Step 12: Default callback#

Instead of registering a callback per place, we can register one default callback that receives all messages not handled by a place-specific callback. That way we can handle load, convert, render, status, and stop in a single function.

# default callback
def default_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    target_place: str,
    data: Any,
):
    if target_place == "load":
        loaded_data.append(data)
    elif target_place == "convert":
        converted_data.append(data)
    elif target_place == "render":
        rendered_data.append(data)
    elif target_place == STATUS_PLACE_NAME:
        if isinstance(data, ErrorToken):
            error_list.append(data)
        elif isinstance(data, StopToken):
            stop_list.append(data)
    else:
        print(f"Unknown target place: {target_place}, data: {data}")


# clear the lists
loaded_data = []
converted_data = []
rendered_data = []
error_list = []
stop_list = []
# execute the pipeline with the same input parameters as before
job = session.execute(
    prepared, input_params=input_params, default_callback=default_callback
)


# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)

assert job.get_status() == JobStatus.FINISHED

print(f"Loaded data:\n{loaded_data}")
print(f"Converted data:\n{converted_data}")
print(f"Rendered data:\n{rendered_data}")
print(f"Error list:\n{error_list}")
print(f"Stop list:\n{stop_list}")

Step 13: Discovery#

Sometimes we don’t know the valid parameter values ahead of time. Discovery mode lets the DFM tell us: we mark parameters with Advise() and run the pipeline with mode="discovery". The DFM returns advice (e.g. available variables, time ranges) without running the full pipeline.

from nv_dfm_core.api import DISCOVERY_PLACE_NAME

with Pipeline(mode="discovery") as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=Advise(),
        selection=Advise(),
    )

# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)

# setup discovery and error callbacks
discovery_list = []
error_list = []
default_messages = []


def discovery_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    target_place: str,
    data: Any,
):
    discovery_list.append(data)


def error_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    target_place: str,
    data: Any,
):
    if isinstance(data, ErrorToken):
        error_list.append(data)


# execute the pipeline with the same input parameters as before
job = session.execute(
    prepared,
    input_params={},
    place_callbacks={
        DISCOVERY_PLACE_NAME: discovery_callback,
        STATUS_PLACE_NAME: error_callback,
    },
)

# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)

assert job.get_status() == JobStatus.FINISHED

pprint(discovery_list)
print(error_list)

You should see SingleFieldAdvice entries for variables and selection. For more complex operations, discovery can return a decision tree of interdependent options in one go.

In the next cell we walk that advice, pick a valid date, and run the pipeline again using that date (and a bit of invalid data) to confirm it works.

from datetime import datetime, timedelta

# extract the advice from the discovery list
advice_tuple = discovery_list[0][0]
advice = advice_tuple[1]
assert isinstance(advice, SingleFieldAdvice)

# iterate over the linked list of advices and print them
# remember the start and end dates of the selection advice
while True:
    print(f"field: {advice.field}, value: {advice.value}")
    if advice.field == "selection":
        first_date = advice.value[1]["time"]["first_date"]
        last_date = advice.value[1]["time"]["last_date"]
        frequency = advice.value[1]["time"]["frequency"]
        print(
            f"start date: {first_date}, end date: {last_date}, frequency: {frequency}"
        )
    if advice.edge is None:
        break
    advice = advice.edge


# clear the lists
messages = []

# select a date by using the discovery advice
date_obj = datetime.strptime(first_date, "%Y-%m-%d")
delta_d = timedelta(days=frequency)
date_obj += delta_d * 100
date_to_use = date_obj.strftime("%Y-%m-%d")

input_params_new = input_params.copy()
input_params_new["selection"] = {"time": date_to_use, "garbage": "garbage"}
print(input_params_new)

# setup the pipeline
with Pipeline() as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    Yield(value=render_uint8_to_images)

# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)

job = session.execute(
    prepared, input_params=input_params_new, place_callbacks={"yield": yield_callback}
)

# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)

assert job.get_status() == JobStatus.FINISHED

show_images(messages)

Step 14: Flare POC mode#

So far we’ve run everything in local mode on this machine. Next we switch to flare mode: we’ll use NVIDIA Flare in POC (Proof of Concept) mode. It still runs on this machine but execution is orchestrated by Flare, so we can test the same pipeline in a distributed setup before deploying to a real federation. First we ensure we’re in the repo root and that the POC is running.

from helpers.find_repo_root import change_directory_to_repo_root

change_directory_to_repo_root()
%%bash

pwd

# check if the POC is already running, if not start it
status_msg=$(uv run dfm poc status -f weather_fed | tail -n 1)

if [[ "$status_msg" == *"POC is running."* ]]; then
  echo "POC is already running."
else
  export PYTHONPATH=$(pwd)/tutorials/weather-fed:$PYTHONPATH
  uv run dfm poc start -f weather_fed
fi

Now we create a Flare session (from the homesite runtime), point it at the POC workspace, and run our pipeline with target="flare". The operations are explicitly assigned to site="client1".

from pathlib import Path

# need to create the session from the admin site which is home site in our example
# all NVFlare sites are specified in `project.yaml` file of weather_fed/config
import weather_fed.fed.runtime.homesite

# clear the lists
messages = []
error_list = []

user = "homesite@nv-dfm-lib.nvidia.com"
# assuming that jupyter lab is running in the root of the DFM repository
flare_workspace = Path("./workspace/weather_fed_poc/weather_fed/").resolve()
admin_package = flare_workspace / "prod_00" / user
if not admin_package.exists():
    raise RuntimeError(f"Admin package not found at {admin_package}")

flare_session = weather_fed.fed.runtime.homesite.get_session(
    target="flare",
    user=user,
    flare_workspace=flare_workspace,
    admin_package=admin_package,
)
flare_session.connect()

# setup the pipeline
# notice that we now also specify the site where to run operation
with Pipeline() as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        site="client1",
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        site="client1",
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        site="client1",
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    Yield(value=render_uint8_to_images)

# prepare the pipeline for execution, this will run a compilation step
prepared = flare_session.prepare(pipeline)
print("Starting execution")
job = flare_session.execute(
    prepared,
    input_params=input_params,
    place_callbacks={"yield": yield_callback, STATUS_PLACE_NAME: error_callback},
)
# don't block the application forever
TIMEOUT_FLARE_S = TIMEOUT_S * 10
print("Waiting for the job to finish")
job.wait_until_finished(timeout=TIMEOUT_FLARE_S)

assert job.get_status() == JobStatus.FINISHED

print(messages)
show_images(messages)

print(error_list)

When you’re done with the Flare section, stop the POC to shut down the local Flare processes.

%%bash

dfm poc stop