diff --git a/go.mod b/go.mod index aac677992..d981b36f7 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,13 @@ require ( github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/ipfs/go-bitfield v1.0.0 github.com/ipfs/go-bitswap v0.1.2 // indirect + github.com/ipfs/go-block-format v0.0.2 + github.com/ipfs/go-blockservice v0.1.0 github.com/ipfs/go-cid v0.0.7 + github.com/ipfs/go-datastore v0.0.5 + github.com/ipfs/go-ipfs-blockstore v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.1 + github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.3 github.com/ipfs/go-ipfs-posinfo v0.0.1 github.com/ipfs/go-ipfs-util v0.0.1 @@ -21,6 +26,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.7.0 github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 // indirect + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 ) go 1.16 diff --git a/go.sum b/go.sum index 5d067042f..ca32d1143 100644 --- a/go.sum +++ b/go.sum @@ -335,6 +335,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/hamt/hamt.go b/hamt/hamt.go index 74a8d2759..c3e7ce1a7 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -24,6 +24,9 @@ import ( "context" "fmt" "os" + "sync" + + "golang.org/x/sync/errgroup" format "github.com/ipfs/go-unixfs" "github.com/ipfs/go-unixfs/internal" @@ -372,14 +375,11 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { go func() { defer close(linkResults) defer cancel() - getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults) - cset := cid.NewSet() - rootNode, err := ds.Node() - if err != nil { - emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) - return - } - err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent()) + + err := parallelShardWalk(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error { + emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) + return nil + }) if err != nil { emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) } @@ -387,44 +387,178 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { return linkResults } -// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync -// to iterate a HAMT shard. 
It takes an IPLD Dag Service to fetch nodes, and a call back that will get called -// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation -func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks { - - return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) { - node, err := dagService.Get(ctx, currentCid) - if err != nil { - return nil, err - } - directoryShard, err := NewHamtFromDag(dagService, node) - if err != nil { - return nil, err - } +type listCidsAndShards struct { + cids []cid.Cid + shards []*Shard +} - childShards := make([]*ipld.Link, 0, directoryShard.childer.length()) - links := directoryShard.childer.links - for idx := range directoryShard.childer.children { - lnk := links[idx] - lnkLinkType, err := directoryShard.childLinkType(lnk) +func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) error) (*listCidsAndShards, error) { + res := &listCidsAndShards{} + for idx, lnk := range ds.childer.links { + if nextShard := ds.childer.children[idx]; nextShard == nil { + lnkLinkType, err := ds.childLinkType(lnk) if err != nil { return nil, err } - if lnkLinkType == shardLink { - childShards = append(childShards, lnk) - } else { - sv, err := directoryShard.makeShardValue(lnk) + + switch lnkLinkType { + case shardValueLink: + sv, err := ds.makeShardValue(lnk) if err != nil { return nil, err } formattedLink := sv.val formattedLink.Name = sv.key - emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) + + if err := processLinkValues(formattedLink); err != nil { + return nil, err + } + case shardLink: + res.cids = append(res.cids, lnk.Cid) + default: + return nil, fmt.Errorf("unsupported shard link type") + } + + } else { + if nextShard.val != nil { + formattedLink := &ipld.Link{ + Name: nextShard.key, + Size: nextShard.val.Size, + Cid: nextShard.val.Cid, + } + if err := processLinkValues(formattedLink); err != nil { + return nil, err + } + } else { + res.shards = append(res.shards, nextShard) + } + } + } + return res, nil +} + +// parallelShardWalk is quite similar to the DAG walking algorithm from https://github.com/ipfs/go-merkledag/blob/594e515f162e764183243b72c2ba84f743424c8c/merkledag.go#L464 +// However, there are a few notable differences: +// 1. Some children are actualized Shard structs and some are in the blockstore, this will leverage walking over the in memory Shards as well as the stored blocks +// 2. Instead of just passing each child into the worker pool by itself we group them so that we can leverage optimizations from GetMany. +// This optimization also makes the walk a little more biased towards depth (as opposed to BFS) in the earlier part of the DAG. +// This is particularly helpful for operations like estimating the directory size which should complete quickly when possible. +// 3. None of the extra options from that package are needed +func parallelShardWalk(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error { + const concurrency = 32 + + var visitlk sync.Mutex + visitSet := cid.NewSet() + visit := visitSet.Visit + + // Setup synchronization + grp, errGrpCtx := errgroup.WithContext(ctx) + + // Input and output queues for workers. 
+ feed := make(chan *listCidsAndShards) + out := make(chan *listCidsAndShards) + done := make(chan struct{}) + + for i := 0; i < concurrency; i++ { + grp.Go(func() error { + for feedChildren := range feed { + for _, nextShard := range feedChildren.shards { + nextChildren, err := nextShard.walkChildren(processShardValues) + if err != nil { + return err + } + + select { + case out <- nextChildren: + case <-errGrpCtx.Done(): + return nil + } + } + + var linksToVisit []cid.Cid + for _, nextCid := range feedChildren.cids { + var shouldVisit bool + + visitlk.Lock() + shouldVisit = visit(nextCid) + visitlk.Unlock() + + if shouldVisit { + linksToVisit = append(linksToVisit, nextCid) + } + } + + chNodes := dserv.GetMany(errGrpCtx, linksToVisit) + for optNode := range chNodes { + if optNode.Err != nil { + return optNode.Err + } + + nextShard, err := NewHamtFromDag(dserv, optNode.Node) + if err != nil { + return err + } + + nextChildren, err := nextShard.walkChildren(processShardValues) + if err != nil { + return err + } + + select { + case out <- nextChildren: + case <-errGrpCtx.Done(): + return nil + } + } + + select { + case done <- struct{}{}: + case <-errGrpCtx.Done(): + } + } + return nil + }) + } + + send := feed + var todoQueue []*listCidsAndShards + var inProgress int + + next := &listCidsAndShards{ + shards: []*Shard{root}, + } + +dispatcherLoop: + for { + select { + case send <- next: + inProgress++ + if len(todoQueue) > 0 { + next = todoQueue[0] + todoQueue = todoQueue[1:] + } else { + next = nil + send = nil + } + case <-done: + inProgress-- + if inProgress == 0 && next == nil { + break dispatcherLoop + } + case nextNodes := <-out: + if next == nil { + next = nextNodes + send = feed + } else { + todoQueue = append(todoQueue, nextNodes) } + case <-errGrpCtx.Done(): + break dispatcherLoop } - return childShards, nil } + close(feed) + return grp.Wait() } func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) { diff --git a/io/completehamt_test.go b/io/completehamt_test.go index 1bb3d8720..7995ac1d7 100644 --- a/io/completehamt_test.go +++ b/io/completehamt_test.go @@ -21,7 +21,8 @@ import ( // * all leaf Shard nodes have the same depth (and have only 'value' links). // * all internal Shard nodes point only to other Shards (and hence have zero 'value' links). // * the total number of 'value' links (directory entries) is: -// io.DefaultShardWidth ^ (treeHeight + 1). +// childsPerNode ^ (treeHeight). +// treeHeight: The number of layers of non-value HAMT nodes (e.g. height = 1 is a single shard pointing to some values) // FIXME: HAMTHashFunction needs to be set to idHash by the caller. We depend on // this simplification for the current logic to work. 
(HAMTHashFunction is a // global setting of the package, it is hard-coded in the serialized Shard node diff --git a/io/directory_test.go b/io/directory_test.go index 909f9b4fd..fd58438ed 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -11,7 +11,13 @@ import ( "testing" "time" + blocks "github.com/ipfs/go-block-format" + bsrv "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" ipld "github.com/ipfs/go-ipld-format" mdag "github.com/ipfs/go-merkledag" mdtest "github.com/ipfs/go-merkledag/test" @@ -358,10 +364,23 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { // with a regular structure to be able to predict how many Shard nodes we // will need to fetch in order to reach the HAMTShardingSize threshold in // sizeBelowThreshold (assuming a sequential DAG walk function). - ds := mdtest.Mock() - completeHAMTRoot, err := CreateCompleteHAMT(ds, treeHeight, shardWidth) + + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + countGetsDS := newCountGetsDS(bstore) + dsrv := mdag.NewDAGService(bsrv.New(countGetsDS, offline.Exchange(countGetsDS))) + completeHAMTRoot, err := CreateCompleteHAMT(dsrv, treeHeight, shardWidth) assert.NoError(t, err) + // Calculate the optimal number of nodes to traverse + optimalNodesToFetch := 0 + nodesToProcess := HAMTShardingSize + for i := 0; i < treeHeight-1; i++ { + // divide by the shard width to get the parents and continue up the tree + parentNodes := int(math.Ceil(float64(nodesToProcess) / float64(shardWidth))) + optimalNodesToFetch += parentNodes + nodesToProcess = parentNodes + } + // With this structure and a BFS traversal (from `parallelWalkDepth`) then // we would roughly fetch the following nodes: nodesToFetch := 0 @@ -374,8 +393,7 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { // the HAMTShardingSize threshold. nodesToFetch += thresholdToWidthRatio - countGetsDS := newCountGetsDS(ds) - hamtDir, err := newHAMTDirectoryFromNode(countGetsDS, completeHAMTRoot) + hamtDir, err := newHAMTDirectoryFromNode(dsrv, completeHAMTRoot) assert.NoError(t, err) countGetsDS.resetCounter() @@ -388,12 +406,12 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { assert.NoError(t, err) assert.False(t, below) t.Logf("fetched %d nodes (predicted range: %d-%d)", - countGetsDS.uniqueCidsFetched(), nodesToFetch, nodesToFetch+defaultConcurrentFetch) + countGetsDS.uniqueCidsFetched(), optimalNodesToFetch, nodesToFetch+defaultConcurrentFetch) // Check that the actual number of nodes fetched is within the margin of the // estimated `nodesToFetch` plus an extra of `defaultConcurrentFetch` since // we are fetching in parallel. assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch+defaultConcurrentFetch) - assert.True(t, countGetsDS.uniqueCidsFetched() >= nodesToFetch) + assert.True(t, countGetsDS.uniqueCidsFetched() >= optimalNodesToFetch) } // Compare entries in the leftDir against the rightDir and possibly @@ -537,21 +555,23 @@ func newEmptyHAMTDirectory(dserv ipld.DAGService, shardWidth int) (*HAMTDirector // countGetsDS is a DAG service that keeps track of the number of // unique CIDs fetched. 
type countGetsDS struct { - ipld.DAGService + blockstore.Blockstore cidsFetched map[cid.Cid]struct{} mapLock sync.Mutex + started bool getRequestDelay time.Duration } -var _ ipld.DAGService = (*countGetsDS)(nil) +var _ blockstore.Blockstore = (*countGetsDS)(nil) -func newCountGetsDS(ds ipld.DAGService) *countGetsDS { +func newCountGetsDS(bs blockstore.Blockstore) *countGetsDS { return &countGetsDS{ - ds, + bs, make(map[cid.Cid]struct{}), sync.Mutex{}, + false, 0, } } @@ -560,6 +580,7 @@ func (d *countGetsDS) resetCounter() { d.mapLock.Lock() defer d.mapLock.Unlock() d.cidsFetched = make(map[cid.Cid]struct{}) + d.started = true } func (d *countGetsDS) uniqueCidsFetched() int { @@ -572,12 +593,7 @@ func (d *countGetsDS) setRequestDelay(timeout time.Duration) { d.getRequestDelay = timeout } -func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) { - node, err := d.DAGService.Get(ctx, c) - if err != nil { - return nil, err - } - +func (d *countGetsDS) maybeSleep(c cid.Cid) { d.mapLock.Lock() _, cidRequestedBefore := d.cidsFetched[c] d.cidsFetched[c] = struct{}{} @@ -588,11 +604,35 @@ func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) { // Subsequent requests get no timeout simulating an in-disk cache. time.Sleep(d.getRequestDelay) } +} - return node, nil +func (d *countGetsDS) Has(c cid.Cid) (bool, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.Has(c) } -// Process sequentially (blocking) calling Get which tracks requests. -func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption { - panic("GetMany not supported") +func (d *countGetsDS) Get(c cid.Cid) (blocks.Block, error) { + blk, err := d.Blockstore.Get(c) + if err != nil { + return nil, err + } + + d.maybeSleep(c) + return blk, nil +} + +func (d *countGetsDS) GetSize(c cid.Cid) (int, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.GetSize(c) +} + +func (d *countGetsDS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.AllKeysChan(ctx) }
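
For readers unfamiliar with this style of walk, here is a minimal, self-contained sketch (illustrative only, not part of the diff above) of the same feed/out/done dispatcher pattern that parallelShardWalk in hamt/hamt.go uses, reduced to a plain in-memory tree so the control flow is easier to follow. The node type and visit callback are hypothetical stand-ins for Shards and CIDs, and the sketch deliberately omits the CID visit-set deduplication and the GetMany batching that the real walk performs.

// Sketch of the feed/out/done dispatcher pattern from parallelShardWalk,
// applied to an in-memory tree. Hypothetical types; not the library API.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type node struct {
	name     string
	children []*node
}

func parallelWalk(ctx context.Context, root *node, visit func(*node) error) error {
	const concurrency = 4

	grp, errGrpCtx := errgroup.WithContext(ctx)

	feed := make(chan []*node)  // batches of nodes handed to the workers
	out := make(chan []*node)   // children discovered by the workers
	done := make(chan struct{}) // one signal per processed batch

	for i := 0; i < concurrency; i++ {
		grp.Go(func() error {
			for batch := range feed {
				var found []*node
				for _, n := range batch {
					if err := visit(n); err != nil {
						return err
					}
					found = append(found, n.children...)
				}
				if len(found) > 0 {
					select {
					case out <- found:
					case <-errGrpCtx.Done():
						return nil
					}
				}
				select {
				case done <- struct{}{}:
				case <-errGrpCtx.Done():
				}
			}
			return nil
		})
	}

	// Dispatcher: keep one batch ready to send, queue the rest.
	send := feed
	var todoQueue [][]*node
	var inProgress int
	next := []*node{root}

dispatcherLoop:
	for {
		select {
		case send <- next:
			inProgress++
			if len(todoQueue) > 0 {
				next = todoQueue[0]
				todoQueue = todoQueue[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				break dispatcherLoop
			}
		case nextNodes := <-out:
			if next == nil {
				next = nextNodes
				send = feed
			} else {
				todoQueue = append(todoQueue, nextNodes)
			}
		case <-errGrpCtx.Done():
			break dispatcherLoop
		}
	}
	close(feed)
	return grp.Wait()
}

func main() {
	root := &node{name: "root", children: []*node{{name: "a"}, {name: "b", children: []*node{{name: "c"}}}}}
	_ = parallelWalk(context.Background(), root, func(n *node) error { fmt.Println(n.name); return nil })
}

The walk terminates once every dispatched batch has been acknowledged on done and no batch is pending, mirroring the inProgress bookkeeping in the dispatcher loop added to hamt/hamt.go above; an error in any worker cancels errGrpCtx and unwinds the loop the same way.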