四时宝库

程序员的知识宝库

用MapReduce实现机器学习小例

作者 石默研

用分布式的并发计算能力来实现机器学习算法,是AI实践领域比较重要的方向,因为对海量数据的AI计算来讲,往往单机的能力严重不足,在自己机器上做点实验进行学习可以,但在实际工程中,特别是在所谓的大数据时代,往往需要借助分布式并行计算的能力。


当然,已经有很多框架,比如MPI,Hadoop的Mahout,Spark ML、参数服务器等等,在工程中可以供开发者使用.不过,如果要对用分布式来实现机器学习的方法论与过程有更深入的理解与掌握,自行编程进行相关方向的实验是需要的,在工程中,这种深入的体验也很有可能会对解决实际问题有帮助,知其然,并知期所以然。


将一个标准的机器学习的算法,包括深度学习,在分布式架构上实现,多数情况下,并不会那么直接,一般都需要做一些设计与改造工作,但有一个很基本的原则就是:相互无关联的计算部分可以并行,有上下串行关联的部分不可以并行。机器学习算法基本都是有不断迭代的过程,大多是基于上一次的结果进行下一次,在分布式并行计算中,虽然对不相关的数据集迭代可以并行,但这个过程一般还是不可避免;而有些算法本身就是适合分布式计算的,比如简单贝叶斯,有些算法则需要进行改造才可以,如支持向量机SVM,需要对SMO算法改造成替代品如Pegasos算法后才适合。


这里,以聚类Kmeans算法为例,以MapReduce作分布式框架,以程序实例介绍AI算法的分布式改造过程。很显然,Kmeans算法中,需要大量的每个点与其它所有点的距离计算,这些计算之间没有相关关联的关系,完全可以用分布式框架来实现。


使用Python的mrjob框架,kmeans mr改造后的例程如下;

from mrjob.job import MRJob
from numpy import *


def loadDataSet(file):
    dataMat = []
    fr = open(file)
    for line in fr.readlines():
        curLine = line.strip().split('\t')
        #print('curLine: ', curLine)
        fltLine = list(map(float, curLine))  # 将每个元素转成float类型
        dataMat.append(fltLine)


    return dataMat


def distEclud(vecA, vecB):
    return sqrt(sum(power(vecA-vecB, 2)))


def randCent(dataSet, k):
    n = shape(dataSet)[1]
    centroids = mat(zeros((k, n)))
    for j in range(n):
        minJ = min(dataSet[:,j])
        maxJ = max(dataSet[:,j])
        rangeJ = float(maxJ- minJ)
        centroids[:, j] = minJ+rangeJ*random.rand(k,1)


    return centroids


datMat = mat(loadDataSet('inputFile.txt'))
m = shape(datMat[0])
clusK = 4
centroids = randCent(datMat, clusK)
centroids_pre = mat(zeros((clusK, shape(datMat)[1])))


class MRKmeans(MRJob):
    def __init__(self, *args, **kwargs):
        super(MRKmeans, self).__init__(*args, **kwargs)


    def mapper(self, k, v):
        if False:yield
        fitLine = float(v)
        minDist = inf;
        minIndex = -1;
        for j in range(clusK):
            distJI = distEclud(centroids[j, :], fitLine)
            if distJI < minDist:
                minDist = distJI
                minIndex = j


        yield (minIndex, fitLine)


    def reducer(self, key, listvalues):
        k = int(key)
        centroids[k, :] = mean(list(map(float, listvalues)), axis=0)


        yield (k, 0)


if __name__ == '__main__':
    print(centroids)
    for i in range(100):
        MRKmeans.run()
        mat_sub = subtract(centroids, centroids_pre)
        print(mat_sub)
        mat_dot = sum(mat_sub)
        if(mat_dot*mat_dot < 0.0001):
            break


        centroids_pre = centroids.copy()


    print(centroids)


如上程序逻辑比较简单:


用Map找出每个点与现有K个中心点的距离,并且求出最小距离,对该进行分类

用Reduce对分到同一类的点,计算平均值,求出新的中心点

如上两步迭代,直到满足设定误差要求

所用测试数据集也比较简单,是一个只有一维的数据InputFile.txt:


0.970413

0.901817

0.828698

0.197744

0.466887

0.962147

0.187294

0.388509

0.243889

0.115732

0.616292

0.713436

0.761446

....


如上程序编制为了简单易行、易懂,设计不是很优雅,但可以据此对用MR实现AI算法以达到分布式改造目标的过程与思路进行学习。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接