From a97a0a8a7c0a5d1e0c1f5d5415743747510712cd Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 26 Feb 2016 18:16:20 +0900
Subject: [PATCH 1/8] Support for writing CSV with a single function call

---
 .../apache/spark/sql/DataFrameWriter.scala    | 29 +++++++++++++++++++
 .../execution/datasources/csv/CSVSuite.scala  |  3 +-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d6bdd3d82556..9ec8080209e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -464,6 +464,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * format("parquet").save(path)
    * }}}
    *
+   * You can set the following JSON-specific options for writing JSON files:
+   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
+   * This should be the fully qualified name of a class implementing
+   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
+   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   *
    * @since 1.4.0
    */
   def parquet(path: String): Unit = format("parquet").save(path)
@@ -492,10 +498,33 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * df.write().text("/path/to/output")
    * }}}
    *
+   * You can set the following options for writing text files:
+   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
+   * This should be the fully qualified name of a class implementing
+   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
+   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   *
    * @since 1.6.0
    */
   def text(path: String): Unit = format("text").save(path)
 
+  /**
+   * Saves the content of the [[DataFrame]] in CSV format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("csv").save(path)
+   * }}}
+   *
+   * You can set the following CSV-specific options for writing CSV files:
+   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
+   * This should be the fully qualified name of a class implementing
+   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
+   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   *
+   * @since 2.0.0
+   */
+  def csv(path: String): Unit = format("csv").save(path)
+
 ///////////////////////////////////////////////////////////////////////////////////////
 // Builder pattern config options
 ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 7671bc106610..db119dc369d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -268,9 +268,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .load(testFile(carsFile))
 
     cars.coalesce(1).write
-      .format("csv")
       .option("header", "true")
-      .save(csvDir)
+      .csv(csvDir)
 
     val carsCopy = sqlContext.read
       .format("csv")

From f82a2f4741f7b7bd436e6ccbe62d148ce713bfba Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Fri, 26 Feb 2016 21:12:39 +0900
Subject: [PATCH 2/8] Move the comments above the json method.

---
 .../scala/org/apache/spark/sql/DataFrameWriter.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9ec8080209e0..e0722987fcce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -453,6 +453,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * format("json").save(path)
    * }}}
    *
+   * You can set the following JSON-specific options for writing JSON files:
+   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
+   * This should be the fully qualified name of a class implementing
+   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
+   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   *
    * @since 1.4.0
    */
   def json(path: String): Unit = format("json").save(path)
@@ -464,12 +470,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * format("parquet").save(path)
    * }}}
    *
-   * You can set the following JSON-specific options for writing JSON files:
-   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
-   * This should be the fully qualified name of a class implementing
-   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
-   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
-   *
    * @since 1.4.0
    */
   def parquet(path: String): Unit = format("parquet").save(path)

From 9fe8fcadba1b5ed145b11599445a59f1d044cfe5 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 29 Feb 2016 09:20:48 +0900
Subject: [PATCH 3/8] Update comments and remove support for `codec` for JSON
 and TEXT

---
 .../apache/spark/sql/DataFrameWriter.scala    | 24 +++++++------------
 .../datasources/json/JSONOptions.scala        |  5 +---
 .../datasources/text/DefaultSource.scala      |  5 +---
 3 files changed, 11 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e0722987fcce..093504c765ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -453,11 +453,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * format("json").save(path)
    * }}}
    *
-   * You can set the following JSON-specific options for writing JSON files:
-   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
-   * This should be the fully qualified name of a class implementing
-   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
-   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   * You can set the following JSON-specific option(s) for writing JSON files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
    *
    * @since 1.4.0
    */
@@ -498,11 +496,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * df.write().text("/path/to/output")
    * }}}
    *
-   * You can set the following options for writing text files:
-   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
-   * This should be the fully qualified name of a class implementing
-   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
-   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   * You can set the following option(s) for writing text files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
    *
    * @since 1.6.0
    */
@@ -515,11 +511,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * format("csv").save(path)
    * }}}
    *
-   * You can set the following CSV-specific options for writing CSV files:
-   * <li>`compression` or `codec` (default `null`): compression codec to use when saving to file.
-   * This should be the fully qualified name of a class implementing
-   * [[org.apache.hadoop.io.compress.CompressionCodec]] or one of the known case-insensitive
-   * shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   * You can set the following CSV-specific option(s) for writing CSV files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
    *
    * @since 2.0.0
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 31a95ed46121..e59dbd6b3d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -48,10 +48,7 @@ private[sql] class JSONOptions(
     parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
   val allowBackslashEscapingAnyCharacter =
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
-  val compressionCodec = {
-    val name = parameters.get("compression").orElse(parameters.get("codec"))
-    name.map(CompressionCodecs.getCodecClassName)
-  }
+  val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 60155b32349a..8f3f6335e428 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -115,10 +115,7 @@ private[sql] class TextRelation(
   /** Write path. */
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val compressionCodec = {
-      val name = parameters.get("compression").orElse(parameters.get("codec"))
-      name.map(CompressionCodecs.getCodecClassName)
-    }
+    val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
     compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }

From c703def212e9734fa5b62fc204b3c51113431452 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 29 Feb 2016 11:59:12 +0900
Subject: [PATCH 4/8] Add Python API

---
 python/pyspark/sql/readwriter.py | 48 ++++++++++++++++++++++++++++++++
 python/test_support/sql/ages.csv |  3 ++
 2 files changed, 51 insertions(+)
 create mode 100644 python/test_support/sql/ages.csv

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index b1453c637f79..88bab8d6cc2e 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -233,6 +233,21 @@ def text(self, paths):
             paths = [paths]
         return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
 
+    @since(2.0)
+    def csv(self, paths):
+        """Loads a CSV file and returns the result as a :class:`DataFrame`.
+
+        This function goes through the input once to determine the input schema. To avoid going
+        through the entire data once, specify the schema explicitly using :func:`schema`.
+
+        :param paths: string, or list of strings, for input path(s).
+
+        >>> df = sqlContext.read.csv('python/test_support/sql/ages.csv')
+        >>> df.dtypes
+        [('C0', 'string'), ('C1', 'bigint')]
+        """
+        return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+
     @since(1.5)
     def orc(self, path):
         """Loads an ORC file, returning the result as a :class:`DataFrame`.
@@ -448,6 +463,11 @@ def json(self, path, mode=None):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
 
+        You can set the following JSON-specific option(s) for writing JSON files:
+            * ``compression`` (default ``None``): compression codec to use when saving to file.
+                This can be one of the known case-insensitive shortened names
+                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)._jwrite.json(path)
@@ -476,11 +496,39 @@ def parquet(self, path, mode=None, partitionBy=None):
     def text(self, path):
         """Saves the content of the DataFrame in a text file at the specified path.
 
+        :param path: the path in any Hadoop supported file system
+
         The DataFrame must have only one column that is of string type.
         Each row becomes a new line in the output file.
+
+        You can set the following option(s) for writing text files:
+            * ``compression`` (default ``None``): compression codec to use when saving to file.
+                This can be one of the known case-insensitive shortened names
+                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
         """
         self._jwrite.text(path)
 
+    @since(2.0)
+    def csv(self, path, mode=None):
+        """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
+
+        :param path: the path in any Hadoop supported file system
+        :param mode: specifies the behavior of the save operation when data already exists.
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
+
+         You can set the following CSV-specific option(s) for writing CSV files:
+            * ``compression`` (default ``None``): compression codec to use when saving to file.
+                This can be one of the known case-insensitive shortened names
+                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+
+        >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
+        """
+        self.mode(mode)._jwrite.csv(path)
+
     @since(1.5)
     def orc(self, path, mode=None, partitionBy=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
diff --git a/python/test_support/sql/ages.csv b/python/test_support/sql/ages.csv
new file mode 100644
index 000000000000..8c703d40867e
--- /dev/null
+++ b/python/test_support/sql/ages.csv
@@ -0,0 +1,3 @@
+Joe,20
+Tom,30
+Hyukjin,25
\ No newline at end of file

From 8fbad40c19fdb1f48054fd18089e1767be224157 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 29 Feb 2016 12:39:36 +0900
Subject: [PATCH 5/8] Add a newline at the end of the test file

---
 python/test_support/sql/ages.csv | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/test_support/sql/ages.csv b/python/test_support/sql/ages.csv
index 8c703d40867e..18991feda788 100644
--- a/python/test_support/sql/ages.csv
+++ b/python/test_support/sql/ages.csv
@@ -1,3 +1,4 @@
 Joe,20
 Tom,30
-Hyukjin,25
\ No newline at end of file
+Hyukjin,25
+

From 9ca920bbc37dd53697b214eabfa413d61f09d682 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 29 Feb 2016 12:43:45 +0900
Subject: [PATCH 6/8] Fix a weird indentation

---
 python/pyspark/sql/readwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 88bab8d6cc2e..ee0efdbbe353 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -520,7 +520,7 @@ def csv(self, path, mode=None):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
 
-         You can set the following CSV-specific option(s) for writing CSV files:
+        You can set the following CSV-specific option(s) for writing CSV files:
             * ``compression`` (default ``None``): compression codec to use when saving to file.
                 This can be one of the known case-insensitive shortened names
                 (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).

From fea9df82bf37514965e5978095ec0e9c129f1ebe Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 29 Feb 2016 16:15:26 +0900
Subject: [PATCH 7/8] Make path a seq if it is a string

---
 python/pyspark/sql/readwriter.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ee0efdbbe353..7588e2eb6144 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -242,10 +242,12 @@ def csv(self, paths):
 
         :param paths: string, or list of strings, for input path(s).
 
-        >>> df = sqlContext.read.csv('python/test_support/sql/ages.csv')
+        >>> df = sqlContext.read.option("inferSchema", "true").csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
-        [('C0', 'string'), ('C1', 'bigint')]
+        [('C0', 'string'), ('C1', 'int')]
         """
+        if isinstance(paths, basestring):
+            paths = [paths]
         return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
 
     @since(1.5)

From cec8442b516cf73069a5ac8bc4c02f56618b9cc5 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 29 Feb 2016 16:17:02 +0900
Subject: [PATCH 8/8] Read without options

---
 python/pyspark/sql/readwriter.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7588e2eb6144..7f5368d8bdbb 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -242,9 +242,9 @@ def csv(self, paths):
 
         :param paths: string, or list of strings, for input path(s).
 
-        >>> df = sqlContext.read.option("inferSchema", "true").csv('python/test_support/sql/ages.csv')
+        >>> df = sqlContext.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
-        [('C0', 'string'), ('C1', 'int')]
+        [('C0', 'string'), ('C1', 'string')]
         """
         if isinstance(paths, basestring):
             paths = [paths]
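
For reference, a minimal end-to-end sketch of the API this series adds. It is not part of the patch: the `sqlContext`, `df`, and `path` names and the choice of gzip are illustrative assumptions; only `csv(path)` itself and the `header` and `compression` options come from the changes above.

    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Write a DataFrame as CSV with a single call (equivalent to
    // format("csv").save(path)), using one of the case-insensitive
    // shortened codec names documented in this series.
    def writeAndReload(sqlContext: SQLContext, df: DataFrame, path: String): DataFrame = {
      df.coalesce(1).write
        .option("header", "true")
        .option("compression", "gzip")
        .csv(path)

      // Read it back; as the doctest change in PATCH 8 illustrates, every
      // column is read as string unless "inferSchema" is set on the reader.
      sqlContext.read
        .format("csv")
        .option("header", "true")
        .load(path)
    }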
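Similarly, a sketch of the option-resolution pattern PATCH 3 converges on: only the `compression` key is consulted (the `codec` alias is dropped), and a shortened name is mapped to a fully qualified Hadoop codec class. The lookup table below is an assumption standing in for `CompressionCodecs.getCodecClassName`, whose body this patch series does not show; the class names are real Hadoop codecs.

    // Hypothetical stand-in for CompressionCodecs.getCodecClassName.
    val shortCompressionCodecNames = Map(
      "bzip2" -> "org.apache.hadoop.io.compress.BZip2Codec",
      "gzip" -> "org.apache.hadoop.io.compress.GzipCodec",
      "lz4" -> "org.apache.hadoop.io.compress.Lz4Codec",
      "snappy" -> "org.apache.hadoop.io.compress.SnappyCodec")

    // Case-insensitive lookup; unknown names pass through as a
    // fully qualified class name.
    def getCodecClassName(name: String): String =
      shortCompressionCodecNames.getOrElse(name.toLowerCase, name)

    // The shape used in JSONOptions and TextRelation after PATCH 3:
    // an absent "compression" key yields None, so no codec is configured.
    def resolveCompression(parameters: Map[String, String]): Option[String] =
      parameters.get("compression").map(getCodecClassName)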