Skip to content
This repository was archived by the owner on Oct 5, 2023. It is now read-only.

feat: pubsub http rpc with multibase #151

Merged
merged 3 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/go-test-setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ description: install go-ipfs
runs:
using: "composite"
steps:
- name: Step 1
- name: Install go-ipfs
shell: bash
run: (cd /tmp && go install github.com/ipfs/go-ipfs/cmd/ipfs@master)
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# go-ipfs-http-api

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](https://ipfs.io/)
[![](https://img.shields.io/badge/matrix-%23ipfs-blue.svg?style=flat-square)](https://app.element.io/#/room/#ipfs:matrix.org)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-http-api?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-http-api)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/libp2p/go-libp2p-core v0.8.6
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.15
github.com/pkg/errors v0.9.1
)
Expand Down
78 changes: 61 additions & 17 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
iface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/libp2p/go-libp2p-core/peer"
mbase "github.com/multiformats/go-multibase"
)

type PubsubAPI HttpApi
Expand All @@ -21,8 +22,15 @@ func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) {
if err := api.core().Request("pubsub/ls").Exec(ctx, &out); err != nil {
return nil, err
}

return out.Strings, nil
topics := make([]string, len(out.Strings))
for n, mb := range out.Strings {
_, topic, err := mbase.Decode(mb)
if err != nil {
return nil, err
}
topics[n] = string(topic)
}
return topics, nil
}

func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
Expand All @@ -35,7 +43,11 @@ func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
Strings []string
}

if err := api.core().Request("pubsub/peers", options.Topic).Exec(ctx, &out); err != nil {
var optionalTopic string
if len(options.Topic) > 0 {
optionalTopic = toMultibase([]byte(options.Topic))
}
if err := api.core().Request("pubsub/peers", optionalTopic).Exec(ctx, &out); err != nil {
return nil, err
}

Expand All @@ -51,7 +63,7 @@ func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
}

func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error {
return api.core().Request("pubsub/pub", topic).
return api.core().Request("pubsub/pub", toMultibase([]byte(topic))).
FileBody(bytes.NewReader(message)).
Exec(ctx, nil)
}
Expand All @@ -64,29 +76,36 @@ type pubsubSub struct {
}

type pubsubMessage struct {
JFrom []byte `json:"from,omitempty"`
JData []byte `json:"data,omitempty"`
JSeqno []byte `json:"seqno,omitempty"`
JFrom string `json:"from,omitempty"`
JData string `json:"data,omitempty"`
JSeqno string `json:"seqno,omitempty"`
JTopicIDs []string `json:"topicIDs,omitempty"`

from peer.ID
err error
// real values after unpacking from text/multibase envelopes
from peer.ID
data []byte
seqno []byte
topics []string

err error
}

func (msg *pubsubMessage) From() peer.ID {
return msg.from
}

func (msg *pubsubMessage) Data() []byte {
return msg.JData
return msg.data
}

func (msg *pubsubMessage) Seq() []byte {
return msg.JSeqno
return msg.seqno
}

// TODO: do we want to keep this interface as []string,
// or change to more correct [][]byte?
func (msg *pubsubMessage) Topics() []string {
return msg.JTopicIDs
return msg.topics
}

func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
Expand All @@ -98,22 +117,41 @@ func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
if msg.err != nil {
return nil, msg.err
}
// unpack values from text/multibase envelopes
var err error
msg.from, err = peer.IDFromBytes(msg.JFrom)
return &msg, err
msg.from, err = peer.Decode(msg.JFrom)
if err != nil {
return nil, err
}
_, msg.data, err = mbase.Decode(msg.JData)
if err != nil {
return nil, err
}
_, msg.seqno, err = mbase.Decode(msg.JSeqno)
if err != nil {
return nil, err
}
for _, mbt := range msg.JTopicIDs {
_, topic, err := mbase.Decode(mbt)
if err != nil {
return nil, err
}
msg.topics = append(msg.topics, string(topic))
}
return &msg, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) {
/* right now we have no options (discover got deprecated)
options, err := caopts.PubSubSubscribeOptions(opts...)
if err != nil {
return nil, err
}

resp, err := api.core().Request("pubsub/sub", topic).
Option("discover", options.Discover).Send(ctx)
*/
resp, err := api.core().Request("pubsub/sub", toMultibase([]byte(topic))).Send(ctx)

if err != nil {
return nil, err
Expand Down Expand Up @@ -168,3 +206,9 @@ func (s *pubsubSub) Close() error {
func (api *PubsubAPI) core() *HttpApi {
return (*HttpApi)(api)
}

// Encodes bytes into URL-safe multibase that can be sent over HTTP RPC (URL or body)
func toMultibase(data []byte) string {
mb, _ := mbase.Encode(mbase.Base64url, data)
return mb
}