Skip to content

Commit 12c0585

Browse files
accounts: add migration code from kvdb to SQL
This commit introduces the migration logic for transitioning the accounts store from kvdb to SQL. Note that as of this commit, the migration is not yet triggered by any production code, i.e. only tests execute the migration logic.
1 parent 92689c5 commit 12c0585

File tree

3 files changed

+622
-2
lines changed

3 files changed

+622
-2
lines changed

accounts/sql_migration.go

+272
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
package accounts
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"math"
9+
"reflect"
10+
"time"
11+
12+
"github.com/davecgh/go-spew/spew"
13+
"github.com/lightninglabs/lightning-terminal/db/sqlc"
14+
"github.com/lightningnetwork/lnd/lntypes"
15+
"github.com/pmezard/go-difflib/difflib"
16+
)
17+
18+
var (
19+
// ErrMigrationMismatch is returned when the migrated account does not
20+
// match the original account.
21+
ErrMigrationMismatch = fmt.Errorf("migrated account does not match " +
22+
"original account")
23+
)
24+
25+
// MigrateAccountStoreToSQL runs the migration of all accounts and indices from
26+
// the KV database to the SQL database. The migration is done in a single
27+
// transaction to ensure that all accounts are migrated or none at all.
28+
func MigrateAccountStoreToSQL(ctx context.Context, kvStore *BoltStore,
29+
tx SQLQueries) error {
30+
31+
log.Infof("Starting migration of the KV accounts store to SQL")
32+
33+
err := migrateAccountsToSQL(ctx, kvStore, tx)
34+
if err != nil {
35+
return err
36+
}
37+
38+
err = migrateAccountsIndicesToSQL(ctx, kvStore, tx)
39+
if err != nil {
40+
return err
41+
}
42+
43+
return nil
44+
}
45+
46+
// migrateAccountsToSQL runs the migration of all accounts from the KV database
47+
// to the SQL database. The migration is done in a single transaction to ensure
48+
// that all accounts are migrated or none at all.
49+
func migrateAccountsToSQL(ctx context.Context, kvStore *BoltStore,
50+
tx SQLQueries) error {
51+
52+
log.Infof("Starting migration of accounts from KV to SQL")
53+
54+
kvAccounts, err := kvStore.Accounts(ctx)
55+
if err != nil {
56+
return err
57+
}
58+
59+
total := 0
60+
61+
for i, kvAccount := range kvAccounts {
62+
total++
63+
64+
migratedAccountID, err := migrateSingleAccountToSQL(
65+
ctx, tx, kvAccounts[i],
66+
)
67+
if err != nil {
68+
return fmt.Errorf("unable to migrate account(%v): %w",
69+
kvAccount.ID, err)
70+
}
71+
72+
migratedAccount, err := getAndMarshalAccount(
73+
ctx, tx, migratedAccountID,
74+
)
75+
if err != nil {
76+
return fmt.Errorf("unable to fetch migrated "+
77+
"account(%v): %w", kvAccount.ID, err)
78+
}
79+
80+
overrideAccountTimeZone(kvAccount)
81+
overrideAccountTimeZone(migratedAccount)
82+
83+
if !reflect.DeepEqual(kvAccount, migratedAccount) {
84+
diff := difflib.UnifiedDiff{
85+
A: difflib.SplitLines(
86+
spew.Sdump(kvAccount),
87+
),
88+
B: difflib.SplitLines(
89+
spew.Sdump(migratedAccount),
90+
),
91+
FromFile: "Expected",
92+
FromDate: "",
93+
ToFile: "Actual",
94+
ToDate: "",
95+
Context: 3,
96+
}
97+
diffText, _ := difflib.GetUnifiedDiffString(diff)
98+
99+
return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch,
100+
kvAccount.ID, diffText)
101+
}
102+
}
103+
104+
log.Infof("All accounts migrated from KV to SQL. Total number of "+
105+
"accounts migrated: %d", total)
106+
107+
return nil
108+
}
109+
110+
// migrateSingleAccountToSQL runs the migration for a single account from the
111+
// KV database to the SQL database.
112+
func migrateSingleAccountToSQL(ctx context.Context,
113+
tx SQLQueries, account *OffChainBalanceAccount) (int64, error) {
114+
115+
insertAccountParams, err := makeInsertAccountParams(account)
116+
if err != nil {
117+
return 0, err
118+
}
119+
120+
sqlId, err := tx.InsertAccount(ctx, insertAccountParams)
121+
if err != nil {
122+
return 0, err
123+
}
124+
125+
for hash := range account.Invoices {
126+
addInvoiceParams := makeAddAccountInvoiceParams(sqlId, hash)
127+
128+
err = tx.AddAccountInvoice(ctx, addInvoiceParams)
129+
if err != nil {
130+
return sqlId, err
131+
}
132+
}
133+
134+
for hash, paymentEntry := range account.Payments {
135+
upsertPaymentParams := makeUpsertAccountPaymentParams(
136+
sqlId, hash, paymentEntry,
137+
)
138+
139+
err = tx.UpsertAccountPayment(ctx, upsertPaymentParams)
140+
if err != nil {
141+
return sqlId, err
142+
}
143+
}
144+
145+
return sqlId, nil
146+
}
147+
148+
// migrateAccountsIndicesToSQL runs the migration for the account indices from
149+
// the KV database to the SQL database.
150+
func migrateAccountsIndicesToSQL(ctx context.Context, kvStore *BoltStore,
151+
tx SQLQueries) error {
152+
153+
log.Infof("Starting migration of accounts indices from KV to SQL")
154+
155+
addIndex, settleIndex, err := kvStore.LastIndexes(ctx)
156+
if errors.Is(err, ErrNoInvoiceIndexKnown) {
157+
log.Infof("No indices found in KV store, skipping migration")
158+
return nil
159+
} else if err != nil {
160+
return err
161+
}
162+
163+
setAddIndexParams, err := makeSetAccountIndexParams(
164+
addIndex, addIndexName,
165+
)
166+
if err != nil {
167+
return err
168+
}
169+
170+
err = tx.SetAccountIndex(ctx, setAddIndexParams)
171+
if err != nil {
172+
return err
173+
}
174+
175+
setSettleIndexParams, err := makeSetAccountIndexParams(
176+
settleIndex, settleIndexName,
177+
)
178+
if err != nil {
179+
return err
180+
}
181+
182+
err = tx.SetAccountIndex(ctx, setSettleIndexParams)
183+
if err != nil {
184+
return err
185+
}
186+
187+
log.Infof("Successfully migratated accounts indices from KV to SQL")
188+
189+
return nil
190+
}
191+
192+
// overrideAccountTimeZone overrides the time zone of the account to the local
193+
// time zone and chops off the nanosecond part for comparison. This is needed
194+
// because KV database stores times as-is which as an unwanted side effect would
195+
// fail migration due to time comparison expecting both the original and
196+
// migrated accounts to be in the same local time zone and in microsecond
197+
// precision. Note that PostgresSQL stores times in microsecond precision while
198+
// SQLite can store times in nanosecond precision if using TEXT storage class.
199+
func overrideAccountTimeZone(account *OffChainBalanceAccount) {
200+
fixTime := func(t time.Time) time.Time {
201+
return t.In(time.Local).Truncate(time.Microsecond)
202+
}
203+
204+
if !account.ExpirationDate.IsZero() {
205+
account.ExpirationDate = fixTime(account.ExpirationDate)
206+
}
207+
208+
if !account.LastUpdate.IsZero() {
209+
account.LastUpdate = fixTime(account.LastUpdate)
210+
}
211+
}
212+
213+
func makeInsertAccountParams(account *OffChainBalanceAccount) (
214+
sqlc.InsertAccountParams, error) {
215+
216+
var labelVal sql.NullString
217+
if len(account.Label) > 0 {
218+
labelVal = sql.NullString{
219+
String: account.Label,
220+
Valid: true,
221+
}
222+
}
223+
224+
accountId, err := account.ID.ToInt64()
225+
if err != nil {
226+
return sqlc.InsertAccountParams{}, err
227+
}
228+
229+
return sqlc.InsertAccountParams{
230+
Type: int16(account.Type),
231+
InitialBalanceMsat: int64(account.InitialBalance),
232+
CurrentBalanceMsat: account.CurrentBalance,
233+
LastUpdated: account.LastUpdate.UTC(),
234+
Label: labelVal,
235+
Alias: accountId,
236+
Expiration: account.ExpirationDate.UTC(),
237+
}, nil
238+
}
239+
240+
func makeAddAccountInvoiceParams(sqlID int64,
241+
hash lntypes.Hash) sqlc.AddAccountInvoiceParams {
242+
243+
return sqlc.AddAccountInvoiceParams{
244+
AccountID: sqlID,
245+
Hash: hash[:],
246+
}
247+
}
248+
249+
func makeUpsertAccountPaymentParams(sqlID int64, hash lntypes.Hash,
250+
entry *PaymentEntry) sqlc.UpsertAccountPaymentParams {
251+
252+
return sqlc.UpsertAccountPaymentParams{
253+
AccountID: sqlID,
254+
Hash: hash[:],
255+
Status: int16(entry.Status),
256+
FullAmountMsat: int64(entry.FullAmount),
257+
}
258+
}
259+
260+
func makeSetAccountIndexParams(indexValue uint64,
261+
indexName string) (sqlc.SetAccountIndexParams, error) {
262+
263+
if indexValue > math.MaxInt64 {
264+
return sqlc.SetAccountIndexParams{}, fmt.Errorf("%s:%v is "+
265+
"above max int64 value", indexName, indexValue)
266+
}
267+
268+
return sqlc.SetAccountIndexParams{
269+
Name: indexName,
270+
Value: int64(indexValue),
271+
}, nil
272+
}

0 commit comments

Comments
 (0)