8
8
import typing as t
9
9
from pathlib import Path
10
10
11
+ import requests
11
12
import sqlalchemy as sa
12
13
import sqlparse
13
14
from boltons .urlutils import URL
15
+ from crate import client as crate_client
14
16
from cratedb_sqlparse import sqlparse as sqlparse_cratedb
15
17
from sqlalchemy .exc import ProgrammingError
16
18
from sqlalchemy .sql .elements import AsBoolean
@@ -52,9 +54,16 @@ def __init__(self, dburi: str, echo: bool = False, internal: bool = False, jwt:
52
54
raise ValueError ("Database URI must be specified" )
53
55
if dburi .startswith ("crate://" ):
54
56
self .dburi = dburi
57
+ # Detect native override
58
+ self .native = "native=true" in dburi .lower ()
59
+ self .native_url = dburi .replace ("crate://" , "https://" ).split ("?" )[0 ]
60
+ self .dburi_clean = dburi .replace ("?native=True" , "" ).replace ("&native=True" , "" )
55
61
else :
56
62
address = DatabaseAddress .from_string (dburi )
57
63
self .dburi = address .dburi
64
+ self .native = False
65
+ self .dburi_clean = self .dburi
66
+
58
67
self .internal = internal
59
68
self .jwt = jwt
60
69
self .ctx : contextlib .AbstractContextManager
@@ -63,10 +72,16 @@ def __init__(self, dburi: str, echo: bool = False, internal: bool = False, jwt:
63
72
else :
64
73
self .ctx = contextlib .nullcontext ()
65
74
with self .ctx :
66
- self .engine = sa .create_engine (self .dburi , echo = echo )
67
- # TODO: Make that go away.
68
- logger .debug (f"Connecting to CrateDB: { dburi } " )
69
- self .connection = self .engine .connect ()
75
+ if self .native :
76
+ self .native_connection = crate_client .connect (
77
+ [self .dburi_clean .replace ("crate://" , "https://" )],
78
+ verify_ssl_cert = False ,
79
+ )
80
+ logger .debug (f"[Native] Connecting to CrateDB: { self .dburi_clean } " )
81
+ else :
82
+ self .engine = sa .create_engine (self .dburi_clean , echo = echo )
83
+ logger .debug (f"[SQLAlchemy] Connecting to CrateDB: { self .dburi_clean } " )
84
+ self .connection = self .engine .connect ()
70
85
71
86
@staticmethod
72
87
def quote_relation_name (ident : str ) -> str :
@@ -137,33 +152,41 @@ def run_sql(
137
152
return None
138
153
139
154
def run_sql_real (self , sql : str , parameters : t .Mapping [str , str ] = None , records : bool = False ):
140
- """
141
- Invoke an SQL statement and return results.
142
- """
143
155
results = []
144
156
for statement in sqlparse .split (sql ):
145
157
if self .internal :
146
158
statement += self .internal_tag
147
- # FIXME: Persistent self.connection risks leaks & thread-unsafety.
148
- # https://github.com/crate/cratedb-toolkit/pull/81#discussion_r2071499204
149
- with self .ctx :
150
- result = self .connection .execute (sa .text (statement ), parameters )
151
- data : t .Any
152
- if result .returns_rows :
159
+
160
+ if self .native :
161
+ # Make a native HTTP POST request
162
+ response = requests .post (
163
+ self .native_url + "/_sql" ,
164
+ verify = False ,
165
+ json = {"stmt" : statement , "args" : list (parameters .values ()) if parameters else []},
166
+ headers = {"Content-Type" : "application/json" },
167
+ )
168
+ response .raise_for_status ()
169
+ result = response .json ()
170
+
153
171
if records :
154
- rows = result .mappings ().fetchall ()
155
- data = [dict (row .items ()) for row in rows ]
172
+ data = [dict (zip (result ["cols" ], row )) for row in result ["rows" ]]
156
173
else :
157
- data = result . fetchall ()
174
+ data = result [ "rows" ]
158
175
else :
159
- data = None
176
+ with self .ctx :
177
+ result = self .connection .execute (sa .text (statement ), parameters )
178
+ if result .returns_rows :
179
+ if records :
180
+ rows = result .mappings ().fetchall ()
181
+ data = [dict (row .items ()) for row in rows ]
182
+ else :
183
+ data = result .fetchall ()
184
+ else :
185
+ data = None
186
+
160
187
results .append (data )
161
188
162
- # Backward-compatibility.
163
- if len (results ) == 1 :
164
- return results [0 ]
165
- else :
166
- return results
189
+ return results [0 ] if len (results ) == 1 else results
167
190
168
191
def count_records (self , name : str , errors : Literal ["raise" , "ignore" ] = "raise" , where : str = "" ):
169
192
"""
0 commit comments