diff --git a/Makefile b/Makefile index 5ea13f9e0..f0e7dba11 100644 --- a/Makefile +++ b/Makefile @@ -89,6 +89,12 @@ boost: $(BUILD_DEPS) .PHONY: boost BINS+=boost boostx boostd +booster-http: $(BUILD_DEPS) + rm -f booster-http + $(GOCC) build $(GOFLAGS) -o booster-http ./cmd/booster-http +.PHONY: booster-http +BINS+=booster-http + devnet: $(BUILD_DEPS) rm -f devnet $(GOCC) build $(GOFLAGS) -o devnet ./cmd/devnet diff --git a/api/api.go b/api/api.go index 2dc9e0146..711f62183 100644 --- a/api/api.go +++ b/api/api.go @@ -15,6 +15,7 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multihash" ) // MODIFYING THE API INTERFACE @@ -43,8 +44,8 @@ type Boost interface { BoostDagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:admin BoostDagstoreRecoverShard(ctx context.Context, key string) error //perm:admin BoostDagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin - - BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read + BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read + BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read // RuntimeSubsystems returns the subsystems that are enabled // in this instance. @@ -67,6 +68,7 @@ type Boost interface { PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) //perm:read PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) //perm:read PiecesGetCIDInfo(ctx context.Context, payloadCid cid.Cid) (*piecestore.CIDInfo, error) //perm:read + PiecesGetMaxOffset(ctx context.Context, pieceCid cid.Cid) (uint64, error) //perm:read // MethodGroup: Actor ActorSectorSize(context.Context, address.Address) (abi.SectorSize, error) //perm:read diff --git a/api/proxy_gen.go b/api/proxy_gen.go index ade455970..e476313f6 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -23,6 +23,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/multiformats/go-multihash" ) var ErrNotSupported = errors.New("method not supported") @@ -43,6 +44,8 @@ type BoostStruct struct { BoostDagstoreListShards func(p0 context.Context) ([]DagstoreShardInfo, error) `perm:"read"` + BoostDagstorePiecesContainingMultihash func(p0 context.Context, p1 multihash.Multihash) ([]cid.Cid, error) `perm:"read"` + BoostDagstoreRecoverShard func(p0 context.Context, p1 string) error `perm:"admin"` BoostDagstoreRegisterShard func(p0 context.Context, p1 string) error `perm:"admin"` @@ -109,6 +112,8 @@ type BoostStruct struct { PiecesGetCIDInfo func(p0 context.Context, p1 cid.Cid) (*piecestore.CIDInfo, error) `perm:"read"` + PiecesGetMaxOffset func(p0 context.Context, p1 cid.Cid) (uint64, error) `perm:"read"` + PiecesGetPieceInfo func(p0 context.Context, p1 cid.Cid) (*piecestore.PieceInfo, error) `perm:"read"` PiecesListCidInfos func(p0 context.Context) ([]cid.Cid, error) `perm:"read"` @@ -281,6 +286,17 @@ func (s *BoostStub) BoostDagstoreListShards(p0 context.Context) ([]DagstoreShard return *new([]DagstoreShardInfo), ErrNotSupported } +func (s *BoostStruct) BoostDagstorePiecesContainingMultihash(p0 context.Context, p1 multihash.Multihash) ([]cid.Cid, error) { + if s.Internal.BoostDagstorePiecesContainingMultihash == nil { + return *new([]cid.Cid), ErrNotSupported + } + return s.Internal.BoostDagstorePiecesContainingMultihash(p0, p1) +} + +func (s *BoostStub) BoostDagstorePiecesContainingMultihash(p0 context.Context, p1 multihash.Multihash) ([]cid.Cid, error) { + return *new([]cid.Cid), ErrNotSupported +} + func (s *BoostStruct) BoostDagstoreRecoverShard(p0 context.Context, p1 string) error { if s.Internal.BoostDagstoreRecoverShard == nil { return ErrNotSupported @@ -644,6 +660,17 @@ func (s *BoostStub) PiecesGetCIDInfo(p0 context.Context, p1 cid.Cid) (*piecestor return nil, ErrNotSupported } +func (s *BoostStruct) PiecesGetMaxOffset(p0 context.Context, p1 cid.Cid) (uint64, error) { + if s.Internal.PiecesGetMaxOffset == nil { + return 0, ErrNotSupported + } + return s.Internal.PiecesGetMaxOffset(p0, p1) +} + +func (s *BoostStub) PiecesGetMaxOffset(p0 context.Context, p1 cid.Cid) (uint64, error) { + return 0, ErrNotSupported +} + func (s *BoostStruct) PiecesGetPieceInfo(p0 context.Context, p1 cid.Cid) (*piecestore.PieceInfo, error) { if s.Internal.PiecesGetPieceInfo == nil { return nil, ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index 9c7bc4afc..91b3f1065 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/cmd/boostd/run.go b/cmd/boostd/run.go index 686c772fe..5505850ac 100644 --- a/cmd/boostd/run.go +++ b/cmd/boostd/run.go @@ -28,6 +28,10 @@ var runCmd = &cli.Command{ Name: "pprof", Usage: "run pprof web server on localhost:6060", }, + &cli.BoolFlag{ + Name: "nosync", + Usage: "dont wait for the full node to sync with the chain", + }, }, Action: func(cctx *cli.Context) error { if cctx.Bool("pprof") { diff --git a/cmd/booster-http/main.go b/cmd/booster-http/main.go new file mode 100644 index 000000000..27e2a405e --- /dev/null +++ b/cmd/booster-http/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "os" + + "github.com/filecoin-project/boost/build" + cliutil "github.com/filecoin-project/boost/cli/util" + logging "github.com/ipfs/go-log/v2" + "github.com/urfave/cli/v2" +) + +var log = logging.Logger("booster") + +func main() { + app := &cli.App{ + Name: "booster-http", + Usage: "HTTP endpoint for retrieval from Filecoin", + EnableBashCompletion: true, + Version: build.UserVersion(), + Flags: []cli.Flag{ + cliutil.FlagVeryVerbose, + }, + Commands: []*cli.Command{ + runCmd, + }, + } + app.Setup() + + if err := app.Run(os.Args); err != nil { + os.Stderr.WriteString("Error: " + err.Error() + "\n") + } +} + +func before(cctx *cli.Context) error { + _ = logging.SetLogLevel("booster", "INFO") + + if cliutil.IsVeryVerbose { + _ = logging.SetLogLevel("booster", "DEBUG") + } + + return nil +} diff --git a/cmd/booster-http/run.go b/cmd/booster-http/run.go new file mode 100644 index 000000000..bed04629f --- /dev/null +++ b/cmd/booster-http/run.go @@ -0,0 +1,270 @@ +package main + +import ( + "context" + "errors" + "fmt" + "net/http" + _ "net/http/pprof" + "strings" + + "github.com/filecoin-project/boost/api" + bclient "github.com/filecoin-project/boost/api/client" + cliutil "github.com/filecoin-project/boost/cli/util" + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/go-state-types/abi" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/client" + "github.com/filecoin-project/lotus/api/v0api" + "github.com/filecoin-project/lotus/api/v1api" + lcli "github.com/filecoin-project/lotus/cli" + sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" + "github.com/filecoin-project/lotus/markets/dagstore" + "github.com/filecoin-project/lotus/markets/sectoraccessor" + lotus_modules "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" + "github.com/urfave/cli/v2" +) + +var runCmd = &cli.Command{ + Name: "run", + Usage: "Start a booster-http process", + Before: before, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "pprof", + Usage: "run pprof web server on localhost:6070", + }, + &cli.StringFlag{ + Name: "base-path", + Usage: "the base path at which to run the web server", + Value: "", + }, + &cli.UintFlag{ + Name: "port", + Usage: "the port the web server listens on", + Value: 7777, + }, + &cli.StringFlag{ + Name: "api-boost", + Usage: "the endpoint for the boost API", + Required: true, + }, + &cli.StringFlag{ + Name: "api-fullnode", + Usage: "the endpoint for the full node API", + Required: true, + }, + &cli.StringFlag{ + Name: "api-sealer", + Usage: "the endpoint for the sealer API", + Required: true, + }, + }, + Action: func(cctx *cli.Context) error { + if cctx.Bool("pprof") { + go func() { + err := http.ListenAndServe("localhost:6070", nil) + if err != nil { + log.Error(err) + } + }() + } + + // Connect to the Boost API + ctx := lcli.ReqContext(cctx) + boostApiInfo := cctx.String("api-boost") + bapi, bcloser, err := getBoostApi(ctx, boostApiInfo) + if err != nil { + return fmt.Errorf("getting boost API: %w", err) + } + defer bcloser() + + // Connect to the full node API + fnApiInfo := cctx.String("api-fullnode") + fullnodeApi, ncloser, err := getFullNodeApi(ctx, fnApiInfo) + if err != nil { + return fmt.Errorf("getting full node API: %w", err) + } + defer ncloser() + + // Connect to the sealing API + sealingApiInfo := cctx.String("api-sealer") + sauth, err := storageAuthWithURL(sealingApiInfo) + if err != nil { + return fmt.Errorf("parsing sealing API endpoint: %w", err) + } + sealingService, sealerCloser, err := getMinerApi(ctx, sealingApiInfo) + if err != nil { + return fmt.Errorf("getting miner API: %w", err) + } + defer sealerCloser() + + maddr, err := sealingService.ActorAddress(ctx) + if err != nil { + return fmt.Errorf("getting miner actor address: %w", err) + } + log.Infof("Miner address: %s", maddr) + + // Use an in-memory repo because we don't need any functions + // of a real repo, we just need to supply something that satisfies + // the LocalStorage interface to the store + memRepo := repo.NewMemory(nil) + lr, err := memRepo.Lock(repo.StorageMiner) + if err != nil { + return fmt.Errorf("locking mem repo: %w", err) + } + defer lr.Close() + + // Create the store interface + var urls []string + lstor, err := stores.NewLocal(ctx, lr, sealingService, urls) + if err != nil { + return fmt.Errorf("creating new local store: %w", err) + } + storage := lotus_modules.RemoteStorage(lstor, sealingService, sauth, sectorstorage.Config{ + // TODO: Not sure if I need this, or any of the other fields in this struct + ParallelFetchLimit: 1, + }) + // Create the piece provider and sector accessors + pp := sectorstorage.NewPieceProvider(storage, sealingService, sealingService) + sa := sectoraccessor.NewSectorAccessor(dtypes.MinerAddress(maddr), sealingService, pp, fullnodeApi) + // Create the server API + sapi := serverApi{ctx: ctx, bapi: bapi, sa: sa} + server := NewHttpServer(cctx.String("base-path"), cctx.Int("port"), sapi) + + // Start the server + log.Infof("Starting booster-http node on port %d with base path '%s'", + cctx.Int("port"), cctx.String("base-path")) + server.Start(ctx) + + // Monitor for shutdown. + <-ctx.Done() + + log.Info("Shutting down...") + + err = server.Stop() + if err != nil { + return err + } + log.Info("Graceful shutdown successful") + + // Sync all loggers. + _ = log.Sync() //nolint:errcheck + + return nil + }, +} + +func storageAuthWithURL(apiInfo string) (sectorstorage.StorageAuth, error) { + s := strings.Split(apiInfo, ":") + if len(s) != 2 { + return nil, errors.New("unexpected format of `apiInfo`") + } + headers := http.Header{} + headers.Add("Authorization", "Bearer "+s[0]) + return sectorstorage.StorageAuth(headers), nil +} + +type serverApi struct { + ctx context.Context + bapi api.Boost + sa dagstore.SectorAccessor +} + +var _ HttpServerApi = (*serverApi)(nil) + +func (s serverApi) PiecesContainingMultihash(mh multihash.Multihash) ([]cid.Cid, error) { + return s.bapi.BoostDagstorePiecesContainingMultihash(s.ctx, mh) +} + +func (s serverApi) GetMaxPieceOffset(pieceCid cid.Cid) (uint64, error) { + return s.bapi.PiecesGetMaxOffset(s.ctx, pieceCid) +} + +func (s serverApi) GetPieceInfo(pieceCID cid.Cid) (*piecestore.PieceInfo, error) { + return s.bapi.PiecesGetPieceInfo(s.ctx, pieceCID) +} + +func (s serverApi) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { + return s.sa.IsUnsealed(ctx, sectorID, offset, length) +} + +func (s serverApi) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) { + return s.sa.UnsealSectorAt(ctx, sectorID, offset, length) +} + +func getBoostApi(ctx context.Context, ai string) (api.Boost, jsonrpc.ClientCloser, error) { + ai = strings.TrimPrefix(strings.TrimSpace(ai), "BOOST_API_INFO=") + info := cliutil.ParseApiInfo(ai) + addr, err := info.DialArgs("v0") + if err != nil { + return nil, nil, fmt.Errorf("could not get DialArgs: %w", err) + } + + log.Infof("Using boost API at %s", addr) + api, closer, err := bclient.NewBoostRPCV0(ctx, addr, info.AuthHeader()) + if err != nil { + return nil, nil, fmt.Errorf("creating full node service API: %w", err) + } + + return api, closer, nil +} + +func getFullNodeApi(ctx context.Context, ai string) (v1api.FullNode, jsonrpc.ClientCloser, error) { + ai = strings.TrimPrefix(strings.TrimSpace(ai), "FULLNODE_API_INFO=") + info := cliutil.ParseApiInfo(ai) + addr, err := info.DialArgs("v1") + if err != nil { + return nil, nil, fmt.Errorf("could not get DialArgs: %w", err) + } + + log.Infof("Using full node API at %s", addr) + api, closer, err := client.NewFullNodeRPCV1(ctx, addr, info.AuthHeader()) + if err != nil { + return nil, nil, fmt.Errorf("creating full node service API: %w", err) + } + + v, err := api.Version(ctx) + if err != nil { + return nil, nil, fmt.Errorf("checking full node service API version: %w", err) + } + + if !v.APIVersion.EqMajorMinor(lapi.FullAPIVersion1) { + return nil, nil, fmt.Errorf("full node service API version didn't match (expected %s, remote %s)", lapi.FullAPIVersion1, v.APIVersion) + } + + return api, closer, nil +} + +func getMinerApi(ctx context.Context, ai string) (v0api.StorageMiner, jsonrpc.ClientCloser, error) { + ai = strings.TrimPrefix(strings.TrimSpace(ai), "MINER_API_INFO=") + info := cliutil.ParseApiInfo(ai) + addr, err := info.DialArgs("v0") + if err != nil { + return nil, nil, fmt.Errorf("could not get DialArgs: %w", err) + } + + log.Infof("Using sealing API at %s", addr) + api, closer, err := client.NewStorageMinerRPCV0(ctx, addr, info.AuthHeader()) + if err != nil { + return nil, nil, fmt.Errorf("creating miner service API: %w", err) + } + + v, err := api.Version(ctx) + if err != nil { + return nil, nil, fmt.Errorf("checking miner service API version: %w", err) + } + + if !v.APIVersion.EqMajorMinor(lapi.MinerAPIVersion0) { + return nil, nil, fmt.Errorf("miner service API version didn't match (expected %s, remote %s)", lapi.MinerAPIVersion0, v.APIVersion) + } + + return api, closer, nil +} diff --git a/cmd/booster-http/server.go b/cmd/booster-http/server.go new file mode 100644 index 000000000..eb16ce39b --- /dev/null +++ b/cmd/booster-http/server.go @@ -0,0 +1,433 @@ +package main + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "strings" + "time" + + "github.com/fatih/color" + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-state-types/abi" + "github.com/hashicorp/go-multierror" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/multiformats/go-multihash" + "github.com/multiformats/go-varint" +) + +var ErrNotFound = errors.New("not found") + +type HttpServer struct { + path string + port int + api HttpServerApi + + ctx context.Context + cancel context.CancelFunc + server *http.Server +} + +type HttpServerApi interface { + PiecesContainingMultihash(mh multihash.Multihash) ([]cid.Cid, error) + GetMaxPieceOffset(pieceCid cid.Cid) (uint64, error) + GetPieceInfo(pieceCID cid.Cid) (*piecestore.PieceInfo, error) + IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) + UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) +} + +func NewHttpServer(path string, port int, api HttpServerApi) *HttpServer { + return &HttpServer{path: path, port: port, api: api} +} + +func (s *HttpServer) payloadBasePath() string { + return s.path + "/payload/" +} + +func (s *HttpServer) pieceBasePath() string { + return s.path + "/piece/" +} + +func (s *HttpServer) Start(ctx context.Context) { + s.ctx, s.cancel = context.WithCancel(ctx) + + listenAddr := fmt.Sprintf(":%d", s.port) + handler := http.NewServeMux() + handler.HandleFunc(s.payloadBasePath(), s.handleByPayloadCid) + handler.HandleFunc(s.pieceBasePath(), s.handleByPieceCid) + handler.HandleFunc("/", s.handleIndex) + handler.HandleFunc("/index.html", s.handleIndex) + s.server = &http.Server{ + Addr: listenAddr, + Handler: handler, + // This context will be the parent of the context associated with all + // incoming requests + BaseContext: func(listener net.Listener) context.Context { + return s.ctx + }, + } + + go func() { + if err := s.server.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalf("http.ListenAndServe(): %v", err) + } + }() +} + +func (s *HttpServer) Stop() error { + s.cancel() + return s.server.Close() +} + +const idxPage = ` + + +

Booster HTTP Server

+ Endpoints: + + + + + + + + + + + +
+ Download a CAR file by payload CID + + /payload/ +
+ Download a CAR file by piece CID + + /piece/ +
+ + +` + +func (s *HttpServer) handleIndex(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(idxPage)) //nolint:errcheck +} + +func (s *HttpServer) handleByPayloadCid(w http.ResponseWriter, r *http.Request) { + prefixLen := len(s.payloadBasePath()) + if len(r.URL.Path) <= prefixLen { + msg := fmt.Sprintf("path '%s' is missing piece CID", r.URL.Path) + writeError(w, r, http.StatusBadRequest, msg) + return + } + + fileName := r.URL.Path[prefixLen:] + payloadCidStr := strings.Replace(fileName, ".car", "", 1) + payloadCid, err := cid.Parse(payloadCidStr) + if err != nil { + msg := fmt.Sprintf("parsing payload CID '%s': %s", payloadCidStr, err.Error()) + writeError(w, r, http.StatusBadRequest, msg) + return + } + + pieces, err := s.api.PiecesContainingMultihash(payloadCid.Hash()) + if err != nil { + if isNotFoundError(err) { + msg := fmt.Sprintf("getting piece that contains payload CID '%s': %s", payloadCid, err.Error()) + writeError(w, r, http.StatusNotFound, msg) + return + } + log.Errorf("getting piece that contains payload CID '%s': %s", payloadCid, err) + msg := fmt.Sprintf("server error getting piece that contains payload CID '%s'", payloadCidStr) + writeError(w, r, http.StatusInternalServerError, msg) + return + } + + // Just get the content of the first piece returned (if the client wants a + // different piece they can just call the /piece endpoint) + pieceCid := pieces[0] + ctx := r.Context() + content, err := s.getPieceContent(ctx, pieceCid) + if err != nil { + if isNotFoundError(err) { + msg := fmt.Sprintf("getting content for payload CID %s in piece %s: %s", payloadCidStr, pieceCid, err) + writeError(w, r, http.StatusNotFound, msg) + return + } + log.Errorf("getting content for payload CID %s in piece %s: %s", payloadCid, pieceCid, err) + msg := fmt.Sprintf("server error getting content for payload CID %s in piece %s", payloadCidStr, pieceCid) + writeError(w, r, http.StatusInternalServerError, msg) + return + } + + serveCAR(w, r, content) +} + +func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) { + prefixLen := len(s.pieceBasePath()) + if len(r.URL.Path) <= prefixLen { + msg := fmt.Sprintf("path '%s' is missing piece CID", r.URL.Path) + writeError(w, r, http.StatusBadRequest, msg) + return + } + + fileName := r.URL.Path[prefixLen:] + pieceCidStr := strings.Replace(fileName, ".car", "", 1) + pieceCid, err := cid.Parse(pieceCidStr) + if err != nil { + msg := fmt.Sprintf("parsing piece CID '%s': %s", pieceCidStr, err.Error()) + writeError(w, r, http.StatusBadRequest, msg) + return + } + + ctx := r.Context() + content, err := s.getPieceContent(ctx, pieceCid) + if err != nil { + if isNotFoundError(err) { + writeError(w, r, http.StatusNotFound, err.Error()) + return + } + log.Errorf("getting content for piece %s: %s", pieceCid, err) + msg := fmt.Sprintf("server error getting content for piece CID %s", pieceCidStr) + writeError(w, r, http.StatusInternalServerError, msg) + return + } + + serveCAR(w, r, content) +} + +func serveCAR(w http.ResponseWriter, r *http.Request, content io.ReadSeeker) { + // Set the Content-Type header explicitly so that http.ServeContent doesn't + // try to do it implicitly + w.Header().Set("Content-Type", "application/vnd.ipld.car") + + if r.Method == "HEAD" { + // For an HTTP HEAD request we don't send any data (just headers) + http.ServeContent(w, r, "", time.Time{}, content) + alog("%s\tHEAD %s", color.New(color.FgGreen).Sprintf("%d", http.StatusOK), r.URL) + return + } + + // Send the CAR file + // http.ServeContent ignores errors when writing to the stream, so we + // replace the writer with a class that watches for errors + var err error + writeErrWatcher := &writeErrorWatcher{ResponseWriter: w, onError: func(e error) { + err = e + }} + + // Send the content + start := time.Now() + alogAt(start, "%s\tGET %s", color.New(color.FgGreen).Sprintf("%d", http.StatusOK), r.URL) + http.ServeContent(writeErrWatcher, r, "", time.Time{}, content) + + // Check if there was an error during the transfer + end := time.Now() + completeMsg := fmt.Sprintf("GET %s\n%s - %s: %s / %s bytes transferred", + r.URL, end.Format(timeFmt), start.Format(timeFmt), time.Since(start), addCommas(writeErrWatcher.count)) + if err == nil { + alogAt(end, "%s\t%s", color.New(color.FgGreen).Sprint("DONE"), completeMsg) + } else { + alogAt(end, "%s\t%s\n%s", + color.New(color.FgRed).Sprint("FAIL"), completeMsg, err) + } +} + +// isNotFoundError falls back to checking the error string for "not found". +// Unfortunately we can't always use errors.Is() because the error might +// have crossed an RPC boundary. +func isNotFoundError(err error) bool { + if errors.Is(err, ErrNotFound) { + return true + } + if errors.Is(err, datastore.ErrNotFound) { + return true + } + if errors.Is(err, retrievalmarket.ErrNotFound) { + return true + } + return strings.Contains(strings.ToLower(err.Error()), "not found") +} + +func writeError(w http.ResponseWriter, r *http.Request, status int, msg string) { + w.WriteHeader(status) + w.Write([]byte("Error: " + msg)) //nolint:errcheck + alog("%s\tGET %s\n%s", + color.New(color.FgRed).Sprintf("%d", status), r.URL, msg) +} + +func (s *HttpServer) getPieceContent(ctx context.Context, pieceCid cid.Cid) (io.ReadSeeker, error) { + // Get the deals for the piece + pieceInfo, err := s.api.GetPieceInfo(pieceCid) + if err != nil { + return nil, fmt.Errorf("getting sector info for piece %s: %w", pieceCid, err) + } + + // Get the first unsealed deal + di, err := s.unsealedDeal(ctx, *pieceInfo) + if err != nil { + return nil, fmt.Errorf("getting unsealed CAR file: %w", err) + } + + // Get the raw piece data from the sector + pieceReader, err := s.api.UnsealSectorAt(ctx, di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded()) + if err != nil { + return nil, fmt.Errorf("getting raw data from sector %d: %w", di.SectorID, err) + } + + maxOffset, err := s.api.GetMaxPieceOffset(pieceCid) + if err != nil { + return nil, fmt.Errorf("getting max offset for piece %s: %w", pieceCid, err) + } + + _, err = pieceReader.Seek(int64(maxOffset), io.SeekStart) + if err != nil { + return nil, fmt.Errorf("seeking to offset %d in piece data: %w", maxOffset, err) + } + + // A section consists of + // + + // Get + cr := &countReader{r: bufio.NewReader(pieceReader)} + dataLength, err := varint.ReadUvarint(cr) + if err != nil { + return nil, fmt.Errorf("reading CAR section length: %w", err) + } + + // The number of bytes in the uvarint that records + dataLengthUvarSize := cr.count + + // Get the size of the (unpadded) CAR file + unpaddedCarSize := maxOffset + dataLengthUvarSize + dataLength + + // Seek to the end of the CAR to get its (padded) size + paddedCarSize, err := pieceReader.Seek(0, io.SeekEnd) + if err != nil { + return nil, fmt.Errorf("seeking to end of CAR: %w", err) + } + + // Seek back to the start of the CAR + _, err = pieceReader.Seek(0, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("seeking to start of CAR: %w", err) + } + + lr := &limitSeekReader{ + Reader: io.LimitReader(pieceReader, int64(unpaddedCarSize)), + readSeeker: pieceReader, + unpaddedSize: int64(unpaddedCarSize), + paddedSize: paddedCarSize, + } + return lr, nil +} + +type limitSeekReader struct { + io.Reader + readSeeker io.ReadSeeker + unpaddedSize int64 + paddedSize int64 +} + +var _ io.ReadSeeker = (*limitSeekReader)(nil) + +func (l *limitSeekReader) Seek(offset int64, whence int) (int64, error) { + if whence == io.SeekEnd { + offset -= (l.paddedSize - l.unpaddedSize) + } + return l.readSeeker.Seek(offset, whence) +} + +func (s *HttpServer) unsealedDeal(ctx context.Context, pieceInfo piecestore.PieceInfo) (*piecestore.DealInfo, error) { + // There should always been deals in the PieceInfo, but check just in case + if len(pieceInfo.Deals) == 0 { + return nil, fmt.Errorf("there are no deals containing piece %s: %w", pieceInfo.PieceCID, ErrNotFound) + } + + // The same piece can be in many deals. Find the first unsealed deal. + sealedCount := 0 + var allErr error + for _, di := range pieceInfo.Deals { + isUnsealed, err := s.api.IsUnsealed(ctx, di.SectorID, di.Offset.Unpadded(), di.Length.Unpadded()) + if err != nil { + allErr = multierror.Append(allErr, err) + continue + } + if isUnsealed { + return &di, nil + } + sealedCount++ + } + + // Try to return an error message with as much useful information as possible + dealSectors := make([]string, 0, len(pieceInfo.Deals)) + for _, di := range pieceInfo.Deals { + dealSectors = append(dealSectors, fmt.Sprintf("Deal %d: Sector %d", di.DealID, di.SectorID)) + } + + if allErr == nil { + dealSectorsErr := fmt.Errorf("%s: %w", strings.Join(dealSectors, ", "), ErrNotFound) + return nil, fmt.Errorf("checked unsealed status of %d deals containing piece %s: none are unsealed: %w", + len(pieceInfo.Deals), pieceInfo.PieceCID, dealSectorsErr) + } + + if len(pieceInfo.Deals) == 1 { + return nil, fmt.Errorf("checking unsealed status of deal %d (sector %d) containing piece %s: %w", + pieceInfo.Deals[0].DealID, pieceInfo.Deals[0].SectorID, pieceInfo.PieceCID, allErr) + } + + if sealedCount == 0 { + return nil, fmt.Errorf("checking unsealed status of %d deals containing piece %s: %s: %w", + len(pieceInfo.Deals), pieceInfo.PieceCID, dealSectors, allErr) + } + + return nil, fmt.Errorf("checking unsealed status of %d deals containing piece %s - %d are sealed, %d had errors: %s: %w", + len(pieceInfo.Deals), pieceInfo.PieceCID, sealedCount, len(pieceInfo.Deals)-sealedCount, dealSectors, allErr) +} + +// writeErrorWatcher calls onError if there is an error writing to the writer +type writeErrorWatcher struct { + http.ResponseWriter + count uint64 + onError func(err error) +} + +func (w *writeErrorWatcher) Write(bz []byte) (int, error) { + count, err := w.ResponseWriter.Write(bz) + if err != nil { + w.onError(err) + } + w.count += uint64(count) + return count, err +} + +// countReader just counts the number of bytes read +type countReader struct { + r *bufio.Reader + count uint64 +} + +func (c *countReader) ReadByte() (byte, error) { + b, err := c.r.ReadByte() + if err == nil { + c.count++ + } + return b, err +} + +const timeFmt = "2006-01-02T15:04:05.000Z0700" + +func alog(l string, args ...interface{}) { + alogAt(time.Now(), l, args...) +} + +func alogAt(at time.Time, l string, args ...interface{}) { + fmt.Printf(at.Format(timeFmt)+"\t"+l+"\n", args...) +} diff --git a/cmd/booster-http/util.go b/cmd/booster-http/util.go new file mode 100644 index 000000000..a09165076 --- /dev/null +++ b/cmd/booster-http/util.go @@ -0,0 +1,11 @@ +package main + +import "fmt" + +func addCommas(count uint64) string { + str := fmt.Sprintf("%d", count) + for i := len(str) - 3; i > 0; i -= 3 { + str = str[:i] + "," + str[i:] + } + return str +} diff --git a/cmd/booster-http/util_test.go b/cmd/booster-http/util_test.go new file mode 100644 index 000000000..02954cde0 --- /dev/null +++ b/cmd/booster-http/util_test.go @@ -0,0 +1,52 @@ +package main + +import ( + "testing" +) + +func Test_addCommas(t *testing.T) { + type args struct { + count int + } + tests := []struct { + args args + want string + }{{ + args: args{count: 1}, + want: "1", + }, { + args: args{count: 12}, + want: "12", + }, { + args: args{count: 123}, + want: "123", + }, { + args: args{count: 1234}, + want: "1,234", + }, { + args: args{count: 12345}, + want: "12,345", + }, { + args: args{count: 123456}, + want: "123,456", + }, { + args: args{count: 1234567}, + want: "1,234,567", + }, { + args: args{count: 12345678}, + want: "12,345,678", + }, { + args: args{count: 123456789}, + want: "123,456,789", + }, { + args: args{count: 1234567890}, + want: "1,234,567,890", + }} + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + if got := addCommas(uint64(tt.args.count)); got != tt.want { + t.Errorf("addCommas() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 7042379f4..6a3968a32 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -9,6 +9,7 @@ * [BoostDagstoreInitializeAll](#boostdagstoreinitializeall) * [BoostDagstoreInitializeShard](#boostdagstoreinitializeshard) * [BoostDagstoreListShards](#boostdagstorelistshards) + * [BoostDagstorePiecesContainingMultihash](#boostdagstorepiecescontainingmultihash) * [BoostDagstoreRecoverShard](#boostdagstorerecovershard) * [BoostDagstoreRegisterShard](#boostdagstoreregistershard) * [BoostDeal](#boostdeal) @@ -66,6 +67,7 @@ * [NetPeers](#netpeers) * [Pieces](#pieces) * [PiecesGetCIDInfo](#piecesgetcidinfo) + * [PiecesGetMaxOffset](#piecesgetmaxoffset) * [PiecesGetPieceInfo](#piecesgetpieceinfo) * [PiecesListCidInfos](#pieceslistcidinfos) * [PiecesListPieces](#pieceslistpieces) @@ -206,6 +208,27 @@ Response: ] ``` +### BoostDagstorePiecesContainingMultihash + + +Perms: read + +Inputs: +```json +[ + "Bw==" +] +``` + +Response: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +] +``` + ### BoostDagstoreRecoverShard @@ -1332,6 +1355,22 @@ Response: } ``` +### PiecesGetMaxOffset + + +Perms: read + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +] +``` + +Response: `42` + ### PiecesGetPieceInfo diff --git a/go.mod b/go.mod index 57d249c2a..8c4233f5e 100644 --- a/go.mod +++ b/go.mod @@ -86,6 +86,7 @@ require ( github.com/multiformats/go-multiaddr v0.5.0 github.com/multiformats/go-multibase v0.0.3 github.com/multiformats/go-multihash v0.1.0 + github.com/multiformats/go-varint v0.0.6 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/pressly/goose/v3 v3.5.3 github.com/prometheus/client_golang v1.12.1 diff --git a/node/impl/boost.go b/node/impl/boost.go index 03f0a2344..6b15938cf 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/multiformats/go-multihash" "net/http" "sort" @@ -184,6 +185,28 @@ func (sm *BoostAPI) BoostDagstoreListShards(ctx context.Context) ([]api.Dagstore return ret, nil } +func (sm *BoostAPI) BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) { + if sm.DAGStore == nil { + return nil, fmt.Errorf("dagstore not available on this node") + } + + ks, err := sm.DAGStore.ShardsContainingMultihash(ctx, mh) + if err != nil { + return nil, fmt.Errorf("getting pieces containing multihash %s from DAG store: %w", mh, err) + } + + pieceCids := make([]cid.Cid, 0, len(ks)) + for _, k := range ks { + pieceCid, err := cid.Parse(k.String()) + if err != nil { + return nil, fmt.Errorf("parsing DAG store shard key '%s' into cid: %w", k, err) + } + pieceCids = append(pieceCids, pieceCid) + } + + return pieceCids, nil +} + func (sm *BoostAPI) BoostDagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreInitializeAllEvent, error) { if sm.DAGStore == nil { return nil, fmt.Errorf("dagstore not available on this node") diff --git a/node/impl/boost_legacy.go b/node/impl/boost_legacy.go index c698311d7..e4b72d826 100644 --- a/node/impl/boost_legacy.go +++ b/node/impl/boost_legacy.go @@ -3,6 +3,8 @@ package impl import ( "context" "fmt" + "github.com/filecoin-project/dagstore/shard" + "github.com/multiformats/go-multihash" "os" "time" @@ -202,7 +204,7 @@ func (sm *BoostAPI) PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) { func (sm *BoostAPI) PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) { pi, err := sm.PieceStore.GetPieceInfo(pieceCid) if err != nil { - return nil, err + return nil, fmt.Errorf("getting piece from piece store: %w", err) } return &pi, nil } @@ -216,6 +218,27 @@ func (sm *BoostAPI) PiecesGetCIDInfo(ctx context.Context, payloadCid cid.Cid) (* return &ci, nil } +func (sm *BoostAPI) PiecesGetMaxOffset(ctx context.Context, pieceCid cid.Cid) (uint64, error) { + var maxOffset uint64 + + it, err := sm.DAGStore.GetIterableIndex(shard.KeyFromCID(pieceCid)) + if err != nil { + return maxOffset, fmt.Errorf("getting iterable index for piece %s from DAG store: %w", pieceCid, err) + } + + err = it.ForEach(func(mh multihash.Multihash, offset uint64) error { + if offset > maxOffset { + maxOffset = offset + } + return nil + }) + if err != nil { + return maxOffset, fmt.Errorf("iterating over CAR index: %w", err) + } + + return maxOffset, err +} + func (sm *BoostAPI) RuntimeSubsystems(context.Context) (res lapi.MinerSubsystems, err error) { return []lapi.MinerSubsystem{lapi.SubsystemMarkets}, nil } diff --git a/transport/httptransport/libp2p_server.go b/transport/httptransport/libp2p_server.go index f8853e3dd..4f603da03 100644 --- a/transport/httptransport/libp2p_server.go +++ b/transport/httptransport/libp2p_server.go @@ -192,7 +192,7 @@ func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, a // Set the Content-Type header explicitly so that http.ServeContent doesn't // try to do it implicitly - w.Header().Set("Content-Type", "application/car") + w.Header().Set("Content-Type", "application/vnd.ipld.car") if r.Method == "HEAD" { // For an HTTP HEAD request we don't send any data (just headers)