提交 6bc26f17 作者: Juan Batiz-Benet

writing swarm2 using go-peerstream

omg wow such pass
上级 b9525f15
......@@ -2,13 +2,13 @@ package conn
import (
"fmt"
"io"
handshake "github.com/jbenet/go-ipfs/net/handshake"
hspb "github.com/jbenet/go-ipfs/net/handshake/pb"
ggprotoio "code.google.com/p/gogoprotobuf/io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ps "github.com/jbenet/go-peerstream"
)
// Handshake1 exchanges local and remote versions and compares them
......@@ -53,7 +53,7 @@ func Handshake1(ctx context.Context, c Conn) error {
}
// Handshake3 exchanges local and remote service information
func Handshake3(ctx context.Context, stream ps.Stream, c Conn) (*handshake.Handshake3Result, error) {
func Handshake3(ctx context.Context, stream io.ReadWriter, c Conn) (*handshake.Handshake3Result, error) {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
......
package swarm
import (
"fmt"
"sync"
"testing"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func TestSimultOpen(t *testing.T) {
t.Skip("skipping for another test")
addrs := []string{
"/ip4/127.0.0.1/tcp/1244",
"/ip4/127.0.0.1/tcp/1245",
}
ctx := context.Background()
swarms, _ := makeSwarms(ctx, t, addrs)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.Peer) {
// copy for other peer
cp := testutil.NewPeerWithID(dst.ID())
cp.AddAddress(dst.Addresses()[0])
if _, err := s.Dial(cp); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
wg.Add(2)
go connect(swarms[0], swarms[1].local)
go connect(swarms[1], swarms[0].local)
wg.Wait()
}
for _, s := range swarms {
s.Close()
}
}
func TestSimultOpenMany(t *testing.T) {
t.Skip("very very slow")
many := 500
addrs := []string{}
for i := 2200; i < (2200 + many); i++ {
s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i)
addrs = append(addrs, s)
}
SubtestSwarm(t, addrs, 10)
}
func TestSimultOpenFewStress(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Skip("skipping for another test")
num := 10
// num := 100
for i := 0; i < num; i++ {
addrs := []string{
fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 1900+i),
fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2900+i),
}
SubtestSwarm(t, addrs, 10)
}
}
......@@ -3,7 +3,6 @@
package swarm
import (
conn "github.com/jbenet/go-ipfs/net/conn"
peer "github.com/jbenet/go-ipfs/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
......@@ -51,22 +50,43 @@ func (s *Swarm) teardown() error {
return s.swarm.Close()
}
// Close stops the Swarm. See
func (s *Swarm) Close() error {
return s.cg.Close()
}
// StreamSwarm returns the underlying peerstream.Swarm
func (s *Swarm) StreamSwarm() *ps.Swarm {
return s.swarm
}
// SetStreamHandler assigns the handler for new streams.
// See peerstream.
func (s *Swarm) SetStreamHandler(handler StreamHandler) {
s.swarm.SetStreamHandler(func(s *ps.Stream) {
handler(wrapStream(s))
})
}
// NewStreamWithPeer creates a new stream on any available connection to p
func (s *Swarm) NewStreamWithPeer(p peer.Peer) (*Stream, error) {
st, err := s.swarm.NewStreamWithGroup(p)
return wrapStream(st), err
}
// StreamsWithPeer returns all the live Streams to p
func (s *Swarm) StreamsWithPeer(p peer.Peer) []*Stream {
return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams()))
}
// ConnectionsToPeer returns all the live connections to p
func (s *Swarm) ConnectionsToPeer(p peer.Peer) []*SwarmConn {
return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns()))
}
// Connections returns a slice of all connections.
func (s *Swarm) Connections() []conn.Conn {
conns1 := s.swarm.Conns()
conns2 := make([]conn.Conn, len(conns1))
for i, c1 := range conns1 {
conns2[i] = UnwrapConn(c1)
}
return conns2
func (s *Swarm) Connections() []*SwarmConn {
return wrapConns(s.swarm.Conns())
}
// CloseConnection removes a given peer from swarm + closes the connection
......@@ -80,16 +100,16 @@ func (s *Swarm) CloseConnection(p peer.Peer) error {
// GetPeerList returns a copy of the set of peers swarm is connected to.
func (s *Swarm) GetPeerList() []peer.Peer {
conns := s.swarm.Conns()
conns := s.Connections()
seen := make(map[peer.Peer]struct{})
peers := make([]peer.Peer, 0, len(conns))
for _, c := range conns {
c2 := UnwrapConn(c)
p := c2.RemotePeer()
p := c.RemotePeer()
if _, found := seen[p]; found {
continue
}
peers = append(peers, p)
}
return peers
......@@ -99,7 +119,3 @@ func (s *Swarm) GetPeerList() []peer.Peer {
func (s *Swarm) LocalPeer() peer.Peer {
return s.local
}
func UnwrapConn(c *ps.Conn) conn.Conn {
return c.NetConn().(conn.Conn)
}
package swarm
import (
"fmt"
conn "github.com/jbenet/go-ipfs/net/conn"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/jbenet/go-peerstream"
)
// a SwarmConn is a simple wrapper around a ps.Conn that also exposes
// some of the methods from the underlying conn.Conn.
// There's **five** "layers" to each connection:
// - 0. the net.Conn - underlying net.Conn (TCP/UDP/UTP/etc)
// - 1. the manet.Conn - provides multiaddr friendly Conn
// - 2. the conn.Conn - provides Peer friendly Conn (inc Secure channel)
// - 3. the peerstream.Conn - provides peerstream / spdysptream happiness
// - 4. the SwarmConn - abstracts everyting out, exposing only key parts of underlying layers
// (I know, this is kinda crazy. it's more historical than a good design. though the
// layers do build up pieces of functionality. and they're all just io.RW :) )
type SwarmConn ps.Conn
func (c *SwarmConn) StreamConn() *ps.Conn {
return (*ps.Conn)(c)
}
func (c *SwarmConn) RawConn() conn.Conn {
// righly panic if these things aren't true. it is an expected
// invariant that these Conns are all of the typewe expect:
// ps.Conn wrapping a conn.Conn
// if we get something else it is programmer error.
return (*ps.Conn)(c).NetConn().(conn.Conn)
}
// LocalMultiaddr is the Multiaddr on this side
func (c *SwarmConn) LocalMultiaddr() ma.Multiaddr {
return c.RawConn().LocalMultiaddr()
}
// LocalPeer is the Peer on our side of the connection
func (c *SwarmConn) LocalPeer() peer.Peer {
return c.RawConn().LocalPeer()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *SwarmConn) RemoteMultiaddr() ma.Multiaddr {
return c.RawConn().RemoteMultiaddr()
}
// RemotePeer is the Peer on the remote side
func (c *SwarmConn) RemotePeer() peer.Peer {
return c.RawConn().RemotePeer()
}
// NewStream returns a new Stream from this connection
func (c *SwarmConn) NewStream() (*Stream, error) {
s, err := c.StreamConn().NewStream()
return wrapStream(s), err
}
func (c *SwarmConn) Close() error {
return c.StreamConn().Close()
}
func wrapConn(psc *ps.Conn) (*SwarmConn, error) {
// grab the underlying connection.
if _, ok := psc.NetConn().(conn.Conn); !ok {
// this should never happen. if we see it ocurring it means that we added
// a Listener to the ps.Swarm that is NOT one of our net/conn.Listener.
return nil, fmt.Errorf("swarm connHandler: invalid conn (not a conn.Conn): %s", psc)
}
return (*SwarmConn)(psc), nil
}
// wrapConns returns a *SwarmConn for all these ps.Conns
func wrapConns(conns1 []*ps.Conn) []*SwarmConn {
conns2 := make([]*SwarmConn, len(conns1))
for i, c1 := range conns1 {
if c2, err := wrapConn(c1); err == nil {
conns2[i] = c2
}
}
return conns2
}
// newConnSetup does the swarm's "setup" for a connection. returns the underlying
// conn.Conn this method is used by both swarm.Dial and ps.Swarm connHandler
func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*SwarmConn, error) {
// wrap with a SwarmConn
sc, err := wrapConn(psConn)
if err != nil {
return nil, err
}
// removing this for now, as it has to change. we can put this in a different
// sub-protocol anyway.
// // run Handshake3
// if err := runHandshake3(ctx, s, sc); err != nil {
// return nil, err
// }
// ok great! we can use it. add it to our group.
// set the RemotePeer as a group on the conn. this lets us group
// connections in the StreamSwarm by peer, and get a streams from
// any available connection in the group (better multiconn):
// swarm.StreamSwarm().NewStreamWithGroup(remotePeer)
psConn.AddGroup(sc.RemotePeer())
return sc, nil
}
// func runHandshake3(ctx context.Context, s *Swarm, c *SwarmConn) error {
// log.Event(ctx, "newConnection", c.LocalPeer(), c.RemotePeer())
// stream, err := c.NewStream()
// if err != nil {
// return err
// }
// // handshake3 (this whole thing is ugly. maybe lets get rid of it...)
// h3result, err := conn.Handshake3(ctx, stream, c.RawConn())
// if err != nil {
// return fmt.Errorf("Handshake3 failed: %s", err)
// }
// // check for nats. you know, just in case.
// if h3result.LocalObservedAddress != nil {
// checkNATWarning(s, h3result.LocalObservedAddress, c.LocalMultiaddr())
// } else {
// log.Warningf("Received nil observed address from %s", c.RemotePeer())
// }
// stream.Close()
// log.Event(ctx, "handshake3Succeeded", c.LocalPeer(), c.RemotePeer())
// return nil
// }
package swarm
import (
"errors"
"fmt"
conn "github.com/jbenet/go-ipfs/net/conn"
peer "github.com/jbenet/go-ipfs/peer"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(p peer.Peer) (*SwarmConn, error) {
ctx := context.TODO()
if p.ID().Equal(s.local.ID()) {
return nil, errors.New("Attempted connection to self!")
}
// check if we already have an open connection first
cs := s.ConnectionsToPeer(p)
for _, c := range cs {
if c != nil { // dump out the first one we find
return c, nil
}
}
// check if we don't have the peer in Peerstore
p, err := s.peers.Add(p)
if err != nil {
return nil, err
}
// open connection to peer
d := &conn.Dialer{
LocalPeer: s.local,
Peerstore: s.peers,
}
if len(p.Addresses()) == 0 {
return nil, errors.New("peer has no addresses")
}
// try to connect to one of the peer's known addresses.
// for simplicity, we do this sequentially.
// A future commit will do this asynchronously.
var connC conn.Conn
for _, addr := range p.Addresses() {
connC, err = d.DialAddr(ctx, addr, p)
if err == nil {
break
}
}
if err != nil {
return nil, err
}
// ok try to setup the new connection.
swarmC, err := dialConnSetup(ctx, s, connC)
if err != nil {
log.Error("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
swarmC.Close() // close the connection. didn't work out :(
return nil, err
}
log.Event(ctx, "dial", p)
return swarmC, nil
}
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*SwarmConn, error) {
psC, err := s.swarm.AddConn(connC)
if err != nil {
// connC is closed by caller if we fail.
return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
}
// ok try to setup the new connection. (newConnSetup will add to group)
swarmC, err := s.newConnSetup(ctx, psC)
if err != nil {
log.Error("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
swarmC.Close() // we need to call this to make sure psC is Closed.
return nil, err
}
return swarmC, err
}
package swarm
import (
"fmt"
conn "github.com/jbenet/go-ipfs/net/conn"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
......@@ -58,51 +57,21 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c1 *ps.Conn) {
// grab the underlying connection.
if c2, ok := c1.NetConn().(conn.Conn); ok {
// set the RemotePeer as a group on the conn. this lets us group
// connections in the StreamSwarm by peer, and get a streams from
// any available connection in the group (better multiconn):
// swarm.StreamSwarm().NewStreamWithGroup(remotePeer)
c1.AddGroup(c2.RemotePeer())
go func() {
ctx := context.Background()
err := runHandshake3(ctx, s, c1, c2)
if err != nil {
log.Error("Handshake3 failed. disconnecting", err)
log.Event(ctx, "Handshake3FailureDisconnect", c2.LocalPeer(), c2.RemotePeer())
c1.Close() // boom.
}
}()
}
}
func runHandshake3(ctx context.Context, s *Swarm, sc *ps.Conn, c conn.Conn) error {
log.Event(ctx, "newConnection", c.LocalPeer(), c.RemotePeer())
stream, err := sc.NewStream()
if err != nil {
return err
}
// handshake3
h3result, err := conn.Handshake3(ctx, stream, c)
if err != nil {
return fmt.Errorf("Handshake3 failed: %s", err)
}
// check for nats. you know, just in case.
if h3result.LocalObservedAddress != nil {
checkNATWarning(s, h3result.LocalObservedAddress, c.LocalMultiaddr())
} else {
log.Warningf("Received nil observed address from %s", c.RemotePeer())
}
stream.Close()
log.Event(ctx, "handshake3Succeeded", c.LocalPeer(), c.RemotePeer())
return nil
func (s *Swarm) connHandler(c *ps.Conn) {
go func() {
ctx := context.Background()
// this context is for running the handshake, which -- when receiveing connections
// -- we have no bound on beyond what the transport protocol bounds it at.
// note that setup + the handshake are bounded by underlying io.
// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)
if _, err := s.newConnSetup(ctx, c); err != nil {
log.Error(err)
log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
c.Close() // boom. close it.
return
}
}()
}
package swarm
import (
ps "github.com/jbenet/go-peerstream"
)
// a Stream is a wrapper around a ps.Stream that exposes a way to get
// our SwarmConn and Swarm (instead of just the ps.Conn and ps.Swarm)
type Stream ps.Stream
// StreamHandler is called when new streams are opened from remote peers.
// See peerstream.StreamHandler
type StreamHandler func(*Stream)
// Stream returns the underlying peerstream.Stream
func (s *Stream) Stream() *ps.Stream {
return (*ps.Stream)(s)
}
// Conn returns the Conn associated with this Stream
func (s *Stream) Conn() *SwarmConn {
return (*SwarmConn)(s.Stream().Conn())
}
// Write writes bytes to a stream, calling write data for each call.
func (s *Stream) Wait() error {
return s.Stream().Wait()
}
func (s *Stream) Read(p []byte) (n int, err error) {
return s.Stream().Read(p)
}
func (s *Stream) Write(p []byte) (n int, err error) {
return s.Stream().Write(p)
}
func (s *Stream) Close() error {
return s.Stream().Close()
}
func wrapStream(pss *ps.Stream) *Stream {
return (*Stream)(pss)
}
func wrapStreams(st []*ps.Stream) []*Stream {
out := make([]*Stream, len(st))
for i, s := range st {
out[i] = wrapStream(s)
}
return out
}
package swarm
import (
"bytes"
"io"
"sync"
"testing"
"time"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func EchoStreamHandler(stream *Stream) {
go func() {
defer stream.Close()
// pull out the ipfs conn
c := stream.Conn().RawConn()
log.Debugf("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
buf := make([]byte, 4)
for {
if _, err := stream.Read(buf); err != nil {
if err != io.EOF {
log.Error("ping receive error:", err)
}
return
}
if !bytes.Equal(buf, []byte("ping")) {
log.Errorf("ping receive error: ping != %s %v", buf, buf)
return
}
if _, err := stream.Write([]byte("pong")); err != nil {
log.Error("pond send error:", err)
return
}
}
}()
}
func setupPeer(t *testing.T, addr string) peer.Peer {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
}
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
t.Fatal(err)
}
p, err := testutil.NewPeerWithKeyPair(sk, pk)
if err != nil {
t.Fatal(err)
}
p.AddAddress(tcp)
return p
}
func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []peer.Peer) {
swarms := []*Swarm{}
for _, addr := range addrs {
local := setupPeer(t, addr)
peerstore := peer.NewPeerstore()
swarm, err := NewSwarm(ctx, local.Addresses(), local, peerstore)
if err != nil {
t.Fatal(err)
}
swarm.SetStreamHandler(EchoStreamHandler)
swarms = append(swarms, swarm)
}
peers := make([]peer.Peer, len(swarms))
for i, s := range swarms {
peers[i] = s.local
}
return swarms, peers
}
func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms, peers := makeSwarms(ctx, t, addrs)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.Peer) {
// copy for other peer
cp, err := s.peers.FindOrCreate(dst.ID())
if err != nil {
t.Fatal(err)
}
cp.AddAddress(dst.Addresses()[0])
log.Infof("SWARM TEST: %s dialing %s", s.local, dst)
if _, err := s.Dial(cp); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
log.Infof("SWARM TEST: %s connected to %s", s.local, dst)
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
for _, s := range swarms {
for _, p := range peers {
if p != s.local { // don't connect to self.
wg.Add(1)
connect(s, p)
}
}
}
wg.Wait()
for _, s := range swarms {
log.Infof("%s swarm routing table: %s", s.local, s.GetPeerList())
}
}
// ping/pong
for _, s1 := range swarms {
log.Debugf("-------------------------------------------------------")
log.Debugf("%s ping pong round", s1.local)
log.Debugf("-------------------------------------------------------")
_, cancel := context.WithCancel(ctx)
peers, err := s1.peers.All()
if err != nil {
t.Fatal(err)
}
got := map[u.Key]int{}
errChan := make(chan error, MsgNum*len(*peers))
streamChan := make(chan *Stream, MsgNum)
// send out "ping" x MsgNum to every peer
go func() {
defer close(streamChan)
var wg sync.WaitGroup
send := func(p peer.Peer) {
defer wg.Done()
// first, one stream per peer (nice)
stream, err := s1.NewStreamWithPeer(p)
if err != nil {
errChan <- errors.Wrap(err)
return
}
// send out ping!
for k := 0; k < MsgNum; k++ { // with k messages
msg := "ping"
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
stream.Write([]byte(msg))
}
// read it later
streamChan <- stream
}
for _, p := range *peers {
wg.Add(1)
go send(p)
}
wg.Wait()
}()
// receive "pong" x MsgNum from every peer
go func() {
defer close(errChan)
count := 0
countShouldBe := (MsgNum * len(*peers))
for stream := range streamChan { // one per peer
defer stream.Close()
// get peer on the other side
p := stream.Conn().RemotePeer()
// receive pings
msgCount := 0
msg := make([]byte, 4)
for k := 0; k < MsgNum; k++ { // with k messages
// read from the stream
if _, err := stream.Read(msg); err != nil {
errChan <- errors.Wrap(err)
continue
}
if string(msg) != "pong" {
errChan <- errors.Errorf("unexpected message: %s", msg)
continue
}
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
msgCount++
}
got[p.Key()] = msgCount
count += msgCount
}
if count != countShouldBe {
errChan <- errors.Errorf("count mismatch: %d != %d", count, countShouldBe)
}
}()
// check any errors (blocks till consumer is done)
for err := range errChan {
if err != nil {
t.Fatal(err.Error())
}
}
log.Debugf("%s got pongs", s1.local)
if len(*peers) != len(got) {
t.Error("got less messages than sent")
}
for p, n := range got {
if n != MsgNum {
t.Error("peer did not get all msgs", p, n, "/", MsgNum)
}
}
cancel()
<-time.After(10 * time.Millisecond)
}
for _, s := range swarms {
s.Close()
}
}
func TestSwarm(t *testing.T) {
// t.Skip("skipping for another test")
addrs := []string{
"/ip4/127.0.0.1/tcp/10234",
"/ip4/127.0.0.1/tcp/10235",
"/ip4/127.0.0.1/tcp/10236",
"/ip4/127.0.0.1/tcp/10237",
"/ip4/127.0.0.1/tcp/10238",
}
// msgs := 1000
msgs := 100
SubtestSwarm(t, addrs, msgs)
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论