山东大学机器学习实验lab9 决策树
- 所有公式来源于
<<机器学习>>周志华
- github上有
.ipynb
源文件
修改:
- 2024 5.15 添加了一些Node属性,用于标记每个Node使用的划分feature的名称,修改后的版本见 github
Node
类
- 构造函数 初始化决策树的节点,包含节点ID、特征数据、特征ID、划分阈值、标签、父节点ID、子节点列表以及节点所属的类别
set_sons
设置节点的子节点列表judge_stop
判断是否停止继续生成节点条件包括- 所有样本属于同一类别
- 所有样本在所有属性上取值相同
- 节点对应的样本集合为空
DecisionTree
类
- 构造函数 初始化决策树,包括根节点列表、节点ID计数器
ent
计算信息熵,衡量数据集纯度gain_single_feature
对于连续属性,寻找最佳划分点以最大化信息增益gain_rate
计算所有特征的信息增益率,选择信息增益率最高的特征作为分裂依据generate_node
递归生成子节点,根据特征的最佳分裂点划分数据集,直到满足停止条件train
从根节点开始构建决策树predict
给定数据,根据决策树进行分类预测
Data
类
- 构造函数 加载数据集,初始化特征和标签
get_k_exmaine
实现K折交叉验证的数据切分,返回K个特征和标签组
整体流程
- 数据预处理:使用
Data
类读取数据文件,进行K折交叉验证的数据切分 - 模型训练与评估:对每一份测试数据,结合其余K-1份数据进行训练,使用
DecisionTree
类构建决策树模型,然后对测试集进行预测,计算正确率 - 结果展示:收集每一次交叉验证的正确率,最后计算并输出平均正确率
代码执行流程
- 首先,
Data
类加载数据并进行K折交叉验证数据分割 - 接着,对于每个验证折,训练数据被合并以训练决策树模型,然后在对应的测试数据上进行预测
- 对每个测试样本,通过遍历决策树找到其所属类别,并与实际标签对比,累计正确预测的数量
- 计算并打印每次验证的正确率,最后计算并输出所有折的平均正确率,以此评估模型的泛化能力
代码以及运行结果展示
import numpy as np
import matplotlib.pyplot as plt
import math
class Node():def __init__(self,id_,features_,feature_id_,divide_,labels_,father_,sons_=[]):self.divide=divide_self.feature_id=feature_id_self.id=id_self.feature=features_self.labels=labels_ self.father=father_ self.sons=sons_self.clas='None'def set_sons(self,sons_):self.sons=sons_ def judge_stop(self):labels=np.unique(self.labels)#如果节点样本属于同一类别if(labels.shape[0]==1):self.clas=labels[0]return Truefeatures=np.unique(self.feature)#如果所有样本在所有属性上取值相同if(features.shape[0]==1):unique_values, counts = np.unique(labels, return_counts=True)self.clas = unique_values[counts.argmax()]return True #如果对应的样本集合为空if(self.feature.shape[0]==0 or self.feature.shape[1]==0):self.clas=1return Truereturn False
class DecisionTree():def __init__(self):self.tree=[]self.id=0pass #计算信息熵def ent(self,labels):labels_s=list(set([labels[i,0] for i in range(labels.shape[0])]))ans=0for label in labels_s:num=np.sum(labels==label)p=num/labels.shape[0]ans-=p*math.log(p,2)return ans #计算一个标签对应的最佳分界(连续值)def gain_single_feature(self,feature,labels):origin_ent=self.ent(labels)divide_edge=[]feature=list(set(feature))feature=np.sort(feature)divide_edge=[(feature[i]+feature[i+1])/2.0 for i in range(feature.shape[0]-1)]best_ent=0best_divide=0l1=l2=np.array([[]])for condition in divide_edge:labels1=np.array([labels[i] for i in range(feature.shape[0]) if feature[i]<=condition])labels2=np.array([labels[i] for i in range(feature.shape[0]) if feature[i]>condition])ent1=self.ent(labels1)ent2=self.ent(labels2)ans=origin_ent-((labels1.shape[0]/labels.shape[0])*ent1+(labels2.shape[0]/labels.shape[0])*ent2)if(ans>=best_ent):best_divide=conditionl1=labels1l2=labels2best_ent=ans return best_divide,l1,l2,best_ent#计算信息增益def gain_rate(self,features,labels):origin_ent=self.ent(labels)gain_rate=0feature_id=-1divide=-1l=labels.shape[0]for id in range(features.shape[1]):divide1,labels1,labels2,th_gain=self.gain_single_feature(features[:,id],labels)l1=labels1.shape[0]l2=labels2.shape[0]iv=-1*((l1/l)*math.log(l1/l,2)+(l2/l)*math.log(l2/l,2))if iv!=0:rate=th_gain/ivelse:rate=0if(rate>=gain_rate):gain_rate=ratedivide=divide1feature_id=idreturn feature_id,dividedef generate_node(self,node:Node):a=1features1_id=np.array([i for i in range(node.feature.shape[0]) if node.feature[i,node.feature_id]>=node.divide])features2_id=np.array([i for i in range(node.feature.shape[0]) if node.feature[i,node.feature_id]<node.divide])features1=node.feature[features1_id]features2=node.feature[features2_id]labels1=node.labels[features1_id]labels2=node.labels[features2_id]features1=np.delete(features1,node.feature_id,axis=1)features2=np.delete(features2,node.feature_id,axis=1)features_id1,divide1=self.gain_rate(features1,labels1)features_id2,divide2=self.gain_rate(features2,labels2)tmp=0if(features_id1!=-1):tmp+=1node1=Node(self.id+tmp,features1,features_id1,divide1,labels1,node.id,[])node1.father=node.idself.tree.append(node1)node.sons.append(self.id+tmp)if(features_id2!=-1):tmp+=1node2=Node(self.id+tmp,features2,features_id2,divide2,labels2,node.id,[])node2.father=node.idself.tree.append(node2)node.sons.append(self.id+tmp)self.id+=tmpif(tmp==0):unique_values, counts = np.unique(node.labels, return_counts=True)node.clas = 0 if counts[0]>counts[1] else 1return for n in [self.tree[i] for i in node.sons]:if(n.judge_stop()):continueelse:self.generate_node(n)def train(self,features,labels):feature_id,divide=self.gain_rate(features,labels)root=Node(0,features,feature_id,divide,labels,-1,[])self.tree.append(root)self.generate_node(root)def predict(self,features):re=[]for feature in features:node=self.tree[0]while(node.clas=='None'):th_feature=feature[node.feature_id]feature=np.delete(feature,node.feature_id,axis=0)th_divide=node.divideif(node.clas!='None'):break if(th_feature<th_divide):node=self.tree[node.sons[len(node.sons)-1]]else:node=self.tree[node.sons[0]]re.append(node.clas)return re
class Data():def __init__(self):self.data=np.loadtxt('/home/wangxv/Files/hw/ml/lab9/data/ex6Data/ex6Data.txt',delimiter=',')self.data_num=self.data.shape[0]self.features=self.data[:,:-1]self.labels=self.data[:,-1:]def get_k_exmaine(self,k:int):num=int(self.data_num/k)data=self.datanp.random.shuffle(self.data)features=data[:,:-1]labels=self.data[:,-1:]feature_groups=[features[i:i+num-1] for i in np.linspace(0,self.data_num,k+1,dtype=int)[:-1]]labels_groups=[labels[i:i+num-1] for i in np.linspace(0,self.data_num,k+1,dtype=int)[:-1]]return feature_groups,labels_groups
data=Data()
feature_groups,label_groups=data.get_k_exmaine(10)
rate_set=[]
for ind in range(10):dt=DecisionTree()features_=[feature_groups[i] for i in range(10) if i!=ind]labels_=[label_groups[i] for i in range(10) if i!=ind]train_features=features_[0]train_labels=labels_[0]for feature,label in zip(features_[1:],labels_[1:]):train_features=np.vstack((train_features,feature))train_labels=np.vstack((train_labels,label))test_features=feature_groups[ind]test_labels=label_groups[ind]dt.train(train_features,train_labels)pred_re=dt.predict(test_features)right_num=0for i in range(len(pred_re)): if pred_re[i]==test_labels[i][0]:right_num+=1right_rate=right_num/len(pred_re)print(str(ind+1)+' correct_rate : '+str(right_rate))rate_set.append(right_rate)
print("average_rate : "+str(np.mean(np.array(rate_set))))
1 correct_rate : 0.7930327868852459
2 correct_rate : 0.8032786885245902
3 correct_rate : 0.8012295081967213
4 correct_rate : 0.7848360655737705
5 correct_rate : 0.7889344262295082
6 correct_rate : 0.7909836065573771
7 correct_rate : 0.7827868852459017
8 correct_rate : 0.7766393442622951
9 correct_rate : 0.8012295081967213
10 correct_rate : 0.7725409836065574
average_rate : 0.7895491803278689
最终得到的平均正确率为0.7895>0.78符合实验书要求
可视化
- 需要将自定义的
Node
类转换为可识别的数据格式
from graphviz import Digraphdef show_tree(root_node, dot=None):if dot is None:dot = Digraph(comment='decision_tree')node_label = f"{root_node.id} [label=\"feature {root_node.feature_id}: {root_node.divide} | class: {root_node.clas}\"]"dot.node(str(root_node.id), node_label)if root_node.sons:for son_id in root_node.sons:dot.edge(str(root_node.id), str(son_id))visualize_tree(dt.tree[son_id], dot)return dot
root_node=dt.tree[0]
dot = show_tree(root_node)
dot.render('decision_tree', view=True,format='png')
from PIL import Image
Image.open('./decision_tree.png')
- 这个决策树图有点过于大了,凑合看吧