Task 1: Stacking Model (90 points)
In this task, you will implement several core parts of the stacking machine learning method in PySpark. More specifically, you are required to complete a series of functions in the file submission.py using methods from PySpark SQL and PySpark MLlib. Details are listed as follows:

Dataset Description
The dataset consists of sentences from customer reviews of different restaurants. There are 2241, 800, and 800 customer reviews in the train, development, and test datasets, respectively. Note that each customer review contains at least one sentence and may not end with punctuation such as . or ?.
The task is to identify the category of each customer review using the review text and the trained model.
The categories include:
FOOD: reviews that involve comments on the food.
e.g. “All the appetizers and salads were fabulous , the steak was mouth watering and the pasta was delicious”
PAS: reviews that only involve comments on price, ambience, or service.
e.g. “Now it ‘s so crowded and loud you ca n’t even talk to the person next to you”
MISC: reviews that do not belong to the above categories, including general recommendations and reviews describing the reviewer’s personal experience or context, which usually do not provide information on the restaurant quality.
e.g. “Your friends will thank you for introducing them to this gem!”
e.g. “I knew upon visiting NYC that I wanted to try an original deli”
You can view five samples from the dataset, with the descript column showing the review text and the category column showing the annotated class.
Task 1.1 (30 points): Build a Preprocessing Pipeline
In this task, you need to complete the base_features_gen_pipeline() function in submission.py, which outputs a pipeline (NOTE: not a pipeline model). The returned pipeline will be used to process the data and construct the feature vectors and labels.

More specifically, the function is defined as

def base_features_gen_pipeline(input_descript_col="descript", input_category_col="category", output_feature_col="features", output_label_col="label"):
The function needs to tokenize each customer review (i.e., the descript) and generate bag-of-words count vectors as features. It also needs to convert the category into a label, which is an integer between 0 and 2.
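To make the two transformations concrete, here is a minimal pure-Python sketch of what bag-of-words counting and label indexing compute. It is an illustration only, not the required PySpark implementation: the whitespace tokenizer, the frequency-ordered vocabulary, and the helper names (tokenize, build_vocabulary, count_vector, index_labels) are assumptions made for this example. In PySpark, stages such as Tokenizer, CountVectorizer, and StringIndexer from pyspark.ml.feature perform comparable steps, but which stages the assignment expects is not stated here.

```python
from collections import Counter


def tokenize(text):
    # Lowercase and split on whitespace (an assumed, deliberately simple tokenizer).
    return text.lower().split()


def build_vocabulary(documents):
    # Order terms by descending corpus frequency, breaking ties alphabetically.
    counts = Counter(token for doc in documents for token in tokenize(doc))
    ordered = sorted(counts, key=lambda term: (-counts[term], term))
    return {term: index for index, term in enumerate(ordered)}


def count_vector(text, vocabulary):
    # Dense count vector; tokens outside the vocabulary are ignored.
    vector = [0] * len(vocabulary)
    for token in tokenize(text):
        if token in vocabulary:
            vector[vocabulary[token]] += 1
    return vector


def index_labels(categories):
    # The most frequent category gets 0.0; ties are broken alphabetically.
    counts = Counter(categories)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: float(i) for i, category in enumerate(ordered)}


# Toy data, invented for this illustration.
reviews = ["the steak was delicious", "the service was slow", "the pasta was delicious"]
categories = ["FOOD", "PAS", "FOOD"]

vocabulary = build_vocabulary(reviews)
features = [count_vector(review, vocabulary) for review in reviews]
label_index = index_labels(categories)
labels = [label_index[category] for category in categories]
```

Each review becomes a vector with one slot per vocabulary term, and each category string becomes a numeric label.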

The returned type of this function should be a PySpark ML Pipeline (pyspark.ml.Pipeline), not a fitted PipelineModel.

Task 1.2 (30 points): Generate Meta Features for Training
In this task, you need to complete the gen_meta_features() function in submission.py, which outputs a dataframe with generated meta features for training the meta classifier.

More specifically, the function is defined as

def gen_meta_features(training_df, nb_0, nb_1, nb_2, svm_0, svm_1, svm_2):
The input parameters are described below:

training_df: the dataframe that contains features, labels, and group ids for the training data. The schema of training_df is:

|-- id: integer (nullable = true)
|-- features: vector (nullable = true)
|-- label: double (nullable = false)
|-- label_0: double (nullable = false)
|-- label_1: double (nullable = false)
|-- label_2: double (nullable = false)
|-- group: integer (nullable = true)
where features and label are generated using the pipeline built in Task 1.1. label_x corresponds to the binary label of label x (e.g., label_0==0 means that label!=0). group is the group id as defined in the lecture slides (i.e., L7P45).

nb_x: the predefined x-th Naive Bayes model (i.e., the one that will be trained using label_x)

svm_x: the predefined x-th SVM model (i.e., the one that will be trained using label_x)
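The group column suggests the usual out-of-fold scheme for stacking: for each group g, a base model is trained on the rows whose group is not g and then predicts the rows whose group is g, so no row is predicted by a model that saw it during training. The lecture slide (L7P45) is not reproduced here, so treat this reading as an assumption. The sketch below shows the idea in plain Python with a toy MajorityClassifier standing in for nb_x or svm_x; the class, the function name, and the row layout are all hypothetical.

```python
from collections import Counter


class MajorityClassifier:
    """Toy stand-in for a base model: always predicts the most common training label."""

    def fit(self, labels):
        self.prediction = Counter(labels).most_common(1)[0][0]
        return self

    def predict(self, n_rows):
        return [self.prediction] * n_rows


def out_of_fold_predictions(rows, n_groups):
    # rows: list of dicts with "id", "label", and "group" keys (hypothetical layout).
    predictions = {}
    for g in range(n_groups):
        train = [r for r in rows if r["group"] != g]
        held_out = [r for r in rows if r["group"] == g]
        if not train or not held_out:
            continue  # nothing to train on, or nothing to predict, for this group
        model = MajorityClassifier().fit([r["label"] for r in train])
        for row, pred in zip(held_out, model.predict(len(held_out))):
            predictions[row["id"]] = pred
    return predictions


# Toy data: group 0 holds only label 1.0, group 1 holds only label 0.0.
rows = [
    {"id": 0, "label": 1.0, "group": 0},
    {"id": 1, "label": 1.0, "group": 0},
    {"id": 2, "label": 1.0, "group": 0},
    {"id": 3, "label": 0.0, "group": 1},
    {"id": 4, "label": 0.0, "group": 1},
]
oof = out_of_fold_predictions(rows, n_groups=2)
```

In this toy data every out-of-fold prediction is wrong, which is exactly the point: each prediction reflects only what the model learned from the other groups.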
The output of the function is a dataframe with the following schema:

|-- id: integer (nullable = true)
|-- group: integer (nullable = true)
|-- features: vector (nullable = true)
|-- label: double (nullable = false)
|-- label_0: double (nullable = false)
|-- label_1: double (nullable = false)
|-- label_2: double (nullable = false)
|-- nb_pred_0: double (nullable = false)
|-- nb_pred_1: double (nullable = false)
|-- nb_pred_2: double (nullable = false)
|-- svm_pred_0: double (nullable = false)
|-- svm_pred_1: double (nullable = false)
|-- svm_pred_2: double (nullable = false)
|-- joint_pred_0: double (nullable = false)
|-- joint_pred_1: double (nullable = false)
|-- joint_pred_2: double (nullable = false)
where nb_pred_x is the prediction of model nb_x, svm_pred_x is the prediction of model svm_x, and joint_pred_x is the joint prediction of model nb_x and svm_x.

More specifically, the value of joint_pred_x is the decimal number of the joint prediction in L7P51 (hence it ranges from 0 to 3). E.g., if nb_pred_1==1 and svm_pred_1==0, then joint_pred_1==2.
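The example above (nb_pred_1==1 and svm_pred_1==0 gives joint_pred_1==2) is consistent with reading the pair as a two-bit binary number whose high bit is the Naive Bayes prediction. The slide L7P51 is not reproduced here, so the sketch below is an inference from that example; the function name joint_prediction is hypothetical.

```python
def joint_prediction(nb_pred, svm_pred):
    # Read (nb_pred, svm_pred) as a two-bit binary number, nb_pred being the high bit.
    return float(2 * int(nb_pred) + int(svm_pred))


# Enumerate all four combinations of the two binary predictions.
for nb in (0.0, 1.0):
    for svm in (0.0, 1.0):
        print(nb, svm, joint_prediction(nb, svm))
```

In PySpark, a function like this would typically be wrapped in a user-defined function returning a double before being applied to the two prediction columns.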

Task 1.3 (30 points): Obtain the prediction for the test data
In this task, you need to complete the test_prediction() function in submission.py, which outputs a dataframe with predicted labels of the test data.

More specifically, the function is defined as

def test_prediction(test_df, base_features_pipeline_model, gen_base_pred_pipeline_model, gen_meta_feature_pipeline_model, meta_classifier):
The input parameters are described below:

test_df: the dataframe that contains the raw test data (ids, categories, and review texts). The schema of test_df is:

|-- id: integer (nullable = true)
|-- category: string (nullable = true)
|-- descript: string (nullable = true)
base_features_pipeline_model is the fitted pipeline model for the pipeline built in Task 1.1.

gen_base_pred_pipeline_model is the fitted pipeline model that generates predictions of base classifiers for the test data.
gen_meta_feature_pipeline_model is the fitted pipeline model that generates meta features of the data from the single and joint predictions of base classifiers.
meta_classifier is the fitted meta classifier.
You will see how we declare the three pipeline models above in the example below.
The output of the function is a dataframe with the following schema:

|-- id: integer (nullable = true)
|-- label: double (nullable = false)
|-- final_prediction: double (nullable = false)
where labels are generated using the pipeline built in Task 1.1, and final_prediction is the prediction result of the test data.

The evaluation of the project is based on the correctness of your implementation. The three subtasks will be tested independently, i.e., even if you don’t complete task 1.1 and task 1.2, you may still get 30 points, if you have correctly implemented task 1.3.

Similar to Project 1, we will set a very loose time threshold T in case your code takes too long to complete. If your implementation does not finish prediction within this time, it will be killed and will receive a score of 0.

Task 2: Report (10 points)
You are also required to submit a report named report.pdf. Specifically, in the report, you are at least expected to answer the following questions:

Evaluation of your stacking model on the test data.
How would you improve the performance (e.g., F1) of the stacking model?
For task 2.2, you may try from the following directions:

the base feature generation
the meta feature generation
the hyper-parameters of base and meta models
Hint: make proper use of the development data.
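One common way to use development data is model selection: fit each candidate configuration on the training set, score it on the development set, and keep the best one, leaving the test set untouched until the final evaluation. The sketch below illustrates that loop in plain Python with a toy one-dimensional k-nearest-neighbour classifier; the data, the classifier, and the function names are invented for this example and are not part of the project code.

```python
from collections import Counter


def fit_knn(train, k):
    # Returns a predictor that takes a majority vote among the k nearest training points.
    def predict(x):
        nearest = sorted(train, key=lambda point: abs(point[0] - x))[:k]
        return Counter(label for _, label in nearest).most_common(1)[0][0]
    return predict


def accuracy(predict, data):
    return sum(1 for x, y in data if predict(x) == y) / len(data)


def select_on_dev(candidates, train, dev):
    # Keep the first candidate that achieves the highest development-set score.
    best_param, best_score = None, float("-inf")
    for param in candidates:
        score = accuracy(fit_knn(train, param), dev)
        if score > best_score:
            best_param, best_score = param, score
    return best_param, best_score


# Toy (x, label) pairs; the point (3.0, 1) sits close to the label-0 cluster.
train = [(0.0, 0), (1.0, 0), (2.0, 0), (3.0, 1), (7.0, 1), (8.0, 1), (9.0, 1)]
dev = [(2.6, 0), (0.5, 0), (8.5, 1), (5.2, 1)]

best_k, best_dev_accuracy = select_on_dev([1, 3, 5], train, dev)
```

The same loop applies to the stacking model by swapping in a real hyper-parameter (for example, the regularisation strength of the meta classifier) and F1 on the development set as the score.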

How to execute your implementation (EXAMPLE)
from pyspark.sql import *
from pyspark import SparkConf

from pyspark.sql import DataFrame
from pyspark.sql.functions import rand
from pyspark.sql.types import IntegerType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from submission import base_features_gen_pipeline, gen_meta_features, test_prediction

import random
rseed = 1024

def gen_binary_labels(df):
    # Add one binary (one-vs-rest) label column per class
    df = df.withColumn('label_0', (df['label'] == 0).cast(DoubleType()))
    df = df.withColumn('label_1', (df['label'] == 1).cast(DoubleType()))
    df = df.withColumn('label_2', (df['label'] == 2).cast(DoubleType()))
    return df

# Create a Spark Session
conf = SparkConf().setMaster("local[*]").setAppName("lab3")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Load data
train_data = spark.read.load("proj2train.csv", format="csv", sep="\t", inferSchema="true", header="true")
test_data = spark.read.load("proj2test.csv", format="csv", sep="\t", inferSchema="true", header="true")

# build the pipeline from task 1.1
base_features_pipeline = base_features_gen_pipeline()
# Fit the pipeline using train_data
base_features_pipeline_model = base_features_pipeline.fit(train_data)
# Transform the train_data using fitted pipeline
training_set = base_features_pipeline_model.transform(train_data)
# assign random groups and binarize the labels
training_set = training_set.withColumn('group', (rand(rseed)*5).cast(IntegerType()))
training_set = gen_binary_labels(training_set)

# define base models
nb_0 = NaiveBayes(featuresCol='features', labelCol='label_0', predictionCol='nb_pred_0', probabilityCol='nb_prob_0', rawPredictionCol='nb_raw_0')
nb_1 = NaiveBayes(featuresCol='features', labelCol='label_1', predictionCol='nb_pred_1', probabilityCol='nb_prob_1', rawPredictionCol='nb_raw_1')
nb_2 = NaiveBayes(featuresCol='features', labelCol='label_2', predictionCol='nb_pred_2', probabilityCol='nb_prob_2', rawPredictionCol='nb_raw_2')
svm_0 = LinearSVC(featuresCol='features', labelCol='label_0', predictionCol='svm_pred_0', rawPredictionCol='svm_raw_0')
svm_1 = LinearSVC(featuresCol='features', labelCol='label_1', predictionCol='svm_pred_1', rawPredictionCol='svm_raw_1')
svm_2 = LinearSVC(featuresCol='features', labelCol='label_2', predictionCol='svm_pred_2', rawPredictionCol='svm_raw_2')

# build pipeline to generate predictions from base classifiers, will be used in task 1.3
gen_base_pred_pipeline = Pipeline(stages=[nb_0, nb_1, nb_2, svm_0, svm_1, svm_2])
gen_base_pred_pipeline_model = gen_base_pred_pipeline.fit(training_set)

# task 1.2
meta_features = gen_meta_features(training_set, nb_0, nb_1, nb_2, svm_0, svm_1, svm_2)

# build onehotencoder and vectorassembler pipeline
onehot_encoder = OneHotEncoderEstimator(inputCols=['nb_pred_0', 'nb_pred_1', 'nb_pred_2', 'svm_pred_0', 'svm_pred_1', 'svm_pred_2', 'joint_pred_0', 'joint_pred_1', 'joint_pred_2'], outputCols=['vec{}'.format(i) for i in range(9)])
vector_assembler = VectorAssembler(inputCols=['vec{}'.format(i) for i in range(9)], outputCol='meta_features')
gen_meta_feature_pipeline = Pipeline(stages=[onehot_encoder, vector_assembler])
gen_meta_feature_pipeline_model = gen_meta_feature_pipeline.fit(meta_features)
meta_features = gen_meta_feature_pipeline_model.transform(meta_features)

# train the meta classifier
lr_model = LogisticRegression(featuresCol='meta_features', labelCol='label', predictionCol='final_prediction', maxIter=20, regParam=1., elasticNetParam=0)
meta_classifier = lr_model.fit(meta_features)

# task 1.3
pred_test = test_prediction(test_data, base_features_pipeline_model, gen_base_pred_pipeline_model, gen_meta_feature_pipeline_model, meta_classifier)

# Evaluation
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='f1')
print(evaluator.evaluate(pred_test, {evaluator.predictionCol: 'final_prediction'}))
test_set = base_features_pipeline_model.transform(test_data)

test_df = gen_base_pred_pipeline_model.transform(test_set)

from submission import binary_udf
test_df = test_df.withColumn('joint_pred_0', binary_udf(test_df.nb_pred_0, test_df.svm_pred_0))
test_df = test_df.withColumn('joint_pred_1', binary_udf(test_df.nb_pred_1, test_df.svm_pred_1))
test_df = test_df.withColumn('joint_pred_2', binary_udf(test_df.nb_pred_2, test_df.svm_pred_2))
| id| features|label| nb_raw_0| nb_prob_0|nb_pred_0| nb_raw_1| nb_prob_1|nb_pred_1| nb_raw_2| nb_prob_2|nb_pred_2| svm_raw_0|svm_pred_0| svm_raw_1|svm_pred_1| svm_raw_2|svm_pred_2|joint_pred_0|joint_pred_1|joint_pred_2|
| 0|(5421,[0,4,33,236…| 0.0|[-48.341369261424…|[0.19614886584696…| 1.0|[-46.208165054373…|[0.97923503027322…| 0.0|[-46.937290227051…|[0.91632505550240…| 0.0|[0.14642672426259…| 0.0|[0.66325549724766…| 0.0|[0.16649606651848…| 0.0| 2.0| 0.0| 0.0|
| 1|(5421,[0,3,5,7,9,…| 2.0|[-86.458452400279…|[0.88093279726886…| 0.0|[-87.161290233883…|[0.78900659615814…| 0.0|[-85.250071826316…|[0.99971827670784…| 0.0|[0.23675860567462…| 0.0|[1.24748128472761…| 0.0|[0.29771837908168…| 0.0| 0.0| 0.0| 0.0|
| 2|(5421,[1,3,4,13,5…| 0.0|[-45.733480920531…|[0.00200508286711…| 1.0|[-39.410490713823…|[0.99945946104157…| 0.0|[-39.745173934995…|[0.99856722409015…| 0.0|[-3.1476511545634…| 1.0|[2.64649412476236…| 0.0|[2.28273385399354…| 0.0| 3.0| 0.0| 0.0|
| 3|(5421,[0,4,5,19,3…| 0.0|[-45.521727994914…|[0.09985746264376…| 1.0|[-43.102033368085…|[0.96608678037306…| 0.0|[-43.018979134885…|[0.97111546589242…| 0.0|[-1.5186267475890…| 1.0|[1.82614438702245…| 0.0|[0.94239224060138…| 0.0| 3.0| 0.0| 0.0|
| 4|(5421,[0,1,4,9,10…| 0.0|[-107.76931569139…|[0.16163027067658…| 1.0|[-105.32076885332…|[0.99603735123497…| 0.0|[-103.86801466547…|[0.99988049905916…| 0.0|[-1.7029806277348…| 1.0|[1.43217514313774…| 0.0|[1.98453099512514…| 0.0| 3.0| 0.0| 0.0|
| 5|(5421,[0,5,10,25,…| 1.0|[-55.746965915041…|[0.80277903614147…| 0.0|[-56.323354164214…|[0.35543710996769…| 1.0|[-54.089944198750…|[0.99756642905225…| 0.0|[0.62258808476065…| 0.0|[-1.0988600246440…| 1.0|[1.78099399558658…| 0.0| 0.0| 3.0| 0.0|
| 6|(5421,[30,72,114,…| 0.0|[-37.441844635618…|[0.22063780283694…| 1.0|[-35.710362320208…|[0.95303086285398…| 0.0|[-35.634301482321…|[0.97954068892739…| 0.0|[-0.9221268759861…| 1.0|[1.04132598974703…| 0.0|[0.84501255054035…| 0.0| 3.0| 0.0| 0.0|
| 7|(5421,[28,47,224,…| 0.0|[-61.555085891356…|[1.19822854435072…| 1.0|[-53.936077094577…|[0.99874256589826…| 0.0|[-53.585811374739…|[0.99970478705057…| 0.0|[-2.9992214393151…| 1.0|[1.08542931189704…| 0.0|[2.30954101985323…| 0.0| 3.0| 0.0| 0.0|
| 8|(5421,[0,20,78,15…| 0.0|[-45.969809042614…|[0.03219581050514…| 1.0|[-43.088650689629…|[0.91825564600412…| 0.0|[-42.809376742006…|[0.98112428258869…| 0.0|[-1.1536709564272…| 1.0|[0.92240331710778…| 0.0|[1.01005247252827…| 0.0| 3.0| 0.0| 0.0|
| 9|(5421,[0,1,4,5,13…| 0.0|[-73.214519796959…|[0.00627390454029…| 1.0|[-68.221243874330…|[0.99958137503563…| 0.0|[-67.957350922683…|[0.99950174908233…| 0.0|[-1.0999485792047…| 1.0|[2.23512888009423…| 0.0|[2.40381628415323…| 0.0| 3.0| 0.0| 0.0|
| 10|(5421,[1,2,6,9,11…| 0.0|[-51.521673286274…|[0.13504572695472…| 1.0|[-48.846111146072…|[0.98872315534770…| 0.0|[-48.689499903920…|[0.99163800188079…| 0.0|[-0.7949575536511…| 1.0|[1.00681860631537…| 0.0|[0.91888872612383…| 0.0| 3.0| 0.0| 0.0|
| 11|(5421,[1,2,3,8,10…| 0.0|[-119.44002381149…|[0.86032130506183…| 0.0|[-117.92098680914…|[0.99746323578424…| 0.0|[-118.42727772761…|[0.98528484158654…| 0.0|[-1.2143396249327…| 1.0|[0.75123091643382…| 0.0|[2.07141992214092…| 0.0| 1.0| 0.0| 0.0|
| 12|(5421,[0,1,2,12,1…| 0.0|[-76.646541759777…|[0.96735376195924…| 0.0|[-77.338608446797…|[0.77522603557866…| 0.0|[-76.306151489938…|[0.97090706641918…| 0.0|[-0.1756256424518…| 1.0|[1.42758276643806…| 0.0|[1.20342450281338…| 0.0| 1.0| 0.0| 0.0|
| 13|(5421,[1,6,36,50,…| 2.0|[-36.122478124374…|[0.97977830701575…| 0.0|[-36.031489000173…|[0.98694938035763…| 0.0|[-38.809854579901…|[0.09297650388200…| 1.0|[1.39301970489154…| 0.0|[1.05076055412813…| 0.0|[-1.2862130348015…| 1.0| 0.0| 0.0| 3.0|
| 14|(5421,[0,44,972],…| 0.0|[-19.256089290286…|[0.32346999193328…| 1.0|[-18.118770634098…|[0.91441018676601…| 0.0|[-18.225783805668…|[0.87204374684082…| 0.0|[-0.2892035826628…| 1.0|[0.56662381552801…| 0.0|[0.89325835176373…| 0.0| 3.0| 0.0| 0.0|
| 15|(5421,[44,82,3910…| 0.0|[-25.087133544553…|[0.04799918454551…| 1.0|[-22.273569300616…|[0.97398137967931…| 0.0|[-22.335166245255…|[0.95377363377287…| 0.0|[-0.9289224223325…| 1.0|[0.53942533672335…| 0.0|[0.81071212788825…| 0.0| 3.0| 0.0| 0.0|
| 16|(5421,[0,1,2,3,5,…| 0.0|[-170.98355482561…|[0.00267243432700…| 1.0|[-166.36647572689…|[0.96682012770873…| 0.0|[-160.41741315074…|[0.99999999757322…| 0.0|[-4.7531557365169…| 1.0|[2.41131732848790…| 0.0|[4.88963013241530…| 0.0| 3.0| 0.0| 0.0|
| 17|(5421,[0,6,13,14,…| 0.0|[-31.946195377668…|[0.04051716863399…| 1.0|[-28.006938439494…|[0.99962590717990…| 0.0|[-29.014814317206…|[0.98354636191296…| 0.0|[-0.9192259319662…| 1.0|[2.74465265316217…| 0.0|[0.34310097470671…| 0.0| 3.0| 0.0| 0.0|
| 18|(5421,[0,7,13,19,…| 0.0|[-63.055274812118…|[0.00370756283503…| 1.0|[-56.952017561664…|[0.99964522251268…| 0.0|[-57.448964475704…|[0.99863111403071…| 0.0|[-2.5109468897500…| 1.0|[2.41893165644261…| 0.0|[2.12026360140104…| 0.0| 3.0| 0.0| 0.0|
| 19|(5421,[0,20,23,31…| 2.0|[-48.222854335847…|[0.54944473160571…| 0.0|[-47.096635012403…|[0.98202540534417…| 0.0|[-47.172969895708…|[0.95609446948711…| 0.0|[-0.2754390165240…| 1.0|[0.62263875699349…| 0.0|[0.73376143997250…| 0.0| 1.0| 0.0| 0.0|
test_features = gen_meta_feature_pipeline_model.transform(test_df)
| meta_features|
test_predict = meta_classifier.transform(test_features)
result = test_predict.select('id', 'label', 'final_prediction')
| id|label|final_prediction|
| 0| 0.0| 0.0|
| 1| 2.0| 0.0|
| 2| 0.0| 0.0|
| 3| 0.0| 0.0|
| 4| 0.0| 0.0|
| 5| 1.0| 1.0|
| 6| 0.0| 0.0|
| 7| 0.0| 0.0|
| 8| 0.0| 0.0|
| 9| 0.0| 0.0|
| 10| 0.0| 0.0|
| 11| 0.0| 0.0|
| 12| 0.0| 0.0|
| 13| 2.0| 2.0|
| 14| 0.0| 0.0|
| 15| 0.0| 0.0|
| 16| 0.0| 0.0|
| 17| 0.0| 0.0|
| 18| 0.0| 0.0|
| 19| 2.0| 0.0|
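For the report, it helps to know what the f1 metric measures. To my understanding, the f1 reported by MulticlassClassificationEvaluator is the per-class F1 averaged with weights proportional to the number of true instances of each class. The sketch below recomputes that quantity in plain Python for the 20 rows shown above only; the function name weighted_f1 is hypothetical, and the number it produces describes these 20 rows, not the full test set.

```python
def weighted_f1(labels, predictions):
    # Per-class F1, averaged with weights equal to each class's share of the true labels.
    total = 0.0
    for c in sorted(set(labels)):
        true_positive = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        predicted = sum(1 for p in predictions if p == c)
        actual = sum(1 for y in labels if y == c)
        precision = true_positive / predicted if predicted else 0.0
        recall = true_positive / actual if actual else 0.0
        if precision + recall:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0.0
        total += f1 * actual
    return total / len(labels)


# label and final_prediction columns of the 20 rows listed above
labels = [0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 2]
predictions = [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0]

sample_f1 = weighted_f1(labels, predictions)
print(round(sample_f1, 4))
```

Because the weights follow the true class counts, the majority class dominates the score, which is worth keeping in mind when comparing improvements on the minority classes.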
Project Submission and Feedback
For the project submission, you are required to submit the following files:

Your implementation in the Python file submission.py.
The report report.pdf.
Detailed instructions about using give to submit the project files will be announced later via Piazza.