提交 65b657e0 作者: Jeromy 提交者: Juan Batiz-Benet

stream back diagnostic responses as they are received

上级 f6278735
...@@ -4,11 +4,9 @@ ...@@ -4,11 +4,9 @@
package diagnostics package diagnostics
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"sync" "sync"
"time" "time"
...@@ -33,6 +31,8 @@ var log = util.Logger("diagnostics") ...@@ -33,6 +31,8 @@ var log = util.Logger("diagnostics")
// ProtocolDiag is the diagnostics protocol.ID // ProtocolDiag is the diagnostics protocol.ID
var ProtocolDiag protocol.ID = "/ipfs/diagnostics" var ProtocolDiag protocol.ID = "/ipfs/diagnostics"
var ErrAlreadyRunning = errors.New("diagnostic with that ID already running")
const ResponseTimeout = time.Second * 10 const ResponseTimeout = time.Second * 10
const HopTimeoutDecrement = time.Second * 2 const HopTimeoutDecrement = time.Second * 2
...@@ -159,86 +159,56 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) ...@@ -159,86 +159,56 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error)
return nil, fmt.Errorf("diagnostic from peers err: %s", err) return nil, fmt.Errorf("diagnostic from peers err: %s", err)
} }
var out []*DiagInfo
di := d.getDiagInfo() di := d.getDiagInfo()
out = append(out, di) out := []*DiagInfo{di}
for _, dpi := range dpeers { for dpi := range dpeers {
out = appendDiagnostics(out, dpi) out = append(out, dpi)
} }
return out, nil return out, nil
} }
func appendDiagnostics(cur []*DiagInfo, data []byte) []*DiagInfo { func decodeDiagJson(data []byte) (*DiagInfo, error) {
buf := bytes.NewBuffer(data) di := new(DiagInfo)
dec := json.NewDecoder(buf) err := json.Unmarshal(data, di)
for { if err != nil {
di := new(DiagInfo) return nil, err
err := dec.Decode(di)
if err != nil {
if err != io.EOF {
log.Errorf("error decoding DiagInfo: %v", err)
}
break
}
cur = append(cur, di)
} }
return cur
}
func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) ([][]byte, error) { return di, nil
timeout := pmes.GetTimeoutDuration() }
if timeout < 1 {
return nil, fmt.Errorf("timeout too short: %s", timeout)
}
ctx, _ = context.WithTimeout(ctx, timeout)
respdata := make(chan []byte) func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) (<-chan *DiagInfo, error) {
sendcount := 0 respdata := make(chan *DiagInfo)
wg := sync.WaitGroup{}
for p, _ := range peers { for p, _ := range peers {
wg.Add(1)
log.Debugf("Sending diagnostic request to peer: %s", p) log.Debugf("Sending diagnostic request to peer: %s", p)
sendcount++
go func(p peer.ID) { go func(p peer.ID) {
defer wg.Done()
out, err := d.getDiagnosticFromPeer(ctx, p, pmes) out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil { if err != nil {
log.Errorf("getDiagnostic error: %v", err) log.Errorf("Error getting diagnostic from %s: %s", p, err)
respdata <- nil
return return
} }
respdata <- out for d := range out {
respdata <- d
}
}(p) }(p)
} }
outall := make([][]byte, 0, len(peers)) go func() {
for i := 0; i < sendcount; i++ { wg.Wait()
out := <-respdata close(respdata)
outall = append(outall, out) }()
}
return outall, nil
}
// TODO: this method no longer needed.
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) {
rpmes, err := d.sendRequest(ctx, p, mes)
if err != nil {
return nil, err
}
return rpmes.GetData(), nil
}
func newMessage(diagID string) *pb.Message { return respdata, nil
pmes := new(pb.Message)
pmes.DiagID = proto.String(diagID)
return pmes
} }
func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (<-chan *DiagInfo, error) {
s, err := d.host.NewStream(ProtocolDiag, p) s, err := d.host.NewStream(ProtocolDiag, p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer s.Close()
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
...@@ -251,51 +221,57 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Messa ...@@ -251,51 +221,57 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Messa
return nil, err return nil, err
} }
rpmes := new(pb.Message) out := make(chan *DiagInfo)
if err := r.ReadMsg(rpmes); err != nil { go func() {
return nil, err
}
if rpmes == nil {
return nil, errors.New("no response to request")
}
rtt := time.Since(start) defer func() {
log.Infof("diagnostic request took: %s", rtt.String()) close(out)
return rpmes, nil s.Close()
} rtt := time.Since(start)
log.Infof("diagnostic request took: %s", rtt.String())
}()
func (d *Diagnostics) handleDiagnostic(p peer.ID, pmes *pb.Message) (*pb.Message, error) { for {
log.Debugf("HandleDiagnostic from %s for id = %s", p, util.Key(pmes.GetDiagID()).B58String()) rpmes := new(pb.Message)
resp := newMessage(pmes.GetDiagID()) if err := r.ReadMsg(rpmes); err != nil {
log.Errorf("Error reading diagnostic from stream: %s", err)
return
}
if rpmes == nil {
log.Error("Got no response back from diag request.")
return
}
// Make sure we havent already handled this request to prevent loops di, err := decodeDiagJson(rpmes.GetData())
d.diagLock.Lock() if err != nil {
_, found := d.diagMap[pmes.GetDiagID()] log.Error(err)
if found { return
d.diagLock.Unlock() }
return resp, nil
}
d.diagMap[pmes.GetDiagID()] = time.Now()
d.diagLock.Unlock()
di := d.getDiagInfo() select {
resp.Data = di.Marshal() case out <- di:
dpeers, err := d.getDiagnosticFromPeers(context.TODO(), d.getPeers(), pmes) case <-ctx.Done():
if err != nil { return
log.Errorf("diagnostic from peers err: %s", err) }
} else {
for _, b := range dpeers {
resp.Data = append(resp.Data, b...) // concatenate them all.
} }
}
return resp, nil }()
return out, nil
}
func newMessage(diagID string) *pb.Message {
pmes := new(pb.Message)
pmes.DiagID = proto.String(diagID)
return pmes
} }
func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error { func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error {
r := ggio.NewDelimitedReader(s, 32768) // maxsize cr := ctxutil.NewReader(ctx, s)
w := ggio.NewDelimitedWriter(s) cw := ctxutil.NewWriter(ctx, s)
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize
w := ggio.NewDelimitedWriter(cw)
// deserialize msg // deserialize msg
pmes := new(pb.Message) pmes := new(pb.Message)
...@@ -308,25 +284,51 @@ func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error { ...@@ -308,25 +284,51 @@ func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error {
log.Infof("[peer: %s] Got message from [%s]\n", log.Infof("[peer: %s] Got message from [%s]\n",
d.self.Pretty(), s.Conn().RemotePeer()) d.self.Pretty(), s.Conn().RemotePeer())
// dispatch handler. // Make sure we havent already handled this request to prevent loops
p := s.Conn().RemotePeer() if err := d.startDiag(pmes.GetDiagID()); err != nil {
rpmes, err := d.handleDiagnostic(p, pmes)
if err != nil {
log.Errorf("handleDiagnostic error: %s", err)
return nil return nil
} }
// if nil response, return it before serializing resp := newMessage(pmes.GetDiagID())
if rpmes == nil { resp.Data = d.getDiagInfo().Marshal()
return nil if err := w.WriteMsg(resp); err != nil {
log.Errorf("Failed to write protobuf message over stream: %s", err)
return err
} }
// serialize + send response msg timeout := pmes.GetTimeoutDuration()
if err := w.WriteMsg(rpmes); err != nil { if timeout < HopTimeoutDecrement {
log.Errorf("Failed to encode protobuf message: %v", err) return fmt.Errorf("timeout too short: %s", timeout)
return nil }
ctx, _ = context.WithTimeout(ctx, timeout)
pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement)
dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes)
if err != nil {
log.Errorf("diagnostic from peers err: %s", err)
return err
} }
for b := range dpeers {
resp := newMessage(pmes.GetDiagID())
resp.Data = b.Marshal()
if err := w.WriteMsg(resp); err != nil {
log.Errorf("Failed to write protobuf message over stream: %s", err)
return err
}
}
return nil
}
func (d *Diagnostics) startDiag(id string) error {
d.diagLock.Lock()
_, found := d.diagMap[id]
if found {
d.diagLock.Unlock()
return ErrAlreadyRunning
}
d.diagMap[id] = time.Now()
d.diagLock.Unlock()
return nil return nil
} }
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论