提交 7fdafaf1 作者: Juan Batiz-Benet

stress test

上级 fd3cd399
......@@ -2,17 +2,21 @@
package mocknet
import (
"fmt"
"io"
"sync"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
var log = eventlog.Logger("mocknet")
type Stream struct {
io.Reader
io.Writer
......@@ -98,6 +102,7 @@ func (c *Conn) removeStream(s *Stream) {
func (c *Conn) NewStreamWithProtocol(pr inet.ProtocolID, p peer.Peer) (inet.Stream, error) {
log.Debugf("NewStreamWithProtocol: %s --> %s", c.local, p)
ss, _ := newStreamPair(c.local, p)
if err := inet.WriteProtocolHeader(pr, ss); err != nil {
......@@ -149,13 +154,12 @@ func MakeNetworks(ctx context.Context, peers []peer.Peer) (nets []*Network, err
}
}
i := 0
for _, n1 := range nets {
for _, n2 := range nets {
if n1 == n2 {
continue
}
n1.conns[n2.local] = &Conn{local: n1, remote: n2}
log.Debugf("%d setup %s -> %s", i, n1, n2)
i++
}
}
......@@ -175,6 +179,9 @@ func newNetwork(ctx context.Context, local peer.Peer, peers peer.Peerstore) (*Ne
n.cg.SetTeardown(n.close)
return n, nil
}
func (n *Network) String() string {
return fmt.Sprintf("<Network %s - %d conns>", n.local, len(n.conns))
}
func (n *Network) handle(s inet.Stream) {
go n.mux.Handle(s)
......
......@@ -3,6 +3,8 @@ package mocknet
import (
"bytes"
"io"
"math/rand"
"sync"
"testing"
inet "github.com/jbenet/go-ipfs/net"
......@@ -35,15 +37,11 @@ func TestNetworkSetup(t *testing.T) {
t.Error("peer mismatch")
}
if len(n.conns) != (len(nets) - 1) {
if len(n.conns) != len(nets) {
t.Error("conn mismatch")
}
for _, c := range n.conns {
if c.remote.local == n.local {
t.Error("conn to self")
}
if c.remote.conns[n.local] == nil {
t.Error("conn other side fail")
}
......@@ -101,3 +99,96 @@ func TestStreams(t *testing.T) {
}
}
func makePinger(st string, n int) func(inet.Stream) {
return func(s inet.Stream) {
go func() {
defer s.Close()
for i := 0; i < n; i++ {
b := make([]byte, 4+len(st))
if _, err := s.Write([]byte("ping" + st)); err != nil {
panic(err)
}
if _, err := io.ReadFull(s, b); err != nil {
panic(err)
}
if !bytes.Equal(b, []byte("pong"+st)) {
panic("bytes mismatch")
}
}
}()
}
}
func makePonger(st string) func(inet.Stream) {
return func(s inet.Stream) {
go func() {
defer s.Close()
for {
b := make([]byte, 4+len(st))
if _, err := io.ReadFull(s, b); err != nil {
if err == io.EOF {
return
}
panic(err)
}
if !bytes.Equal(b, []byte("ping"+st)) {
panic("bytes mismatch")
}
if _, err := s.Write([]byte("pong" + st)); err != nil {
panic(err)
}
}
}()
}
}
func TestStreamsStress(t *testing.T) {
peers := []peer.Peer{}
for i := 0; i < 100; i++ {
peers = append(peers, testutil.RandPeer())
}
nets, err := MakeNetworks(context.Background(), peers)
if err != nil {
t.Fatal(err)
}
protos := []inet.ProtocolID{
inet.ProtocolDHT,
inet.ProtocolBitswap,
inet.ProtocolDiag,
}
for _, n := range nets {
for _, p := range protos {
n.SetHandler(p, makePonger(string(p)))
}
}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
from := rand.Intn(len(peers))
to := rand.Intn(len(peers))
p := rand.Intn(3)
proto := protos[p]
log.Debug("%d (%s) %d (%s) %d (%s)", from, nets[from], to, nets[to], p, protos[p])
s, err := nets[from].NewStream(protos[p], nets[to].local)
if err != nil {
panic(err)
}
log.Infof("%d start pinging", i)
makePinger(string(proto), rand.Intn(100))(s)
log.Infof("%d done pinging", i)
}(i)
}
wg.Done()
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论