Skip to content

Commit 338060b

Browse files
committed
no v1 to v2 index migration
1 parent ffe5f21 commit 338060b

File tree

6 files changed

+110
-26
lines changed

6 files changed

+110
-26
lines changed

market/indexstore/indexstore.go

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"golang.org/x/sync/errgroup"
2020
"golang.org/x/xerrors"
2121

22+
commcid "github.com/filecoin-project/go-fil-commcid"
23+
2224
"github.com/filecoin-project/curio/deps/config"
2325
)
2426

@@ -349,10 +351,10 @@ func (i *IndexStore) RemoveIndexes(ctx context.Context, pieceCidv2 cid.Cid) erro
349351
return nil
350352
}
351353

352-
// PieceInfo contains PieceCidV2 and BlockSize
354+
// PieceInfo contains PieceCid and BlockSize. PieceCid can be either v1 or v2.
353355
type PieceInfo struct {
354-
PieceCidV2 cid.Cid
355-
BlockSize uint64
356+
PieceCid cid.Cid
357+
BlockSize uint64
356358
}
357359

358360
// PiecesContainingMultihash gets all pieces that contain a multihash along with their BlockSize
@@ -369,8 +371,8 @@ func (i *IndexStore) PiecesContainingMultihash(ctx context.Context, m multihash.
369371
return nil, fmt.Errorf("parsing piece cid: %w", err)
370372
}
371373
pieces = append(pieces, PieceInfo{
372-
PieceCidV2: pcid,
373-
BlockSize: blockSize,
374+
PieceCid: pcid,
375+
BlockSize: blockSize,
374376
})
375377
}
376378
if err := iter.Close(); err != nil {
@@ -397,21 +399,41 @@ func (i *IndexStore) GetOffset(ctx context.Context, pieceCidv2 cid.Cid, hash mul
397399
}
398400

399401
func (i *IndexStore) GetPieceHashRange(ctx context.Context, piecev2 cid.Cid, start multihash.Multihash, num int64) ([]multihash.Multihash, error) {
400-
qry := "SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash >= ? ORDER BY PayloadMultihash ASC LIMIT ?"
401-
iter := i.session.Query(qry, piecev2.Bytes(), []byte(start), num).WithContext(ctx).Iter()
402+
getHashes := func(pieceCid cid.Cid, start multihash.Multihash, num int64) ([]multihash.Multihash, error) {
403+
qry := "SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash >= ? ORDER BY PayloadMultihash ASC LIMIT ?"
404+
iter := i.session.Query(qry, pieceCid.Bytes(), []byte(start), num).WithContext(ctx).Iter()
402405

403-
var hashes []multihash.Multihash
404-
var r []byte
405-
for iter.Scan(&r) {
406-
m := multihash.Multihash(r)
407-
hashes = append(hashes, m)
406+
var hashes []multihash.Multihash
407+
var r []byte
408+
for iter.Scan(&r) {
409+
m := multihash.Multihash(r)
410+
hashes = append(hashes, m)
408411

409-
// Allocate new r, preallocating the typical size of a multihash (36 bytes)
410-
r = make([]byte, 0, 36)
412+
// Allocate new r, preallocating the typical size of a multihash (36 bytes)
413+
r = make([]byte, 0, 36)
414+
}
415+
if err := iter.Close(); err != nil {
416+
return nil, xerrors.Errorf("iterating piece hash range (P:0x%02x, H:0x%02x, n:%d): %w", pieceCid.Bytes(), []byte(start), num, err)
417+
}
418+
return hashes, nil
411419
}
412-
if err := iter.Close(); err != nil {
413-
return nil, xerrors.Errorf("iterating piece hash range (P:0x%02x, H:0x%02x, n:%d): %w", piecev2.Bytes(), []byte(start), num, err)
420+
421+
hashes, err := getHashes(piecev2, start, num)
422+
if err != nil {
423+
return nil, err
414424
}
425+
426+
if len(hashes) == 0 {
427+
pcid1, _, err := commcid.PieceCidV1FromV2(piecev2)
428+
if err != nil {
429+
return nil, xerrors.Errorf("getting piece cid v1 from v2: %w", err)
430+
}
431+
hashes, err = getHashes(pcid1, start, num)
432+
if err != nil {
433+
return nil, err
434+
}
435+
}
436+
415437
if len(hashes) != int(num) {
416438
return nil, xerrors.Errorf("expected %d hashes, got %d (possibly missing indexes)", num, len(hashes))
417439
}

market/ipni/chunker/serve-chunker.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"encoding/hex"
88
"errors"
9+
"fmt"
910
"io"
1011
"time"
1112

@@ -177,6 +178,43 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated
177178
return nil, xerrors.Errorf("parsing piece CID: %w", err)
178179
}
179180

181+
// Convert to pcid2 if needed
182+
yes := commcidv2.IsPieceCidV2(pieceCidv2)
183+
if !yes {
184+
var rawSize int64
185+
var singlePiece bool
186+
err := p.db.QueryRow(ctx, `WITH meta AS (
187+
SELECT piece_size
188+
FROM market_piece_metadata
189+
WHERE piece_cid = $1
190+
),
191+
exact AS (
192+
SELECT COUNT(*) AS n, MIN(piece_size) AS piece_size
193+
FROM meta
194+
),
195+
raw AS (
196+
SELECT MAX(mpd.raw_size) AS raw_size
197+
FROM market_piece_deal mpd
198+
WHERE mpd.piece_cid = $1
199+
AND mpd.piece_length = (SELECT piece_size FROM exact)
200+
AND (SELECT n FROM exact) = 1
201+
)
202+
SELECT
203+
COALESCE((SELECT raw_size FROM raw), 0) AS raw_size,
204+
((SELECT n FROM exact) = 1) AS has_single_metadata;`, pieceCidv2.String()).Scan(&rawSize, &singlePiece)
205+
if err != nil {
206+
return nil, fmt.Errorf("failed to get piece metadata: %w", err)
207+
}
208+
if !singlePiece {
209+
return nil, fmt.Errorf("more than 1 piece metadata found for piece cid %s, please use piece cid v2", pieceCidv2.String())
210+
}
211+
pcid2, err := commcidv2.PieceCidV2FromV1(pieceCidv2, uint64(rawSize))
212+
if err != nil {
213+
return nil, fmt.Errorf("failed to convert piece cid v1 to v2: %w", err)
214+
}
215+
pieceCidv2 = pcid2
216+
}
217+
180218
if leave, ok := p.noSkipCache.Get(pieceCidv2); !ok || time.Now().After(leave) {
181219
skip, err := p.checkIsEntrySkip(ctx, block)
182220
if err != nil {

market/retrieval/remoteblockstore/remoteblockstore.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block,
115115
var merr error
116116
for _, piece := range pieces {
117117
data, err := func() ([]byte, error) {
118-
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCidV2)
118+
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCid)
119119
if err != nil {
120120
return nil, fmt.Errorf("getting piece reader: %w", err)
121121
}
@@ -124,19 +124,19 @@ func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block,
124124
}(reader)
125125

126126
// Get the offset of the block within the piece (CAR file)
127-
offset, err := ro.idxApi.GetOffset(ctx, piece.PieceCidV2, c.Hash()) // This can be pieceCidV2 or pieceCidV1, but we don't care because we are feeding back the db output
127+
offset, err := ro.idxApi.GetOffset(ctx, piece.PieceCid, c.Hash()) // This can be pieceCidV2 or pieceCidV1, but we don't care because we are feeding back the db output
128128
if err != nil {
129-
return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, piece.PieceCidV2, err)
129+
return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, piece.PieceCid, err)
130130
}
131131

132132
// Seek to the section offset
133133
readerAt := io.NewSectionReader(reader, int64(offset), int64(piece.BlockSize+MaxCarBlockPrefixSize))
134134
readCid, data, err := util.ReadNode(bufio.NewReader(readerAt))
135135
if err != nil {
136-
return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, piece.PieceCidV2, err)
136+
return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, piece.PieceCid, err)
137137
}
138138
if !bytes.Equal(readCid.Hash(), c.Hash()) {
139-
return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, piece.PieceCidV2, c)
139+
return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, piece.PieceCid, c)
140140
}
141141
return data, nil
142142
}()

tasks/indexing/task_ipni.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
101101
return false, xerrors.Errorf("getting ipni task params: %w", err)
102102
}
103103

104+
if len(tasks) == 0 {
105+
return true, nil
106+
}
107+
104108
if len(tasks) != 1 {
105109
return false, xerrors.Errorf("expected 1 ipni task params, got %d", len(tasks))
106110
}
@@ -511,7 +515,7 @@ func (I *IPNITask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskE
511515
type task struct {
512516
TaskID harmonytask.TaskID `db:"task_id"`
513517
ID string `db:"id"`
514-
StorageID string `db:"storage_id"`
518+
StorageID sql.NullString `db:"storage_id"`
515519
IsRm bool `db:"is_rm"`
516520
}
517521

@@ -563,8 +567,8 @@ func (I *IPNITask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskE
563567
var mk12Tasks []task
564568
err := I.db.Select(ctx, &mk12Tasks, `
565569
SELECT dp.task_id, dp.id, l.storage_id FROM ipni_task dp
566-
INNER JOIN sector_location l ON dp.sp_id = l.miner_id AND dp.sector = l.sector_num
567-
WHERE dp.task_id = ANY ($1) AND l.sector_filetype = 1`, indIDs)
570+
LEFT JOIN sector_location l ON dp.sp_id = l.miner_id AND dp.sector = l.sector_num
571+
WHERE dp.task_id = ANY ($1) AND (l.sector_filetype IS NULL OR l.sector_filetype = 1)`, indIDs)
568572
if err != nil {
569573
return nil, xerrors.Errorf("getting storage details: %w", err)
570574
}
@@ -608,13 +612,29 @@ func (I *IPNITask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskE
608612
continue
609613
}
610614

615+
acceptables[t.TaskID] = false // note the task was found
616+
617+
if !t.StorageID.Valid {
618+
// no unsealed copy
619+
return &t.TaskID, nil
620+
}
621+
611622
for _, l := range ls {
612-
if string(l.ID) == t.StorageID {
623+
if string(l.ID) == t.StorageID.String {
613624
return &t.TaskID, nil
614625
}
615626
}
616627
}
617628

629+
// special case for orphan tasks which are created for non-announced pieces
630+
for taskID, notAccepted := range acceptables {
631+
if !notAccepted {
632+
continue
633+
}
634+
635+
return &taskID, nil
636+
}
637+
618638
return nil, nil
619639
}
620640

tasks/indexing/task_pdp_ipni.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ func (P *PDPIPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don
8383
return false, xerrors.Errorf("getting ipni task params: %w", err)
8484
}
8585

86+
if len(tasks) == 0 {
87+
return true, nil
88+
}
89+
8690
if len(tasks) != 1 {
8791
return false, xerrors.Errorf("expected 1 ipni task params, got %d", len(tasks))
8892
}

0 commit comments

Comments
 (0)