From 23f0ae47bee6b3d75c27d03f0b8b9ef35f945b91 Mon Sep 17 00:00:00 2001
From: Lucas Molas
Date: Wed, 1 Sep 2021 14:02:47 -0300
Subject: [PATCH 1/4] wip

---
 hamt/hamt.go         |  4 ++--
 hamt/util.go         |  9 ++------
 io/directory_test.go | 54 ++++++++++++++++++++++++++------------------
 3 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/hamt/hamt.go b/hamt/hamt.go
index bbe7dfa24..f2b5c75a6 100644
--- a/hamt/hamt.go
+++ b/hamt/hamt.go
@@ -393,8 +393,8 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
 			return
 		}
 		// FIXME: Make concurrency an option for testing.
-		//err := dag.Walk(ctx, getLinks, ds.cid, cset.Visit, dag.Concurrent())
-		err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
+		err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent())
+		//err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
 		if err != nil {
 			emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
 		}
diff --git a/hamt/util.go b/hamt/util.go
index 45cf27168..ba378b0ab 100644
--- a/hamt/util.go
+++ b/hamt/util.go
@@ -91,7 +91,7 @@ func IdHash(val []byte) []byte {
 // * all leaf Shard nodes have the same depth (and have only 'value' links).
 // * all internal Shard nodes point only to other Shards (and hence have zero 'value' links).
 // * the total number of 'value' links (directory entries) is:
-//   io.DefaultShardWidth ^ treeHeight.
+//   io.DefaultShardWidth ^ (treeHeight + 1).
 // FIXME: HAMTHashFunction needs to be set to IdHash by the caller. We depend on
 //  this simplification for the current logic to work. (HAMTHashFunction is a
 //  global setting of the package, it is hard-coded in the serialized Shard node
@@ -100,7 +100,7 @@ func IdHash(val []byte) []byte {
 //  the fake hash as in io.SetAndPrevious through `newHashBits()` and pass
 //  it as an argument making the hash independent of tree manipulation; that
 //  sounds like the correct way to go in general and we wouldn't need this.)
-func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) {
+func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int, childsPerNode int) (ipld.Node, error) {
 	if treeHeight < 1 {
 		panic("treeHeight < 1")
 	}
@@ -112,11 +112,6 @@ func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) {
 	//}
 	// FIXME: Any clean and simple way to do this? Otherwise remove check.
 
-	//childsPerNode := io.DefaultShardWidth
-	childsPerNode := 256 // (FIXME: hard-coded as we have an 'import cycle not
-	// allowed' error from io package otherwise.)
-	// FIXME: Evaluate making this an argument.
-
 	rootShard, err := NewShard(ds, childsPerNode)
 	if err != nil {
 		return nil, err
diff --git a/io/directory_test.go b/io/directory_test.go
index c85fdb013..55cb2bec1 100644
--- a/io/directory_test.go
+++ b/io/directory_test.go
@@ -9,6 +9,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
 
 	cid "github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
@@ -328,13 +329,12 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	oldHashFunc := hamt.HAMTHashFunction
 	defer func() { hamt.HAMTHashFunction = oldHashFunc }()
 	hamt.HAMTHashFunction = hamt.IdHash
-	//oldShardWidth := DefaultShardWidth
-	//defer func() { DefaultShardWidth = oldShardWidth }()
-	//DefaultShardWidth = 8
-	// FIXME: We should be able to use a smaller DefaultShardWidth to have
-	//  a deeper tree and cheaper tests once the import cycle is resolved
-	//  in hamt.CreateCompleteHAMT and the DefaultShardWidth value is not
-	//  hardcoded there.
+	oldShardWidth := DefaultShardWidth
+	defer func() { DefaultShardWidth = oldShardWidth }()
+	DefaultShardWidth = 256 // FIXME: Review number. From 256 to 8.
+
+	// FIXME: Taken from private github.com/ipfs/go-merkledag@v0.2.3/merkledag.go.
+	defaultConcurrentFetch := 32
 
 	// We create a "complete" HAMT (see CreateCompleteHAMT for more details)
 	// with a regular structure to be able to predict how many Shard nodes we
@@ -343,9 +343,12 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	oldHamtOption := HAMTShardingSize
 	defer func() { HAMTShardingSize = oldHamtOption }()
 	// (Some arbitrary values below that make this test not that expensive.)
-	treeHeight := 2
-	thresholdToWidthRatio := 4 // How many leaf shards nodes (with value links,
+	treeHeight := 2 // FIXME: Review number. From 2 to 3.
+	// How many leaf shard nodes (with value links,
 	// i.e., directory entries) do we need to reach the threshold.
+	thresholdToWidthRatio := defaultConcurrentFetch
+	// FIXME: Review dag.Walk algorithm to better figure out this estimate.
+
 	HAMTShardingSize = DefaultShardWidth * thresholdToWidthRatio
 
 	// With this structure we will then need to fetch the following nodes:
@@ -353,9 +356,10 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	// * `thresholdToWidthRatio` leaf Shards with enough value links to reach
 	//   the HAMTShardingSize threshold.
 	// * `(treeHeight - 1)` internal nodes to reach those leaf Shard nodes
 	//   (assuming we have thresholdToWidthRatio below the DefaultShardWidth,
 	//   i.e., all leaf nodes come from the same parent).
-	nodesToFetch := thresholdToWidthRatio + treeHeight - 1
+	//nodesToFetch := thresholdToWidthRatio + treeHeight - 1
+	nodesToFetch := defaultConcurrentFetch * 2 // FIXME: Review.
 	ds := mdtest.Mock()
-	completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight)
+	completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight, DefaultShardWidth)
 	assert.NoError(t, err)
 
 	countGetsDS := newCountGetsDS(ds)
@@ -363,6 +367,7 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	hamtDir, err := newHAMTDirectoryFromNode(countGetsDS, completeHAMTRoot)
 	assert.NoError(t, err)
 
 	countGetsDS.resetCounter()
+	countGetsDS.setRequestDelay(100 * time.Millisecond)
 	// FIXME: Only works with sequential DAG walk (now hardcoded, needs to be
 	//  added to the internal API) where we can predict the Get requests and
 	//  tree traversal. It would be desirable to have some test for the concurrent
@@ -371,6 +376,7 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	assert.NoError(t, err)
 	assert.False(t, below)
 	assert.Equal(t, nodesToFetch, countGetsDS.uniqueCidsFetched())
+	//assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch)
 }
 
 // Compare entries in the leftDir against the rightDir and possibly
@@ -519,6 +525,8 @@ type countGetsDS struct {
 	cidsFetched map[cid.Cid]struct{}
 	mapLock     sync.Mutex
+
+	getRequestDelay time.Duration
 }
 
 var _ ipld.DAGService = (*countGetsDS)(nil)
@@ -528,6 +536,7 @@ func newCountGetsDS(ds ipld.DAGService) *countGetsDS {
 		ds,
 		make(map[cid.Cid]struct{}),
 		sync.Mutex{},
+		0,
 	}
 }
@@ -543,6 +552,10 @@ func (d *countGetsDS) uniqueCidsFetched() int {
 	return len(d.cidsFetched)
 }
 
+func (d *countGetsDS) setRequestDelay(timeout time.Duration) {
+	d.getRequestDelay = timeout
+}
+
 func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
 	node, err := d.DAGService.Get(ctx, c)
 	if err != nil {
@@ -550,23 +563,20 @@
 	}
 
 	d.mapLock.Lock()
+	_, cidRequestedBefore := d.cidsFetched[c]
 	d.cidsFetched[c] = struct{}{}
 	d.mapLock.Unlock()
 
+	if d.getRequestDelay != 0 && !cidRequestedBefore {
+		// First request gets a delay to simulate a network fetch.
+		// Subsequent requests get no delay, simulating an on-disk cache.
+		time.Sleep(d.getRequestDelay)
+	}
+
 	return node, nil
 }
 
 // Process sequentially (blocking) calling Get which tracks requests.
 func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
-	out := make(chan *ipld.NodeOption, len(cids))
-	defer close(out)
-	for _, c := range cids {
-		node, err := d.Get(ctx, c)
-		if err != nil {
-			out <- &ipld.NodeOption{Err: err}
-			break
-		}
-		out <- &ipld.NodeOption{Node: node}
-	}
-	return out
+	panic("GetMany not supported")
 }

From 2485f1fcf701156e7dc56642ba9c04fbcb042275 Mon Sep 17 00:00:00 2001
From: Lucas Molas
Date: Wed, 1 Sep 2021 14:13:17 -0300
Subject: [PATCH 2/4] fix CreateCompleteHAMT test

---
 hamt/util_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hamt/util_test.go b/hamt/util_test.go
index 1cdb10813..5af98a0f8 100644
--- a/hamt/util_test.go
+++ b/hamt/util_test.go
@@ -73,7 +73,7 @@ func TestCreateCompleteShard(t *testing.T) {
 	treeHeight := 2 // This is the limit of what we can quickly generate,
 	// the default width is too big (256). We may need to refine
 	// CreateCompleteHAMT encoding of the key to reduce the tableSize.
-	node, err := CreateCompleteHAMT(ds, treeHeight)
+	node, err := CreateCompleteHAMT(ds, treeHeight, 256)
 	assert.NoError(t, err)
 
 	shard, err := NewHamtFromDag(ds, node)

From f376de797041dfd7f504ef2b2b1c3b99401f2d90 Mon Sep 17 00:00:00 2001
From: Lucas Molas
Date: Wed, 1 Sep 2021 14:14:03 -0300
Subject: [PATCH 3/4] relax test constraint

---
 io/directory_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/io/directory_test.go b/io/directory_test.go
index 55cb2bec1..c08e16485 100644
--- a/io/directory_test.go
+++ b/io/directory_test.go
@@ -375,8 +375,8 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	below, err := hamtDir.sizeBelowThreshold(context.TODO(), 0)
 	assert.NoError(t, err)
 	assert.False(t, below)
-	assert.Equal(t, nodesToFetch, countGetsDS.uniqueCidsFetched())
-	//assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch)
+	//assert.Equal(t, nodesToFetch, countGetsDS.uniqueCidsFetched())
+	assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch)
 }
 
 // Compare entries in the leftDir against the rightDir and possibly

From 2c0e94d1b1eef980f6eb21a3ea6930871bea22aa Mon Sep 17 00:00:00 2001
From: Lucas Molas
Date: Thu, 2 Sep 2021 15:18:29 -0300
Subject: [PATCH 4/4] fix value of nodes fetched in test according to a BFS walk

---
 hamt/util.go         |  2 ++
 io/directory_test.go | 38 +++++++++++++++++++++++++++-----------
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/hamt/util.go b/hamt/util.go
index ba378b0ab..8b22ba274 100644
--- a/hamt/util.go
+++ b/hamt/util.go
@@ -126,6 +126,8 @@ func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int, childsPerNode int) (
 		var hashbuf [8]byte
 		binary.LittleEndian.PutUint64(hashbuf[:], uint64(i))
 		var oldLink *ipld.Link
+		// FIXME: This is wrong for childsPerNode/DefaultShardWidth different
+		//  than 256 (i.e., one byte of key per level).
 		oldLink, err = rootShard.SetAndPrevious(context.Background(), string(hashbuf[:treeHeight]), unixfs.EmptyFileNode())
 		if err != nil {
 			return nil, err
diff --git a/io/directory_test.go b/io/directory_test.go
index c08e16485..2e969e620 100644
--- a/io/directory_test.go
+++ b/io/directory_test.go
@@ -331,9 +331,13 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	hamt.HAMTHashFunction = hamt.IdHash
 	oldShardWidth := DefaultShardWidth
 	defer func() { DefaultShardWidth = oldShardWidth }()
-	DefaultShardWidth = 256 // FIXME: Review number. From 256 to 8.
+	DefaultShardWidth = 16 // FIXME: Review number. From 256 to 16 or 8
+	// (if we fix CreateCompleteHAMT).
 
 	// FIXME: Taken from private github.com/ipfs/go-merkledag@v0.2.3/merkledag.go.
+	// (We can also pass an explicit concurrency value in `(*Shard).EnumLinksAsync()`
+	// and take ownership of this configuration, but departing from the more
+	// standard and reliable one in `go-merkledag`.)
 	defaultConcurrentFetch := 32
 
 	// We create a "complete" HAMT (see CreateCompleteHAMT for more details)
 	// with a regular structure to be able to predict how many Shard nodes we
@@ -345,21 +349,28 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	oldHamtOption := HAMTShardingSize
 	defer func() { HAMTShardingSize = oldHamtOption }()
 	// (Some arbitrary values below that make this test not that expensive.)
-	treeHeight := 2 // FIXME: Review number. From 2 to 3.
+	treeHeight := 3 // FIXME: Review number. From 2 to 3.
 	// How many leaf shard nodes (with value links,
 	// i.e., directory entries) do we need to reach the threshold.
-	thresholdToWidthRatio := defaultConcurrentFetch
+	thresholdToWidthRatio := 4
 	// FIXME: Review dag.Walk algorithm to better figure out this estimate.
 	HAMTShardingSize = DefaultShardWidth * thresholdToWidthRatio
 
-	// With this structure we will then need to fetch the following nodes:
+	// With this structure and a BFS traversal (from `parallelWalkDepth`)
+	// we would roughly fetch the following nodes:
+	nodesToFetch := 0
+	// * all layers up to (but not including) the last one with leaf nodes
+	//   (because it's a BFS)
+	for i := 0; i < treeHeight; i++ {
+		nodesToFetch += int(math.Pow(float64(DefaultShardWidth), float64(i)))
+	}
 	// * `thresholdToWidthRatio` leaf Shards with enough value links to reach
 	//   the HAMTShardingSize threshold.
-	// * `(treeHeight - 1)` internal nodes to reach those leaf Shard nodes
-	//   (assuming we have thresholdToWidthRatio below the DefaultShardWidth,
-	//   i.e., all leaf nodes come from the same parent).
-	//nodesToFetch := thresholdToWidthRatio + treeHeight - 1
-	nodesToFetch := defaultConcurrentFetch * 2 // FIXME: Review.
+	nodesToFetch += thresholdToWidthRatio
+	// * `defaultConcurrentFetch` potential extra nodes fetched by the threads
+	//   working in parallel
+	nodesToFetch += defaultConcurrentFetch
+
 	ds := mdtest.Mock()
 	completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight, DefaultShardWidth)
 	assert.NoError(t, err)
@@ -367,13 +378,18 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	assert.NoError(t, err)
 
 	countGetsDS.resetCounter()
-	countGetsDS.setRequestDelay(100 * time.Millisecond)
+	countGetsDS.setRequestDelay(10 * time.Millisecond)
 	// FIXME: Only works with sequential DAG walk (now hardcoded, needs to be
 	//  added to the internal API) where we can predict the Get requests and
 	//  tree traversal. It would be desirable to have some test for the concurrent
 	below, err := hamtDir.sizeBelowThreshold(context.TODO(), 0)
 	assert.NoError(t, err)
 	assert.False(t, below)
-	//assert.Equal(t, nodesToFetch, countGetsDS.uniqueCidsFetched())
+	t.Logf("fetched %d/%d nodes", countGetsDS.uniqueCidsFetched(), nodesToFetch)
 	assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch)
+	assert.True(t, countGetsDS.uniqueCidsFetched() >= nodesToFetch-defaultConcurrentFetch)
+	// (Without the `setRequestDelay` above, the number of nodes fetched
+	// drops dramatically and unpredictably, as the BFS starts to behave
+	// more like a DFS because some search paths are fetched faster than
+	// others.)
 }
 
 // Compare entries in the leftDir against the rightDir and possibly
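
Supplementary sketches (not part of the series):

PATCH 1/4 flips `(*Shard).EnumLinksAsync()` over to go-merkledag's concurrent walker. Below is a minimal, self-contained sketch of how that walker is driven, assuming only the go-merkledag APIs the patch itself relies on (`dag.Walk`, `dag.Concurrent`, `mdtest.Mock`); the two-node DAG and the variable names are illustrative.

package main

import (
	"context"
	"fmt"

	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	dag "github.com/ipfs/go-merkledag"
	mdtest "github.com/ipfs/go-merkledag/test"
)

func main() {
	ctx := context.Background()
	ds := mdtest.Mock() // in-memory DAGService, the same mock the tests use

	// Build a two-node DAG: root -> child.
	child := dag.NodeWithData([]byte("child"))
	root := dag.NodeWithData([]byte("root"))
	if err := root.AddNodeLink("child", child); err != nil {
		panic(err)
	}
	if err := ds.Add(ctx, child); err != nil {
		panic(err)
	}
	if err := ds.Add(ctx, root); err != nil {
		panic(err)
	}

	// getLinks fetches a node and returns its links, the shape dag.Walk expects.
	getLinks := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
		nd, err := ds.Get(ctx, c)
		if err != nil {
			return nil, err
		}
		return nd.Links(), nil
	}

	visited := cid.NewSet()
	// dag.Concurrent() is the option the patch enables: it turns the
	// sequential DFS into a concurrent (roughly breadth-first) traversal.
	if err := dag.Walk(ctx, getLinks, root.Cid(), visited.Visit, dag.Concurrent()); err != nil {
		panic(err)
	}
	fmt.Println("visited", visited.Len(), "nodes") // visited 2 nodes
}

The `cid.Set`-backed visit callback is what deduplicates CIDs during the walk, the same role `cset.Visit` plays inside `EnumLinksAsync()`.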
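As a back-of-the-envelope check of the estimate PATCH 4/4 builds, here is the same arithmetic as a standalone program, with the test's values (DefaultShardWidth = 16, treeHeight = 3, thresholdToWidthRatio = 4, defaultConcurrentFetch = 32) hard-coded:

package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		shardWidth            = 16 // DefaultShardWidth set by the test
		treeHeight            = 3
		thresholdToWidthRatio = 4
		concurrentFetch       = 32 // go-merkledag's default fetch concurrency
	)

	nodesToFetch := 0
	// A BFS fetches every internal layer in full before touching the leaves:
	// 16^0 + 16^1 + 16^2 = 273 shards.
	for i := 0; i < treeHeight; i++ {
		nodesToFetch += int(math.Pow(shardWidth, float64(i)))
	}
	// Plus the leaf shards needed to cross the HAMTShardingSize threshold...
	nodesToFetch += thresholdToWidthRatio
	// ...plus up to one extra in-flight node per concurrent fetcher.
	nodesToFetch += concurrentFetch

	fmt.Println(nodesToFetch) // 273 + 4 + 32 = 309
}

With those numbers, the window accepted by the two assertions in PATCH 4/4 is 277 to 309 unique CIDs fetched (upper bound `nodesToFetch`, lower bound `nodesToFetch - defaultConcurrentFetch`).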