提交 8e6609e7 作者: Jeromy

add timeout opt to transport dialer creation

License: MIT
Signed-off-by: 's avatarJeromy <jeromyj@gmail.com>
上级 d26a9182
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"strings" "strings"
"time"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
...@@ -19,7 +18,7 @@ import ( ...@@ -19,7 +18,7 @@ import (
type WrapFunc func(transport.Conn) transport.Conn type WrapFunc func(transport.Conn) transport.Conn
func NewDialer(p peer.ID, pk ci.PrivKey, tout time.Duration, wrap WrapFunc) *Dialer { func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer {
return &Dialer{ return &Dialer{
LocalPeer: p, LocalPeer: p,
PrivateKey: pk, PrivateKey: pk,
......
...@@ -103,7 +103,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, ...@@ -103,7 +103,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
bwc: bwc, bwc: bwc,
fdRateLimit: make(chan struct{}, concurrentFdDials), fdRateLimit: make(chan struct{}, concurrentFdDials),
Filters: filter.NewFilters(), Filters: filter.NewFilters(),
dialer: conn.NewDialer(local, peers.PrivKey(local), DialTimeout, wrap), dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
} }
// configure Swarm // configure Swarm
......
...@@ -22,7 +22,7 @@ func (s *Swarm) setupAddresses(addrs []ma.Multiaddr) error { ...@@ -22,7 +22,7 @@ func (s *Swarm) setupAddresses(addrs []ma.Multiaddr) error {
return fmt.Errorf("no transport for address: %s", a) return fmt.Errorf("no transport for address: %s", a)
} }
d, err := tpt.Dialer(a) d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout))
if err != nil { if err != nil {
return err return err
} }
......
package transport package transport
import ( import (
"fmt"
"net" "net"
"sync" "sync"
"time"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
...@@ -26,7 +28,7 @@ func NewTCPTransport() *TcpTransport { ...@@ -26,7 +28,7 @@ func NewTCPTransport() *TcpTransport {
} }
} }
func (t *TcpTransport) Dialer(laddr ma.Multiaddr) (Dialer, error) { func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error) {
t.dlock.Lock() t.dlock.Lock()
defer t.dlock.Unlock() defer t.dlock.Unlock()
s := laddr.String() s := laddr.String()
...@@ -34,8 +36,17 @@ func (t *TcpTransport) Dialer(laddr ma.Multiaddr) (Dialer, error) { ...@@ -34,8 +36,17 @@ func (t *TcpTransport) Dialer(laddr ma.Multiaddr) (Dialer, error) {
if found { if found {
return d, nil return d, nil
} }
var base manet.Dialer var base manet.Dialer
for _, o := range opts {
switch o := o.(type) {
case TimeoutOpt:
base.Timeout = o.(time.Duration)
default:
return nil, fmt.Errorf("unrecognized option: %#v", o)
}
}
tcpd, err := t.newTcpDialer(base, laddr) tcpd, err := t.newTcpDialer(base, laddr)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -119,10 +130,17 @@ func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr) (*tcp ...@@ -119,10 +130,17 @@ func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr) (*tcp
}, nil }, nil
} }
rd := reuseport.Dialer{
D: net.Dialer{
LocalAddr: la,
Timeout: base.Timeout,
},
}
return &tcpDialer{ return &tcpDialer{
doReuse: true, doReuse: true,
laddr: laddr, laddr: laddr,
rd: reuseport.Dialer{D: net.Dialer{LocalAddr: la}}, rd: rd,
madialer: base, madialer: base,
transport: t, transport: t,
}, nil }, nil
......
...@@ -17,7 +17,7 @@ type Conn interface { ...@@ -17,7 +17,7 @@ type Conn interface {
} }
type Transport interface { type Transport interface {
Dialer(laddr ma.Multiaddr) (Dialer, error) Dialer(laddr ma.Multiaddr, opts ...DialOpt) (Dialer, error)
Listener(laddr ma.Multiaddr) (Listener, error) Listener(laddr ma.Multiaddr) (Listener, error)
Matches(ma.Multiaddr) bool Matches(ma.Multiaddr) bool
} }
...@@ -43,6 +43,9 @@ func (cw *connWrap) Transport() Transport { ...@@ -43,6 +43,9 @@ func (cw *connWrap) Transport() Transport {
return cw.transport return cw.transport
} }
type DialOpt interface{}
type TimeoutOpt interface{}
func IsTcpMultiaddr(a ma.Multiaddr) bool { func IsTcpMultiaddr(a ma.Multiaddr) bool {
p := a.Protocols() p := a.Protocols()
return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论