Skip to content

Commit 9f52597

Browse files
committed
Implement sharding
1 parent a37729f commit 9f52597

File tree

23 files changed

+633
-90
lines changed

23 files changed

+633
-90
lines changed

SUBQL.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,7 @@
77
Many other networks and L2s use forks of geth we can apply the changes to these forks using the following steps
88

99
1. Generate a patch between the feature branch and master
10-
<<<<<<< HEAD
1110
`git format-patch --stdout <sha1>..<sha2> > subql.patch`
12-
=======
13-
`git format-patch --stdout master > subql.patch`
14-
>>>>>>> db90ac101 (Implement transactions bloom filter and subql apis)
1511

1612
2. In the for repo ensure that the subquery geth for is a remote
1713

@@ -31,10 +27,8 @@ This will apply the changes but may require resolving some conflicts
3127
These forks are generally behind the master geth branch. This can mean that further changes are requied to get builds suceeding.
3228

3329
It's suggested to create a patch or patches of these changes to make it easier to sync changes from the fork and from geth in the future
34-
<<<<<<< HEAD
3530

3631
You can do this by repeating steps 1, 3
3732

3833
e.g `git format-patch --stdout 2d772be398d851a62be53d1b0c162c45bb4876e3..c9760f18c15b8d36af7ef1a9bf5b21056f633b71 > build_fixes.patch`
39-
=======
40-
>>>>>>> db90ac101 (Implement transactions bloom filter and subql apis)
34+

cmd/geth/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ var (
154154
utils.BeaconGenesisRootFlag,
155155
utils.BeaconGenesisTimeFlag,
156156
utils.BeaconCheckpointFlag,
157+
utils.ShardStartFlag,
158+
utils.ShardEndFlag,
157159
}, utils.NetworkFlags, utils.DatabaseFlags)
158160

159161
rpcFlags = []cli.Flag{

cmd/utils/flags.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,17 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
928928
Value: metrics.DefaultConfig.InfluxDBOrganization,
929929
Category: flags.MetricsCategory,
930930
}
931+
932+
ShardStartFlag = &cli.Uint64Flag{
933+
Name: "shardstart",
934+
Usage: "ShardStart the block height of Shard start",
935+
Category: flags.StateCategory,
936+
}
937+
ShardEndFlag = &cli.Uint64Flag{
938+
Name: "shardend",
939+
Usage: "ShardEnd the block height of Shard end (0 means no end)",
940+
Category: flags.StateCategory,
941+
}
931942
)
932943

933944
var (
@@ -1677,6 +1688,25 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
16771688
cfg.EthDiscoveryURLs = SplitAndTrim(urls)
16781689
}
16791690
}
1691+
1692+
if ctx.IsSet(ShardStartFlag.Name) {
1693+
start := ctx.Uint64(ShardStartFlag.Name)
1694+
cfg.ShardStart = &start
1695+
}
1696+
if cfg.ShardStart != nil {
1697+
log.Info("Set shard start", "ShardStart", *cfg.ShardStart)
1698+
} else {
1699+
log.Info("Shard start disabled")
1700+
}
1701+
if ctx.IsSet(ShardEndFlag.Name) {
1702+
end := ctx.Uint64(ShardEndFlag.Name)
1703+
cfg.ShardEnd = &end
1704+
}
1705+
if cfg.ShardEnd != nil {
1706+
log.Info("Set shard end", "shardend", *cfg.ShardEnd)
1707+
} else {
1708+
log.Info("Shard end disabled")
1709+
}
16801710
// Override any default configs for hard coded networks.
16811711
switch {
16821712
case ctx.Bool(MainnetFlag.Name):

common/math/integer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,17 @@ func SafeMul(x, y uint64) (uint64, bool) {
9191
hi, lo := bits.Mul64(x, y)
9292
return lo, hi != 0
9393
}
94+
95+
func Min(x int, y int) int {
96+
if x <= y {
97+
return x
98+
}
99+
return y
100+
}
101+
102+
func Max(x int, y int) int {
103+
if x >= y {
104+
return x
105+
}
106+
return y
107+
}

core/blockchain.go

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ var (
6262
headFinalizedBlockGauge = metrics.NewRegisteredGauge("chain/head/finalized", nil)
6363
headSafeBlockGauge = metrics.NewRegisteredGauge("chain/head/safe", nil)
6464

65+
tailBlockGauge = metrics.NewRegisteredGauge("chain/tail/block", nil)
66+
tailHeaderGauge = metrics.NewRegisteredGauge("chain/tail/header", nil)
67+
6568
chainInfoGauge = metrics.NewRegisteredGaugeInfo("chain/info", nil)
6669

6770
accountReadTimer = metrics.NewRegisteredResettingTimer("chain/account/reads", nil)
@@ -248,6 +251,7 @@ type BlockChain struct {
248251
currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync
249252
currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block
250253
currentSafeBlock atomic.Pointer[types.Header] // Latest (consensus) safe block
254+
earliestBlock atomic.Pointer[types.Header] // Earliest head of the chain
251255

252256
bodyCache *lru.Cache[common.Hash, *types.Body]
253257
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
@@ -524,10 +528,24 @@ func (bc *BlockChain) loadLastState() error {
524528
headHeader = header
525529
}
526530
}
531+
532+
tailNumber := uint64(0)
533+
if dataConfig := rawdb.ReadChainDataConfig(bc.db); dataConfig != nil && dataConfig.DesiredChainDataStart != nil {
534+
tailNumber = *dataConfig.DesiredChainDataStart
535+
}
536+
tailBlock := bc.GetBlockByNumber(tailNumber)
537+
if tailBlock == nil {
538+
// Corrupt or empty database, init from scratch
539+
log.Warn("Tail block missing, resetting chain", "number", tailNumber)
540+
return bc.Reset()
541+
}
542+
527543
bc.hc.SetCurrentHeader(headHeader)
544+
bc.hc.SetEarliestHeader(tailBlock.Header())
528545

529546
// Restore the last known head snap block
530547
bc.currentSnapBlock.Store(headBlock.Header())
548+
bc.earliestBlock.Store(tailBlock.Header())
531549
headFastBlockGauge.Update(int64(headBlock.NumberU64()))
532550

533551
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
@@ -900,6 +918,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
900918
// removed in the hc.SetHead function.
901919
rawdb.DeleteBody(db, hash, num)
902920
rawdb.DeleteReceipts(db, hash, num)
921+
rawdb.DeleteTxBloom(db, hash, num)
903922
}
904923
// Todo(rjl493456442) txlookup, bloombits, etc
905924
}
@@ -936,6 +955,8 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
936955
log.Error("SetHead invalidated finalized block")
937956
bc.SetFinalized(nil)
938957
}
958+
959+
log.Info("Rewinding blockchain complete", "target", head)
939960
return rootNumber, bc.loadLastState()
940961
}
941962

@@ -1152,7 +1173,10 @@ func (bc *BlockChain) Stop() {
11521173
}
11531174
}
11541175
for !bc.triegc.Empty() {
1155-
triedb.Dereference(bc.triegc.PopItem())
1176+
// @sq-change
1177+
root, number := bc.triegc.Pop()
1178+
triedb.Dereference(root)
1179+
triedb.SetTail(uint64(-number))
11561180
}
11571181
if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
11581182
log.Error("Dangling trie nodes after full cleanup")
@@ -1526,6 +1550,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
15261550
break
15271551
}
15281552
bc.triedb.Dereference(root)
1553+
bc.triedb.SetTail(uint64(-number))
15291554
}
15301555
return nil
15311556
}
@@ -1716,6 +1741,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
17161741
log.Debug("Abort during block processing")
17171742
break
17181743
}
1744+
1745+
desiredChainEnd := rawdb.ReadChainDataConfig(bc.db).DesiredChainDataEnd
1746+
if desiredChainEnd != nil && block.Number().Uint64() > *desiredChainEnd {
1747+
log.Info("Block after data end, skipping")
1748+
continue
1749+
}
17191750
// If the block is known (in the middle of the chain), it's a special case for
17201751
// Clique blocks where they can share state among each other, so importing an
17211752
// older block might complete the state of the subsequent one. In this case,
@@ -2521,3 +2552,113 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
25212552
func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
25222553
return time.Duration(bc.flushInterval.Load())
25232554
}
2555+
2556+
// ----- @sq changed
2557+
func (bc *BlockChain) truncateStateTail(newTail uint64) error {
2558+
currentTail := bc.triedb.GetTail()
2559+
if newTail <= currentTail {
2560+
return nil
2561+
}
2562+
for height := currentTail; height < newTail; height++ {
2563+
header := bc.GetHeaderByNumber(height)
2564+
// This isn't right,.height cant be used as a priority
2565+
bc.triegc.Push(header.Root, -int64(height))
2566+
}
2567+
// TODO: need to enforce trie truncate on local disk
2568+
return nil
2569+
}
2570+
2571+
func (bc *BlockChain) SetTail(height uint64) error {
2572+
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
2573+
frozen, _ := bc.db.Ancients()
2574+
2575+
if num+1 <= frozen {
2576+
// Headerchain should clear out all relevant ancients data
2577+
log.Crit("delFn called on frozen data")
2578+
} else {
2579+
// Remove body and receipts from the active store, remainder will be cleared in the hc.SetTail
2580+
rawdb.DeleteBody(db, hash, num)
2581+
rawdb.DeleteReceipts(db, hash, num)
2582+
rawdb.DeleteTxBloom(db, hash, num)
2583+
}
2584+
// Todo(stwiname) from equivalent in SetHead.delFn
2585+
// Todo(rjl493456442) txlookup, bloombits, etc
2586+
}
2587+
2588+
if err := bc.hc.SetTail(height, delFn); err != nil {
2589+
return err
2590+
}
2591+
2592+
// TODO delete relevant state
2593+
// bc.triedb.TruncateTail(num)
2594+
2595+
// Clear out any stale content from the caches
2596+
bc.bodyCache.Purge()
2597+
bc.bodyRLPCache.Purge()
2598+
bc.receiptsCache.Purge()
2599+
bc.blockCache.Purge()
2600+
bc.txLookupCache.Purge()
2601+
2602+
return nil
2603+
}
2604+
2605+
func (bc *BlockChain) SetShardStartHeight(height uint64) error {
2606+
config := rawdb.ReadChainDataConfig(bc.db)
2607+
2608+
if config.DesiredChainDataEnd != nil && *config.DesiredChainDataEnd <= height {
2609+
return fmt.Errorf("Start height cannot be after end height")
2610+
}
2611+
2612+
if config.DesiredChainDataStart != nil && *config.DesiredChainDataStart > height {
2613+
return fmt.Errorf("Start height cannot be decreased, there is no way to recover state")
2614+
}
2615+
2616+
if err := bc.SetTail(height); err != nil {
2617+
return err
2618+
}
2619+
2620+
// TODO perisisting this should happen with batch in SetTail
2621+
config.DesiredChainDataStart = &height
2622+
rawdb.WriteChainDataConfig(bc.db, config)
2623+
2624+
return nil
2625+
}
2626+
2627+
func (bc *BlockChain) SetShardEndHeight(height *uint64) error {
2628+
config := rawdb.ReadChainDataConfig(bc.db)
2629+
2630+
if config.DesiredChainDataStart != nil && *config.DesiredChainDataStart >= *height {
2631+
return fmt.Errorf("End height cannot be before start height")
2632+
}
2633+
2634+
if height != nil && bc.CurrentHeader().Number.Uint64() > *height {
2635+
// rollback
2636+
if err := bc.SetHead(*height); err != nil {
2637+
return err
2638+
}
2639+
2640+
// TODO truncate state
2641+
2642+
2643+
// type freezer interface {
2644+
// Freeze(threshold uint64) error
2645+
// }
2646+
// err = api.eth.chainDb.(freezer).Freeze(0)
2647+
// if err != nil {
2648+
// return false, err
2649+
// }
2650+
// TODO truncate state head
2651+
// err = api.eth.blockchain.TrieDB().SetHead(*height)
2652+
// if err != nil {
2653+
// return false, err
2654+
// }
2655+
2656+
// TODO call blockchain.Stop()?
2657+
}
2658+
// else blockchain.go will limit this once the desired height is reached
2659+
2660+
config.DesiredChainDataEnd = height
2661+
rawdb.WriteChainDataConfig(bc.db, config)
2662+
2663+
return nil
2664+
}

core/blockchain_reader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func (bc *BlockChain) CurrentSafeBlock() *types.Header {
6262
return bc.currentSafeBlock.Load()
6363
}
6464

65+
func (bc *BlockChain) EarliestBlock() *types.Header {
66+
return bc.earliestBlock.Load()
67+
}
68+
6569
// HasHeader checks if a block header is present in the database or not, caching
6670
// it if present.
6771
func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool {

0 commit comments

Comments
 (0)