Skip to content

Commit ab6ee34

Browse files
levakin and marcboeker authored
Feature: query with Apache Arrow result (#134)
* appender: fix typo * appender: remove unused error * make duckdb.Conn exported to be used with conn.Raw method in std SQL package * appender: remove unneeded parentheses * Implement Apache Arrow interface and Arrow type structs. Added and defined structures for Arrow schema and Arrow array in a new file "arrow.h". This allows Arrow-compatible communication and data sharing, which is memory-efficient. Implemented Arrow interface in the connection.go file. Also updated the 'duckdb_test.go' file to include tests for the Arrow interface. * call queryArrowArray only when there are more rows to get * Rename QueryArrowContext to QueryArrow. Renames the function QueryArrowContext to QueryArrow. This change is to simplify the function name, as 'Context' in the name isn't required and doesn't add any meaning. * prepareConfig: refactor to free option string pointers * use calloc instead of malloc to avoid potential bug golang/go#19928 * arrow: separate DuckDB Arrow interface - conn structure was made unexported - NewConnector was renamed to OpenConnector to be consistent with sql package and now returns ConnectorCloser interface to expose Close method - appender: NewAppenderFromConn receives connection of type any to be compatible with sql.Conn.Raw method. Tests and docs were adapted as well. * Refactor to use driver.Conn instead of any. * styleguide fixes * arrow: remove Query method leaving only QueryContext * Simplifications on pointer handling * Fix tests * Fix indentation * Fix formatting --------- Co-authored-by: Marc Boeker <[email protected]>
1 parent a2db32b commit ab6ee34

File tree

8 files changed

+438
-24
lines changed

8 files changed

+438
-24
lines changed

README.md

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,33 +70,65 @@ Please refer to the [database/sql](https://godoc.org/database/sql) GoDoc for fur
7070
If you want to use the [DuckDB Appender API](https://duckdb.org/docs/data/appender.html), you can obtain a new Appender by supplying a DuckDB connection to `NewAppenderFromConn()`.
7171

7272
```go
73-
connector, err := NewConnector("test.db", nil)
74-
if err != {
73+
connector, err := duckdb.NewConnector("test.db", nil)
74+
if err != nil {
7575
...
7676
}
7777
conn, err := connector.Connect(context.Background())
78-
if err != {
78+
if err != nil {
7979
...
8080
}
8181
defer conn.Close()
8282

8383
// Retrieve appender from connection (note that you have to create the table 'test' beforehand).
8484
appender, err := NewAppenderFromConn(conn, "", "test")
85-
if err != {
85+
if err != nil {
8686
...
8787
}
8888
defer appender.Close()
8989

9090
err = appender.AppendRow(...)
91-
if err != {
91+
if err != nil {
9292
...
9393
}
9494

9595
// Optional, if you want to access the appended rows immediately.
9696
err = appender.Flush()
97-
if err != {
97+
if err != nil {
98+
...
99+
}
100+
```
101+
102+
## DuckDB Apache Arrow Interface
103+
104+
If you want to use the [DuckDB Arrow Interface](https://duckdb.org/docs/api/c/api#arrow-interface), you can obtain a new Arrow by supplying a DuckDB connection to `NewArrowFromConn()`.
105+
106+
```go
107+
connector, err := duckdb.NewConnector("", nil)
108+
if err != nil {
109+
...
110+
}
111+
conn, err := connector.Connect(context.Background())
112+
if err != nil {
113+
...
114+
}
115+
defer conn.Close()
116+
117+
// Retrieve Arrow from connection.
118+
ar, err := duckdb.NewArrowFromConn(conn)
119+
if err != nil {
98120
...
99121
}
122+
123+
rdr, err := ar.QueryContext(context.Background(), "SELECT * FROM generate_series(1, 10)")
124+
if err != nil {
125+
...
126+
}
127+
defer rdr.Release()
128+
129+
for rdr.Next() {
130+
// Process records.
131+
}
100132
```
101133

102134
## Linking DuckDB

appender.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Appender struct {
3030
}
3131

3232
// NewAppenderFromConn returns a new Appender from a DuckDB driver connection.
33-
func NewAppenderFromConn(driverConn driver.Conn, schema string, table string) (*Appender, error) {
33+
func NewAppenderFromConn(driverConn driver.Conn, schema, table string) (*Appender, error) {
3434
dbConn, ok := driverConn.(*conn)
3535
if !ok {
3636
return nil, fmt.Errorf("not a duckdb driver connection")
@@ -87,7 +87,7 @@ func (a *Appender) Flush() error {
8787
return nil
8888
}
8989

90-
// Closes closes the appender.
90+
// Close closes the appender.
9191
func (a *Appender) Close() error {
9292
if a.closed {
9393
panic("database/sql/driver: misuse of duckdb driver: double Close of Appender")
@@ -277,5 +277,3 @@ func (a *Appender) appendChunks() error {
277277
}
278278
return nil
279279
}
280-
281-
var errCouldNotAppend = errors.New("could not append parameter")

appender_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ func TestAppender(t *testing.T) {
130130
err = appender.Flush()
131131
require.NoError(t, err)
132132
}
133+
133134
err := appender.AppendRow(
134135
row.ID,
135136
row.UUID,

arrow.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package duckdb
2+
3+
/*
4+
#include <duckdb.h>
5+
#include <stdint.h>
6+
7+
#ifndef ARROW_C_DATA_INTERFACE
8+
#define ARROW_C_DATA_INTERFACE
9+
10+
#define ARROW_FLAG_DICTIONARY_ORDERED 1
11+
#define ARROW_FLAG_NULLABLE 2
12+
#define ARROW_FLAG_MAP_KEYS_SORTED 4
13+
14+
struct ArrowSchema {
15+
// Array type description
16+
const char* format;
17+
const char* name;
18+
const char* metadata;
19+
int64_t flags;
20+
int64_t n_children;
21+
struct ArrowSchema** children;
22+
struct ArrowSchema* dictionary;
23+
24+
// Release callback
25+
void (*release)(struct ArrowSchema*);
26+
// Opaque producer-specific data
27+
void* private_data;
28+
};
29+
30+
struct ArrowArray {
31+
// Array data description
32+
int64_t length;
33+
int64_t null_count;
34+
int64_t offset;
35+
int64_t n_buffers;
36+
int64_t n_children;
37+
const void** buffers;
38+
struct ArrowArray** children;
39+
struct ArrowArray* dictionary;
40+
41+
// Release callback
42+
void (*release)(struct ArrowArray*);
43+
// Opaque producer-specific data
44+
void* private_data;
45+
};
46+
47+
#endif // ARROW_C_DATA_INTERFACE
48+
*/
49+
import "C"
50+
51+
import (
52+
"context"
53+
"database/sql/driver"
54+
"errors"
55+
"fmt"
56+
"unsafe"
57+
58+
"github.com/apache/arrow/go/v14/arrow"
59+
"github.com/apache/arrow/go/v14/arrow/array"
60+
"github.com/apache/arrow/go/v14/arrow/cdata"
61+
)
62+
63+
// Arrow exposes DuckDB Apache Arrow interface.
64+
// https://duckdb.org/docs/api/c/api#arrow-interface
65+
type Arrow struct {
66+
c *conn
67+
}
68+
69+
// NewArrowFromConn returns a new Arrow from a DuckDB driver connection.
70+
func NewArrowFromConn(driverConn driver.Conn) (*Arrow, error) {
71+
dbConn, ok := driverConn.(*conn)
72+
if !ok {
73+
return nil, fmt.Errorf("not a duckdb driver connection")
74+
}
75+
76+
if dbConn.closed {
77+
panic("database/sql/driver: misuse of duckdb driver: Arrow after Close")
78+
}
79+
80+
return &Arrow{c: dbConn}, nil
81+
}
82+
83+
// QueryContext prepares statements, executes them, returns Apache Arrow array.RecordReader as a result of the last
84+
// executed statement. Arguments are bound to the last statement.
85+
func (a *Arrow) QueryContext(ctx context.Context, query string, args ...any) (array.RecordReader, error) {
86+
if a.c.closed {
87+
panic("database/sql/driver: misuse of duckdb driver: Arrow.Query after Close")
88+
}
89+
90+
stmts, size, err := a.c.extractStmts(query)
91+
if err != nil {
92+
return nil, err
93+
}
94+
defer C.duckdb_destroy_extracted(&stmts)
95+
96+
// execute all statements without args, except the last one
97+
for i := C.idx_t(0); i < size-1; i++ {
98+
stmt, err := a.c.prepareExtractedStmt(stmts, i)
99+
if err != nil {
100+
return nil, err
101+
}
102+
// send nil args to execute statement and ignore result (using ExecContext since we're ignoring the result anyway)
103+
_, err = stmt.ExecContext(ctx, nil)
104+
stmt.Close()
105+
if err != nil {
106+
return nil, err
107+
}
108+
}
109+
110+
// prepare and execute last statement with args and return result
111+
stmt, err := a.c.prepareExtractedStmt(stmts, size-1)
112+
if err != nil {
113+
return nil, err
114+
}
115+
defer stmt.Close()
116+
117+
res, err := a.execute(stmt, a.anyArgsToNamedArgs(args))
118+
if err != nil {
119+
return nil, err
120+
}
121+
defer C.duckdb_destroy_arrow(res)
122+
123+
sc, err := a.queryArrowSchema(res)
124+
if err != nil {
125+
return nil, err
126+
}
127+
128+
var recs []arrow.Record
129+
defer func() {
130+
for _, r := range recs {
131+
r.Release()
132+
}
133+
}()
134+
135+
rowCount := uint64(C.duckdb_arrow_row_count(*res))
136+
137+
var retrievedRows uint64
138+
139+
for retrievedRows < rowCount {
140+
select {
141+
case <-ctx.Done():
142+
return nil, ctx.Err()
143+
default:
144+
}
145+
146+
rec, err := a.queryArrowArray(res, sc)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
recs = append(recs, rec)
152+
153+
retrievedRows += uint64(rec.NumRows())
154+
}
155+
156+
return array.NewRecordReader(sc, recs)
157+
}
158+
159+
// queryArrowSchema fetches the internal arrow schema from the arrow result.
160+
func (a *Arrow) queryArrowSchema(res *C.duckdb_arrow) (*arrow.Schema, error) {
161+
schema := C.calloc(1, C.sizeof_struct_ArrowSchema)
162+
defer func() {
163+
cdata.ReleaseCArrowSchema((*cdata.CArrowSchema)(schema))
164+
C.free(schema)
165+
}()
166+
167+
if state := C.duckdb_query_arrow_schema(
168+
*res,
169+
(*C.duckdb_arrow_schema)(unsafe.Pointer(&schema)),
170+
); state == C.DuckDBError {
171+
return nil, errors.New("duckdb_query_arrow_schema")
172+
}
173+
174+
sc, err := cdata.ImportCArrowSchema((*cdata.CArrowSchema)(schema))
175+
if err != nil {
176+
return nil, fmt.Errorf("%w: ImportCArrowSchema", err)
177+
}
178+
179+
return sc, nil
180+
}
181+
182+
// queryArrowArray fetches an internal arrow array from the arrow result.
183+
//
184+
// This function can be called multiple time to get next chunks,
185+
// which will free the previous out_array.
186+
func (a *Arrow) queryArrowArray(res *C.duckdb_arrow, sc *arrow.Schema) (arrow.Record, error) {
187+
arr := C.calloc(1, C.sizeof_struct_ArrowArray)
188+
defer func() {
189+
cdata.ReleaseCArrowArray((*cdata.CArrowArray)(arr))
190+
C.free(arr)
191+
}()
192+
193+
if state := C.duckdb_query_arrow_array(
194+
*res,
195+
(*C.duckdb_arrow_array)(unsafe.Pointer(&arr)),
196+
); state == C.DuckDBError {
197+
return nil, errors.New("duckdb_query_arrow_array")
198+
}
199+
200+
rec, err := cdata.ImportCRecordBatchWithSchema((*cdata.CArrowArray)(arr), sc)
201+
if err != nil {
202+
return nil, fmt.Errorf("%w: ImportCRecordBatchWithSchema", err)
203+
}
204+
205+
return rec, nil
206+
}
207+
208+
func (a *Arrow) execute(s *stmt, args []driver.NamedValue) (*C.duckdb_arrow, error) {
209+
if s.closed {
210+
panic("database/sql/driver: misuse of duckdb driver: executeArrow after Close")
211+
}
212+
213+
if err := s.start(args); err != nil {
214+
return nil, err
215+
}
216+
217+
var res C.duckdb_arrow
218+
if state := C.duckdb_execute_prepared_arrow(*s.stmt, &res); state == C.DuckDBError {
219+
dbErr := C.GoString(C.duckdb_query_arrow_error(res))
220+
C.duckdb_destroy_arrow(&res)
221+
return nil, fmt.Errorf("duckdb_execute_prepared_arrow: %v", dbErr)
222+
}
223+
224+
return &res, nil
225+
}
226+
227+
func (a *Arrow) anyArgsToNamedArgs(args []any) []driver.NamedValue {
228+
if len(args) == 0 {
229+
return nil
230+
}
231+
232+
values := make([]driver.Value, len(args))
233+
for i, arg := range args {
234+
values[i] = arg
235+
}
236+
237+
return argsToNamedArgs(values)
238+
}

0 commit comments

Comments
 (0)