提交 10ddd40f 作者: Jeromy

add option to enable go-multiplex experiment

License: MIT
Signed-off-by: 's avatarJeromy <why@ipfs.io>
上级 f81cccc3
...@@ -48,6 +48,7 @@ const ( ...@@ -48,6 +48,7 @@ const (
unrestrictedApiAccessKwd = "unrestricted-api" unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable" writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment" enableFloodSubKwd = "enable-pubsub-experiment"
enableMultiplexKwd = "enable-mplex-experiment"
// apiAddrKwd = "address-api" // apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm" // swarmAddrKwd = "address-swarm"
) )
...@@ -158,6 +159,7 @@ Headers. ...@@ -158,6 +159,7 @@ Headers.
cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false), cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."), cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."), cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmds.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction."),
// TODO: add way to override addresses. tricky part: updating the config if also --init. // TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"), // cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
...@@ -288,6 +290,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { ...@@ -288,6 +290,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
offline, _, _ := req.Option(offlineKwd).Bool() offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool() pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
mplex, _, _ := req.Option(enableMultiplexKwd).Bool()
// Start assembling node config // Start assembling node config
ncfg := &core.BuildCfg{ ncfg := &core.BuildCfg{
...@@ -296,6 +299,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { ...@@ -296,6 +299,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
Online: !offline, Online: !offline,
ExtraOpts: map[string]bool{ ExtraOpts: map[string]bool{
"pubsub": pubsub, "pubsub": pubsub,
"mplex": mplex,
}, },
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral //TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral
} }
......
...@@ -197,7 +197,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { ...@@ -197,7 +197,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
if cfg.Online { if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery) do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub")); err != nil { if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil {
return err return err
} }
} else { } else {
......
...@@ -14,7 +14,10 @@ import ( ...@@ -14,7 +14,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net" "net"
"os"
"strings"
"time" "time"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
...@@ -44,19 +47,24 @@ import ( ...@@ -44,19 +47,24 @@ import (
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58"
mssmux "gx/ipfs/QmTfjLsou9ic6L4KqCcmbLSZcdiFu8q1v6njKp121pbbXx/go-smux-multistream"
ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr" ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr"
floodsub "gx/ipfs/QmV5jot2GfVXmgvetHExJCa2hprebf3AKjprZtuwaXSr1v/floodsub" floodsub "gx/ipfs/QmV5jot2GfVXmgvetHExJCa2hprebf3AKjprZtuwaXSr1v/floodsub"
addrutil "gx/ipfs/QmVDnc2zvyQm8LhT72n22THcshvH7j3qPMnhvjerQER62T/go-addr-util" addrutil "gx/ipfs/QmVDnc2zvyQm8LhT72n22THcshvH7j3qPMnhvjerQER62T/go-addr-util"
spdy "gx/ipfs/QmWUNsat6Jb19nC5CiJCDXepTkxjdxi3eZqeoB6mrmmaGu/go-smux-spdystream"
swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm" swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm"
mplex "gx/ipfs/QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on/go-smux-multiplex"
metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
routing "gx/ipfs/QmbkGVaN9W6RYJK4Ws5FvMKXKDqdRQ5snhtaa92qP6L8eU/go-libp2p-routing" routing "gx/ipfs/QmbkGVaN9W6RYJK4Ws5FvMKXKDqdRQ5snhtaa92qP6L8eU/go-libp2p-routing"
yamux "gx/ipfs/Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U/go-smux-yamux"
discovery "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/discovery" discovery "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/discovery"
p2pbhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic" p2pbhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic"
rhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/routed" rhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/routed"
ping "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/protocol/ping" ping "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/protocol/ping"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore"
smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
ic "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto" ic "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto"
) )
...@@ -129,7 +137,7 @@ type Mounts struct { ...@@ -129,7 +137,7 @@ type Mounts struct {
Ipns mount.Mount Ipns mount.Mount
} }
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub bool) error { func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error {
if n.PeerHost != nil { // already online. if n.PeerHost != nil { // already online.
return errors.New("node already online") return errors.New("node already online")
...@@ -159,7 +167,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin ...@@ -159,7 +167,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
n.Reporter = metrics.NewBandwidthCounter() n.Reporter = metrics.NewBandwidthCounter()
} }
peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter) tpt := makeSmuxTransport(mplex)
peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter, tpt)
if err != nil { if err != nil {
return err return err
} }
...@@ -207,6 +217,34 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin ...@@ -207,6 +217,34 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return n.Bootstrap(DefaultBootstrapConfig) return n.Bootstrap(DefaultBootstrapConfig)
} }
func makeSmuxTransport(mplexExp bool) smux.Transport {
mstpt := mssmux.NewBlankTransport()
ymxtpt := &yamux.Transport{
AcceptBacklog: 8192,
ConnectionWriteTimeout: time.Second * 10,
KeepAliveInterval: time.Second * 30,
EnableKeepAlive: true,
MaxStreamWindowSize: uint32(1024 * 512),
LogOutput: ioutil.Discard,
}
mstpt.AddTransport("/yamux/1.0.0", ymxtpt)
mstpt.AddTransport("/spdy/3.1.0", spdy.Transport)
if mplexExp {
mstpt.AddTransport("/mplex/6.7.0", mplex.DefaultTransport)
}
// Allow muxer preference order overriding
if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
mstpt.OrderPreference = strings.Fields(prefs)
}
return mstpt
}
func setupDiscoveryOption(d config.Discovery) DiscoveryOption { func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
if d.MDNS.Enabled { if d.MDNS.Enabled {
return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) { return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) {
...@@ -616,19 +654,21 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { ...@@ -616,19 +654,21 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
return listen, nil return listen, nil
} }
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error)
var DefaultHostOption HostOption = constructPeerHost var DefaultHostOption HostOption = constructPeerHost
// isolates the complex initialization steps // isolates the complex initialization steps
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) { func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) {
// no addresses to begin with. we'll start later. // no addresses to begin with. we'll start later.
network, err := swarm.NewNetwork(ctx, nil, id, ps, bwr) swrm, err := swarm.NewSwarmWithProtector(ctx, nil, id, ps, nil, tpt, bwr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
network := (*swarm.Network)(swrm)
for _, f := range fs { for _, f := range fs {
network.Swarm().Filters.AddDialFilter(f) network.Swarm().Filters.AddDialFilter(f)
} }
......
package coremock package coremock
import ( import (
"context"
"net" "net"
context "context"
"gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
commands "github.com/ipfs/go-ipfs/commands" commands "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config" config "github.com/ipfs/go-ipfs/repo/config"
ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2" ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2"
testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"
host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host" host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host"
"gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics" metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics"
mocknet "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/net/mock" mocknet "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/net/mock"
pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore" pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore"
smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
) )
...@@ -32,7 +33,7 @@ func NewMockNode() (*core.IpfsNode, error) { ...@@ -32,7 +33,7 @@ func NewMockNode() (*core.IpfsNode, error) {
} }
func MockHostOption(mn mocknet.Mocknet) core.HostOption { func MockHostOption(mn mocknet.Mocknet) core.HostOption {
return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (host.Host, error) { return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, _ smux.Transport) (host.Host, error) {
return mn.AddPeerWithPeerstore(id, ps) return mn.AddPeerWithPeerstore(id, ps)
} }
} }
......
...@@ -287,6 +287,12 @@ ...@@ -287,6 +287,12 @@
"hash": "QmXuBJ7DR6k3rmUEKtvVMhwjmXDuJgXXPUt4LQXKBMsU93", "hash": "QmXuBJ7DR6k3rmUEKtvVMhwjmXDuJgXXPUt4LQXKBMsU93",
"name": "go-os-helper", "name": "go-os-helper",
"version": "0.0.0" "version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on",
"name": "go-smux-multiplex",
"version": "1.1.4"
} }
], ],
"gxVersion": "0.4.0", "gxVersion": "0.4.0",
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论