提交 585e7164 作者: Juan Batiz-Benet

Merge pull request #426 from jbenet/feat/dht-bootstrap

feat(dht) Bootstrap
......@@ -2,7 +2,6 @@ package core
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
......@@ -20,9 +19,10 @@ import (
)
const (
period = 30 * time.Second // how often to check connection status
connectiontimeout time.Duration = period / 3 // duration to wait when attempting to connect
recoveryThreshold = 4 // attempt to bootstrap if connection count falls below this value
period = 30 * time.Second // how often to check connection status
connectiontimeout time.Duration = period / 3 // duration to wait when attempting to connect
recoveryThreshold = 4 // attempt to bootstrap if connection count falls below this value
numDHTBootstrapQueries = 15 // number of DHT queries to execute
)
func superviseConnections(parent context.Context,
......@@ -84,20 +84,19 @@ func bootstrap(ctx context.Context,
}
}
if len(notConnected) < 1 {
s := "must bootstrap to %d more nodes, but already connected to all candidates"
err := fmt.Errorf(s, numCxnsToCreate)
log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", h.ID(), err)
return err
// if not connected to all bootstrap peer candidates
if len(notConnected) > 0 {
var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset)
if err := connect(ctx, ps, r, randomSubset); err != nil {
log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", h.ID(), err)
return err
}
}
var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset)
if err := connect(ctx, ps, r, randomSubset); err != nil {
log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", h.ID(), err)
// we can try running dht bootstrap even if we're connected to all bootstrap peers.
if err := r.Bootstrap(ctx, numDHTBootstrapQueries); err != nil {
return err
}
return nil
......
......@@ -341,20 +341,64 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
}
// Bootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) {
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
var merr u.MultiErr
// bootstrap sequentially, as results will compound
for i := 0; i < NumBootstrapQueries; i++ {
randomID := func() peer.ID {
// 16 random bytes is not a valid peer id. it may be fine becuase
// the dht will rehash to its own keyspace anyway.
id := make([]byte, 16)
rand.Read(id)
pi, err := dht.FindPeer(ctx, peer.ID(id))
return peer.ID(id)
}
// bootstrap sequentially, as results will compound
runQuery := func(ctx context.Context, id peer.ID) {
p, err := dht.FindPeer(ctx, id)
if err == routing.ErrNotFound {
// this isn't an error. this is precisely what we expect.
} else if err != nil {
log.Errorf("Bootstrap peer error: %s", err)
merr = append(merr, err)
} else {
// woah, we got a peer under a random id? it _cannot_ be valid.
log.Errorf("dht seemingly found a peer at a random bootstrap id (%s)...", pi)
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
log.Errorf("%s", err)
merr = append(merr, err)
}
}
sequential := true
if sequential {
// these should be parallel normally. but can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < queries; i++ {
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}
} else {
// note on parallelism here: the context is passed in to the queries, so they
// **should** exit when it exceeds, making this function exit on ctx cancel.
// normally, we should be selecting on ctx.Done() here too, but this gets
// complicated to do with WaitGroup, and doesnt wait for the children to exit.
var wg sync.WaitGroup
for i := 0; i < queries; i++ {
wg.Add(1)
go func() {
defer wg.Done()
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}()
}
wg.Wait()
}
if len(merr) > 0 {
return merr
}
return nil
}
......@@ -3,6 +3,7 @@ package dht
import (
"bytes"
"fmt"
"math/rand"
"sort"
"sync"
"testing"
......@@ -76,6 +77,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
ctx, cancel := context.WithCancel(ctx)
rounds := 1
for i := 0; i < rounds; i++ {
log.Debugf("bootstrapping round %d/%d\n", i, rounds)
......@@ -83,7 +85,10 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
// probably because results compound
for _, dht := range dhts {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
log.Debugf("bootstrapping round %d/%d -- %s\n", i, rounds, dht.self)
dht.Bootstrap(ctx, 3)
}
......@@ -238,7 +243,7 @@ func TestBootstrap(t *testing.T) {
ctx := context.Background()
nDHTs := 15
nDHTs := 30
_, _, dhts := setupDHTS(ctx, nDHTs, t)
defer func() {
for i := 0; i < nDHTs; i++ {
......@@ -269,12 +274,23 @@ func TestBootstrap(t *testing.T) {
}
// test "well-formed-ness" (>= 3 peers in every routing table)
avgsize := 0
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
avgsize += rtlen
t.Logf("routing table for %s has %d peers", dht.self, rtlen)
if rtlen < 4 {
t.Errorf("routing table for %s only has %d peers", dht.self, rtlen)
// currently, we dont have good bootstrapping guarantees.
// t.Errorf("routing table for %s only has %d peers", dht.self, rtlen)
}
}
avgsize = avgsize / len(dhts)
avgsizeExpected := 6
t.Logf("avg rt size: %d", avgsize)
if avgsize < avgsizeExpected {
t.Errorf("avg rt size: %d < %d", avgsize, avgsizeExpected)
}
}
func TestProvidesMany(t *testing.T) {
......@@ -298,7 +314,7 @@ func TestProvidesMany(t *testing.T) {
<-time.After(100 * time.Millisecond)
t.Logf("bootstrapping them so they find each other", nDHTs)
ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
ctxT, _ := context.WithTimeout(ctx, 20*time.Second)
bootstrap(t, ctxT, dhts)
if u.Debug {
......
......@@ -126,3 +126,21 @@ func GetenvBool(name string) bool {
v := strings.ToLower(os.Getenv(name))
return v == "true" || v == "t" || v == "1"
}
// multiErr is a util to return multiple errors
type MultiErr []error
func (m MultiErr) Error() string {
if len(m) == 0 {
return "no errors"
}
s := "Multiple errors: "
for i, e := range m {
if i != 0 {
s += ", "
}
s += e.Error()
}
return s
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论