Skip to content

Commit 584041c

Browse files
aeddizivkovicmilos
andauthored
feat(backup): add a batch feature to the RPC client (#45)
* feat(backup): add ws protocol to rpc client * feat(backup): add a batchSize parameter * feat(cmd): add a verbose parameter * feat(backup): add batch feature to rpc client * feat(backup): add batch feature to cmd * test(backup): edit unit test for batch feature --------- Co-authored-by: Miloš Živković <[email protected]>
1 parent 09edd61 commit 584041c

File tree

12 files changed

+635
-369
lines changed

12 files changed

+635
-369
lines changed

.github/golangci.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,6 @@ linters-settings:
122122
rules:
123123
json: goCamel
124124
yaml: goCamel
125+
gosec:
126+
excludes:
127+
- G115 # Potential integer overflow when converting between integer types

backup/backup.go

Lines changed: 103 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Service struct {
2121
writer writer.Writer
2222
logger log.Logger
2323

24+
batchSize uint
2425
watchInterval time.Duration // interval for the watch routine
2526
}
2627

@@ -37,6 +38,11 @@ func NewService(client client.Client, writer writer.Writer, opts ...Option) *Ser
3738
opt(s)
3839
}
3940

41+
// Batch size needs to be at least 1
42+
if s.batchSize == 0 {
43+
s.batchSize = 1
44+
}
45+
4046
return s
4147
}
4248

@@ -53,62 +59,117 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
5359
return fmt.Errorf("unable to determine right bound, %w", boundErr)
5460
}
5561

56-
// Keep track of total txs backed up
57-
totalTxs := uint64(0)
62+
// Log info about what will be backed up
63+
s.logger.Info(
64+
"Existing blocks to backup",
65+
"from block", cfg.FromBlock,
66+
"to block", toBlock,
67+
"total", toBlock-cfg.FromBlock+1,
68+
)
69+
70+
// Keep track of what has been backed up
71+
var results struct {
72+
blocksFetched uint64
73+
blocksWithTxs uint64
74+
txsBackedUp uint64
75+
}
5876

59-
fetchAndWrite := func(height uint64) error {
60-
block, txErr := s.client.GetBlock(height)
61-
if txErr != nil {
62-
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
63-
}
77+
// Log results on exit
78+
defer func() {
79+
s.logger.Info(
80+
"Total data backed up",
81+
"blocks fetched", results.blocksFetched,
82+
"blocks with transactions", results.blocksWithTxs,
83+
"transactions written", results.txsBackedUp,
84+
)
85+
}()
86+
87+
// Internal function that fetches and writes a range of blocks
88+
fetchAndWrite := func(fromBlock, toBlock uint64) error {
89+
// Fetch by batches
90+
for batchStart := fromBlock; batchStart <= toBlock; {
91+
// Determine batch stop block
92+
batchStop := batchStart + uint64(s.batchSize) - 1
93+
if batchStop > toBlock {
94+
batchStop = toBlock
95+
}
6496

65-
// Skip empty blocks
66-
if len(block.Txs) == 0 {
67-
return nil
68-
}
97+
batchSize := batchStop - batchStart + 1
6998

70-
// Save the block transaction data, if any
71-
for _, tx := range block.Txs {
72-
data := &gnoland.TxWithMetadata{
73-
Tx: tx,
74-
Metadata: &gnoland.GnoTxMetadata{
75-
Timestamp: block.Timestamp,
76-
},
77-
}
99+
// Verbose log for blocks to be fetched
100+
s.logger.Debug(
101+
"Fetching batch of blocks",
102+
"from", batchStart,
103+
"to", batchStop,
104+
"size", batchSize,
105+
)
78106

79-
// Write the tx data to the file
80-
if writeErr := s.writer.WriteTxData(data); writeErr != nil {
81-
return fmt.Errorf("unable to write tx data, %w", writeErr)
107+
// Fetch current batch
108+
blocks, txErr := s.client.GetBlocks(ctx, batchStart, batchStop)
109+
if txErr != nil {
110+
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
82111
}
83112

84-
totalTxs++
113+
// Keep track of the number of fetched blocks & those containing transactions
114+
results.blocksFetched += batchSize
115+
results.blocksWithTxs += uint64(len(blocks))
85116

86-
// Log the progress
87-
s.logger.Info(
88-
"Transaction backed up",
89-
"total", totalTxs,
117+
// Verbose log for blocks containing transactions
118+
s.logger.Debug(
119+
"Batch fetched successfully",
120+
"blocks with transactions", fmt.Sprintf("%d/%d", len(blocks), batchSize),
90121
)
91-
}
92122

93-
return nil
94-
}
123+
// Iterate over the list of blocks containing transactions
124+
for _, block := range blocks {
125+
for i, tx := range block.Txs {
126+
// Write the tx data to the file
127+
txData := &gnoland.TxWithMetadata{
128+
Tx: tx,
129+
Metadata: &gnoland.GnoTxMetadata{
130+
Timestamp: block.Timestamp,
131+
},
132+
}
95133

96-
// Gather the chain data from the node
97-
for block := cfg.FromBlock; block <= toBlock; block++ {
98-
select {
99-
case <-ctx.Done():
100-
s.logger.Info("backup procedure stopped")
134+
if writeErr := s.writer.WriteTxData(txData); writeErr != nil {
135+
return fmt.Errorf("unable to write tx data, %w", writeErr)
136+
}
137+
138+
// Keep track of the number of backed up transactions
139+
results.txsBackedUp++
101140

102-
return nil
103-
default:
104-
if fetchErr := fetchAndWrite(block); fetchErr != nil {
105-
return fetchErr
141+
// Verbose log for each transaction written
142+
s.logger.Debug(
143+
"Transaction backed up",
144+
"blockNum", block.Height,
145+
"tx count (block)", i+1,
146+
"tx count (total)", results.txsBackedUp,
147+
)
148+
}
106149
}
150+
151+
batchStart = batchStop + 1
107152
}
153+
154+
return nil
155+
}
156+
157+
// Backup the existing transactions
158+
if fetchErr := fetchAndWrite(cfg.FromBlock, toBlock); fetchErr != nil {
159+
return fetchErr
108160
}
109161

110162
// Check if there needs to be a watcher setup
111163
if cfg.Watch {
164+
s.logger.Info(
165+
"Existing blocks backup complete",
166+
"blocks fetched", results.blocksFetched,
167+
"blocks with transactions", results.blocksWithTxs,
168+
"transactions written", results.txsBackedUp,
169+
)
170+
171+
s.logger.Info("Watch for new blocks to backup")
172+
112173
ticker := time.NewTicker(s.watchInterval)
113174
defer ticker.Stop()
114175

@@ -117,7 +178,7 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
117178
for {
118179
select {
119180
case <-ctx.Done():
120-
s.logger.Info("export procedure stopped")
181+
s.logger.Info("Stop watching for new blocks to backup")
121182

122183
return nil
123184
case <-ticker.C:
@@ -133,10 +194,8 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
133194
}
134195

135196
// Catch up to the latest block
136-
for block := lastBlock + 1; block <= latest; block++ {
137-
if fetchErr := fetchAndWrite(block); fetchErr != nil {
138-
return fetchErr
139-
}
197+
if fetchErr := fetchAndWrite(lastBlock+1, latest); fetchErr != nil {
198+
return fetchErr
140199
}
141200

142201
// Update the last exported block
@@ -145,8 +204,6 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
145204
}
146205
}
147206

148-
s.logger.Info("Backup complete")
149-
150207
return nil
151208
}
152209

0 commit comments

Comments
 (0)