提交 009301bf 作者: Jeromy

hide pubsub behind feature flag

License: MIT
Signed-off-by: 's avatarJeromy <why@ipfs.io>
上级 ff770fad
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
"gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net" "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
"gx/ipfs/QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa/client_golang/prometheus" "gx/ipfs/QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa/client_golang/prometheus"
conn "gx/ipfs/QmUuwQUJmtvC6ReYcu7xaYKEUM3pD46H18dFn3LBhVt2Di/go-libp2p/p2p/net/conn"
mprome "gx/ipfs/QmXWro6iddJRbGWUoZDpTu6tjo5EXX4xJHHR9VczeoGZbw/go-metrics-prometheus" mprome "gx/ipfs/QmXWro6iddJRbGWUoZDpTu6tjo5EXX4xJHHR9VczeoGZbw/go-metrics-prometheus"
pstore "gx/ipfs/QmYkwVGkwoPbMVQEbf6LonZg4SsCxGP3H7PBEtdNCNRyxD/go-libp2p-peerstore" pstore "gx/ipfs/QmYkwVGkwoPbMVQEbf6LonZg4SsCxGP3H7PBEtdNCNRyxD/go-libp2p-peerstore"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr" ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
...@@ -47,6 +47,7 @@ const ( ...@@ -47,6 +47,7 @@ const (
unencryptTransportKwd = "disable-transport-encryption" unencryptTransportKwd = "disable-transport-encryption"
unrestrictedApiAccessKwd = "unrestricted-api" unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable" writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment"
// apiAddrKwd = "address-api" // apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm" // swarmAddrKwd = "address-swarm"
) )
...@@ -145,6 +146,7 @@ Headers. ...@@ -145,6 +146,7 @@ Headers.
cmds.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").Default(true), cmds.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").Default(true),
cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false), cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."), cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
// TODO: add way to override addresses. tricky part: updating the config if also --init. // TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"), // cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
...@@ -266,14 +268,19 @@ func daemonFunc(req cmds.Request, res cmds.Response) { ...@@ -266,14 +268,19 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
return return
} }
offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
// Start assembling node config // Start assembling node config
ncfg := &core.BuildCfg{ ncfg := &core.BuildCfg{
Repo: repo, Repo: repo,
Permament: true, // It is temporary way to signify that node is permament Permament: true, // It is temporary way to signify that node is permament
//TODO(Kubuxu): refactor Online vs Offline by adding Permement vs Epthemeral Online: !offline,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
},
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral
} }
offline, _, _ := req.Option(offlineKwd).Bool()
ncfg.Online = !offline
routingOption, _, err := req.Option(routingOptionKwd).String() routingOption, _, err := req.Option(routingOptionKwd).String()
if err != nil { if err != nil {
......
...@@ -32,6 +32,9 @@ type BuildCfg struct { ...@@ -32,6 +32,9 @@ type BuildCfg struct {
// If online is set, the node will have networking enabled // If online is set, the node will have networking enabled
Online bool Online bool
// ExtraOpts is a map of extra options used to configure the ipfs nodes creation
ExtraOpts map[string]bool
// If permament then node should run more expensive processes // If permament then node should run more expensive processes
// that will improve performance in long run // that will improve performance in long run
Permament bool Permament bool
...@@ -44,6 +47,14 @@ type BuildCfg struct { ...@@ -44,6 +47,14 @@ type BuildCfg struct {
Repo repo.Repo Repo repo.Repo
} }
func (cfg *BuildCfg) getOpt(key string) bool {
if cfg.ExtraOpts == nil {
return false
}
return cfg.ExtraOpts[key]
}
func (cfg *BuildCfg) fillDefaults() error { func (cfg *BuildCfg) fillDefaults() error {
if cfg.Repo != nil && cfg.NilRepo { if cfg.Repo != nil && cfg.NilRepo {
return errors.New("cannot set a repo and specify nilrepo at the same time") return errors.New("cannot set a repo and specify nilrepo at the same time")
...@@ -184,7 +195,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { ...@@ -184,7 +195,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
if cfg.Online { if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery) do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do); err != nil { if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub")); err != nil {
return err return err
} }
} else { } else {
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"fmt"
"io" "io"
"sync" "sync"
"time" "time"
...@@ -28,6 +29,8 @@ subscribe to new messages on a given topic. ...@@ -28,6 +29,8 @@ subscribe to new messages on a given topic.
This is an experimental feature. It is not intended in its current state This is an experimental feature. It is not intended in its current state
to be used in a production environment. to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`, `,
}, },
Subcommands: map[string]*cmds.Command{ Subcommands: map[string]*cmds.Command{
...@@ -44,6 +47,8 @@ ipfs pubsub sub subscribes to messages on a given topic. ...@@ -44,6 +47,8 @@ ipfs pubsub sub subscribes to messages on a given topic.
This is an experimental feature. It is not intended in its current state This is an experimental feature. It is not intended in its current state
to be used in a production environment. to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`, `,
}, },
Arguments: []cmds.Argument{ Arguments: []cmds.Argument{
...@@ -65,6 +70,11 @@ to be used in a production environment. ...@@ -65,6 +70,11 @@ to be used in a production environment.
return return
} }
if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
return
}
topic := req.Arguments()[0] topic := req.Arguments()[0]
msgs, err := n.Floodsub.Subscribe(topic) msgs, err := n.Floodsub.Subscribe(topic)
if err != nil { if err != nil {
...@@ -176,6 +186,8 @@ ipfs pubsub pub publishes a message to a specified topic. ...@@ -176,6 +186,8 @@ ipfs pubsub pub publishes a message to a specified topic.
This is an experimental feature. It is not intended in its current state This is an experimental feature. It is not intended in its current state
to be used in a production environment. to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`, `,
}, },
Arguments: []cmds.Argument{ Arguments: []cmds.Argument{
...@@ -196,6 +208,11 @@ to be used in a production environment. ...@@ -196,6 +208,11 @@ to be used in a production environment.
return return
} }
if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
return
}
topic := req.Arguments()[0] topic := req.Arguments()[0]
for _, data := range req.Arguments()[1:] { for _, data := range req.Arguments()[1:] {
......
...@@ -99,13 +99,13 @@ var rootSubcommands = map[string]*cmds.Command{ ...@@ -99,13 +99,13 @@ var rootSubcommands = map[string]*cmds.Command{
"object": ocmd.ObjectCmd, "object": ocmd.ObjectCmd,
"pin": PinCmd, "pin": PinCmd,
"ping": PingCmd, "ping": PingCmd,
"pubsub": PubsubCmd,
"refs": RefsCmd, "refs": RefsCmd,
"repo": RepoCmd, "repo": RepoCmd,
"resolve": ResolveCmd, "resolve": ResolveCmd,
"stats": StatsCmd, "stats": StatsCmd,
"swarm": SwarmCmd, "swarm": SwarmCmd,
"tar": TarCmd, "tar": TarCmd,
"pubsub": PubsubCmd,
"tour": tourCmd, "tour": tourCmd,
"file": unixfs.UnixFSCmd, "file": unixfs.UnixFSCmd,
"update": ExternalBinary(), "update": ExternalBinary(),
......
...@@ -129,7 +129,7 @@ type Mounts struct { ...@@ -129,7 +129,7 @@ type Mounts struct {
Ipns mount.Mount Ipns mount.Mount
} }
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption) error { func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub bool) error {
if n.PeerHost != nil { // already online. if n.PeerHost != nil { // already online.
return errors.New("node already online") return errors.New("node already online")
...@@ -187,7 +187,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin ...@@ -187,7 +187,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
go n.Reprovider.ProvideEvery(ctx, interval) go n.Reprovider.ProvideEvery(ctx, interval)
} }
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) if pubsub {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
// setup local discovery // setup local discovery
if do != nil { if do != nil {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论