提交 4ef73ee2 作者: Steven Allen

bitswap: serialize connect/disconnect notifications over one channel.

Otherwise, we could end up receiving a disconnect notification before a connect
notification (and think we have a connection that we don't have).

License: MIT
Signed-off-by: 's avatarSteven Allen <steven@stebalien.com>
上级 28be1d4c
...@@ -18,8 +18,7 @@ import ( ...@@ -18,8 +18,7 @@ import (
type WantManager struct { type WantManager struct {
// sync channels for Run loop // sync channels for Run loop
incoming chan *wantSet incoming chan *wantSet
connect chan peer.ID // notification channel for new peers connecting connectEvent chan peerStatus // notification channel for peers connecting/disconnecting
disconnect chan peer.ID // notification channel for peers disconnecting
peerReqs chan chan []peer.ID // channel to request connected peers on peerReqs chan chan []peer.ID // channel to request connected peers on
// synchronized by Run loop, only touch inside there // synchronized by Run loop, only touch inside there
...@@ -35,6 +34,11 @@ type WantManager struct { ...@@ -35,6 +34,11 @@ type WantManager struct {
sentHistogram metrics.Histogram sentHistogram metrics.Histogram
} }
type peerStatus struct {
connect bool
peer peer.ID
}
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
wantlistGauge := metrics.NewCtx(ctx, "wantlist_total", wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
...@@ -43,8 +47,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana ...@@ -43,8 +47,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
" this bitswap").Histogram(metricsBuckets) " this bitswap").Histogram(metricsBuckets)
return &WantManager{ return &WantManager{
incoming: make(chan *wantSet, 10), incoming: make(chan *wantSet, 10),
connect: make(chan peer.ID, 10), connectEvent: make(chan peerStatus, 10),
disconnect: make(chan peer.ID, 10),
peerReqs: make(chan chan []peer.ID), peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*msgQueue), peers: make(map[peer.ID]*msgQueue),
wl: wantlist.NewThreadSafe(), wl: wantlist.NewThreadSafe(),
...@@ -270,22 +273,22 @@ func (mq *msgQueue) openSender(ctx context.Context) error { ...@@ -270,22 +273,22 @@ func (mq *msgQueue) openSender(ctx context.Context) error {
func (pm *WantManager) Connected(p peer.ID) { func (pm *WantManager) Connected(p peer.ID) {
select { select {
case pm.connect <- p: case pm.connectEvent <- peerStatus{peer: p, connect: true}:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
func (pm *WantManager) Disconnected(p peer.ID) { func (pm *WantManager) Disconnected(p peer.ID) {
select { select {
case pm.disconnect <- p: case pm.connectEvent <- peerStatus{peer: p, connect: false}:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// TODO: use goprocess here once i trust it // TODO: use goprocess here once i trust it
func (pm *WantManager) Run() { func (pm *WantManager) Run() {
tock := time.NewTicker(rebroadcastDelay.Get()) // NOTE: Do not open any streams or connections from anywhere in this
defer tock.Stop() // event loop. Really, just don't do anything likely to block.
for { for {
select { select {
case ws := <-pm.incoming: case ws := <-pm.incoming:
...@@ -329,10 +332,12 @@ func (pm *WantManager) Run() { ...@@ -329,10 +332,12 @@ func (pm *WantManager) Run() {
} }
} }
case p := <-pm.connect: case p := <-pm.connectEvent:
pm.startPeerHandler(p) if p.connect {
case p := <-pm.disconnect: pm.startPeerHandler(p.peer)
pm.stopPeerHandler(p) } else {
pm.stopPeerHandler(p.peer)
}
case req := <-pm.peerReqs: case req := <-pm.peerReqs:
var peers []peer.ID var peers []peer.ID
for p := range pm.peers { for p := range pm.peers {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论