
Commit cdcb270

Concurrent walk test (#106)
* wip
* fix CreateCompleteHAMT test
* relax test constraint
* fix value of nodes fetched in test according to a BFS walk

1 parent: 2f9afe9

4 files changed: +60 -37 lines changed

hamt/hamt.go

Lines changed: 2 additions & 2 deletions
@@ -393,8 +393,8 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
 		return
 	}
 	// FIXME: Make concurrency an option for testing.
-	//err := dag.Walk(ctx, getLinks, ds.cid, cset.Visit, dag.Concurrent())
-	err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
+	err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent())
+	//err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit)
 	if err != nil {
 		emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
 	}
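For reference, `dag.Walk` in `go-merkledag` visits every node reachable from a root CID, and the `dag.Concurrent()` option turns that traversal into a parallel, roughly breadth-first walk with a pool of fetch workers. Below is a minimal sketch of the call pattern this hunk switches to, using only public `go-merkledag` and `go-cid` APIs; the `walkConcurrently` wrapper and its parameter names are hypothetical, not part of this commit:

import (
	"context"

	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	dag "github.com/ipfs/go-merkledag"
)

// walkConcurrently is a hypothetical wrapper showing the concurrent walk
// enabled above: each unique CID under root is fetched and visited once.
func walkConcurrently(ctx context.Context, serv ipld.DAGService, root cid.Cid) error {
	getLinks := dag.GetLinksDirect(serv) // resolves a CID to its child links
	visited := cid.NewSet()              // Visit reports false for already-seen CIDs
	return dag.Walk(ctx, getLinks, root, visited.Visit, dag.Concurrent())
}

This mirrors the production call in `EnumLinksAsync`, where the visit callback is `cset.Visit` on a `cid.Set`.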

hamt/util.go

Lines changed: 4 additions & 7 deletions
@@ -91,7 +91,7 @@ func IdHash(val []byte) []byte {
 // * all leaf Shard nodes have the same depth (and have only 'value' links).
 // * all internal Shard nodes point only to other Shards (and hence have zero 'value' links).
 // * the total number of 'value' links (directory entries) is:
-//   io.DefaultShardWidth ^ treeHeight.
+//   io.DefaultShardWidth ^ (treeHeight + 1).
 // FIXME: HAMTHashFunction needs to be set to IdHash by the caller. We depend on
 // this simplification for the current logic to work. (HAMTHashFunction is a
 // global setting of the package, it is hard-coded in the serialized Shard node
@@ -100,7 +100,7 @@ func IdHash(val []byte) []byte {
 // the fake hash as in io.SetAndPrevious through `newHashBits()` and pass
 // it as an argument making the hash independent of tree manipulation; that
 // sounds as the correct way to go in general and we wouldn't need this.)
-func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) {
+func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int, childsPerNode int) (ipld.Node, error) {
 	if treeHeight < 1 {
 		panic("treeHeight < 1")
 	}
@@ -112,11 +112,6 @@ func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int) (ipld.Node, error) {
 	//}
 	// FIXME: Any clean and simple way to do this? Otherwise remove check.

-	//childsPerNode := io.DefaultShardWidth
-	childsPerNode := 256 // (FIXME: hard-coded as we have an 'import cycle not
-	// allowed' error from io package otherwise.)
-	// FIXME: Evaluate making this an argument.
-
 	rootShard, err := NewShard(ds, childsPerNode)
 	if err != nil {
 		return nil, err
@@ -131,6 +126,8 @@
 		var hashbuf [8]byte
 		binary.LittleEndian.PutUint64(hashbuf[:], uint64(i))
 		var oldLink *ipld.Link
+		// FIXME: This is wrong for childsPerNode/DefaultShardWidth different
+		// than 256 (i.e., one byte of key per level).
 		oldLink, err = rootShard.SetAndPrevious(context.Background(), string(hashbuf[:treeHeight]), unixfs.EmptyFileNode())
 		if err != nil {
 			return nil, err
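As a quick sanity check of the corrected comment: a complete HAMT with shard layers at depths 0 through treeHeight has childsPerNode^treeHeight leaf shards, each holding childsPerNode value links, which gives childsPerNode^(treeHeight + 1) entries in total. A hypothetical helper (not part of this commit) that evaluates the formula:

// completeHAMTEntries is a hypothetical helper (not in this commit) that
// evaluates the corrected count: childsPerNode ^ (treeHeight + 1).
func completeHAMTEntries(childsPerNode, treeHeight int) int {
	entries := childsPerNode // value links held by a single leaf shard
	for i := 0; i < treeHeight; i++ {
		entries *= childsPerNode // one factor per layer of internal shards
	}
	return entries
}

// For example, completeHAMTEntries(256, 1) == 65536:
// 256 leaf shards with 256 value links each.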

hamt/util_test.go

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ func TestCreateCompleteShard(t *testing.T) {
 	treeHeight := 2 // This is the limit of what we can fastly generate,
 	// the default width is too big (256). We may need to refine
 	// CreateCompleteHAMT encoding of the key to reduce the tableSize.
-	node, err := CreateCompleteHAMT(ds, treeHeight)
+	node, err := CreateCompleteHAMT(ds, treeHeight, 256)
 	assert.NoError(t, err)

 	shard, err := NewHamtFromDag(ds, node)

io/directory_test.go

Lines changed: 53 additions & 27 deletions
@@ -9,6 +9,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"

 	cid "github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
@@ -328,13 +329,16 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	oldHashFunc := hamt.HAMTHashFunction
 	defer func() { hamt.HAMTHashFunction = oldHashFunc }()
 	hamt.HAMTHashFunction = hamt.IdHash
-	//oldShardWidth := DefaultShardWidth
-	//defer func() { DefaultShardWidth = oldShardWidth }()
-	//DefaultShardWidth = 8
-	// FIXME: We should be able to use a smaller DefaultShardWidth to have
-	// a deeper tree and cheaper tests once the import cycle is resolved
-	// in hamt.CreateCompleteHAMT and the DefaultShardWidth value is not
-	// hardcoded there.
+	oldShardWidth := DefaultShardWidth
+	defer func() { DefaultShardWidth = oldShardWidth }()
+	DefaultShardWidth = 16 // FIXME: Review number. From 256 to 16 or 8
+	// (if we fix CreateCompleteHAMT).
+
+	// FIXME: Taken from private github.com/ipfs/[email protected]/merkledag.go.
+	// (We can also pass an explicit concurrency value in `(*Shard).EnumLinksAsync()`
+	// and take ownership of this configuration, but that departs from the more
+	// standard and reliable one in `go-merkledag`.)
+	defaultConcurrentFetch := 32

 	// We create a "complete" HAMT (see CreateCompleteHAMT for more details)
 	// with a regular structure to be able to predict how many Shard nodes we
@@ -343,34 +347,52 @@ func TestHAMTEnumerationWhenComputingSize(t *testing.T) {
 	oldHamtOption := HAMTShardingSize
 	defer func() { HAMTShardingSize = oldHamtOption }()
 	// (Some arbitrary values below that make this test not that expensive.)
-	treeHeight := 2
-	thresholdToWidthRatio := 4 // How many leaf shards nodes (with value links,
+	treeHeight := 3 // FIXME: Review number. From 2 to 3.
+	// How many leaf shard nodes (with value links,
 	// i.e., directory entries) do we need to reach the threshold.
+	thresholdToWidthRatio := 4
+	// FIXME: Review dag.Walk algorithm to better figure out this estimate.
+
 	HAMTShardingSize = DefaultShardWidth * thresholdToWidthRatio
-	// With this structure we will then need to fetch the following nodes:
+	// With this structure and a BFS traversal (from `parallelWalkDepth`)
+	// we would roughly fetch the following nodes:
+	nodesToFetch := 0
+	// * all layers up to (but not including) the last one with leaf nodes
+	//   (because it's a BFS)
+	for i := 0; i < treeHeight; i++ {
+		nodesToFetch += int(math.Pow(float64(DefaultShardWidth), float64(i)))
+	}
 	// * `thresholdToWidthRatio` leaf Shards with enough value links to reach
 	//   the HAMTShardingSize threshold.
-	// * `(treeHeight - 1)` internal nodes to reach those leaf Shard nodes
-	//   (assuming we have thresholdToWidthRatio below the DefaultShardWidth,
-	//   i.e., all leaf nodes come from the same parent).
-	nodesToFetch := thresholdToWidthRatio + treeHeight - 1
+	nodesToFetch += thresholdToWidthRatio
+	// * `defaultConcurrentFetch` potential extra nodes of the threads working
+	//   in parallel
+	nodesToFetch += defaultConcurrentFetch
+
 	ds := mdtest.Mock()
-	completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight)
+	completeHAMTRoot, err := hamt.CreateCompleteHAMT(ds, treeHeight, DefaultShardWidth)
 	assert.NoError(t, err)

 	countGetsDS := newCountGetsDS(ds)
 	hamtDir, err := newHAMTDirectoryFromNode(countGetsDS, completeHAMTRoot)
 	assert.NoError(t, err)

 	countGetsDS.resetCounter()
+	countGetsDS.setRequestDelay(10 * time.Millisecond)
 	// FIXME: Only works with sequential DAG walk (now hardcoded, needs to be
 	// added to the internal API) where we can predict the Get requests and
 	// tree traversal. It would be desirable to have some test for the concurrent
 	// walk (which is the one used in production).
 	below, err := hamtDir.sizeBelowThreshold(context.TODO(), 0)
 	assert.NoError(t, err)
 	assert.False(t, below)
-	assert.Equal(t, nodesToFetch, countGetsDS.uniqueCidsFetched())
+	t.Logf("fetched %d/%d nodes", countGetsDS.uniqueCidsFetched(), nodesToFetch)
+	assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch)
+	assert.True(t, countGetsDS.uniqueCidsFetched() >= nodesToFetch-defaultConcurrentFetch)
+	// (Without the `setRequestDelay` above the number of nodes fetched
+	// drops dramatically and unpredictably as the BFS starts to behave
+	// more like a DFS because some search paths are fetched faster than
	// others.)
 }

 // Compare entries in the leftDir against the rightDir and possibly
@@ -519,6 +541,8 @@ type countGetsDS struct {

 	cidsFetched map[cid.Cid]struct{}
 	mapLock     sync.Mutex
+
+	getRequestDelay time.Duration
 }

 var _ ipld.DAGService = (*countGetsDS)(nil)
@@ -528,6 +552,7 @@ func newCountGetsDS(ds ipld.DAGService) *countGetsDS {
 		ds,
 		make(map[cid.Cid]struct{}),
 		sync.Mutex{},
+		0,
 	}
 }

@@ -543,30 +568,31 @@ func (d *countGetsDS) uniqueCidsFetched() int {
 	return len(d.cidsFetched)
 }

+func (d *countGetsDS) setRequestDelay(timeout time.Duration) {
+	d.getRequestDelay = timeout
+}
+
 func (d *countGetsDS) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
 	node, err := d.DAGService.Get(ctx, c)
 	if err != nil {
 		return nil, err
 	}

 	d.mapLock.Lock()
+	_, cidRequestedBefore := d.cidsFetched[c]
 	d.cidsFetched[c] = struct{}{}
 	d.mapLock.Unlock()

+	if d.getRequestDelay != 0 && !cidRequestedBefore {
+		// The first request gets a delay to simulate a network fetch.
+		// Subsequent requests get no delay, simulating an on-disk cache.
+		time.Sleep(d.getRequestDelay)
+	}
+
 	return node, nil
 }

 // Process sequentially (blocking) calling Get which tracks requests.
 func (d *countGetsDS) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
-	out := make(chan *ipld.NodeOption, len(cids))
-	defer close(out)
-	for _, c := range cids {
-		node, err := d.Get(ctx, c)
-		if err != nil {
-			out <- &ipld.NodeOption{Err: err}
-			break
-		}
-		out <- &ipld.NodeOption{Node: node}
-	}
-	return out
+	panic("GetMany not supported")
 }
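Plugging the test's values into the estimate above (a worked sketch of the arithmetic, assuming DefaultShardWidth = 16, treeHeight = 3, thresholdToWidthRatio = 4 and defaultConcurrentFetch = 32 as in the diff): the internal BFS layers contribute 16^0 + 16^1 + 16^2 = 273 nodes, the leaf shards add 4, and the concurrency slack adds 32, so the two assertions bound the unique fetches to the range [277, 309].

package main

import (
	"fmt"
	"math"
)

func main() {
	// Assumed values taken from the diff above.
	width, height := 16, 3     // DefaultShardWidth, treeHeight
	ratio, concurrent := 4, 32 // thresholdToWidthRatio, defaultConcurrentFetch

	upper := 0
	for i := 0; i < height; i++ { // internal BFS layers: 1 + 16 + 256 = 273
		upper += int(math.Pow(float64(width), float64(i)))
	}
	upper += ratio      // 277: leaf shards needed to cross the threshold
	upper += concurrent // 309: slack for nodes prefetched by parallel workers

	lower := upper - concurrent
	fmt.Printf("expected unique fetches in [%d, %d]\n", lower, upper) // [277, 309]
}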
