提交 438ffa1d 作者: Brian Tiger Chow 提交者: Jeromy

feat(util) ForwardNBlocks

License: MIT
Signed-off-by: 's avatarBrian Tiger Chow <brian@perfmode.com>
上级 ed450992
package async
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/blocks"
)
// ForwardN forwards up to |num| blocks to the returned channel.
func ForwardN(ctx context.Context, in <-chan *blocks.Block, num int) <-chan *blocks.Block {
out := make(chan *blocks.Block)
go func() {
defer close(out)
for i := 0; i < num; i++ {
select {
case block, ok := <-in:
if !ok {
return
}
select {
case out <- block:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
package async
import (
"testing"
"code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/blocks"
)
func TestForwardTwo(t *testing.T) {
const n = 2
in := make(chan *blocks.Block, n)
ctx := context.Background()
out := ForwardN(ctx, in, n)
in <- blocks.NewBlock([]byte("one"))
in <- blocks.NewBlock([]byte("two"))
_ = <-out // 1
_ = <-out // 2
_, ok := <-out // closed
if !ok {
return
}
t.Fail()
}
func TestCloseInput(t *testing.T) {
const n = 2
in := make(chan *blocks.Block, 0)
ctx := context.Background()
out := ForwardN(ctx, in, n)
close(in)
_, ok := <-out // closed
if !ok {
return
}
t.Fatal("input channel closed, but output channel not")
}
func TestContextClosedWhenBlockingOnInput(t *testing.T) {
const n = 1 // but we won't ever send a block
ctx, cancel := context.WithCancel(context.Background())
out := ForwardN(ctx, make(chan *blocks.Block), n)
cancel() // before sending anything
_, ok := <-out
if !ok {
return
}
t.Fail()
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论