Homework 3

Tags: NLP
Categories: Data Science

Since I have only just started with data science and am not yet fluent with libraries such as numpy, this writeup will be somewhat more detailed than usual.


1. Linear

In this homework, we are going to apply linear regression to the problem of predicting developer satisfaction based upon information about their careers, from a StackOverflow survey. The data from this question is based on the 2019 StackOverflow Survey; accordingly, the subset bundled with this assignment is also released under the Open Database License (ODbL) v1.0. For this problem, you should not use Scikit-Learn, but instead, implement all the least squares solutions manually.

Q1. Data Parsing

This part preprocesses the data, with the following basic operations:

  1. Convert all values to floating point.
  2. Convert the boolean variables y/NA/n to +1.0/0.0/0.0; that is, we impute missing values with the NA -> 0.0 strategy.
  3. Drop string columns that are of no use to us.
  4. Encode ordinal categorical variables with numbers that reflect their levels and ordering.

We create a map that stores these mappings and apply it to each column with the .map method; for columns whose final value requires further computation, we write out the conversion expression and apply it with the .apply method. Columns to be ignored are removed with the .drop method.
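As a minimal illustration of the .map approach, here is the CareerSat mapping from the code below applied to a made-up column:

import pandas as pd

s = pd.Series(['vd', 'NA', 'vs', 'ss'])
s.map({'vd': -2.0, 'sd': -1.0, 'ne': 0.0, 'NA': 0.0, 'ss': 1.0, 'vs': 2.0})
# 0   -2.0
# 1    0.0
# 2    2.0
# 3    1.0
# dtype: float64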

@mugrade.local_tests
def parse_stackoverflow_data():
    """Load data from the "eggs.csv.gz" file, and convert for use in linear regression
    
    Returns:
        pandas.DataFrame, containing the data converted to floating point values appropriately.
    """
    df = pd.read_csv('eggs.csv.gz', dtype=str, keep_default_na=False)
    
    MAPS = {
        'CareerSat': {'vd': -2.0, 'sd': -1.0, 'ne': 0.0, 'NA': 0.0, 'ss': 1.0, 'vs': 2.0},
        'EdLevel': {'other': 0.0, 'bachelors': 1.0, 'masters': 1.5, 'doctoral': 2.0, 'NA': 0.0}, 
        'MgrIdiot': {'NA': -1.0, 'not': -1.0, 'some': 0.0, 'very': 1.0},
        'OpSys': {'win': -1.0, 'mac': 0.0, 'NA': 0.0, 'tux': 1.0, 'BSD': 1.0},
        'OpenSourcer': {'never': 0.0, 'year': 0.5, 'month-year': 1.0, 'month': 2.0, 'NA': 0.0} 
    }

    for col, mapping in MAPS.items():
        if col in df.columns:
            df[col] = df[col].map(mapping)

    def convert_orgsize(val):
        if val == 'NA' or not val:
            return 0.0
        
        cleaned_val = val.replace(',', '').split('-')[0].split(' ')[0]
        try:
            num = float(cleaned_val)
            return np.log(num) if num > 0 else 0.0
        except (ValueError, TypeError):
            return 0.0
    
    def convert_bool(val):
        return 1.0 if val == 'y' else 0.0
    
    def convert_integer(val):
        return float(val) if val != 'NA' else 0.0

    bool_cols = ['MgrWant', 'Dependents', 'DevEnvironVSC', 'DevTypeFullStack', 
                 'EduOtherMOOC', 'EduOtherSelf', 'Extraversion', 'GenderIsMan', 
                 'Hobbyist', 'Student', 'UndergradMajorIsComputerScience',
                 'UnitTestsProcess']
    
    int_cols = ['Age', 'CodeRevHrs', 'ConvertedComp', 'WorkWeekHrs',
                'YearsCode', 'YearsCodePro']
    
    drop_cols = ['Country', 'EduOtherSelf', 'Respondent', ]

    df['OrgSize'] = df['OrgSize'].apply(convert_orgsize)
    
    for col in bool_cols:
        if col in df.columns:
            df[col] = df[col].apply(convert_bool)

    for col in int_cols:
        if col in df.columns:
            df[col] = df[col].apply(convert_integer)

    for col in drop_cols:
        if col in df.columns:
            df = df.drop(columns=[col])

    return df  

Q2. Splitting Data

In this part we split the data into training and validation sets; the assignment asks us to implement the splitting logic ourselves:

  1. Split the whole dataset into two parts, training and validation.
  2. For each part:
    1. Pull out the column of target values on its own.
    2. Append a column of all ones to the remaining feature columns (this is the bias term), then return the results.

We first compute the split index (note that we split along rows). For each resulting subset, we pull out its first column, then np.hstack a column of ones with shape (dataset_shape[0], 1) onto the remaining columns (note that this must be a 2-D array rather than a 1-D vector of shape (dataset_shape[0],); otherwise the dimensions will not match).
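To see the shape pitfall concretely, here is a minimal sketch on a made-up 3x2 feature array:

import numpy as np

X = np.zeros((3, 2))
np.hstack([X, np.ones((3, 1))]).shape   # (3, 3): the bias column is appended
# np.hstack([X, np.ones(3)])            # raises ValueError: 1-D array cannot be stacked next to a 2-D one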

@mugrade.local_tests
def split_data(df):
    """
      returns: X_train, y_train, X_val, y_val
      X_train  : np.ndarray -- the second 80% of the data features
      y_train : np.ndarray -- the second 80% of the target values
      X_val : np.ndarray -- the first 20% (rounded down) of the data features
      y_val : np.ndarray -- the first 20% of the target values
    """
    if df is None or df.empty:
        return None, None, None, None

    data_array = df.to_numpy()
    num_rows = data_array.shape[0]
    split_index = int(num_rows * 0.2)

    validation_data = data_array[:split_index]
    train_data = data_array[split_index:]

    y_val = validation_data[:,0]
    X_val = np.hstack([validation_data[:, 1:], np.ones((validation_data.shape[0], 1))])

    y_train = train_data[:, 0]
    X_train = np.hstack([train_data[:, 1:], np.ones((train_data.shape[0], 1))])

    return X_train, y_train, X_val, y_val

An implementation using the scikit-learn library looks like this:

from sklearn.model_selection import train_test_split

@mugrade.local_tests
def split_data(df):
    feature = df.iloc[:, 1:]
    target = df.iloc[:, 0]
    X_train, X_val, y_train, y_val = train_test_split(
        feature, target, test_size=0.2, shuffle=False
    )

    X_train_with_bias = np.hstack([X_train.to_numpy(), np.ones((X_train.shape[0], 1))])
    X_val_with_bias = np.hstack([X_val.to_numpy(), np.ones((X_val.shape[0], 1))])
    return X_train_with_bias, y_train.to_numpy(), X_val_with_bias, y_val.to_numpy()

Q3. Linear Regression

This part implements the linear regression logic itself. The class stores the parameter θ used for making predictions, which we obtain directly from the closed-form least-squares solution:

\theta = (X^T X)^{-1} X^T y

The predict method then uses this θ to produce predictions.

def squared_error(y_pred, y):
    return np.mean((y_pred - y)**2)

class LinearRegression():
    def __init__(self, X, y):
        self.theta = np.linalg.solve(X.T @ X, X.T @ y)

    def predict(self, X): 
        return X @ self.theta
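A small design note: np.linalg.solve(X.T @ X, X.T @ y) solves the normal equations directly, which is both faster and numerically more stable than explicitly forming the inverse via np.linalg.inv(X.T @ X) @ X.T @ y.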

Q4. Evaluation versus baselines

This part implements a baseline model: we use the mean of the training targets as the baseline prediction, then compare the baseline's predictions with those of the linear regression model.

@mugrade.local_tests
def evaluate_linear_regression(X_train, y_train, X_val, y_val):
    """ Evaluate the squared error of linear regression versus the simple mean-prediciton baseline.
    
    Args: X_train, y_train, X_val, y_val -- output of split_data() function
    
    Return: Tuple[validation_mse, baseline_mse]:
        validation_mse: float -- squared error of predictions on validation set, when training on training set
        baseline_mse: float -- squared error of predicting the mean on the training set
    """        
    model = LinearRegression(X_train, y_train)
    y_pred = model.predict(X_val)
    validation_mse = squared_error(y_pred, y_val)

    baseline_pred = np.mean(y_train)
    baseline_mse = squared_error(baseline_pred, y_val)

    return validation_mse, baseline_mse 

2. NLP

Q1. Bag of words, and TFIDF

In this part we build the TF-IDF sparse matrix from the input documents:

  1. Use the bag-of-words model to count how many times each word occurs in each document.
  2. Compute the term frequency (TF), how often a word occurs within a single document, and the inverse document frequency (IDF), how common the word is across all documents, then multiply the two to obtain the TF-IDF matrix (the exact formulas are spelled out below).
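Concretely, the conventions used in the code below are counts normalized by document length and a natural-log IDF:

\mathrm{tf}(t, d) = \frac{\mathrm{count}(t, d)}{|d|}, \qquad \mathrm{idf}(t) = \log \frac{N}{\mathrm{df}(t)}, \qquad \mathrm{tfidf}(t, d) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t)

where N is the number of documents and df(t) is the number of documents that contain term t.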

Note that although the "free text processing" notes walk through building a full TF matrix, in the actual computation we should iterate over the documents, compute each document's term frequencies, and insert the resulting TF-IDF values into a sparse matrix, rather than materializing a complete TF matrix.

We also use the convenient class collections.Counter, which takes an iterable and returns a dict-like object where:

  • the keys are the elements of the iterable, and
  • the values are the number of times each element occurs.
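For example:

from collections import Counter

Counter("the cat sat on the mat".split(' '))
# Counter({'the': 2, 'cat': 1, 'sat': 1, 'on': 1, 'mat': 1})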
import numpy as np
import scipy.sparse as sp
from collections import Counter

@mugrade.local_tests
def tfidf(docs):
    """
    Args:
        docs: list of strings, where each string represents a space-separated
              document
    
    Returns: tuple: (tfidf_matrix, all_words)
        tfidf_matrix: sparse matrix (in any scipy sparse format) of size (# docs) x
               (# total unique words), where i,j entry is TFIDF score for 
               document i and term j
        all_words: list of strings, where the ith element indicates the word
                   that corresponds to the ith column in the TFIDF matrix
    """

Building the vocabulary is straightforward: we tokenize all documents, deduplicate the tokens, and build a mapping from words to indices:

split_docs = [doc.split(' ') for doc in docs]
vocab = set()
for doc in split_docs:
    vocab.update(doc)
if "" in vocab:
    vocab.remove("")

all_words = sorted(list(vocab))
vocab_dict = {word: i for i, word in enumerate(all_words)}

num_docs = len(split_docs)
num_words = len(vocab)

Next, we compute each word's document frequency and turn it into the inverse document frequency:

df = np.zeros(num_words)
for doc in split_docs:  # iterate over the tokenized documents, not the raw strings
    unique_doc = set(doc)
    if "" in unique_doc:
        unique_doc.remove("")
    for word in unique_doc:
        df[vocab_dict[word]] += 1

idf = np.log(num_docs / df)

We then iterate over all documents, compute the term frequency of every word in the document, multiply it by the idf value computed above, and collect the resulting TF-IDF entries:

data, rows, cols = [], [], []
for i, doc in enumerate(split_docs):
    word_count = Counter(doc)
    if "" in word_count:
        del word_count[""]
    # Calculate document length without empty strings
    doc_length = len([w for w in doc if w != ''])
    if doc_length > 0:  # Avoid division by zero
        for word, count in word_count.items():
            if word in vocab_dict:
                j = vocab_dict[word]
                tf = count / doc_length
                tfidf_score = tf * idf[j]
                if tfidf_score != 0:  # Only add non-zero values to sparse matrix
                    data.append(tfidf_score)
                    rows.append(i)
                    cols.append(j)

tfidf_matrix = sp.coo_matrix((data, (rows, cols)), shape=(num_docs, num_words))
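A quick sanity check on a made-up toy corpus:

docs = ["the cat sat", "the dog sat", "the cat ran"]
matrix, words = tfidf(docs)
words           # ['cat', 'dog', 'ran', 'sat', 'the']
matrix.shape    # (3, 5)
# 'the' occurs in all three documents, so its idf is log(3/3) = 0 and no entry is stored for it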

Q2. Cosine Similarity

This part computes cosine similarities and returns the pairwise similarity matrix. The numerator of a cosine similarity is the dot product of two row vectors; the denominator is the product of their L2 norms.

Since the result must be a matrix, we use an outer product to expand the vector of norms into the matrix of denominators.
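Writing x_i for the ith row of X, the entry we want is

M_{ij} = \frac{x_i^{\top} x_j}{\lVert x_i \rVert_2 \, \lVert x_j \rVert_2}

and the implementation below computes the whole matrix at once: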

@mugrade.local_tests
def cosine_similarity(X):
    """
    Args:
        X: sparse matrix of TFIDF scores or term frequencies
    
    Returns:
        M: dense numpy array of all pairwise cosine similarities.  That is, the 
           entry M[i,j], should correspond to the cosine similarity between the 
           ith and jth rows of X.
    """
    doc_product = X @ X.T
    norms = np.sqrt(X.power(2).sum(axis=1))
    denominator = np.outer(norms, norms)
    numerator_dense = doc_product.toarray()

    similarity_matrix = np.divide(
        numerator_dense,
        denominator,
        out=np.zeros_like(numerator_dense),
        where=denominator != 0
    )

    return similarity_matrix

Q3. Analyzing document authorship

In this part we use the TF-IDF matrix and the cosine similarity computation built earlier to decide who actually wrote the unknown documents, by comparing their similarity to the documents of the three candidate authors.

In the implementation, we record the document indices belonging to each author, then use those indices as row indices and the unknown documents' indices as column indices to extract the corresponding sub-matrix of similarities, and finally take its mean.
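The sub-matrix slicing relies on np.ix_; a tiny made-up example:

import numpy as np

A = np.arange(16).reshape(4, 4)
A[np.ix_([0, 2], [1, 3])]   # rows 0 and 2 crossed with columns 1 and 3
# array([[ 1,  3],
#        [ 9, 11]])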

This processing pipeline is broadly useful. For a collection of documents to analyze, we:

  1. Build the TF-IDF matrix.
  2. Compute the cosine similarities we need, then use that similarity information for the analysis itself.
@mugrade.local_tests
def author_cosine_similarity(docs, authors):
    """
    Args:
        docs: list of strings, where each string represents a space-separated
              document
        authors: list of lists, which each list contains the author (or potential authors) of a given document
    
    Returns: tuple: (hamilton_mcs, madison_mcs, jay_mcs)
        hamilton_mcs: Average cosine similarity between all the known Hamilton papers and all the unknown papers.
        madison_mcs: Average cosine similarity between all the known Madison papers and all the unknown papers.
        jay_mcs: Average cosine similarity between all the known Jay papers and all the unknown papers.
    """
    hamilton_index = []
    madison_index = []
    jay_index = []
    unknown_index = []

    for i, author_tuple in enumerate(authors):
        if author_tuple == ("HAMILTON",):
            hamilton_index.append(i)
        elif author_tuple == ("MADISON",):
            madison_index.append(i)
        elif author_tuple == ("JAY",):
            jay_index.append(i)
        elif author_tuple == ("HAMILTON", "MADISON"):
            unknown_index.append(i)


    tfidf_matrix, _ = tfidf(docs)
    similarity_matrix = cosine_similarity(tfidf_matrix)
    hamilton_mcs = similarity_matrix[np.ix_(hamilton_index, unknown_index)].mean()
    madison_mcs = similarity_matrix[np.ix_(madison_index, unknown_index)].mean()
    jay_mcs = similarity_matrix[np.ix_(jay_index, unknown_index)].mean()

    return hamilton_mcs, madison_mcs, jay_mcs

Q4. Building an n-gram language model

In this part we implement an n-gram language model. This is a way of modeling documents that differs from TF-IDF; we summarize the overall workflow in the final section, where the language model is applied.

First we build the vocabulary of the documents, exactly as we did for TF-IDF:

@mugrade.local_tests
class LanguageModel:
    def __init__(self, docs, n):
        """
        Args:
            docs: list of strings, where each string represents a space-separated
                  document
            n: integer, degree of n-gram model
        """
        
        self.n = n

        # Build vocabulary from all documents
        split_docs = [doc.split(' ') for doc in docs]   
        vocab = set()
        for doc in split_docs:
            vocab.update(doc)
        if "" in vocab:
            vocab.remove("")
        self.vocab = vocab

Next, we perform the context counting that the n-gram model requires, recording:

  • how many times each context occurs, and
  • how many times each word occurs after a given context.
# Initialize counting structures
self.counts = collections.defaultdict(lambda: collections.defaultdict(int))
self.count_sums = collections.defaultdict(int)

# Count n-grams from each document separately
for doc in docs:
    words = doc.split(' ')
    words = [w for w in words if w != '']
            
    if len(words) >= n:
        for i in range(len(words) - n + 1):
            context = words[i: i + n - 1]
            current_word = words[i + n - 1]
            context_key = " ".join(context)
            self.counts[context_key][current_word] += 1
            self.count_sums[context_key] += 1
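As a sanity check of the counting logic, consider the made-up input docs = ["a b a b a"] with n = 2. The loop above yields:

self.counts      # {'a': {'b': 2}, 'b': {'a': 2}}
self.count_sums  # {'a': 2, 'b': 2}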

Then, based on these precomputed attributes, we can evaluate the model's perplexity. The assignment defines it via the log-probability decomposition

\log_2 P(\mathrm{word}_1, \ldots, \mathrm{word}_N) = \sum_{i=n}^{N} \log_2 P(\mathrm{word}_i \mid \mathrm{word}_{i-n+1}, \ldots, \mathrm{word}_{i-1})

so we only need the previously computed counts and count_sums to evaluate each conditional term

\log_2 P(\mathrm{word}_i \mid \mathrm{word}_{i-n+1}, \ldots, \mathrm{word}_{i-1})

at every position and sum the results:

def perplexity(self, text, alpha=1e-3):
    """
    Evaluate perplexity of model on some text.
        
    Args:
        text: string containing space-separated words, on which to compute
        alpha: constant to use in Laplace smoothing
            
    Note: for the purposes of smoothing, the vocabulary size (i.e, the D term)
    should be equal to the total number of unique words used to build the model
    _and_ in the input text to this function.
            
    Returns: perplexity
        perplexity: floating point value, perplexity of the text as evaluated
                    under the model.
    """
    test_words = text.split(' ')
    test_vocab = set(test_words)
    if '' in test_vocab:
        test_vocab.remove('')
        
    vocab = self.vocab.union(test_vocab)
    D = len(vocab)

    total_log_prob = 0.0 
    num_terms = 0

    for i in range(len(test_words) - (self.n - 1)):
        context = test_words[i: i + self.n - 1]
        current_word = test_words[i + self.n - 1]
        context_key = " ".join(context)
            
        # Get counts, handling cases where context or word doesn't exist
        if context_key in self.counts and current_word in self.counts[context_key]:
            current_word_count = self.counts[context_key][current_word]
        else:
            current_word_count = 0
                
        if context_key in self.count_sums:
            context_count = self.count_sums[context_key]
        else:
            context_count = 0
            
        prob = (current_word_count + alpha) / (context_count + D * alpha)
            
        total_log_prob += np.log2(prob)
        num_terms += 1 

    if num_terms == 0:
        return float('inf')
        
    average_log_prob = total_log_prob / num_terms
    perplexity = 2 ** (-average_log_prob)
    return perplexity
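Putting it together: with T = num_terms denoting the number of scored positions, the value returned above is

\mathrm{perplexity} = 2^{-\frac{1}{T} \sum_{i} \log_2 P(\mathrm{word}_i \mid \mathrm{word}_{i-n+1}, \ldots, \mathrm{word}_{i-1})}

i.e. the standard per-position perplexity under the Laplace-smoothed model.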

Q5. Author identification via language models

In this part we apply our n-gram model to the earlier unknown-author identification task:

  1. We build one n-gram model per author; each model captures the linguistic characteristics of that author's documents.
  2. To identify the actual author of the unknown documents, we compute each model's perplexity on those documents; the author whose model has the lowest perplexity is our prediction.
@mugrade.local_tests
def mean_perplexity(docs, authors):
    """
    Args:
        docs: list of strings, where each string represents a space-separated document
        authors: list of lists, which each list contains the author (or potential authors) of a given document

    Returns: tuple: (perp_hamilton, perp_madison, perp_jay)
        perp_hamilton: floating point value, mean perplexity of the unknown Federalist papers for the language 
                       models from Hamilton
        perp_madison: floating point value, mean perplexity of the unknown Federalist papers for the language 
                       models from Madison
        perp_jay: floating point value, mean perplexity of the unknown Federalist papers for the language 
                       models from Jay
    """
    hamilton_docs = []
    madison_docs = []
    jay_docs = []
    unknown_docs = []

    for i, author_tuple in enumerate(authors):
        doc = docs[i]
        if author_tuple == ("HAMILTON",):
            hamilton_docs.append(doc)
        elif author_tuple == ("MADISON",):
            madison_docs.append(doc)
        elif author_tuple == ("JAY",):
            jay_docs.append(doc)
        elif author_tuple == ("HAMILTON", "MADISON"):
            unknown_docs.append(doc)
    
    n = 3
    alpha = 1e-3
    hamilton_model = LanguageModel(hamilton_docs, n)
    madison_model = LanguageModel(madison_docs, n)
    jay_model = LanguageModel(jay_docs, n)

    hamilton_perplexity = [hamilton_model.perplexity(doc, alpha=alpha) for doc in unknown_docs]
    madison_perplexity = [madison_model.perplexity(doc, alpha=alpha) for doc in unknown_docs]
    jay_perplexity = [jay_model.perplexity(doc, alpha=alpha) for doc in unknown_docs]

    return (np.mean(hamilton_perplexity), np.mean(madison_perplexity), np.mean(jay_perplexity))

3. Text

The tasks in this assignment are very similar to the previous ones, except that the TF-IDF matrix and the classifier are built with the existing scikit-learn library.

Q3. Word distributions

This part counts the occurrences of each token:

@mugrade.local_tests
def get_distribution(data):
    """
    args: 
        data -- the training or testing data

    return : collections.Counter -- the distribution of word counts
    """
    word_counts = collections.Counter()
    for _, tokenized_text in data:
        word_counts.update(tokenized_text)
    return word_counts

Q4. Vectorizing

This part uses scikit-learn's TfidfVectorizer component to build the TF-IDF matrices.

Note that a freshly constructed TfidfVectorizer is not yet fitted: we first fit it to the training set with fit_transform, and only then apply it to the test set with transform.

from sklearn.feature_extraction.text import TfidfVectorizer

@mugrade.local_tests
def create_features(train_data, test_data):
    """creates the feature matrices and label vector for the training and test sets.

    args:
        train_data, test_data : output of read_data() function

    returns: Tuple[train_features, train_labels, test_features]
        train_features : scipy.sparse.csr.csr_matrix -- TFIDF feature matrix for the training set
        train_labels : np.array[num_train] -- a numpy vector, where 1 stands for Republican and 0 stands for Democrat 
        test_features : scipy.sparse.csr.csr_matrix -- TFIDF feature matrix for the test set
    """
    train_text = [data[1] for data in train_data]
    train_bool_labels = [data[0] for data in train_data]
    # Convert boolean labels to integers: True (Republican) -> 1, False (Democrat) -> 0
    train_labels = np.array(train_bool_labels, dtype=int)

    test_text = [data[1] for data in test_data]

    vectorizer = TfidfVectorizer(
        preprocessor=lambda x: x,
        tokenizer=lambda x: x,
        token_pattern=None, 
        min_df=5,
        max_df=0.4
    )

    train_features = vectorizer.fit_transform(train_text)
    test_features = vectorizer.transform(test_text)
    return (train_features, train_labels, test_features)
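Two notes on the vectorizer settings above: min_df=5 drops words appearing in fewer than 5 documents, while max_df=0.4 drops words appearing in more than 40% of them; and the identity preprocessor and tokenizer are needed because the input texts are already tokenized lists rather than raw strings (as in get_distribution above).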

Note: test_features must not be generated with fit_transform! Doing so would leak information from the test data into the fitted vocabulary and IDF statistics.

Q5. Training a classifier

This part initializes a classifier with scikit-learn's LinearSVC and trains it on the feature matrix we obtained earlier:

from sklearn.svm import LinearSVC
@mugrade.local_tests
def train_classifier(features, labels, C):
    """learns a classifier from the input features and labels using a specified kernel function

    args:
        features: scipy.sparse.csr.csr_matrix -- sparse matrix of features
        labels : numpy.ndarray(bool): binary vector of class labels
        C : float -- C regularization parameters

    returns: sklearn.svm.LinearSVC -- classifier trained on data
    """
    classifier = LinearSVC(
        C=C,
        loss='hinge',
        max_iter=100000,  # Increased from 10000 to ensure convergence
        random_state=0
    )

    classifier.fit(features, labels)
    return classifier

Q6. Cross validation

In this part we determine which value of the regularization parameter C works best, training on the first train_length rows and evaluating on the held-out remainder:

from sklearn.metrics import f1_score
@mugrade.local_tests
def evaluate_classifier(features, labels, C = (0.01, 0.1, 1.0, 10., 100.), train_length=10000):
    """ 
    args:
        features: scipy.sparse.csr.csr_matrix -- sparse matrix of features
        labels : numpy.ndarray(bool): binary vector of class labels
        C : Tuple[float] -- tuple of C regularization parameters
        train_length: int -- use _first_ train_length features for training (and the rest of validation)
    
    return : List[Tuple[float, float]] -- list of F1 scores for training/validation for each C parameter
    """
    train_features = features[:train_length]
    validation_features = features[train_length:]
    train_labels = labels[:train_length]
    validation_labels = labels[train_length:]

    scores_list = []

    for C_param in C:
        classifier = train_classifier(train_features, train_labels, C_param)
        train_predictions = classifier.predict(train_features)
        validation_predictions = classifier.predict(validation_features)
        
        train_f1_score = f1_score(train_labels, train_predictions)
        validation_f1_score = f1_score(validation_labels, validation_predictions)
        scores_list.append((train_f1_score, validation_f1_score))

    return scores_list

Q7. Classifying new Tweets

In this part, we use the classifier built above to carry out the actual classification task.

@mugrade.local_tests
def predict_test(train_features, train_labels, test_features):
    """
    args:
        train_features: scipy.sparse.csr.csr_matrix -- sparse matrix of training features
        train_labels : numpy.ndarray(bool): binary vector of training class labels
        test_features: scipy.sparse.csr.csr_matrix -- sparse matrix of test set features

    return : numpy.ndarray(bool): array of predictions on the test set
    """
    # Based on cross validation results, C=1.0 gives the best validation F1 score
    best_C = 1.0
    
    # Train classifier on the entire training set
    classifier = train_classifier(train_features, train_labels, best_C)
    
    # Make predictions on the test set
    predictions = classifier.predict(test_features)
    
    # Convert predictions to boolean array (as required by the return type)
    return predictions.astype(bool)
