Homework 4

• 21 min read • 4032 words
Tags: Ma-Le
Categories: Data Science

Homework 4

1. Naive Bayes Classifiers

Q2.Q2. Categorical Feature Distribution

这一部分实现了分类特征分布的计算。我们创建一个 X,YX, Y 的联合分布表,然后对每个特征标签 y_label,统计这一部分对应的 XX 的次数和总的 YY 的类别的比值,这个结果就是对应的概率分布。

@mugrade.local_tests
class CategoricalDistribution:
    def __init__(self, X, Y, alpha=1.0):
        """
        args:
            X : pd.Series -- pandas series of feature values
            Y : pd.Series -- pandas series of class labels
            alpha : float -- smoothing factor
        """
        self.alpha = alpha
        # k: number of unique values the feature can take
        self.k = len(X.unique())
        # m: total number of instances for each class label
        self.m = Y.value_counts()

        counts = pd.crosstab(X, Y)
        counts = counts.reindex(columns=self.m.index, fill_value=0)

        self.probabilities = counts.copy()
        for y_label in self.probabilities.columns:
          total_class = self.m[y_label]
          self.probabilities[y_label] = (self.probabilities[y_label] + alpha) / (total_class + self.k * alpha)


    def log_likelihood(self, x, y):
        """
        args:
            x : string -- feature value
            y : string -- class value


        return : float -- (Laplace smoothed) log likelihood of this feature given the class label
        """
        if y not in self.probabilities.columns:
          return -np.inf
        if x in self.probabilities.index:
          prob = self.probabilities.loc[x, y]
        else:
          total_class = self.m[y]
          prob = alpha / total_class + self.k * self.alpha

        return np.log(prob)

Q3.Q3. Gaussian Feature Distribution

这一部分实现了高斯分布的计算。我们将 XX 按照 YY 进行分组,然后计算它的均值和方差。

高斯分布的对数似然的计算公式如下:

lnf(x)=ln(12πσ2)+ln(exp((xμ)22σ2))=12ln(2πσ2)(xμ)22σ2\begin{aligned} \ln f(x) &= \ln\left(\frac{1}{\sqrt{2\pi\sigma^2}}\right) + \ln\left(\exp\left(-\frac{(x-\mu)^2}{2\sigma^2}\right)\right) \\[6pt] &= -\tfrac{1}{2}\ln(2\pi\sigma^2) - \frac{(x-\mu)^2}{2\sigma^2} \end{aligned}
@mugrade.local_tests
class GaussianDistribution:
    def __init__(self, X, Y):
        """ initializes the parameterize for Categorical distribution
        args:
            X : pd.Series -- pandas series of feature values
            Y : pd.Series -- pandas series of class labels
            alpha : float -- smoothing factor
        """
        X_group = X.groupby(Y)
        means = X_group.mean()
        variances = X_group.var(ddof=0)
        self.parameters = pd.DataFrame({'mean': means, 'variance': variances})

    def log_likelihood(self, x, y):
        """ log of conditional probability of value x given label y according to each class

        args:
            x : float/int -- feature value
            y : string -- class value

        return : float -- log likelihood of this feature given the class label
        """
        try:
          mean, variance = self.parameters.loc[y]
        except KeyError:
          return -np.inf
        if variance == 0:
          return -np.inf

        log_prob = -0.5 * np.log(2 * np.pi * variance)
        log_prob -= ((x - mean) ** 2) / (2 * variance)
        return log_prob

Q4.Q4. Putting things together

这一部分将前面实现的概率分布用在实际的贝叶斯分类器中。我们给不同的对象分配不同的概率分布模型,根据朴素贝叶斯公式:

p(YX)=p(XY)p(Y)yp(Xy)p(y)p(Y\mid X)=\frac{p(X\mid Y)\,p(Y)}{\sum_{y} p(X\mid y)\,p(y)}

取对数得:

logp(YX)=log(p(XY)p(Y))log ⁣(yp(Xy)p(y))=logp(XY)+logp(Y)log ⁣(yp(Xy)p(y))\log p(Y\mid X) = \log\big(p(X\mid Y)\,p(Y)\big) - \log\!\left(\sum_{y} p(X\mid y)\,p(y)\right) = \log p(X\mid Y) + \log p(Y) - \log\!\left(\sum_{y} p(X\mid y)\,p(y)\right)

这正好是我们前面计算的一系列概率。对每一个类别,我们先计算 log(p(XY)p(Y))\log\big(p(X\mid Y)\,p(Y)\big),将其指数化变为 p(XY)p(Y)p(X\mid Y)\,p(Y)。然后,我们对所有计算出的这些概率求和 sum,就得到了分母 yp(Xy)p(y)\sum_{y} p(X\mid y)\,p(y)

我们在初始化部分完成先验概率 p(Y)p(Y) 的计算,然后在实际的 probabilities 方法中使用对应的概率分布模型计算 p(YX)p(Y\mid X)、并最终计算 p(YX)p(Y\mid X)

这部分的实现也体现了我们之前说的贝叶斯分布的核心思想:

  1. 做出朴素的条件独立性假设:p(XY)=p(XiY)p(X|Y) = \prod p(X_i|Y)
  2. 每一个独立的类条件概率 p(XiY)p(X_i|Y) 选择一个合适的概率分布模型
@mugrade.local_tests
class NaiveBayesClassifier:
    """ 
    attr:
        predictor : Dict[column_name,model] -- model for each column
        log_prior : np.ndarray -- the (log) prior probability of each class
    """

    def __init__(self, X, Y, alpha=1.):
        """
        args:
            X : pd.DataFrame -- processed dataframe, without labels column
            Y : pd.Series -- series for label column
            alpha : float -- alpha for Laplace smoothing (for both features )
        """
        self.alpha = alpha
        self.predictors = {}
        self.m = Y.unique()

        m = len(Y)
        k = len(self.m)
        class_counts = Y.value_counts()

        prior = (class_counts + alpha) / (m + k * alpha)
        self.log_prior = np.log(prior)

        for col in X.columns:
          feature_data = X[col]
          if feature_data.dtype == 'int64' or feature_data.dtype == 'float64':
            self.predictors[col] = GaussianDistribution(feature_data, Y)
          elif feature_data.dtype == 'object':
            self.predictors[col] = CategoricalDistribution(feature_data, Y, self.alpha)

    def probabilities(self, x):
        """
        args:
            x : pd.Series -- series for a single row of the data frame (without labels)

        returns : dict -- mapping of class label values to probabilities
        """
        log_probs = {}
        for m_class in self.m:
          current_prob = self.log_prior[m_class]
          for feature_name, feature_value in x.items():
            if feature_name in self.predictors:
              model = self.predictors[feature_name]
              current_prob += model.log_likelihood(feature_value, m_class)

          log_probs[m_class] = current_prob

        max_log_prob = max(log_probs.values())
        probs = {y_class: np.exp(prob - max_log_prob) for y_class, prob in log_probs.items()}
        total_probs = sum(probs.values())
        final_probs = {y_class: prob / total_probs for y_class, prob in probs.items()}
        return final_probs

2. Unsupervised Learning

Q2.Q2. K-means clustering

这一部分实现了 K-means 算法。较简单,注意矩阵维度匹配即可。

@mugrade.local_tests
def kmeans(X, k, center_indices, max_iters):
    """ 
        Args:
                X (numpy 2D matrix) : data matrix, each row is an example]
                k : number of clusters
                center_indices: indices into rows of X to use as initial clusters
                max_iters : number of iterations to run

        Return: tuple (centers, lables, objective)
               centers : (k x X.shape[1])-dimensional numpy array of extracted centers
               indices: (X.shape[0])-dimensional numpy array of assigned cluster indices at the final iteration
               objective : (max_iters)-dimensional numpy array of objective value after each iteration

    """
    centers = X[center_indices, :]
    objective_history = []


    for i in range(max_iters):
      D = -2 * X @ centers.T + (X ** 2).sum(axis=1)[:, None] + (centers ** 2).sum(axis=1)
      y = np.argmin(D, axis=1)
      objective = np.mean(np.min(D, axis=1))
      objective_history.append(objective)

      new_centers = np.zeros_like(centers)
      for j in range(k):
        points = X[y == j]
        if len(points) > 0:
          new_centers[j] = np.mean(points, axis=0)
        else:
          new_centers[j] = centers[j]
      centers = new_centers

    return (centers, y, np.array(objective_history))

Q3.Q3. K-means++

@mugrade.local_tests
def kmeans_pp_nn(X, center_index_0, k, j):
    """ Compute initial cluster points using the (deterministic) K-means++ algorithm
        Args:
                X (numpy 2D matrix) : data matrix, each row is an example]
                center_index_0 (int) : index of first data point (row of X) to use as center
                k : number of clusters
                j : index of cluster in list sorted by distance to take as the next center

        Return:
            (list) : indices of data points to use as clusters

    """
    n_samples, n_features = X.shape
    centers_indices = [center_index_0]

    for _ in range(1, k):
        current_center = X[centers_indices]
        D = -2 * X @ current_center.T + (X ** 2).sum(axis=1)[:, None] + (current_center ** 2).sum(axis=1)
        # Compute the distances of all remaining points to all selected centers
        distances = np.min(D, axis=1)

        # Sort the list of data points (in decreasing order) by the minimum such distance
        sorted_indices = np.argsort(distances)[::-1]

        # Choose the jth element of the list in this sorted order as the next center
        next_center_index = sorted_indices[j]
        centers_indices.append(next_center_index)

    return centers_indices

3. Collaborative Filtering

Q2.Q2. Alternating Minimization for Collaborative Filtering

这一部分实现了协同过滤的误差计算。

@mugrade.local_tests
def cf_error(X, U, V, b):
    """ 
        args:
            X : np.array[num_users, num_movies] -- the ratings matrix (as normal numpy array, not sparse)
            U : np.array[num_users, num_features] -- a matrix of features for each user
            V : np.array[num_movies,num_features] -- a matrix of features for each movie
            b : float -- bias term

        return: float -- the mean squared error between non-zero entries of X and the ratings
            predicted by U and V, added to the bias b; as this is an error and not a loss function,
            you do not need to include the regularizing terms.
    """
    X_pred = U @ V.T + b
    mask = X != 0
    if not np.any(mask):
      return 0.0

    squared_error = (X[mask] - X_pred[mask]) ** 2
    mean_squared_error = np.mean(squared_error)
    return mean_squared_error

然后我们完成训练函数。训练的核心步骤如下:

  1. 初始化:我们不知道 UUVV,先生成一个物品矩阵 V 和用户矩阵 U。
  2. 固定 UU、更新 VV 使得此时的最小二乘误差最小。
  3. 固定 VV、更新 UU 使得此时的最小二乘误差最小。
  4. 一直持续,直到两个矩阵收敛。

在具体实现中,我们一行一行地计算这个最小二乘误差的解。由前面的公式:

θ=(XTX)1XTy\theta =(X^TX)^{-1}X^Ty

我们可以根据实际的 rating 矩阵的每一行(列)的值,得到 UUVV 对应的值,将其填入矩阵即可。

@mugrade.local_tests
def cf_train(X_train, k, niters=10, lam=10., verbose=True):
    """ Train a collaborative filtering model.
        Args:
            X_train : np.array[num_users, num_movies] -- the training ratings matrix, assumed dense
            k : int -- the number of features in the CF model
            niters : int -- number of iterations to run
            lam : float -- regularization parameter, shown as lambda

        return : Tuple[U, V, b]
            U : np.array[num_users,  num_features] -- the user-feature matrix
            V : np.array[num_movies, num_features] -- the movie-feature matrix
            b : float -- bias term
    """
    m, n = X_train.shape
    U = np.sin(np.arange(m*k)).reshape(m,k)/5
    V = np.cos(np.arange(n*k)).reshape(n,k)/5
    W = X_train != 0
    I = np.identity(k)
    epsilon = 1e-8 # Small value to prevent singular matrix

    b = np.mean(X_train[W])
    m, n = X_train.shape

    for _ in range(niters):
      for j in range(n):
        user_indices = W[:, j]
        if not np.any(user_indices):
          continue
        U_subset = U[user_indices]
        A = U_subset.T @ U_subset + (lam + epsilon) * I
        ratings = X_train[user_indices, j]
        B = U_subset.T @ (ratings - b)
        V[j, :] = np.linalg.lstsq(A, B, rcond=None)[0]
      for i in range(m):
        movie_indices = W[i, :]
        if not np.any(movie_indices):
          continue
        V_subset = V[movie_indices]
        A = V_subset.T @ V_subset + (lam + epsilon) * I
        ratings = X_train[i, movie_indices]
        B = V_subset.T @ (ratings - b)
        U[i, :] = np.linalg.lstsq(A, B, rcond=None)[0]

    return U, V, b

Q3.Q3. Recommendations

这个部分中,我们使用我们训练得到的 UUVV 矩阵来构建推荐系统。

实现原理十分简单:我们找到使得 XpredX_pred 最大的参数,这个参数就是我们给用户推荐的电影。

同时注意,对于用户已有评分的电影,我们需要使用 -np.inf 来遮住,因为我们希望给用户推荐没有看过的电影。

 @mugrade.local_tests
def cf_recommend(X, U, V, b):
    """
        args:
            X : np.array[num_users, num_movies] -- the ratings matrix
            U : np.array[num_users, num_features] -- a matrix of features for each user
            V : np.array[num_movies,num_features] -- a matrix of features for each movie

        return: np.array[num_users] -- index of movie id to recommend for each user
    """
    X_pred = U @ V.T + b
    W = X > 0
    X_pred[W] = -np.inf
    recommendations = np.argmax(X_pred, axis=1)
    return recommendations

Comments

Total words: 4032