Basic Pipeline Setup#
In this tutorial we walk through building a basic DFM pipeline: we define operations, run them via the Data Federation Mesh (DFM), and read back the results.
Prerequisites#
Before you start, make sure you have:
Environment: The DFM core package with tutorial extras and weather adapters installed. See README.md for installation steps.
Federation configuration: From the repository root, run:
dfm fed config set weather_fed \
    --federation-dir tutorials/weather-fed/weather_fed \
    --config-path configs/federation.dfm.yaml \
    --project-path configs/project.yaml
Generated code: The federation runtime and operation APIs, generated with:
dfm fed gen code weather_fed
Step 1: DFM API imports#
We start by importing the core DFM classes we’ll use to define and run a Pipeline. As we go, you’ll see:
Advise — Marks a parameter for discovery so the DFM can suggest valid values.
StopToken — Sent to callbacks when a pipeline run ends.
ErrorToken — Sent to callbacks when an error occurs during a run.
Pipeline — The context manager we use to define a pipeline.
Yield — Sends a value from the pipeline back to the client.
PlaceParam — Binds an input parameter to a named place in the pipeline.
JobStatus — Tells us whether a job is running, finished, etc.
SingleFieldAdvice — The type of advice returned by discovery for a single parameter.
from nv_dfm_core.api import Pipeline, Yield, PlaceParam, Advise, ErrorToken, StopToken
from nv_dfm_core.api.discovery import SingleFieldAdvice
from nv_dfm_core.exec import Frame
from nv_dfm_core.session import JobStatus
from typing import Any, List, Optional
from pprint import pprint
Step 2: Pipeline-specific imports#
Next we import the weather federation runtime and the operations we’ll use in our pipeline: the ECMWF data loader and the xarray convert/render steps.
# import the weather_fed runtime to create the session
import weather_fed.fed.runtime.homesite
# import the TextureFile and TextureFileList classes
# to handle the texture file results
from nv_dfm_lib_common.schemas import TextureFile, TextureFileList
# import the operations we will use in the pipeline (from dataloader and xarray API modules)
from weather_fed.fed.api.dataloader import LoadEcmwfEra5Data
from weather_fed.fed.api.xarray import ConvertToUint8, RenderUint8ToImages
Step 3: Building the pipeline#
In the DFM, a pipeline is a graph of operations that you submit for execution. Each node is an operation from the federation’s API.
We’ll build a pipeline that:
Loads weather data from ECMWF ERA5.
Converts the floating-point values to 8-bit integers (0–255).
Renders that data as image files.
Sends the image data back to us via Yield.
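The uint8 conversion in step 2 is, at heart, a min/max normalization. As a rough sketch of the math only (the actual `ConvertToUint8` operation lives in the federation; the function name here is illustrative):

```python
def to_uint8(value: float, min_value: float, max_value: float) -> int:
    """Clip value to [min_value, max_value], then scale linearly to 0-255."""
    clipped = max(min_value, min(value, max_value))
    scaled = (clipped - min_value) / (max_value - min_value)
    return round(scaled * 255)

# With the tutorial's 0.0-80.0 range for total column water vapour:
to_uint8(0.0, 0.0, 80.0)   # minimum maps to 0
to_uint8(80.0, 0.0, 80.0)  # maximum maps to 255
to_uint8(95.0, 0.0, 80.0)  # out-of-range values are clipped to 255
```

Values outside the `min_value`/`max_value` window are clipped, which is why choosing a sensible range for the variable matters for image contrast.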
# setup the pipeline
with Pipeline() as pipeline:
    # 1. Load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # 2. Convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # 3. Render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    # 4. Yield output image
    Yield(value=render_uint8_to_images)
Step 4: Creating a session#
To run the pipeline we need a session. DFM can run in two ways:
local — Everything runs in this process (great for this tutorial).
flare — Execution is distributed and orchestrated by NVIDIA Flare (we’ll try this later).
When we call session.prepare(pipeline), the pipeline is translated into a Petri-net-like representation for execution. In that representation, places hold tokens (data items); transitions consume tokens from input places and produce tokens into output places; activations are conditions that must hold before a transition can fire, so that the right tokens are available.
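To make the places/transitions/tokens vocabulary concrete, here is a tiny, self-contained firing step in plain Python. This is only an illustration of the general Petri-net model, not DFM's actual internals; all names are made up:

```python
from collections import deque

# places hold tokens (data items); transitions move tokens between places
places = {"loaded": deque(["dataset_a"]), "converted": deque()}

def fire_convert(places: dict) -> bool:
    # activation: the transition may only fire when its input place holds a token
    if not places["loaded"]:
        return False
    token = places["loaded"].popleft()             # consume from input place
    places["converted"].append(f"uint8({token})")  # produce into output place
    return True

fire_convert(places)  # consumes "dataset_a", produces "uint8(dataset_a)"
```

In the prepared pipeline, each operation becomes such a transition, and the `PlaceParam` and `Yield` places are where tokens enter and leave the net.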
# create the session
session = weather_fed.fed.runtime.homesite.get_session(target="local")
session.connect()
# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)
Step 5: Setting up callbacks#
Results from the pipeline are delivered to callbacks. We register a callback and attach it to a place (here, the default Yield place). When the pipeline yields a value, it will be appended to our messages list.
# setup the callback for the results that we yield at the end of the pipeline
# store all results in the messages list
messages = []
def yield_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    messages.append(data)


def default_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    print("Received data: ", data)
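Conceptually, delivery works like a dispatch table: each message is routed to the callback registered for its target place, falling back to the default callback otherwise. A minimal sketch of that idea (illustrative only — the real routing happens inside the DFM session, and real callbacks take the five parameters shown above, simplified to two here):

```python
from typing import Any, Callable

def dispatch(
    target_place: str,
    data: Any,
    place_callbacks: dict,
    default_callback: Callable,
) -> None:
    # use the place-specific callback if one is registered, else the default
    callback = place_callbacks.get(target_place, default_callback)
    callback(target_place, data)

received, fallback = [], []
# a registered place callback handles "yield" messages
dispatch("yield", "image-bytes",
         {"yield": lambda place, data: received.append(data)},
         lambda place, data: fallback.append((place, data)))
# an unregistered place falls through to the default callback
dispatch("_dfm_status_", "status-token", {},
         lambda place, data: fallback.append((place, data)))
```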
Step 6: Executing the pipeline#
Now we run the pipeline. We pass:
Input parameters — A dict whose keys match the
placenames we used inPlaceParamin the pipeline.Place callbacks — We hook
yield_callbackto the"yield"place so we receive the image data.
You may see a warning about unhandled messages (e.g. _dfm_status_); that’s expected until we register a status callback later.
# setup all input parameters for the pipeline
# note how the keys match the names of the PlaceParam objects in the pipeline
# The PlaceParam names have to be unique across the pipeline.
input_params = {
    "variables": ["total_column_water_vapour"],
    "selection": {"time": "2021-12-31"},
    "time_dimension_convert": "time",
    "xydims_convert": ["longitude", "latitude"],
    "min_value_convert": 0.0,
    "max_value_convert": 80.0,
    "xydims_render": ["longitude", "latitude"],
    "time_dimension_render": "time",
    "variable_render": "total_column_water_vapour",
}
# now we are ready to execute the prepared pipeline with the input parameters
# the results will be received in the yield callback and stored in the messages list
job = session.execute(
    prepared,
    input_params=input_params,
    place_callbacks={"yield": yield_callback},
    default_callback=default_callback,
)
Step 7: Waiting for the job to finish#
We block until the job completes (with a timeout so we don’t wait forever). The assertion checks that the job reached JobStatus.FINISHED. Note: FINISHED means the run completed; it doesn’t guarantee that no errors occurred along the way (e.g. adapter-level issues).
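Because FINISHED only means the run completed, a robust pattern is to check both the job status and any errors your callbacks collected. A sketch with a stand-in job object (hypothetical — the real job object comes from session.execute, and its status is a JobStatus value rather than a string):

```python
class FakeJob:
    """Stand-in for a DFM job: it 'finishes' even if an adapter-level
    error was reported to a callback along the way."""

    def wait_until_finished(self, timeout: float) -> None:
        pass  # pretend the run completed within the timeout

    def get_status(self) -> str:
        return "FINISHED"

def run_completed_cleanly(job, error_list: list) -> bool:
    job.wait_until_finished(timeout=70.0)
    # FINISHED alone is not enough: also require that no errors were collected
    return job.get_status() == "FINISHED" and not error_list
```

With this pattern, a run that finished but reported adapter errors to a status callback is still treated as a failure.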
# don't block the application forever
TIMEOUT_S = 70.0
job.wait_until_finished(timeout=TIMEOUT_S)
assert job.get_status() == JobStatus.FINISHED
Step 8: Looking at the results#
The yield callback has been filling the messages list. Each entry is a TextureFileList; inside are TextureFile objects with data source, timestamp, metadata, and base64-encoded image bytes.
messages
Displaying the image#
Let’s decode the base64 image data and display it with Pillow.
import base64
from io import BytesIO
from PIL import Image
from IPython.display import display
def show_images(messages: List[Any]):
    for message in messages:
        if isinstance(message, TextureFileList):
            for item in message.texture_files:
                if isinstance(item, TextureFile):
                    image_data = base64.b64decode(item.base64_image_data)
                    response_image = Image.open(BytesIO(image_data))
                    print(item.timestamp)
                    display(response_image)
show_images(messages)
Step 9: Running with multiple inputs#
So far we used one set of input parameters. We can run the same pipeline over a list of parameter sets to get images for multiple dates. In the next cell we pass two different dates and run once; the results for both will show up in our yield callback.
# clear the lists
messages = []
# create a list of input parameters: everything stays the same except for the date
input_param_list = [input_params.copy(), input_params.copy()]
input_param_list[0]["selection"] = {"time": "2022-12-28"}
input_param_list[1]["selection"] = {"time": "2022-12-31"}
print(input_param_list)
job = session.execute(
    prepared,
    input_params=input_param_list,
    place_callbacks={"yield": yield_callback},
    default_callback=default_callback,
)
# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S * 40)
assert job.get_status() == JobStatus.FINISHED
print(messages)
show_images(messages)
Step 10: Yields and multiple callbacks#
We can route different results to different callbacks. The Yield operation takes an optional place name. If we use several Yields with different places (e.g. "load", "convert", "render"), we register a separate callback for each place. Below we redefine the pipeline to yield at three stages and attach a callback to each so we can inspect intermediate results.
# setup the pipeline
with Pipeline() as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    # yield for each operation's result
    Yield(value=data, place="load")
    Yield(value=convert_to_uint8, place="convert")
    Yield(value=render_uint8_to_images, place="render")
# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)
# setup callbacks for the intermediate results
loaded_data = []
converted_data = []
rendered_data = []
def loaded_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    loaded_data.append(data)


def converted_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    converted_data.append(data)


def rendered_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    rendered_data.append(data)
# execute the pipeline with the same input parameters as before
job = session.execute(
    prepared,
    input_params=input_params,
    place_callbacks={
        "load": loaded_callback,
        "convert": converted_callback,
        "render": rendered_callback,
    },
)
# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)
assert job.get_status() == JobStatus.FINISHED
# print the loaded data
print(loaded_data)
# print the converted data
print(converted_data)
# print the rendered data
print(rendered_data)
Step 11: Error callback and status messages#
To see status and errors from the DFM and adapters, we register a callback for the reserved place _dfm_status_ (use the constant STATUS_PLACE_NAME). In the next cell we do that and then deliberately pass invalid input (e.g. a wrong dimension name) so we can see the error in our callback. The same callback will also receive StopToken when the run ends. Later we’ll see an alternative using a default callback.
from nv_dfm_core.api import STATUS_PLACE_NAME
error_list = []
def error_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    _target_place: str,
    data: Any,
):
    print(f"Error callback for place: {_target_place} called with data: {data}")
    if isinstance(data, ErrorToken):
        error_list.append(data)
# trigger an error by setting a bad input parameter
bad_input_params = input_params.copy()
bad_input_params["time_dimension_render"] = "notime"
# execute the pipeline with the bad input parameters
job = session.execute(
    prepared,
    input_params=bad_input_params,
    place_callbacks={
        "load": loaded_callback,
        "convert": converted_callback,
        "render": rendered_callback,
        STATUS_PLACE_NAME: error_callback,
    },
)
# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)
assert job.get_status() == JobStatus.FINISHED
print(f"Error list: {error_list}")
Step 12: Default callback#
Instead of registering a callback per place, we can register one default callback that receives all messages not handled by a place-specific callback. That way we can handle load, convert, render, status, and stop in a single function.
# default callback
def default_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    target_place: str,
    data: Any,
):
    if target_place == "load":
        loaded_data.append(data)
    elif target_place == "convert":
        converted_data.append(data)
    elif target_place == "render":
        rendered_data.append(data)
    elif target_place == STATUS_PLACE_NAME:
        if isinstance(data, ErrorToken):
            error_list.append(data)
        elif isinstance(data, StopToken):
            stop_list.append(data)
    else:
        print(f"Unknown target place: {target_place}, data: {data}")
# clear the lists
loaded_data = []
converted_data = []
rendered_data = []
error_list = []
stop_list = []
# execute the pipeline with the same input parameters as before
job = session.execute(
    prepared, input_params=input_params, default_callback=default_callback
)
# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)
assert job.get_status() == JobStatus.FINISHED
print(f"Loaded data:\n{loaded_data}")
print(f"Converted data:\n{converted_data}")
print(f"Rendered data:\n{rendered_data}")
print(f"Error list:\n{error_list}")
print(f"Stop list:\n{stop_list}")
Step 13: Discovery#
Sometimes we don’t know the valid parameter values ahead of time. Discovery mode lets the DFM tell us: we mark parameters with Advise() and run the pipeline with mode="discovery". The DFM returns advice (e.g. available variables, time ranges) without running the full pipeline.
from nv_dfm_core.api import DISCOVERY_PLACE_NAME
with Pipeline(mode="discovery") as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=Advise(),
        selection=Advise(),
    )
# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)
# setup discovery and error callbacks
discovery_list = []
error_list = []
default_messages = []
def discovery_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    target_place: str,
    data: Any,
):
    discovery_list.append(data)


def error_callback(
    _from_site: str,
    _node: int | str | None,
    _frame: Frame,
    target_place: str,
    data: Any,
):
    if isinstance(data, ErrorToken):
        error_list.append(data)
# execute the pipeline in discovery mode (no input parameters needed)
job = session.execute(
    prepared,
    input_params={},
    place_callbacks={
        DISCOVERY_PLACE_NAME: discovery_callback,
        STATUS_PLACE_NAME: error_callback,
    },
)
# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)
assert job.get_status() == JobStatus.FINISHED
pprint(discovery_list)
print(error_list)
You should see SingleFieldAdvice entries for variables and selection. For more complex operations, discovery can return a decision tree of interdependent options in one go.
In the next cell we walk that advice, pick a valid date, and run the pipeline again using that date (and a bit of invalid data) to confirm it works.
from datetime import datetime, timedelta
# extract the advice from the discovery list
advice_tuple = discovery_list[0][0]
advice = advice_tuple[1]
assert isinstance(advice, SingleFieldAdvice)
# iterate over the linked list of advices and print them
# remember the start and end dates of the selection advice
while True:
    print(f"field: {advice.field}, value: {advice.value}")
    if advice.field == "selection":
        first_date = advice.value[1]["time"]["first_date"]
        last_date = advice.value[1]["time"]["last_date"]
        frequency = advice.value[1]["time"]["frequency"]
        print(
            f"start date: {first_date}, end date: {last_date}, frequency: {frequency}"
        )
    if advice.edge is None:
        break
    advice = advice.edge
# clear the lists
messages = []
# select a date by using the discovery advice
date_obj = datetime.strptime(first_date, "%Y-%m-%d")
delta_d = timedelta(days=frequency)
date_obj += delta_d * 100
date_to_use = date_obj.strftime("%Y-%m-%d")
input_params_new = input_params.copy()
input_params_new["selection"] = {"time": date_to_use, "garbage": "garbage"}
print(input_params_new)
# setup the pipeline
with Pipeline() as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    Yield(value=render_uint8_to_images)
# prepare the pipeline for execution, this will run a compilation step
prepared = session.prepare(pipeline)
job = session.execute(
    prepared, input_params=input_params_new, place_callbacks={"yield": yield_callback}
)
# don't block the application forever
job.wait_until_finished(timeout=TIMEOUT_S)
assert job.get_status() == JobStatus.FINISHED
show_images(messages)
Step 14: Flare POC mode#
So far we’ve run everything in local mode on this machine. Next we switch to flare mode: we’ll use NVIDIA Flare in POC (Proof of Concept) mode. It still runs on this machine but execution is orchestrated by Flare, so we can test the same pipeline in a distributed setup before deploying to a real federation. First we ensure we’re in the repo root and that the POC is running.
from helpers.find_repo_root import change_directory_to_repo_root
change_directory_to_repo_root()
%%bash
pwd
# check if the POC is already running, if not start it
status_msg=$(uv run dfm poc status -f weather_fed | tail -n 1)
if [[ "$status_msg" == *"POC is running."* ]]; then
    echo "POC is already running."
else
    export PYTHONPATH=$(pwd)/tutorials/weather-fed:$PYTHONPATH
    uv run dfm poc start -f weather_fed
fi
Now we create a Flare session (from the homesite runtime), point it at the POC workspace, and run our pipeline with target="flare". The operations are explicitly assigned to site="client1".
from pathlib import Path
# need to create the session from the admin site, which is the home site in our example
# all NVFlare sites are specified in the `project.yaml` file under weather_fed/config
import weather_fed.fed.runtime.homesite
# clear the lists
messages = []
error_list = []
user = "homesite@nv-dfm-lib.nvidia.com"
# assuming that jupyter lab is running in the root of the DFM repository
flare_workspace = Path("./workspace/weather_fed_poc/weather_fed/").resolve()
admin_package = flare_workspace / "prod_00" / user
if not admin_package.exists():
    raise RuntimeError(f"Admin package not found at {admin_package}")

flare_session = weather_fed.fed.runtime.homesite.get_session(
    target="flare",
    user=user,
    flare_workspace=flare_workspace,
    admin_package=admin_package,
)
flare_session.connect()
# setup the pipeline
# notice that we now also specify the site where to run operation
with Pipeline() as pipeline:
    # load the data from the ECMWF ERA5 dataset
    data = LoadEcmwfEra5Data(
        site="client1",
        variables=PlaceParam(place="variables"),
        selection=PlaceParam(place="selection"),
    )
    # convert the data to uint8
    convert_to_uint8 = ConvertToUint8(
        site="client1",
        data=data,
        time_dimension=PlaceParam(place="time_dimension_convert"),
        xydims=PlaceParam(place="xydims_convert"),
        min_value=PlaceParam(place="min_value_convert"),
        max_value=PlaceParam(place="max_value_convert"),
    )
    # render the uint8 data to images
    render_uint8_to_images = RenderUint8ToImages(
        site="client1",
        data=convert_to_uint8,
        variable=PlaceParam(place="variable_render"),
        xydims=PlaceParam(place="xydims_render"),
        time_dimension=PlaceParam(place="time_dimension_render"),
        format="PNG",
    )
    Yield(value=render_uint8_to_images)
# prepare the pipeline for execution, this will run a compilation step
prepared = flare_session.prepare(pipeline)
print("Starting execution")
job = flare_session.execute(
    prepared,
    input_params=input_params,
    place_callbacks={"yield": yield_callback, STATUS_PLACE_NAME: error_callback},
)
# don't block the application forever
TIMEOUT_FLARE_S = TIMEOUT_S * 10
print("Waiting for the job to finish")
job.wait_until_finished(timeout=TIMEOUT_FLARE_S)
assert job.get_status() == JobStatus.FINISHED
print(messages)
show_images(messages)
print(error_list)
When you’re done with the Flare section, stop the POC to shut down the local Flare processes.
%%bash
dfm poc stop