Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion python/pyspark/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,17 @@ def json(self, path, schema=None):
elif type(path) == list:
return self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
elif isinstance(path, RDD):
return self._df(self._jreader.json(path._jrdd))
def func(iterator):
for x in iterator:
if not isinstance(x, basestring):
x = unicode(x)
if isinstance(x, unicode):
x = x.encode("utf-8")
yield x
keyed = path.mapPartitions(func)
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._sqlContext._jvm.BytesToString())
return self._df(self._jreader.json(jrdd))
else:
raise TypeError("path can be only string or RDD")

Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ def test_broadcast_in_udf(self):

def test_basic_functions(self):
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
df = self.sqlCtx.jsonRDD(rdd)
df = self.sqlCtx.read.json(rdd)
df.count()
df.collect()
df.schema
Expand All @@ -345,7 +345,7 @@ def test_basic_functions(self):
df.collect()

def test_apply_schema_to_row(self):
df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
self.assertEqual(df.collect(), df2.collect())

Expand Down Expand Up @@ -821,7 +821,7 @@ def test_save_and_load_builder(self):
def test_help_command(self):
# Regression test for SPARK-5464
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
df = self.sqlCtx.jsonRDD(rdd)
df = self.sqlCtx.read.json(rdd)
# render_doc() reproduces the help() exception without printing output
pydoc.render_doc(df)
pydoc.render_doc(df.foo)
Expand Down