Source code for kdelearn.ckde

from typing import Optional, Tuple

import numpy as np
from numpy import ndarray

from .bandwidth_selection import direct_plugin, kernel_properties, normal_reference
from .cutils import compute_d, compute_kde


[docs]class CKDE: """Conditional kernel density estimator with product kernel: TODO: <MATH FORMULA and READ MORE and REFERENCES> Parameters ---------- kernel_name : {'gaussian', 'uniform', 'epanechnikov', 'cauchy'}, default='gaussian' Name of kernel function. Examples -------- >>> # Prepare data >>> m_train = 100 >>> n_x, n_y = 1, 1 >>> x_train = np.random.normal(0, 1, size=(m_train, n_x)) >>> y_train = np.random.normal(0, 1, size=(m_train, n_y)) >>> y_star = np.array([0.0] * n_y) >>> # Fit >>> ckde = CKDE("gaussian").fit(x_train, y_train, y_star) """ def __init__(self, kernel_name: str = "gaussian"): if kernel_name not in kernel_properties: available_kernels = list(kernel_properties.keys()) raise ValueError(f"invalid 'kernel_name' - try one of {available_kernels}") self.kernel_name = kernel_name self.fitted = False
[docs] def fit( self, x_train: ndarray, y_train: ndarray, y_star: ndarray, weights_train: Optional[ndarray] = None, bandwidth_x: Optional[ndarray] = None, bandwidth_y: Optional[ndarray] = None, bandwidth_method: str = "normal_reference", **kwargs, ): """Fit the estimator. Parameters ---------- x_train : ndarray of shape (m_train, n_x) Data points (describing variables) as an array containing data with float type. y_train : ndarray of shape (m_train, n_y) Data points (conditioning variables) as an array containing data with float type. y_star : ndarray of shape (n_y,) Conditioned value. weights_train : ndarray of shape (m_train,), optional Weights of data points. If None, all points are equally weighted. bandwidth_x : ndarray of shape (n_x,), optional Smoothing parameter of describing variables. bandwidth_y : ndarray of shape (n_y,), optional Smoothing parameter of conditioning variables. bandwidth_method : {'normal_reference', 'direct_plugin', 'ste_plugin', \ 'ml_cv'}, default='normal_reference' Name of bandwidth selection method used to compute smoothing parameter when `bandwidth` is not given explicitly. Returns ------- self : object Fitted self instance of CKDE. Examples -------- >>> # Prepare data >>> m_train = 100 >>> n_x, n_y = 1, 1 >>> x_train = np.random.normal(0, 1, size=(m_train, n_x)) >>> y_train = np.random.normal(0, 1, size=(m_train, n_y)) >>> weights_train = np.random.randint(1, 10, size=(m_train,)) >>> y_star = np.array([0.0] * n_y) >>> bandwidth_x = np.array([0.5] * n_x) >>> bandwidth_y = np.array([0.5] * n_y) >>> # Fit the estimator >>> params = (x_train, y_train, y_star, weights_train, bandwidth_x, bandwidth_y) >>> ckde = CKDE().fit(*params) """ if x_train.ndim != 2: raise ValueError("invalid shape of 'x_train' - should be 2d") self.x_train = x_train m_train = self.x_train.shape[0] n_x = self.x_train.shape[1] if y_train.ndim != 2: raise ValueError("invalid shape of 'y_train' - should be 2d") if y_train.shape[0] != x_train.shape[0]: raise ValueError("invalid size of 'y_train'") n_y = y_train.shape[1] if y_star.ndim != 1: raise ValueError("invalid shape of 'y_star' - should be 1d") if y_star.shape[0] != n_y: raise ValueError(f"invalid size of 'y_star'- should contain {n_y} values") if weights_train is None: weights_train = np.full(m_train, 1 / m_train) else: if weights_train.ndim != 1: raise ValueError("invalid shape of 'weights_train' - should be 1d") if weights_train.shape[0] != x_train.shape[0]: raise ValueError("invalid size of 'weights_train'") if not (weights_train > 0).all(): raise ValueError("'weights_train' should be positive") weights_train = weights_train / weights_train.sum() if bandwidth_x is None or bandwidth_y is None: z_train = np.concatenate((self.x_train, y_train), axis=1) if bandwidth_method == "normal_reference": bandwidth = normal_reference(z_train, weights_train, self.kernel_name) elif bandwidth_method == "direct_plugin": stage = kwargs["stage"] if "stage" in kwargs else 2 bandwidth = direct_plugin(z_train, self.kernel_name, stage) else: raise ValueError("invalid 'bandwidth_method'") self.bandwidth_x = bandwidth[:n_x] bandwidth_y = bandwidth[n_x:] else: if bandwidth_x.ndim != 1: raise ValueError("invalid shape of 'bandwidth_x' - should be 1d") if bandwidth_y.ndim != 1: raise ValueError("invalid shape of 'bandwidth_y' - should be 1d") if bandwidth_x.shape[0] != n_x: raise ValueError( f"invalid size of 'bandwidth_x' - should contain {n_x} values" ) if bandwidth_y.shape[0] != n_y: raise ValueError( f"invalid size of 'bandwidth_y' - should contain {n_y} values" ) if not (bandwidth_x > 0).all(): raise ValueError("'bandwidth_x' should be positive") if not (bandwidth_y > 0).all(): raise ValueError("'bandwidth_y' should be positive") self.bandwidth_x = bandwidth_x self.cond_weights_train = compute_d( y_train, weights_train, y_star, bandwidth_y, self.kernel_name, ) self.fitted = True return self
[docs] def pdf( self, x_test: ndarray, ) -> Tuple[ndarray, ndarray]: """Compute conditional probability density function. Parameters ---------- x_test : ndarray of shape (m_test, n_x) Grid data points (describing variables) as an array containing data with float type. Returns ------- scores : ndarray of shape (m_test,) Values of kernel density estimator. cond_weights_train : ndarray of shape (m_train,) TODO: complete !!!!!!!! Examples -------- >>> # Prepare data >>> m_train = 100 >>> n_x, n_y = 1, 1 >>> m_test = 10 >>> x_train = np.random.normal(0, 1, (m_train, n_x)) >>> y_train = np.random.normal(0, 1, (m_train, n_y)) >>> y_star = np.array([0.0] * n_y) >>> x_test = np.random.uniform(-3, 3, (m_test, n_x)) >>> # Fit the estimator. >>> ckde = CKDE().fit(x_train, y_train, y_star) >>> # Compute pdf >>> scores, d = ckde.pdf(x_test) # scores shape (10,) """ if not self.fitted: raise RuntimeError("fit the estimator first") if x_test.ndim != 2: raise ValueError("invalid shape of 'x_test' - should be 2d") scores = compute_kde( self.x_train, self.cond_weights_train, x_test, self.bandwidth_x, self.kernel_name, ) return scores