Homework 4
Tags: Ma-Le
Categories: Data Science
1. Naive Bayes Classifiers
Categorical Feature Distribution
This part implements the categorical feature distribution. We build a joint count table of feature values and class labels (via pd.crosstab), and then, for each class label y_label, divide the count of each feature value by the total number of instances of that class, applying Laplace smoothing. The result is the corresponding conditional probability distribution.
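Written out, with $k$ the number of distinct feature values and $m_y$ the number of training instances of class $y$ (the same quantities stored in the constructor below), the smoothed probability is:

$$P(x = v \mid y) \;=\; \frac{\operatorname{count}(v, y) + \alpha}{m_y + k\alpha}$$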
import numpy as np
import pandas as pd
import mugrade

@mugrade.local_tests
class CategoricalDistribution:
def __init__(self, X, Y, alpha=1.0):
"""
args:
X : pd.Series -- pandas series of feature values
Y : pd.Series -- pandas series of class labels
alpha : float -- smoothing factor
"""
self.alpha = alpha
# k: number of unique values the feature can take
self.k = len(X.unique())
# m: total number of instances for each class label
self.m = Y.value_counts()
counts = pd.crosstab(X, Y)
counts = counts.reindex(columns=self.m.index, fill_value=0)
self.probabilities = counts.copy()
for y_label in self.probabilities.columns:
total_class = self.m[y_label]
self.probabilities[y_label] = (self.probabilities[y_label] + alpha) / (total_class + self.k * alpha)
def log_likelihood(self, x, y):
"""
args:
x : string -- feature value
y : string -- class value
return : float -- (Laplace smoothed) log likelihood of this feature given the class label
"""
if y not in self.probabilities.columns:
return -np.inf
if x in self.probabilities.index:
prob = self.probabilities.loc[x, y]
else:
            total_class = self.m[y]
            # Unseen feature value: fall back to the smoothed estimate alpha / (m_y + k * alpha)
            prob = self.alpha / (total_class + self.k * self.alpha)
return np.log(prob)
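As a quick sanity check, here is a small illustrative run on made-up data (the values and labels are purely hypothetical):

# Illustrative example with made-up data: two feature values ("red", "blue"), two classes.
X_toy = pd.Series(["red", "blue", "red", "red"])
Y_toy = pd.Series(["spam", "ham", "spam", "ham"])
dist = CategoricalDistribution(X_toy, Y_toy, alpha=1.0)
# P(red | spam) = (2 + 1) / (2 + 2 * 1) = 0.75, so the log likelihood is log(0.75)
print(dist.log_likelihood("red", "spam"))
# An unseen value falls back to the smoothed estimate alpha / (m_y + k * alpha)
print(dist.log_likelihood("green", "spam"))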
Gaussian Feature Distribution
This part implements the Gaussian feature distribution. We group X by the class label Y and compute the per-class mean and variance.
The log likelihood of a Gaussian is computed as:

$$\log p(x \mid y) = -\frac{1}{2}\log\big(2\pi\sigma_y^2\big) - \frac{(x - \mu_y)^2}{2\sigma_y^2}$$
@mugrade.local_tests
class GaussianDistribution:
def __init__(self, X, Y):
""" initializes the parameterize for Categorical distribution
args:
X : pd.Series -- pandas series of feature values
Y : pd.Series -- pandas series of class labels
alpha : float -- smoothing factor
"""
X_group = X.groupby(Y)
means = X_group.mean()
variances = X_group.var(ddof=0)
self.parameters = pd.DataFrame({'mean': means, 'variance': variances})
def log_likelihood(self, x, y):
""" log of conditional probability of value x given label y according to each class
args:
x : float/int -- feature value
y : string -- class value
return : float -- log likelihood of this feature given the class label
"""
try:
mean, variance = self.parameters.loc[y]
except KeyError:
return -np.inf
if variance == 0:
return -np.inf
log_prob = -0.5 * np.log(2 * np.pi * variance)
log_prob -= ((x - mean) ** 2) / (2 * variance)
return log_prob
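A similar illustrative check for the Gaussian case (numbers made up):

# Illustrative example with made-up data: the per-class mean/variance come from groupby.
X_num = pd.Series([1.0, 2.0, 3.0, 10.0, 11.0, 12.0])
Y_num = pd.Series(["a", "a", "a", "b", "b", "b"])
gauss = GaussianDistribution(X_num, Y_num)
print(gauss.parameters)                  # class "a": mean 2.0, variance 2/3; class "b": mean 11.0, variance 2/3
print(gauss.log_likelihood(2.0, "a"))    # evaluated at the class "a" mean
print(gauss.log_likelihood(2.0, "c"))    # unknown class label -> -inf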
Putting things together
This part plugs the distributions implemented above into an actual naive Bayes classifier. We assign a different distribution model to each feature. By the naive Bayes formula:

$$P(y \mid x) = \frac{P(y)\prod_i P(x_i \mid y)}{P(x)}$$

Taking logs:

$$\log P(y \mid x) = \log P(y) + \sum_i \log P(x_i \mid y) - \log P(x)$$

which is exactly the set of probabilities we computed above. For each class we first compute $\log P(y) + \sum_i \log P(x_i \mid y)$ and exponentiate it to get the unnormalized $P(y)\prod_i P(x_i \mid y)$. Summing these values over all classes then gives the denominator $P(x)$.

The prior $P(y)$ is computed in the constructor, and the probabilities method uses the corresponding distribution model to compute each $P(x_i \mid y)$ and finally $P(y \mid x)$.

This implementation also reflects the core ideas of naive Bayes mentioned earlier:
- Make the naive conditional independence assumption: $P(x \mid y) = \prod_i P(x_i \mid y)$.
- Choose an appropriate distribution model for each independent class-conditional probability $P(x_i \mid y)$.
@mugrade.local_tests
class NaiveBayesClassifier:
"""
attr:
        predictors : Dict[column_name, model] -- model for each column
        log_prior : pd.Series -- the (log) prior probability of each class
"""
def __init__(self, X, Y, alpha=1.):
"""
args:
X : pd.DataFrame -- processed dataframe, without labels column
Y : pd.Series -- series for label column
            alpha : float -- alpha for Laplace smoothing (used for both the class prior and the categorical features)
"""
self.alpha = alpha
self.predictors = {}
        # the set of distinct class labels
        self.m = Y.unique()
m = len(Y)
k = len(self.m)
class_counts = Y.value_counts()
prior = (class_counts + alpha) / (m + k * alpha)
self.log_prior = np.log(prior)
for col in X.columns:
feature_data = X[col]
if feature_data.dtype == 'int64' or feature_data.dtype == 'float64':
self.predictors[col] = GaussianDistribution(feature_data, Y)
elif feature_data.dtype == 'object':
self.predictors[col] = CategoricalDistribution(feature_data, Y, self.alpha)
def probabilities(self, x):
"""
args:
x : pd.Series -- series for a single row of the data frame (without labels)
returns : dict -- mapping of class label values to probabilities
"""
log_probs = {}
for m_class in self.m:
current_prob = self.log_prior[m_class]
for feature_name, feature_value in x.items():
if feature_name in self.predictors:
model = self.predictors[feature_name]
current_prob += model.log_likelihood(feature_value, m_class)
log_probs[m_class] = current_prob
        # Log-sum-exp trick: subtract the max log probability before exponentiating for
        # numerical stability; the shift cancels out after normalization.
        max_log_prob = max(log_probs.values())
        probs = {y_class: np.exp(prob - max_log_prob) for y_class, prob in log_probs.items()}
        total_probs = sum(probs.values())
        final_probs = {y_class: prob / total_probs for y_class, prob in probs.items()}
return final_probs
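A minimal end-to-end sketch on a tiny hypothetical DataFrame (column names and values are made up):

# Hypothetical toy data: one categorical feature and one numeric feature.
df = pd.DataFrame({
    "color":  ["red", "blue", "red", "blue", "red"],
    "height": [1.2, 3.4, 1.0, 3.8, 1.1],
})
labels = pd.Series(["A", "B", "A", "B", "A"])

clf = NaiveBayesClassifier(df, labels, alpha=1.0)
probs = clf.probabilities(pd.Series({"color": "red", "height": 1.1}))
print(probs)                       # dict mapping each class label to its posterior probability
print(max(probs, key=probs.get))   # predicted class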
2. Unsupervised Learning
K-means clustering
This part implements the K-means algorithm. It is fairly simple; the main thing to watch out for is that the matrix dimensions line up.
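The pairwise squared distances in the code below come from the expansion

$$\|x_i - c_j\|^2 = \|x_i\|^2 - 2\,x_i^\top c_j + \|c_j\|^2,$$

so the full $n \times k$ distance matrix D can be computed with a single matrix product plus broadcasting.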
@mugrade.local_tests
def kmeans(X, k, center_indices, max_iters):
"""
Args:
        X (numpy 2D matrix) : data matrix, each row is an example
k : number of clusters
center_indices: indices into rows of X to use as initial clusters
max_iters : number of iterations to run
    Return: tuple (centers, labels, objective)
        centers : (k x X.shape[1])-dimensional numpy array of extracted centers
        labels : (X.shape[0])-dimensional numpy array of assigned cluster indices at the final iteration
objective : (max_iters)-dimensional numpy array of objective value after each iteration
"""
centers = X[center_indices, :]
objective_history = []
for i in range(max_iters):
D = -2 * X @ centers.T + (X ** 2).sum(axis=1)[:, None] + (centers ** 2).sum(axis=1)
y = np.argmin(D, axis=1)
objective = np.mean(np.min(D, axis=1))
objective_history.append(objective)
new_centers = np.zeros_like(centers)
for j in range(k):
points = X[y == j]
if len(points) > 0:
new_centers[j] = np.mean(points, axis=0)
else:
new_centers[j] = centers[j]
centers = new_centers
return (centers, y, np.array(objective_history))
K-means++
@mugrade.local_tests
def kmeans_pp_nn(X, center_index_0, k, j):
""" Compute initial cluster points using the (deterministic) K-means++ algorithm
Args:
        X (numpy 2D matrix) : data matrix, each row is an example
center_index_0 (int) : index of first data point (row of X) to use as center
k : number of clusters
j : index of cluster in list sorted by distance to take as the next center
Return:
        (list) : indices of data points to use as cluster centers
"""
n_samples, n_features = X.shape
centers_indices = [center_index_0]
for _ in range(1, k):
current_center = X[centers_indices]
D = -2 * X @ current_center.T + (X ** 2).sum(axis=1)[:, None] + (current_center ** 2).sum(axis=1)
# Compute the distances of all remaining points to all selected centers
distances = np.min(D, axis=1)
# Sort the list of data points (in decreasing order) by the minimum such distance
sorted_indices = np.argsort(distances)[::-1]
# Choose the jth element of the list in this sorted order as the next center
next_center_index = sorted_indices[j]
centers_indices.append(next_center_index)
return centers_indices
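A small hypothetical run that chains the two functions (random data; k, j, and max_iters are chosen arbitrarily):

# Hypothetical usage: seed K-means with the deterministic K-means++ variant above.
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(100, 2))
# j=0 always takes the point farthest from the current centers as the next center
init_indices = kmeans_pp_nn(X_demo, center_index_0=0, k=3, j=0)
centers, labels, objective = kmeans(X_demo, k=3, center_indices=init_indices, max_iters=20)
print(objective[-1])   # objective value recorded at the last iteration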
3. Collaborative Filtering
Alternating Minimization for Collaborative Filtering
This part implements the error computation for collaborative filtering.
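Written out, with $\Omega$ the set of non-zero (observed) entries of $X$, the quantity computed below is

$$\mathrm{err}(U, V, b) = \frac{1}{|\Omega|} \sum_{(i,j) \in \Omega} \big(X_{ij} - u_i^\top v_j - b\big)^2 .$$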
@mugrade.local_tests
def cf_error(X, U, V, b):
"""
args:
X : np.array[num_users, num_movies] -- the ratings matrix (as normal numpy array, not sparse)
U : np.array[num_users, num_features] -- a matrix of features for each user
V : np.array[num_movies,num_features] -- a matrix of features for each movie
b : float -- bias term
return: float -- the mean squared error between non-zero entries of X and the ratings
predicted by U and V, added to the bias b; as this is an error and not a loss function,
you do not need to include the regularizing terms.
"""
X_pred = U @ V.T + b
mask = X != 0
if not np.any(mask):
return 0.0
squared_error = (X[mask] - X_pred[mask]) ** 2
mean_squared_error = np.mean(squared_error)
return mean_squared_error
Next we implement the training function. The core steps are:
- Initialization: since we know neither U nor V, we first generate an item matrix V and a user matrix U.
- Fix U and update V so that the current least-squares error is minimized.
- Fix V and update U so that the current least-squares error is minimized.
- Repeat until the two matrices converge.
In the implementation we compute the least-squares solution one row (column) at a time. From the earlier formula, for example for movie $j$ with observed-user set $\Omega_j$:

$$\Big(\sum_{i \in \Omega_j} u_i u_i^\top + \lambda I\Big) v_j = \sum_{i \in \Omega_j} u_i \,(X_{ij} - b)$$

so from the observed entries of each column (row) of the rating matrix we can solve for the corresponding $v_j$ (or $u_i$) and fill it into the matrix.
@mugrade.local_tests
def cf_train(X_train, k, niters=10, lam=10., verbose=True):
""" Train a collaborative filtering model.
Args:
X_train : np.array[num_users, num_movies] -- the training ratings matrix, assumed dense
k : int -- the number of features in the CF model
niters : int -- number of iterations to run
lam : float -- regularization parameter, shown as lambda
return : Tuple[U, V, b]
U : np.array[num_users, num_features] -- the user-feature matrix
V : np.array[num_movies, num_features] -- the movie-feature matrix
b : float -- bias term
"""
m, n = X_train.shape
U = np.sin(np.arange(m*k)).reshape(m,k)/5
V = np.cos(np.arange(n*k)).reshape(n,k)/5
W = X_train != 0
I = np.identity(k)
epsilon = 1e-8 # Small value to prevent singular matrix
b = np.mean(X_train[W])
for _ in range(niters):
for j in range(n):
user_indices = W[:, j]
if not np.any(user_indices):
continue
U_subset = U[user_indices]
A = U_subset.T @ U_subset + (lam + epsilon) * I
ratings = X_train[user_indices, j]
B = U_subset.T @ (ratings - b)
V[j, :] = np.linalg.lstsq(A, B, rcond=None)[0]
for i in range(m):
movie_indices = W[i, :]
if not np.any(movie_indices):
continue
V_subset = V[movie_indices]
A = V_subset.T @ V_subset + (lam + epsilon) * I
ratings = X_train[i, movie_indices]
B = V_subset.T @ (ratings - b)
U[i, :] = np.linalg.lstsq(A, B, rcond=None)[0]
return U, V, b
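A hypothetical training run on a small random ratings matrix (shapes and hyperparameters are arbitrary):

# Hypothetical usage: zeros in the matrix mean "unrated".
rng = np.random.default_rng(0)
ratings = rng.integers(0, 6, size=(50, 40)).astype(float)
U, V, b = cf_train(ratings, k=5, niters=10, lam=10.0, verbose=False)
print(cf_error(ratings, U, V, b))   # mean squared error on the observed entries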
Recommendations
In this part we use the trained U and V matrices to build the recommender.
The idea is simple: for each user we find the movie that maximizes the predicted rating $u_i^\top v_j + b$; that movie is the one we recommend.
Note that movies the user has already rated must be masked with -np.inf,
because we want to recommend movies the user has not yet seen.
@mugrade.local_tests
def cf_recommend(X, U, V, b):
"""
args:
X : np.array[num_users, num_movies] -- the ratings matrix
U : np.array[num_users, num_features] -- a matrix of features for each user
        V : np.array[num_movies, num_features] -- a matrix of features for each movie
        b : float -- bias term
return: np.array[num_users] -- index of movie id to recommend for each user
"""
X_pred = U @ V.T + b
W = X > 0
X_pred[W] = -np.inf
recommendations = np.argmax(X_pred, axis=1)
return recommendations
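Continuing the hypothetical sketch from cf_train above, recommendations come directly from the learned factors:

recs = cf_recommend(ratings, U, V, b)
print(recs[:10])   # index of the movie recommended to each of the first ten users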