提交 29ab6dec 作者: Juan Batiz-Benet

added msg counters to logs

上级 63d6ee6d
...@@ -132,6 +132,7 @@ func (c *MultiConn) fanOut() { ...@@ -132,6 +132,7 @@ func (c *MultiConn) fanOut() {
c.Children().Add(1) c.Children().Add(1)
defer c.Children().Done() defer c.Children().Done()
i := 0
for { for {
select { select {
case <-c.Closing(): case <-c.Closing():
...@@ -140,6 +141,7 @@ func (c *MultiConn) fanOut() { ...@@ -140,6 +141,7 @@ func (c *MultiConn) fanOut() {
// send data out through our "best connection" // send data out through our "best connection"
case m, more := <-c.duplex.Out: case m, more := <-c.duplex.Out:
if !more { if !more {
log.Info("%s out channel closed", c)
return return
} }
sc := c.BestConn() sc := c.BestConn()
...@@ -147,6 +149,9 @@ func (c *MultiConn) fanOut() { ...@@ -147,6 +149,9 @@ func (c *MultiConn) fanOut() {
// maybe this should be a logged error, not a panic. // maybe this should be a logged error, not a panic.
panic("sending out multiconn without any live connection") panic("sending out multiconn without any live connection")
} }
i++
log.Info("%s sending (%d)", sc, i)
sc.Out() <- m sc.Out() <- m
} }
} }
...@@ -160,6 +165,8 @@ func (c *MultiConn) fanInSingle(child Conn) { ...@@ -160,6 +165,8 @@ func (c *MultiConn) fanInSingle(child Conn) {
// cleanup all data associated with this child Connection. // cleanup all data associated with this child Connection.
defer func() { defer func() {
log.Info("closing: %s", child)
// in case it still is in the map, remove it. // in case it still is in the map, remove it.
c.Lock() c.Lock()
delete(c.conns, child.ID()) delete(c.conns, child.ID())
...@@ -174,6 +181,7 @@ func (c *MultiConn) fanInSingle(child Conn) { ...@@ -174,6 +181,7 @@ func (c *MultiConn) fanInSingle(child Conn) {
} }
}() }()
i := 0
for { for {
select { select {
case <-c.Closing(): // multiconn closing case <-c.Closing(): // multiconn closing
...@@ -184,8 +192,11 @@ func (c *MultiConn) fanInSingle(child Conn) { ...@@ -184,8 +192,11 @@ func (c *MultiConn) fanInSingle(child Conn) {
case m, more := <-child.In(): // receiving data case m, more := <-child.In(): // receiving data
if !more { if !more {
log.Info("%s in channel closed", child)
return // closed return // closed
} }
i++
log.Info("%s received (%d)", child, i)
c.duplex.In <- m c.duplex.In <- m
} }
} }
......
...@@ -136,6 +136,7 @@ func (s *Swarm) fanOut() { ...@@ -136,6 +136,7 @@ func (s *Swarm) fanOut() {
s.Children().Add(1) s.Children().Add(1)
defer s.Children().Done() defer s.Children().Done()
i := 0
for { for {
select { select {
case <-s.Closing(): case <-s.Closing():
...@@ -143,6 +144,7 @@ func (s *Swarm) fanOut() { ...@@ -143,6 +144,7 @@ func (s *Swarm) fanOut() {
case msg, ok := <-s.Outgoing: case msg, ok := <-s.Outgoing:
if !ok { if !ok {
log.Info("%s outgoing channel closed", s)
return return
} }
...@@ -157,8 +159,8 @@ func (s *Swarm) fanOut() { ...@@ -157,8 +159,8 @@ func (s *Swarm) fanOut() {
continue continue
} }
// log.Debug("[peer: %s] Sent message [to = %s]", s.local, msg.Peer()) i++
log.Debug("%s sent message to %s (%d)", s.local, msg.Peer(), i)
// queue it in the connection's buffer // queue it in the connection's buffer
c.Out() <- msg.Data() c.Out() <- msg.Data()
} }
...@@ -182,6 +184,7 @@ func (s *Swarm) fanInSingle(c conn.Conn) { ...@@ -182,6 +184,7 @@ func (s *Swarm) fanInSingle(c conn.Conn) {
c.Children().Done() // child of Conn as well. c.Children().Done() // child of Conn as well.
}() }()
i := 0
for { for {
select { select {
case <-s.Closing(): // Swarm closing case <-s.Closing(): // Swarm closing
...@@ -192,9 +195,11 @@ func (s *Swarm) fanInSingle(c conn.Conn) { ...@@ -192,9 +195,11 @@ func (s *Swarm) fanInSingle(c conn.Conn) {
case data, ok := <-c.In(): case data, ok := <-c.In():
if !ok { if !ok {
log.Info("%s in channel closed", c)
return // channel closed. return // channel closed.
} }
// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer) i++
log.Debug("%s received message from %s (%d)", s.local, c.RemotePeer(), i)
s.Incoming <- msg.New(c.RemotePeer(), data) s.Incoming <- msg.New(c.RemotePeer(), data)
} }
} }
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
) )
func pong(ctx context.Context, swarm *Swarm) { func pong(ctx context.Context, swarm *Swarm) {
i := 0
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
...@@ -23,7 +24,8 @@ func pong(ctx context.Context, swarm *Swarm) { ...@@ -23,7 +24,8 @@ func pong(ctx context.Context, swarm *Swarm) {
case m1 := <-swarm.Incoming: case m1 := <-swarm.Incoming:
if bytes.Equal(m1.Data(), []byte("ping")) { if bytes.Equal(m1.Data(), []byte("ping")) {
m2 := msg.New(m1.Peer(), []byte("pong")) m2 := msg.New(m1.Peer(), []byte("pong"))
log.Debug("%s pong %s", swarm.local, m1.Peer()) i++
log.Debug("%s pong %s (%d)", swarm.local, m1.Peer(), i)
swarm.Outgoing <- m2 swarm.Outgoing <- m2
} }
} }
...@@ -132,7 +134,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { ...@@ -132,7 +134,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
for k := 0; k < MsgNum; k++ { for k := 0; k < MsgNum; k++ {
for _, p := range *peers { for _, p := range *peers {
log.Debug("%s ping %s", s1.local, p) log.Debug("%s ping %s (%d)", s1.local, p, k)
s1.Outgoing <- msg.New(p, []byte("ping")) s1.Outgoing <- msg.New(p, []byte("ping"))
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论