提交 e431f35a 作者: Jeromy

update multistream naming of lazyconn

License: MIT
Signed-off-by: 's avatarJeromy <jeromyj@gmail.com>
上级 2c4eb609
...@@ -344,7 +344,7 @@ ...@@ -344,7 +344,7 @@
}, },
{ {
"ImportPath": "github.com/whyrusleeping/go-multistream", "ImportPath": "github.com/whyrusleeping/go-multistream",
"Rev": "30c7a81b6c568654147bf6e106870c5d64ccebc8" "Rev": "31bb014803a6eba2261bda5593e42c016a5f33bb"
}, },
{ {
"ImportPath": "github.com/whyrusleeping/multiaddr-filter", "ImportPath": "github.com/whyrusleeping/multiaddr-filter",
......
...@@ -6,7 +6,16 @@ import ( ...@@ -6,7 +6,16 @@ import (
"sync" "sync"
) )
func NewLazyHandshakeConn(c io.ReadWriteCloser, proto string) io.ReadWriteCloser { type Multistream interface {
io.ReadWriteCloser
Protocol() string
}
func NewMSSelect(c io.ReadWriteCloser, proto string) Multistream {
return NewMultistream(NewMultistream(c, ProtocolID), proto)
}
func NewMultistream(c io.ReadWriteCloser, proto string) Multistream {
return &lazyConn{ return &lazyConn{
proto: proto, proto: proto,
con: c, con: c,
...@@ -30,6 +39,10 @@ type lazyConn struct { ...@@ -30,6 +39,10 @@ type lazyConn struct {
con io.ReadWriteCloser con io.ReadWriteCloser
} }
func (l *lazyConn) Protocol() string {
return l.proto
}
func (l *lazyConn) Read(b []byte) (int, error) { func (l *lazyConn) Read(b []byte) (int, error) {
if !l.rhandshake { if !l.rhandshake {
go l.writeHandshake() go l.writeHandshake()
...@@ -58,20 +71,8 @@ func (l *lazyConn) readHandshake() error { ...@@ -58,20 +71,8 @@ func (l *lazyConn) readHandshake() error {
} }
l.rhsync = true l.rhsync = true
// read multistream version
tok, err := ReadNextToken(l.con)
if err != nil {
l.rerr = err
return err
}
if tok != ProtocolID {
l.rerr = fmt.Errorf("multistream protocol mismatch ( %s != %s )", tok, ProtocolID)
return l.rerr
}
// read protocol // read protocol
tok, err = ReadNextToken(l.con) tok, err := ReadNextToken(l.con)
if err != nil { if err != nil {
l.rerr = err l.rerr = err
return err return err
...@@ -95,13 +96,7 @@ func (l *lazyConn) writeHandshake() error { ...@@ -95,13 +96,7 @@ func (l *lazyConn) writeHandshake() error {
l.whsync = true l.whsync = true
err := delimWrite(l.con, []byte(ProtocolID)) err := delimWrite(l.con, []byte(l.proto))
if err != nil {
l.werr = err
return err
}
err = delimWrite(l.con, []byte(l.proto))
if err != nil { if err != nil {
l.werr = err l.werr = err
return err return err
......
...@@ -126,8 +126,8 @@ func TestLazyConns(t *testing.T) { ...@@ -126,8 +126,8 @@ func TestLazyConns(t *testing.T) {
mux.AddHandler("/b", nil) mux.AddHandler("/b", nil)
mux.AddHandler("/c", nil) mux.AddHandler("/c", nil)
la := NewLazyHandshakeConn(a, "/c") la := NewMSSelect(a, "/c")
lb := NewLazyHandshakeConn(b, "/c") lb := NewMSSelect(b, "/c")
verifyPipe(t, la, lb) verifyPipe(t, la, lb)
} }
...@@ -159,7 +159,7 @@ func TestLazyAndMux(t *testing.T) { ...@@ -159,7 +159,7 @@ func TestLazyAndMux(t *testing.T) {
close(done) close(done)
}() }()
lb := NewLazyHandshakeConn(b, "/c") lb := NewMSSelect(b, "/c")
// do a write to push the handshake through // do a write to push the handshake through
_, err := lb.Write([]byte("hello")) _, err := lb.Write([]byte("hello"))
...@@ -202,7 +202,7 @@ func TestLazyAndMuxWrite(t *testing.T) { ...@@ -202,7 +202,7 @@ func TestLazyAndMuxWrite(t *testing.T) {
close(done) close(done)
}() }()
lb := NewLazyHandshakeConn(b, "/c") lb := NewMSSelect(b, "/c")
// do a write to push the handshake through // do a write to push the handshake through
msg := make([]byte, 5) msg := make([]byte, 5)
......
...@@ -170,7 +170,7 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { ...@@ -170,7 +170,7 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
logStream := mstream.WrapStream(s, pid, h.bwc) logStream := mstream.WrapStream(s, pid, h.bwc)
lzcon := msmux.NewLazyHandshakeConn(logStream, string(pid)) lzcon := msmux.NewMSSelect(logStream, string(pid))
return &streamWrapper{ return &streamWrapper{
Stream: logStream, Stream: logStream,
rw: lzcon, rw: lzcon,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论