提交 59858540 作者: Brian Tiger Chow

fix: return pointer

@whyrusleeping
上级 60e288ed
...@@ -281,7 +281,10 @@ func (bs *bitswap) taskWorker(ctx context.Context) { ...@@ -281,7 +281,10 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case envelope := <-nextEnvelope: case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
bs.send(ctx, envelope.Peer, envelope.Message) bs.send(ctx, envelope.Peer, envelope.Message)
} }
} }
......
...@@ -71,7 +71,7 @@ type Engine struct { ...@@ -71,7 +71,7 @@ type Engine struct {
// outbox contains outgoing messages to peers. This is owned by the // outbox contains outgoing messages to peers. This is owned by the
// taskWorker goroutine // taskWorker goroutine
outbox chan (<-chan Envelope) outbox chan (<-chan *Envelope)
bs bstore.Blockstore bs bstore.Blockstore
...@@ -85,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ...@@ -85,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
ledgerMap: make(map[peer.ID]*ledger), ledgerMap: make(map[peer.ID]*ledger),
bs: bs, bs: bs,
peerRequestQueue: newPRQ(), peerRequestQueue: newPRQ(),
outbox: make(chan (<-chan Envelope), outboxChanBuffer), outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}), workSignal: make(chan struct{}),
} }
go e.taskWorker(ctx) go e.taskWorker(ctx)
...@@ -95,7 +95,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ...@@ -95,7 +95,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
func (e *Engine) taskWorker(ctx context.Context) { func (e *Engine) taskWorker(ctx context.Context) {
defer close(e.outbox) // because taskWorker uses the channel exclusively defer close(e.outbox) // because taskWorker uses the channel exclusively
for { for {
oneTimeUse := make(chan Envelope, 1) // buffer to prevent blocking oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
...@@ -108,7 +108,7 @@ func (e *Engine) taskWorker(ctx context.Context) { ...@@ -108,7 +108,7 @@ func (e *Engine) taskWorker(ctx context.Context) {
close(oneTimeUse) close(oneTimeUse)
return // ctx cancelled return // ctx cancelled
} }
oneTimeUse <- *envelope // buffered. won't block oneTimeUse <- envelope // buffered. won't block
close(oneTimeUse) close(oneTimeUse)
} }
} }
...@@ -141,7 +141,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { ...@@ -141,7 +141,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
} }
// Outbox returns a channel of one-time use Envelope channels. // Outbox returns a channel of one-time use Envelope channels.
func (e *Engine) Outbox() <-chan (<-chan Envelope) { func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
return e.outbox return e.outbox
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论