提交 9d304768 作者: Juan Batiz-Benet

stash

上级 bd636e1e
......@@ -7,9 +7,10 @@ import (
msg "github.com/jbenet/go-ipfs/net/message"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
ctxgroup "github.com/jbenet/go-ctxgroup"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
router "github.com/jbenet/go-router"
)
var log = u.Logger("service")
......@@ -40,11 +41,12 @@ type Sender interface {
// Service is an interface for a net resource with both outgoing (sender) and
// incomig (SetHandler) requests.
type Service interface {
Sender
ctxc.ContextCloser
Sender // can use it to send out msgs
router.Node // it is a Node in the net topology.
// GetPipe
GetPipe() *msg.Pipe
// SetUplink assigns the Node to send packets out
SetUplink(router.Node)
Uplink() router.Node
// SetHandler assigns the request Handler for this service.
SetHandler(Handler)
......@@ -62,30 +64,23 @@ type service struct {
Requests RequestMap
RequestsLock sync.RWMutex
// Message Pipe (connected to the outside world)
*msg.Pipe
ctxc.ContextCloser
// the connection to the outside world
uplink router.Node
uplinkLock sync.RWMutex
addr router.Address
}
// NewService creates a service object with given type ID and Handler
func NewService(ctx context.Context, h Handler) Service {
func NewService(addr router.Address, uplink router.Node, h Handler) Service {
s := &service{
Handler: h,
Requests: RequestMap{},
Pipe: msg.NewPipe(10),
ContextCloser: ctxc.NewContextCloser(ctx, nil),
Handler: h,
Requests: RequestMap{},
uplink: uplink,
addr: addr,
}
s.Children().Add(1)
go s.handleIncomingMessages()
return s
}
// GetPipe implements the mux.Protocol interface
func (s *service) GetPipe() *msg.Pipe {
return s.Pipe
}
// sendMessage sends a message out (actual leg work. SendMessage is to export w/o rid)
func (s *service) sendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
......@@ -99,12 +94,21 @@ func (s *service) sendMessage(ctx context.Context, m msg.NetMessage, rid Request
// send message
m2 := msg.New(m.Peer(), data)
pkt := msg.Packet{
Src:
}
select {
case s.Outgoing <- m2:
case <-ctx.Done():
return ctx.Err()
}
pkt := msg.Packet{
Src: m.
}
return nil
}
......@@ -193,34 +197,57 @@ func (s *service) handleIncomingMessages() {
}
}
func (s *service) handleIncomingMessage(m msg.NetMessage) {
defer s.Children().Done()
func (s *service) handleIncomingMessage(pkt *msg.Packet) error {
// check the packet has a valid Context
ctx := pkt.Context
if ctx == nil {
return fmt.Errorf("service got pkt without valid Context")
}
// check the source is a peer
srcPeer, ok := pkt.Src.(peer.Peer)
if !ok {
return fmt.Errorf("service got pkt from non-Peer src: %v", pkt.Src)
}
// unwrap the incoming message
data, rid, err := unwrapData(m.Data())
data, rid, err := unwrapData(pkt.Data)
if err != nil {
log.Errorf("service de-serializing error: %v", err)
return
return fmt.Errorf("service de-serializing error: %v", err)
}
m2 := msg.New(m.Peer(), data)
// convert to msg.NetMessage, which the rest of the system expects.
m2 := msg.New(srcPeer, data)
// if it's a request (or has no RequestID), handle it
if rid == nil || rid.IsRequest() {
handler := s.GetHandler()
if handler == nil {
log.Errorf("service dropped msg: %v", m)
return // no handler, drop it.
log.Event()
return nil
// no handler, drop it.
}
// should this be "go HandleMessage ... ?"
r1 := handler.HandleMessage(s.Context(), m2)
// if handler gave us a response, send it back out!
// this go routine is developer friendliness to keep their stacks
// separate (and more readable) from the network goroutine. If
// problems arise and you'd like to see _the full_ stack of where
// this message is coming from, just remove the goroutine part.
response := make(chan msg.NetMessage)
go func() msg.NetMessage {
return handler.HandleMessage(ctx, m2)
}()
r1 := <-response
// Note: HandleMessage *must* respect context. We could co-opt it
// and do a select {} here on the context, BUT that would just drop
// a packet and free up the goroutine to return to the network. the
// problem is still there: the Service handler hasn't returned yet.
// if handler gave us a response, send it out!
if r1 != nil {
err := s.sendMessage(s.Context(), r1, rid.Response())
if err != nil {
log.Errorf("error sending response message: %v", err)
if err := s.sendMessage(ctx, r1, rid.Response()); err != nil {
return fmt.Errorf("error sending response message: %v", err)
}
}
return
......@@ -247,6 +274,21 @@ func (s *service) handleIncomingMessage(m msg.NetMessage) {
}
}
// Address is the router.Node address
func (s *service) Address() router.Address {
return s.addr
}
// HandlePacket implements router.Node
// service only receives packets in HandlePacket
func (s *service) HandlePacket(p router.Packet, from router.Node) error {
pkt, ok := p.(*msg.Packet)
if !ok {
return msg.ErrInvalidPayload
}
}
// SetHandler assigns the request Handler for this service.
func (s *service) SetHandler(h Handler) {
s.HandlerLock.Lock()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论