Homework 3
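Since I have only just started with data science and am not yet fluent with libraries such as numpy, these notes are written in a fair amount of detail.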
1. Linear
In this homework, we are going to apply linear regression to the problem of predicting developer satisfaction based upon information about their careers, from a StackOverflow survey. The data from this question is based on the 2019 StackOverflow Survey; accordingly, the subset bundled with this assignment is also released under the Open Database License (ODbL) v1.0. For this problem, you should not use Scikit-Learn, but instead, implement all the least squares solutions manually.
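Data Parsing
This part preprocesses the data. It covers the following basic operations:
- Convert all data to floating point.
- Convert the boolean variables y/NA/n to +1.0/0.0/0.0. Here we choose NA->0.0 as the missing-value filling strategy.
- Ignore some strings that are of no use to us.
- Express the level and order of ordinal categorical variables through numeric magnitude.
We create a map to store these mappings, then apply it to each column with the .map method; for columns whose final value has to be computed, we write the conversion as a function and apply it with the .apply method. Columns holding strings we want to ignore are removed with the .drop method.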
@mugrade.local_tests
def parse_stackoverflow_data():
    """Load data from the "eggs.csv.gz" file, and convert for use in linear regression
    Returns:
        pandas.DataFrame, containing the data converted to floating point values appropriately.
    """
    df = pd.read_csv('eggs.csv.gz', dtype=str, keep_default_na=False)
    MAPS = {
        'CareerSat': {'vd': -2.0, 'sd': -1.0, 'ne': 0.0, 'NA': 0.0, 'ss': 1.0, 'vs': 2.0},
        'EdLevel': {'other': 0.0, 'bachelors': 1.0, 'masters': 1.5, 'doctoral': 2.0, 'NA': 0.0},
        'MgrIdiot': {'NA': -1.0, 'not': -1.0, 'some': 0.0, 'very': 1.0},
        'OpSys': {'win': -1.0, 'mac': 0.0, 'NA': 0.0, 'tux': 1.0, 'BSD': 1.0},
        'OpenSourcer': {'never': 0.0, 'year': 0.5, 'month-year': 1.0, 'month': 2.0, 'NA': 0.0}
    }
    for col, mapping in MAPS.items():
        if col in df.columns:
            df[col] = df[col].map(mapping)
    def convert_orgsize(val):
        if val == 'NA' or not val:
            return 0.0
        cleaned_val = val.replace(',', '').split('-')[0].split(' ')[0]
        try:
            num = float(cleaned_val)
            return np.log(num) if num > 0 else 0.0
        except (ValueError, TypeError):
            return 0.0
    def convert_bool(val):
        return 1.0 if val == 'y' else 0.0
    def convert_integer(val):
        return float(val) if val != 'NA' else 0.0
    bool_cols = ['MgrWant', 'Dependents', 'DevEnvironVSC', 'DevTypeFullStack',
                 'EduOtherMOOC', 'EduOtherSelf', 'Extraversion', 'GenderIsMan',
                 'Hobbyist', 'Student', 'UndergradMajorIsComputerScience',
                 'UnitTestsProcess']
    int_cols = ['Age', 'CodeRevHrs', 'ConvertedComp', 'WorkWeekHrs',
                'YearsCode', 'YearsCodePro']
    drop_cols = ['Country', 'EduOtherSelf', 'Respondent']
    df['OrgSize'] = df['OrgSize'].apply(convert_orgsize)
    for col in bool_cols:
        if col in df.columns:
            df[col] = df[col].apply(convert_bool)
    ......
    for col in drop_cols:
        if col in df.columns:
            df = df.drop(columns=[col])
    return df
Splitting Data
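This part splits the data into a training set and a test set (called the validation set in the code); the assignment asks us to implement the splitting logic ourselves:
- Split the whole dataset into a training part and a test part.
- For each of the two parts:
  - Separate out the column that holds the value to predict.
  - Append an all-ones column to the remaining input columns (this accounts for the bias term). Then return everything.
We first compute the split index (note that the split is along rows). For each resulting dataset we pull out its first column, then use np.hstack to append to the remaining columns an all-ones column of shape (dataset_shape[0], 1). Note that this has to be a matrix rather than just a vector of shape (dataset_shape[0],), otherwise the dimensions do not match.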
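The shape remark above can be checked with a small sketch. The array `features` below is made-up illustration data, not part of the assignment.

```python
import numpy as np

# Hypothetical 3x2 feature block, used only for illustration.
features = np.arange(6, dtype=float).reshape(3, 2)

# A (3, 1) column of ones is 2-D like `features`,
# so np.hstack can append it as the bias column.
with_bias = np.hstack([features, np.ones((features.shape[0], 1))])

# A flat (3,) vector is 1-D, so stacking it next to a 2-D array fails.
try:
    np.hstack([features, np.ones(features.shape[0])])
    flat_vector_ok = True
except ValueError:
    flat_vector_ok = False
```

Here `with_bias` has shape (3, 3) with ones in its last column, while the flat-vector attempt raises a ValueError.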
@mugrade.local_tests
def split_data(df):
    """
    returns: X_train, y_train, X_val, y_val
        X_train : np.ndarray -- the second 80% of the data features
        y_train : np.ndarray -- the second 80% of the target values
        X_val : np.ndarray -- the first 20% (rounded down) of the data features
        y_val : np.ndarray -- the first 20% of the target values
    """
    if df is None or df.empty:
        return None, None, None, None
    data_array = df.to_numpy()
    num_rows = data_array.shape[0]
    split_index = int(num_rows * 0.2)
    validation_data = data_array[:split_index]
    train_data = data_array[split_index:]
    # The first column is the target; the remaining columns are features.
    y_val = validation_data[:, 0]
    X_val = np.hstack([validation_data[:, 1:], np.ones((validation_data.shape[0], 1))])
    y_train = train_data[:, 0]
    X_train = np.hstack([train_data[:, 1:], np.ones((train_data.shape[0], 1))])
    return X_train, y_train, X_val, y_val
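An implementation using the scikit-learn library looks like this:
from sklearn.model_selection import train_test_split
@mugrade.local_tests
def split_data(df):
    feature = df.iloc[:, 1:]
    target = df.iloc[:, 0]
    # With shuffle=False the "train" output is the leading block of rows, so we
    # request train_size=0.2 and swap the outputs: the first 20% (rounded down)
    # becomes the validation set, matching the manual version above.
    X_val, X_train, y_val, y_train = train_test_split(
        feature, target, train_size=0.2, shuffle=False
    )
    # np.ones takes the shape as a single tuple argument.
    X_train_with_bias = np.hstack([X_train.to_numpy(), np.ones((X_train.shape[0], 1))])
    X_val_with_bias = np.hstack([X_val.to_numpy(), np.ones((X_val.shape[0], 1))])
    return X_train_with_bias, y_train.to_numpy(), X_val_with_bias, y_val.to_numpy()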
Linear Regression
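This part implements the actual linear regression logic. The class stores the parameter vector $\theta$ used for prediction, which only needs to be computed from the closed-form least-squares solution:
$$\theta = (X^\top X)^{-1} X^\top y$$
The predict method then uses this $\theta$ to make predictions.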
def squared_error(y_pred, y):
    return np.mean((y_pred - y)**2)
class LinearRegression():
    def __init__(self, X, y):
        # Solve the normal equations (X^T X) theta = X^T y instead of forming an inverse.
        self.theta = np.linalg.solve(X.T @ X, X.T @ y)
    def predict(self, X):
        return X @ self.theta
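As a sanity check on the closed-form solution, the sketch below fits noiseless synthetic data and compares the normal-equation result with `np.linalg.lstsq`. The data, the seed, and `true_theta` are assumptions made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

# Made-up design matrix: 50 samples, 2 features plus a bias column of ones.
X = np.hstack([rng.normal(size=(50, 2)), np.ones((50, 1))])
true_theta = np.array([2.0, -1.0, 0.5])
y = X @ true_theta  # noiseless targets, so the fit should recover true_theta

# Normal equations: solve (X^T X) theta = X^T y without forming an inverse.
theta_normal = np.linalg.solve(X.T @ X, X.T @ y)

# Reference solution from NumPy's least-squares routine.
theta_lstsq, *_ = np.linalg.lstsq(X, y, rcond=None)
```

Both estimates should agree with `true_theta` up to floating-point error. If X^T X were singular (for example with a duplicated column), `np.linalg.solve` would raise an error, whereas `np.linalg.lstsq` would still return a solution.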
Evaluation versus baselines
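This part implements a baseline model: we use predicting the mean of the training targets as the baseline, then compare the baseline's predictions with those of the linear regression model.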
@mugrade.local_tests
def evaluate_linear_regression(X_train, y_train, X_val, y_val):
    """ Evaluate the squared error of linear regression versus the simple mean-prediction baseline.
    Args: X_train, y_train, X_val, y_val -- output of split_data() function
    Return: Tuple[validation_mse, baseline_mse]:
        validation_mse: float -- squared error of predictions on validation set, when training on training set
        baseline_mse: float -- squared error of predicting the mean on the training set
    """
    model = LinearRegression(X_train, y_train)
    # Predict on the validation features so the predictions line up with y_val.
    y_pred = model.predict(X_val)
    validation_mse = squared_error(y_pred, y_val)
    baseline_pred = np.mean(y_train)
    baseline_mse = squared_error(baseline_pred, y_val)
    return validation_mse, baseline_mse
2. NLP
Bag of words, and TFIDF
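This part returns the TFIDF sparse matrix built from the input documents:
- Use the bag-of-words model to count how many times each word occurs in a document.
- Compute how frequently a word occurs within a single document, the term frequency (TF), and how common a word is across all documents, the inverse document frequency (IDF), then multiply them to obtain the TF-IDF matrix.
Note that although the "free text processing" notes explained how to build the TF matrix, in the actual computation we should iterate over the documents, compute the term frequencies of each one, and add the resulting TF-IDF values to the sparse matrix, rather than building a full TF matrix.
We also use a convenient class, collections.Counter, which takes an iterable and returns a dictionary-like object:
- The keys are the elements of the iterable.
- The values are the number of times each element occurs.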
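A minimal sketch of that behavior, using a made-up token list:

```python
from collections import Counter

# Made-up token list for illustration.
tokens = "the cat saw the other cat".split(" ")
word_count = Counter(tokens)

# Counter behaves like a dict, and missing keys count as zero
# instead of raising KeyError.
the_count = word_count["the"]
dog_count = word_count["dog"]
```

Here `the_count` is 2 and `dog_count` is 0.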
from collections import Counter
@mugrade.local_tests
def tfidf(docs):
    """
    Args:
        docs: list of strings, where each string represents a space-separated
              document
    Returns: tuple: (tfidf_matrix, all_words)
        tfidf_matrix: sparse matrix (in any scipy sparse format) of size (# docs) x
                      (# total unique words), where i,j entry is TFIDF score for
                      document i and term j
        all_words: list of strings, where the ith element indicates the word
                   that corresponds to the ith column in the TFIDF matrix
    """
Building the vocabulary is straightforward: we tokenize all documents, deduplicate the tokens, and build a mapping from each word to its index:
    split_docs = [doc.split(' ') for doc in docs]
    vocab = set()
    for doc in split_docs:
        vocab.update(doc)
    if "" in vocab:
        vocab.remove("")
    all_words = sorted(list(vocab))
    vocab_dict = {word: i for i, word in enumerate(all_words)}
    num_docs = len(split_docs)
    num_words = len(vocab)
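Next we compute the inverse document frequency of each word:
    df = np.zeros(num_words)
    # Iterate over the tokenized documents; iterating over the raw strings
    # would yield sets of characters rather than sets of words.
    for doc in split_docs:
        unique_doc = set(doc)
        if "" in unique_doc:
            unique_doc.remove("")
        for word in unique_doc:
            df[vocab_dict[word]] += 1
    idf = np.log(num_docs / df)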
Then we iterate over all documents, compute the term frequency of each word in a document, and multiply it by the idf value computed above to obtain the entries of the final TF-IDF matrix:
    data, rows, cols = [], [], []
    for i, doc in enumerate(split_docs):
        word_count = Counter(doc)
        if "" in word_count:
            del word_count[""]
        # Calculate document length without empty strings
        doc_length = len([w for w in doc if w != ''])
        if doc_length > 0:  # Avoid division by zero
            for word, count in word_count.items():
                if word in vocab_dict:
                    j = vocab_dict[word]
                    tf = count / doc_length
                    tfidf_score = tf * idf[j]
                    if tfidf_score != 0:  # Only add non-zero values to sparse matrix
                        data.append(tfidf_score)
                        rows.append(i)
                        cols.append(j)
    tfidf_matrix = sp.coo_matrix((data, (rows, cols)), shape=(num_docs, num_words))
    return tfidf_matrix, all_words
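To make the arithmetic concrete, here is a small hand-checkable sketch that follows the same definitions as the code above (TF as count divided by document length, IDF as the natural log of the number of documents over the document frequency). The two toy documents are made up, and the dense dictionaries are only for readability.

```python
import math
from collections import Counter

# Two made-up documents for illustration.
toy_docs = ["a b b", "a c"]
split_docs = [d.split(" ") for d in toy_docs]

# Document frequency: number of documents containing each word.
doc_freq = Counter(word for doc in split_docs for word in set(doc))
idf = {word: math.log(len(split_docs) / n) for word, n in doc_freq.items()}

# TF-IDF for every (document, word) pair that actually occurs.
scores = [
    {word: (count / len(doc)) * idf[word] for word, count in Counter(doc).items()}
    for doc in split_docs
]
```

The word `a` appears in both documents, so its IDF is log(2/2) = 0 and its TF-IDF score is 0 everywhere; `b` scores (2/3)·log 2 in the first document and `c` scores (1/2)·log 2 in the second.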
Cosine Similarity
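This part computes cosine similarities and produces the similarity matrix. The numerator of the cosine similarity is the dot product of the two vectors, and the denominator is the product of their L2 norms.
Since the result has to be returned as a matrix, for the denominator we take an outer product to expand the vector of norms into a matrix: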
@mugrade.local_tests
def cosine_similarity(X):
    """
    Args:
        X: sparse matrix of TFIDF scores or term frequencies
    Returns:
        M: dense numpy array of all pairwise cosine similarities. That is, the
           entry M[i,j], should correspond to the cosine similarity between the
           ith and jth rows of X.
    """
    doc_product = X @ X.T
    norms = np.sqrt(X.power(2).sum(axis=1))
    denominator = np.outer(norms, norms)
    numerator_dense = doc_product.toarray()
    similarity_matrix = np.divide(
        numerator_dense,
        denominator,
        out=np.zeros_like(numerator_dense),
        where=denominator != 0
    )
    return similarity_matrix
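An equivalent way to think about it is to scale each row to unit length first, after which the similarity matrix is a plain matrix product. The sketch below uses a small dense array for readability (the function above works on sparse input), and the matrix values are made up.

```python
import numpy as np

# Made-up 3x2 matrix; the last row is all zeros to show the zero-norm case.
X = np.array([[1.0, 0.0],
              [1.0, 1.0],
              [0.0, 0.0]])

norms = np.linalg.norm(X, axis=1, keepdims=True)  # shape (3, 1)

# Divide each row by its norm, leaving all-zero rows as zeros.
X_unit = np.divide(X, norms, out=np.zeros_like(X), where=norms != 0)

# Dot products of unit vectors are cosine similarities.
M = X_unit @ X_unit.T
```

The first two rows are 45 degrees apart, so `M[0, 1]` is 1/sqrt(2), and the all-zero row has similarity 0 with everything, including itself.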
Analyzing document authorship
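This part uses the TF-IDF matrix and the cosine similarity computation built earlier to decide who actually wrote the unknown documents, by comparing their similarity to the documents of each of the three authors.
In the implementation, we store the document indices belonging to each author, then use those indices as row indices and the indices of the unknown documents as column indices to extract the corresponding sub-matrix of the similarity matrix, and take its mean.
This processing pattern is very practical. For a set of documents to analyze, we:
- Build the TF-IDF matrix.
- Compute the cosine similarities we need, then use that similarity information for analysis and further computation.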
@mugrade.local_tests
def author_cosine_similarity(docs, authors):
    """
    Args:
        docs: list of strings, where each string represents a space-separated
              document
        authors: list of lists, which each list contains the author (or potential authors) of a given document
    Returns: tuple: (hamilton_mcs, madison_mcs, jay_mcs)
        hamilton_mcs: Average cosine similarity between all the known Hamilton papers and all the unknown papers.
        madison_mcs: Average cosine similarity between all the known Madison papers and all the unknown papers.
        jay_mcs: Average cosine similarity between all the known Jay papers and all the unknown papers.
    """
    hamilton_index = []
    madison_index = []
    jay_index = []
    unknown_index = []
    for i, author_tuple in enumerate(authors):
        # Normalize to a tuple so the comparisons also work when authors are given as lists.
        author_tuple = tuple(author_tuple)
        if author_tuple == ("HAMILTON",):
            hamilton_index.append(i)
        elif author_tuple == ("MADISON",):
            madison_index.append(i)
        elif author_tuple == ("JAY",):
            jay_index.append(i)
        elif author_tuple == ("HAMILTON", "MADISON"):
            unknown_index.append(i)
    tfidf_matrix, _ = tfidf(docs)
    similarity_matrix = cosine_similarity(tfidf_matrix)
    hamilton_mcs = similarity_matrix[np.ix_(hamilton_index, unknown_index)].mean()
    madison_mcs = similarity_matrix[np.ix_(madison_index, unknown_index)].mean()
    jay_mcs = similarity_matrix[np.ix_(jay_index, unknown_index)].mean()
    return hamilton_mcs, madison_mcs, jay_mcs
Building an n-gram language model
This part implements an n-gram language model. It is a different way of modeling documents than TF-IDF; the last module, where the language model is applied, summarizes the overall workflow.
First we build the vocabulary of the documents. This is exactly what we did for TF-IDF:
@mugrade.local_tests
class LanguageModel:
    def __init__(self, docs, n):
        """
        Args:
            docs: list of strings, where each string represents a space-separated
                  document
            n: integer, degree of n-gram model
        """
        self.n = n
        # Build vocabulary from all documents
        split_docs = [doc.split(' ') for doc in docs]
        vocab = set()
        for doc in split_docs:
            vocab.update(doc)
        if "" in vocab:
            vocab.remove("")
        self.vocab = vocab
Next we collect the context counts that the n-gram model needs, recording:
- The number of times each context occurs.
- The number of times the current word occurs after that context.
        # Initialize counting structures
        self.counts = collections.defaultdict(lambda: collections.defaultdict(int))
        self.count_sums = collections.defaultdict(int)
        # Count n-grams from each document separately
        for doc in docs:
            words = doc.split(' ')
            words = [w for w in words if w != '']
            if len(words) >= n:
                for i in range(len(words) - n + 1):
                    context = words[i: i + n - 1]
                    current_word = words[i + n - 1]
                    context_key = " ".join(context)
                    self.counts[context_key][current_word] += 1
                    self.count_sums[context_key] += 1
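Then, based on these precomputed attributes, we compute the perplexity of the model. Following the perplexity formula from the problem, as implemented below:
$$\text{Perplexity} = 2^{-\frac{1}{T} \sum_{i} \log_2 P(w_i \mid w_{i-n+1}, \ldots, w_{i-1})}$$
where the sum runs over the $T$ positions $i$ that have a full context of $n-1$ preceding words. We only need the previously computed counts and count_sums to obtain the smoothed conditional probability at each position:
$$P(w_i \mid w_{i-n+1}, \ldots, w_{i-1}) = \frac{C(w_{i-n+1}, \ldots, w_i) + \alpha}{C(w_{i-n+1}, \ldots, w_{i-1}) + \alpha D}$$
Here $C(\cdot)$ is the number of times a word sequence was seen in training, $\alpha$ is the Laplace smoothing constant, and $D$ is the vocabulary size: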
    def perplexity(self, text, alpha=1e-3):
        """
        Evaluate perplexity of model on some text.
        Args:
            text: string containing space-separated words, on which to compute
            alpha: constant to use in Laplace smoothing
        Note: for the purposes of smoothing, the vocabulary size (i.e, the D term)
        should be equal to the total number of unique words used to build the model
        _and_ in the input text to this function.
        Returns: perplexity
            perplexity: floating point value, perplexity of the text as evaluated
                        under the model.
        """
        # Drop empty tokens, matching how the training documents were tokenized
        test_words = [w for w in text.split(' ') if w != '']
        test_vocab = set(test_words)
        vocab = self.vocab.union(test_vocab)
        D = len(vocab)
        total_log_prob = 0.0
        num_terms = 0
        for i in range(len(test_words) - (self.n - 1)):
            context = test_words[i: i + self.n - 1]
            current_word = test_words[i + self.n - 1]
            context_key = " ".join(context)
            # Get counts, handling cases where context or word doesn't exist
            if context_key in self.counts and current_word in self.counts[context_key]:
                current_word_count = self.counts[context_key][current_word]
            else:
                current_word_count = 0
            if context_key in self.count_sums:
                context_count = self.count_sums[context_key]
            else:
                context_count = 0
            prob = (current_word_count + alpha) / (context_count + D * alpha)
            total_log_prob += np.log2(prob)
            num_terms += 1
        if num_terms == 0:
            return float('inf')
        average_log_prob = total_log_prob / num_terms
        perplexity = 2 ** (-average_log_prob)
        return perplexity
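A hand-checkable sketch of the same computation for a bigram model (n = 2) may help. The training and test strings are made up, and `alpha = 1.0` is chosen only to keep the numbers simple; it is not the value used in the assignment.

```python
import math
from collections import Counter

train = "a b a b".split(" ")
test = "a b".split(" ")
alpha = 1.0  # a large alpha chosen so the numbers are easy to check by hand

# Bigram counts: (previous word, word) pairs and previous-word totals.
pair_counts = Counter(zip(train, train[1:]))
context_counts = Counter(train[:-1])

D = len(set(train) | set(test))  # vocabulary size used for smoothing

log_probs = []
for prev, word in zip(test, test[1:]):
    prob = (pair_counts[(prev, word)] + alpha) / (context_counts[prev] + alpha * D)
    log_probs.append(math.log2(prob))

perplexity = 2 ** (-sum(log_probs) / len(log_probs))
```

The only scored position is `b` after `a`: the pair was seen twice and the context twice, so the smoothed probability is (2 + 1) / (2 + 1·2) = 0.75 and the perplexity is 1 / 0.75 = 4/3.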
Author identification via language models
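In this part we apply our n-gram model to the earlier task of identifying the unknown author:
- We initialize one n-gram model per author; each model captures the language characteristics of that author's documents.
- To identify the actual author of the unknown documents, we compute the perplexity of each model on those documents. The author whose model has the lowest perplexity is our prediction.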
@mugrade.local_tests
def mean_perplexity(docs, authors):
    """
    Args:
        docs: list of strings, where each string represents a space-separated document
        authors: list of lists, which each list contains the author (or potential authors) of a given document
    Returns: tuple: (perp_hamilton, perp_madison, perp_jay)
        perp_hamilton: floating point value, mean perplexity of the unknown Federalist papers for the language
                       models from Hamilton
        perp_madison: floating point value, mean perplexity of the unknown Federalist papers for the language
                      models from Madison
        perp_jay: floating point value, mean perplexity of the unknown Federalist papers for the language
                  models from Jay
    """
    hamilton_docs = []
    madison_docs = []
    jay_docs = []
    unknown_docs = []
    for i, author_tuple in enumerate(authors):
        doc = docs[i]
        # Normalize to a tuple so the comparisons also work when authors are given as lists.
        author_tuple = tuple(author_tuple)
        if author_tuple == ("HAMILTON",):
            hamilton_docs.append(doc)
        elif author_tuple == ("MADISON",):
            madison_docs.append(doc)
        elif author_tuple == ("JAY",):
            jay_docs.append(doc)
        elif author_tuple == ("HAMILTON", "MADISON"):
            unknown_docs.append(doc)
    n = 3
    alpha = 1e-3
    hamilton_model = LanguageModel(hamilton_docs, n)
    madison_model = LanguageModel(madison_docs, n)
    jay_model = LanguageModel(jay_docs, n)
    hamilton_perplexity = [hamilton_model.perplexity(doc, alpha=alpha) for doc in unknown_docs]
    madison_perplexity = [madison_model.perplexity(doc, alpha=alpha) for doc in unknown_docs]
    jay_perplexity = [jay_model.perplexity(doc, alpha=alpha) for doc in unknown_docs]
    return (np.mean(hamilton_perplexity), np.mean(madison_perplexity), np.mean(jay_perplexity))
3. Text
The tasks in this assignment are very similar to the earlier ones, except that building the TFIDF matrix and the classifier relies on the existing scikit-learn library.
Word distributions
This part counts the occurrences of each token:
@mugrade.local_tests
def get_distribution(data):
    """
    args:
        data -- the training or testing data
    return : collections.Counter -- the distribution of word counts
    """
    word_counts = collections.Counter()
    for _, tokenized_text in data:
        word_counts.update(tokenized_text)
    return word_counts
Vectorizing
This part uses scikit-learn's TfidfVectorizer component to create the TFIDF matrix.
Note that constructing a TfidfVectorizer only gives an unfitted vectorizer, not a TFIDF matrix. We first have to fit it on the training set with fit_transform, and then apply it to the test set with transform.
from sklearn.feature_extraction.text import TfidfVectorizer
@mugrade.local_tests
def create_features(train_data, test_data):
    """creates the feature matrices and label vector for the training and test sets.
    args:
        train_data, test_data : output of read_data() function
    returns: Tuple[train_features, train_labels, test_features]
        train_features : scipy.sparse.csr.csr_matrix -- TFIDF feature matrix for the training set
        train_labels : np.array[num_train] -- a numpy vector, where 1 stands for Republican and 0 stands for Democrat
        test_features : scipy.sparse.csr.csr_matrix -- TFIDF feature matrix for the test set
    """
    train_text = [data[1] for data in train_data]
    train_bool_labels = [data[0] for data in train_data]
    # Convert boolean labels to integers: True (Republican) -> 1, False (Democrat) -> 0
    train_labels = np.array(train_bool_labels, dtype=int)
    test_text = [data[1] for data in test_data]
    vectorizer = TfidfVectorizer(
        preprocessor=lambda x: x,
        tokenizer=lambda x: x,
        token_pattern=None,
        min_df=5,
        max_df=0.4
    )
    train_features = vectorizer.fit_transform(train_text)
    test_features = vectorizer.transform(test_text)
    return (train_features, train_labels, test_features)
Note: test_features must not be generated with fit_transform! Refitting on the test set would leak test data into the features.
Training a classifier
This part uses scikit-learn's LinearSVC to initialize a classifier and trains it on the feature matrix obtained earlier:
from sklearn.svm import LinearSVC
@mugrade.local_tests
def train_classifier(features, labels, C):
    """learns a classifier from the input features and labels using a specified kernel function
    args:
        features: scipy.sparse.csr.csr_matrix -- sparse matrix of features
        labels : numpy.ndarray(bool): binary vector of class labels
        C : float -- C regularization parameters
    returns: sklearn.svm.LinearSVC -- classifier trained on data
    """
    classifier = LinearSVC(
        C=C,
        loss='hinge',
        max_iter=100000,  # Increased from 10000 to ensure convergence
        random_state=0
    )
    classifier.fit(features, labels)
    return classifier
Cross validation
In this part we use validation on held-out data to determine which value of the parameter C performs best:
from sklearn.metrics import f1_score
@mugrade.local_tests
def evaluate_classifier(features, labels, C = (0.01, 0.1, 1.0, 10., 100.), train_length=10000):
    """
    args:
        features: scipy.sparse.csr.csr_matrix -- sparse matrix of features
        labels : numpy.ndarray(bool): binary vector of class labels
        C : Tuple[float] -- tuple of C regularization parameters
        train_length: int -- use _first_ train_length features for training (and the rest of validation)
    return : List[Tuple[float, float]] -- list of F1 scores for training/validation for each C parameter
    """
    train_features = features[:train_length]
    validation_features = features[train_length:]
    train_labels = labels[:train_length]
    validation_labels = labels[train_length:]
    scores_list = []
    for C_param in C:
        classifier = train_classifier(train_features, train_labels, C_param)
        train_predictions = classifier.predict(train_features)
        validation_predictions = classifier.predict(validation_features)
        train_f1_score = f1_score(train_labels, train_predictions)
        validation_f1_score = f1_score(validation_labels, validation_predictions)
        scores_list.append((train_f1_score, validation_f1_score))
    return scores_list
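For reference, the F1 score for binary labels is the harmonic mean of precision and recall on the positive class. The following hand-computed sketch uses made-up labels and predictions and does not call scikit-learn.

```python
# Made-up labels and predictions for illustration.
y_true = [1, 1, 1, 0, 0]
y_pred = [1, 1, 0, 1, 0]

tp = sum(t == 1 and p == 1 for t, p in zip(y_true, y_pred))  # true positives
fp = sum(t == 0 and p == 1 for t, p in zip(y_true, y_pred))  # false positives
fn = sum(t == 1 and p == 0 for t, p in zip(y_true, y_pred))  # false negatives

precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)
```

With two true positives, one false positive, and one false negative, precision and recall are both 2/3, so F1 is also 2/3.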
Classifying new Tweets
In this part we use the classifier built above for the actual classification task.
@mugrade.local_tests
def predict_test(train_features, train_labels, test_features):
    """
    args:
        train_features: scipy.sparse.csr.csr_matrix -- sparse matrix of training features
        train_labels : numpy.ndarray(bool): binary vector of training class labels
        test_features: scipy.sparse.csr.csr_matrix -- sparse matrix of test set features
    return : numpy.ndarray(bool): array of predictions on the test set
    """
    # Based on cross validation results, C=1.0 gives the best validation F1 score
    best_C = 1.0
    # Train classifier on the entire training set
    classifier = train_classifier(train_features, train_labels, best_C)
    # Make predictions on the test set
    predictions = classifier.predict(test_features)
    # Convert predictions to boolean array (as required by the return type)
    return predictions.astype(bool)