Skip to content

Commit 0ebf60a

Browse files
authored
FIX: close BigQuery Storage client transport channel after use (#295)
* FIX: close BigQuery Storage client transport channel after use This fixes a file descriptor leak. * blacken
1 parent 05c68c4 commit 0ebf60a

File tree

2 files changed

+25
-18
lines changed

2 files changed

+25
-18
lines changed

noxfile.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
supported_pythons = ["3.5", "3.6", "3.7"]
1414
latest_python = "3.7"
1515

16+
# Use a consistent version of black so CI is deterministic.
17+
black_package = "black==19.10b0"
18+
1619

1720
@nox.session
1821
def lint(session, python=latest_python):
19-
session.install("black", "flake8")
22+
session.install(black_package, "flake8")
2023
session.install("-e", ".")
2124
session.run("flake8", "pandas_gbq")
2225
session.run("flake8", "tests")
@@ -25,7 +28,7 @@ def lint(session, python=latest_python):
2528

2629
@nox.session(python=latest_python)
2730
def blacken(session):
28-
session.install("black")
31+
session.install(black_package)
2932
session.run("black", ".")
3033

3134

pandas_gbq/gbq.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -370,9 +370,7 @@ def __init__(
370370
context.project = self.project_id
371371

372372
self.client = self.get_client()
373-
self.bqstorage_client = _make_bqstorage_client(
374-
use_bqstorage_api, self.credentials
375-
)
373+
self.use_bqstorage_api = use_bqstorage_api
376374

377375
# BQ Queries costs $5 per TB. First 1 TB per month is free
378376
# see here for more: https://cloud.google.com/bigquery/pricing
@@ -541,29 +539,35 @@ def _download_results(
541539
if max_results == 0:
542540
return None
543541

544-
if max_results is None:
545-
# Only use the BigQuery Storage API if the full result set is requested.
546-
bqstorage_client = self.bqstorage_client
547-
else:
542+
try:
548543
bqstorage_client = None
544+
if max_results is None:
545+
# Only use the BigQuery Storage API if the full result set is requested.
546+
bqstorage_client = _make_bqstorage_client(
547+
self.use_bqstorage_api, self.credentials
548+
)
549549

550-
try:
551550
query_job.result()
552551
# Get the table schema, so that we can list rows.
553552
destination = self.client.get_table(query_job.destination)
554553
rows_iter = self.client.list_rows(
555554
destination, max_results=max_results
556555
)
556+
557+
schema_fields = [field.to_api_repr() for field in rows_iter.schema]
558+
nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
559+
df = rows_iter.to_dataframe(
560+
dtypes=nullsafe_dtypes,
561+
bqstorage_client=bqstorage_client,
562+
progress_bar_type=progress_bar_type,
563+
)
557564
except self.http_error as ex:
558565
self.process_http_error(ex)
559-
560-
schema_fields = [field.to_api_repr() for field in rows_iter.schema]
561-
nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
562-
df = rows_iter.to_dataframe(
563-
dtypes=nullsafe_dtypes,
564-
bqstorage_client=bqstorage_client,
565-
progress_bar_type=progress_bar_type,
566-
)
566+
finally:
567+
if bqstorage_client:
568+
# Clean up open socket resources. See:
569+
# https://github.com/pydata/pandas-gbq/issues/294
570+
bqstorage_client.transport.channel.close()
567571

568572
if df.empty:
569573
df = _cast_empty_df_dtypes(schema_fields, df)

0 commit comments

Comments
 (0)