提交 d6aa9527 作者: Łukasz Magiera 提交者: Steven Allen

pubsub cmd: switch to coreapi

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 d3f3afa5
......@@ -3,23 +3,16 @@ package commands
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sort"
"sync"
"time"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
pstore "gx/ipfs/QmSJ36wcYQyEViJUWUEhJU81tw1KdakTKqLLHbvYbA9zDv/go-libp2p-peerstore"
cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
floodsub "gx/ipfs/QmUK4h113Hh7bR2gPpsMcbUEbbzc7hspocmPi91Bmi69nH/go-libp2p-floodsub"
cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds"
)
......@@ -48,6 +41,13 @@ const (
pubsubDiscoverOptionName = "discover"
)
type pubsubMessage struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}
var PubsubSubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Subscribe to messages on a given topic.",
......@@ -79,40 +79,16 @@ This command outputs data in the following encodings:
cmdkit.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}
// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}
if n.Floodsub == nil {
return fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}
topic := req.Arguments[0]
sub, err := n.Floodsub.Subscribe(topic)
if err != nil {
return err
}
defer sub.Cancel()
discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}
connectToPubSubPeers(req.Context, n, blk.Cid())
}()
}
sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
defer sub.Close()
if f, ok := res.(http.Flusher); ok {
f.Flush()
......@@ -126,15 +102,17 @@ This command outputs data in the following encodings:
return err
}
err = res.Emit(msg)
if err != nil {
return err
}
res.Emit(&pubsubMessage{
Data: msg.Data(),
From: []byte(msg.From()),
Seqno: msg.Seq(),
TopicIDs: msg.Topics(),
})
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
......@@ -143,7 +121,7 @@ This command outputs data in the following encodings:
return err
}),
"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
......@@ -153,7 +131,7 @@ This command outputs data in the following encodings:
return err
}),
"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
......@@ -166,31 +144,7 @@ This command outputs data in the following encodings:
return err
}),
},
Type: floodsub.Message{},
}
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}
wg.Wait()
Type: pubsubMessage{},
}
var PubsubPubCmd = &cmds.Command{
......@@ -210,20 +164,11 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}
// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}
if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
}
topic := req.Arguments[0]
err = req.ParseBodyArgs()
......@@ -232,7 +177,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
}
for _, data := range req.Arguments[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
return err
}
}
......@@ -254,21 +199,17 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}
// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}
if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
l, err := api.PubSub().Ls(req.Context)
if err != nil {
return err
}
return cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
return cmds.EmitOnce(res, stringList{l})
},
Type: stringList{},
Encoders: cmds.EncoderMap{
......@@ -308,26 +249,21 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}
// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}
if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}
var topic string
if len(req.Arguments) == 1 {
topic = req.Arguments[0]
}
peers := n.Floodsub.ListPeers(topic)
peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
if err != nil {
return err
}
list := &stringList{make([]string, 0, len(peers))}
for _, peer := range peers {
......
......@@ -16,8 +16,12 @@ package coreapi
import (
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log"
)
var log = logging.Logger("core/coreapi")
type CoreAPI struct {
node *core.IpfsNode
}
......
......@@ -6,7 +6,7 @@ import (
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
)
// PubSubSubscription is an active PubSub subscription
......@@ -24,6 +24,12 @@ type PubSubMessage interface {
// Data returns the message body
Data() []byte
// Seq returns message identifier
Seq() []byte
// Topics returns list of topics this message was set to
Topics() []string
}
// PubSubAPI specifies the interface to PubSub
......
......@@ -3,12 +3,18 @@ package coreapi
import (
"context"
"errors"
"strings"
"sync"
"time"
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
pstore "gx/ipfs/QmSJ36wcYQyEViJUWUEhJU81tw1KdakTKqLLHbvYbA9zDv/go-libp2p-peerstore"
floodsub "gx/ipfs/QmUK4h113Hh7bR2gPpsMcbUEbbzc7hspocmPi91Bmi69nH/go-libp2p-floodsub"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
)
type PubSubAPI CoreAPI
......@@ -58,6 +64,8 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
}
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
options, err := caopts.PubSubSubscribeOptions(opts...)
if err := api.checkNode(); err != nil {
return nil, err
}
......@@ -67,9 +75,45 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err
}
if options.Discover {
go func() {
blk, err := api.core().Block().Put(ctx, strings.NewReader("floodsub:"+topic))
if err != nil {
log.Error("pubsub discovery: ", err)
return
}
connectToPubSubPeers(ctx, api.node, blk.Path().Cid())
}()
}
return &pubSubSubscription{sub}, nil
}
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}
wg.Wait()
}
func (api *PubSubAPI) checkNode() error {
if !api.node.OnlineMode() {
return coreiface.ErrOffline
......@@ -103,3 +147,15 @@ func (msg *pubSubMessage) From() peer.ID {
func (msg *pubSubMessage) Data() []byte {
return msg.msg.Data
}
func (msg *pubSubMessage) Seq() []byte {
return msg.msg.Seqno
}
func (msg *pubSubMessage) Topics() []string {
return msg.msg.TopicIDs
}
func (api *PubSubAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论