ApproximateNearestNeighbors#

class spark_rapids_ml.knn.ApproximateNearestNeighbors(*, k: Optional[int] = None, algorithm: str = 'ivfflat', metric: str = 'euclidean', algoParams: Optional[Dict[str, Any]] = None, inputCol: Optional[Union[str, List[str]]] = None, idCol: Optional[str] = None, verbose: Union[int, bool] = False, **kwargs: Any)#

ApproximateNearestNeighbors retrieves k approximate nearest neighbors (ANNs) in item vectors for each query. The key APIs are similar to the NearestNeighbor class which returns the exact k nearest neighbors. The ApproximateNearestNeighbors is currently built on the CAGRA (graph-based) algorithm of cuVS, and the IVFFLAT and IVFPQ algorithms of cuML.

The current implementation build index independently on each data partition of item_df. Queries will be broadcast to all GPUs, then every query probes closest centers on individual index. Local topk results will be aggregated to obtain global topk ANNs.

CAGRA is a graph-based algorithm designed to construct a nearest neighbors graph index using either “ivf_pq” or “nn_descent” method. This index is then utilized to efficiently answer approximate nearest neighbor (ANN) queries. Graph-based algorithms have consistently demonstrated superior performance in ANN search, offering the fastest search speeds with minimal loss in search quality. Due to the high computational complexity involved in graph construction, these algorithms are particularly well-suited for GPU acceleration.

IVFFLAT algorithm trains a set of kmeans centers, then partition every item vector to the closest center. In the query processing phase, a query will be partitioned into a number of closest centers, and probe all the items associated with those centers. In the end the top k closest items will be returned as the approximate nearest neighbors.

The IVFPQ algorithm employs product quantization to compress high-dimensional vectors into compact bit representations, enabling rapid distance computation between vectors. While IVFPQ typically delivers faster search speeds compared to IVFFLAT, it does so with a tradeoff in search quality, such as reduced recall. It is important to note that the distances returned by IVFPQ are approximate and do not represent the exact distances in the original high-dimensional space.

Parameters:
k: int (default = 5)

the default number of approximate nearest neighbors to retrieve for each query.

algorithm: str (default = ‘ivfflat’)

the algorithm parameter to be passed into cuML. It currently must be ‘ivfflat’, ‘ivfpq’ or ‘cagra’. Other algorithms are expected to be supported later.

algoParams: Optional[Dict[str, Any]] (default = None)

if set, algoParam is used to configure the algorithm, on each data partition (or maxRecordsPerBatch if Arrow is enabled) of the item_df. Note this class constructs the kmeans index independently on individual data partition (or maxRecordPerBatch if Arrow is enabled). When algorithm is ‘cagra’, parameters for index construction:

  • build_algo: (str, default = ‘ivf_pq’) algorithm to build graph index, can be either ‘ivf_pq’ or ‘nn_descent’. nn_descent is expected to be generally faster than ivf_pq.

  • intermediate_graph_degree: (int, default = 128) an intermediate variable used during graph index construction.

  • graph_degree: (int, default = 64) the degree of each node in the final graph index.

When algorithm is ‘cagra’, parameters for search (full list in cuvs python API documentation):

  • itopk_size: (int, default = 64) number of intermediate search results retained during the search. Larger value improves the search accuracy but increases the search time. cuVS internally increases the value to be multiple of 32 and expects the internal value to be larger than or equal to k.

  • max_iterations (int, default = 0) maximum number of search iterations. 0 means auto select.

  • min_iterations (int, default = 0) minimum number of search iterations. 0 means auto select.

  • search_width: (int, default = 1) number of graph nodes as the initial set of search points in each iteration.

  • num_random_samplings: (int, default = 1) number of iterations for selecting initial random seed nodes.

When algorithm is ‘ivfflat’:

  • nlist: (int) number of kmeans clusters to partition the dataframe into.

  • nprobe: (int) number of closest clusters to probe for topk ANNs.

When algorithm is ‘ivfpq’:

  • nlist: (int) number of kmeans clusters to partition the dataframe into.

  • nprobe: (int) number of closest clusters to probe for topk ANNs.

  • M: (int) number of subquantizers

  • n_bits: (int) number of bits allocated per subquantizer

Note cuml requires M * n_bits to be multiple of 8 for the best efficiency.

metric: str (default = “euclidean”)

the distance metric to use. ‘ivfflat’ algorithm supports [‘euclidean’, ‘sqeuclidean’, ‘l2’, ‘inner_product’].

inputCol: str or List[str]

The feature column names, spark-rapids-ml supports vector, array and columnar as the input.

  • When the value is a string, the feature columns must be assembled into 1 column with vector or array type.

  • When the value is a list of strings, the feature columns must be numeric types.

idCol: str (default = None)

the name of the column in a dataframe that uniquely identifies each vector. idCol should be set if such a column exists in the dataframe. If idCol is not set, a column with the name unique_id will be automatically added to the dataframe and used as unique identifier for each vector.

verbose:
Logging level.
  • 0 - Disables all log messages.

  • 1 - Enables only critical messages.

  • 2 - Enables all messages up to and including errors.

  • 3 - Enables all messages up to and including warnings.

  • 4 or False - Enables all messages up to and including information messages.

  • 5 or True - Enables all messages up to and including debug messages.

  • 6 - Enables all messages up to and including trace messages.

Examples

>>> from spark_rapids_ml.knn import ApproximateNearestNeighbors
>>> data = [(0, [0.0, 0.0]),
...         (1, [1.0, 1.0]),
...         (2, [2.0, 2.0]),
...         (3, [30.0, 30.0]),
...         (4, [40.0, 40.0]),
...         (5, [50.0, 50.0]),]
>>> data_df = spark.createDataFrame(data, schema="id int, features array<float>")
>>> data_df = data_df.repartition(2) # ensure each partition having more data vectors than the 'nlist' of 'ivfflat'
>>> query = [(10, [0.0, 0.0]),
...          (11, [50.0, 50.0]),]
>>> query_df = spark.createDataFrame(query, schema="id int, features array<float>")
>>> topk = 2
>>> gpu_knn = ApproximateNearestNeighbors().setAlgorithm('ivfflat').setAlgoParams({"nlist" : 2, "nprobe": 1})
>>> gpu_knn = gpu_knn.setInputCol("features").setIdCol("id").setK(topk)
>>> gpu_model = gpu_knn.fit(data_df)
>>> (data_df, query_df, knn_df) = gpu_model.kneighbors(query_df)
>>> knn_df.show()
+--------+-------+----------------+
|query_id|indices|       distances|
+--------+-------+----------------+
|      10| [0, 1]|[0.0, 1.4142134]|
|      11| [5, 4]|[0.0, 14.142137]|
+--------+-------+----------------+
>>> data_df.show()
+---+------------+
| id|    features|
+---+------------+
|  0|  [0.0, 0.0]|
|  1|  [1.0, 1.0]|
|  4|[40.0, 40.0]|
|  2|  [2.0, 2.0]|
|  3|[30.0, 30.0]|
|  5|[50.0, 50.0]|
+---+------------+
>>> query_df.show()
+---+------------+
| id|    features|
+---+------------+
| 10|  [0.0, 0.0]|
| 11|[50.0, 50.0]|
+---+------------+
>>> knnjoin_df = gpu_model.approxSimilarityJoin(query_df, distCol="EuclideanDistance")
+-----------------+------------------+-----------------+
|          item_df|          query_df|EuclideanDistance|
+-----------------+------------------+-----------------+
|  {0, [0.0, 0.0]}|  {10, [0.0, 0.0]}|              0.0|
|  {1, [1.0, 1.0]}|  {10, [0.0, 0.0]}|        1.4142134|
|{5, [50.0, 50.0]}|{11, [50.0, 50.0]}|              0.0|
|{4, [40.0, 40.0]}|{11, [50.0, 50.0]}|        14.142137|
+-----------------+------------------+-----------------+
>>> # vector column input
>>> from spark_rapids_ml.knn import ApproximateNearestNeighbors
>>> from pyspark.ml.linalg import Vectors
>>> data = [(0, Vectors.dense([0.0, 0.0])),
...         (1, Vectors.dense([1.0, 1.0])),
...         (2, Vectors.dense([2.0, 2.0])),
...         (3, Vectors.dense([30.0, 30.0])),
...         (4, Vectors.dense([40.0, 40.0])),
...         (5, Vectors.dense([50.0, 50.0])),]
>>> data_df = spark.createDataFrame(data, ["id", "features"]).repartition(2)
>>> query = [(10, Vectors.dense([0.0, 0.0])),
...          (11, Vectors.dense([50.0, 50.0])),]
>>> query_df = spark.createDataFrame(query, ["id", "features"])
>>> topk = 2
>>> gpu_knn = ApproximateNearestNeighbors().setAlgorithm('ivfflat').setAlgoParams({"nlist" : 2, "nprobe": 1})
>>> gpu_knn = gpu_knn.setInputCol("features").setIdCol("id").setK(topk)
>>> gpu_model = gpu_knn.fit(data_df)
>>> (data_df, query_df, knn_df) = gpu_model.kneighbors(query_df)
>>> knn_df.show()
>>> # multi-column input
>>> from spark_rapids_ml.knn import ApproximateNearestNeighbors
>>> data = [(0, 0.0, 0.0),
...         (1, 1.0, 1.0),
...         (2, 2.0, 2.0),
...         (3, 30.0, 30.0),
...         (4, 40.0, 40.0),
...         (5, 50.0, 50.0),]
>>> data_df = spark.createDataFrame(data, schema="id int, f1 float, f2 float").repartition(2)
>>> query = [(10, 0.0, 0.0),
...          (11, 50.0, 50.0),]
>>> query_df = spark.createDataFrame(query, schema="id int, f1 float, f2 float")
>>> topk = 2
>>> gpu_knn = ApproximateNearestNeighbors().setAlgorithm('ivfflat').setAlgoParams({"nlist" : 2, "nprobe": 1})
>>> gpu_knn = gpu_knn.setInputCols(["f1", "f2"]).setIdCol("id").setK(topk)
>>> gpu_model = gpu_knn.fit(data_df)
>>> (data_df, query_df, knn_df) = gpu_model.kneighbors(query_df)
>>> knn_df.show()

Methods

clear(param)

Reset a Spark ML Param to its default value, setting matching cuML parameter, if exists.

copy([extra])

explainParam(param)

Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.

explainParams()

Returns the documentation of all params with their optionally default values and user-supplied values.

extractParamMap([extra])

Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.

fit(dataset[, params])

Fits a model to the input dataset with optional parameters.

fitMultiple(dataset, paramMaps)

Fits multiple models to the input dataset for all param maps in a single pass.

getAlgoParams()

Gets the value of algoParams.

getAlgorithm()

Gets the value of algorithm.

getIdCol()

Gets the value of idCol.

getInputCol()

Gets the value of inputCol or its default value.

getInputCols()

Gets the value of inputCols or its default value.

getK()

Get the value of k.

getLabelCol()

Gets the value of labelCol or its default value.

getMetric()

Gets the value of metric.

getOrDefault(param)

Gets the value of a param in the user-supplied param map or its default value.

getParam(paramName)

Gets a param by its name.

hasDefault(param)

Checks whether a param has a default value.

hasParam(paramName)

Tests whether this instance contains a param with a given (string) name.

isDefined(param)

Checks whether a param is explicitly set by user or has a default value.

isSet(param)

Checks whether a param is explicitly set by user.

set(param, value)

Sets a parameter in the embedded param map.

setAlgoParams(value)

Sets the value of algoParams.

setAlgorithm(value)

Sets the value of algorithm.

setIdCol(value)

Sets the value of idCol.

setInputCol(value)

Sets the value of inputCol or inputCols.

setInputCols(value)

Sets the value of inputCols.

setK(value)

Sets the value of k.

setMetric(value)

Sets the value of metric.

Attributes

algoParams

algorithm

cuml_params

Returns the dictionary of parameters intended for the underlying cuML class.

idCol

inputCol

inputCols

k

labelCol

metric

num_workers

Number of cuML workers, where each cuML worker corresponds to one Spark task running on one GPU.

params

Returns all params ordered by name.

Methods Documentation

clear(param: Param) None#

Reset a Spark ML Param to its default value, setting matching cuML parameter, if exists.

copy(extra: Optional[ParamMap] = None) P#
explainParam(param: Union[str, Param]) str#

Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.

explainParams() str#

Returns the documentation of all params with their optionally default values and user-supplied values.

extractParamMap(extra: Optional[ParamMap] = None) ParamMap#

Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.

Parameters:
extradict, optional

extra param values

Returns:
dict

merged param map

fit(dataset: DataFrame, params: Optional[Union[ParamMap, List[ParamMap], Tuple[ParamMap]]] = None) Union[M, List[M]]#

Fits a model to the input dataset with optional parameters.

New in version 1.3.0.

Parameters:
datasetpyspark.sql.DataFrame

input dataset.

paramsdict or list or tuple, optional

an optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.

Returns:
Transformer or a list of Transformer

fitted model(s)

fitMultiple(dataset: DataFrame, paramMaps: Sequence[ParamMap]) Iterator[Tuple[int, _CumlModel]]#

Fits multiple models to the input dataset for all param maps in a single pass.

Parameters:
datasetpyspark.sql.DataFrame

input dataset.

paramMapscollections.abc.Sequence

A Sequence of param maps.

Returns:
_FitMultipleIterator

A thread safe iterable which contains one model for each param map. Each call to next(modelIterator) will return (index, model) where model was fit using paramMaps[index]. index values may not be sequential.

getAlgoParams() Dict[str, Any]#

Gets the value of algoParams.

getAlgorithm() str#

Gets the value of algorithm.

getIdCol() str#

Gets the value of idCol.

getInputCol() str#

Gets the value of inputCol or its default value.

getInputCols() List[str]#

Gets the value of inputCols or its default value.

getK() int#

Get the value of k.

getLabelCol() str#

Gets the value of labelCol or its default value.

getMetric() str#

Gets the value of metric.

getOrDefault(param: Union[str, Param[T]]) Union[Any, T]#

Gets the value of a param in the user-supplied param map or its default value. Raises an error if neither is set.

getParam(paramName: str) Param#

Gets a param by its name.

hasDefault(param: Union[str, Param[Any]]) bool#

Checks whether a param has a default value.

hasParam(paramName: str) bool#

Tests whether this instance contains a param with a given (string) name.

isDefined(param: Union[str, Param[Any]]) bool#

Checks whether a param is explicitly set by user or has a default value.

isSet(param: Union[str, Param[Any]]) bool#

Checks whether a param is explicitly set by user.

set(param: Param, value: Any) None#

Sets a parameter in the embedded param map.

setAlgoParams(value: Dict[str, Any]) P#

Sets the value of algoParams.

setAlgorithm(value: str) P#

Sets the value of algorithm.

setIdCol(value: str) P#

Sets the value of idCol. If not set, an id column will be added with column name unique_id. The id column is used to specify nearest neighbor vectors by associated id value.

setInputCol(value: Union[str, List[str]]) P#

Sets the value of inputCol or inputCols.

setInputCols(value: List[str]) P#

Sets the value of inputCols. Used when input vectors are stored as multiple feature columns.

setK(value: int) P#

Sets the value of k.

setMetric(value: str) P#

Sets the value of metric.

Attributes Documentation

algoParams = Param(parent='undefined', name='algoParams', doc='The parameters to use to set up a neighbor algorithm.')#
algorithm = Param(parent='undefined', name='algorithm', doc='The algorithm to use for approximate nearest neighbors search.')#
cuml_params#

Returns the dictionary of parameters intended for the underlying cuML class.

idCol = Param(parent='undefined', name='idCol', doc='id column name.')#
inputCol: Param[str] = Param(parent='undefined', name='inputCol', doc='input column name.')#
inputCols: Param[List[str]] = Param(parent='undefined', name='inputCols', doc='input column names.')#
k = Param(parent='undefined', name='k', doc='The number nearest neighbors to retrieve. Must be >= 1.')#
labelCol: Param[str] = Param(parent='undefined', name='labelCol', doc='label column name.')#
metric = Param(parent='undefined', name='metric', doc='The distance metric to use.')#
num_workers#

Number of cuML workers, where each cuML worker corresponds to one Spark task running on one GPU.

params#

Returns all params ordered by name. The default implementation uses dir() to get all attributes of type Param.