提交 7a7bf8d8 作者: Juan Batiz-Benet

conn: raw []byte, not msg

This commit actually removes the previously introduced
chan net.NetMessage, in favor of raw []byte. It plays
nicer with crypto/spipe, and it makes more sense in the
context of a "single connection", i.e. I already know the
peer I'm talking to, from the connection. The NetMessage
additional Peer is useful swarm and up.
上级 e45a6ced
......@@ -10,7 +10,6 @@ import (
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
spipe "github.com/jbenet/go-ipfs/crypto/spipe"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -48,12 +47,8 @@ type singleConn struct {
secure *spipe.SecurePipe
insecure *msgioPipe
msgpipe *msg.Pipe
}
// Map maps Keys (Peer.IDs) to Connections.
type Map map[u.Key]Conn
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote *peer.Peer,
peers peer.Peerstore, maconn manet.Conn) (Conn, error) {
......@@ -67,7 +62,6 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer,
ctx: ctx,
cancel: cancel,
insecure: newMsgioPipe(10),
msgpipe: msg.NewPipe(10),
}
log.Info("newSingleConn: %v to %v", local, remote)
......@@ -117,45 +111,9 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error {
panic("peers not being constructed correctly.")
}
// silly we have to do it this way.
go c.unwrapOutMsgs()
go c.wrapInMsgs()
return nil
}
// unwrapOutMsgs sends just the raw data of a message through secure
func (c *singleConn) unwrapOutMsgs() {
for {
select {
case <-c.ctx.Done():
return
case m, more := <-c.msgpipe.Outgoing:
if !more {
return
}
c.secure.Out <- m.Data()
}
}
}
// wrapInMsgs wraps a message
func (c *singleConn) wrapInMsgs() {
for {
select {
case <-c.ctx.Done():
return
case d, more := <-c.secure.In:
if !more {
return
}
c.msgpipe.Incoming <- msg.New(c.remote, d)
}
}
}
// waitToClose waits on the given context's Done before closing Conn.
func (c *singleConn) waitToClose() {
select {
......@@ -170,7 +128,6 @@ func (c *singleConn) waitToClose() {
if c.secure != nil { // may never have gotten here.
c.secure.Close()
}
close(c.msgpipe.Incoming)
}
// isClosed returns whether this Conn is open or closed.
......@@ -205,14 +162,14 @@ func (c *singleConn) RemotePeer() *peer.Peer {
return c.remote
}
// MsgIn returns a readable message channel
func (c *singleConn) MsgIn() <-chan msg.NetMessage {
return c.msgpipe.Incoming
// In returns a readable message channel
func (c *singleConn) In() <-chan []byte {
return c.secure.In
}
// MsgOut returns a writable message channel
func (c *singleConn) MsgOut() chan<- msg.NetMessage {
return c.msgpipe.Outgoing
// Out returns a writable message channel
func (c *singleConn) Out() chan<- []byte {
return c.secure.Out
}
// Dialer is an object that can open connections. We could have a "convenience"
......
......@@ -4,7 +4,6 @@ import (
"testing"
ci "github.com/jbenet/go-ipfs/crypto"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -50,8 +49,8 @@ func echo(ctx context.Context, c Conn) {
select {
case <-ctx.Done():
return
case m := <-c.MsgIn():
c.MsgOut() <- m
case m := <-c.In():
c.Out() <- m
}
}
}
......@@ -93,19 +92,19 @@ func TestDialer(t *testing.T) {
}
// fmt.Println("sending")
c.MsgOut() <- msg.New(p2, []byte("beep"))
c.MsgOut() <- msg.New(p2, []byte("boop"))
c.Out() <- []byte("beep")
c.Out() <- []byte("boop")
out := <-c.MsgIn()
out := <-c.In()
// fmt.Println("recving", string(out))
data := string(out.Data())
data := string(out)
if data != "beep" {
t.Error("unexpected conn output", data)
}
out = <-c.MsgIn()
data = string(out.Data())
if string(out.Data()) != "boop" {
out = <-c.In()
data = string(out)
if string(out) != "boop" {
t.Error("unexpected conn output", data)
}
......
package conn
import (
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// Map maps Keys (Peer.IDs) to Connections.
type Map map[u.Key]Conn
// Conn is a generic message-based Peer-to-Peer connection.
type Conn interface {
......@@ -16,11 +19,11 @@ type Conn interface {
// RemotePeer is the Peer on the remote side
RemotePeer() *peer.Peer
// MsgIn returns a readable message channel
MsgIn() <-chan msg.NetMessage
// In returns a readable message channel
In() <-chan []byte
// MsgOut returns a writable message channel
MsgOut() chan<- msg.NetMessage
// Out returns a writable message channel
Out() chan<- []byte
// Close ends the connection
Close() error
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论