# 机器学习聚类算法 python实现

#### 聚类算法 ####

* watermelon4.0.csv 西瓜数据集
* LVQ.py
* K-means.py
* GMM.py
* AGNES.py(初始化30个不同颜色的簇)
* AGNES.py

## watermelon4.0.csv 西瓜数据集 ##

<!-- FIX: the original listing had 31 rows with index 7 duplicated
     (an extra "7,0.666,0.091"); the canonical watermelon 4.0 set has
     30 rows, matching createDataSet() in GMM.py below. -->

```
1,0.697,0.460
2,0.774,0.376
3,0.634,0.264
4,0.608,0.318
5,0.556,0.215
6,0.403,0.237
7,0.481,0.149
8,0.437,0.211
9,0.666,0.091
10,0.243,0.267
11,0.245,0.057
12,0.343,0.099
13,0.639,0.161
14,0.657,0.198
15,0.360,0.370
16,0.593,0.042
17,0.719,0.103
18,0.359,0.188
19,0.339,0.241
20,0.282,0.257
21,0.748,0.232
22,0.714,0.346
23,0.483,0.312
24,0.478,0.437
25,0.525,0.369
26,0.751,0.489
27,0.532,0.472
28,0.473,0.376
29,0.725,0.445
30,0.446,0.459
```

## LVQ.py ##

```python
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def LVQ(X1, y1, pNum, learningRate=0.1):
    """Learning Vector Quantization on 2-D labelled data.

    X1: (n, 2) sample array; y1: (n,) integer labels; pNum: number of
    prototype vectors; learningRate: step size of prototype updates.
    NOTE(review): reads the module-level ``xlim``/``ylim`` computed in
    ``__main__`` so every subplot shares the same axis limits.
    """
    # Pick pNum distinct samples as the initial prototype vectors
    # (replace=False avoids drawing the same sample twice).
    idx = np.random.choice(X1.shape[0], pNum, replace=False)
    p = X1[idx, :]
    py = y1[idx]  # labels attached to the prototypes

    fig, ax = plt.subplots(3, 3, figsize=(12, 12), sharex='all', sharey='all')
    # Allow Chinese text and the minus sign to render in the figure.
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    # Subplot (0, 0): the initial prototypes over the raw data.
    ax[0, 0].scatter(X1[:, 0], X1[:, 1], c=y1)
    ax[0, 0].scatter(p[:, 0], p[:, 1], marker='x', color='red', s=100)
    ax[0, 0].set_title("初始化原型向量")
    ax[0, 0].set_xlim(xlim)
    ax[0, 0].set_ylim(ylim)

    j = 0
    for i in range(2001):
        # Draw one random sample and find its nearest prototype.
        idx = np.random.choice(X1.shape[0], 1)
        xi = X1[idx, :]
        dist = np.sqrt(np.sum(np.square(xi - p), axis=1))
        minIdx = np.argmin(dist)
        # Move the winning prototype towards the sample when the labels
        # agree, away from it otherwise.
        if y1[idx] == py[minIdx]:
            p[minIdx] = p[minIdx] + learningRate * (xi - p[minIdx])
        else:
            p[minIdx] = p[minIdx] - learningRate * (xi - p[minIdx])

        # Snapshot the induced clustering at a few chosen iterations.
        if (i > 0) and (i in [20, 50, 100, 200, 500, 1000, 1500, 2000]):
            j += 1
            # Assign every sample to its nearest prototype.
            clusters = []
            for x in X1:
                dist = np.sqrt(np.sum(np.square(x - p), axis=1))
                clusters.append(np.argmin(dist))
            # Map the j-th snapshot (j = 1..8) onto the 3x3 grid;
            # cell (0, 0) is reserved for the initial-prototype plot.
            if j < 3:
                k = 0
            elif j < 6:
                k = 1
            else:
                k = 2
            if not ((k == 0) and ((j % 3) == 0)):
                # BUG FIX: the original plotted the global X here; the
                # function must use its own parameter X1.
                ax[k, j % 3].scatter(X1[:, 0], X1[:, 1], c=clusters)
                ax[k, j % 3].scatter(p[:, 0], p[:, 1], marker='x',
                                     color='red', s=100)
                ax[k, j % 3].set_title("迭代次数: %d" % i)
                ax[k, j % 3].set_xlim(xlim)
                ax[k, j % 3].set_ylim(ylim)


if __name__ == "__main__":
    data = pd.read_csv('watermelon4.0.csv', header=None)
    # Column 3 ('y'): label 0 everywhere except rows 9..21, labelled 1.
    data['y'] = np.zeros((data.shape[0], 1), dtype=int)
    data.iloc[9:22, 3] = 1
    X = data.iloc[:, 1:3].values
    y = data.iloc[:, 3].values
    plt.scatter(X[:, 0], X[:, 1], c=y)
    # Remember the auto-chosen axis limits so LVQ's subplots match.
    xlim = (plt.axis()[0], plt.axis()[1])
    ylim = (plt.axis()[2], plt.axis()[3])
    LVQ(X, y, 5)
    plt.show()
```

## K-means.py ##

```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Allow Chinese text and the minus sign to render in figures.
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False


def distance(sample1, centers1):
    """Return the index of the center nearest (Euclidean) to sample1."""
    dist = np.sqrt(np.sum(np.square(sample1 - centers1), axis=1))
    return np.argmin(dist)


def clusters_show(clusters, center, step):
    """Plot the current partition, one colour per cluster, plus centers."""
    color = ["g", "b", "y"]
    plt.figure(figsize=(8, 8))
    plt.title("迭代次数: {}".format(step))
    plt.xlabel("密度", loc="center")
    plt.ylabel("糖含量", loc="center")
    for i, cluster in enumerate(clusters):
        cluster = np.array(cluster)
        plt.scatter(center[:, 0], center[:, 1], marker='x', color='red', s=100)
        plt.scatter(cluster[:, 0], cluster[:, 1], c=color[i], marker='.', s=150)


def k_means(samples, k):
    """Cluster ``samples`` into ``k`` groups; return the final centers."""
    data_number = len(samples)
    centers_flag = np.zeros((k,))
    # Choose k distinct samples as the initial cluster centers.
    center = samples[np.random.choice(data_number, k, replace=False)]
    plt.title("初始化原型向量")
    plt.xlabel("密度", loc="center")
    plt.ylabel("糖含量", loc="center")
    plt.scatter(center[:, 0], center[:, 1], marker='x', color='red', s=100)
    plt.scatter(samples[:, 0], samples[:, 1], c='black')

    step = 0
    while True:
        # Assign each sample to its nearest center.
        clusters = [[] for _ in range(k)]
        for sample1 in samples:
            ci = distance(sample1, center)
            clusters[ci].append(sample1)
        clusters_show(clusters, center, step)
        # Recompute each center as the mean of its cluster.
        # NOTE(review): an empty cluster would make mean() return NaN;
        # with this dataset and seed it does not occur — confirm if reused.
        for i, sub_clusters in enumerate(clusters):
            new_center = np.array(sub_clusters).mean(axis=0)
            # BUG FIX: the original tested ``.all()``, which marks a
            # center converged as soon as ANY single coordinate stops
            # moving. A center has converged only when NO coordinate
            # changed, so update while any coordinate differs.
            if (center[i] != new_center).any():
                center[i] = new_center
            else:
                centers_flag[i] = 1
        step += 1
        if centers_flag.all():
            break
    return center


def split_data(samples, centers1):
    """Partition ``samples`` by nearest center; return the k sub-lists."""
    k = len(centers1)
    clusters = [[] for _ in range(k)]
    # FIX: the original wrote ``for samples in samples``, shadowing the
    # parameter inside its own loop.
    for sample1 in samples:
        ci = distance(sample1, centers1)
        clusters[ci].append(sample1)
    return clusters


if __name__ == '__main__':
    # Fixed seed so the run is reproducible.
    np.random.seed(5)
    data = pd.read_csv('watermelon4.0.csv', header=None)
    sample = data.iloc[:, 1:3].values
    centers = k_means(sample, 3)
    plt.show()
```

## GMM.py ##

```python
import numpy as np
from matplotlib import pyplot as plt


def createDataSet():
    """Return the watermelon 4.0 dataset as a (30, 2) array."""
    data = np.array([
        [0.697, 0.460], [0.774, 0.376], [0.634, 0.264], [0.608, 0.318],
        [0.556, 0.215], [0.403, 0.237], [0.481, 0.149], [0.437, 0.211],
        [0.666, 0.091], [0.243, 0.267], [0.245, 0.057], [0.343, 0.099],
        [0.639, 0.161], [0.657, 0.198], [0.360, 0.370], [0.593, 0.042],
        [0.719, 0.103], [0.359, 0.188], [0.339, 0.241], [0.282, 0.257],
        [0.748, 0.232], [0.714, 0.346], [0.483, 0.312], [0.478, 0.437],
        [0.525, 0.369], [0.751, 0.489], [0.532, 0.472], [0.473, 0.376],
        [0.725, 0.445], [0.446, 0.459]])
    return data


def multiGaussian(x, n_clusters, miu, sigma):
    """Multivariate Gaussian density of x — eq. (9.28).

    ``n_clusters`` is kept for signature compatibility but is NOT the
    dimension of the distribution.
    """
    # BUG FIX: the normaliser is (2*pi)^(d/2) where d is the FEATURE
    # dimension; the original raised it to n_clusters/2.  (The error
    # cancelled in the normalised posteriors, but the returned density
    # values were wrong by a constant factor.)
    d = np.size(x)
    left = 1 / (pow(2 * np.pi, d / 2) * pow(np.linalg.det(sigma), 0.5))
    right = np.exp((-0.5) * (x - miu).dot(np.linalg.pinv(sigma)).dot(x - miu).T)
    return left * right


def computeGamma(X, miu, sigma, alpha, multiGaussian):
    """Posterior responsibility gamma of every sample for every cluster.

    Eq. (9.30): gamma[i, k] = alpha_k * N(x_i | miu_k, sigma_k) /
    sum_j alpha_j * N(x_i | miu_j, sigma_j).
    Returns an (n_samples, n_clusters) matrix whose rows sum to 1.
    """
    n_samples = X.shape[0]
    n_clusters = len(alpha)
    gamma = np.zeros((n_samples, n_clusters))
    p = np.zeros(n_clusters)  # per-cluster Gaussian density of one sample
    g = np.zeros(n_clusters)  # alpha-weighted densities
    for i in range(n_samples):
        for j in range(n_clusters):
            p[j] = multiGaussian(X[i], n_clusters, miu[j], sigma[j])
            g[j] = alpha[j] * p[j]
        for k in range(n_clusters):
            gamma[i, k] = g[k] / np.sum(g)
    return gamma


class GMM():
    """Gaussian Mixture Model fitted with EM."""

    def __init__(self, n_clusters, iter=50):
        self.n_clusters = n_clusters
        self.iter = iter
        self.miu = 0
        self.sigma = 0
        self.alpha = 0

    def fit(self, data):
        """Estimate alpha, miu, sigma from ``data`` via EM iterations."""
        n_samples = data.shape[0]
        n_features = data.shape[1]
        # Uniform mixing weights, hand-picked means (samples 6, 22, 27
        # of the dataset), and isotropic 0.1 covariances to start.
        alpha = np.ones(self.n_clusters) / self.n_clusters
        miu = np.array([[.403, .237], [.714, .346], [.532, .472]])
        sigma = np.full((self.n_clusters, n_features, n_features),
                        np.diag(np.full(n_features, 0.1)))
        for i in range(self.iter):
            # E-step: responsibilities.
            gamma = computeGamma(data, miu, sigma, alpha, multiGaussian)
            # M-step: weights are the column means of gamma.
            alpha = np.sum(gamma, axis=0) / n_samples
            for j in range(self.n_clusters):
                # Weighted mean of the samples.
                miu[j] = np.sum(data * gamma[:, j].reshape((n_samples, 1)),
                                axis=0) / np.sum(gamma, axis=0)[j]
                # Weighted covariance around the new mean.
                sigma[j] = 0
                for k in range(n_samples):
                    sigma[j] += (data[k].reshape((1, n_features)) - miu[j]).T.dot(
                        (data[k] - miu[j]).reshape((1, n_features))) * gamma[k, j]
                sigma[j] = sigma[j] / np.sum(gamma, axis=0)[j]
        self.miu = miu
        self.sigma = sigma
        self.alpha = alpha

    def predict(self, data):
        """Return the most responsible cluster index for each sample."""
        pred = computeGamma(data, self.miu, self.sigma, self.alpha,
                            multiGaussian)
        return np.argmax(pred, axis=1)


if __name__ == '__main__':
    data = createDataSet()
    model = GMM(3, iter=100)
    model.fit(data)
    result = model.predict(data)
    plt.scatter(data[:, 0], data[:, 1], c=result)
    plt.scatter(model.miu[:, 0], model.miu[:, 1], marker='x', color='red')
    plt.show()
```

## AGNES.py(初始化30个不同颜色的簇) ##

```python
# -*- coding:utf-8 -*-
import colorsys
import math
import random
import time

import numpy as np
import pandas as pd
import pylab as pl

# FIX: ``math`` and ``numpy`` were imported mid-file in the original,
# after functions that use them were defined; all imports now sit at the
# top of the module.


def get_n_hls_colors(num):
    """Generate ``num`` HLS colours evenly spaced around the hue wheel."""
    hls_colors = []
    i = 0
    step = 360.0 / num
    while i < 360:
        h = i
        # Jitter saturation/lightness slightly so colours differ more.
        s = 90 + random.random() * 10
        l = 50 + random.random() * 10
        _hlsc = [h / 360.0, l / 100.0, s / 100.0]
        hls_colors.append(_hlsc)
        i += step
    return hls_colors


def ncolors(num):
    """Return ``num`` visually distinct colours as [r, g, b] 0-255 lists."""
    rgb_colors = []
    if num < 1:
        return rgb_colors
    for hlsc in get_n_hls_colors(num):
        _r, _g, _b = colorsys.hls_to_rgb(hlsc[0], hlsc[1], hlsc[2])
        r, g, b = [int(x * 255.0) for x in (_r, _g, _b)]
        rgb_colors.append([r, g, b])
    return rgb_colors


def color(value):
    """Convert an (r, g, b) tuple to '#RRGGBB', or the reverse."""
    digit = list(map(str, range(10))) + list("ABCDEF")
    if isinstance(value, tuple):
        string = '#'
        for i in value:
            a1 = i // 16
            a2 = i % 16
            string += digit[a1] + digit[a2]
        return string
    elif isinstance(value, str):
        a1 = digit.index(value[1]) * 16 + digit.index(value[2])
        a2 = digit.index(value[3]) * 16 + digit.index(value[4])
        a3 = digit.index(value[5]) * 16 + digit.index(value[6])
        return (a1, a2, a3)


def dist(a, b):
    """Euclidean distance between two 2-D points (tuples)."""
    return math.sqrt(math.pow(a[0] - b[0], 2) + math.pow(a[1] - b[1], 2))


def dist_min(Ci, Cj):
    """Single-linkage: smallest pairwise distance between clusters."""
    return min(dist(i, j) for i in Ci for j in Cj)


def dist_max(Ci, Cj):
    """Complete-linkage: largest pairwise distance between clusters."""
    return max(dist(i, j) for i in Ci for j in Cj)


def dist_avg(Ci, Cj):
    """Average-linkage: mean pairwise distance between clusters."""
    return sum(dist(i, j) for i in Ci for j in Cj) / (len(Ci) * len(Cj))


def find_Min(M):
    """Return (row, col, value) of the smallest off-diagonal entry of M."""
    # FIX: the original used the magic sentinel 1000 and shadowed the
    # builtin ``min``.
    best = float('inf')
    x = 0
    y = 0
    for i in range(len(M)):
        for j in range(len(M[i])):
            if i != j and M[i][j] < best:
                best = M[i][j]
                x = i
                y = j
    return (x, y, best)


def AGNES(dataset, dist, k):
    """Agglomerative clustering down to ``k`` clusters.

    dataset: list of point tuples; dist: cluster-distance function
    (dist_min / dist_max / dist_avg); returns the list of clusters.
    """
    # Start with every point in its own cluster and a full distance matrix.
    C = []
    M = []
    for i in dataset:
        Ci = []
        Ci.append(i)
        C.append(Ci)
    for i in C:
        Mi = []
        for j in C:
            Mi.append(dist(i, j))
        M.append(Mi)
    q = len(dataset)
    # Repeatedly merge the two closest clusters and rebuild M.
    while q > k:
        x, y, _ = find_Min(M)
        C[x].extend(C[y])
        C.remove(C[y])
        M = []
        for i in C:
            Mi = []
            for j in C:
                Mi.append(dist(i, j))
            M.append(Mi)
        q -= 1
    return C


def c11():
    """Build a large pool of (r, g, b) tuples from random channel values."""
    L1 = random.sample(range(1, 255), 15)
    L2 = random.sample(range(1, 255), 25)
    L3 = random.sample(range(1, 255), 17)
    d = []
    for i in L1:
        for j in L3:
            for k in L2:
                d.append((k, j, i))
    return d


def color1(value):
    """Same conversion as color(); kept for compatibility with callers."""
    return color(value)


def draw(C, c2):
    """Scatter-plot every cluster in C using the colour list c2."""
    for i in range(len(C)):
        coo_X = []  # x coordinates of cluster i
        coo_Y = []  # y coordinates of cluster i
        for j in range(len(C[i])):
            coo_X.append(C[i][j][0])
            coo_Y.append(C[i][j][1])
        pl.rcParams['font.sans-serif'] = ['SimHei']
        pl.rcParams['axes.unicode_minus'] = False
        # Sanity check: warn when the colour list contains duplicates.
        set_lst = set(c2)
        if len(set_lst) == len(c2):
            print('列表里的元素互不重复!')
        else:
            print('列表里有重复的元素!')
        pl.scatter(coo_X, coo_Y, marker='x', color=c2[i], label=i)
    pl.title("迭代次数:")
    pl.show()


def ColourDistance(rgb_1, rgb_2):
    """Perceptual distance between two RGB colours (redmean formula)."""
    R_1, G_1, B_1 = rgb_1
    R_2, G_2, B_2 = rgb_2
    rmean = (R_1 + R_2) / 2
    R = R_1 - R_2
    G = G_1 - G_2
    B = B_1 - B_2
    distance = math.sqrt((2 + rmean / 256) * (R ** 2) + 4 * (G ** 2)
                         + (2 + (255 - rmean) / 256) * (B ** 2))
    return distance


def colorSimilarity(rgb1, rgb2):
    """Plain Euclidean distance between two RGB colours, scaled to [0, ~1.7]."""
    r1, g1, b1 = rgb1
    r2, g2, b2 = rgb2
    r3 = (r1 - r2) / 256
    g3 = (g1 - g2) / 256
    b3 = (b1 - b2) / 256
    return math.sqrt(r3 * r3 + g3 * g3 + b3 * b3)


def createRGB():
    """Return one random colour as a [c1, c2, c3] list of 0-255 ints.

    NOTE(review): the original names the three channels r1/b1/g1 but
    they are all drawn identically, so the ordering is irrelevant.
    """
    colors = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
              'A', 'B', 'C', 'D', 'E', 'F']
    np.random.seed(int(time.time()))
    channels = []
    for _ in range(3):
        hi = int(colors[np.random.randint(0, 16)], 16)
        lo = int(colors[np.random.randint(0, 16)], 16)
        channels.append(hi * 16 + lo)
    return channels


def rgbOctToHex(rgbOct):
    """Convert an [r, g, b] list of ints to a '#rrggbb' hex string."""
    rgbHex = "#"
    for i in rgbOct:
        cs = hex(i)[2:]
        if len(cs) == 1:
            rgbHex += '0'  # zero-pad single hex digits
        rgbHex += cs
    return rgbHex


def createColors(num):
    """Return ``num`` hex colour strings, each pair at least redmean-80 apart."""
    i = 0
    d = []
    while i < num:
        rgb_1 = createRGB()
        if len(d) == 0:
            d.append(rgb_1)
            i += 1
        else:
            good = 1
            for rgb in d:
                diff = ColourDistance(rgb_1, rgb)
                if diff < 80:  # too close to an accepted colour: reject
                    good = 0
                    break
            if good == 1:
                i += 1
                d.append(rgb_1)
    return [str(rgbOctToHex(rgb)) for rgb in d]


if __name__ == '__main__':
    data = pd.read_csv('watermelon4.0.csv', header=None)
    sample = data.iloc[:, 1:3].values
    # dataset: list of 30 (density, sugar) tuples.
    dataset = [tuple(i) for i in sample]
    # k = 30 keeps every point in its own cluster (no merging happens);
    # the point of this variant is the 30 distinct colours.
    C = AGNES(dataset, dist_min, 30)
    d = createColors(30)
    print(d)
    draw(C, d)
```

## AGNES.py ##

```python
# -*- coding:utf-8 -*-
import math

import numpy as np
import pandas as pd
import pylab as pl


def dist(a, b):
    """Euclidean distance between two 2-D points (tuples)."""
    return math.sqrt(math.pow(a[0] - b[0], 2) + math.pow(a[1] - b[1], 2))


def dist_min(Ci, Cj):
    """Single-linkage: smallest pairwise distance between clusters."""
    return min(dist(i, j) for i in Ci for j in Cj)


def dist_max(Ci, Cj):
    """Complete-linkage: largest pairwise distance between clusters."""
    return max(dist(i, j) for i in Ci for j in Cj)


def dist_avg(Ci, Cj):
    """Average-linkage: mean pairwise distance between clusters."""
    return sum(dist(i, j) for i in Ci for j in Cj) / (len(Ci) * len(Cj))


def find_Min(M):
    """Return (row, col, value) of the smallest off-diagonal entry of M."""
    # FIX: replaced the magic sentinel 1000 and the shadowed builtin
    # ``min`` of the original.
    best = float('inf')
    x = 0
    y = 0
    for i in range(len(M)):
        for j in range(len(M[i])):
            if i != j and M[i][j] < best:
                best = M[i][j]
                x = i
                y = j
    return (x, y, best)


def AGNES(dataset, dist, k):
    """Agglomerative clustering of ``dataset`` down to ``k`` clusters."""
    # Every point starts as its own cluster; M is the distance matrix.
    C = []
    M = []
    for i in dataset:
        Ci = []
        Ci.append(i)
        C.append(Ci)
    for i in C:
        Mi = []
        for j in C:
            Mi.append(dist(i, j))
        M.append(Mi)
    q = len(dataset)
    # Merge the two closest clusters until only k remain.
    while q > k:
        x, y, _ = find_Min(M)
        C[x].extend(C[y])
        C.remove(C[y])
        M = []
        for i in C:
            Mi = []
            for j in C:
                Mi.append(dist(i, j))
            M.append(Mi)
        q -= 1
    return C


def draw(C):
    """Scatter-plot the clusters, cycling through a fixed colour list."""
    colValue = ['r', 'y', 'g', 'b', 'c', 'k', 'm', 'peru']
    for i in range(len(C)):
        coo_X = []  # x coordinates of cluster i
        coo_Y = []  # y coordinates of cluster i
        for j in range(len(C[i])):
            coo_X.append(C[i][j][0])
            coo_Y.append(C[i][j][1])
        pl.rcParams['font.sans-serif'] = ['SimHei']
        pl.rcParams['axes.unicode_minus'] = False
        pl.scatter(coo_X, coo_Y, marker='x',
                   color=colValue[i % len(colValue)], label=i)
    pl.title("fig4:聚类簇数k=5")
    pl.show()


if __name__ == '__main__':
    # Fixed seed so the run is reproducible.
    np.random.seed(5)
    data = pd.read_csv('watermelon4.0.csv', header=None)
    sample = data.iloc[:, 1:3].values
    # dataset: list of 30 (density, sugar) tuples.
    dataset = [tuple(i) for i in sample]
    C = AGNES(dataset, dist_min, 5)
    draw(C)
```
还没有评论,来说两句吧...