提交 0061f0c1 作者: Juan Batiz-Benet

new swarm -- it's so simple

上级 393842e2
package swarm
import (
conn "github.com/jbenet/go-ipfs/net/conn"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
)
// ListenAddresses returns a list of addresses at which this swarm listens.
func (s *Swarm) ListenAddresses() []ma.Multiaddr {
listeners := s.swarm.Listeners()
addrs := make([]ma.Multiaddr, 0, len(listeners))
for _, l := range listeners {
if l2, ok := l.NetListener().(conn.Listener); ok {
addrs = append(addrs, l2.Multiaddr())
}
}
return addrs
}
// InterfaceListenAddresses returns a list of addresses at which this swarm
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func InterfaceListenAddresses(s *Swarm) ([]ma.Multiaddr, error) {
return resolveUnspecifiedAddresses(s.ListenAddresses())
}
// resolveUnspecifiedAddresses expands unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func resolveUnspecifiedAddresses(unspecifiedAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
var outputAddrs []ma.Multiaddr
// todo optimize: only fetch these if we have a "any" addr.
ifaceAddrs, err := interfaceAddresses()
if err != nil {
return nil, err
}
for _, a := range unspecifiedAddrs {
// split address into its components
split := ma.Split(a)
// if first component (ip) is not unspecified, use it as is.
if !manet.IsIPUnspecified(split[0]) {
outputAddrs = append(outputAddrs, a)
continue
}
// unspecified? add one address per interface.
for _, ia := range ifaceAddrs {
split[0] = ia
joined := ma.Join(split...)
outputAddrs = append(outputAddrs, joined)
}
}
log.Event(context.TODO(), "interfaceListenAddresses", func() eventlog.Loggable {
var addrs []string
for _, addr := range outputAddrs {
addrs = append(addrs, addr.String())
}
return eventlog.Metadata{"addresses": addrs}
}())
log.Debug("InterfaceListenAddresses:", outputAddrs)
return outputAddrs, nil
}
// interfaceAddresses returns a list of addresses associated with local machine
func interfaceAddresses() ([]ma.Multiaddr, error) {
maddrs, err := manet.InterfaceMultiaddrs()
if err != nil {
return nil, err
}
var nonLoopback []ma.Multiaddr
for _, a := range maddrs {
if !manet.IsIPLoopback(a) {
nonLoopback = append(nonLoopback, a)
}
}
return nonLoopback, nil
}
// addrInList returns whether or not an address is part of a list.
// this is useful to check if NAT is happening (or other bugs?)
func addrInList(addr ma.Multiaddr, list []ma.Multiaddr) bool {
for _, addr2 := range list {
if addr.Equal(addr2) {
return true
}
}
return false
}
// checkNATWarning checks if our observed addresses differ. if so,
// informs the user that certain things might not work yet
func checkNATWarning(s *Swarm, observed ma.Multiaddr, expected ma.Multiaddr) {
if observed.Equal(expected) {
return
}
listen, err := InterfaceListenAddresses(s)
if err != nil {
log.Errorf("Error retrieving swarm.InterfaceListenAddresses: %s", err)
return
}
if !addrInList(observed, listen) { // probably a nat
log.Warningf(natWarning, observed, listen)
}
}
const natWarning = `Remote peer observed our address to be: %s
The local addresses are: %s
Thus, connection is going through NAT, and other connections may fail.
IPFS NAT traversal is still under development. Please bug us on github or irc to fix this.
Baby steps: http://jbenet.static.s3.amazonaws.com/271dfcf/baby-steps.gif
`
// package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm
import (
conn "github.com/jbenet/go-ipfs/net/conn"
peer "github.com/jbenet/go-ipfs/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/jbenet/go-peerstream"
)
var log = eventlog.Logger("swarm2")
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
//
// Uses peerstream.Swarm
type Swarm struct {
swarm *ps.Swarm
local peer.Peer
peers peer.Peerstore
cg ctxgroup.ContextGroup
}
// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local peer.Peer, peers peer.Peerstore) (*Swarm, error) {
s := &Swarm{
swarm: ps.NewSwarm(),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
}
// configure Swarm
s.cg.SetTeardown(s.teardown)
s.swarm.SetConnHandler(s.connHandler)
return s, s.listen(listenAddrs)
}
func (s *Swarm) teardown() error {
return s.swarm.Close()
}
func (s *Swarm) Close() error {
return s.cg.Close()
}
func (s *Swarm) StreamSwarm() *ps.Swarm {
return s.swarm
}
// Connections returns a slice of all connections.
func (s *Swarm) Connections() []conn.Conn {
conns1 := s.swarm.Conns()
conns2 := make([]conn.Conn, len(conns1))
for i, c1 := range conns1 {
conns2[i] = UnwrapConn(c1)
}
return conns2
}
// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p peer.Peer) error {
conns := s.swarm.ConnsWithGroup(p) // boom.
for _, c := range conns {
c.Close()
}
return nil
}
// GetPeerList returns a copy of the set of peers swarm is connected to.
func (s *Swarm) GetPeerList() []peer.Peer {
conns := s.swarm.Conns()
seen := make(map[peer.Peer]struct{})
peers := make([]peer.Peer, 0, len(conns))
for _, c := range conns {
c2 := UnwrapConn(c)
p := c2.RemotePeer()
if _, found := seen[p]; found {
continue
}
peers = append(peers, p)
}
return peers
}
// LocalPeer returns the local peer swarm is associated to.
func (s *Swarm) LocalPeer() peer.Peer {
return s.local
}
func UnwrapConn(c *ps.Conn) conn.Conn {
return c.NetConn().(conn.Conn)
}
package swarm
import (
"fmt"
conn "github.com/jbenet/go-ipfs/net/conn"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
multierr "github.com/jbenet/go-ipfs/util/multierr"
ps "github.com/jbenet/go-peerstream"
)
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
retErr := multierr.New()
// listen on every address
for i, addr := range addrs {
err := s.setupListener(addr)
if err != nil {
retErr.Errors[i] = err
log.Errorf("Failed to listen on: %s - %s", addr, err)
}
}
if len(retErr.Errors) > 0 {
return retErr
}
return nil
}
// Listen for new connections on the given multiaddr
func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
if err != nil {
return err
}
list, err := conn.Listen(s.cg.Context(), maddr, s.local, s.peers)
if err != nil {
return err
}
// add resolved local addresses to peer
for _, addr := range resolved {
s.local.AddAddress(addr)
}
// AddListener to the peerstream Listener. this will begin accepting connections
// and streams!
_, err = s.swarm.AddListener(list)
return err
}
// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c1 *ps.Conn) {
// grab the underlying connection.
if c2, ok := c1.NetConn().(conn.Conn); ok {
// set the RemotePeer as a group on the conn. this lets us group
// connections in the StreamSwarm by peer, and get a streams from
// any available connection in the group (better multiconn):
// swarm.StreamSwarm().NewStreamWithGroup(remotePeer)
c1.AddGroup(c2.RemotePeer())
go func() {
ctx := context.Background()
err := runHandshake3(ctx, s, c1, c2)
if err != nil {
log.Error("Handshake3 failed. disconnecting", err)
log.Event(ctx, "Handshake3FailureDisconnect", c2.LocalPeer(), c2.RemotePeer())
c1.Close() // boom.
}
}()
}
}
func runHandshake3(ctx context.Context, s *Swarm, sc *ps.Conn, c conn.Conn) error {
log.Event(ctx, "newConnection", c.LocalPeer(), c.RemotePeer())
stream, err := sc.NewStream()
if err != nil {
return err
}
// handshake3
h3result, err := conn.Handshake3(ctx, stream, c)
if err != nil {
return fmt.Errorf("Handshake3 failed: %s", err)
}
// check for nats. you know, just in case.
if h3result.LocalObservedAddress != nil {
checkNATWarning(s, h3result.LocalObservedAddress, c.LocalMultiaddr())
} else {
log.Warningf("Received nil observed address from %s", c.RemotePeer())
}
stream.Close()
log.Event(ctx, "handshake3Succeeded", c.LocalPeer(), c.RemotePeer())
return nil
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论