
Backport PR #33645, #33632 and #34087 on branch 1.0.x #34173

Merged: 3 commits merged into 1.0.x on May 14, 2020
4 changes: 4 additions & 0 deletions doc/source/whatsnew/v1.0.4.rst
@@ -31,6 +31,10 @@ Bug fixes
~~~~~~~~~
- Bug in :meth:`SeriesGroupBy.first`, :meth:`SeriesGroupBy.last`, :meth:`SeriesGroupBy.min`, and :meth:`SeriesGroupBy.max` returning floats when applied to nullable Booleans (:issue:`33071`)
- Bug in :meth:`Rolling.min` and :meth:`Rolling.max`: Growing memory usage after multiple calls when using a fixed window (:issue:`30726`)
- Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
- Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
- Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)
-

Contributors
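
Roughly, the user-visible effect of these whatsnew entries is sketched below. This is an illustration only: the bucket names are hypothetical, and s3fs, pyarrow and botocore are assumed to be installed.

```python
from botocore.exceptions import ClientError, NoCredentialsError
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# Writing to a missing or inaccessible bucket now surfaces an error instead
# of failing silently (GH 32486, GH 27679). The exact exception depends on
# the credentials in use.
try:
    df.to_csv("s3://hypothetical-missing-bucket/data.csv")
except (FileNotFoundError, PermissionError, ClientError, NoCredentialsError) as err:
    print(f"write rejected: {err!r}")

# Reading a directory of partitioned parquet files from S3 no longer raises
# FileNotFoundError (GH 26388), and writing a partitioned dataset no longer
# raises AttributeError (GH 27596):
# df.to_parquet("s3://hypothetical-bucket/partitioned", partition_cols=["a"])
# round_tripped = pd.read_parquet("s3://hypothetical-bucket/partitioned")
```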
27 changes: 27 additions & 0 deletions pandas/io/common.py
@@ -141,6 +141,33 @@ def urlopen(*args, **kwargs):
return urllib.request.urlopen(*args, **kwargs)


def get_fs_for_path(filepath: str):
"""
Get appropriate filesystem given a filepath.
Supports s3fs, gcs and local file system.

Parameters
----------
filepath : str
File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj

Returns
-------
s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
Appropriate FileSystem to use. None for local filesystem.
"""
if is_s3_url(filepath):
from pandas.io import s3

return s3.get_fs()
elif is_gcs_url(filepath):
from pandas.io import gcs

return gcs.get_fs()
else:
return None


def get_filepath_or_buffer(
filepath_or_buffer: FilePathOrBuffer,
encoding: Optional[str] = None,
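
A minimal usage sketch of the new `get_fs_for_path` helper (internal to `pandas.io.common`): the paths are hypothetical, and s3fs/gcsfs are assumed to be installed for the remote cases. The rewritten parquet read path below passes this result straight to pyarrow.

```python
from pandas.io.common import get_fs_for_path

# S3 and GCS URLs return a ready-to-use filesystem object; anything else
# returns None, meaning "use the local filesystem".
print(get_fs_for_path("s3://hypothetical-bucket/key.parquet"))   # s3fs.S3FileSystem
print(get_fs_for_path("gcs://hypothetical-bucket/key.parquet"))  # gcsfs.GCSFileSystem
print(get_fs_for_path("/tmp/local.parquet"))                     # None
```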
4 changes: 3 additions & 1 deletion pandas/io/formats/csvs.py
@@ -62,7 +62,7 @@ def __init__(
# Extract compression mode as given, if dict
compression, self.compression_args = get_compression_method(compression)

self.path_or_buf, _, _, _ = get_filepath_or_buffer(
self.path_or_buf, _, _, self.should_close = get_filepath_or_buffer(
path_or_buf, encoding=encoding, compression=compression, mode=mode
)
self.sep = sep
@@ -224,6 +224,8 @@ def save(self) -> None:
f.close()
for _fh in handles:
_fh.close()
elif self.should_close:
f.close()

def _save_header(self):
writer = self.writer
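
The `should_close` bookkeeping backported here follows a general pattern, sketched in isolation below (this is not the pandas internals verbatim): a writer closes only the handles it opened itself, and for s3fs/gcsfs file objects it is the `close()` call that flushes the buffered upload and surfaces errors, which appears to be why the handle now has to be closed explicitly.

```python
from typing import IO, Union


def save_text(path_or_buf: Union[str, IO[str]], text: str) -> None:
    # Only close handles we opened ourselves; a caller-supplied buffer stays
    # open, mirroring the should_close flag from get_filepath_or_buffer.
    should_close = isinstance(path_or_buf, str)
    f = open(path_or_buf, "w") if should_close else path_or_buf
    try:
        f.write(text)
    finally:
        if should_close:
            # For remote-filesystem file objects (s3fs, gcsfs), close() is
            # what flushes the buffered bytes and reports upload failures.
            f.close()
```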
6 changes: 5 additions & 1 deletion pandas/io/gcs.py
@@ -6,13 +6,17 @@
)


def get_fs():
return gcsfs.GCSFileSystem()


def get_filepath_or_buffer(
filepath_or_buffer, encoding=None, compression=None, mode=None
):

if mode is None:
mode = "rb"

fs = gcsfs.GCSFileSystem()
fs = get_fs()
filepath_or_buffer = fs.open(filepath_or_buffer, mode)
return filepath_or_buffer, None, compression, True
31 changes: 18 additions & 13 deletions pandas/io/parquet.py
@@ -8,7 +8,12 @@

from pandas import DataFrame, get_option

from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
from pandas.io.common import (
get_filepath_or_buffer,
get_fs_for_path,
is_gcs_url,
is_s3_url,
)


def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
**kwargs,
):
self.validate_dataframe(df)
path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")

from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
if index is not None:
from_pandas_kwargs["preserve_index"] = index

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
# write_to_dataset does not support a file-like object when
# a directory path is used, so just pass the path string.
if partition_cols is not None:
self.api.parquet.write_to_dataset(
table,
@@ -111,22 +118,20 @@
else:
self.api.parquet.write_table(
table,
path,
file_obj_or_path,
compression=compression,
coerce_timestamps=coerce_timestamps,
**kwargs,
)

def read(self, path, columns=None, **kwargs):
path, _, _, should_close = get_filepath_or_buffer(path)

kwargs["use_pandas_metadata"] = True
result = self.api.parquet.read_table(
path, columns=columns, **kwargs
).to_pandas()
if should_close:
path.close()
file_obj_or_path.close()

def read(self, path, columns=None, **kwargs):
parquet_ds = self.api.parquet.ParquetDataset(
path, filesystem=get_fs_for_path(path), **kwargs
)
kwargs["columns"] = columns
result = parquet_ds.read_pandas(**kwargs).to_pandas()
return result


@@ -281,7 +286,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables``
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``

If you want to pass in a path object, pandas accepts any
``os.PathLike``.
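
The rewritten `read` method delegates to pyarrow's `ParquetDataset`, which accepts both a single object and a "directory" prefix of partitioned files. A rough standalone equivalent is sketched below; the bucket is hypothetical, and s3fs plus pyarrow are assumed to be installed.

```python
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem(anon=False)

# A directory-style prefix containing partitioned parquet files is accepted
# here, which is what fixes the FileNotFoundError of GH 26388.
dataset = pq.ParquetDataset("s3://hypothetical-bucket/partitioned_dir", filesystem=fs)
df = dataset.read_pandas().to_pandas()
print(df.shape)
```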
8 changes: 6 additions & 2 deletions pandas/io/s3.py
@@ -16,6 +16,10 @@ def _strip_schema(url):
return result.netloc + result.path


def get_fs():
return s3fs.S3FileSystem(anon=False)


def get_file_and_filesystem(
filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
if mode is None:
mode = "rb"

fs = s3fs.S3FileSystem(anon=False)
fs = get_fs()
try:
file = fs.open(_strip_schema(filepath_or_buffer), mode)
except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
# aren't valid for that bucket.
# A NoCredentialsError is raised if you don't have creds
# for that bucket.
fs = s3fs.S3FileSystem(anon=True)
fs = get_fs()
file = fs.open(_strip_schema(filepath_or_buffer), mode)
return file, fs

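
One benefit of routing filesystem construction through the small `get_fs()` wrapper, besides its reuse by `get_fs_for_path`, is that a test can substitute the filesystem in a single place. A hedged sketch follows, using a hypothetical stand-in class and the internal `pandas.io.s3` module (s3fs must be installed for the import to succeed).

```python
import pytest

import pandas.io.s3 as pd_s3


class DummyFS:
    """Hypothetical stand-in filesystem used only for this sketch."""

    def open(self, path, mode="rb"):
        raise FileNotFoundError(path)


def test_missing_key_propagates(monkeypatch):
    # Patch the single construction point instead of patching s3fs itself.
    monkeypatch.setattr(pd_s3, "get_fs", lambda: DummyFS())
    with pytest.raises(FileNotFoundError):
        pd_s3.get_file_and_filesystem("s3://hypothetical-bucket/missing.csv", mode="rb")
```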
31 changes: 29 additions & 2 deletions pandas/tests/io/parser/test_network.py
@@ -54,8 +54,8 @@ def tips_df(datapath):
@pytest.mark.usefixtures("s3_resource")
@td.skip_if_not_us_locale()
class TestS3:
@td.skip_if_no("s3fs")
def test_parse_public_s3_bucket(self, tips_df):
pytest.importorskip("s3fs")

# more of an integration test due to the not-public contents portion
# can probably mock this though.
@@ -159,7 +159,7 @@ def test_parse_public_s3_bucket_nrows_python(self, tips_df):
assert not df.empty
tm.assert_frame_equal(tips_df.iloc[:10], df)

def test_s3_fails(self):
def test_read_s3_fails(self):
with pytest.raises(IOError):
read_csv("s3://nyqpug/asdf.csv")

@@ -168,6 +168,33 @@ def test_s3_fails(self):
with pytest.raises(IOError):
read_csv("s3://cant_get_it/file.csv")

def test_write_s3_csv_fails(self, tips_df):
# GH 32486
# Attempting to write to an invalid S3 path should raise
import botocore

# GH 34087
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
# Catch a ClientError since AWS Service Errors are defined dynamically
error = (FileNotFoundError, botocore.exceptions.ClientError)

with pytest.raises(error, match="The specified bucket does not exist"):
tips_df.to_csv("s3://an_s3_bucket_data_doesnt_exit/not_real.csv")

@td.skip_if_no("pyarrow")
def test_write_s3_parquet_fails(self, tips_df):
# GH 27679
# Attempting to write to an invalid S3 path should raise
import botocore

# GH 34087
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
# Catch a ClientError since AWS Service Errors are defined dynamically
error = (FileNotFoundError, botocore.exceptions.ClientError)

with pytest.raises(error, match="The specified bucket does not exist"):
tips_df.to_parquet("s3://an_s3_bucket_data_doesnt_exit/not_real.parquet")

def test_read_csv_handles_boto_s3_object(self, s3_resource, tips_file):
# see gh-16135

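
For context on the tests' comment about dynamically defined AWS errors: boto3 generates service exception classes such as `NoSuchBucket` at runtime, so the stable things to match are `ClientError` itself (as the tests do) or the error code it carries. A sketch is below, assuming boto3/botocore are installed; the bucket name is hypothetical and the client is unsigned so no local credentials are needed.

```python
import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError

s3 = boto3.client("s3", region_name="us-east-1", config=Config(signature_version=UNSIGNED))

try:
    s3.head_bucket(Bucket="hypothetical-nonexistent-bucket-1234")
except ClientError as err:
    # Service-specific classes like NoSuchBucket exist only at runtime, so
    # branch on the error code carried in the response instead.
    print("error code:", err.response["Error"]["Code"])  # e.g. "404"
```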
32 changes: 9 additions & 23 deletions pandas/tests/io/test_gcs.py
@@ -56,7 +56,15 @@ def open(*args):

monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
df1.to_csv("gs://test/test.csv", index=True)
df2 = read_csv(StringIO(s.getvalue()), parse_dates=["dt"], index_col=0)

def mock_get_filepath_or_buffer(*args, **kwargs):
return StringIO(df1.to_csv()), None, None, False

monkeypatch.setattr(
"pandas.io.gcs.get_filepath_or_buffer", mock_get_filepath_or_buffer
)

df2 = read_csv("gs://test/test.csv", parse_dates=["dt"], index_col=0)

tm.assert_frame_equal(df1, df2)

@@ -86,28 +94,6 @@ def open(self, path, mode="r", *args):
)


@td.skip_if_no("gcsfs")
def test_gcs_get_filepath_or_buffer(monkeypatch):
df1 = DataFrame(
{
"int": [1, 3],
"float": [2.0, np.nan],
"str": ["t", "s"],
"dt": date_range("2018-06-18", periods=2),
}
)

def mock_get_filepath_or_buffer(*args, **kwargs):
return (StringIO(df1.to_csv(index=False)), None, None, False)

monkeypatch.setattr(
"pandas.io.gcs.get_filepath_or_buffer", mock_get_filepath_or_buffer
)
df2 = read_csv("gs://test/test.csv", parse_dates=["dt"])

tm.assert_frame_equal(df1, df2)


@td.skip_if_installed("gcsfs")
def test_gcs_not_present_exception():
with pytest.raises(ImportError) as e:
40 changes: 33 additions & 7 deletions pandas/tests/io/test_parquet.py
@@ -1,7 +1,6 @@
""" test parquet compat """
import datetime
from distutils.version import LooseVersion
import locale
import os
from warnings import catch_warnings

@@ -130,6 +129,7 @@ def check_round_trip(
read_kwargs=None,
expected=None,
check_names=True,
check_like=False,
repeat=2,
):
"""Verify parquet serializer and deserializer produce the same results.
@@ -149,6 +149,8 @@
Expected deserialization result, otherwise will be equal to `df`
check_names: list of str, optional
Closed set of column names to be compared
check_like: bool, optional
If True, ignore the order of index & columns.
repeat: int, optional
How many times to repeat the test
"""
@@ -169,7 +171,9 @@ def compare(repeat):
with catch_warnings(record=True):
actual = read_parquet(path, **read_kwargs)

tm.assert_frame_equal(expected, actual, check_names=check_names)
tm.assert_frame_equal(
expected, actual, check_names=check_names, check_like=check_like
)

if path is None:
with tm.ensure_clean() as path:
@@ -485,15 +489,37 @@ def test_categorical(self, pa):
expected = df.astype(object)
check_round_trip(df, pa, expected=expected)

# GH#33077 2020-03-27
@pytest.mark.xfail(
locale.getlocale()[0] == "zh_CN",
reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
)
def test_s3_roundtrip(self, df_compat, s3_resource, pa):
# GH #19134
check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")

@td.skip_if_no("s3fs")
@pytest.mark.parametrize("partition_col", [["A"], []])
def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
from pandas.io.s3 import get_fs as get_s3_fs

# GH #26388
# https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
# As per pyarrow partitioned columns become 'categorical' dtypes
# and are added to back of dataframe on read

expected_df = df_compat.copy()
if partition_col:
expected_df[partition_col] = expected_df[partition_col].astype("category")
check_round_trip(
df_compat,
pa,
expected=expected_df,
path="s3://pandas-test/parquet_dir",
write_kwargs={
"partition_cols": partition_col,
"compression": None,
"filesystem": get_s3_fs(),
},
check_like=True,
repeat=1,
)

def test_partition_cols_supported(self, pa, df_full):
# GH #23283
partition_cols = ["bool", "int"]