提交 46d626a6 作者: Jeromy

cleanup stream reuse

License: MIT
Signed-off-by: 's avatarJeromy <why@ipfs.io>
上级 3e9717aa
...@@ -126,6 +126,8 @@ type messageSender struct { ...@@ -126,6 +126,8 @@ type messageSender struct {
lk sync.Mutex lk sync.Mutex
p peer.ID p peer.ID
dht *IpfsDHT dht *IpfsDHT
singleMes int
} }
func (dht *IpfsDHT) newMessageSender(p peer.ID) *messageSender { func (dht *IpfsDHT) newMessageSender(p peer.ID) *messageSender {
...@@ -156,12 +158,40 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro ...@@ -156,12 +158,40 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
return err return err
} }
if err := ms.writeMessage(pmes); err != nil {
return err
}
if ms.singleMes > 3 {
ms.s.Close()
ms.s = nil
}
return nil
}
func (ms *messageSender) writeMessage(pmes *pb.Message) error {
err := ms.w.WriteMsg(pmes) err := ms.w.WriteMsg(pmes)
if err != nil { if err != nil {
// If the other side isnt expecting us to be reusing streams, we're gonna
// end up erroring here. To make sure things work seamlessly, lets retry once
// before continuing
log.Infof("error writing message: ", err)
ms.s.Close() ms.s.Close()
ms.s = nil ms.s = nil
if err := ms.prep(); err != nil {
return err return err
} }
if err := ms.w.WriteMsg(pmes); err != nil {
return err
}
// keep track of this happening. If it happens a few times, its
// likely we can assume the otherside will never support stream reuse
ms.singleMes++
}
return nil return nil
} }
...@@ -172,22 +202,23 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb ...@@ -172,22 +202,23 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
return nil, err return nil, err
} }
err := ms.w.WriteMsg(pmes) if err := ms.writeMessage(pmes); err != nil {
if err != nil {
ms.s.Close()
ms.s = nil
return nil, err return nil, err
} }
log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
mes := new(pb.Message) mes := new(pb.Message)
err = ms.r.ReadMsg(mes) if err := ms.r.ReadMsg(mes); err != nil {
if err != nil {
ms.s.Close() ms.s.Close()
ms.s = nil ms.s = nil
return nil, err return nil, err
} }
if ms.singleMes > 3 {
ms.s.Close()
ms.s = nil
}
return mes, nil return mes, nil
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论