Learning Objectives


In the Asynchronous Lecture


In the Synchronous Lecture


If you have any questions while watching the pre-recorded material, be sure to write them down and to bring them up during the synchronous portion of the lecture.




Synchronous Materials





Asynchronous Materials


The following tabs contain pre-recorded lecture materials for class this week. Please review these materials prior to the synchronous lecture.

Total time: Approx. 1 hour and 49 minutes







K-Nearest Neighbors (Concept)





K-Nearest Neighbors (Implementation)


Code from the video

Download the turnout.csv data used in the video.

'''
KNN from Scratch
'''

import numpy as np
import pandas as pd
from plotnine import *
from sklearn.model_selection import train_test_split
import sklearn.metrics as m

# %% -----------------------------------------

# Data 
dat = pd.read_csv("turnout.csv")
y = dat['vote'].values
X = dat.drop(columns=['vote','id']).values


# Split the data.
train_X,test_X,train_y,test_y = train_test_split(X,y,test_size=0.25, random_state=123)


# %% -----------------------------------------

class KNN:
    '''
    Implementation of the K-Nearest Neighbors Algorithm
    '''

    def __init__(self,train_X,train_y,k=3):
        self.X = train_X # Training predictors
        self.y = train_y # Training outcome
        self.k = k       # Number of nearest neighbors to consider

    def distance(self,loc1,loc2):
        """Calculate euclidean distance.

        Args:
            loc1 (list): row of data.
            loc2 (list): row of data.

        Returns:
            float: euclidean distance of the two provided data points.
        """
        squared_distance = [(loc1[i] - loc2[i])**2 for i in range(len(loc1))]
        sum_squared_distance = sum(squared_distance)
        euclidean_distance = np.sqrt(sum_squared_distance)
        return euclidean_distance

    def get_neighbors(self,test_row):
        """Calculate the distance between a test data point and the
        training data to generate a prediction.

        Args:
            test_row (list): row of test data for which to locate neighbors in
                the training data and generate a prediction.

        Returns:
            numpy array: matrix of distance and outcome data for the k nearest
                data points from the training data.
        """

        distances = list() # store the distances

        # iterate through each row of the training data.
        for i, train_row in enumerate(self.X):
            # Calculate the distance between the training data
            # And the test row.
            d = self.distance(train_row,test_row)

            # Append the data entry on.
            distances.append([d,self.y[i]])

        # Sort in ascending order to calculate the nearest neighbors
        distances.sort()

        # Calculate the number of k nearest neighbors.
        nearest_neighbors = distances[:self.k]

        # Return data as a numpy array
        return np.array(nearest_neighbors)

    def generate_prediction(self,nearest_neighbors):
        """Generate a prediction for the new row of data. If a classification problem,
        returns the predicted probability by taking a majority vote of the nearest
        training data. If a regression problem, calculates the average y from the
        nearest training data.

        Args:
            nearest_neighbors (numpy array): matrix of distance and outcome values of the k
                nearest observations from the training data.

        Returns:
            float: returns an average prediction from the outcome of the nearest training data.

        """
        prediction = nearest_neighbors[:,1].mean()
        return prediction

    def fit(self,test_X):
        """Main implementation of the K Nearest Neighbors Algorithm.

        Args:
            test_X (numpy array): matrix of new data to generate predictions for.
        """

        # Store predictions
        predictions = list()

        # Iterate through each row of the new data.
        for test_row in test_X:
            # Calculate the nearest neighbors
            neighbors = self.get_neighbors(test_row)

            # Generate a prediction
            pred = self.generate_prediction(neighbors)

            # Store the prediction
            predictions.append(pred)

        # store predictions
        self.predictions = np.array(predictions)




# %% -----------------------------------------

# Implementation
mod = KNN(train_X=train_X,train_y = train_y,k = 5)
mod.fit(test_X) # Generate predictions for the test data.

# Convert to a class
prob = mod.predictions
pred = 1*(prob >= .5)

# Performance
m.accuracy_score(test_y,pred)
m.roc_auc_score(test_y,prob)
m.confusion_matrix(test_y,pred)
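
# %% -----------------------------------------

# Optional sanity check (an addition, not part of the recorded video): compare
# the from-scratch KNN to scikit-learn's KNeighborsClassifier. With the same k
# and the same train/test split, the two should yield very similar accuracy
# and AUC (small differences are possible when distances tie).
from sklearn.neighbors import KNeighborsClassifier

sk_mod = KNeighborsClassifier(n_neighbors=5)
sk_mod.fit(train_X, train_y)

sk_prob = sk_mod.predict_proba(test_X)[:, 1] # probability of the positive class (vote = 1)
sk_pred = sk_mod.predict(test_X)             # predicted class labels

m.accuracy_score(test_y, sk_pred)
m.roc_auc_score(test_y, sk_prob)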



# %% -----------------------------------------


# Example of overfitting and underfitting using KNN

# Fake data
np.random.seed(123)
N = 200
x = np.random.normal(size=N)
z = np.random.normal(size=N)
e = np.random.normal(size=N,scale=3)
y = 1 + x + -1.5*x**2 + np.sin(x) + e
D = pd.DataFrame(dict(y=y,x=x))





# %% -----------------------------------------


# Plot the data
(
    ggplot(D,aes(x="x",y="y")) +
    geom_point() +
    theme(figure_size=(10,5))
)

# %% -----------------------------------------

# Run the algorithms for different values of K.

X = D.drop(columns=['y']).values
Y = D['y'].values

# Fit our knn model
mod = KNN(train_X=X,train_y = Y,k = 1)
mod.fit(X) # Fit the model
D['prediction k=1'] = mod.predictions # Save the predictions

mod = KNN(train_X=X,train_y = Y,k = 5)
mod.fit(X) # Fit the model
D['prediction k=5'] = mod.predictions # Save the predictions

mod = KNN(train_X=X,train_y = Y,k = 25)
mod.fit(X) # Fit the model
D['prediction k=25'] = mod.predictions # Save the predictions


# %% -----------------------------------------

# Plot the data with the different fits overlaid on the data points.
(
    ggplot(D,aes(x="x",y="y")) +
    geom_point(alpha=.5) +
    geom_line(D,aes(x="x",y='prediction k=1'),
              color="orange",size=1,alpha=.5) +
    geom_line(D,aes(x="x",y='prediction k=5'),
              color="darkred",size=1.5,alpha=.75) +
    geom_line(D,aes(x="x",y='prediction k=25'),
              color="forestgreen",size=1.5,alpha=.75) +
    theme(figure_size=(10,5))
)
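

A quick way to quantify what the plot shows (this is an added aside, not part of the recorded video): compute the in-sample mean squared error for each of the three fits stored above. With k=1 every training point is its own nearest neighbor, so the in-sample error collapses to essentially zero (overfitting), while larger values of k average over wider neighborhoods and smooth the fit, trading variance for bias.

# %% -----------------------------------------

# Compare in-sample error across the three values of k (added example).
for col in ['prediction k=1', 'prediction k=5', 'prediction k=25']:
    mse = m.mean_squared_error(D['y'], D[col])
    print(f"{col}: in-sample MSE = {mse:.3f}")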



Decision Tree (Concept)





Decision Tree (Implementation)


Code from the video

'''
Decision Tree from Scratch
'''

import numpy as np
import pandas as pd
from plotnine import *
from sklearn.model_selection import train_test_split
import sklearn.metrics as m
from gapminder import gapminder as dat

# %% -----------------------------------------


# Predict life expectancy using the Gapminder data
y = dat['lifeExp']
X = (dat
     .eval("ln_pop = log(pop)")
     .eval("ln_gdp = log(gdpPercap)")
     .eval("cold_war = 1*(year < 1990)")
     [['ln_pop','ln_gdp','cold_war']]
     )


# Split the data.
train_X,test_X,train_y,test_y = train_test_split(X,y,test_size=0.25, random_state=123)


# %% -----------------------------------------

class DecisionTree:
    '''
    Implementation of the decision tree algorithm.
    '''

    def __init__(self,train_X,train_y,max_depth=3):
        self.X = train_X # Training predictors
        self.y = train_y # Training outcome
        self.max_depth = max_depth # How deep should the tree grow? This is an important tuning parameter

    def rss(self,y):
        '''
        Residual sum of squares.
        '''
        return np.sum((y - np.mean(y)) ** 2)

    def cost(self,y_left,y_right):
        '''
        Tree cost function: residual sum of squares for both branches.
        '''
        return self.rss(y_left) + self.rss(y_right)

    def find_rule(self,X,y):
        '''
        Find the split rule. That is, which feature, and which threshold among the
        values that feature takes on, should the data be split by.

        Returns:
            dict: containing the feature and split rule.
        '''

        # initialize the objects
        best_feature = None # empty
        best_threshold = None # empty
        min_rss = np.inf # Set the error high...

        # For all available features, find the feature that when split
        # minimizes residual sum of squares
        for feature in X.columns:

            # All unique values the selected feature can take on.
            thresholds = X[feature].unique().tolist()
            thresholds.sort() # Sort the threshold values in ascending order
            # drop the first value (when splitting we need values on left and right.)
            thresholds = thresholds[1:]

            # Iterate through the thresholds
            for t in thresholds:

                # Data below the threshold goes on the left
                y_left_ix = X[feature] < t # this is a boolean array

                # the rest of the data goes on the right.
                y_left, y_right = y[y_left_ix], y[~y_left_ix]

                # Calculate the rss for both sides.
                t_rss = self.cost(y_left, y_right)

                # If the error is less than the current min, record the split rule information
                if t_rss < min_rss:
                    min_rss = t_rss # Save the new minimum (only changes if we can beat it)
                    best_threshold = t # Save the new threshold
                    best_feature = feature # Save the relevant feature

        # Return the feature and the split that minimizes the error.
        return {'feature': best_feature, 'threshold': best_threshold}

    def split(self,X, y, depth):
        '''
        Recursive splitting: split the data up until we reach our stopping rule
        dictated by the max depth.
        '''

        # Stopping rule: if we reach the maximum depth
        # OR if there are fewer than two observations left in the node.
        # Return prediction (average y)
        if depth == self.max_depth or len(X) < 2:
            return {"prediction": np.mean(y)}

        # Find the split rule
        rule = self.find_rule(X,y)

        # If data is below the rule, then push to the left node, else go right.
        # Remember that the output of the find_rule() is a dictionary.
        left_ix = X[rule['feature']] < rule['threshold']

        # Recursive Part: split data then call a smaller subset of the data.
        # The depth moves us toward the max_depth (the stopping rule!); this
        # little piece causes the recursion to stop.

        # Go left
        rule['left'] = self.split(X[left_ix], y[left_ix], depth + 1)

        # Go right
        rule['right'] = self.split(X[~left_ix], y[~left_ix], depth + 1)

        # Once the stopping rule is hit, the rules are returned (with the prediction attached
        # to the terminal nodes)
        return rule

    def fit(self):
        '''
        Fit the decision tree model using the training data.
        '''
        # recursively split the data and store the split rules.
        self.split_rules = self.split(self.X, self.y, depth = 0)

    def generate_prediction(self,test_row):
        '''
        Make a prediction for a new (test) observation by using its feature
        values to trace down the decision tree.
        '''

        prediction = None

        # Copy the split rules
        rules = self.split_rules.copy()

        while prediction is None:

            # Select the feature and the split
            feature = rules['feature']
            threshold = rules['threshold']

            # If below the threshold, go left
            if test_row[feature] < threshold:
                rules = rules['left']
            else: # else go right
                rules = rules['right']

            # If you hit a terminal node, there will be a prediction key
            # If not, .get() returns None and the while loop continues
            prediction = rules.get('prediction')

        # Return the prediction from the tree
        return prediction

    def predict(self,test_X):
        '''
        Make a prediction on every observation in the test dataset
        '''
        # Store all the predictions for the new test data
        predictions = list()

        # Iterate through each row in the test data
        for i, test_row in test_X.iterrows():

            # Generate a prediction for this row of data
            pred = self.generate_prediction(test_row)

            # Append the prediction to the data.
            predictions.append(pred)

        # Return the array of predicted values
        return np.array(predictions)


# %% -----------------------------------------


# Test Implementation.
mod = DecisionTree(train_X = train_X,train_y = train_y, max_depth = 4)
mod.fit()

# Look at the split rules
mod.split_rules


# Make a prediction
y_hat = mod.predict(test_X)


# Examine out-of-sample performance
m.r2_score(test_y,y_hat)
m.mean_absolute_error(test_y,y_hat)
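
# %% -----------------------------------------

# Optional sanity check (an addition, not part of the recorded video): fit
# scikit-learn's DecisionTreeRegressor with the same max depth and compare
# out-of-sample performance. The split rules may differ slightly (e.g. sklearn
# places thresholds at midpoints between observed values), but the R^2 and MAE
# should be in the same ballpark as the from-scratch tree.
from sklearn.tree import DecisionTreeRegressor

sk_tree = DecisionTreeRegressor(max_depth=4, random_state=123)
sk_tree.fit(train_X, train_y)
sk_y_hat = sk_tree.predict(test_X)

m.r2_score(test_y, sk_y_hat)
m.mean_absolute_error(test_y, sk_y_hat)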



# %% -----------------------------------------

# Printing the Decision Rules

def print_tree_rules(rules,indent=0,digit=2):
    '''
    Recursively print the decision tree rules.
    '''
    pred = rules.get('prediction')
    if pred is not None:
        print('\t'*indent + f"y = {round(pred,digit)}",end="\n")
    else:
        print('\t'*indent + f"IF {rules['feature']} < {round(rules['threshold'],digit)}")
        print_tree_rules(rules['left'],indent=indent + 1,digit=digit)
        print('\t'*indent + "ELSE")
        print_tree_rules(rules['right'],indent=indent + 1,digit=digit)


# Print the rules.
print("DECISION RULES\n")
print_tree_rules(mod.split_rules)



 

The following materials were generated for students enrolled in PPOL564. Please do not distribute without permission.

ed769@georgetown.edu | www.ericdunford.com