提交 d47115bc 作者: Juan Batiz-Benet

swarm: msg wrapping

上级 afed188d
...@@ -126,7 +126,7 @@ func (s *Swarm) connVersionExchange(r conn.Conn) error { ...@@ -126,7 +126,7 @@ func (s *Swarm) connVersionExchange(r conn.Conn) error {
return err return err
} }
r.MsgOut() <- msg.New(rpeer, myVerBytes) r.Out() <- myVerBytes
log.Debug("Sent my version(%s) [to = %s]", localH, rpeer) log.Debug("Sent my version(%s) [to = %s]", localH, rpeer)
select { select {
...@@ -136,13 +136,13 @@ func (s *Swarm) connVersionExchange(r conn.Conn) error { ...@@ -136,13 +136,13 @@ func (s *Swarm) connVersionExchange(r conn.Conn) error {
// case <-remote.Done(): // case <-remote.Done():
// return errors.New("remote closed connection during version exchange") // return errors.New("remote closed connection during version exchange")
case data, ok := <-r.MsgIn(): case data, ok := <-r.In():
if !ok { if !ok {
return fmt.Errorf("Error retrieving from conn: %v", rpeer) return fmt.Errorf("Error retrieving from conn: %v", rpeer)
} }
remoteH = new(handshake.Handshake1) remoteH = new(handshake.Handshake1)
err = proto.Unmarshal(data.Data(), remoteH) err = proto.Unmarshal(data, remoteH)
if err != nil { if err != nil {
s.Close() s.Close()
return fmt.Errorf("connSetup: could not decode remote version: %q", err) return fmt.Errorf("connSetup: could not decode remote version: %q", err)
...@@ -174,7 +174,7 @@ func (s *Swarm) fanOut() { ...@@ -174,7 +174,7 @@ func (s *Swarm) fanOut() {
} }
s.connsLock.RLock() s.connsLock.RLock()
conn, found := s.conns[msg.Peer().Key()] c, found := s.conns[msg.Peer().Key()]
s.connsLock.RUnlock() s.connsLock.RUnlock()
if !found { if !found {
...@@ -187,7 +187,7 @@ func (s *Swarm) fanOut() { ...@@ -187,7 +187,7 @@ func (s *Swarm) fanOut() {
// log.Debug("[peer: %s] Sent message [to = %s]", s.local, msg.Peer()) // log.Debug("[peer: %s] Sent message [to = %s]", s.local, msg.Peer())
// queue it in the connection's buffer // queue it in the connection's buffer
conn.MsgOut() <- msg c.Out() <- msg.Data()
} }
} }
} }
...@@ -202,7 +202,7 @@ func (s *Swarm) fanIn(c conn.Conn) { ...@@ -202,7 +202,7 @@ func (s *Swarm) fanIn(c conn.Conn) {
c.Close() c.Close()
goto out goto out
case data, ok := <-c.MsgIn(): case data, ok := <-c.In():
if !ok { if !ok {
e := fmt.Errorf("Error retrieving from conn: %v", c.RemotePeer()) e := fmt.Errorf("Error retrieving from conn: %v", c.RemotePeer())
s.errChan <- e s.errChan <- e
...@@ -210,7 +210,7 @@ func (s *Swarm) fanIn(c conn.Conn) { ...@@ -210,7 +210,7 @@ func (s *Swarm) fanIn(c conn.Conn) {
} }
// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer) // log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
s.Incoming <- data s.Incoming <- msg.New(c.RemotePeer(), data)
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论