那么在Spark如何快速并行处理呢?通过上面的例子,可以看到两个向量的相似度,需要把每一维度乘积后相加,但是一个向量一般都是跨RDD保存的,所以可以先计算所有向量的第一维,得出结果
\[
(向量1的第1维,向量2的第1维,value)\\
(向量1的第2维,向量2的第2维,value)\\
...\\
(向量1的第n维,向量2的第n维,value)\\
(向量1的第1维,向量3的第1维,value)\\
..\\
(向量1的第n维,向量3的第n维,value)\\
\]
最后对做一次reduceByKey累加结果即可.....
首先创建dataframe形成matrix:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.sql.SparkSession object MatrixSimTest { def main(args: Array[String]): Unit = { // 创建dataframe,转换成matrix val spark = SparkSession.builder().master("local[*]").appName("sim").getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val df = spark.createDataFrame(Seq( (0, 0, 1.0), (1, 0, 1.0), (2, 0, 1.0), (3, 0, 1.0), (0, 1, 2.0), (1, 1, 2.0), (2, 1, 1.0), (3, 1, 1.0), (0, 2, 3.0), (1, 2, 3.0), (2, 2, 3.0), (0, 3, 1.0), (1, 3, 1.0), (3, 3, 4.0) )) val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD) // 调用sim方法 val x = matrix.toRowMatrix().columnSimilarities() // 得到相似度结果 x.entries.collect().foreach(println) } }得到的结果为:
MatrixEntry(0,3,0.7071067811865476) MatrixEntry(0,2,0.8660254037844386) MatrixEntry(2,3,0.2721655269759087) MatrixEntry(0,1,0.9486832980505139) MatrixEntry(1,2,0.9128709291752768) MatrixEntry(1,3,0.596284793999944)直接进入columnSimilarities方法看看是怎么个流程吧!
def columnSimilarities(): CoordinateMatrix = { columnSimilarities(0.0) }内部调用了带阈值的相似度方法,这里的阈值是指相似度小于该值时,输出结果时,会自动过滤掉。
def columnSimilarities(threshold: Double): CoordinateMatrix = { //检查参数... val gamma = if (threshold < 1e-6) { Double.PositiveInfinity } else { 10 * math.log(numCols()) / threshold } columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) }todo todo todo gamma
然后看一下computeColumnSummaryStatistics().normL2.toArray这个方法:
之前有介绍这个treeAggregate是一种带“预reduce”的map-reduce,返回的summary,里面帮我们统计了每一个向量的很多指标,比如
currMean 为 每一个向量的平均值 currM2 为 每个向量的每一维的平方和 currL1 为 每个向量的绝对值的和 currMax 为 每个向量的最大值 currMin 为 每个向量的最小值 nnz 为 每个向量的非0个数这里我们只需要currM2,它是每个向量的平方和。summary调用的normL2方法:
override def normL2: Vector = { require(totalWeightSum > 0, s"Nothing has been added to this summarizer.") val realMagnitude = Array.ofDim[Double](n) var i = 0 val len = currM2.length while (i < len) { realMagnitude(i) = math.sqrt(currM2(i)) i += 1 } Vectors.dense(realMagnitude) }上面这步就是对平方和开个根号,这样就求出来了每个向量的分母部分。
下面就是最关键的地方了:
这样把所有向量的平方和广播后,每一行都可以在不同的节点并行处理了。
不过杰卡德目前并不能使用这种方法来计算,因为杰卡德中间有一项需要对向量求dot,这种方式就不适合了。