Commit 8c95ced

Merge pull request #187 from alexander-beedie/main
Automatically select 'cursor' protocol for redshift-prefixed connection strings
2 parents 5adc0a7 + 83be51a commit 8c95ced

1 file changed: +23 -10 lines changed

connectorx-python/connectorx/__init__.py

Lines changed: 23 additions & 10 deletions
@@ -20,47 +20,50 @@ def read_sql(
     query: Union[List[str], str],
     *,
     return_type: str = "pandas",
-    protocol: str = "binary",
+    protocol: Optional[str] = None,
     partition_on: Optional[str] = None,
     partition_range: Optional[Tuple[int, int]] = None,
     partition_num: Optional[int] = None,
     index_col: Optional[str] = None,
 ):
     """
-    Run the SQL query, download the data from database into a Pandas dataframe.
+    Run the SQL query, download the data from database into a dataframe.
 
     Parameters
     ==========
     conn
       the connection string.
     query
-      a SQL query or a list of SQL query.
+      a SQL query or a list of SQL queries.
     return_type
-      the return type of this function. It can be "arrow", "pandas", "modin", "dask" or "polars".
+      the return type of this function; one of "arrow", "pandas", "modin", "dask" or "polars".
+    protocol
+      backend-specific transfer protocol directive; defaults to 'binary' (except for redshift
+      connection strings, where 'cursor' will be used instead).
     partition_on
-      the column to partition the result.
+      the column on which to partition the result.
     partition_range
       the value range of the partition column.
     partition_num
-      how many partition to generate.
+      how many partitions to generate.
     index_col
-      the index column to set, only applicable for return type "pandas", "modin", "dask".
+      the index column to set; only applicable for return type "pandas", "modin", "dask".
 
     Examples
     ========
-    Read a DataFrame from a SQL using a single thread:
+    Read a DataFrame from a SQL query using a single thread:
 
     >>> postgres_url = "postgresql://username:password@server:port/database"
     >>> query = "SELECT * FROM lineitem"
     >>> read_sql(postgres_url, query)
 
-    Read a DataFrame parallelly using 10 threads by automatically partitioning the provided SQL on the partition column:
+    Read a DataFrame in parallel using 10 threads by automatically partitioning the provided SQL on the partition column:
 
     >>> postgres_url = "postgresql://username:password@server:port/database"
     >>> query = "SELECT * FROM lineitem"
     >>> read_sql(postgres_url, query, partition_on="partition_col", partition_num=10)
 
-    Read a DataFrame parallelly using 2 threads by manually providing two partition SQLs:
+    Read a DataFrame in parallel using 2 threads by explicitly providing two SQL queries:
 
     >>> postgres_url = "postgresql://username:password@server:port/database"
     >>> queries = ["SELECT * FROM lineitem WHERE partition_col <= 10", "SELECT * FROM lineitem WHERE partition_col > 10"]
@@ -93,6 +96,16 @@ def read_sql(
     else:
         raise ValueError("query must be either str or a list of str")
 
+    if not protocol:
+        # note: redshift cannot currently use the faster binary/csv protocols.
+        # set compatible protocol ('cursor') and masquerade as postgres.
+        backend, connection_details = conn.split(":",1)
+        if "redshift" in backend:
+            conn = f"postgresql:{connection_details}"
+            protocol = "cursor"
+        else:
+            protocol = "binary"
+
     if return_type in {"modin", "dask", "pandas"}:
         try:
             import pandas
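
The protocol selection added in the second hunk can be exercised on its own. The following is a minimal sketch that lifts that logic into a standalone helper; the name `resolve_protocol`, its return shape, and the example connection strings are assumptions made for illustration and are not part of connectorx.

```python
from typing import Optional, Tuple


def resolve_protocol(conn: str, protocol: Optional[str] = None) -> Tuple[str, str]:
    """Hypothetical helper mirroring the `if not protocol:` block in the diff.

    Returns the (connection string, protocol) pair that would be used.
    """
    if protocol:
        # An explicitly requested protocol skips the whole block, so the
        # connection string is passed through unchanged.
        return conn, protocol

    # Split off the scheme; everything after the first ':' is kept verbatim.
    backend, connection_details = conn.split(":", 1)
    if "redshift" in backend:
        # Masquerade as postgres and fall back to the 'cursor' protocol.
        return f"postgresql:{connection_details}", "cursor"
    return conn, "binary"


print(resolve_protocol("redshift://user:pw@host:5439/db"))
print(resolve_protocol("postgresql://user:pw@host:5432/db"))
print(resolve_protocol("redshift://user:pw@host:5439/db", protocol="binary"))
```

Two consequences of the logic as written in the diff are worth noting: the redshift-to-postgresql rewrite only happens when no protocol is passed, so an explicit `protocol` leaves a redshift-prefixed connection string untouched; and a connection string with no `:` would fail at the tuple unpacking with a `ValueError`.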
