Skip to content

Commit 161da61

Browse files
committed
Shared index.
1 parent 636b105 commit 161da61

File tree

4 files changed

+137
-102
lines changed

4 files changed

+137
-102
lines changed

litestream/api.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,10 @@ func init() {
1919

2020
var (
2121
liteMtx sync.RWMutex
22-
// +checklocks:memoryMtx
23-
liteDBs = map[string]liteDB{}
22+
// +checklocks:liteMtx
23+
liteDBs = map[string]*liteDB{}
2424
)
2525

26-
type liteDB struct {
27-
client litestream.ReplicaClient
28-
opts *ReplicaOptions
29-
}
30-
3126
// ReplicaOptions represents options for [NewReplica].
3227
type ReplicaOptions struct {
3328
// Where to log error messages. May be nil.
@@ -53,7 +48,7 @@ func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOpt
5348

5449
liteMtx.Lock()
5550
defer liteMtx.Unlock()
56-
liteDBs[name] = liteDB{
51+
liteDBs[name] = &liteDB{
5752
client: client,
5853
opts: &options,
5954
}

litestream/go.mod

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,31 @@ go 1.24.4
55
require (
66
github.com/benbjohnson/litestream v0.5.0
77
github.com/ncruces/go-sqlite3 v0.29.1
8+
github.com/ncruces/wbt v0.2.0
89
github.com/superfly/ltx v0.5.0
910
)
1011

1112
require (
1213
filippo.io/age v1.1.1 // indirect
14+
github.com/aws/aws-sdk-go-v2 v1.37.1 // indirect
15+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect
16+
github.com/aws/aws-sdk-go-v2/config v1.30.2 // indirect
17+
github.com/aws/aws-sdk-go-v2/credentials v1.18.2 // indirect
18+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.1 // indirect
19+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.2 // indirect
20+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.1 // indirect
21+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.1 // indirect
22+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
23+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1 // indirect
24+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect
25+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1 // indirect
26+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.1 // indirect
27+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1 // indirect
28+
github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1 // indirect
29+
github.com/aws/aws-sdk-go-v2/service/sso v1.26.1 // indirect
30+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.31.1 // indirect
31+
github.com/aws/aws-sdk-go-v2/service/sts v1.35.1 // indirect
32+
github.com/aws/smithy-go v1.22.5 // indirect
1333
github.com/beorn7/perks v1.0.1 // indirect
1434
github.com/cespare/xxhash/v2 v2.3.0 // indirect
1535
github.com/dustin/go-humanize v1.0.1 // indirect

litestream/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh
118118
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
119119
github.com/ncruces/julianday v1.0.0 h1:fH0OKwa7NWvniGQtxdJRxAgkBMolni2BjDHaWTxqt7M=
120120
github.com/ncruces/julianday v1.0.0/go.mod h1:Dusn2KvZrrovOMJuOt0TNXL6tB7U2E8kvza5fFc9G7g=
121+
github.com/ncruces/wbt v0.2.0 h1:Q9zlKOBSZc7Yy/R2cGa35g6RKUUE3BjNIW3tfGC4F04=
122+
github.com/ncruces/wbt v0.2.0/go.mod h1:DtF92amvMxH69EmBFUSFWRDAlo6hOEfoNQnClxj9C/c=
121123
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
122124
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
123125
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=

litestream/vfs.go

Lines changed: 112 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"sync"
910
"time"
1011

1112
"github.com/benbjohnson/litestream"
1213
"github.com/ncruces/go-sqlite3"
1314
"github.com/ncruces/go-sqlite3/util/vfsutil"
1415
"github.com/ncruces/go-sqlite3/vfs"
16+
"github.com/ncruces/wbt"
1517
"github.com/superfly/ltx"
1618
)
1719

@@ -30,21 +32,14 @@ func (liteVFS) Open(name string, flags vfs.OpenFlag) (vfs.File, vfs.OpenFlag, er
3032
liteMtx.RLock()
3133
defer liteMtx.RUnlock()
3234
if db, ok := liteDBs[name]; ok {
33-
f := liteFile{
34-
liteDB: db,
35-
txids: new(levelTXIDs),
36-
pages: map[uint32]ltx.PageIndexElem{},
37-
}
38-
3935
// Build the page index so we can lookup individual pages.
40-
if err := f.buildIndex(context.Background()); err != nil {
41-
f.opts.Logger.Error("build index", "error", err)
36+
if err := db.buildIndex(context.Background()); err != nil {
37+
db.opts.Logger.Error("build index", "error", err)
4238
return nil, 0, err
4339
}
44-
return &f, flags | vfs.OPEN_READONLY, nil
40+
return &liteFile{db: db}, flags | vfs.OPEN_READONLY, nil
4541
}
4642
return nil, flags, sqlite3.CANTOPEN
47-
4843
}
4944

5045
func (liteVFS) Delete(name string, dirSync bool) error {
@@ -62,22 +57,21 @@ func (liteVFS) FullPathname(name string) (string, error) {
6257
}
6358

6459
type liteFile struct {
65-
liteDB
66-
67-
conn *sqlite3.Conn
68-
txids *levelTXIDs
69-
pages map[uint32]ltx.PageIndexElem
70-
71-
lastPoll time.Time
72-
pageSize uint32
73-
pageCount uint32
74-
lock vfs.LockLevel
60+
db *liteDB
61+
conn *sqlite3.Conn
62+
pages *pageIndex
63+
txid ltx.TXID
64+
pageSize uint32
7565
}
7666

7767
func (f *liteFile) Close() error { return nil }
7868

7969
func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
80-
err = f.pollReplica()
70+
ctx := f.context()
71+
pages, txid := f.pages, f.txid
72+
if pages == nil {
73+
pages, txid, err = f.db.pollReplica(ctx)
74+
}
8175
if err != nil {
8276
return 0, err
8377
}
@@ -87,19 +81,14 @@ func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
8781
pgno += uint32(off / int64(f.pageSize))
8882
}
8983

90-
elem, ok := f.pages[pgno]
84+
elem, ok := pages.Get(pgno)
9185
if !ok {
9286
return 0, io.EOF
9387
}
9488

95-
ctx := context.Background()
96-
if f.conn != nil {
97-
ctx = f.conn.GetInterrupt()
98-
}
99-
100-
_, data, err := litestream.FetchPage(ctx, f.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
89+
_, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
10190
if err != nil {
102-
f.opts.Logger.Error("fetch page", "error", err)
91+
f.db.opts.Logger.Error("fetch page", "error", err)
10392
return 0, err
10493
}
10594

@@ -109,7 +98,7 @@ func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
10998
data[18] >= 1 && data[19] >= 1 &&
11099
data[18] <= 3 && data[19] <= 3 {
111100
data[18], data[19] = 0x01, 0x01
112-
binary.BigEndian.PutUint32(data[24:28], uint32(f.txids[0]))
101+
binary.BigEndian.PutUint32(data[24:28], uint32(txid))
113102
f.pageSize = uint32(256 * binary.LittleEndian.Uint16(data[16:18]))
114103
}
115104

@@ -133,23 +122,22 @@ func (f *liteFile) Sync(flag vfs.SyncFlag) error {
133122
}
134123

135124
func (f *liteFile) Size() (size int64, err error) {
136-
size = int64(f.pageCount) * int64(f.pageSize)
125+
if max := f.pages.Max(); max != nil {
126+
size = int64(max.Key()) * int64(f.pageSize)
127+
}
137128
return
138129
}
139130

140-
func (f *liteFile) Lock(lock vfs.LockLevel) error {
131+
func (f *liteFile) Lock(lock vfs.LockLevel) (err error) {
141132
if lock >= vfs.LOCK_RESERVED {
142133
return sqlite3.IOERR_LOCK
143134
}
144-
if err := f.pollReplica(); err != nil {
145-
return err
146-
}
147-
f.lock = max(f.lock, lock)
148-
return nil
135+
f.pages, f.txid, err = f.db.pollReplica(f.context())
136+
return err
149137
}
150138

151139
func (f *liteFile) Unlock(lock vfs.LockLevel) error {
152-
f.lock = min(f.lock, lock)
140+
f.pages, f.txid = nil, 0
153141
return nil
154142
}
155143

@@ -172,7 +160,32 @@ func (f *liteFile) SetDB(conn any) {
172160
f.conn = conn.(*sqlite3.Conn)
173161
}
174162

175-
func (f *liteFile) buildIndex(ctx context.Context) error {
163+
func (f *liteFile) context() context.Context {
164+
if f.conn != nil {
165+
return f.conn.GetInterrupt()
166+
}
167+
return context.Background()
168+
}
169+
170+
type liteDB struct {
171+
client litestream.ReplicaClient
172+
opts *ReplicaOptions
173+
pages *pageIndex // +checklocks:mtx
174+
lastPoll time.Time // +checklocks:mtx
175+
txids levelTXIDs // +checklocks:mtx
176+
mtx sync.Mutex
177+
}
178+
179+
func (f *liteDB) buildIndex(ctx context.Context) error {
180+
f.mtx.Lock()
181+
defer f.mtx.Unlock()
182+
183+
// Skip if we already have an index.
184+
if f.pages != nil {
185+
return nil
186+
}
187+
188+
// Build the index from scratch from a Litestream restore plan.
176189
infos, err := litestream.CalcRestorePlan(ctx, f.client, 0, time.Time{}, f.opts.Logger)
177190
if err != nil {
178191
if !errors.Is(err, litestream.ErrTxNotAvailable) {
@@ -182,7 +195,7 @@ func (f *liteFile) buildIndex(ctx context.Context) error {
182195
}
183196

184197
for _, info := range infos {
185-
err := f.updateIndex(ctx, info)
198+
err := f.updateInfo(ctx, info)
186199
if err != nil {
187200
return err
188201
}
@@ -192,81 +205,86 @@ func (f *liteFile) buildIndex(ctx context.Context) error {
192205
return nil
193206
}
194207

195-
func (f *liteFile) pollReplica() error {
196-
// Can't poll in a transaction.
197-
if f.lock > vfs.LOCK_NONE {
198-
return nil
199-
}
208+
func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) {
209+
f.mtx.Lock()
210+
defer f.mtx.Unlock()
200211

201212
// Limit polling interval.
202213
if time.Since(f.lastPoll) < f.opts.PollInterval {
203-
return nil
204-
}
205-
206-
ctx := context.Background()
207-
if f.conn != nil {
208-
ctx = f.conn.GetInterrupt()
214+
return f.pages, f.txids[0], nil
209215
}
210216

217+
// Updating from MinLevel to SnapshotLevel is non-racy,
218+
// since LTX files are compacted into higher levels
219+
// before the lower level LTX files are deleted.
211220
for level := f.opts.MinLevel; level <= litestream.SnapshotLevel; level++ {
212-
err := func() error {
213-
var nextTXID ltx.TXID
214-
// Snapshots must start from scratch,
215-
// other levels can start from where they were left.
216-
if level != litestream.SnapshotLevel {
217-
nextTXID = f.txids[level] + 1
218-
}
219-
220-
// Start reading from the next LTX file after the current position.
221-
itr, err := f.client.LTXFiles(ctx, level, nextTXID)
222-
if err != nil {
223-
return fmt.Errorf("ltx files: %w", err)
224-
}
225-
defer itr.Close()
226-
227-
// Build an update across all new LTX files.
228-
for itr.Next() {
229-
info := itr.Item()
230-
231-
// Skip LTX files already in the index.
232-
if info.MaxTXID <= f.txids[level] {
233-
continue
234-
}
235-
236-
err := f.updateIndex(ctx, info)
237-
if err != nil {
238-
return err
239-
}
240-
}
241-
if err := itr.Err(); err != nil {
242-
return err
243-
}
244-
return itr.Close()
245-
}()
246-
if err != nil {
221+
if err := f.updateLevel(ctx, level); err != nil {
247222
f.opts.Logger.Error("cannot poll replica", "error", err)
248-
return err
223+
return nil, 0, err
249224
}
250225
}
251226

252227
f.lastPoll = time.Now()
253-
return nil
228+
return f.pages, f.txids[0], nil
254229
}
255230

256-
func (f *liteFile) updateIndex(ctx context.Context, info *ltx.FileInfo) error {
257-
// Read page index.
231+
// +checklocks:f.mtx
232+
func (f *liteDB) updateLevel(ctx context.Context, level int) error {
233+
var nextTXID ltx.TXID
234+
// Snapshots must start from scratch,
235+
// other levels can start from where they were left.
236+
if level != litestream.SnapshotLevel {
237+
nextTXID = f.txids[level] + 1
238+
}
239+
240+
// Start reading from the next LTX file after the current position.
241+
itr, err := f.client.LTXFiles(ctx, level, nextTXID)
242+
if err != nil {
243+
return fmt.Errorf("ltx files: %w", err)
244+
}
245+
defer itr.Close()
246+
247+
// Build an update across all new LTX files.
248+
for itr.Next() {
249+
info := itr.Item()
250+
251+
// Skip LTX files already fully loaded into the index.
252+
if info.MaxTXID <= f.txids[level] {
253+
continue
254+
}
255+
256+
err := f.updateInfo(ctx, info)
257+
if err != nil {
258+
return err
259+
}
260+
}
261+
if err := itr.Err(); err != nil {
262+
return err
263+
}
264+
return itr.Close()
265+
}
266+
267+
// +checklocks:f.mtx
268+
func (f *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error {
258269
idx, err := litestream.FetchPageIndex(ctx, f.client, info)
259270
if err != nil {
260271
return fmt.Errorf("fetch page index: %w", err)
261272
}
262273

263-
// Replace pages in overall index with new pages.
274+
// Replace pages in the index with new pages.
264275
for k, v := range idx {
265-
f.pageCount = max(f.pageCount, k)
266-
f.pages[k] = v
276+
// Patch avoids mutating the index for an unmodified page.
277+
f.pages = f.pages.Patch(k, func(node *pageIndex) (ltx.PageIndexElem, bool) {
278+
return v, node == nil || v != node.Value()
279+
})
267280
}
268-
f.txids[info.Level] = max(f.txids[info.Level], info.MaxTXID)
281+
282+
// Track the MaxTXID for each level.
283+
maxTXID := &f.txids[info.Level]
284+
*maxTXID = max(*maxTXID, info.MaxTXID)
269285
return nil
270286
}
271287

288+
// Type aliases; these are a mouthful.
289+
type pageIndex = wbt.Tree[uint32, ltx.PageIndexElem]
272290
type levelTXIDs = [litestream.SnapshotLevel + 1]ltx.TXID

0 commit comments

Comments
 (0)