# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import functools
import os
import pathlib
import shutil
from datetime import datetime
from importlib.metadata import version
import fsspec
import gcsfs
import nest_asyncio
import numpy as np
import xarray as xr
import zarr
from fsspec.implementations.cached import WholeFileCacheFileSystem
from loguru import logger
from tqdm.asyncio import tqdm
from earth2studio.data.utils import (
AsyncCachingFileSystem,
datasource_cache_root,
prep_data_inputs,
)
from earth2studio.lexicon import ARCOLexicon
from earth2studio.utils.type import TimeArray, VariableArray
class ARCO:
"""Analysis-Ready, Cloud Optimized (ARCO) is a data store of ERA5 re-analysis data
currated by Google. This data is stored in Zarr format and contains 31 surface and
pressure level variables (for 37 pressure levels) on a 0.25 degree lat lon grid.
Temporal resolution is 1 hour.
Parameters
----------
cache : bool, optional
Cache data source on local memory, by default True
verbose : bool, optional
Print download progress, by default True
async_timeout : int, optional
Time in sec after which download will be cancelled if not finished successfully,
by default 600
Warning
-------
This is a remote data source and can potentially download a large amount of data
to your local machine for large requests.
Note
----
Additional information on the data repository can be referenced here:
- https://cloud.google.com/storage/docs/public-datasets/era5
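
    Examples
    --------
    A minimal usage sketch; requires network access to the public GCS bucket, and
    the variable names used here (``t2m``, ``z500``) are assumed to be valid ARCO
    lexicon entries:

    >>> from datetime import datetime
    >>> ds = ARCO()
    >>> da = ds(time=datetime(2000, 1, 1, 12), variable=["t2m", "z500"])
    >>> da.shape
    (1, 2, 721, 1440)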
"""
ARCO_LAT = np.linspace(90, -90, 721)
ARCO_LON = np.linspace(0, 359.75, 1440)
def __init__(
self, cache: bool = True, verbose: bool = True, async_timeout: int = 600
):
# Check Zarr version and use appropriate method
try:
zarr_version = version("zarr")
self.zarr_major_version = int(zarr_version.split(".")[0])
except Exception:
# Fallback to older method if version check fails
self.zarr_major_version = 2 # Assume older version if we can't determine
self._cache = cache
self._verbose = verbose
if self.zarr_major_version >= 3:
# Check to see if there is a running loop (initialized in async)
try:
loop = asyncio.get_running_loop()
loop.run_until_complete(self._async_init())
except RuntimeError:
                # Otherwise assume async calls will be used; in that case the
                # group is initialized in the call function once we have the loop
self.zarr_group = None
self.level_coords = None
else:
fs = gcsfs.GCSFileSystem(
cache_timeout=-1,
token="anon", # noqa: S106 # nosec B106
access="read_only",
                block_size=2**20,  # 1 MiB read blocks
                asynchronous=False,  # this branch only runs for Zarr < 3
)
if self._cache:
cache_options = {
"cache_storage": self.cache,
"expiry_time": 31622400, # 1 year
}
fs = WholeFileCacheFileSystem(fs=fs, **cache_options)
# Legacy method for Zarr < 3.0
logger.warning("Using Zarr 2.0 method for ARCO")
fs_map = fsspec.FSMap(
"gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3", fs
)
self.zarr_group = zarr.open(fs_map, mode="r")
self.level_coords = self.zarr_group["level"][:]
self.async_timeout = async_timeout
async def _async_init(self) -> None:
"""Async initialization of zarr group
Note
----
Async fsspec expects initialization inside of the execution loop
"""
fs = gcsfs.GCSFileSystem(
cache_timeout=-1,
token="anon", # noqa: S106 # nosec B106
access="read_only",
            block_size=2**20,  # 1 MiB read blocks
asynchronous=True,
)
if self._cache:
cache_options = {
"cache_storage": self.cache,
"expiry_time": 31622400, # 1 year
}
fs = AsyncCachingFileSystem(fs=fs, **cache_options, asynchronous=True)
zstore = zarr.storage.FsspecStore(
fs,
path="/gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
)
self.zarr_group = await zarr.api.asynchronous.open(store=zstore, mode="r")
self.level_coords = await (await self.zarr_group.get("level")).getitem(
slice(None)
)
def __call__(
self,
time: datetime | list[datetime] | TimeArray,
variable: str | list[str] | VariableArray,
) -> xr.DataArray:
"""Function to get data
Parameters
----------
time : datetime | list[datetime] | TimeArray
Timestamps to return data for (UTC).
variable : str | list[str] | VariableArray
String, list of strings or array of strings that refer to variables to
return. Must be in the ARCO lexicon.
Returns
-------
xr.DataArray
ERA5 weather data array from ARCO
"""
nest_asyncio.apply() # Patch asyncio to work in notebooks
try:
loop = asyncio.get_event_loop()
except RuntimeError:
# If no event loop exists, create one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if self.zarr_group is None:
loop.run_until_complete(self._async_init())
xr_array = loop.run_until_complete(
asyncio.wait_for(self.fetch(time, variable), timeout=self.async_timeout)
)
# Delete cache if needed
if not self._cache:
shutil.rmtree(self.cache)
return xr_array
async def fetch(
self,
time: datetime | list[datetime] | TimeArray,
variable: str | list[str] | VariableArray,
) -> xr.DataArray:
"""Async function to get data
Parameters
----------
time : datetime | list[datetime] | TimeArray
Timestamps to return data for (UTC).
variable : str | list[str] | VariableArray
String, list of strings or array of strings that refer to variables to
return. Must be in the ARCO lexicon.
Returns
-------
xr.DataArray
ERA5 weather data array from ARCO
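
        Examples
        --------
        A sketch of direct async use. Per the check below, the data source must
        be constructed inside the running event loop so the Zarr group gets
        initialized; ``nest_asyncio`` is applied first so ``__init__`` can nest
        ``run_until_complete``:

        >>> import nest_asyncio
        >>> nest_asyncio.apply()
        >>> async def main():
        ...     ds = ARCO()
        ...     return await ds.fetch([datetime(2000, 1, 1)], ["t2m"])
        >>> da = asyncio.get_event_loop().run_until_complete(main())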
"""
if self.zarr_group is None:
            raise ValueError(
                "Zarr group is not initialized! If you are calling this function "
                "directly make sure the data source is initialized inside the "
                "async loop!"
            )
time, variable = prep_data_inputs(time, variable)
        # Create cache dir if it doesn't exist
pathlib.Path(self.cache).mkdir(parents=True, exist_ok=True)
# Make sure input time is valid
self._validate_time(time)
xr_array = xr.DataArray(
data=np.empty(
(len(time), len(variable), len(self.ARCO_LAT), len(self.ARCO_LON))
),
dims=["time", "variable", "lat", "lon"],
coords={
"time": time,
"variable": variable,
"lat": self.ARCO_LAT,
"lon": self.ARCO_LON,
},
)
args = [
(t, i, v, j) for j, v in enumerate(variable) for i, t in enumerate(time)
]
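        # One task per (time, variable) pair; indices i and j map each fetched
        # field back into its slot in the pre-allocated output array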
func_map = map(functools.partial(self.fetch_wrapper, xr_array=xr_array), args)
# Launch all fetch requests
await tqdm.gather(
*func_map, desc="Fetching ARCO data", disable=(not self._verbose)
)
return xr_array
async def fetch_wrapper(
self,
e: tuple[datetime, int, str, int],
xr_array: xr.DataArray,
) -> None:
"""Small wrapper to pack arrays into the DataArray"""
out = await self.fetch_array(e[0], e[2])
xr_array[e[1], e[3]] = out
async def fetch_array(self, time: datetime, variable: str) -> np.ndarray:
"""Fetches requested array from remote store
Parameters
----------
time : datetime
Time to fetch
variable : str
Variable to fetch
Returns
-------
np.ndarray
Data
"""
if self.zarr_group is None:
raise ValueError("Zarr group is not initialized")
        # Get time index (vanilla zarr doesn't support date indices)
time_index = self._get_time_index(time)
logger.debug(
f"Fetching ARCO zarr array for variable: {variable} at {time.isoformat()}"
)
try:
arco_name, modifier = ARCOLexicon[variable]
except KeyError as e:
logger.error(f"variable id {variable} not found in ARCO lexicon")
raise e
arco_variable, level = arco_name.split("::")
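        # Lexicon entries take the form "<arco_name>::<level>", with an empty
        # level for surface and static fields. ARCO arrays are (lat, lon) for
        # static fields, (time, lat, lon) for surface fields and
        # (time, level, lat, lon) for pressure-level fields, so the branches
        # below dispatch on array rank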
if self.zarr_major_version >= 3:
zarr_array = await self.zarr_group.get(arco_variable)
shape = zarr_array.shape
# Static variables
if len(shape) == 2:
data = await zarr_array.getitem(slice(None))
output = modifier(data)
# Surface variable
elif len(shape) == 3:
data = await zarr_array.getitem(time_index)
output = modifier(data)
# Atmospheric variable
else:
# Load levels coordinate system from Zarr store and check
level_index = np.searchsorted(self.level_coords, int(level))
data = await zarr_array.getitem((time_index, level_index))
output = modifier(data)
else:
# Zarr 2.0 fall back
shape = self.zarr_group[arco_variable].shape
# Static variables
if len(shape) == 2:
output = modifier(self.zarr_group[arco_variable][:])
# Surface variable
elif len(shape) == 3:
output = modifier(self.zarr_group[arco_variable][time_index])
# Atmospheric variable
else:
level_index = np.where(self.level_coords == int(level))[0][0]
output = modifier(
self.zarr_group[arco_variable][time_index, level_index]
)
return output
@property
def cache(self) -> str:
"""Get the appropriate cache location."""
cache_location = os.path.join(datasource_cache_root(), "arco")
if not self._cache:
cache_location = os.path.join(cache_location, "tmp_arco")
return cache_location
@classmethod
def _validate_time(cls, times: list[datetime]) -> None:
"""Verify if date time is valid for ARCO
Parameters
----------
times : list[datetime]
list of date times to fetch data
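
        Raises
        ------
        ValueError
            If a time is not on a 1 hour interval or falls outside the range
            covered by ARCO (1940-01-01 up to, but not including, 2023-11-10)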
"""
for time in times:
if not (time - datetime(1900, 1, 1)).total_seconds() % 3600 == 0:
raise ValueError(
f"Requested date time {time} needs to be 1 hour interval for ARCO"
)
if time < datetime(year=1940, month=1, day=1):
raise ValueError(
f"Requested date time {time} needs to be after January 1st, 1940 for ARCO"
)
if time >= datetime(year=2023, month=11, day=10):
raise ValueError(
f"Requested date time {time} needs to be before November 10th, 2023 for ARCO"
)
# if not self.available(time):
# raise ValueError(f"Requested date time {time} not available in ARCO")
@classmethod
def _get_time_index(cls, time: datetime) -> int:
"""Small little index converter to go from datetime to integer index.
We don't need to do with with xarray, but since we are vanilla zarr for speed
this conversion must be manual.
Parameters
----------
time : datetime
Input date time
Returns
-------
int
Time coordinate index of ARCO data
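
        Examples
        --------
        >>> ARCO._get_time_index(datetime(1940, 1, 1))  # hours since 1900-01-01
        350616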
"""
start_date = datetime(year=1900, month=1, day=1)
duration = time - start_date
return int(divmod(duration.total_seconds(), 3600)[0])
@classmethod
def available(cls, time: datetime | np.datetime64) -> bool:
"""Checks if given date time is avaliable in the ARCO data source
Parameters
----------
time : datetime | np.datetime64
Date time to access
Returns
-------
bool
If date time is avaiable
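
        Examples
        --------
        The first check needs network access to the public bucket; the second is
        rejected by the offline range validation:

        >>> ARCO.available(datetime(2000, 1, 1))
        True
        >>> ARCO.available(datetime(1939, 12, 31))
        False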
"""
if isinstance(time, np.datetime64): # np.datetime64 -> datetime
_unix = np.datetime64(0, "s")
_ds = np.timedelta64(1, "s")
time = datetime.utcfromtimestamp(float((time - _unix) / _ds))
# Offline checks
try:
cls._validate_time([time])
except ValueError:
return False
gcs = gcsfs.GCSFileSystem(cache_timeout=-1)
try:
zarr_version = version("zarr")
zarr_major_version = int(zarr_version.split(".")[0])
except Exception:
zarr_major_version = 2
if zarr_major_version >= 3:
gcstore = zarr.storage.FsspecStore(
gcs,
path="/gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
)
else:
gcstore = gcsfs.GCSMap(
"gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
gcs=gcs,
)
zarr_group = zarr.open(gcstore, mode="r")
# Load time coordinate system from Zarr store and check
time_index = cls._get_time_index(time)
max_index = zarr_group["time"][-1]
return time_index >= 0 and time_index <= max_index