Spark GraphX: computing the N-degree relationship nodes of specified vertices

This post shares source code that, given a set of vertices in a graph, computes their N-degree relationship nodes and outputs each related node's ID together with its path length from the source vertex.

Straight to the code:

package horizon.graphx.util

import java.security.InvalidParameterException

import horizon.graphx.util.CollectionUtil.CollectionHelper
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2017/1/19.
 * Description: computes the N-degree relationship nodes of the specified vertices in a graph,
 * outputting each related node's ID together with its path length from the source vertex.
 */
object GraphNdegUtil {
  val maxNDegVerticesCount = 10000
  val maxDegree = 1000

  /**
   * Computes the N-degree relationships of the given vertices.
   *
   * @param edges         the edges of the graph as (srcId, dstId) tuples
   * @param choosedVertex the vertices whose N-degree neighbors should be collected
   * @param degree        the maximum path length N
   * @tparam ED the edge attribute type (unused: the edges carry no attributes here)
   * @return for each chosen vertex, a map from path length to the set of vertex IDs at that distance
   */
  def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)], choosedVertex: RDD[VertexId], degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = {
    // 0 is a placeholder vertex attribute; fromEdgeTuples requires a default value
    val simpleGraph = Graph.fromEdgeTuples(edges, 0, Option(PartitionStrategy.EdgePartition2D), StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
    aggNdegreedVertices(simpleGraph, choosedVertex, degree)
  }
  /** Same as aggNdegreedVertices, but resolves the returned vertex IDs to their attributes. */
  def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int, sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = {
    val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter)
    val flated: RDD[Ver[VD]] = ndegs.flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD])))).persist(StorageLevel.MEMORY_AND_DISK_SER)
    val matched: RDD[Ver[VD]] = flated.map(e => (e.id, e)).join(graph.vertices).map(e => e._2._1.copy(attr = e._2._2)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    flated.unpersist(blocking = false)
    ndegs.unpersist(blocking = false)
    val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched.map(e => (e.source, ArrayBuffer(e))).reduceByKey(_ ++= _).map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap))
    matched.unpersist(blocking = false)
    VertexRDD(grouped)
  }
  def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                                      choosedVertex: RDD[VertexId],
                                                      degree: Int,
                                                      sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true
                                                     ): VertexRDD[Map[Int, Set[VertexId]]] = {
    if (degree < 1) {
      throw new InvalidParameterException("invalid degree parameter: " + degree)
    }
    val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    var g: Graph[DegVertex[VD], Int] = graph.outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))
      .subgraph(vpred = (_, a) => a._1 <= maxDegree) // drop very-high-degree vertices
      .outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {
        DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0))) // initialize the vertices that will send messages
      }).mapEdges(_ => 0).cache() // simplify the edge attribute

    choosedVertex.unpersist(blocking = false)

    var i = 0
    var prevG: Graph[DegVertex[VD], Int] = null
    var newVertexRdd: VertexRDD[ArrayBuffer[(VertexId, Int)]] = null
    while (i < degree + 1) {
      prevG = g
      // send the (i+1)-th round of messages
      newVertexRdd = prevG.aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter), (a, b) => reduceVertexIds(a ++ b)).persist(StorageLevel.MEMORY_AND_DISK_SER)
      g = g.outerJoinVertices(newVertexRdd)((vid, old, msg) => if (msg.isDefined) updateVertexByMsg(vid, old, msg.get) else old.copy(init = false)).cache()
      prevG.unpersistVertices(blocking = false)
      prevG.edges.unpersist(blocking = false)
      newVertexRdd.unpersist(blocking = false)
      i += 1
    }
    newVertexRdd.unpersist(blocking = false)

    val maped = g.vertices.join(initVertex).mapValues(e => sortResult(e._1)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    initVertex.unpersist()
    g.unpersist(blocking = false)
    VertexRDD(maped)
  }
  private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD])

  private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId, oldAttr: DegVertex[VD], msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = {
    val addOne = msg.map(e => (e._1, e._2 + 1)) // every hop adds one to the path length
    val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne)
    oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg)
  }

  private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] = degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap

  case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])
  case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])
  private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]], sendFilter: (VD, VD) => Boolean): Unit = {
    try {
      val src = e.srcAttr
      val dst = e.dstAttr
      // exchange messages only while at least one endpoint is ready (init) and both buffers are below the cap
      if (src.degVertices.size < maxNDegVerticesCount && (src.init || dst.init) && dst.degVertices.size < maxNDegVerticesCount && !isAttrSame(src, dst)) {
        if (sendFilter(src.attr, dst.attr)) {
          e.sendToDst(reduceVertexIds(src.degVertices))
        }
        if (sendFilter(dst.attr, src.attr)) {
          e.sendToSrc(reduceVertexIds(dst.degVertices))
        }
      }
    } catch {
      case ex: Exception =>
        println(s"==========error found: exception:${ex.getMessage}," +
          s"edgeTriplet:(srcId:${e.srcId},srcAttr:(${e.srcAttr.attr},${e.srcAttr.init},${e.srcAttr.degVertices.size}))," +
          s"dstId:${e.dstId},dstAttr:(${e.dstAttr.attr},${e.dstAttr.init},${e.dstAttr.degVertices.size}),attr:${e.attr}")
        ex.printStackTrace()
        throw ex
    }
  }
  private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] = ArrayBuffer[(VertexId, Int)]() ++= ids.reduceByKey(Math.min) // keep the shortest distance per vertex

  private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean = a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices)

  private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = {
    val aKeys = a.map(e => e._1).toSet
    val bKeys = b.map(e => e._1).toSet
    if (aKeys.size != bKeys.size || aKeys.isEmpty) return false
    aKeys.diff(bKeys).isEmpty && bKeys.diff(aKeys).isEmpty
  }
}
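
To show how the utility is meant to be called, here is a minimal driver sketch; the toy edge data, the local master setting, and the NdegExample object name are illustrative assumptions, not part of the original code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.VertexId
import horizon.graphx.util.GraphNdegUtil

object NdegExample {
  def main(args: Array[String]): Unit = {
    // hypothetical local setup, just for demonstration
    val sc = new SparkContext(new SparkConf().setAppName("ndeg-example").setMaster("local[2]"))
    // a toy edge list: 1-2, 2-3, 3-4, 2-5
    val edges = sc.parallelize(Seq[(VertexId, VertexId)]((1L, 2L), (2L, 3L), (3L, 4L), (2L, 5L)))
    val chosen = sc.parallelize(Seq[VertexId](1L))
    // collect, for vertex 1, the related vertices up to 2 hops away
    val result = GraphNdegUtil.aggNdegreedVertices[Int](edges, chosen, 2)
    // prints each chosen vertex with its map from path length to vertex IDs
    result.collect().foreach { case (vid, byDegree) => println(s"$vid -> $byDegree") }
    sc.stop()
  }
}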

The sortResult method above calls reduceByKey on a Traversable[(K, V)] collection. That reduceByKey is not a standard library method but a hand-rolled extension, so it must be imported wherever it is used. Its code follows:

package horizon.graphx.util

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2016/12/21.
 * Description: adds reduceByKey-style methods to collections of type Traversable[(K, V)].
 */
object CollectionUtil {
  /**
   * Enriches Traversable[(K, V)] collections with reduceByKey methods.
   *
   * @param collection the underlying key-value collection
   * @tparam K the key type
   * @tparam V the value type
   */
  implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
    def reduceByKey(f: (V, V) => V): Traversable[(K, V)] = collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => (a._1, f(a._2, b._2))) }

    /**
     * Like reduceByKey, but additionally returns the elements that were reduced away.
     *
     * @param f the reduce function
     * @return a pair of (reduced collection, elements that were merged away)
     */
    def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {
      val reduced: ArrayBuffer[(K, V)] = ArrayBuffer()
      val newSeq = collection.groupBy(_._1).map {
        case (_: K, values: Traversable[(K, V)]) => values.reduce((a, b) => {
          val newValue: V = f(a._2, b._2)
          val reducedValue: V = if (newValue == a._2) b._2 else a._2
          val reducedPair: (K, V) = (a._1, reducedValue)
          reduced += reducedPair
          (a._1, newValue)
        })
      }
      (newSeq, reduced.toTraversable)
    }
  }
}
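
Because the implicit class works on any in-memory Traversable[(K, V)], it can be tried without Spark at all. A small sketch of its behavior follows; the sample pairs are made up for illustration:

import horizon.graphx.util.CollectionUtil.CollectionHelper

val pairs = Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4))
// fold the values per key, like RDD.reduceByKey but on a plain driver-side collection
val summed = pairs.reduceByKey(_ + _) // e.g. Map(a -> 4, b -> 6); ordering follows groupBy
// keep the max per key and also collect the values that were merged away
val (maxPerKey, merged) = pairs.reduceByKeyWithReduced(Math.max)

Inside GraphNdegUtil this import is exactly what makes expressions such as degVertices.map(...).reduceByKey(_ ++ _) compile.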

Summary

That is the complete source for computing the N-degree relationship nodes of specified vertices with Spark GraphX; hopefully it proves useful. Interested readers may also refer to the related posts 浅谈七种常见的Hadoop和Spark项目案例 (seven common Hadoop and Spark project patterns), Spark的广播变量和累加器使用方法代码示例 (code examples for Spark broadcast variables and accumulators), and Spark入门简介 (an introduction to Spark). Questions are welcome in the comments.
