diff --git a/hamt/hamt.go b/hamt/hamt.go index bbe7dfa24..f2b5c75a6 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -393,8 +393,8 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { return } // FIXME: Make concurrency an option for testing. - //err := dag.Walk(ctx, getLinks, ds.cid, cset.Visit, dag.Concurrent()) - err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit) + err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent()) + //err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit) if err != nil { emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) } diff --git a/hamt/util.go b/hamt/util.go index 45cf27168..8b22ba274 100644 --- a/hamt/util.go +++ b/hamt/util.go @@ -91,7 +91,7 @@ func IdHash(val []byte) []byte { // * all leaf Shard nodes have the same depth (and have only 'value' links). // * all internal Shard nodes point only to other Shards (and hence have zero 'value' links). // * the total number of 'value' links (directory entries) is: -// io.DefaultShardWidth ^ treeHeight. +// io.DefaultShardWidth ^ (treeHeight + 1). // FIXME: HAMTHashFunction needs to be set to IdHash by the caller. We depend on // this simplification for the current logic to work. (HAMTHashFunction is a // global setting of the package, it is hard-coded in the serialized Shard node @@ -100,7 +100,7 @@ func IdHash(val []byte) []byte { // the fake hash as in io.SetAndPrevious through `newHashBits()` and pass // it as an argument making the hash independent of tree manipulation; that // sounds as the correct way to go in general and we wouldn't need this.) -func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) { +func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int, childsPerNode int) (ipld.Node, error) { if treeHeight < 1 { panic("treeHeight < 1") } @@ -112,11 +112,6 @@ func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) { //} // FIXME: Any clean and simple way to do this? Otherwise remove check. - //childsPerNode := io.DefaultShardWidth - childsPerNode := 256 // (FIXME: hard-coded as we have an 'import cycle not - // allowed' error from io package otherwise.) - // FIXME: Evaluate making this an argument. - rootShard, err := NewShard(ds, childsPerNode) if err != nil { return nil, err @@ -131,6 +126,8 @@ func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) { var hashbuf [8]byte binary.LittleEndian.PutUint64(hashbuf[:], uint64(i)) var oldLink *ipld.Link + // FIXME: This is wrong for childsPerNode/DefaultShardWidth different + // than 256 (i.e., one byte of key per level). oldLink, err = rootShard.SetAndPrevious(context.Background(), string(hashbuf[:treeHeight]), unixfs.EmptyFileNode()) if err != nil { return nil, err diff --git a/hamt/util_test.go b/hamt/util_test.go index 1cdb10813..5af98a0f8 100644 --- a/hamt/util_test.go +++ b/hamt/util_test.go @@ -73,7 +73,7 @@ func TestCreateCompleteShard(t *testing.T) { treeHeight := 2 // This is the limit of what we can fastly generate, // the default width is too big (256). We may need to refine // CreateCompleteHAMT encoding of the key to reduce the tableSize. - node, err := CreateCompleteHAMT(ds, treeHeight) + node, err := CreateCompleteHAMT(ds, treeHeight, 256) assert.NoError(t, err) shard, err := NewHamtFromDag(ds, node) diff --git a/io/directory_test.go b/io/directory_test.go index c85fdb013..2e969e620 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "testing" + "time" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -328,13 +329,16 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { oldHashFunc := hamt.HAMTHashFunction defer func() { hamt.HAMTHashFunction = oldHashFunc }() hamt.HAMTHashFunction = hamt.IdHash - //oldShardWidth := DefaultShardWidth - //defer func() { DefaultShardWidth = oldShardWidth }() - //DefaultShardWidth = 8 - // FIXME: We should be able to use a smaller DefaultShardWidth to have - // a deeper tree and cheaper tests once the import cycle is resolved - // in hamt.CreateCompleteHAMT and the DefaultShardWidth value is not - // hardcoded there. + oldShardWidth := DefaultShardWidth + defer func() { DefaultShardWidth = oldShardWidth }() + DefaultShardWidth = 16 // FIXME: Review number. From 256 to 16 or 8 (if + // (if we fix CreateCompleteHAMT). + + // FIXME: Taken from private github.com/ipfs/go-merkledag@v0.2.3/merkledag.go. + // (We can also pass an explicit concurrency value in `(*Shard).EnumLinksAsync()` + // and take ownership of this configuration, but departing from the more + // standard and reliable one in `go-merkledag`. + defaultConcurrentFetch := 32 // We create a "complete" HAMT (see CreateCompleteHAMT for more details) // with a regular structure to be able to predict how many Shard nodes we @@ -343,19 +347,30 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { oldHamtOption := HAMTShardingSize defer func() { HAMTShardingSize = oldHamtOption }() // (Some arbitrary values below that make this test not that expensive.) - treeHeight := 2 - thresholdToWidthRatio := 4 // How many leaf shards nodes (with value links, + treeHeight := 3 // FIXME: Review number. From 2 to 3. + // How many leaf shards nodes (with value links, // i.e., directory entries) do we need to reach the threshold. + thresholdToWidthRatio := 4 + // FIXME: Review dag.Walk algorithm to better figure out this estimate. + HAMTShardingSize = DefaultShardWidth * thresholdToWidthRatio - // With this structure we will then need to fetch the following nodes: + // With this structure and a BFS traversal (from `parallelWalkDepth`) then + // we would roughly fetch the following nodes: + nodesToFetch := 0 + // * all layers up to (but not including) the last one with leaf nodes + // (because it's a BFS) + for i := 0; i < treeHeight; i++ { + nodesToFetch += int(math.Pow(float64(DefaultShardWidth), float64(i))) + } // * `thresholdToWidthRatio` leaf Shards with enough value links to reach // the HAMTShardingSize threshold. - // * `(treeHeight - 1)` internal nodes to reach those leaf Shard nodes - // (assuming we have thresholdToWidthRatio below the DefaultShardWidth, - // i.e., all leaf nodes come from the same parent). - nodesToFetch := thresholdToWidthRatio + treeHeight - 1 + nodesToFetch += thresholdToWidthRatio + // * `defaultConcurrentFetch` potential extra nodes of the threads working + // in parallel + nodesToFetch += defaultConcurrentFetch + ds := mdtest.Mock() - completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight) + completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight, DefaultShardWidth) assert.NoError(t, err) countGetsDS := newCountGetsDS(ds) @@ -363,6 +378,7 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { assert.NoError(t, err) countGetsDS.resetCounter() + countGetsDS.setRequestDelay(10 * time.Millisecond) // FIXME: Only works with sequential DAG walk (now hardcoded, needs to be // added to the internal API) where we can predict the Get requests and // tree traversal. It would be desirable to have some test for the concurrent @@ -370,7 +386,13 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) { below, err := hamtDir.sizeBelowThreshold(context.TODO(), 0) assert.NoError(t, err) assert.False(t, below) - assert.Equal(t, nodesToFetch, countGetsDS.uniqueCidsFetched()) + t.Logf("fetched %d/%d nodes", countGetsDS.uniqueCidsFetched(), nodesToFetch) + assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch) + assert.True(t, countGetsDS.uniqueCidsFetched() >= nodesToFetch-defaultConcurrentFetch) + // (Without the `setRequestDelay` above the number of nodes fetched + // drops dramatically and unpredictably as the BFS starts to behave + // more like a DFS because some search paths are fetched faster than + // others.) } // Compare entries in the leftDir against the rightDir and possibly @@ -519,6 +541,8 @@ type countGetsDS struct { cidsFetched map[cid.Cid]struct{} mapLock sync.Mutex + + getRequestDelay time.Duration } var _ ipld.DAGService = (*countGetsDS)(nil) @@ -528,6 +552,7 @@ func newCountGetsDS(ds ipld.DAGService) *countGetsDS { ds, make(map[cid.Cid]struct{}), sync.Mutex{}, + 0, } } @@ -543,6 +568,10 @@ func (d *countGetsDS) uniqueCidsFetched() int { return len(d.cidsFetched) } +func (d *countGetsDS) setRequestDelay(timeout time.Duration) { + d.getRequestDelay = timeout +} + func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) { node, err := d.DAGService.Get(ctx, c) if err != nil { @@ -550,23 +579,20 @@ func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) { } d.mapLock.Lock() + _, cidRequestedBefore := d.cidsFetched[c] d.cidsFetched[c] = struct{}{} d.mapLock.Unlock() + if d.getRequestDelay != 0 && !cidRequestedBefore { + // First request gets a timeout to simulate a network fetch. + // Subsequent requests get no timeout simulating an in-disk cache. + time.Sleep(d.getRequestDelay) + } + return node, nil } // Process sequentially (blocking) calling Get which tracks requests. func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption { - out := make(chan *ipld.NodeOption, len(cids)) - defer close(out) - for _, c := range cids { - node, err := d.Get(ctx, c) - if err != nil { - out <- &ipld.NodeOption{Err: err} - break - } - out <- &ipld.NodeOption{Node: node} - } - return out + panic("GetMany not supported") }