Adding the visualizer

main
Gijs 3 years ago
parent 56061fa6dc
commit da22430d9b

@ -0,0 +1,351 @@
# Using this tutorial : From scratch: https://machinelearningmastery.com/implement-random-forest-scratch-python/
# Explanation on decision trees & random forest: https://pad.constantvzw.org/p/anais_berck_frart_Meise_random_forest
# Random Forest Algorithm on Sonar Dataset
from random import seed
from random import randrange
from csv import reader
from math import sqrt
import json
# Load a CSV file. Definition of the function to read the csv and create dataset here
def load_csv(filename):
dataset = list()
with open(filename, 'r') as file:
csv_reader = reader(file)
for row in csv_reader:
if not row:
continue
dataset.append(row)
return dataset
# Convert string column to float - original dataset is in string format
def str_column_to_float(dataset, column):
for row in dataset:
row[column] = float(row[column].strip())
# Convert string column to integer / transforms classes 'mine' and 'rock' into 1 and 2
def str_column_to_int(dataset, column):
# extracts values of the classes of the dataset: array of all the mine, rock
class_values = [row[column] for row in dataset]
# it transforms array into a set of 2 strings, mine and rock
unique = set(class_values)
# create dictionary
lookup = dict()
# loops through the set and creates dictionary with key (value: rock or mine) and the value is a number
# destruct tuple by using enumerate with 2 variables i, value - useful if you want to get indexes from a list
# loops through the set / enumerate gives you a tuple with an index number and a value /common way to get indexes from a list
for i, value in enumerate(unique):
lookup[value] = i # the key of the dictonnary is the value: mine or rock; and the value is a number: 0 or 1
# loops through rows of dataset, replaces the name of class by number/index value
for row in dataset:
row[column] = lookup[row[column]]
# code returns lookup table
return lookup
# Split a dataset into k folds
def cross_validation_split(dataset, n_folds):
# creates list
dataset_split = list()
# copies the dataset (which is a list) using the list function
dataset_copy = list(dataset)
# size of the fold = length of dataset / amount of folds (ex. 5)
fold_size = int(len(dataset) / n_folds)
# loops through list of numbers from 0 up to n_fold
for i in range(n_folds):
fold = list()
# as long as length of list is inferior to defined fold size
while len(fold) < fold_size:
# generates random integer between 0 and total length of dataset and store it in index
index = randrange(len(dataset_copy))
# pop removes element from list and returns the value, adds value to fold
fold.append(dataset_copy.pop(index))
# adds fold to list of folds, each fold has different observations
dataset_split.append(fold)
return dataset_split #return the dataset_split, a list of folds
# Calculate accuracy percentage
def accuracy_metric(actual, predicted):
correct = 0
# loops through index list which has length of actual classes
for i in range(len(actual)):
# compares values on same positions in list of actual classes with list of predicted classes
if actual[i] == predicted[i]:
# adds one to correct variable to count number of correct guesses
correct += 1
# gives percentage by dividing correct guesses by length of actual classes divided by 100
return correct / float(len(actual)) * 100.0
# Evaluate an algorithm using a cross validation split
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
# split dataset in n folds
folds = cross_validation_split(dataset, n_folds)
# creates list called scores
scores = list()
for fold in folds:
# creates copy of dataset_split, list of lists
train_set = list(folds)
# removes 1 fold for testing
train_set.remove(fold)
# concatenates all lists of remaining folds into 1 list
train_set = sum(train_set, [])
# creates a list called test_set
test_set = list()
# iterates through test data
for row in fold:
# makes copy of the row
row_copy = list(row)
# appends copy to a new list called test set
test_set.append(row_copy)
# changes last element in the row, which is the class, to remove the number and set it to None
row_copy[-1] = None
# he specifies randomforest as an algorithm, which is also a function; this line is
# generating the model based on trainset, makes prediction of classes for testset
predicted = algorithm(train_set, test_set, *args)
# list comprehension: gives list of actual classes in fold
actual = [row[-1] for row in fold]
# compares actual classes to predicted classes to give idea of accuracy of function
accuracy = accuracy_metric(actual, predicted)
# creates a list of scores based on comparison
scores.append(accuracy)
return scores
# --------
# Split a dataset based on a feature and a feature value defined in build tree
# just trying many times, benefitting from speed of computer
def test_split(index, value, dataset):
left, right = list(), list()
for row in dataset:
# compares set value to all values in that column, if it is smaller, it goes to the left
# he goes for each value through all dataset again
if row[index] < value:
left.append(row)
# comparing the set value to itself, then it goes to the right
else:
right.append(row)
return left, right
# Calculate the Gini index for a split dataset, using left/right og test split as groups
# cfr calculating wealth distribution: https://en.wikipedia.org/wiki/Gini_coefficient
def gini_index(groups, classes):
# count all samples at split point (the dataset), converts it in a float in order to do divisions
n_instances = float(sum([len(group) for group in groups]))
# sum weighted Gini index for each group
gini = 0.0
for group in groups:
size = float(len(group))
# avoid divide by zero
if size == 0:
continue
score = 0.0
# score the group based on the score for each class
# count number of instances for current class in the group and divide by total size of the group
for class_val in classes:
# outcome lies always between 0 and 1
# for each row it takes the class value and counts how many times the set class value appears, divided by size of the group
p = [row[-1] for row in group].count(class_val) / size
# multiply makes it exponentially smaller; you amplify the badness of the score
score += p * p
# weight the group score by its relative size (size of group divided by total size of dataset)
gini += (1.0 - score) * (size / n_instances)
return gini
# Select the best split point for a dataset
def get_split(dataset, n_features):
# takes last element of each row (class) and returns it as a row, as it is a set, it has only 2 values
class_values = list(set(row[-1] for row in dataset))
# assigning values to variables
b_index, b_value, b_score, b_groups = 999, 999, 999, None
# creates list called features
features = list()
# as long as features list is not as long as square root of total dataset
while len(features) < n_features:
# creates number between 0 and nr of colums (- class)
index = randrange(len(dataset[0])-1)
# add column value if not present yet in features, creates only the index with name of the column
if index not in features:
features.append(index)
# for each column name in list features:
for index in features:
for row in dataset:
# take split point, loops through all the points, selecting 1 feature
groups = test_split(index, row[index], dataset)
# calculates how 'pure' the split is, whether it gives 2 groups that correspond to 2 classes
gini = gini_index(groups, class_values)
# the lower the score is, the better / keep the best score
# you keep reference to current best option
if gini < b_score:
b_index, b_value, b_score, b_groups = index, row[index], gini, groups
# returns a dictionary
return {'index':b_index, 'value':b_value, 'groups':b_groups, 'gini':b_score}
# Create a terminal node value = node at end of the tree = end leaf
def to_terminal(group):
# returns list of classes of group
outcomes = [row[-1] for row in group]
# selects most popular class; list of outcomes is reduced to 0 or 1; key counts the amount of times 0 or 1 occurs
# selects class based on calculating how many times the class occurs
return max(set(outcomes), key=outcomes.count)
# Counts the amount of unique values in a 'group' (rows in dataset)
def count_unique_values (group):
# Pick classes in the dataset, transform to a set
# count amount of values
return len(set([row[-1] for row in group]))
# Create child splits for a node or make terminals/end leafs
# recursive function, it calls itself
# node is dictionary returned by get_split (b_index, b_value, b_groups)
def split(node, max_depth, min_size, n_features, depth):
left, right = node['groups']
del(node['groups'])
# check for a no split
# if one of the groups is empty: left and right are both becoming a number with most popular class of the list
# decision is prediction whether sample is class 0 or 1
if not left or not right:
node['left'] = node['right'] = to_terminal(left + right)
return
# check for max depth / how many levels of nodes you want to have
if depth >= max_depth:
node['left'], node['right'] = to_terminal(left), to_terminal(right)
return
# process left child
# if length of left group is smaller or equal to 1
if len(left) <= min_size:
# it creates an end leaf
node['left'] = to_terminal(left)
else:
# Test here whether the group has only one class
# if so it can be a terminal
# it tries again to find best split for the reduced group that is at left of the tree at that moment
node['left'] = get_split(left, n_features)
split(node['left'], max_depth, min_size, n_features, depth+1)
# process right child
if len(right) <= min_size:
node['right'] = to_terminal(right)
elif count_unique_values(right) == 1:
# Test here whether the group has only one class
# if so it can be a terminal
node['right'] = right[0][-1]
else:
node['right'] = get_split(right, n_features)
# it tries again to find best split for the reduced group that is at right of the tree at that moment
split(node['right'], max_depth, min_size, n_features, depth+1)
# return no value because functions are working on the same dictionaries
# Build a decision tree
def build_tree(train, max_depth, min_size, n_features):
# root of decision tree is defined by dictionary of index, value, 2 groups (left/right of the split)
root = get_split(train, n_features)
split(root, max_depth, min_size, n_features, 1)
#root is a Node
# Node: {index: int, value: float, gini: float, left: Node|TerminalNode, right: Node|TerminalNode}
# TerminalNode: {index: int, value: float, gini: float, left: int(class), right: int (class)}
return root
# Make a prediction with a decision tree
# recursive function as well
def predict(node, row):
# node index = column feature, it looks up value for this feature for this row in dataset
# compare feature value of row you're checking with feature value of node
if row[node['index']] < node['value']:
# is it node?
if isinstance(node['left'], dict):
# recursive function at the left
return predict(node['left'], row)
else:
# creates final leaf at the left
return node['left']
else:
# is it node?
if isinstance(node['right'], dict):
# recursive function at the right
return predict(node['right'], row)
else:
# creates final leaf at the left
return node['right']
# Create a random subsample from the dataset with replacement, ratio is called sample_size further on
# This is called BOOTSTRAPPING: build new datasets from the original data, with the same number of rows
# with replacement: after selecting the row we put it back into the data, so it can be selected twice or more
def subsample(dataset, ratio):
sample = list()
# if it is smaller than 1, not all dataset is taken as sample - he uses the full dataset
n_sample = round(len(dataset) * ratio)
while len(sample) < n_sample:
index = randrange(len(dataset))
sample.append(dataset[index])
return sample
# Make a prediction with a list of bagged trees
def bagging_predict(trees, row):
# asks the forest to predict class for every row in the test data, this gives list of votes
predictions = [predict(tree, row) for tree in trees]
# it calculates amount of votes for each class, returns most popular class as prediction
return max(set(predictions), key=predictions.count)
# Random Forest Algorithm
def random_forest(train, test, max_depth, min_size, sample_size, n_trees, n_features):
trees = list()
for i in range(n_trees):
sample = subsample(train, sample_size)
# building a tree / root is dictionary with index, value, left/right)
tree = build_tree(sample, max_depth, min_size, n_features)
trees.append(tree)
with open('random_forest_model.json', 'w') as outfile:
json.dump(trees, outfile, indent = 6)
# prediction using one of the folds we separated in the beginning, forest votes on every row of test data
predictions = [bagging_predict(trees, row) for row in test]
# returns votes/predictions of the forest
return(predictions)
# --------------------------------------
# Test the random forest algorithm
seed(2)
# load and prepare data
# filename = 'sonar_csv.csv'
filename = 'iris_data.csv'
dataset = load_csv(filename)
#print(dataset)
# convert string attributes to integers
for i in range(0, len(dataset[0])-1):
str_column_to_float(dataset, i)
# convert class column to integers
#lookup = str_column_to_int(dataset, len(dataset[0])-1)
#print(lookup)
# evaluate algorithm
# specifies amounts of subsets for training (-1 for testing)
n_folds = 5
max_depth = 50
min_size = 1
sample_size = 1.0
# it specifies the size of the subset of features for the folds, where the size is close to the square root of the total number of features
n_features = int(sqrt(len(dataset[0])-1))
# it tries forest of 1 tree, 5 trees, 10 trees
for n_trees in [1, 5, 10]:
scores = evaluate_algorithm(dataset, random_forest, n_folds, max_depth, min_size, sample_size, n_trees, n_features)
print('Trees: %d' % n_trees)
print('Scores: %s' % scores)
print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))
#### Note: you also need F1-score & confusion matrix!
#### https://towardsdatascience.com/metrics-to-evaluate-your-machine-learning-algorithm-f10ba6e38234
#### how to use this model now?
#### pickle trees
#### use: unpickle trees + bagging_predict with new data
with open('random_forest_model.json', 'r') as infile:
trees = json.load(infile)
prediction = bagging_predict(trees, dataset[23])
# this gives a number, you have to reorganise model to get back the string of the class
print(prediction)

@ -0,0 +1,26 @@
import textwrap
from string import ascii_uppercase
# Returns a function to generate node names with given prefix
def make_name_generator (prefix = '', length = 1):
wheels = [{ 'position': None, 'max': len(ascii_uppercase), 'values': list(ascii_uppercase)} for _ in range(length)]
def name_generator ():
for wheel in wheels:
if wheel['position'] is None:
wheel['position'] = 0
else:
wheel['position'] += 1
if wheel['position'] < wheel['max']:
break
else:
wheel['position'] = 0
return prefix + ''.join(reversed([wheel['values'][wheel['position']] for wheel in wheels]))
return name_generator
# Wrap text on labels
def wrapped (text, width=45, join='\n'):
return join.join(textwrap.wrap(text, width=width))

@ -0,0 +1,74 @@
from graph_utils import make_name_generator
from graphviz import Graph
# Visualizes a decision tree, from the random forest
# as generatered by the random_forest_model_altered.py
# Node: {index: int, value: float, gini: float, left: Node|TerminalNode, right: Node|TerminalNode}
# TerminalNode: {index: int, value: float, gini: float, left: int(class), right: int (class)}
# Creates a regular node in the graph and returns its name
# so other function can draw the edges
def make_regular_node (graph:Graph, generate_node_name:callable, node:dict):
node_name = generate_node_name()
graph.node(
node_name,
label="<Index:{index}<BR align=\"left\"/>Value: {value}<BR align=\"left\"/>Gini: {gini:.3f}<BR align=\"left\"/>>".format(**node),
shape='diamond')
return node_name
# Visualizes a TerminalNode from the decision tree
def make_terminal_node (graph: Graph, generate_node_name:callable, className:str):
node_name = generate_node_name()
graph.node(
node_name,
label=className,
shape='plaintext')
return node_name
def make_invisible_node (graph: Graph, generate_node_name:callable):
node_name = generate_node_name()
graph.node(node_name, label=node_name, style='invis')
return node_name
def visualize_node (graph, generate_node_name, node):
if isinstance(node, dict):
# Draw the node itselft
node_name = make_regular_node(graph, generate_node_name, node)
# Make left child/subtree and draw edge
left_child_name = visualize_node(graph, generate_node_name, node['left'])
graph.edge(node_name, left_child_name, tailport='nw', headport='s')
# Make center child and draw edge
center_node_name = make_invisible_node(graph, generate_node_name)
graph.edge(node_name, center_node_name, tailport='n', headport='s', style='invis')
# Make right child/subtree and draw edge
right_child_name = visualize_node(graph, generate_node_name, node['right'])
graph.edge(node_name, right_child_name, tailport='ne', headport='s')
else:
node_name = make_terminal_node(graph, generate_node_name, node)
return node_name
def make_graph (graphname):
graph = Graph(name=graphname, format='svg', engine='dot')
graph.attr('graph', splines='line', rankdir='BT')
return graph
def visualize (tree, graphname, generate_node_name = make_name_generator(length=3)):
graph = make_graph(graphname)
visualize_node(graph, generate_node_name, tree)
graph.render(graphname)
if __name__ == '__main__':
import json
with open('../random_forest_model.json', 'r') as file_in:
forest = json.load(file_in)
for idx, tree in enumerate(forest):
visualize(tree, 'random-tree-{}'.format(idx))
Loading…
Cancel
Save