Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ def _load_stream_without_unbatching(self, stream):
key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
key_batch = list(key_batch)
val_batch = list(val_batch)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fix the doc in Serializer._load_stream_without_unbatching to say, it returns iterator of iterables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 66477f8

Copy link
Member

@HyukjinKwon HyukjinKwon Sep 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I had to be clear. Actually, I meant if Serializer._load_stream_without_unbatching works as documented an iterator of deserialized batches (lists), everything should have worked fine. So, I think the reverse is actually more correct because PairDeserializer and CartesianDeserializer do not follow this.

I am okay with the current change too but I believe the reverse is better because I think we could prevent such issues in the future and make the things simpler. WDYT @aray and @holdenk ?

if len(key_batch) != len(val_batch):
raise ValueError("Can not deserialize PairRDD with different number of items"
" in batches: (%d, %d)" % (len(key_batch), len(val_batch)))
Expand Down
12 changes: 12 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
set([(x, (y, y)) for x in range(10) for y in range(10)])
)

def test_zip_chaining(self):
# Tests for SPARK-21985
rdd = self.sc.parallelize(range(10), 2)
Copy link
Member

@HyukjinKwon HyukjinKwon Sep 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case already passes without this change, doesn't it?

self.assertSetEqual(
set(rdd.zip(rdd).zip(rdd).collect()),
set([((x, x), x) for x in range(10)])
)
self.assertSetEqual(
set(rdd.zip(rdd.zip(rdd)).collect()),
set([(x, (x, x)) for x in range(10)])
)

def test_deleting_input_files(self):
# Regression test for SPARK-1025
tempFile = tempfile.NamedTemporaryFile(delete=False)
Expand Down