NearestNeighbors#
- class spark_rapids_ml.knn.NearestNeighbors(*, k: Optional[int] = None, inputCol: Optional[Union[str, List[str]]] = None, idCol: Optional[str] = None, num_workers: Optional[int] = None, verbose: Union[int, bool] = False, **kwargs: Any)#
NearestNeighbors retrieves the exact k nearest neighbors in item vectors for each query vector. The main methods accept distributed CPU dataframes as inputs, leverage GPUs to accelerate computation, and take care of communication and aggregation automatically. However, it should be noted that only the euclidean distance (also known as L2 distance) is supported in the current implementations and the feature data type must be of the float type. All other data types will be converted into float during computation.
- Parameters:
- k: int (default = 5)
the default number nearest neighbors to retrieve for each query.
- inputCol: str or List[str]
The feature column names, spark-rapids-ml supports vector, array and columnar as the input.
When the value is a string, the feature columns must be assembled into 1 column with vector or array type.
When the value is a list of strings, the feature columns must be numeric types.
- idCol: str (default = None)
the name of the column in a dataframe that uniquely identifies each vector. idCol should be set if such a column exists in the dataframe. If idCol is not set, a column with the name unique_id will be automatically added to the dataframe and used as unique identifier for each vector.
- num_workers:
Number of cuML workers, where each cuML worker corresponds to one Spark task running on one GPU. If not set, spark-rapids-ml tries to infer the number of cuML workers (i.e. GPUs in cluster) from the Spark environment.
- verbose:
- Logging level.
0
- Disables all log messages.1
- Enables only critical messages.2
- Enables all messages up to and including errors.3
- Enables all messages up to and including warnings.4 or False
- Enables all messages up to and including information messages.5 or True
- Enables all messages up to and including debug messages.6
- Enables all messages up to and including trace messages.
Examples
>>> from spark_rapids_ml.knn import NearestNeighbors >>> data = [(0, [1.0, 1.0]), ... (1, [2.0, 2.0]), ... (2, [3.0, 3.0]),] >>> data_df = spark.createDataFrame(data, schema="id int, features array<float>") >>> query = [(3, [1.0, 1.0]), ... (4, [3.0, 3.0]),] >>> query_df = spark.createDataFrame(query, schema="id int, features array<float>") >>> topk = 2 >>> gpu_knn = NearestNeighbors().setInputCol("features").setIdCol("id").setK(topk) >>> gpu_model = gpu_knn.fit(data_df) >>> (data_df, query_df, knn_df) = gpu_model.kneighbors(query_df) >>> knn_df.show() +--------+-------+----------------+ |query_id|indices| distances| +--------+-------+----------------+ | 3| [0, 1]|[0.0, 1.4142135]| | 4| [2, 1]|[0.0, 1.4142135]| +--------+-------+----------------+ >>> data_df.show() +---+----------+ | id| features| +---+----------+ | 0|[1.0, 1.0]| | 1|[2.0, 2.0]| | 2|[3.0, 3.0]| +---+----------+ >>> query_df.show() +---+----------+ | id| features| +---+----------+ | 3|[1.0, 1.0]| | 4|[3.0, 3.0]| +---+----------+ >>> knnjoin_df = gpu_model.exactNearestNeighborsJoin(query_df, distCol="EuclideanDistance") >>> knnjoin_df.show() +---------------+---------------+-----------------+ | item_df| query_df|EuclideanDistance| +---------------+---------------+-----------------+ |{1, [2.0, 2.0]}|{3, [1.0, 1.0]}| 1.4142135| |{0, [1.0, 1.0]}|{3, [1.0, 1.0]}| 0.0| |{2, [3.0, 3.0]}|{4, [3.0, 3.0]}| 0.0| |{1, [2.0, 2.0]}|{4, [3.0, 3.0]}| 1.4142135| +---------------+---------------+-----------------+
>>> # vector column input >>> from spark_rapids_ml.knn import NearestNeighbors >>> from pyspark.ml.linalg import Vectors >>> data = [(0, Vectors.dense([1.0, 1.0]),), ... (1, Vectors.dense([2.0, 2.0]),), ... (2, Vectors.dense([3.0, 3.0]),)] >>> data_df = spark.createDataFrame(data, ["id", "features"]) >>> query = [(3, Vectors.dense([1.0, 1.0]),), ... (4, Vectors.dense([3.0, 3.0]),)] >>> query_df = spark.createDataFrame(query, ["id", "features"]) >>> topk = 2 >>> gpu_knn = NearestNeighbors().setInputCol("features").setIdCol("id").setK(topk) >>> gpu_model = gpu_knn.fit(data_df)
>>> # multi-column input >>> from spark_rapids_ml.knn import NearestNeighbors >>> data = [(0, 1.0, 1.0), ... (1, 2.0, 2.0), ... (2, 3.0, 3.0),] >>> data_df = spark.createDataFrame(data, schema="id int, f1 float, f2 float") >>> query = [(3, 1.0, 1.0), ... (4, 3.0, 3.0),] >>> query_df = spark.createDataFrame(query, schema="id int, f1 float, f2 float") >>> topk = 2 >>> gpu_knn = NearestNeighbors().setInputCols(["f1", "f2"]).setIdCol("id").setK(topk) >>> gpu_model = gpu_knn.fit(data_df)
Methods
clear
(param)Reset a Spark ML Param to its default value, setting matching cuML parameter, if exists.
copy
([extra])Create a copy of the current instance, including its parameters and cuml_params.
explainParam
(param)Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.
Returns the documentation of all params with their optionally default values and user-supplied values.
extractParamMap
([extra])Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.
fit
(dataset[, params])Fits a model to the input dataset with optional parameters.
fitMultiple
(dataset, paramMaps)Fits multiple models to the input dataset for all param maps in a single pass.
getIdCol
()Gets the value of idCol.
Gets the value of inputCol or its default value.
Gets the value of inputCols or its default value.
getK
()Get the value of k.
Gets the value of labelCol or its default value.
getOrDefault
(param)Gets the value of a param in the user-supplied param map or its default value.
getParam
(paramName)Gets a param by its name.
hasDefault
(param)Checks whether a param has a default value.
hasParam
(paramName)Tests whether this instance contains a param with a given (string) name.
isDefined
(param)Checks whether a param is explicitly set by user or has a default value.
isSet
(param)Checks whether a param is explicitly set by user.
set
(param, value)Sets a parameter in the embedded param map.
setIdCol
(value)Sets the value of idCol.
setInputCol
(value)setInputCols
(value)Sets the value of
inputCols
.setK
(value)Sets the value of k.
Attributes
Returns the dictionary of parameters intended for the underlying cuML class.
Number of cuML workers, where each cuML worker corresponds to one Spark task running on one GPU.
Returns all params ordered by name.
Methods Documentation
- clear(param: Param) None #
Reset a Spark ML Param to its default value, setting matching cuML parameter, if exists.
- copy(extra: Optional[ParamMap] = None) P #
Create a copy of the current instance, including its parameters and cuml_params.
This function extends the default copy() method to ensure the cuml_params variable is also copied. The default super().copy() method only handles _paramMap and _defaultParamMap.
- Parameters:
- extraOptional[ParamMap]
A dictionary or ParamMap containing additional parameters to set in the copied instance. Note ParamMap = Dict[pyspark.ml.param.Param, Any].
- Returns:
- P
A new instance of the same type as the current object, with parameters and cuml_params copied.
- Raises:
- TypeError
If any key in the extra dictionary is not an instance of pyspark.ml.param.Param.
- explainParam(param: Union[str, Param]) str #
Explains a single param and returns its name, doc, and optional default value and user-supplied value in a string.
- explainParams() str #
Returns the documentation of all params with their optionally default values and user-supplied values.
- extractParamMap(extra: Optional[ParamMap] = None) ParamMap #
Extracts the embedded default param values and user-supplied values, and then merges them with extra values from input into a flat param map, where the latter value is used if there exist conflicts, i.e., with ordering: default param values < user-supplied values < extra.
- Parameters:
- extradict, optional
extra param values
- Returns:
- dict
merged param map
- fit(dataset: DataFrame, params: Optional[Union[ParamMap, List[ParamMap], Tuple[ParamMap]]] = None) Union[M, List[M]] #
Fits a model to the input dataset with optional parameters.
New in version 1.3.0.
- Parameters:
- dataset
pyspark.sql.DataFrame
input dataset.
- paramsdict or list or tuple, optional
an optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.
- dataset
- Returns:
Transformer
or a list ofTransformer
fitted model(s)
- fitMultiple(dataset: DataFrame, paramMaps: Sequence[ParamMap]) Iterator[Tuple[int, _CumlModel]] #
Fits multiple models to the input dataset for all param maps in a single pass.
- Parameters:
- dataset
pyspark.sql.DataFrame
input dataset.
- paramMaps
collections.abc.Sequence
A Sequence of param maps.
- dataset
- Returns:
_FitMultipleIterator
A thread safe iterable which contains one model for each param map. Each call to next(modelIterator) will return (index, model) where model was fit using paramMaps[index]. index values may not be sequential.
- getIdCol() str #
Gets the value of idCol.
- getInputCol() str #
Gets the value of inputCol or its default value.
- getInputCols() List[str] #
Gets the value of inputCols or its default value.
- getK() int #
Get the value of k.
- getLabelCol() str #
Gets the value of labelCol or its default value.
- getOrDefault(param: Union[str, Param[T]]) Union[Any, T] #
Gets the value of a param in the user-supplied param map or its default value. Raises an error if neither is set.
- hasParam(paramName: str) bool #
Tests whether this instance contains a param with a given (string) name.
- isDefined(param: Union[str, Param[Any]]) bool #
Checks whether a param is explicitly set by user or has a default value.
- setIdCol(value: str) P #
Sets the value of idCol. If not set, an id column will be added with column name unique_id. The id column is used to specify nearest neighbor vectors by associated id value.
- setInputCols(value: List[str]) P #
Sets the value of
inputCols
. Used when input vectors are stored as multiple feature columns.
- setK(value: int) P #
Sets the value of k.
Attributes Documentation
- cuml_params#
Returns the dictionary of parameters intended for the underlying cuML class.
- idCol = Param(parent='undefined', name='idCol', doc='id column name.')#
- inputCol: Param[str] = Param(parent='undefined', name='inputCol', doc='input column name.')#
- inputCols: Param[List[str]] = Param(parent='undefined', name='inputCols', doc='input column names.')#
- k = Param(parent='undefined', name='k', doc='The number nearest neighbors to retrieve. Must be >= 1.')#
- labelCol: Param[str] = Param(parent='undefined', name='labelCol', doc='label column name.')#
- num_workers#
Number of cuML workers, where each cuML worker corresponds to one Spark task running on one GPU.
- params#
Returns all params ordered by name. The default implementation uses
dir()
to get all attributes of typeParam
.
- verbose: Param[Union[int, bool]] = Param(parent='undefined', name='verbose', doc='cuml verbosity level (False, True or an integer between 0 and 6).')#