Skip to content

Commit a2db32b

Browse files
k-anshulmarcboeker
andauthored
Appender fixes (#147)
* appender fixes * appender fixes * some refactor * Fix indentation * review comments - use separate type for uuid * review comments - move UUID to types.go --------- Co-authored-by: Marc Boeker <[email protected]>
1 parent 495b2cf commit a2db32b

File tree

3 files changed

+81
-26
lines changed

3 files changed

+81
-26
lines changed

appender.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,23 @@ func (a *Appender) Error() error {
6767

6868
// Flush the appender to the underlying table and clear the internal cache.
6969
func (a *Appender) Flush() error {
70-
// set the size of the current chunk to the current row
71-
C.duckdb_data_chunk_set_size(a.chunks[a.currentChunkIdx], C.uint64_t(a.currentRow))
70+
if a.currentChunkIdx == 0 && a.currentRow == 0 {
71+
return nil
72+
}
7273

73-
// append all chunks to the appender and destroy them
74-
var state C.duckdb_state
75-
for i, chunk := range a.chunks {
76-
state = C.duckdb_append_data_chunk(*a.appender, chunk)
77-
if state == C.DuckDBError {
78-
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
79-
return fmt.Errorf("duckdb error appending chunk %d of %d: %s", i+1, a.currentChunkIdx+1, dbErr)
80-
}
81-
C.duckdb_destroy_data_chunk(&chunk)
74+
err := a.appendChunks()
75+
if err != nil {
76+
return err
8277
}
8378

8479
if state := C.duckdb_appender_flush(*a.appender); state == C.DuckDBError {
8580
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
8681
return errors.New(dbErr)
8782
}
8883

84+
a.currentRow = 0
85+
a.currentChunkIdx = 0
86+
a.chunks = a.chunks[:0]
8987
return nil
9088
}
9189

@@ -96,6 +94,12 @@ func (a *Appender) Close() error {
9694
}
9795

9896
a.closed = true
97+
// append chunks if not already done via flush
98+
if a.currentChunkIdx != 0 || a.currentRow != 0 {
99+
if err := a.appendChunks(); err != nil {
100+
return err
101+
}
102+
}
99103

100104
if state := C.duckdb_appender_destroy(a.appender); state == C.DuckDBError {
101105
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
@@ -161,6 +165,8 @@ func (a *Appender) initializeChunkTypes(args []driver.Value) {
161165
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BOOLEAN)
162166
case []byte:
163167
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BLOB)
168+
case UUID:
169+
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UUID)
164170
case string:
165171
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_VARCHAR)
166172
case time.Time:
@@ -236,6 +242,8 @@ func (a *Appender) appendRowArray(args []driver.Value) error {
236242
set[bool](a.chunkVectors[i], a.currentRow, v)
237243
case []byte:
238244
set[[]byte](a.chunkVectors[i], a.currentRow, v)
245+
case UUID:
246+
set[C.duckdb_hugeint](a.chunkVectors[i], a.currentRow, uuidToHugeInt(v))
239247
case string:
240248
str := C.CString(v)
241249
C.duckdb_vector_assign_string_element(a.chunkVectors[i], C.uint64_t(a.currentRow), str)
@@ -253,4 +261,21 @@ func (a *Appender) appendRowArray(args []driver.Value) error {
253261
return nil
254262
}
255263

264+
func (a *Appender) appendChunks() error {
265+
// set the size of the current chunk to the current row
266+
C.duckdb_data_chunk_set_size(a.chunks[a.currentChunkIdx], C.uint64_t(a.currentRow))
267+
268+
// append all chunks to the appender and destroy them
269+
var state C.duckdb_state
270+
for i, chunk := range a.chunks {
271+
state = C.duckdb_append_data_chunk(*a.appender, chunk)
272+
if state == C.DuckDBError {
273+
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
274+
return fmt.Errorf("duckdb error appending chunk %d of %d: %s", i+1, a.currentChunkIdx+1, dbErr)
275+
}
276+
C.duckdb_destroy_data_chunk(&chunk)
277+
}
278+
return nil
279+
}
280+
256281
var errCouldNotAppend = errors.New("could not append parameter")

appender_test.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,28 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/google/uuid"
1011
"github.com/stretchr/testify/require"
1112
)
1213

1314
const (
1415
testAppenderTableDDL = `
1516
CREATE TABLE test(
16-
id BIGINT,
17-
uint8 UTINYINT,
18-
int8 TINYINT,
19-
uint16 USMALLINT,
20-
int16 SMALLINT,
17+
id BIGINT,
18+
uuid UUID,
19+
uint8 UTINYINT,
20+
int8 TINYINT,
21+
uint16 USMALLINT,
22+
int16 SMALLINT,
2123
uint32 UINTEGER,
22-
int32 INTEGER,
23-
uint64 UBIGINT,
24-
int64 BIGINT,
24+
int32 INTEGER,
25+
uint64 UBIGINT,
26+
int64 BIGINT,
2527
timestamp TIMESTAMP,
26-
float REAL,
27-
double DOUBLE,
28-
string VARCHAR,
29-
bool BOOLEAN
28+
float REAL,
29+
double DOUBLE,
30+
string VARCHAR,
31+
bool BOOLEAN
3032
)`
3133
)
3234

@@ -66,6 +68,7 @@ func TestAppender(t *testing.T) {
6668

6769
type dataRow struct {
6870
ID int
71+
UUID UUID
6972
UInt8 uint8
7073
Int8 int8
7174
UInt16 uint16
@@ -86,9 +89,16 @@ func TestAppender(t *testing.T) {
8689
if u64 > 9223372036854775807 {
8790
u64 = 9223372036854775807
8891
}
92+
93+
b, err := uuid.New().MarshalBinary()
94+
require.NoError(t, err)
95+
var uuidBytes [16]byte
96+
copy(uuidBytes[:], b)
97+
8998
return dataRow{
9099
ID: i,
91100
UInt8: uint8(randInt(0, 255)),
101+
UUID: UUID(uuidBytes),
92102
Int8: int8(randInt(-128, 127)),
93103
UInt16: uint16(randInt(0, 65535)),
94104
Int16: int16(randInt(-32768, 32767)),
@@ -114,11 +124,15 @@ func TestAppender(t *testing.T) {
114124

115125
appender, err := NewAppenderFromConn(conn, "", "test")
116126
require.NoError(t, err)
117-
defer appender.Close()
118127

119-
for _, row := range rows {
128+
for i, row := range rows {
129+
if i%1024 == 0 {
130+
err = appender.Flush()
131+
require.NoError(t, err)
132+
}
120133
err := appender.AppendRow(
121134
row.ID,
135+
row.UUID,
122136
row.UInt8,
123137
row.Int8,
124138
row.UInt16,
@@ -135,12 +149,13 @@ func TestAppender(t *testing.T) {
135149
)
136150
require.NoError(t, err)
137151
}
138-
err = appender.Flush()
152+
err = appender.Close()
139153
require.NoError(t, err)
140154

141155
res, err := db.QueryContext(
142156
context.Background(), `
143157
SELECT id,
158+
uuid,
144159
uint8,
145160
int8,
146161
uint16,
@@ -160,10 +175,12 @@ func TestAppender(t *testing.T) {
160175
defer res.Close()
161176

162177
i := 0
178+
var scannedUUID []byte
163179
for res.Next() {
164180
r := dataRow{}
165181
err := res.Scan(
166182
&r.ID,
183+
&scannedUUID,
167184
&r.UInt8,
168185
&r.Int8,
169186
&r.UInt16,
@@ -179,6 +196,7 @@ func TestAppender(t *testing.T) {
179196
&r.Bool,
180197
)
181198
require.NoError(t, err)
199+
copy(r.UUID[:], scannedUUID)
182200
require.Equal(t, rows[i], r)
183201
i++
184202
}

types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/mitchellh/mapstructure"
1414
)
1515

16+
type UUID [16]byte
17+
1618
// duckdb_hugeint is composed of (lower, upper) components.
1719
// The value is computed as: upper * 2^64 + lower
1820

@@ -24,6 +26,16 @@ func hugeIntToUUID(hi C.duckdb_hugeint) []byte {
2426
return uuid[:]
2527
}
2628

29+
func uuidToHugeInt(uuid UUID) C.duckdb_hugeint {
30+
var dt C.duckdb_hugeint
31+
upper := binary.BigEndian.Uint64(uuid[:8])
32+
// flip the sign bit
33+
upper = upper ^ (1 << 63)
34+
dt.upper = C.int64_t(upper)
35+
dt.lower = C.uint64_t(binary.BigEndian.Uint64(uuid[8:]))
36+
return dt
37+
}
38+
2739
func hugeIntToNative(hi C.duckdb_hugeint) *big.Int {
2840
i := big.NewInt(int64(hi.upper))
2941
i.Lsh(i, 64)

0 commit comments

Comments
 (0)