diff --git a/doc/source/whatsnew/v1.0.4.rst b/doc/source/whatsnew/v1.0.4.rst
index 4f122b0dea5f4..714bcfb8720db 100644
--- a/doc/source/whatsnew/v1.0.4.rst
+++ b/doc/source/whatsnew/v1.0.4.rst
@@ -31,6 +31,10 @@ Bug fixes
 ~~~~~~~~~
 - Bug in :meth:`SeriesGroupBy.first`, :meth:`SeriesGroupBy.last`, :meth:`SeriesGroupBy.min`, and :meth:`SeriesGroupBy.max` returning floats when applied to nullable Booleans (:issue:`33071`)
 - Bug in :meth:`Rolling.min` and :meth:`Rolling.max`: Growing memory usage after multiple calls when using a fixed window (:issue:`30726`)
+- Bug in :meth:`~DataFrame.to_parquet` not raising ``PermissionError`` when writing to a private S3 bucket with invalid credentials (:issue:`27679`)
+- Bug in :meth:`~DataFrame.to_csv` silently failing when writing to an invalid S3 bucket (:issue:`32486`)
+- Bug in :meth:`read_parquet` raising a ``FileNotFoundError`` when passed an S3 directory path (:issue:`26388`)
+- Bug in :meth:`~DataFrame.to_parquet` raising an ``AttributeError`` when writing a partitioned parquet file to S3 (:issue:`27596`)
 -
 
 Contributors
diff --git a/pandas/io/common.py b/pandas/io/common.py
index 9617965915aa5..eaf4bcf203796 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -141,6 +141,33 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)
 
 
+def get_fs_for_path(filepath: str):
+    """
+    Get the appropriate filesystem given a filepath.
+    Supports s3fs, gcsfs and the local file system.
+
+    Parameters
+    ----------
+    filepath : str
+        File path, e.g. s3://bucket/object, /local/path, gcs://pandas/obj.
+
+    Returns
+    -------
+    s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
+        Appropriate FileSystem to use. None for local filesystem.
+    """
+    if is_s3_url(filepath):
+        from pandas.io import s3
+
+        return s3.get_fs()
+    elif is_gcs_url(filepath):
+        from pandas.io import gcs
+
+        return gcs.get_fs()
+    else:
+        return None
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,
diff --git a/pandas/io/formats/csvs.py b/pandas/io/formats/csvs.py
index 0d581f30e50e7..0b802f0f2b35b 100644
--- a/pandas/io/formats/csvs.py
+++ b/pandas/io/formats/csvs.py
@@ -62,7 +62,7 @@ def __init__(
         # Extract compression mode as given, if dict
         compression, self.compression_args = get_compression_method(compression)
 
-        self.path_or_buf, _, _, _ = get_filepath_or_buffer(
+        self.path_or_buf, _, _, self.should_close = get_filepath_or_buffer(
             path_or_buf, encoding=encoding, compression=compression, mode=mode
         )
         self.sep = sep
@@ -224,6 +224,8 @@ def save(self) -> None:
                 f.close()
                 for _fh in handles:
                     _fh.close()
+            elif self.should_close:
+                f.close()
 
     def _save_header(self):
         writer = self.writer
diff --git a/pandas/io/gcs.py b/pandas/io/gcs.py
index 1f5e0faedc6d2..d2d8fc2d2139f 100644
--- a/pandas/io/gcs.py
+++ b/pandas/io/gcs.py
@@ -6,6 +6,10 @@
 )
 
 
+def get_fs():
+    return gcsfs.GCSFileSystem()
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer, encoding=None, compression=None, mode=None
 ):
@@ -13,6 +17,6 @@ def get_filepath_or_buffer(
     if mode is None:
         mode = "rb"
 
-    fs = gcsfs.GCSFileSystem()
+    fs = get_fs()
     filepath_or_buffer = fs.open(filepath_or_buffer, mode)
     return filepath_or_buffer, None, compression, True
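The `get_fs_for_path` helper added above is what the parquet changes below rely on to pick a filesystem from the URL scheme. A minimal sketch of the intended call pattern, not part of the patch itself: it assumes a pandas build with this change applied, the paths are placeholders, and the commented-out calls additionally require the optional s3fs/gcsfs dependencies.

```python
# Illustrative only: get_fs_for_path is a private pandas helper; for local
# paths no fsspec-style filesystem object is needed, so it returns None.
from pandas.io.common import get_fs_for_path

print(get_fs_for_path("/tmp/local/file.parquet"))     # None -> local file system
# get_fs_for_path("s3://bucket/key.parquet")          # s3fs.S3FileSystem (needs s3fs)
# get_fs_for_path("gcs://bucket/key.parquet")         # gcsfs.GCSFileSystem (needs gcsfs)
```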
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 3a686a1a3b122..9c94c913e35cd 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -8,7 +8,12 @@
 
 from pandas import DataFrame, get_option
 
-from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
+from pandas.io.common import (
+    get_filepath_or_buffer,
+    get_fs_for_path,
+    is_gcs_url,
+    is_s3_url,
+)
 
 
 def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
+        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
 
         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+        # write_to_dataset does not support a file-like object when
+        # a directory path is used, so just pass the path string.
         if partition_cols is not None:
             self.api.parquet.write_to_dataset(
                 table,
@@ -111,22 +118,20 @@ def write(
         else:
             self.api.parquet.write_table(
                 table,
-                path,
+                file_obj_or_path,
                 compression=compression,
                 coerce_timestamps=coerce_timestamps,
                 **kwargs,
             )
-
-    def read(self, path, columns=None, **kwargs):
-        path, _, _, should_close = get_filepath_or_buffer(path)
-
-        kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, **kwargs
-        ).to_pandas()
         if should_close:
-            path.close()
+            file_obj_or_path.close()
 
+    def read(self, path, columns=None, **kwargs):
+        parquet_ds = self.api.parquet.ParquetDataset(
+            path, filesystem=get_fs_for_path(path), **kwargs
+        )
+        kwargs["columns"] = columns
+        result = parquet_ds.read_pandas(**kwargs).to_pandas()
         return result
 
 
@@ -281,7 +286,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
     A file URL can also be a path to a directory that contains multiple
     partitioned parquet files. Both pyarrow and fastparquet support
     paths to directories as well as file URLs. A directory path could be:
-    ``file://localhost/path/to/tables``
+    ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
 
     If you want to pass in a path object, pandas accepts any
     ``os.PathLike``.
diff --git a/pandas/io/s3.py b/pandas/io/s3.py
index 976c319f89d47..329c861d2386a 100644
--- a/pandas/io/s3.py
+++ b/pandas/io/s3.py
@@ -16,6 +16,10 @@ def _strip_schema(url):
     return result.netloc + result.path
 
 
+def get_fs():
+    return s3fs.S3FileSystem(anon=False)
+
+
 def get_file_and_filesystem(
     filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
 ) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@
     if mode is None:
         mode = "rb"
 
-    fs = s3fs.S3FileSystem(anon=False)
+    fs = get_fs()
     try:
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@
         # aren't valid for that bucket.
         # A NoCredentialsError is raised if you don't have creds
         # for that bucket.
-        fs = s3fs.S3FileSystem(anon=True)
+        fs = get_fs()
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     return file, fs
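The rewritten `PyArrowImpl.read` above delegates to `pyarrow.parquet.ParquetDataset`, passing the filesystem chosen by `get_fs_for_path` (None for local paths, an `s3fs.S3FileSystem` for s3:// paths). A minimal local sketch of that read path, assuming only pyarrow is installed; the temporary directory and the toy frame are made up for illustration:

```python
import tempfile

import pandas as pd
import pyarrow.parquet as pq

df = pd.DataFrame({"A": ["a", "a", "b"], "B": [1, 2, 3]})

with tempfile.TemporaryDirectory() as path:
    # Writing with partition_cols produces a *directory* of parquet files,
    # one sub-directory per value of "A".
    df.to_parquet(path, engine="pyarrow", partition_cols=["A"], compression=None)

    # This mirrors what PyArrowImpl.read now does: build a ParquetDataset
    # (filesystem=None means the local filesystem) and materialise it.
    dataset = pq.ParquetDataset(path, filesystem=None)
    result = dataset.read_pandas(columns=["B"]).to_pandas()

print(result)
```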
diff --git a/pandas/tests/io/parser/test_network.py b/pandas/tests/io/parser/test_network.py
index b7164477c31f2..1b1576fdb9a19 100644
--- a/pandas/tests/io/parser/test_network.py
+++ b/pandas/tests/io/parser/test_network.py
@@ -54,8 +54,8 @@ def tips_df(datapath):
 
 @pytest.mark.usefixtures("s3_resource")
 @td.skip_if_not_us_locale()
 class TestS3:
+    @td.skip_if_no("s3fs")
     def test_parse_public_s3_bucket(self, tips_df):
-        pytest.importorskip("s3fs")
         # more of an integration test due to the not-public contents portion
         # can probably mock this though.
@@ -159,7 +159,7 @@ def test_parse_public_s3_bucket_nrows_python(self, tips_df):
             assert not df.empty
             tm.assert_frame_equal(tips_df.iloc[:10], df)
 
-    def test_s3_fails(self):
+    def test_read_s3_fails(self):
         with pytest.raises(IOError):
             read_csv("s3://nyqpug/asdf.csv")
 
@@ -168,6 +168,33 @@ def test_s3_fails(self):
         with pytest.raises(IOError):
             read_csv("s3://cant_get_it/file.csv")
 
+    def test_write_s3_csv_fails(self, tips_df):
+        # GH 32486
+        # Attempting to write to an invalid S3 path should raise
+        import botocore
+
+        # GH 34087
+        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
+        # Catch a ClientError since AWS Service Errors are defined dynamically
+        error = (FileNotFoundError, botocore.exceptions.ClientError)
+
+        with pytest.raises(error, match="The specified bucket does not exist"):
+            tips_df.to_csv("s3://an_s3_bucket_data_doesnt_exit/not_real.csv")
+
+    @td.skip_if_no("pyarrow")
+    def test_write_s3_parquet_fails(self, tips_df):
+        # GH 27679
+        # Attempting to write to an invalid S3 path should raise
+        import botocore
+
+        # GH 34087
+        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
+        # Catch a ClientError since AWS Service Errors are defined dynamically
+        error = (FileNotFoundError, botocore.exceptions.ClientError)
+
+        with pytest.raises(error, match="The specified bucket does not exist"):
+            tips_df.to_parquet("s3://an_s3_bucket_data_doesnt_exit/not_real.parquet")
+
     def test_read_csv_handles_boto_s3_object(self, s3_resource, tips_file):
         # see gh-16135
 
diff --git a/pandas/tests/io/test_gcs.py b/pandas/tests/io/test_gcs.py
index 557a9d5c13987..cf745fcc492a1 100644
--- a/pandas/tests/io/test_gcs.py
+++ b/pandas/tests/io/test_gcs.py
@@ -56,7 +56,15 @@ def open(*args):
 
     monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
     df1.to_csv("gs://test/test.csv", index=True)
-    df2 = read_csv(StringIO(s.getvalue()), parse_dates=["dt"], index_col=0)
+
+    def mock_get_filepath_or_buffer(*args, **kwargs):
+        return StringIO(df1.to_csv()), None, None, False
+
+    monkeypatch.setattr(
+        "pandas.io.gcs.get_filepath_or_buffer", mock_get_filepath_or_buffer
+    )
+
+    df2 = read_csv("gs://test/test.csv", parse_dates=["dt"], index_col=0)
 
     tm.assert_frame_equal(df1, df2)
 
@@ -86,28 +94,6 @@ def open(self, path, mode="r", *args):
     )
 
 
-@td.skip_if_no("gcsfs")
-def test_gcs_get_filepath_or_buffer(monkeypatch):
-    df1 = DataFrame(
-        {
-            "int": [1, 3],
-            "float": [2.0, np.nan],
-            "str": ["t", "s"],
-            "dt": date_range("2018-06-18", periods=2),
-        }
-    )
-
-    def mock_get_filepath_or_buffer(*args, **kwargs):
-        return (StringIO(df1.to_csv(index=False)), None, None, False)
-
-    monkeypatch.setattr(
-        "pandas.io.gcs.get_filepath_or_buffer", mock_get_filepath_or_buffer
-    )
-    df2 = read_csv("gs://test/test.csv", parse_dates=["dt"])
-
-    tm.assert_frame_equal(df1, df2)
-
-
 @td.skip_if_installed("gcsfs")
 def test_gcs_not_present_exception():
     with pytest.raises(ImportError) as e:
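The comment in the new write-failure tests ("AWS Service Errors are defined dynamically") is why they catch the generic `botocore.exceptions.ClientError` and match on the message rather than a concrete exception class. A small offline sketch of that pattern, with a hand-built error payload standing in for a real S3 response (no AWS access needed):

```python
import botocore.exceptions
import pytest

# Fabricated response for illustration; a real one comes from the S3 API.
error_response = {
    "Error": {"Code": "NoSuchBucket", "Message": "The specified bucket does not exist"}
}
err = botocore.exceptions.ClientError(error_response, "PutObject")

# Same pattern as the new tests: accept either exception type and match on the
# human-readable message, which survives the dynamically generated error classes.
with pytest.raises(
    (FileNotFoundError, botocore.exceptions.ClientError),
    match="The specified bucket does not exist",
):
    raise err
```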
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index f8a6aba1b387c..d1bdf1209a737 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -1,7 +1,6 @@
 """ test parquet compat """
 import datetime
 from distutils.version import LooseVersion
-import locale
 import os
 from warnings import catch_warnings
 
@@ -130,6 +129,7 @@ def check_round_trip(
     read_kwargs=None,
     expected=None,
     check_names=True,
+    check_like=False,
     repeat=2,
 ):
     """Verify parquet serializer and deserializer produce the same results.
@@ -149,6 +149,8 @@ def check_round_trip(
         Expected deserialization result, otherwise will be equal to `df`
     check_names: list of str, optional
         Closed set of column names to be compared
+    check_like: bool, optional
+        If True, ignore the order of index and columns.
     repeat: int, optional
         How many times to repeat the test
     """
@@ -169,7 +171,9 @@ def compare(repeat):
         with catch_warnings(record=True):
             actual = read_parquet(path, **read_kwargs)
-            tm.assert_frame_equal(expected, actual, check_names=check_names)
+            tm.assert_frame_equal(
+                expected, actual, check_names=check_names, check_like=check_like
+            )
 
     if path is None:
         with tm.ensure_clean() as path:
@@ -485,15 +489,37 @@ def test_categorical(self, pa):
         expected = df.astype(object)
         check_round_trip(df, pa, expected=expected)
 
-    # GH#33077 2020-03-27
-    @pytest.mark.xfail(
-        locale.getlocale()[0] == "zh_CN",
-        reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
-    )
     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
         check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
 
+    @td.skip_if_no("s3fs")
+    @pytest.mark.parametrize("partition_col", [["A"], []])
+    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
+        from pandas.io.s3 import get_fs as get_s3_fs
+
+        # GH #26388
+        # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
+        # As per pyarrow, partitioned columns become 'categorical' dtypes
+        # and are added to the back of the DataFrame on read
+
+        expected_df = df_compat.copy()
+        if partition_col:
+            expected_df[partition_col] = expected_df[partition_col].astype("category")
+        check_round_trip(
+            df_compat,
+            pa,
+            expected=expected_df,
+            path="s3://pandas-test/parquet_dir",
+            write_kwargs={
+                "partition_cols": partition_col,
+                "compression": None,
+                "filesystem": get_s3_fs(),
+            },
+            check_like=True,
+            repeat=1,
+        )
+
     def test_partition_cols_supported(self, pa, df_full):
         # GH #23283
         partition_cols = ["bool", "int"]
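For readers wondering why the new `test_s3_roundtrip_for_dir` needs `check_like=True` and a `category` cast on the expected frame, the same behaviour can be reproduced locally without S3. This is a hedged sketch rather than part of the test suite; it assumes pyarrow is installed and uses an illustrative two-column frame:

```python
import pandas as pd
import pandas._testing as tm

df = pd.DataFrame({"A": ["a", "a", "b"], "B": [1, 2, 3]})

with tm.ensure_clean_dir() as path:
    # A partitioned write produces a parquet *directory*, as in the S3 test.
    df.to_parquet(path, engine="pyarrow", partition_cols=["A"], compression=None)

    # Reading the directory back: the partition column comes back as a
    # 'category' dtype and is appended after the remaining columns.
    result = pd.read_parquet(path, engine="pyarrow")

expected = df.copy()
expected["A"] = expected["A"].astype("category")

# check_like=True ignores the changed column (and index) order, which is
# exactly why check_round_trip gained that flag.
tm.assert_frame_equal(expected, result, check_like=True)
```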