提交 50b00eb9 作者: Jeromy

use @maybebtc's ForwardBlocks function

上级 d5e7fd67
...@@ -98,7 +98,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er ...@@ -98,7 +98,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er
// GetBlocks gets a list of blocks asynchronously and returns through // GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel. // the returned channel.
// NB: No guarantees are made about order. // NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(parent context.Context, ks []u.Key) <-chan *blocks.Block { func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block {
out := make(chan *blocks.Block, 32) out := make(chan *blocks.Block, 32)
go func() { go func() {
var toFetch []u.Key var toFetch []u.Key
...@@ -112,7 +112,6 @@ func (s *BlockService) GetBlocks(parent context.Context, ks []u.Key) <-chan *blo ...@@ -112,7 +112,6 @@ func (s *BlockService) GetBlocks(parent context.Context, ks []u.Key) <-chan *blo
out <- block out <- block
} }
ctx, cancel := context.WithCancel(parent)
nblocks, err := s.Remote.GetBlocks(ctx, toFetch) nblocks, err := s.Remote.GetBlocks(ctx, toFetch)
if err != nil { if err != nil {
log.Errorf("Error with GetBlocks: %s", err) log.Errorf("Error with GetBlocks: %s", err)
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
async "github.com/jbenet/go-ipfs/util/async"
"github.com/jbenet/go-ipfs/util/eventlog" "github.com/jbenet/go-ipfs/util/eventlog"
) )
...@@ -128,35 +129,12 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. ...@@ -128,35 +129,12 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
promise := bs.notifications.Subscribe(ctx, keys...) promise := bs.notifications.Subscribe(ctx, keys...)
select { select {
case bs.batchRequests <- keys: case bs.batchRequests <- keys:
return pipeBlocks(ctx, promise, len(keys)), nil return async.ForwardN(ctx, promise, len(keys)), nil
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
} }
} }
func pipeBlocks(ctx context.Context, in <-chan *blocks.Block, count int) <-chan *blocks.Block {
out := make(chan *blocks.Block, 1)
go func() {
defer close(out)
for i := 0; i < count; i++ {
select {
case blk, ok := <-in:
if !ok {
return
}
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error { func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
if peers == nil { if peers == nil {
panic("Cant send wantlist to nil peerchan") panic("Cant send wantlist to nil peerchan")
......
...@@ -3,8 +3,11 @@ package async ...@@ -3,8 +3,11 @@ package async
import ( import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/blocks" "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
) )
var log = u.Logger("async")
// ForwardN forwards up to |num| blocks to the returned channel. // ForwardN forwards up to |num| blocks to the returned channel.
func ForwardN(ctx context.Context, in <-chan *blocks.Block, num int) <-chan *blocks.Block { func ForwardN(ctx context.Context, in <-chan *blocks.Block, num int) <-chan *blocks.Block {
out := make(chan *blocks.Block) out := make(chan *blocks.Block)
...@@ -14,6 +17,7 @@ func ForwardN(ctx context.Context, in <-chan *blocks.Block, num int) <-chan *blo ...@@ -14,6 +17,7 @@ func ForwardN(ctx context.Context, in <-chan *blocks.Block, num int) <-chan *blo
select { select {
case block, ok := <-in: case block, ok := <-in:
if !ok { if !ok {
log.Error("Forwarder exiting early!")
return // otherwise nil value is forwarded to output return // otherwise nil value is forwarded to output
} }
select { select {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论