Spark MLlib 之 大规模数据集的相似度计算原理探索 (2)

那么在Spark如何快速并行处理呢?通过上面的例子,可以看到两个向量的相似度,需要把每一维度乘积后相加,但是一个向量一般都是跨RDD保存的,所以可以先计算所有向量的第一维,得出结果
\[ (向量1的第1维,向量2的第1维,value)\\ (向量1的第2维,向量2的第2维,value)\\ ...\\ (向量1的第n维,向量2的第n维,value)\\ (向量1的第1维,向量3的第1维,value)\\ ..\\ (向量1的第n维,向量3的第n维,value)\\ \]
最后对做一次reduceByKey累加结果即可.....

阅读源码

首先创建dataframe形成matrix:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.sql.SparkSession object MatrixSimTest { def main(args: Array[String]): Unit = { // 创建dataframe,转换成matrix val spark = SparkSession.builder().master("local[*]").appName("sim").getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val df = spark.createDataFrame(Seq( (0, 0, 1.0), (1, 0, 1.0), (2, 0, 1.0), (3, 0, 1.0), (0, 1, 2.0), (1, 1, 2.0), (2, 1, 1.0), (3, 1, 1.0), (0, 2, 3.0), (1, 2, 3.0), (2, 2, 3.0), (0, 3, 1.0), (1, 3, 1.0), (3, 3, 4.0) )) val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD) // 调用sim方法 val x = matrix.toRowMatrix().columnSimilarities() // 得到相似度结果 x.entries.collect().foreach(println) } }

得到的结果为:

MatrixEntry(0,3,0.7071067811865476) MatrixEntry(0,2,0.8660254037844386) MatrixEntry(2,3,0.2721655269759087) MatrixEntry(0,1,0.9486832980505139) MatrixEntry(1,2,0.9128709291752768) MatrixEntry(1,3,0.596284793999944)

直接进入columnSimilarities方法看看是怎么个流程吧!

def columnSimilarities(): CoordinateMatrix = { columnSimilarities(0.0) }

内部调用了带阈值的相似度方法,这里的阈值是指相似度小于该值时,输出结果时,会自动过滤掉。

def columnSimilarities(threshold: Double): CoordinateMatrix = { //检查参数... val gamma = if (threshold < 1e-6) { Double.PositiveInfinity } else { 10 * math.log(numCols()) / threshold } columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) }

todo todo todo gamma
然后看一下computeColumnSummaryStatistics().normL2.toArray这个方法:

def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = { val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)( (aggregator, data) => aggregator.add(data), (aggregator1, aggregator2) => aggregator1.merge(aggregator2)) updateNumRows(summary.count) summary }

之前有介绍这个treeAggregate是一种带“预reduce”的map-reduce,返回的summary,里面帮我们统计了每一个向量的很多指标,比如

currMean 为 每一个向量的平均值 currM2 为 每个向量的每一维的平方和 currL1 为 每个向量的绝对值的和 currMax 为 每个向量的最大值 currMin 为 每个向量的最小值 nnz 为 每个向量的非0个数

这里我们只需要currM2,它是每个向量的平方和。summary调用的normL2方法:

override def normL2: Vector = { require(totalWeightSum > 0, s"Nothing has been added to this summarizer.") val realMagnitude = Array.ofDim[Double](n) var i = 0 val len = currM2.length while (i < len) { realMagnitude(i) = math.sqrt(currM2(i)) i += 1 } Vectors.dense(realMagnitude) }

上面这步就是对平方和开个根号,这样就求出来了每个向量的分母部分。
下面就是最关键的地方了:

private[mllib] def columnSimilaritiesDIMSUM( colMags: Array[Double], gamma: Double): CoordinateMatrix = { // 一些参数校验 // 对gamma进行开方 val sg = math.sqrt(gamma) // sqrt(gamma) used many times // 这里把前面算的平方根的值设置一个默认值,因为如果为0,除0会报异常,所以设置为1 val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x) // 把抽样概率数组 和 平方根数组进行广播 val sc = rows.context val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c)) val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c))) // 遍历每一行,计算每个向量该维的乘积,形成三元组 val sims = rows.mapPartitionsWithIndex { (indx, iter) => val p = pBV.value val q = qBV.value // 获得随机值 val rand = new XORShiftRandom(indx) val scaled = new Array[Double](p.size) iter.flatMap { row => row match { case SparseVector(size, indices, values) => // 如果是稀疏向量,遍历向量的每一维,除以平方根 val nnz = indices.size var k = 0 while (k < nnz) { scaled(k) = values(k) / q(indices(k)) k += 1 } // 遍历向量数组,计算每一个数值与其他数值的乘机。 // 比如向量(1, 2, 0 ,1) // 得到的结果为 (0,1,value)(0,3,value)(2,3,value) Iterator.tabulate (nnz) { k => val buf = new ListBuffer[((Int, Int), Double)]() val i = indices(k) val iVal = scaled(k) // 判断当前列是否符合采样范围,如果小于采样值,就忽略 if (iVal != 0 && rand.nextDouble() < p(i)) { var l = k + 1 while (l < nnz) { val j = indices(l) val jVal = scaled(l) if (jVal != 0 && rand.nextDouble() < p(j)) { // 计算每一维与其他维的值 buf += (((i, j), iVal * jVal)) } l += 1 } } buf }.flatten case DenseVector(values) => // 跟稀疏同理 val n = values.size var i = 0 while (i < n) { scaled(i) = values(i) / q(i) i += 1 } Iterator.tabulate (n) { i => val buf = new ListBuffer[((Int, Int), Double)]() val iVal = scaled(i) if (iVal != 0 && rand.nextDouble() < p(i)) { var j = i + 1 while (j < n) { val jVal = scaled(j) if (jVal != 0 && rand.nextDouble() < p(j)) { buf += (((i, j), iVal * jVal)) } j += 1 } } buf }.flatten } } // 最后再执行一个reduceBykey,累加所有的值,就是i和j的相似度 }.reduceByKey(_ + _).map { case ((i, j), sim) => MatrixEntry(i.toLong, j.toLong, sim) } new CoordinateMatrix(sims, numCols(), numCols()) }

这样把所有向量的平方和广播后,每一行都可以在不同的节点并行处理了。

不过杰卡德目前并不能使用这种方法来计算,因为杰卡德中间有一项需要对向量求dot,这种方式就不适合了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyyjfp.html