In the Asynchronous Lecture
In the Synchronous Lecture
If you have any questions while watching the pre-recorded material, be sure to write them down and bring them up during the synchronous portion of the lecture.
Slides on Trees, Bags, and Forests
Model comparison and hyperparameter tuning using sklearn (a brief sketch appears below)
.ipynb Notebook
The following tabs contain pre-recorded lecture materials for class this week. Please review these materials prior to the synchronous lecture.
Total time: Approx. 1 hour and 49 minutes
Download the turnout.csv data used in the video.
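As a rough preview of the hyperparameter tuning covered in the synchronous notebook, the sketch below shows one minimal way to tune the number of neighbors for a KNN classifier with sklearn's GridSearchCV, using the turnout.csv data from the video; the grid of k values is an assumption and the actual notebook code may differ.

import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.neighbors import KNeighborsClassifier

dat = pd.read_csv("turnout.csv")
y = dat['vote'].values
X = dat.drop(columns=['vote','id']).values
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.25, random_state=123)

# Cross-validated grid search over the number of neighbors (an assumed grid).
search = GridSearchCV(KNeighborsClassifier(),
                      param_grid={'n_neighbors': [1, 5, 10, 25, 50]},
                      cv=5, scoring='roc_auc')
search.fit(train_X, train_y)
search.best_params_  # value of k with the best out-of-fold AUC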
'''
KNN from Scratch
'''
import numpy as np
import pandas as pd
from plotnine import *
from sklearn.model_selection import train_test_split
import sklearn.metrics as m
# %% -----------------------------------------
# Data
dat = pd.read_csv("turnout.csv")
y = dat['vote'].values
X = dat.drop(columns=['vote','id']).values
# Split the data.
train_X,test_X,train_y,test_y = train_test_split(X,y,test_size=0.25, random_state=123)
# %% -----------------------------------------
class KNN:
    '''
    Implementation of the K-Nearest Neighbors Algorithm
    '''
    def __init__(self,train_X,train_y,k=3):
        self.X = train_X # Training predictors
        self.y = train_y # Training outcome
        self.k = k       # N nearest neighbors to consider

    def distance(self,loc1,loc2):
        """Calculate the Euclidean distance.

        Args:
            loc1 (list): row of data.
            loc2 (list): row of data.

        Returns:
            float: Euclidean distance between the two provided data points.
        """
        squared_distance = [(loc1[i] - loc2[i])**2 for i in range(len(loc1))]
        sum_squared_distance = sum(squared_distance)
        euclidean_distance = np.sqrt(sum_squared_distance)
        return euclidean_distance
    def get_neighbors(self,test_row):
        """Calculate the distance between a test data point and the
        training data to locate the k nearest neighbors.

        Args:
            test_row (list): row of test data for which to locate neighbors
                from the training data in order to generate a prediction.

        Returns:
            numpy array: matrix of distance and outcome data for the k nearest
                data points from the training data.
        """
        distances = list() # store the distances

        # Iterate through each row of the training data.
        for i, train_row in enumerate(self.X):

            # Calculate the distance between the training row
            # and the test row.
            d = self.distance(train_row,test_row)

            # Append the data entry on.
            distances.append([d, self.y[i]])

        # Sort in ascending order to locate the nearest neighbors
        distances.sort()

        # Retain only the k nearest neighbors.
        nearest_neighbors = distances[:self.k]

        # Return data as a numpy array
        return np.array(nearest_neighbors)
    def generate_prediction(self,nearest_neighbors):
        """Generate a prediction for the new row of data. If a classification problem,
        returns the predicted probability by taking a majority vote of the nearest
        training data. If a regression problem, calculates the average y from the
        nearest training data.

        Args:
            nearest_neighbors (numpy array): matrix of distance and outcome values of the k
                nearest observations from the training data.

        Returns:
            float: returns an average prediction from the outcome of the nearest training data.
        """
        prediction = nearest_neighbors[:,1].mean()
        return prediction
    def fit(self,test_X):
        """Main implementation of the K-Nearest Neighbors algorithm.

        Args:
            test_X (numpy array): matrix of new data to generate a prediction for.
        """
        # Store predictions
        predictions = list()

        # Iterate through each row of the new data.
        for test_row in test_X:

            # Calculate the nearest neighbors
            neighbors = self.get_neighbors(test_row)

            # Generate a prediction
            pred = self.generate_prediction(neighbors)

            # Store the prediction
            predictions.append(pred)

        # Store the predictions
        self.predictions = np.array(predictions)
# %% -----------------------------------------
# Implementation
mod = KNN(train_X=train_X,train_y = train_y,k = 5)

# Fit the model to the test data.
mod.fit(test_X)

# Convert to a class
prob = mod.predictions
pred = 1*(prob >= .5)
# Performance
m.accuracy_score(test_y,pred)
m.roc_auc_score(test_y,prob)
m.confusion_matrix(test_y,pred)
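# %% -----------------------------------------
# (Added sketch, not part of the original lecture code.) As a sanity check,
# the from-scratch KNN can be compared against sklearn's built-in
# KNeighborsClassifier on the same split; numbers may differ slightly
# depending on how ties among equidistant neighbors are broken.
from sklearn.neighbors import KNeighborsClassifier
sk_mod = KNeighborsClassifier(n_neighbors=5)
sk_mod.fit(train_X, train_y)
sk_prob = sk_mod.predict_proba(test_X)[:,1]
sk_pred = sk_mod.predict(test_X)
m.accuracy_score(test_y, sk_pred)
m.roc_auc_score(test_y, sk_prob)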
# %% -----------------------------------------
# Example of overfitting and underfitting using KNN
# Fake data
np.random.seed(123)
N = 200
x = np.random.normal(size=N)
z = np.random.normal(size=N)
e = np.random.normal(size=N,scale=3)
y = 1 + x + -1.5*x**2 + np.sin(x) + e
D = pd.DataFrame(dict(y=y,x=x))
# %% -----------------------------------------
# Plot the data
(="x",y="y")) +
ggplot(D,aes(x+
geom_point() =(10,5))
theme(figure_size
)
# %% -----------------------------------------
# Run the algorithms for different values of K.
X = D.drop(columns=['y']).values
Y = D['y'].values

# Fit our knn model
mod = KNN(train_X=X,train_y = Y,k = 1)
mod.fit(X) # Fit the model
D['prediction k=1'] = mod.predictions # Save the predictions

mod = KNN(train_X=X,train_y = Y,k = 5)
mod.fit(X) # Fit the model
D['prediction k=5'] = mod.predictions # Save the predictions

mod = KNN(train_X=X,train_y = Y,k = 25)
mod.fit(X) # Fit the model
D['prediction k=25'] = mod.predictions # Save the predictions
# %% -----------------------------------------
# Plot the data with the different fits on the data points.
(="x",y="y")) +
ggplot(D,aes(x=.5) +
geom_point(alpha="x",y='prediction k=1'),
geom_line(D,aes(x="orange",size=1,alpha=.5) +
color="x",y='prediction k=5'),
geom_line(D,aes(x="darkred",size=1.5,alpha=.75) +
color="x",y='prediction k=25'),
geom_line(D,aes(x="forestgreen",size=1.5,alpha=.75) +
color=(10,5))
theme(figure_size )
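# %% -----------------------------------------
# (Added sketch, not part of the original lecture code.) One way to see the
# under/overfitting trade-off numerically: split the fake data, refit the
# from-scratch KNN for several values of k, and compare train vs. test error.
# Very small k nearly memorizes the training data (low train error, higher
# test error); very large k underfits both. The grid of k values is an assumption.
tr_X, te_X, tr_y, te_y = train_test_split(X, Y, test_size=0.25, random_state=123)
for k in [1, 5, 25, 100]:
    knn_k = KNN(train_X=tr_X, train_y=tr_y, k=k)
    knn_k.fit(tr_X)
    train_mse = np.mean((tr_y - knn_k.predictions)**2)
    knn_k.fit(te_X)
    test_mse = np.mean((te_y - knn_k.predictions)**2)
    print(f"k={k}: train MSE = {train_mse:.2f}, test MSE = {test_mse:.2f}")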
'''
Decision Tree from Scratch
'''
import numpy as np
import pandas as pd
from plotnine import *
from sklearn.model_selection import train_test_split
import sklearn.metrics as m
from gapminder import gapminder as dat
# %% -----------------------------------------
# Predict life expectancy using the Gapminder data
y = dat['lifeExp']
X = (dat
     .eval("ln_pop = log(pop)")
     .eval("ln_gdp = log(gdpPercap)")
     .eval("cold_war = 1*(year < 1990)")
     [['ln_pop','ln_gdp','cold_war']]
    )

# Split the data.
train_X,test_X,train_y,test_y = train_test_split(X,y,test_size=0.25, random_state=123)
# %% -----------------------------------------
class DecisionTree:
    '''
    Implementation of the decision tree algorithm.
    '''
    def __init__(self,train_X,train_y,max_depth=3):
        self.X = train_X           # Training predictors
        self.y = train_y           # Training outcome
        self.max_depth = max_depth # How deep should the tree grow? This is an important tuning parameter

    def rss(self,y):
        '''
        Residual sum of squares.
        '''
        return np.sum((y - np.mean(y)) ** 2)

    def cost(self,y_left,y_right):
        '''
        Tree cost function: residual sum of squares for both branches.
        '''
        return self.rss(y_left) + self.rss(y_right)
    def find_rule(self,X,y):
        '''
        Find the split rule. That is, which feature and which threshold of the potential
        values that feature takes on should we split the data by.

        Returns:
            dict: containing the feature and split rule.
        '''
        # Initialize the objects
        best_feature = None   # empty
        best_threshold = None # empty
        min_rss = np.inf      # Set the error high...

        # For all available features, find the feature that, when split,
        # minimizes the residual sum of squares.
        for feature in X.columns:

            # All unique values the selected feature can take on.
            thresholds = X[feature].unique().tolist()

            # Sort the threshold values in ascending order
            thresholds.sort()

            # Drop the first value (when splitting we need values on left and right).
            thresholds = thresholds[1:]

            # Iterate through the thresholds
            for t in thresholds:

                # Data below the threshold goes on the left
                y_left_ix = X[feature] < t # this is a boolean array

                # The rest of the data goes on the right.
                y_left, y_right = y[y_left_ix], y[~y_left_ix]

                # Calculate the rss for both sides.
                t_rss = self.cost(y_left, y_right)

                # If the error is less than the current min, record the split rule information
                if t_rss < min_rss:
                    min_rss = t_rss        # Save the new minimum (only changes if we can beat it)
                    best_threshold = t     # Save the new threshold
                    best_feature = feature # Save the relevant feature

        # Return the feature and the split that minimizes the error.
        return {'feature': best_feature, 'threshold': best_threshold}
    def split(self,X, y, depth):
        '''
        Recursive splitting: split the data up until we reach the stopping rule
        dictated by the max depth.
        '''
        # Stopping rule: if we reach the maximum depth
        # OR if there are fewer than two observations left in the node,
        # return a prediction (the average y).
        if depth == self.max_depth or len(X) < 2:
            return {"prediction": np.mean(y)}

        # Find the split rule
        rule = self.find_rule(X,y)

        # Data below the threshold is pushed to the left node, else it goes to the right.
        # Remember that the output of find_rule() is a dictionary.
        left_ix = X[rule['feature']] < rule['threshold']

        # Recursive part: split the data, then call split() on each smaller subset.
        # The depth moves us toward max_depth (the stopping rule!); this
        # little piece causes the recursion to stop.

        # Go left
        rule['left'] = self.split(X[left_ix], y[left_ix], depth + 1)

        # Go right
        rule['right'] = self.split(X[~left_ix], y[~left_ix], depth + 1)

        # Once the stopping rule is hit, the rules are returned (with the prediction attached
        # to the terminal nodes)
        return rule
    def fit(self):
        '''
        Fit the decision tree model using the training data.
        '''
        # Recursively split the data and store the split rules.
        self.split_rules = self.split(self.X, self.y, depth = 0)

    def generate_prediction(self,test_row):
        '''
        Make a prediction for a new (test) observation: use the test data to trace
        the decision tree and return the prediction at the terminal node.
        '''
        prediction = None

        # Copy the split rules
        rules = self.split_rules.copy()

        while prediction is None:

            # Select the feature and the split
            feature = rules['feature']
            threshold = rules['threshold']

            # If below the threshold, go left
            if test_row[feature] < threshold:
                rules = rules['left']
            else: # else go right
                rules = rules['right']

            # If we hit a terminal node, there will be a prediction key;
            # if not, this returns None and the while loop continues.
            prediction = rules.get('prediction')

        # Return the prediction from the tree
        return prediction
    def predict(self,test_X):
        '''
        Make a prediction for every observation in the test dataset.
        '''
        # Store all the predictions for the new test data
        predictions = list()

        # Iterate through each row in the test data
        for i, test_row in test_X.iterrows():

            # Generate a prediction for this row of data
            pred = self.generate_prediction(test_row)

            # Append the prediction to the data.
            predictions.append(pred)

        # Return the array of predicted values
        return np.array(predictions)
# %% -----------------------------------------
# Test Implementation.
mod = DecisionTree(train_X = train_X,train_y = train_y, max_depth = 4)
mod.fit()
# Look at the split rules
mod.split_rules
# Make a prediction
y_hat = mod.predict(test_X)
# Examine out-of-sample performance
m.r2_score(test_y,y_hat)
m.mean_absolute_error(test_y,y_hat)
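# %% -----------------------------------------
# (Added sketch, not part of the original lecture code.) For comparison, the
# same task with sklearn's DecisionTreeRegressor; performance should be in the
# same ballpark as the from-scratch tree, though the exact splits can differ.
from sklearn.tree import DecisionTreeRegressor
sk_tree = DecisionTreeRegressor(max_depth=4, random_state=123)
sk_tree.fit(train_X, train_y)
m.r2_score(test_y, sk_tree.predict(test_X))
m.mean_absolute_error(test_y, sk_tree.predict(test_X))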
# %% -----------------------------------------
# Printing the Decision Rules
def print_tree_rules(rules,indent=0,digit=2):
    '''
    Recursively print the decision tree rules.
    '''
    pred = rules.get('prediction')
    if pred is not None:
        print('\t'*indent + f"y = {round(pred,digit)}",end="\n")
    else:
        print('\t'*indent + f"IF {rules['feature']} < {round(rules['threshold'],digit)}")
        print_tree_rules(rules['left'],indent=indent + 1)
        print('\t'*indent + "ELSE")
        print_tree_rules(rules['right'],indent=indent + 1)
# Print the rules.
print("DECISION RULES\n")
print_tree_rules(mod.split_rules)
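# %% -----------------------------------------
# (Added sketch, not part of the original lecture code.) The slides also cover
# bagging ("bags" of trees). A rough illustration of the idea, assuming the
# from-scratch DecisionTree above is used as the base learner: fit the tree on
# B bootstrap resamples of the training data and average the predictions.
np.random.seed(123)
B = 10 # small number of trees to keep the from-scratch loop fast
bagged_preds = []
for b in range(B):
    # Draw a bootstrap sample (with replacement) of the training rows.
    idx = np.random.choice(len(train_X), size=len(train_X), replace=True)
    boot_X = train_X.iloc[idx].reset_index(drop=True)
    boot_y = train_y.iloc[idx].reset_index(drop=True)
    tree_b = DecisionTree(train_X=boot_X, train_y=boot_y, max_depth=4)
    tree_b.fit()
    bagged_preds.append(tree_b.predict(test_X))

# Average across the bootstrap trees and check out-of-sample performance.
y_hat_bag = np.mean(bagged_preds, axis=0)
m.r2_score(test_y, y_hat_bag)
m.mean_absolute_error(test_y, y_hat_bag)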
The following materials were generated for students enrolled in PPOL564. Please do not distribute without permission.
ed769@georgetown.edu | www.ericdunford.com