提交 95d58b2a 作者: Juan Batiz-Benet

core: cleaned up bootstrap process

上级 dd9c1b62
...@@ -11,33 +11,36 @@ import ( ...@@ -11,33 +11,36 @@ import (
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
diag "github.com/jbenet/go-ipfs/diagnostics"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
p2phost "github.com/jbenet/go-ipfs/p2p/host"
p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic"
swarm "github.com/jbenet/go-ipfs/p2p/net/swarm"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
peer "github.com/jbenet/go-ipfs/p2p/peer"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
offroute "github.com/jbenet/go-ipfs/routing/offline"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bserv "github.com/jbenet/go-ipfs/blockservice" bserv "github.com/jbenet/go-ipfs/blockservice"
diag "github.com/jbenet/go-ipfs/diagnostics"
exchange "github.com/jbenet/go-ipfs/exchange" exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
offline "github.com/jbenet/go-ipfs/exchange/offline" offline "github.com/jbenet/go-ipfs/exchange/offline"
rp "github.com/jbenet/go-ipfs/exchange/reprovide" rp "github.com/jbenet/go-ipfs/exchange/reprovide"
mount "github.com/jbenet/go-ipfs/fuse/mount" mount "github.com/jbenet/go-ipfs/fuse/mount"
merkledag "github.com/jbenet/go-ipfs/merkledag" merkledag "github.com/jbenet/go-ipfs/merkledag"
namesys "github.com/jbenet/go-ipfs/namesys" namesys "github.com/jbenet/go-ipfs/namesys"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
p2phost "github.com/jbenet/go-ipfs/p2p/host"
p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic"
swarm "github.com/jbenet/go-ipfs/p2p/net/swarm"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
peer "github.com/jbenet/go-ipfs/p2p/peer"
path "github.com/jbenet/go-ipfs/path" path "github.com/jbenet/go-ipfs/path"
pin "github.com/jbenet/go-ipfs/pin" pin "github.com/jbenet/go-ipfs/pin"
repo "github.com/jbenet/go-ipfs/repo" repo "github.com/jbenet/go-ipfs/repo"
config "github.com/jbenet/go-ipfs/repo/config" config "github.com/jbenet/go-ipfs/repo/config"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
offroute "github.com/jbenet/go-ipfs/routing/offline"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
) )
const IpnsValidatorTag = "ipns" const IpnsValidatorTag = "ipns"
...@@ -75,13 +78,14 @@ type IpfsNode struct { ...@@ -75,13 +78,14 @@ type IpfsNode struct {
Resolver *path.Resolver // the path resolution system Resolver *path.Resolver // the path resolution system
// Online // Online
PrivateKey ic.PrivKey // the local node's private Key PrivateKey ic.PrivKey // the local node's private Key
PeerHost p2phost.Host // the network host (server+client) PeerHost p2phost.Host // the network host (server+client)
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Bootstrapper io.Closer // the periodic bootstrapper
Exchange exchange.Interface // the block exchange + strategy (bitswap) Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Namesys namesys.NameSystem // the name system, resolves paths to hashes Exchange exchange.Interface // the block exchange + strategy (bitswap)
Diagnostics *diag.Diagnostics // the diagnostics service Namesys namesys.NameSystem // the name system, resolves paths to hashes
Reprovider *rp.Reprovider // the value reprovider system Diagnostics *diag.Diagnostics // the diagnostics service
Reprovider *rp.Reprovider // the value reprovider system
ctxgroup.ContextGroup ctxgroup.ContextGroup
...@@ -238,14 +242,7 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { ...@@ -238,14 +242,7 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error {
n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)
// prepare bootstrap peers from config return n.Bootstrap(DefaultBootstrapConfig)
bpeers, err := n.loadBootstrapPeers()
if err != nil {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", n.Identity, err)
return debugerror.Wrap(err)
}
return n.Bootstrap(ctx, bpeers)
} }
// teardown closes owned children. If any errors occur, this function returns // teardown closes owned children. If any errors occur, this function returns
...@@ -254,20 +251,20 @@ func (n *IpfsNode) teardown() error { ...@@ -254,20 +251,20 @@ func (n *IpfsNode) teardown() error {
// owned objects are closed in this teardown to ensure that they're closed // owned objects are closed in this teardown to ensure that they're closed
// regardless of which constructor was used to add them to the node. // regardless of which constructor was used to add them to the node.
var closers []io.Closer var closers []io.Closer
if n.Repo != nil { addCloser := func(c io.Closer) {
closers = append(closers, n.Repo) if c != nil {
} closers = append(closers, c)
if n.Blocks != nil {
closers = append(closers, n.Blocks)
}
if n.Routing != nil {
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht)
} }
} }
if n.PeerHost != nil {
closers = append(closers, n.PeerHost) addCloser(n.Bootstrapper)
addCloser(n.Repo)
addCloser(n.Blocks)
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
addCloser(dht)
} }
addCloser(n.PeerHost)
var errs []error var errs []error
for _, closer := range closers { for _, closer := range closers {
if err := closer.Close(); err != nil { if err := closer.Close(); err != nil {
...@@ -293,16 +290,34 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) { ...@@ -293,16 +290,34 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) {
return n.Resolver.ResolvePath(path) return n.Resolver.ResolvePath(path)
} }
// Bootstrap is undefined when node is not in OnlineMode func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {
// TODO what should return value be when in offlineMode? // TODO what should return value be when in offlineMode?
if n.Routing == nil { if n.Routing == nil {
return nil return nil
} }
nb := nodeBootstrapper{n} if n.Bootstrapper != nil {
return nb.TryToBootstrap(ctx, peers) n.Bootstrapper.Close() // stop previous bootstrap process.
}
// if the caller did not specify a bootstrap peer function, get the
// freshest bootstrap peers from config. this responds to live changes.
if cfg.BootstrapPeers == nil {
cfg.BootstrapPeers = func() []peer.PeerInfo {
bpeers := n.Repo.Config().Bootstrap
ps, err := toPeerInfos(bpeers)
if err != nil {
log.Error("failed to parse bootstrap peers from config: %s", bpeers)
return nil
}
return ps
}
}
var err error
n.Bootstrapper, err = Bootstrap(n, cfg)
return err
} }
func (n *IpfsNode) loadID() error { func (n *IpfsNode) loadID() error {
...@@ -342,18 +357,6 @@ func (n *IpfsNode) loadPrivateKey() error { ...@@ -342,18 +357,6 @@ func (n *IpfsNode) loadPrivateKey() error {
return nil return nil
} }
func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) {
var peers []peer.PeerInfo
for _, bootstrap := range n.Repo.Config().Bootstrap {
p, err := toPeer(bootstrap)
if err != nil {
return nil, err
}
peers = append(peers, p)
}
return peers, nil
}
// SetupOfflineRouting loads the local nodes private key and // SetupOfflineRouting loads the local nodes private key and
// uses it to instantiate a routing system in offline mode. // uses it to instantiate a routing system in offline mode.
// This is primarily used for offline ipns modifications. // This is primarily used for offline ipns modifications.
......
...@@ -17,52 +17,42 @@ import ( ...@@ -17,52 +17,42 @@ import (
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
) )
// DefaultBootstrapQueries specifies how many queries to run, // BootstrapConfig specifies parameters used bootstrapping the DHT.
// if the user does not specify a different number as an option.
// //
// For now, this is set to 16 queries, which is an aggressive number. // Note there is a tradeoff between the bootstrap period and the
// We are currently more interested in ensuring we have a properly formed // number of queries. We could support a higher period with less
// DHT than making sure our dht minimizes traffic. Once we are more certain // queries.
// of our implementation's robustness, we should lower this down to 8 or 4. type BootstrapConfig struct {
// Queries int // how many queries to run per period
// Note there is also a tradeoff between the bootstrap period and the number Period time.Duration // how often to run periodi cbootstrap.
// of queries. We could support a higher period with a smaller number of Timeout time.Duration // how long to wait for a bootstrao query to run
// queries }
const DefaultBootstrapQueries = 1
// DefaultBootstrapPeriod specifies how often to periodically run bootstrap, var DefaultBootstrapConfig = BootstrapConfig{
// if the user does not specify a different number as an option. // For now, this is set to 1 query.
// // We are currently more interested in ensuring we have a properly formed
// For now, this is set to 10 seconds, which is an aggressive period. We are // DHT than making sure our dht minimizes traffic. Once we are more certain
// We are currently more interested in ensuring we have a properly formed // of our implementation's robustness, we should lower this down to 8 or 4.
// DHT than making sure our dht minimizes traffic. Once we are more certain Queries: 1,
// implementation's robustness, we should lower this down to 30s or 1m.
//
// Note there is also a tradeoff between the bootstrap period and the number
// of queries. We could support a higher period with a smaller number of
// queries
const DefaultBootstrapPeriod = time.Duration(10 * time.Second)
// DefaultBootstrapTimeout specifies how long to wait for a bootstrap query
// to run.
const DefaultBootstrapTimeout = time.Duration(10 * time.Second)
// Bootstrap runs bootstrapping once, then calls SignalBootstrap with default
// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows
// the user to catch an error off the bat if the connections are faulty. It also
// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful
// for instrumenting it on tests, or delaying bootstrap until the network is online
// and connected to at least a few nodes.
//
// Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {
if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil { // For now, this is set to 10 seconds, which is an aggressive period. We are
return nil, err // We are currently more interested in ensuring we have a properly formed
} // DHT than making sure our dht minimizes traffic. Once we are more certain
// implementation's robustness, we should lower this down to 30s or 1m.
Period: time.Duration(20 * time.Second),
sig := time.Tick(DefaultBootstrapPeriod) Timeout: time.Duration(20 * time.Second),
return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig) }
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// Bootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) Bootstrap(config BootstrapConfig) (goprocess.Process, error) {
sig := time.Tick(config.Period)
return dht.BootstrapOnSignal(config, sig)
} }
// SignalBootstrap ensures the dht routing table remains healthy as peers come and go. // SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
...@@ -71,9 +61,9 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) { ...@@ -71,9 +61,9 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {
// These parameters are configurable. // These parameters are configurable.
// //
// SignalBootstrap returns a process, so the user can stop it. // SignalBootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) { func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
if queries <= 0 { if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", queries) return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
} }
if signal == nil { if signal == nil {
...@@ -85,27 +75,9 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop ...@@ -85,27 +75,9 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop
// maybe this is a good case for whole module event pub/sub? // maybe this is a good case for whole module event pub/sub?
ctx := dht.Context() ctx := dht.Context()
if err := dht.runBootstrap(ctx, queries); err != nil { if err := dht.runBootstrap(ctx, cfg); err != nil {
log.Error(err) log.Error(err)
// A bootstrapping error is important to notice but not fatal. // A bootstrapping error is important to notice but not fatal.
// maybe the client should be able to consume these errors,
// though I dont have a clear use case in mind-- what **could**
// the client do if one of the bootstrap calls fails?
//
// This is also related to the core's bootstrap failures.
// superviseConnections should perhaps allow clients to detect
// bootstrapping problems.
//
// Anyway, passing errors could be done with a bootstrapper object.
// this would imply the client should be able to consume a lot of
// other non-fatal dht errors too. providing this functionality
// should be done correctly DHT-wide.
// NB: whatever the design, clients must ensure they drain errors!
// This pattern is common to many things, perhaps long-running services
// should have something like an ErrStream that allows clients to consume
// periodic errors and take action. It should allow the user to also
// ignore all errors with something like an ErrStreamDiscard. We should
// study what other systems do for ideas.
} }
}) })
...@@ -113,7 +85,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop ...@@ -113,7 +85,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop
} }
// runBootstrap builds up list of peers by requesting random peer IDs // runBootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
bslog := func(msg string) { bslog := func(msg string) {
log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size()) log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
} }
...@@ -133,7 +105,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { ...@@ -133,7 +105,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
} }
// bootstrap sequentially, as results will compound // bootstrap sequentially, as results will compound
ctx, cancel := context.WithTimeout(ctx, DefaultBootstrapTimeout) ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
defer cancel() defer cancel()
runQuery := func(ctx context.Context, id peer.ID) { runQuery := func(ctx context.Context, id peer.ID) {
p, err := dht.FindPeer(ctx, id) p, err := dht.FindPeer(ctx, id)
...@@ -154,9 +126,9 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { ...@@ -154,9 +126,9 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
if sequential { if sequential {
// these should be parallel normally. but can make them sequential for debugging. // these should be parallel normally. but can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that. // note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < queries; i++ { for i := 0; i < cfg.Queries; i++ {
id := randomID() id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
runQuery(ctx, id) runQuery(ctx, id)
} }
...@@ -166,13 +138,13 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { ...@@ -166,13 +138,13 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
// normally, we should be selecting on ctx.Done() here too, but this gets // normally, we should be selecting on ctx.Done() here too, but this gets
// complicated to do with WaitGroup, and doesnt wait for the children to exit. // complicated to do with WaitGroup, and doesnt wait for the children to exit.
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < queries; i++ { for i := 0; i < cfg.Queries; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
id := randomID() id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
runQuery(ctx, id) runQuery(ctx, id)
}() }()
} }
......
...@@ -115,8 +115,15 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { ...@@ -115,8 +115,15 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
} }
defer catter.Close() defer catter.Close()
catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}) bs1 := []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}
adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}) bs2 := []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}
if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil {
return err
}
if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil {
return err
}
keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) keyAdded, err := coreunix.Add(adder, bytes.NewReader(data))
if err != nil { if err != nil {
......
...@@ -62,9 +62,15 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { ...@@ -62,9 +62,15 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
return err return err
} }
defer bootstrap.Close() defer bootstrap.Close()
boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) bcfg := core.BootstrapConfigWithPeers([]peer.PeerInfo{bis})
if err := adder.Bootstrap(bcfg); err != nil {
return err
}
if err := catter.Bootstrap(bcfg); err != nil {
return err
}
keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) keyAdded, err := coreunix.Add(adder, bytes.NewReader(data))
if err != nil { if err != nil {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论