提交 102c43a3 作者: Juan Batiz-Benet

Merge pull request #551 from jbenet/fix-swarm-more-connect

p2p/net/swarm: more connection bugs
......@@ -11,51 +11,9 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func TestSimultDials(t *testing.T) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
s.peers.AddAddress(dst, addr)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
for i := 0; i < 10; i++ { // connect 10x for each.
wg.Add(2)
go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0])
go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0])
}
wg.Wait()
}
// should still just have 1, at most 2 connections :)
c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
if c01l > 2 {
t.Error("0->1 has", c01l)
}
c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
if c10l > 2 {
t.Error("1->0 has", c10l)
}
for _, s := range swarms {
s.Close()
}
}
func TestSimultOpen(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
......@@ -87,6 +45,7 @@ func TestSimultOpen(t *testing.T) {
func TestSimultOpenMany(t *testing.T) {
// t.Skip("very very slow")
t.Parallel()
addrs := 20
SubtestSwarm(t, addrs, 10)
......@@ -97,6 +56,7 @@ func TestSimultOpenFewStress(t *testing.T) {
t.SkipNow()
}
// t.Skip("skipping for another test")
t.Parallel()
msgs := 40
swarms := 2
......
......@@ -4,6 +4,7 @@ package swarm
import (
"fmt"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
......@@ -32,7 +33,10 @@ type Swarm struct {
local peer.ID
peers peer.Peerstore
connh ConnHandler
dsync dialsync
backf dialbackoff
dialT time.Duration // mainly for tests
cg ctxgroup.ContextGroup
}
......@@ -50,10 +54,11 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
}
s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
dialT: DialTimeout,
}
// configure Swarm
......
......@@ -22,14 +22,14 @@ func (s *Swarm) ListenAddresses() []ma.Multiaddr {
// InterfaceListenAddresses returns a list of addresses at which this swarm
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func InterfaceListenAddresses(s *Swarm) ([]ma.Multiaddr, error) {
func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return addrutil.ResolveUnspecifiedAddresses(s.ListenAddresses(), nil)
}
// checkNATWarning checks if our observed addresses differ. if so,
// informs the user that certain things might not work yet
func checkNATWarning(s *Swarm, observed ma.Multiaddr, expected ma.Multiaddr) {
listen, err := InterfaceListenAddresses(s)
listen, err := s.InterfaceListenAddresses()
if err != nil {
log.Errorf("Error retrieving swarm.InterfaceListenAddresses: %s", err)
return
......
......@@ -3,7 +3,9 @@ package swarm
import (
"errors"
"fmt"
"net"
"sync"
"time"
conn "github.com/jbenet/go-ipfs/p2p/net/conn"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
......@@ -12,11 +14,28 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
)
// Diagram of dial sync:
//
// many callers of Dial() synched w. dials many addrs results to callers
// ----------------------\ dialsync use earliest /--------------
// -----------------------\ |----------\ /----------------
// ------------------------>------------<------- >---------<-----------------
// -----------------------| \----x \----------------
// ----------------------| \-----x \---------------
// any may fail if no addr at end
// retry dialAttempt x
// dialAttempts governs how many times a goroutine will try to dial a given peer.
const dialAttempts = 3
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial)
var DialTimeout time.Duration = time.Second * 10
// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
......@@ -88,6 +107,71 @@ func (ds *dialsync) Unlock(dst peer.ID) {
ds.lock.Unlock()
}
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
// for {
// if ok, wait := dialsync.Lock(p); !ok {
// if backoff.Backoff(p) {
// return errDialFailed
// }
// <-wait
// continue
// }
// defer dialsync.Unlock(p)
// c, err := actuallyDial(p)
// if err != nil {
// dialbackoff.AddBackoff(p)
// continue
// }
// dialbackoff.Clear(p)
// }
//
type dialbackoff struct {
entries map[peer.ID]struct{}
lock sync.RWMutex
}
func (db *dialbackoff) init() {
if db.entries == nil {
db.entries = make(map[peer.ID]struct{})
}
}
// Backoff returns whether the client should backoff from dialing
// peeer p
func (db *dialbackoff) Backoff(p peer.ID) bool {
db.lock.Lock()
db.init()
_, found := db.entries[p]
db.lock.Unlock()
return found
}
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
func (db *dialbackoff) AddBackoff(p peer.ID) {
db.lock.Lock()
db.init()
db.entries[p] = struct{}{}
db.lock.Unlock()
}
// Clear removes a backoff record. Clients should call this after a
// successful Dial.
func (db *dialbackoff) Clear(p peer.ID) {
db.lock.Lock()
db.init()
delete(db.entries, p)
db.lock.Unlock()
}
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
......@@ -95,6 +179,7 @@ func (ds *dialsync) Unlock(dst peer.ID) {
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
log := log.Prefix("swarm %s dialing %s", s.local, p)
if p == s.local {
return nil, errors.New("Attempted connection to self!")
}
......@@ -118,6 +203,13 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
// check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); !ok {
if s.backf.Backoff(p) {
log.Debugf("backoff")
return nil, fmt.Errorf("%s failed to dial %s, backing off.", s.local, p)
}
log.Debugf("waiting for ongoing dial")
select {
case <-wait: // wait for that dial to finish.
continue // and see if it worked (loop), OR we got an incoming dial.
......@@ -128,13 +220,22 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
// ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself.
conn, err = s.dial(ctx, p)
log.Debugf("dial start")
ctxT, _ := context.WithTimeout(ctx, s.dialT)
conn, err = s.dial(ctxT, p)
s.dsync.Unlock(p)
log.Debugf("dial end %s", conn)
if err != nil {
s.backf.AddBackoff(p) // let others know to backoff
continue // ok, we failed. try again. (if loop is done, our error is output)
}
s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil
}
if err == nil {
err = fmt.Errorf("%s failed to dial %s after %d attempts", s.local, p, dialAttempts)
}
return nil, err
}
......@@ -162,13 +263,21 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
// drop out any addrs that would just dial ourselves. use ListenAddresses
// as that is a more authoritative view than localAddrs.
remoteAddrs = addrutil.Subtract(remoteAddrs, s.ListenAddresses())
ila, _ := s.InterfaceListenAddresses()
remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local))
log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses())
if len(remoteAddrs) == 0 {
return nil, errors.New("peer has no addresses")
}
// open connection to peer
d := &conn.Dialer{
Dialer: manet.Dialer{
Dialer: net.Dialer{
Timeout: s.dialT,
},
},
LocalPeer: s.local,
LocalAddrs: localAddrs,
PrivateKey: sk,
......@@ -196,34 +305,83 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
// try to connect to one of the peer's known addresses.
// for simplicity, we do this sequentially.
// A future commit will do this asynchronously.
// we dial concurrently to each of the addresses, which:
// * makes the process faster overall
// * attempts to get the fastest connection available.
// * mitigates the waste of trying bad addresses
log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
ctx, cancel := context.WithCancel(ctx)
defer cancel() // cancel work when we exit func
foundConn := make(chan struct{})
conns := make(chan conn.Conn, len(remoteAddrs))
errs := make(chan error, len(remoteAddrs))
//TODO: rate limiting just in case?
for _, addr := range remoteAddrs {
connC, err := d.Dial(ctx, addr, p)
if err != nil {
continue
}
go func(addr ma.Multiaddr) {
connC, err := s.dialAddr(ctx, d, p, addr)
// if the connection is not to whom we thought it would be...
if connC.RemotePeer() != p {
log.Infof("misdial to %s through %s (got %s)", p, addr, connC.RemoteMultiaddr())
connC.Close()
continue
}
// check parent still wants our results
select {
case <-foundConn:
if connC != nil {
connC.Close()
}
return
default:
}
if err != nil {
errs <- err
} else if connC == nil {
errs <- fmt.Errorf("failed to dial %s %s", p, addr)
} else {
conns <- connC
}
}(addr)
}
// if the connection is to ourselves...
// this can happen TONS when Loopback addrs are advertized.
// (this should be caught by two checks above, but let's just make sure.)
if connC.RemotePeer() == s.local {
log.Infof("misdial to %s through %s", p, addr)
connC.Close()
continue
err := fmt.Errorf("failed to dial %s", p)
for i := 0; i < len(remoteAddrs); i++ {
select {
case err = <-errs:
log.Info(err)
case connC := <-conns:
// take the first + return asap
close(foundConn)
return connC, nil
}
}
return nil, err
}
func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
// success! we got one!
return connC, nil
connC, err := d.Dial(ctx, addr, p)
if err != nil {
return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
}
// if the connection is not to whom we thought it would be...
remotep := connC.RemotePeer()
if remotep != p {
connC.Close()
return nil, fmt.Errorf("misdial to %s through %s (got %s)", p, addr, remotep)
}
return nil, fmt.Errorf("failed to dial %s", p)
// if the connection is to ourselves...
// this can happen TONS when Loopback addrs are advertized.
// (this should be caught by two checks above, but let's just make sure.)
if remotep == s.local {
connC.Close()
return nil, fmt.Errorf("misdial to %s through %s (got self)", p, addr)
}
// success! we got one!
return connC, nil
}
// dialConnSetup is the setup logic for a connection from the dial side. it
......
......@@ -111,7 +111,7 @@ func (n *Network) ListenAddresses() []ma.Multiaddr {
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return InterfaceListenAddresses(n.Swarm())
return n.Swarm().InterfaceListenAddresses()
}
// Connectedness returns a state signaling connection capabilities
......
......@@ -227,6 +227,7 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
func TestSwarm(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
// msgs := 1000
msgs := 100
......@@ -236,6 +237,7 @@ func TestSwarm(t *testing.T) {
func TestConnHandler(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 5)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论