提交 1b1ef6aa 作者: Juan Batiz-Benet

add local to net/conn

上级 a7bca102
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"fmt" "fmt"
msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
spipe "github.com/jbenet/go-ipfs/crypto/spipe" spipe "github.com/jbenet/go-ipfs/crypto/spipe"
...@@ -22,9 +21,9 @@ const MaxMessageSize = 1 << 20 ...@@ -22,9 +21,9 @@ const MaxMessageSize = 1 << 20
// Conn represents a connection to another Peer (IPFS Node). // Conn represents a connection to another Peer (IPFS Node).
type Conn struct { type Conn struct {
Peer *peer.Peer Local *peer.Peer
Addr ma.Multiaddr Remote *peer.Peer
Conn manet.Conn Conn manet.Conn
Closed chan bool Closed chan bool
Outgoing *msgio.Chan Outgoing *msgio.Chan
...@@ -36,11 +35,11 @@ type Conn struct { ...@@ -36,11 +35,11 @@ type Conn struct {
type Map map[u.Key]*Conn type Map map[u.Key]*Conn
// NewConn constructs a new connection // NewConn constructs a new connection
func NewConn(peer *peer.Peer, addr ma.Multiaddr, mconn manet.Conn) (*Conn, error) { func NewConn(local, remote *peer.Peer, mconn manet.Conn) (*Conn, error) {
conn := &Conn{ conn := &Conn{
Peer: peer, Local: local,
Addr: addr, Remote: remote,
Conn: mconn, Conn: mconn,
} }
if err := conn.newChans(); err != nil { if err := conn.newChans(); err != nil {
...@@ -52,18 +51,28 @@ func NewConn(peer *peer.Peer, addr ma.Multiaddr, mconn manet.Conn) (*Conn, error ...@@ -52,18 +51,28 @@ func NewConn(peer *peer.Peer, addr ma.Multiaddr, mconn manet.Conn) (*Conn, error
// Dial connects to a particular peer, over a given network // Dial connects to a particular peer, over a given network
// Example: Dial("udp", peer) // Example: Dial("udp", peer)
func Dial(network string, peer *peer.Peer) (*Conn, error) { func Dial(network string, local, remote *peer.Peer) (*Conn, error) {
addr := peer.NetAddress(network) laddr := local.NetAddress(network)
if addr == nil { if laddr == nil {
return nil, fmt.Errorf("No address for network %s", network) return nil, fmt.Errorf("No local address for network %s", network)
} }
nconn, err := manet.Dial(addr) raddr := remote.NetAddress(network)
if raddr == nil {
return nil, fmt.Errorf("No remote address for network %s", network)
}
// TODO: try to get reusing addr/ports to work.
// dialer := manet.Dialer{LocalAddr: laddr}
dialer := manet.Dialer{}
log.Info("%s %s dialing %s %s", local, laddr, remote, raddr)
nconn, err := dialer.Dial(raddr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewConn(peer, addr, nconn) return NewConn(local, remote, nconn)
} }
// Construct new channels for given Conn. // Construct new channels for given Conn.
...@@ -84,7 +93,7 @@ func (c *Conn) newChans() error { ...@@ -84,7 +93,7 @@ func (c *Conn) newChans() error {
// Close closes the connection, and associated channels. // Close closes the connection, and associated channels.
func (c *Conn) Close() error { func (c *Conn) Close() error {
log.Debug("Closing Conn with %v", c.Peer) log.Debug("%s closing Conn with %s", c.Local, c.Remote)
if c.Conn == nil { if c.Conn == nil {
return fmt.Errorf("Already closed") // already closed return fmt.Errorf("Already closed") // already closed
} }
......
...@@ -65,12 +65,17 @@ func TestDial(t *testing.T) { ...@@ -65,12 +65,17 @@ func TestDial(t *testing.T) {
} }
go echoListen(listener) go echoListen(listener)
p, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/1234") p1, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/1234")
if err != nil { if err != nil {
t.Fatal("error setting up peer", err) t.Fatal("error setting up peer", err)
} }
c, err := Dial("tcp", p) p2, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34", "/ip4/127.0.0.1/tcp/3456")
if err != nil {
t.Fatal("error setting up peer", err)
}
c, err := Dial("tcp", p2, p1)
if err != nil { if err != nil {
t.Fatal("error dialing peer", err) t.Fatal("error dialing peer", err)
} }
......
...@@ -3,6 +3,8 @@ package swarm ...@@ -3,6 +3,8 @@ package swarm
import ( import (
"errors" "errors"
"fmt" "fmt"
"net"
"syscall"
spipe "github.com/jbenet/go-ipfs/crypto/spipe" spipe "github.com/jbenet/go-ipfs/crypto/spipe"
conn "github.com/jbenet/go-ipfs/net/conn" conn "github.com/jbenet/go-ipfs/net/conn"
...@@ -44,6 +46,11 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error { ...@@ -44,6 +46,11 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
return err return err
} }
// make sure port can be reused. TOOD this doesn't work...
// if err := setSocketReuse(list); err != nil {
// return err
// }
// NOTE: this may require a lock around it later. currently, only run on setup // NOTE: this may require a lock around it later. currently, only run on setup
s.listeners = append(s.listeners, list) s.listeners = append(s.listeners, list)
...@@ -71,11 +78,9 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error { ...@@ -71,11 +78,9 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
// Handle getting ID from this peer, handshake, and adding it into the map // Handle getting ID from this peer, handshake, and adding it into the map
func (s *Swarm) handleIncomingConn(nconn manet.Conn) { func (s *Swarm) handleIncomingConn(nconn manet.Conn) {
addr := nconn.RemoteMultiaddr()
// Construct conn with nil peer for now, because we don't know its ID yet. // Construct conn with nil peer for now, because we don't know its ID yet.
// connSetup will figure this out, and pull out / construct the peer. // connSetup will figure this out, and pull out / construct the peer.
c, err := conn.NewConn(nil, addr, nconn) c, err := conn.NewConn(s.local, nil, nconn)
if err != nil { if err != nil {
s.errChan <- err s.errChan <- err
return return
...@@ -96,20 +101,20 @@ func (s *Swarm) connSetup(c *conn.Conn) error { ...@@ -96,20 +101,20 @@ func (s *Swarm) connSetup(c *conn.Conn) error {
return errors.New("Tried to start nil connection.") return errors.New("Tried to start nil connection.")
} }
if c.Peer != nil { if c.Remote != nil {
log.Debug("Starting connection: %s", c.Peer) log.Debug("%s Starting connection: %s", c.Local, c.Remote)
} else { } else {
log.Debug("Starting connection: [unknown peer]") log.Debug("%s Starting connection: [unknown peer]", c.Local)
} }
if err := s.connSecure(c); err != nil { if err := s.connSecure(c); err != nil {
return fmt.Errorf("Conn securing error: %v", err) return fmt.Errorf("Conn securing error: %v", err)
} }
log.Debug("Secured connection: %s", c.Peer) log.Debug("%s secured connection: %s", c.Local, c.Remote)
// add address of connection to Peer. Maybe it should happen in connSecure. // add address of connection to Peer. Maybe it should happen in connSecure.
c.Peer.AddAddress(c.Addr) c.Remote.AddAddress(c.Conn.RemoteMultiaddr())
if err := s.connVersionExchange(c); err != nil { if err := s.connVersionExchange(c); err != nil {
return fmt.Errorf("Conn version exchange error: %v", err) return fmt.Errorf("Conn version exchange error: %v", err)
...@@ -117,12 +122,12 @@ func (s *Swarm) connSetup(c *conn.Conn) error { ...@@ -117,12 +122,12 @@ func (s *Swarm) connSetup(c *conn.Conn) error {
// add to conns // add to conns
s.connsLock.Lock() s.connsLock.Lock()
if _, ok := s.conns[c.Peer.Key()]; ok { if _, ok := s.conns[c.Remote.Key()]; ok {
log.Debug("Conn already open!") log.Debug("Conn already open!")
s.connsLock.Unlock() s.connsLock.Unlock()
return ErrAlreadyOpen return ErrAlreadyOpen
} }
s.conns[c.Peer.Key()] = c s.conns[c.Remote.Key()] = c
log.Debug("Added conn to map!") log.Debug("Added conn to map!")
s.connsLock.Unlock() s.connsLock.Unlock()
...@@ -147,10 +152,10 @@ func (s *Swarm) connSecure(c *conn.Conn) error { ...@@ -147,10 +152,10 @@ func (s *Swarm) connSecure(c *conn.Conn) error {
return err return err
} }
if c.Peer == nil { if c.Remote == nil {
c.Peer = sp.RemotePeer() c.Remote = sp.RemotePeer()
} else if c.Peer != sp.RemotePeer() { } else if c.Remote != sp.RemotePeer() {
panic("peers not being constructed correctly.") panic("peers not being constructed correctly.")
} }
...@@ -251,20 +256,43 @@ func (s *Swarm) fanIn(c *conn.Conn) { ...@@ -251,20 +256,43 @@ func (s *Swarm) fanIn(c *conn.Conn) {
case data, ok := <-c.Secure.In: case data, ok := <-c.Secure.In:
if !ok { if !ok {
e := fmt.Errorf("Error retrieving from conn: %v", c.Peer) e := fmt.Errorf("Error retrieving from conn: %v", c.Remote)
s.errChan <- e s.errChan <- e
goto out goto out
} }
// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer) // log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
msg := msg.New(c.Peer, data) msg := msg.New(c.Remote, data)
s.Incoming <- msg s.Incoming <- msg
} }
} }
out: out:
s.connsLock.Lock() s.connsLock.Lock()
delete(s.conns, c.Peer.Key()) delete(s.conns, c.Remote.Key())
s.connsLock.Unlock() s.connsLock.Unlock()
} }
func setSocketReuse(l manet.Listener) error {
nl := l.NetListener()
// for now only TCP. TODO change this when more networks.
file, err := nl.(*net.TCPListener).File()
if err != nil {
return err
}
fd := file.Fd()
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if err != nil {
return err
}
err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
if err != nil {
return err
}
return nil
}
...@@ -129,7 +129,7 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) { ...@@ -129,7 +129,7 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
} }
// open connection to peer // open connection to peer
c, err = conn.Dial("tcp", peer) c, err = conn.Dial("tcp", s.local, peer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -153,7 +153,7 @@ func (s *Swarm) DialAddr(addr ma.Multiaddr) (*conn.Conn, error) { ...@@ -153,7 +153,7 @@ func (s *Swarm) DialAddr(addr ma.Multiaddr) (*conn.Conn, error) {
npeer := new(peer.Peer) npeer := new(peer.Peer)
npeer.AddAddress(addr) npeer.AddAddress(addr)
c, err := conn.Dial("tcp", npeer) c, err := conn.Dial("tcp", s.local, npeer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -201,11 +201,12 @@ func (s *Swarm) GetErrChan() chan error { ...@@ -201,11 +201,12 @@ func (s *Swarm) GetErrChan() chan error {
return s.errChan return s.errChan
} }
// GetPeerList returns a copy of the set of peers swarm is connected to.
func (s *Swarm) GetPeerList() []*peer.Peer { func (s *Swarm) GetPeerList() []*peer.Peer {
var out []*peer.Peer var out []*peer.Peer
s.connsLock.RLock() s.connsLock.RLock()
for _, p := range s.conns { for _, p := range s.conns {
out = append(out, p.Peer) out = append(out, p.Remote)
} }
s.connsLock.RUnlock() s.connsLock.RUnlock()
return out return out
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论