Commit 56c771c

ankurdave authored and rxin committed
[SPARK-1931] Reconstruct routing tables in Graph.partitionBy

905173d introduced a bug in partitionBy where, after repartitioning the edges, it reuses the VertexRDD without updating the routing tables to reflect the new edge layout. Subsequent accesses of the triplets contain nulls for many vertex properties. This commit adds a test for this bug and fixes it by introducing `VertexRDD#withEdges` and calling it in `partitionBy`.

Author: Ankur Dave <[email protected]>

Closes #885 from ankurdave/SPARK-1931 and squashes the following commits:

3930cdd [Ankur Dave] Note how to set up VertexRDD for efficient joins
9bdbaa4 [Ankur Dave] [SPARK-1931] Reconstruct routing tables in Graph.partitionBy
1 parent cb7fe50 commit 56c771c
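
For context, a minimal sketch of the scenario this commit addresses, written for a Spark shell with a running SparkContext `sc`; it is an illustration rather than part of the commit, and the graph and expected triplets mirror the test added below.

import org.apache.spark.graphx._

// Build a small property graph: three vertices, two edges, spread over two edge partitions.
val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))

// Repartition the edges. Before this fix, the reused VertexRDD kept routing tables for the
// old edge layout, so many vertex attributes in the triplets view came back null.
val gPart = g.partitionBy(PartitionStrategy.EdgePartition2D)

// With the fix, the triplets still carry the vertex attributes:
// ((0,a),(1,b),1) and ((0,a),(2,c),1)
gPart.triplets.collect.map(_.toTuple).foreach(println)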

File tree

3 files changed, +31 -4 lines changed


graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 12 additions & 0 deletions
@@ -300,6 +300,18 @@ class VertexRDD[@specialized VD: ClassTag](
   def reverseRoutingTables(): VertexRDD[VD] =
     this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
 
+  /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
+  def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+    val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
+    val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
+      (partIter, routingTableIter) =>
+        val routingTable =
+          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+        partIter.map(_.withRoutingTable(routingTable))
+    }
+    new VertexRDD(vertexPartitions)
+  }
+
   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
   private[graphx] def shipVertexAttributes(
       shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
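
A hedged usage sketch of the new method (same small graph and SparkContext `sc` assumed as above): `withEdges` recomputes the routing tables against whichever EdgeRDD it is given, which is exactly what the fixed `partitionBy` does with the freshly repartitioned edges.

import org.apache.spark.graphx._

val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))

// Any EdgeRDD works; here we simply reuse the graph's own edges.
val edges = g.edges

// Rebuild the routing tables so each vertex partition knows which edge partitions refer to its
// vertices, and attach them to the existing vertex partitions.
val preparedVerts: VertexRDD[String] = g.vertices.withEdges(edges)

Since the implementation calls `this.partitioner.get`, the VertexRDD must already have a partitioner, which holds for VertexRDDs built through the usual `VertexRDD(...)` constructors.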

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 9 additions & 4 deletions
@@ -88,8 +88,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
         }
         val edgePartition = builder.toEdgePartition
         Iterator((pid, edgePartition))
-      }, preservesPartitioning = true))
-    GraphImpl.fromExistingRDDs(vertices, newEdges)
+      }, preservesPartitioning = true)).cache()
+    GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
 
   override def reverse: Graph[VD, ED] = {
@@ -277,7 +277,11 @@ object GraphImpl {
     GraphImpl(vertexRDD, edgeRDD)
   }
 
-  /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
+  /**
+   * Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The
+   * VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
+   * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
+   */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
       edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
@@ -290,7 +294,8 @@ object GraphImpl {
 
   /**
    * Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
-   * vertices.
+   * vertices. The VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
+   * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
    */
   def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
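
The new doc comments record a precondition rather than new behaviour: `GraphImpl.apply` and `GraphImpl.fromExistingRDDs` trust that the VertexRDD they receive already carries routing tables matching the EdgeRDD. A hedged illustration follows, assuming the impl-package constructors are reachable from the calling code (they are internal plumbing, shown only to make the precondition concrete).

import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl

val base = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
val edges = base.edges

// Satisfy the documented precondition: prepare the VertexRDD against the exact EdgeRDD the graph
// will use. Here it is redundant (base.vertices was already built for these edges), but it is
// required whenever the edges have been re-laid-out, as in partitionBy.
val graph = GraphImpl.fromExistingRDDs(base.vertices.withEdges(edges), edges)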

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         Iterator((part.srcIds ++ part.dstIds).toSet)
       }.collect
       assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
+
+      // Forming triplets view
+      val g = Graph(
+        sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
+        sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
+      assert(g.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
+      val gPart = g.partitionBy(EdgePartition2D)
+      assert(gPart.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
     }
   }
