提交 ec481b5a 作者: Brian Tiger Chow

refactor(net/mux) move proto to internal pb package

上级 e3a9a11a
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"sync" "sync"
msg "github.com/jbenet/go-ipfs/net/message" msg "github.com/jbenet/go-ipfs/net/message"
pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -13,6 +14,12 @@ import ( ...@@ -13,6 +14,12 @@ import (
var log = u.Logger("muxer") var log = u.Logger("muxer")
var (
ProtocolID_Routing = pb.ProtocolID_Routing
ProtocolID_Exchange = pb.ProtocolID_Exchange
ProtocolID_Diagnostic = pb.ProtocolID_Diagnostic
)
// Protocol objects produce + consume raw data. They are added to the Muxer // Protocol objects produce + consume raw data. They are added to the Muxer
// with a ProtocolID, which is added to outgoing payloads. Muxer properly // with a ProtocolID, which is added to outgoing payloads. Muxer properly
// encapsulates and decapsulates when interfacing with its Protocols. The // encapsulates and decapsulates when interfacing with its Protocols. The
...@@ -22,7 +29,7 @@ type Protocol interface { ...@@ -22,7 +29,7 @@ type Protocol interface {
} }
// ProtocolMap maps ProtocolIDs to Protocols. // ProtocolMap maps ProtocolIDs to Protocols.
type ProtocolMap map[ProtocolID]Protocol type ProtocolMap map[pb.ProtocolID]Protocol
// Muxer is a simple multiplexor that reads + writes to Incoming and Outgoing // Muxer is a simple multiplexor that reads + writes to Incoming and Outgoing
// channels. It multiplexes various protocols, wrapping and unwrapping data // channels. It multiplexes various protocols, wrapping and unwrapping data
...@@ -107,7 +114,7 @@ func (m *Muxer) Stop() { ...@@ -107,7 +114,7 @@ func (m *Muxer) Stop() {
} }
// AddProtocol adds a Protocol with given ProtocolID to the Muxer. // AddProtocol adds a Protocol with given ProtocolID to the Muxer.
func (m *Muxer) AddProtocol(p Protocol, pid ProtocolID) error { func (m *Muxer) AddProtocol(p Protocol, pid pb.ProtocolID) error {
if _, found := m.Protocols[pid]; found { if _, found := m.Protocols[pid]; found {
return errors.New("Another protocol already using this ProtocolID") return errors.New("Another protocol already using this ProtocolID")
} }
...@@ -170,7 +177,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) { ...@@ -170,7 +177,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
// handleOutgoingMessages consumes the messages on the proto.Outgoing channel, // handleOutgoingMessages consumes the messages on the proto.Outgoing channel,
// wraps them and sends them out. // wraps them and sends them out.
func (m *Muxer) handleOutgoingMessages(pid ProtocolID, proto Protocol) { func (m *Muxer) handleOutgoingMessages(pid pb.ProtocolID, proto Protocol) {
defer m.wg.Done() defer m.wg.Done()
for { for {
...@@ -188,7 +195,7 @@ func (m *Muxer) handleOutgoingMessages(pid ProtocolID, proto Protocol) { ...@@ -188,7 +195,7 @@ func (m *Muxer) handleOutgoingMessages(pid ProtocolID, proto Protocol) {
} }
// handleOutgoingMessage wraps out a message and sends it out the // handleOutgoingMessage wraps out a message and sends it out the
func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) { func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {
data, err := wrapData(m1.Data(), pid) data, err := wrapData(m1.Data(), pid)
if err != nil { if err != nil {
log.Error("muxer serializing error: %v", err) log.Error("muxer serializing error: %v", err)
...@@ -208,9 +215,9 @@ func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) { ...@@ -208,9 +215,9 @@ func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) {
} }
} }
func wrapData(data []byte, pid ProtocolID) ([]byte, error) { func wrapData(data []byte, pid pb.ProtocolID) ([]byte, error) {
// Marshal // Marshal
pbm := new(PBProtocolMessage) pbm := new(pb.PBProtocolMessage)
pbm.ProtocolID = &pid pbm.ProtocolID = &pid
pbm.Data = data pbm.Data = data
b, err := proto.Marshal(pbm) b, err := proto.Marshal(pbm)
...@@ -221,9 +228,9 @@ func wrapData(data []byte, pid ProtocolID) ([]byte, error) { ...@@ -221,9 +228,9 @@ func wrapData(data []byte, pid ProtocolID) ([]byte, error) {
return b, nil return b, nil
} }
func unwrapData(data []byte) ([]byte, ProtocolID, error) { func unwrapData(data []byte) ([]byte, pb.ProtocolID, error) {
// Unmarshal // Unmarshal
pbm := new(PBProtocolMessage) pbm := new(pb.PBProtocolMessage)
err := proto.Unmarshal(data, pbm) err := proto.Unmarshal(data, pbm)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
msg "github.com/jbenet/go-ipfs/net/message" msg "github.com/jbenet/go-ipfs/net/message"
pb "github.com/jbenet/go-ipfs/net/mux/internal/pb"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -37,7 +38,7 @@ func testMsg(t *testing.T, m msg.NetMessage, data []byte) { ...@@ -37,7 +38,7 @@ func testMsg(t *testing.T, m msg.NetMessage, data []byte) {
} }
} }
func testWrappedMsg(t *testing.T, m msg.NetMessage, pid ProtocolID, data []byte) { func testWrappedMsg(t *testing.T, m msg.NetMessage, pid pb.ProtocolID, data []byte) {
data2, pid2, err := unwrapData(m.Data()) data2, pid2, err := unwrapData(m.Data())
if err != nil { if err != nil {
t.Error(err) t.Error(err)
...@@ -57,8 +58,8 @@ func TestSimpleMuxer(t *testing.T) { ...@@ -57,8 +58,8 @@ func TestSimpleMuxer(t *testing.T) {
// setup // setup
p1 := &TestProtocol{Pipe: msg.NewPipe(10)} p1 := &TestProtocol{Pipe: msg.NewPipe(10)}
p2 := &TestProtocol{Pipe: msg.NewPipe(10)} p2 := &TestProtocol{Pipe: msg.NewPipe(10)}
pid1 := ProtocolID_Test pid1 := pb.ProtocolID_Test
pid2 := ProtocolID_Routing pid2 := pb.ProtocolID_Routing
mux1 := NewMuxer(ProtocolMap{ mux1 := NewMuxer(ProtocolMap{
pid1: p1, pid1: p1,
pid2: p2, pid2: p2,
...@@ -108,8 +109,8 @@ func TestSimultMuxer(t *testing.T) { ...@@ -108,8 +109,8 @@ func TestSimultMuxer(t *testing.T) {
// setup // setup
p1 := &TestProtocol{Pipe: msg.NewPipe(10)} p1 := &TestProtocol{Pipe: msg.NewPipe(10)}
p2 := &TestProtocol{Pipe: msg.NewPipe(10)} p2 := &TestProtocol{Pipe: msg.NewPipe(10)}
pid1 := ProtocolID_Test pid1 := pb.ProtocolID_Test
pid2 := ProtocolID_Identify pid2 := pb.ProtocolID_Identify
mux1 := NewMuxer(ProtocolMap{ mux1 := NewMuxer(ProtocolMap{
pid1: p1, pid1: p1,
pid2: p2, pid2: p2,
...@@ -127,7 +128,7 @@ func TestSimultMuxer(t *testing.T) { ...@@ -127,7 +128,7 @@ func TestSimultMuxer(t *testing.T) {
counts := [2][2][2]int{} counts := [2][2][2]int{}
// run producers at every end sending incrementing messages // run producers at every end sending incrementing messages
produceOut := func(pid ProtocolID, size int) { produceOut := func(pid pb.ProtocolID, size int) {
limiter := time.Tick(speed) limiter := time.Tick(speed)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
<-limiter <-limiter
...@@ -139,7 +140,7 @@ func TestSimultMuxer(t *testing.T) { ...@@ -139,7 +140,7 @@ func TestSimultMuxer(t *testing.T) {
} }
} }
produceIn := func(pid ProtocolID, size int) { produceIn := func(pid pb.ProtocolID, size int) {
limiter := time.Tick(speed) limiter := time.Tick(speed)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
<-limiter <-limiter
...@@ -175,7 +176,7 @@ func TestSimultMuxer(t *testing.T) { ...@@ -175,7 +176,7 @@ func TestSimultMuxer(t *testing.T) {
} }
} }
consumeIn := func(pid ProtocolID) { consumeIn := func(pid pb.ProtocolID) {
for { for {
select { select {
case m := <-mux1.Protocols[pid].GetPipe().Incoming: case m := <-mux1.Protocols[pid].GetPipe().Incoming:
...@@ -217,8 +218,8 @@ func TestStopping(t *testing.T) { ...@@ -217,8 +218,8 @@ func TestStopping(t *testing.T) {
// setup // setup
p1 := &TestProtocol{Pipe: msg.NewPipe(10)} p1 := &TestProtocol{Pipe: msg.NewPipe(10)}
p2 := &TestProtocol{Pipe: msg.NewPipe(10)} p2 := &TestProtocol{Pipe: msg.NewPipe(10)}
pid1 := ProtocolID_Test pid1 := pb.ProtocolID_Test
pid2 := ProtocolID_Identify pid2 := pb.ProtocolID_Identify
mux1 := NewMuxer(ProtocolMap{ mux1 := NewMuxer(ProtocolMap{
pid1: p1, pid1: p1,
pid2: p2, pid2: p2,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论