提交 efa282d4 作者: Steven Allen

update for pubsub rename

License: MIT
Signed-off-by: 's avatarSteven Allen <steven@stebalien.com>
上级 d3bba25f
...@@ -389,7 +389,7 @@ Some places to get you started on the codebase: ...@@ -389,7 +389,7 @@ Some places to get you started on the codebase:
- libp2p - libp2p
- libp2p: https://github.com/libp2p/go-libp2p - libp2p: https://github.com/libp2p/go-libp2p
- DHT: https://github.com/libp2p/go-libp2p-kad-dht - DHT: https://github.com/libp2p/go-libp2p-kad-dht
- PubSub: https://github.com/libp2p/go-floodsub - PubSub: https://github.com/libp2p/go-libp2p-pubsub
### CLI, HTTP-API, Architecture Diagram ### CLI, HTTP-API, Architecture Diagram
......
...@@ -50,7 +50,7 @@ const ( ...@@ -50,7 +50,7 @@ const (
unencryptTransportKwd = "disable-transport-encryption" unencryptTransportKwd = "disable-transport-encryption"
unrestrictedApiAccessKwd = "unrestricted-api" unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable" writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment" enablePubSubKwd = "enable-pubsub-experiment"
enableIPNSPubSubKwd = "enable-namesys-pubsub" enableIPNSPubSubKwd = "enable-namesys-pubsub"
enableMultiplexKwd = "enable-mplex-experiment" enableMultiplexKwd = "enable-mplex-experiment"
// apiAddrKwd = "address-api" // apiAddrKwd = "address-api"
...@@ -163,7 +163,7 @@ Headers. ...@@ -163,7 +163,7 @@ Headers.
cmdkit.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").WithDefault(true), cmdkit.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").WithDefault(true),
cmdkit.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API."), cmdkit.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API."),
cmdkit.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."), cmdkit.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmdkit.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."), cmdkit.BoolOption(enablePubSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmdkit.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."), cmdkit.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."),
cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").WithDefault(true), cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").WithDefault(true),
...@@ -285,7 +285,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment ...@@ -285,7 +285,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
offline, _ := req.Options[offlineKwd].(bool) offline, _ := req.Options[offlineKwd].(bool)
ipnsps, _ := req.Options[enableIPNSPubSubKwd].(bool) ipnsps, _ := req.Options[enableIPNSPubSubKwd].(bool)
pubsub, _ := req.Options[enableFloodSubKwd].(bool) pubsub, _ := req.Options[enablePubSubKwd].(bool)
mplex, _ := req.Options[enableMultiplexKwd].(bool) mplex, _ := req.Options[enableMultiplexKwd].(bool)
// Start assembling node config // Start assembling node config
......
...@@ -44,6 +44,7 @@ import ( ...@@ -44,6 +44,7 @@ import (
ic "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" ic "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
nilrouting "gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/none" nilrouting "gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/none"
offroute "gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/offline" offroute "gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/offline"
psrouter "gx/ipfs/QmQ9qNWAZDDoPYenvsMwoyh4ZrJFZdbfGTVDuZkxUcJKp1/go-libp2p-pubsub-router"
exchange "gx/ipfs/QmR1nncPsZR14A4hWr39mq8Lm7BGgS68bHVT9nop8NpWEM/go-ipfs-exchange-interface" exchange "gx/ipfs/QmR1nncPsZR14A4hWr39mq8Lm7BGgS68bHVT9nop8NpWEM/go-ipfs-exchange-interface"
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
...@@ -53,12 +54,10 @@ import ( ...@@ -53,12 +54,10 @@ import (
config "gx/ipfs/QmSoYrBMibm2T3LupaLuez7LPGnyrJwdRxvTfPUyCp691u/go-ipfs-config" config "gx/ipfs/QmSoYrBMibm2T3LupaLuez7LPGnyrJwdRxvTfPUyCp691u/go-ipfs-config"
dht "gx/ipfs/QmSteomMgXnSQxLEY5UpxmkYAd8QF9JuLLeLYBokTHxFru/go-libp2p-kad-dht" dht "gx/ipfs/QmSteomMgXnSQxLEY5UpxmkYAd8QF9JuLLeLYBokTHxFru/go-libp2p-kad-dht"
dhtopts "gx/ipfs/QmSteomMgXnSQxLEY5UpxmkYAd8QF9JuLLeLYBokTHxFru/go-libp2p-kad-dht/opts" dhtopts "gx/ipfs/QmSteomMgXnSQxLEY5UpxmkYAd8QF9JuLLeLYBokTHxFru/go-libp2p-kad-dht/opts"
floodsub "gx/ipfs/QmTcC9Qx2adsdGguNpqZ6dJK7MMsH8sf3yfxZxG3bSwKet/go-libp2p-floodsub"
merkledag "gx/ipfs/QmVvNkTCx8V9Zei8xuTYTBdUXmbnDRS4iNuw1SztYyhQwQ/go-merkledag" merkledag "gx/ipfs/QmVvNkTCx8V9Zei8xuTYTBdUXmbnDRS4iNuw1SztYyhQwQ/go-merkledag"
yamux "gx/ipfs/QmVwYCtShoL74Xi8TgEg9jXvHVVtWwZ3Hg1QZqQvQA9xji/go-smux-yamux" yamux "gx/ipfs/QmVwYCtShoL74Xi8TgEg9jXvHVVtWwZ3Hg1QZqQvQA9xji/go-smux-yamux"
ft "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs" ft "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs"
pstore "gx/ipfs/QmWtCpWB39Rzc2xTB75MKorsxNpo3TyecTEN24CJ3KVohE/go-libp2p-peerstore" pstore "gx/ipfs/QmWtCpWB39Rzc2xTB75MKorsxNpo3TyecTEN24CJ3KVohE/go-libp2p-peerstore"
psrouter "gx/ipfs/QmX5cToCpsigs7ZebYjm8P8aQN5A6Mx9LydrrSyBBtevzn/go-libp2p-pubsub-router"
bitswap "gx/ipfs/QmXBT58TaD2CJThpHy4xkxC1xsW4hXWBGZuKMepwjuzJ5B/go-bitswap" bitswap "gx/ipfs/QmXBT58TaD2CJThpHy4xkxC1xsW4hXWBGZuKMepwjuzJ5B/go-bitswap"
bsnet "gx/ipfs/QmXBT58TaD2CJThpHy4xkxC1xsW4hXWBGZuKMepwjuzJ5B/go-bitswap/network" bsnet "gx/ipfs/QmXBT58TaD2CJThpHy4xkxC1xsW4hXWBGZuKMepwjuzJ5B/go-bitswap/network"
pnet "gx/ipfs/QmY4Q5JC4vxLEi8EpVxJM4rcRryEVtH1zRKVTAm6BKV1pg/go-libp2p-pnet" pnet "gx/ipfs/QmY4Q5JC4vxLEi8EpVxJM4rcRryEVtH1zRKVTAm6BKV1pg/go-libp2p-pnet"
...@@ -75,6 +74,7 @@ import ( ...@@ -75,6 +74,7 @@ import (
bstore "gx/ipfs/QmcDDgAXDbpDUpadCJKLr49KYR4HuL7T8Z1dZTHt6ixsoR/go-ipfs-blockstore" bstore "gx/ipfs/QmcDDgAXDbpDUpadCJKLr49KYR4HuL7T8Z1dZTHt6ixsoR/go-ipfs-blockstore"
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
"gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path/resolver" "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path/resolver"
pubsub "gx/ipfs/QmdxgseTjZvbvEKGbpnSitR6oCCanRZiSiqjn1SC4pb7Wy/go-libp2p-pubsub"
mfs "gx/ipfs/Qmf5gumjmXpwmn7uDfAvkXbFQ5sHGGbJGccS8znSYmDQaz/go-mfs" mfs "gx/ipfs/Qmf5gumjmXpwmn7uDfAvkXbFQ5sHGGbJGccS8znSYmDQaz/go-mfs"
p2phost "gx/ipfs/Qmf5yHzmWAyHSJRPAmZzfk3Yd7icydBLi7eec5741aov7v/go-libp2p-host" p2phost "gx/ipfs/Qmf5yHzmWAyHSJRPAmZzfk3Yd7icydBLi7eec5741aov7v/go-libp2p-host"
) )
...@@ -137,7 +137,7 @@ type IpfsNode struct { ...@@ -137,7 +137,7 @@ type IpfsNode struct {
Reprovider *rp.Reprovider // the value reprovider system Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher IpnsRepub *ipnsrp.Republisher
Floodsub *floodsub.PubSub PubSub *pubsub.PubSub
PSRouter *psrouter.PubsubValueStore PSRouter *psrouter.PubsubValueStore
DHT *dht.IpfsDHT DHT *dht.IpfsDHT
P2P *p2p.P2P P2P *p2p.P2P
...@@ -459,26 +459,26 @@ func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) { ...@@ -459,26 +459,26 @@ func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) {
// startOnlineServicesWithHost is the set of services which need to be // startOnlineServicesWithHost is the set of services which need to be
// initialized with the host and _before_ we start listening. // initialized with the host and _before_ we start listening.
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption, pubsub bool, ipnsps bool) error { func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption, enablePubsub bool, enableIpnsps bool) error {
// setup diagnostics service // setup diagnostics service
n.Ping = ping.NewPingService(host) n.Ping = ping.NewPingService(host)
if pubsub || ipnsps { if enablePubsub || enableIpnsps {
cfg, err := n.Repo.Config() cfg, err := n.Repo.Config()
if err != nil { if err != nil {
return err return err
} }
var service *floodsub.PubSub var service *pubsub.PubSub
switch cfg.Pubsub.Router { switch cfg.Pubsub.Router {
case "": case "":
fallthrough fallthrough
case "floodsub": case "floodsub":
service, err = floodsub.NewFloodSub(ctx, host) service, err = pubsub.NewFloodSub(ctx, host)
case "gossipsub": case "gossipsub":
service, err = floodsub.NewGossipSub(ctx, host) service, err = pubsub.NewGossipSub(ctx, host)
default: default:
err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router) err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router)
...@@ -487,7 +487,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost ...@@ -487,7 +487,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
if err != nil { if err != nil {
return err return err
} }
n.Floodsub = service n.PubSub = service
} }
// setup routing service // setup routing service
...@@ -514,12 +514,12 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost ...@@ -514,12 +514,12 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
n.DHT = dht n.DHT = dht
} }
if ipnsps { if enableIpnsps {
n.PSRouter = psrouter.NewPubsubValueStore( n.PSRouter = psrouter.NewPubsubValueStore(
ctx, ctx,
host, host,
n.Routing, n.Routing,
n.Floodsub, n.PubSub,
n.RecordValidator, n.RecordValidator,
) )
n.Routing = rhelpers.Tiered{ n.Routing = rhelpers.Tiered{
......
...@@ -12,20 +12,20 @@ import ( ...@@ -12,20 +12,20 @@ import (
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
floodsub "gx/ipfs/QmTcC9Qx2adsdGguNpqZ6dJK7MMsH8sf3yfxZxG3bSwKet/go-libp2p-floodsub"
pstore "gx/ipfs/QmWtCpWB39Rzc2xTB75MKorsxNpo3TyecTEN24CJ3KVohE/go-libp2p-peerstore" pstore "gx/ipfs/QmWtCpWB39Rzc2xTB75MKorsxNpo3TyecTEN24CJ3KVohE/go-libp2p-peerstore"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
pubsub "gx/ipfs/QmdxgseTjZvbvEKGbpnSitR6oCCanRZiSiqjn1SC4pb7Wy/go-libp2p-pubsub"
) )
type PubSubAPI CoreAPI type PubSubAPI CoreAPI
type pubSubSubscription struct { type pubSubSubscription struct {
cancel context.CancelFunc cancel context.CancelFunc
subscription *floodsub.Subscription subscription *pubsub.Subscription
} }
type pubSubMessage struct { type pubSubMessage struct {
msg *floodsub.Message msg *pubsub.Message
} }
func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) { func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
...@@ -33,7 +33,7 @@ func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) { ...@@ -33,7 +33,7 @@ func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
return nil, err return nil, err
} }
return api.node.Floodsub.GetTopics(), nil return api.node.PubSub.GetTopics(), nil
} }
func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
...@@ -46,7 +46,7 @@ func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio ...@@ -46,7 +46,7 @@ func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
return nil, err return nil, err
} }
peers := api.node.Floodsub.ListPeers(settings.Topic) peers := api.node.PubSub.ListPeers(settings.Topic)
out := make([]peer.ID, len(peers)) out := make([]peer.ID, len(peers))
for i, peer := range peers { for i, peer := range peers {
...@@ -61,7 +61,7 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er ...@@ -61,7 +61,7 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
return err return err
} }
return api.node.Floodsub.Publish(topic, data) return api.node.PubSub.Publish(topic, data)
} }
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) { func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
...@@ -71,7 +71,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt ...@@ -71,7 +71,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err return nil, err
} }
sub, err := api.node.Floodsub.Subscribe(topic) sub, err := api.node.PubSub.Subscribe(topic)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -122,7 +122,7 @@ func (api *PubSubAPI) checkNode() error { ...@@ -122,7 +122,7 @@ func (api *PubSubAPI) checkNode() error {
return coreiface.ErrOffline return coreiface.ErrOffline
} }
if api.node.Floodsub == nil { if api.node.PubSub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.") return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
} }
......
...@@ -183,12 +183,6 @@ ...@@ -183,12 +183,6 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmTcC9Qx2adsdGguNpqZ6dJK7MMsH8sf3yfxZxG3bSwKet",
"name": "go-libp2p-floodsub",
"version": "0.9.36"
},
{
"author": "whyrusleeping",
"hash": "QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL", "hash": "QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL",
"name": "go-ipld-format", "name": "go-ipld-format",
"version": "0.6.0" "version": "0.6.0"
...@@ -464,9 +458,9 @@ ...@@ -464,9 +458,9 @@
}, },
{ {
"author": "stebalien", "author": "stebalien",
"hash": "QmX5cToCpsigs7ZebYjm8P8aQN5A6Mx9LydrrSyBBtevzn", "hash": "QmQ9qNWAZDDoPYenvsMwoyh4ZrJFZdbfGTVDuZkxUcJKp1",
"name": "go-libp2p-pubsub-router", "name": "go-libp2p-pubsub-router",
"version": "0.4.11" "version": "0.4.12"
}, },
{ {
"author": "Stebalien", "author": "Stebalien",
...@@ -586,6 +580,12 @@ ...@@ -586,6 +580,12 @@
"hash": "QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC", "hash": "QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC",
"name": "go-ipfs-files", "name": "go-ipfs-files",
"version": "1.0.1" "version": "1.0.1"
},
{
"author": "whyrusleeping",
"hash": "QmdxgseTjZvbvEKGbpnSitR6oCCanRZiSiqjn1SC4pb7Wy",
"name": "go-libp2p-pubsub",
"version": "0.10.0"
} }
], ],
"gxVersion": "0.10.0", "gxVersion": "0.10.0",
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论