Setting up a basic Data Federation for local testing#
In this part of the How-To, we will show how to create your first Data Federation and run it locally, distributing different sites across different processes on your local machine.
Table of contents#
Prerequisites#
Before starting this guide, you need:
Python 3.10 or higher: Installed on your system (check with
python --version) along withpipandvenvBasic Python Knowledge: Familiarity with Python, async functions, and virtual environments
Your Code: The
plot_gradient.pyfile with the three functions described above
Step 1: Create Workspace and Install DFM#
Create a Workspace Directory#
Create a directory for your federation project and navigate into that new directory:
mkdir -p ~/zero-to-thirty
cd ~/zero-to-thirty
Create a Virtual Environment#
Create a virtual environment inside your workspace directory:
python -m venv .venv
Note
Make sure you have Python 3.10 or higher installed with pip and venv.
Activate the Virtual Environment#
source .venv/bin/activate
Install the DFM core package and dependencies#
pip install nv-dfm-core pillow
This installs the DFM core framework and all required dependencies.
Verify Installation#
Check that DFM is available:
dfm --version
dfm --help
You should see the DFM CLI help output with commands like fed, poc, dev, etc.
Important
Keep the virtual environment activated for all subsequent commands in this guide. If you close your terminal and come back later, remember to:
Navigate to your workspace:
cd ~/zero-to-thirtyReactivate the environment:
source .venv/bin/activate
Step 2: Create Federation Structure#
Create a directory called myfed for your federation in your workspace:
mkdir -p myfed/apps
mkdir -p myfed/configs
mkdir -p myfed/myfed/lib
Through the course of this tutorial, you will create the following content:
myfed/
├── apps/
│ └── application.ipynb # We'll create this
├── configs/
│ └── federation.dfm.yaml # We'll create this
└── myfed/
├── setup.py # We'll create this
└── lib/
├── __init__.py # We'll create this
├── create.py # We'll create this
├── subset.py # We'll create this
└── plot2d.py # We'll create this
Step 3: Write the Federation Configuration File#
The federation configuration file (federation.dfm.yaml) is the heart of your DFM setup. It defines the DFM schema version, federation info, operations, and sites. Let’s build it step by step.
Create myfed/configs/federation.dfm.yaml and we’ll fill it in section by section.
DFM Schema Version#
Start with the DFM schema version:
dfm: 1.0.0
Parameter:
dfm: DFM configuration schema version (currently1.0.0)
Federation Info#
The info section provides metadata about your federation:
info:
api-version: 0.0.1
code-package: myfed
description: "Distributed array subsetting and plotting"
Parameters:
api-version: Your federation’s API version in X.Y.Z format (for example,0.0.1)code-package: Python package name for generated code (must match your directory name)description: Human-readable description of what this federation does
This section tells DFM how to generate code and identifies your federation’s version.
Operations#
The operations section defines the API for your federation—what operations can be called, what parameters they accept, and what they return.
Each operation is defined with a hierarchical name (for example, array.Create), which creates a namespace for organizing related operations.
Define the Create Operation:#
operations:
array.Create:
description: "Create a gradient array"
parameters:
shape:
type: array
description: "Shape of the array to create"
min:
type: number
description: "Minimum value in the array"
max:
type: number
description: "Maximum value in the array"
required:
- shape
returns: object
Structure:
Operation name (
array.Create): Hierarchical name (namespace.OperationName)description: Plain text describing what this operation doesparameters: Dictionary of input parameters (empty{}means no parameters). Each parameter is defined by:type: The type of the parameter as a string (supportsstring,number,integer,boolean,array, andobject)description: Human-readable description of the parameter
required: List of required parameter names (empty[]if there are no required parameters)returns: Return type as a string
Note
The object type represents a generic Python object which will be Pickled when sent between DFM sites, and the array type represents a Python list (not a tuple!).
Define the Subset Operation:#
array.Subset:
description: "Extract slice from array"
parameters:
array:
type: object
description: "Input array"
index:
type: array
description: "Index specification (list of integers)"
required:
- array
- index
returns: object
Key difference: This operation has parameters:
array: Input parameter (typeobjectfor complex Python objects likendarray)index: Input parameter (typearrayfor list ofintindices)
Each parameter has:
type: Type hint (object,string,integer,number,boolean,array)description: What this parameter is for
Define Plot2D Operation:#
array.Plot2D:
description: "Convert 2D array to grayscale image"
parameters:
array:
type: object
description: "2D array to plot"
required:
- array
returns: object
How Operations Map to Adapters#
Important
The operation definition in the config file directly maps to your adapter class structure.
parameters→ Arguments to the adapter’sbody()methodEach parameter name becomes a keyword argument to
body()Example:
array.Subsethas parametersarrayandindex, so its adapter’sbody()will be:async def body(self, array: np.ndarray, index: list) -> np.ndarray:
returns→ Return value of the adapter’sbody()methodThe adapter’s
body()must return a value compatible with the configured return typeExample:
array.Createshould return:return array
The returned value can be passed directly to downstream operations in a pipeline
Site
args(defined in sites section) → Maps values to the adapter’sbody()method argumentsEvery adapter receives
site: Siteandprovider: Provider | Noneto its__init__()automaticallyThe
argssection maps operation parameters and other values tobody()arguments usingfrom-param,const, etc.If an operation has no parameters,
argscan be empty ({})
We’ll see this mapping in action when we create the adapters in Step 4.
Sites#
The sites section defines:
What sites exist in your federation
Which operations each site can execute
How operations map to your adapter implementations
How to pass arguments to adapters
Each site has an info subsection and an interface subsection.
Define Homesite:#
sites:
homesite:
info:
description: "User's notebook/application site"
interface: {}
Special site: Every federation needs a homesite where your application runs (your Jupyter notebook, in this example). It typically has an empty interface because it doesn’t execute operations—it just sends work to other sites and receives results.
Structure:
Site name (
homesite): Special reserved name for the application siteinfo: Metadata about the sitedescription: What this site does
interface: Maps operations to adapters (empty for homesite)
Define Loader Site:#
Now, we will define the loader site, where the arrays are created and sliced. On this site, we co-locate the array.Create operation and the array.Subset operation because the initial array may be too large to communicate. But communicating a subset of the original array may be more manageable. Thus, we can string together array.Create and array.Subset operations and ensure that both operations will be done on the same site, preventing communication of the large array.
loader:
info:
description: "Site where arrays are constructed and sliced"
interface:
"#/operations/array.Create":
adapter: myfed.lib.Create
args:
shape:
from-param: shape
min:
const: -100.0
expose-as:
param: min
type: number
description: "Minimum value in the array"
max:
const: 100.0
expose-as:
param: max
type: number
description: "Maximum value in the array"
"#/operations/array.Subset":
adapter: myfed.lib.Subset
args:
array:
from-param: array
index:
from-param: index
Structure:
Site name (
loader): Arbitrary name for this siteinfo: Metadatainterface: Maps operations to adaptersKey (
"#/operations/array.Create"): JSON pointer reference to operationFormat:
"#/operations/<operation-name>"The
#refers to the root of the config fileadapter: Full Python path to adapter classFormat:
<package>.lib.<ClassName>or<package>.lib.<module>.<ClassName>Example:
myfed.lib.Create
args: Theargssection maps operation parameters to the adapter’sbody()method arguments:from-param: Pass the operation parameter directly to the correspondingbody()argumentconst: Pass the given constant value to thebody()argumentexpose-as: Allow the user to override theconstvalueparam: The operation parameter to map to the adapter’sbody()method argumenttype: The parameter typedescription: Human-readable description of the parameter
Define Plotter Site:#
plotter:
info:
description: "Site for visualization"
interface:
"#/operations/array.Plot2D":
adapter: myfed.lib.Plot2D
args:
array:
from-param: array
Same pattern - args maps the array parameter from the operation to the adapter’s body() method.
Complete Configuration File#
Putting it all together, your complete myfed/configs/federation.dfm.yaml should look like:
dfm: 1.0.0
info:
api-version: 0.0.1
code-package: myfed
description: "Distributed array subsetting and plotting"
operations:
array.Create:
description: "Create a gradient array"
parameters:
shape:
type: array
description: "Shape of the array to create"
min:
type: number
description: "Minimum value in the array"
max:
type: number
description: "Maximum value in the array"
required:
- shape
returns: object
array.Subset:
description: "Extract slice from array"
parameters:
array:
type: object
description: "Input array"
index:
type: array
description: "Index specification (list of integers)"
required:
- array
- index
returns: object
array.Plot2D:
description: "Convert 2D array to grayscale image"
parameters:
array:
type: object
description: "2D array to plot"
required:
- array
returns: object
sites:
homesite:
info:
description: "User's notebook/application site"
interface: {}
loader:
info:
description: "Site where arrays are constructed and sliced"
interface:
"#/operations/array.Create":
adapter: myfed.lib.Create
args:
shape:
from-param: shape
min:
const: -100.0
expose-as:
param: min
type: number
description: "Minimum value in the array"
max:
const: 100.0
expose-as:
param: max
type: number
description: "Maximum value in the array"
"#/operations/array.Subset":
adapter: myfed.lib.Subset
args:
array:
from-param: array
index:
from-param: index
plotter:
info:
description: "Site for visualization"
interface:
"#/operations/array.Plot2D":
adapter: myfed.lib.Plot2D
args:
array:
from-param: array
Understanding the Flow#
When you write a pipeline:
result = Create(site="loader")
The DFM uses this configuration to:
Look up the
array.Createoperation definition (parameters and returns)Find which site implements it (
loaderinsitessection via"#/operations/array.Create")Find the adapter class (
myfed.lib.Create)Map any arguments using the
argssection (withfrom-param,const, etc.)Generate code that calls that adapter at runtime
This configuration file is the “contract” between your application code and the distributed execution. The JSON pointer syntax ("#/operations/...") explicitly links interface implementations to operation definitions.
Step 4: Create Adapter Implementations#
Now wrap your original functions in the body() method of DFM adapter classes. Remember the mapping from Step 4:
parametersin the config → define what arguments thebody()method acceptsreturnsin the config → dictionary returned bybody()methodargsin the config → maps operation parameters (and other values) tobody()method arguments usingfrom-param,const, etc.
Each adapter class must have:
An
__init__(self, site: Site, provider: Provider | None)method (receives site context)An
async def body(self, **params)method where**paramsare the arguments mapped from the config’sargssection
Create myfed/myfed/lib/create.py:#
import numpy as np
from pathlib import Path
from nv_dfm_core.exec import Site, Provider
class Create:
"""Adapter for creating 3D gradient arrays."""
def __init__(self, site: Site, provider: Provider | None):
self._site = site
async def body(self, shape: tuple[int, ...], min: float = 0.0, max: float = 1.0) -> np.ndarray:
ndims = len(shape)
grads = [np.arange(s) for s in shape]
grad = sum(g[tuple(slice(None) if i == j else np.newaxis for i in range(ndims))] / (shape[j] - 1) for j, g in enumerate(grads)) / ndims
return np.asarray((max - min) * grad + min)
Create myfed/myfed/lib/subset.py:#
import numpy as np
from nv_dfm_core.exec import Site, Provider
class Subset:
"""Adapter for subsetting arrays."""
def __init__(self, site: Site, provider: Provider | None):
self._site = site
async def body(self, array: np.ndarray, index: list) -> np.ndarray:
return array[tuple(index)]
Create myfed/myfed/lib/plot2d.py:#
import numpy as np
from PIL import Image
from nv_dfm_core.exec import Site, Provider
class Plot2D:
"""Adapter for plotting 2D arrays."""
def __init__(self, site: Site, provider: Provider | None):
self._site = site
async def body(self, array: np.ndarray):
if len(np.shape(array)) != 2:
raise ValueError("Array must be 2D to be plotted")
normalized = ((array - array.min()) / (array.max() - array.min()) * 255).astype(np.uint8)
return Image.fromarray(normalized, mode='L')
Create myfed/myfed/lib/__init__.py:#
Finally, to make the myfed/myfed/lib subdirectory an importable package, we need to create the __init__.py file:
from .create import Create
from .subset import Subset
from .plot2d import Plot2D
__all__ = ["Create", "Subset", "Plot2D"]
Step 5: Register and Generate Federation Code#
Before generating code for your federation, you need to register your federation with DFM so it knows where to find the configuration files.
Create Federation Registry (First Time Only)#
If this is your first time using DFM, create an empty federation registry file:
# From your workspace directory (~/zero-to-thirty)
dfm fed config create-default
This creates a federations.yaml file that will track all your registered federations. The file location defaults to federations.yaml in the workspace directory.
Note
You only need to run this command once per workspace
The file is initially empty — you’ll add your federation configurations in the next step
If you already have a
federations.yamlfile, skip this step
Register the Federation#
From your workspace directory, register your federation:
# Make sure you're in the workspace root (where federations.yaml exists)
cd ~/zero-to-thirty
dfm fed config set myfed \
--federation-dir myfed \
--config-path configs/federation.dfm.yaml \
--project-path None
NOTE: At the moment, you need to supply a --project-path even though we do not need it for local execution. Since we are not providing any NVFlare project configuration for our local setup, we can just supply None.
Parameters:
myfed- The federation name (must matchcode_pkg_namein your config)--federation-dir myfed- The federation root directory (relative to workspace root)--config-path configs/federation.dfm.yaml- Path to your federation config file (relative to the federation directory, not the workspace)--project-path None- Path to the NVFlare project configuration file, which we don’t need so we supplyNone.
Important Path Resolution:
--federation-diris relative to the workspace root directory (wherefederations.yamlexists)--config-pathis relative to the--federation-dirpathExample: With
--federation-dir myfedand--config-path configs/federation.dfm.yaml, the final path will bemyfed/configs/federation.dfm.yaml
This adds your federation to the federations.yaml registry file, which tracks all your federation configurations.
Generate Federation Code#
Now that your federation myfed is registered, generate the DFM runtime code from your configuration:
# Still in the workspace root directory
dfm fed gen code myfed --output-dir myfed
This creates:
myfed/myfed/fed/api/- Operation classes (array.pywithCreate,Subset,Plot2D)myfed/myfed/fed/site/- Site-specific APIsmyfed/myfed/fed/runtime/- Runtime execution code for each site
You should see output confirming the code generation completed successfully.
Notes:
The
--output-dir myfedtells DFM to generate code inside themyfed/myfeddirectory, as this output directory is relative to thefederation_dirin themyfedsection of yourfederations.yamlfile (created with thedfm fed config create-defaultcommand above)You may see warnings like “Compute cost not set for operation… Using suboptimal default values” - these are informational warnings about optimization settings and can be safely ignored for now. Compute costs are optional metadata used by DFM’s optimizer to make scheduling decisions, but they’re not required for basic pipeline execution.
Install the Generated Code as a Package#
Important
You must install the generated myfed code as a Python package so that local target execution can import myfed.fed.runtime.* modules. Skipping this step will cause import errors when running the pipeline.
The myfed/myfed directory in your workspace needs to be an installable package so that individual sites can import the federation code directly. To do this, we provide a simple setup.py script in the myfed/ directory and install it into our workspace environment (so we can use it directly in our notebook).
# From the workspace root (~/zero-to-thirty)
# Create a setup.py to handle the package structure
cat > myfed/setup.py << 'EOF'
from setuptools import setup, find_packages
setup(
name="myfed",
version="0.1.0",
packages=find_packages(where="."),
install_requires=[
"nv-dfm-core",
"numpy",
"pillow",
],
)
EOF
# Install myfed in editable mode
pip install -e myfed/
This installs the myfed package so local target workers can import myfed.fed.runtime.* modules.
Step 6: Create a Jupyter Notebook for testing#
Next, we want to create a Jupyter Notebook were we can create and submit DFM pipelines using the distributed functions we’ve just created.
First, install JupyterLab if you haven’t already:
pip install jupyterlab
Then launch JupyterLab from the workspace root directory:
cd ~/zero-to-thirty
jupyter lab
Create a new notebook at myfed/apps/application.ipynb (following the same structure as the example federation).
Add the following cells to your notebook:
Cell 1: Setup imports#
from pathlib import Path
# Import from myfed package (installed via pip install -e myfed/)
from myfed.fed.runtime.homesite import get_session
from myfed.fed.api.array import Create, Subset, Plot2D
from nv_dfm_core.api import Pipeline, Yield, PlaceParam
from nv_dfm_core.session import JobStatus
Cell 2: Connect to federation#
session = get_session(target="local")
session.connect()
print("Connected to local federation!")
Cell 3: Define a pipeline#
with Pipeline() as p:
# Tell the loader site to create a 200x300x400 array with values ranging from -500 to 500
array = Create(site="loader", shape=(200,300,400), min=-500, max=500)
# Tell the loader site to subset that array to get a 2D slice
array0 = Subset(site="loader", array=array, index=[0])
# Plot at plotter site (array0 will be sent to plotter)
image = Plot2D(site="plotter", array=array0)
# Return image to notebook
Yield(value=image)
Cell 4: Prepare the pipeline#
prepared = session.prepare(p)
print("Pipeline prepared!")
Cell 5: Execute the pipeline and collect results#
Note
The callback receives all data sent to the homesite, including:
Yielded results from
YieldoperationsStopToken(indicating pipeline completion)Status/error tokens
This means you’ll see multiple “CALLBACK CALLED” messages (typically 3), but only the actual yielded data is added to the results list.
results = []
def collect_result(from_site, from_node, frame, target_place, data):
from nv_dfm_core.api import StopToken, ErrorToken
print(f"CALLBACK CALLED: from_site={from_site}, target_place={target_place}, data type={type(data)}")
# Only collect actual yielded results, not control tokens
if not isinstance(data, (StopToken, ErrorToken)):
results.append(data)
print("Submitting job...")
job = session.execute(
prepared,
input_params={},
default_callback=collect_result
)
print(f"Job submitted: {job.job_id}")
Cell 6: Wait for completion and check status#
success = job.wait_until_finished(timeout=60.0) # 60 second timeout
status = job.get_status()
print(f"Job completed: {success}")
print(f"Job status: {status}")
# If job failed, there might be errors to check
if status != JobStatus.FINISHED:
print(f"WARNING: Job did not finish successfully. Check local worker logs printed in the notebook/terminal")
Cell 7: Display the image#
if results:
result_image = results[0] # Get the image from results
display(result_image)
else:
print("No results received")
Cell 8: Cleanup (optional)#
session.close()
Step 7: Run Your Notebook#
Start Jupyter from the myfed directory:
cd myfed/apps jupyter lab
Run all cells in sequence
You should see:
Connection confirmation
Pipeline preparation confirmation
Progress messages during execution
The grayscale image displayed in the notebook
Understanding What Happened#
Create executed on the
loadersite:Constructed a gradient 3D array
Sent the 3D array as a token to the next operation
Subset executed on the
loadersite:Received the 3D array token
Extracted slice
[0, :, :](first 2D slice)Sent the 2D array as a token to the next operation
Plot2D executed on the
plottersite:Received the 2D array token
Converted to grayscale PIL Image
Sent the image back to homesite
Yield routed the result to your notebook:
Image received via callback
Displayed in Jupyter
Customizing the Pipeline#
In your notebook, try changing this basic setup.
Try extracting a different slice:#
# Extract a different slice along axis 0
array_slice = Subset(site="loader", array=array, index=[5])
Cleanup#
When done, you can shutdown your Jupyter Lab server (CTRL-C) and then deactivate your environment.
Next Steps#
Flare Variant: See 02-flare-poc-mode.md for distributed Flare/POC execution.