一、实验目的
机器学习和数据挖掘算法是大数据分析处理领域的重要内容,随着数据规模的不断扩大,设计面向大数据处理的并行化机器学习和数据挖掘算法越来越有必要。通过对并行化数据挖掘算法的实现,掌握并行化处理问题的分析方法和编程思想方法,能够根据实际情况定制并行化的算法解决问题。
二、实验平台
1)操作系统:Linux(实验室版本为 Ubuntu17.04);
2)Hadoop 版本:2.9.0;
3)JDK 版本:1.8;
4)Java IDE:Eclipse 3.8;
5 ) Spark 版本:2.1.0。
三、实验内容
自行准备数据集,设计一种数据挖掘算法(聚类、分类、频繁项集挖掘或其他主题)对数据集进行信息提取,要求分别使用并行化和非并行化的方式实现该
算法。实验环境可选择 Hadoop 或者 Spark,程序语言可选用 Java、Python、Scala 等,在伪分布式环境下完成并行化算法的编写和测试,在单机环境下完成非并行化算法的编写和测试。
四、实验要求
自行对比并行化和非并行化实现方法的数据挖掘结果,两种结果需完全一致。

非并行(使用python实现):

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import copy

data = pd.read_csv('data.csv',names=['x','y'])
data=data.values
print(data.shape)
x = data[:,0]
y = data[:,1]
plt.subplot(5,5,1)
plt.scatter(x,y)

def distance(data,centers):
    dist=np.zeros((data.shape[0],centers.shape[0]))#每个点相对4个质心的距离
    for i in range(len(data)):
        for j in range(len(centers)):
            dist[i,j]=np.sqrt(np.sum((data[i]-centers[j])**2))
    return dist

def near_center(data,centers):#判断点到哪个质心最近
    dist = distance(data, centers)
    near_cen = np.argmin(dist, 1)
    return near_cen

def kmeans(data, k):
    centers = copy.deepcopy(data[:4,:])#取4个点为初始质心    
    print(centers)

    for i in range(20): #迭代20次
        near_cen = near_center(data, centers)
        plt.subplot(5,5,2+i)
        plt.scatter(x, y, c=near_cen)
        plt.scatter(centers[:, 0], centers[:, 1], marker='*', s=50, c='r')
        for ci in range(k):
            centers[ci]=np.mean(data[near_cen==ci],axis=0)
            
    return centers, near_cen

centers, near_cen = kmeans(data, 4)
print(near_cen)

plt.show()

并行(使用pyspark实现):

import math
import matplotlib.pyplot as plt
from pyspark import SparkContext

def closestCluster(p, centers):#计算一个点到各个质心的距离,返回距离最近的质心的ID
    closest_cluster_id = 0
    closest_dist = float("+inf")
    for i in range(len(centers)): #计算一个数据节点对于每一个质心的距离
        temp_dist=math.sqrt(math.pow((p[0]-centers[i][0]),2)+math.pow((p[1]-centers[i][1]),2))
        if temp_dist < closest_dist:#函数,当质心更新后小于阈值,返回质心坐标。
            closest_dist = temp_dist
            closest_cluster_id = i
    return closest_cluster_id

#主程序
if __name__ == "__main__":
    sc=SparkContext(appName='pr')
    file=sc.textFile("file:///home/hadoop/Documents/lab4/data.csv")#读取数据
    data_rdd=file.map(lambda x:(int(x.split(',')[0]),int(x.split(',')[1]))).cache()#缓存,因为该RDD在后面的迭代计算中会反复用到
    k=4
    converge_dist = float(0.01)#设置停止迭代时,新旧质心间的距离之和的上限
    center_points_list = data_rdd.takeSample(False, k)#随机选择2个质心,参数False为取出样本后不放回     
    temp_dist=converge_dist+1#新旧质心间距离的和,初始设置为大于converge_dist    
    
    while temp_dist > converge_dist:#开始迭代,当temp_dist小于等于converge_dist时,停止迭代        
        closest_rdd = data_rdd.map( lambda p: (closestCluster(p, center_points_list), (p, 1)))##通过map操作计算每个点距离最近的聚类中心点,返回的是一个rdd,rdd中的元素为(id, (p,1)) 1便于后面统计归属于质心的点的个数
        point_stats_rdd = closest_rdd.reduceByKey( lambda p1_c1, p2_c2: ((p1_c1[0][0] + p2_c2[0][0],p1_c1[0][1] + p2_c2[0][1]), p1_c1[1] +p2_c2[1]))#坐标求和,个数        
        new_center_points_list = point_stats_rdd.map( lambda st:(st[0], (st[1][0][0]/ st[1][1],st[1][0][1]/ st[1][1]))).collect()#计算各个新质心,也就是将每个聚类坐标求和的结果,除以每个聚类点的个数
        #新旧质心间的距离之和      
        temp_dist=0        
        for (cluster_key, new_center_point) in new_center_points_list:        
            x_2=math.pow((new_center_point[0]-center_points_list[cluster_key][0]),2)
            y_2=math.pow((new_center_point[1]-center_points_list[cluster_key][1]),2)        
            dist=math.sqrt(x_2+y_2)        
            temp_dist+=dist
                    
        #利用新得到的聚类中心的坐标更新各个质心      
        for (cluster_key, new_center_point) in new_center_points_list:        
            center_points_list[cluster_key] = new_center_point         
        
        print(temp_dist,converge_dist)
        
    cluster_result_list=closest_rdd.collect()
    sc.stop()
    
    plt.figure()
    color=['purple','g','b','y']
    for item in cluster_result_list:  
        print(item[1][0],item[0])#输出各个点以及他们所属的聚类
        plt.scatter(item[1][0][0],item[1][0][1],c=color[item[0]])
    
    print("Final centers:"+str(center_points_list))#输出得到的质心坐标 
    for item in center_points_list:
        plt.scatter(item[0],item[1],marker='*',s=50,c='r')
        
    plt.show()

data.csv文件内容(用python随机生成的100个二维点坐标):

18,6
7,15
12,5
18,2
14,0
1,4
11,13
14,15
12,12
5,18
6,10
0,12
7,5
17,18
4,6
8,0
18,9
19,8
6,16
14,19
2,13
7,17
16,18
10,16
13,1
36,25
39,40
40,28
36,38
30,34
33,26
43,37
42,35
30,35
36,25
41,36
32,42
32,25
29,30
44,39
37,42
31,31
26,29
43,43
42,36
35,38
40,30
39,41
32,37
39,32
38,15
36,1
36,5
37,4
31,16
43,18
34,7
34,6
25,12
29,14
39,8
28,3
40,15
39,13
42,1
31,14
36,11
42,12
31,6
40,6
34,16
25,7
36,6
44,18
39,7
17,27
3,37
18,41
15,43
13,27
1,38
14,33
7,41
11,29
9,37
7,36
14,41
16,29
18,40
11,39
18,42
13,37
1,44
11,33
6,40
18,42
16,36
2,37
14,28
7,38
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐