import warnings
from typing import Optional, Tuple
import numpy as np
from numpy import ndarray
from .bandwidth_selection import direct_plugin, kernel_properties, normal_reference
from .ckde import CKDE
from .cutils import assign_labels, compute_d, gradient_ascent, mean_shift
[docs]class CKDEClassification:
"""Bayes' classifier based on conditional kernel density estimation.
TODO: <MATH FORMULA and READ MORE and REFERENCES>
Parameters
----------
kernel_name : {'gaussian', 'uniform', 'epanechnikov', 'cauchy'}, default='gaussian'
Name of kernel function.
Examples
--------
>>> # Prepare data for two classes
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> x_train1 = np.random.normal(0, 1, size=(m_train // 2, n_x))
>>> y_train1 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train1 = np.full(m_train // 2, 1)
>>> x_train2 = np.random.normal(3, 1, size=(m_train // 2, n_x))
>>> y_train2 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train2 = np.full(m_train // 2, 2)
>>> x_train = np.concatenate((x_train1, x_train2))
>>> y_train = np.concatenate((y_train1, y_train2))
>>> labels_train = np.concatenate((labels_train1, labels_train2))
>>> y_star = np.array([0.0] * n_y)
>>> # Fit
>>> classifier = CKDEClassification().fit(x_train, y_train, y_star, labels_train)
"""
def __init__(self, kernel_name: str = "gaussian"):
if kernel_name not in kernel_properties:
available_kernels = list(kernel_properties.keys())
raise ValueError(f"invalid 'kernel_name' - try one of {available_kernels}")
self.kernel_name = kernel_name
self.fitted = False
[docs] def fit(
self,
x_train: ndarray,
y_train: ndarray,
y_star: ndarray,
labels_train: ndarray,
weights_train: Optional[ndarray] = None,
share_bandwidth: bool = False,
bandwidths_x: Optional[ndarray] = None,
bandwidths_y: Optional[ndarray] = None,
bandwidth_method: str = "normal_reference",
prior_prob: Optional[ndarray] = None,
**kwargs,
):
"""Fit the classifier.
Parameters
----------
x_train : ndarray of shape (m_train, n_x)
Data points (describing variables) as an array containing data with float
type.
y_train : ndarray of shape (m_train, n_y)
Data points (conditioning variables) as an array containing data with float
type.
y_star : ndarray of shape (n_y,)
Conditioned value.
labels_train : ndarray of shape (m_train,)
Labels of data points as an array containing data with int type.
weights_train : ndarray of shape (m_train,), default=None
Weights for data points. If None, all points are equally weighted.
share_bandwidth : bool, default=False
Determines whether all classes should have common bandwidth.
If False, estimator of each class gets its own bandwidth.
bandwidths_x : ndarray of shape (n_classes, n_x), optional
Smoothing parameter of describing variables for each class.
bandwidths_y : ndarray of shape (n_classes, n_y), optional
Smoothing parameter of conditioning variables for each class.
bandwidth_method : {'normal_reference', 'direct_plugin', 'ste_plugin', \
'ml_cv'}, default='normal_reference'
Name of bandwidth selection method used to compute smoothing parameter.
prior_prob : ndarray of shape (n_classes,), default=None
Prior probabilities of each class. If None, all classes are equally
probable.
Returns
-------
self : object
Fitted self instance of CKDEClassification.
Examples
--------
>>> # Prepare data for two classes
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> x_train1 = np.random.normal(0, 1, size=(m_train // 2, n_x))
>>> y_train1 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train1 = np.full(m_train // 2, 1)
>>> x_train2 = np.random.normal(3, 1, size=(m_train // 2, n_x))
>>> y_train2 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train2 = np.full(m_train // 2, 2)
>>> x_train = np.concatenate((x_train1, x_train2))
>>> y_train = np.concatenate((y_train1, y_train2))
>>> labels_train = np.concatenate((labels_train1, labels_train2))
>>> y_star = np.array([0.0] * n_y)
>>> weights_train = np.random.uniform(0, 1, size=(m_train,))
>>> # Fit
>>> prior_prob = np.array([0.3, 0.7])
>>> params = (x_train, y_train, y_star, labels_train, weights_train)
>>> classifier = CKDEClassification().fit(*params, prior_prob=prior_prob)
"""
if x_train.ndim != 2:
raise ValueError("invalid shape of 'x_train' - should be 2d")
self.x_train = x_train
self.m_train = self.x_train.shape[0]
n_x = self.x_train.shape[1]
if y_train.ndim != 2:
raise ValueError("invalid shape of 'y_train' - should be 2d")
if y_train.shape[0] != x_train.shape[0]:
raise ValueError("invalid size of 'y_train'")
self.y_train = y_train
n_y = self.y_train.shape[1]
if y_star.ndim != 1:
raise ValueError("invalid shape of 'y_star' - should be 1d")
if y_star.shape[0] != n_y:
raise ValueError(f"invalid size of 'y_star'- should contain {n_y} values")
self.y_star = y_star
if labels_train.ndim != 1:
raise ValueError("invalid shape of 'labels_train' - should be 1d")
if not np.issubdtype(labels_train.dtype, np.integer):
raise ValueError("invalid dtype of 'labels_train' - should be of int type")
self.labels_train = labels_train
if weights_train is None:
self.weights_train = np.full(self.m_train, 1 / self.m_train)
else:
if weights_train.ndim != 1:
raise ValueError("invalid shape of 'weights_train' - should be 1d")
if weights_train.shape[0] != x_train.shape[0]:
raise ValueError("invalid size of 'weights_train'")
if not (weights_train > 0).all():
raise ValueError("'weights_train' must be positive")
self.weights_train = weights_train / weights_train.sum()
self.ulabels = np.unique(labels_train) # Sorted unique labels
self.n_classes = self.ulabels.shape[0]
self.bandwidth_method = bandwidth_method
if share_bandwidth:
if self.bandwidth_method == "normal_reference":
bandwidth_y = normal_reference(y_train, weights_train, self.kernel_name)
bandwidth_x = normal_reference(
self.x_train, weights_train, self.kernel_name
)
elif self.bandwidth_method == "direct_plugin":
stage = kwargs["stage"] if "stage" in kwargs else 2
bandwidth_y = direct_plugin(
y_train, weights_train, self.kernel_name, stage
)
bandwidth_x = direct_plugin(
self.x_train, weights_train, self.kernel_name, stage
)
else:
raise ValueError("invalid 'bandwidth_method'")
self.bandwidths_x = np.full((self.n_classes, n_x), bandwidth_x)
self.bandwidths_y = np.full((self.n_classes, n_y), bandwidth_y)
else:
if bandwidths_x is not None and bandwidths_y is not None:
self.bandwidths_x = bandwidths_x
self.bandwidths_y = bandwidths_y
else:
self.bandwidths_x = np.full((self.n_classes,), bandwidths_x)
self.bandwidths_y = np.full((self.n_classes,), bandwidths_y)
if prior_prob is None:
self.prior = self._compute_prior()
else:
if prior_prob.ndim != 1:
raise ValueError("invalid shape of 'prior_prob' - should be 1d")
if prior_prob.shape[0] != self.n_classes:
raise ValueError(
f"invalid size of 'prior_prob' - should contain {self.n_classes} "
"values"
)
self.prior = prior_prob / prior_prob.sum()
self.kwargs = kwargs
self.fitted = True
return self
[docs] def predict(self, x_test: ndarray) -> ndarray:
"""Predict class labels.
Parameters
----------
x_test : ndarray of shape (m_test, n_x)
Grid data points (describing variables) as an array containing data with
float type.
Returns
-------
labels_pred : ndarray of shape (m_test,)
Predicted labels as an array containing data with int type.
Examples
--------
>>> # Prepare data for two classes
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> m_test = 10
>>> x_train1 = np.random.normal(0, 1, size=(m_train // 2, n_x))
>>> y_train1 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train1 = np.full(m_train // 2, 1)
>>> x_train2 = np.random.normal(3, 1, size=(m_train // 2, n_x))
>>> y_train2 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train2 = np.full(m_train // 2, 2)
>>> x_train = np.concatenate((x_train1, x_train2))
>>> y_train = np.concatenate((y_train1, y_train2))
>>> labels_train = np.concatenate((labels_train1, labels_train2))
>>> y_star = np.array([0.0] * n_y)
>>> # Fit the classifier
>>> x_test = np.random.uniform(-1, 4, size=(m_test, n_x))
>>> params = (x_train, y_train, y_star, labels_train)
>>> classifier = CKDEClassification().fit(*params)
>>> # Predict labels
>>> labels_pred = classifier.predict(x_test) # labels_pred shape (10,)
"""
if not self.fitted:
raise RuntimeError("fit the model first")
if x_test.ndim != 2:
raise ValueError("invalid shape of 'x_test' - should be 2d")
labels_pred, _ = self._classify(x_test)
return labels_pred
[docs] def pdfs(self, x_test: ndarray) -> ndarray:
"""Compute pdf of each class.
Parameters
----------
x_test : ndarray of shape (m_test, n_x)
Grid data points (describing variables) as an array containing data with
float type.
Returns
-------
scores : ndarray of shape (m_test, n_classes)
Predicted scores as an array containing data with float type.
Examples
--------
>>> # Prepare data for two classes
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> m_test = 10
>>> x_train1 = np.random.normal(0, 1, size=(m_train // 2, n_x))
>>> y_train1 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train1 = np.full(m_train // 2, 1)
>>> x_train2 = np.random.normal(3, 1, size=(m_train // 2, n_x))
>>> y_train2 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> labels_train2 = np.full(m_train // 2, 2)
>>> x_train = np.concatenate((x_train1, x_train2))
>>> y_train = np.concatenate((y_train1, y_train2))
>>> labels_train = np.concatenate((labels_train1, labels_train2))
>>> y_star = np.array([0.0] * n_y)
>>> # Fit the classifier
>>> x_test = np.random.uniform(-1, 4, size=(m_test, n_x))
>>> params = (x_train, y_train, y_star, labels_train)
>>> classifier = CKDEClassification().fit(*params)
>>> # Compute pdf of each class
>>> scores = classifier.pdfs(x_test) # scores shape (10, 2)
"""
if not self.fitted:
raise RuntimeError("fit the classifier first")
if x_test.ndim != 2:
raise ValueError("invalid shape of 'x_test' - should be 2d")
_, scores = self._classify(x_test)
return scores
def _compute_prior(self) -> ndarray:
prior = np.empty(self.ulabels.shape)
for idx, label in enumerate(self.ulabels):
mask = self.labels_train == label
prior[idx] = self.labels_train[mask].shape[0] / self.m_train
return prior
def _classify(self, x_test: ndarray) -> Tuple[ndarray, ndarray]:
scores = np.empty((x_test.shape[0], self.n_classes))
for idx, label in enumerate(self.ulabels):
mask = self.labels_train == label
ckde = CKDE(self.kernel_name).fit(
self.x_train[mask],
self.y_train[mask],
self.y_star,
self.weights_train[mask],
self.bandwidths_x[idx],
self.bandwidths_y[idx],
self.bandwidth_method,
**self.kwargs,
)
scores[:, idx] = ckde.pdf(x_test)
if np.any(np.all(scores == 0, axis=1)):
warnings.warn(
"some labels have been predicted randomly (zero probability issue) - "
"try again with continuous kernel"
)
labels_pred = self.ulabels[np.argmax(self.prior * scores, axis=1)]
return labels_pred, scores
[docs]class CKDEOutliersDetection:
"""Outliers detection based on conditional kernel density estimation.
TODO: <READ MORE>
Parameters
----------
kernel_name : {'gaussian', 'uniform', 'epanechnikov', 'cauchy'}, default='gaussian'
Name of kernel function.
Examples
--------
>>> # Prepare data
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> x_train = np.random.normal(0, 1, size=(m_train, n_x))
>>> y_train = np.random.normal(0, 1, size=(m_train, n_y))
>>> y_star = np.array([0.0] * n_y)
>>> # Fit the outliers detector
>>> params = (x_train, y_train, y_star)
>>> outliers_detector = CKDEOutliersDetection("gaussian").fit(*params)
"""
def __init__(self, kernel_name: str = "gaussian"):
if kernel_name not in kernel_properties:
available_kernels = list(kernel_properties.keys())
raise ValueError(f"invalid 'kernel_name' - try one of {available_kernels}")
self.kernel_name = kernel_name
self.fitted = False
[docs] def fit(
self,
x_train: ndarray,
y_train: ndarray,
y_star: ndarray,
weights_train: Optional[ndarray] = None,
bandwidth_x: Optional[ndarray] = None,
bandwidth_y: Optional[ndarray] = None,
bandwidth_method: str = "normal_reference",
r: float = 0.1,
**kwargs,
):
"""Fit the outliers detector.
Parameters
----------
x_train : ndarray of shape (m_train, n_x)
Data points (describing variables) as an array containing data with float
type.
y_train : ndarray of shape (m_train, n_y)
Data points (conditioning variables) as an array containing data with float
type.
y_star : ndarray of shape (n_y,)
Conditioned value.
weights_train : ndarray of shape (m_train,), default=None
Weights for data points. If None is passed, all points are equally weighted.
bandwidth_x : ndarray of shape (n_x,), optional
Smoothing parameter of describing variables.
bandwidth_y : ndarray of shape (n_y,), optional
Smoothing parameter of conditioning variables.
bandwidth_method : {'normal_reference', 'direct_plugin'}, \
default='normal_reference'
Name of bandwidth selection method used to compute smoothing parameter
when `bandwidth` is not given explicitly.
r : float, default=0.1
Threshold separating outliers and inliers.
Returns
-------
self : object
Fitted self instance of CKDEOutliersDetection.
Examples
--------
>>> # Prepare data
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> x_train = np.random.normal(0, 1, size=(m_train, n_x))
>>> y_train = np.random.normal(0, 1, size=(m_train, n_y))
>>> y_star = np.array([0.0] * n_y)
>>> weights_train = np.random.uniform(0, 1, size=(m_train,))
>>> # Fit the outliers detector
>>> params = (x_train, y_train, y_star, weights_train)
>>> outliers_detector = CKDEOutliersDetection().fit(*params, r=0.1)
"""
if r < 0 or r > 1:
raise ValueError("invalid value of 'r' - should be in range [0, 1]")
self.ckde = CKDE(self.kernel_name).fit(
x_train,
y_train,
y_star,
weights_train,
bandwidth_x,
bandwidth_y,
bandwidth_method,
**kwargs,
)
scores = self.ckde.pdf(x_train)
idx_sorted = np.argsort(scores)
scores_ord = scores[idx_sorted]
cond_weights_train_ord = self.ckde.cond_weights_train[idx_sorted]
cond_weights_train_ord_cumsum = np.cumsum(cond_weights_train_ord)
k = np.where(cond_weights_train_ord_cumsum > r)[0][0] - 1
tmp1 = (cond_weights_train_ord_cumsum[k + 1] - r) * scores_ord[k]
tmp2 = (r - cond_weights_train_ord_cumsum[k]) * scores_ord[k + 1]
self.threshold = (tmp1 + tmp2) / cond_weights_train_ord[k + 1]
# self.threshold = np.quantile(scores, r)
self.fitted = True
return self
[docs] def predict(self, x_test: ndarray) -> ndarray:
"""Predict the labels.
Parameters
----------
x_test : ndarray of shape (m_test, n_x)
Grid data points (describing variables) as a 2D array containing data with
float type.
Returns
-------
labels_pred : ndarray of shape (m_test,)
Predicted labels (0 - inlier, 1 - outlier) as an array containing data
with int type.
Examples
--------
>>> # Prepare data
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> m_test = 10
>>> x_train = np.random.normal(0, 1, size=(m_train, n_x))
>>> y_train = np.random.normal(0, 1, size=(m_train, n_y))
>>> y_star = np.array([0.0] * n_y)
>>> x_test = np.random.uniform(-3, 3, size=(m_test, n_x))
>>> # Fit the outliers detector
>>> params = (x_train, y_train, y_star)
>>> outliers_detector = CKDEOutliersDetection().fit(*params, r=0.1)
>>> # Predict the labels
>>> labels_pred = outliers_detector.predict(x_test) # labels_pred shape (10,)
"""
if not self.fitted:
raise RuntimeError("fit the outliers detector first")
if len(x_test.shape) != 2:
raise ValueError("invalid shape of 'x_test' - should be 2d")
scores = self.ckde.pdf(x_test)
labels_pred = np.where(scores <= self.threshold, 1, 0)
return labels_pred
[docs]class CKDEClustering:
"""Clustering based on conditional kernel density estimation.
TODO: <READ MORE>
Examples
--------
>>> # Prepare data for two clusters
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> x_train1 = np.random.normal(0, 1, size=(m_train // 2, n_x))
>>> y_train1 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> x_train2 = np.random.normal(3, 1, size=(m_train // 2, n_x))
>>> y_train2 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> x_train = np.concatenate((x_train1, x_train2))
>>> y_train = np.concatenate((y_train1, y_train2))
>>> y_star = np.array([0.0] * n_y)
>>> # Fit
>>> clustering = CKDEClustering().fit(x_train, y_train, y_star)
"""
def __init__(self):
self.kernel_name = "gaussian"
self.fitted = False
[docs] def fit(
self,
x_train: ndarray,
y_train: ndarray,
y_star: ndarray,
weights_train: Optional[ndarray] = None,
bandwidth_x: Optional[ndarray] = None,
bandwidth_y: Optional[ndarray] = None,
bandwidth_method: str = "normal_reference",
**kwargs,
):
"""Fit the model.
Parameters
----------
x_train : ndarray of shape (m_train, n_x)
Data points (describing variables) as an array containing data with float
type.
y_train : ndarray of shape (m_train, n_y)
Data points (conditioning variables) as an array containing data with float
type.
y_star : ndarray of shape (n_y,)
Conditioned value.
weights_train : ndarray of shape (m_train,), optional
Weights of data points. If None, all points are equally weighted.
bandwidth_x : ndarray of shape (n_x,), optional
Smoothing parameter of describing variables.
bandwidth_y : ndarray of shape (n_y,), optional
Smoothing parameter of conditioning variables.
bandwidth_method : {'normal_reference', 'direct_plugin', 'ste_plugin', \
'ml_cv'}, default='normal_reference'
Name of bandwidth selection method used to compute smoothing parameter
when `bandwidth` is not given explicitly.
Returns
-------
self : object
Fitted self instance of CKDEClustering.
Examples
--------
>>> # Prepare data for two clusters
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> x_train1 = np.random.normal(0, 1, size=(m_train // 2, n_x))
>>> y_train1 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> x_train2 = np.random.normal(3, 1, size=(m_train // 2, n_x))
>>> y_train2 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> x_train = np.concatenate((x_train1, x_train2))
>>> y_train = np.concatenate((y_train1, y_train2))
>>> y_star = np.array([0.0] * n_y)
>>> weights_train = np.random.uniform(0, 1, size=(m_train,))
>>> # Fit
>>> clustering = CKDEClustering().fit(x_train, y_train, y_star, weights_train)
"""
if x_train.ndim != 2:
raise ValueError("invalid shape of 'x_train' - should be 2d")
self.x_train = x_train
self.m_train = self.x_train.shape[0]
self.n_x = self.x_train.shape[1]
if y_train.ndim != 2:
raise ValueError("invalid shape of 'y_train' - should be 2d")
if y_train.shape[0] != self.x_train.shape[0]:
raise ValueError("invalid size of 'y_train'")
self.y_train = y_train
self.n_y = self.y_train.shape[1]
if y_star.ndim != 1:
raise ValueError("invalid shape of 'y_star' - should be 1d")
if y_star.shape[0] != self.n_y:
raise ValueError(
f"invalid size of 'y_star'- should contain {self.n_y} values"
)
self.y_star = y_star
if weights_train is None:
self.weights_train = np.full(self.m_train, 1 / self.m_train)
else:
if weights_train.ndim != 1:
raise ValueError("invalid shape of 'weights_train' - should be 1d")
if weights_train.shape[0] != x_train.shape[0]:
raise ValueError("invalid size of 'weights_train'")
if not (weights_train > 0).all():
raise ValueError("'weights_train' should be positive")
self.weights_train = weights_train / weights_train.sum()
if bandwidth_x is None or bandwidth_y is None:
z_train = np.concatenate((self.x_train, self.y_train), axis=1)
if bandwidth_method == "normal_reference":
bandwidth = normal_reference(
z_train, self.weights_train, self.kernel_name
)
elif bandwidth_method == "direct_plugin":
stage = kwargs["stage"] if "stage" in kwargs else 2
bandwidth = direct_plugin(z_train, self.kernel_name, stage)
else:
raise ValueError("invalid 'bandwidth_method'")
self.bandwidth_x = bandwidth[: self.n_x]
self.bandwidth_y = bandwidth[self.n_x :]
else:
if bandwidth_x.ndim != 1:
raise ValueError("invalid shape of 'bandwidth_x' - should be 1d")
if bandwidth_y.ndim != 1:
raise ValueError("invalid shape of 'bandwidth_y' - should be 1d")
if bandwidth_x.shape[0] != self.n_x:
raise ValueError(
f"invalid size of 'bandwidth_x' - should contain {self.n_x} values"
)
if bandwidth_y.shape[0] != self.n_y:
raise ValueError(
f"invalid size of 'bandwidth_y' - should contain {self.n_y} values"
)
if not (bandwidth_x > 0).all():
raise ValueError("'bandwidth_x' should be positive")
if not (bandwidth_y > 0).all():
raise ValueError("'bandwidth_y' should be positive")
self.bandwidth_x = bandwidth_x
self.bandwidth_y = bandwidth_y
self.fitted = True
return self
[docs] def predict(
self,
x_test: ndarray,
algorithm: str = "mean_shift",
epsilon: float = 1e-8,
delta: float = 1e-3, # 1e-1
):
"""Predict cluster labels.
Parameters
----------
x_test : ndarray of shape (m_test, n_x)
Grid data points (describing variables) as an array containing data with
float type.
algorithm : {'gradient_ascent', 'mean_shift'}, default='mean_shift'
Name of clustering algorithm.
epsilon : float, default=1e-8
Threshold for difference (euclidean distance) of data point position while
shifting. When the difference is less than epsilon, data point is no longer
shifted.
delta : float, default=1e-3
Acceptance error (euclidean distance) between shifted data point and
representative of cluster. If the error is less than delta, data point is
assigned to cluster represented by cluster representative.
Returns
-------
labels_pred : ndarray of shape (m_train,)
Predicted labels as an array containing data with int type.
Examples
--------
>>> # Prepare data for two clusters
>>> m_train = 100
>>> n_x, n_y = 1, 1
>>> x_train1 = np.random.normal(0, 1, size=(m_train // 2, n_x))
>>> y_train1 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> x_train2 = np.random.normal(3, 1, size=(m_train // 2, n_x))
>>> y_train2 = np.random.normal(0, 1, size=(m_train // 2, n_y))
>>> x_train = np.concatenate((x_train1, x_train2))
>>> y_train = np.concatenate((y_train1, y_train2))
>>> y_star = np.array([0.0] * n_y)
>>> # Fit
>>> clustering = CKDEClustering().fit(x_train, y_train, y_star)
>>> labels_pred = clustering.predict()
"""
cond_weights_train = compute_d(
self.y_train,
self.weights_train,
self.y_star,
self.bandwidth_y,
self.kernel_name,
)
if algorithm == "gradient_ascent":
x_k = gradient_ascent(
self.x_train, cond_weights_train, x_test, self.bandwidth_x, epsilon
)
elif algorithm == "mean_shift":
x_k = mean_shift(
self.x_train, cond_weights_train, x_test, self.bandwidth_x, epsilon
)
else:
raise ValueError("invalid 'algorithm'")
labels_pred = assign_labels(x_k, delta)
return labels_pred