提交 3a6b6c69 作者: Jeromy

cleanup, use a workgroup over channels

上级 9835c1e3
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
package bitswap package bitswap
import ( import (
"sync"
"time" "time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -180,8 +181,9 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e ...@@ -180,8 +181,9 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
} }
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
done := make(chan struct{}) wg := sync.WaitGroup{}
for _, k := range ks { for _, k := range ks {
wg.Add(1)
go func(k u.Key) { go func(k u.Key) {
providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest) providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
...@@ -189,12 +191,10 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { ...@@ -189,12 +191,10 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
if err != nil { if err != nil {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
} }
done <- struct{}{} wg.Done()
}(k) }(k)
} }
for _ = range ks { wg.Wait()
<-done
}
} }
// TODO ensure only one active request per key // TODO ensure only one active request per key
...@@ -255,6 +255,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -255,6 +255,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.routing.Provide(ctx, blk.Key()) return bs.routing.Provide(ctx, blk.Key())
} }
// receiveBlock handles storing the block in the blockstore and calling HasBlock
func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) { func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) {
// TODO verify blocks? // TODO verify blocks?
if err := bs.blockstore.Put(block); err != nil { if err := bs.blockstore.Put(block); err != nil {
......
...@@ -28,7 +28,10 @@ type DAGService interface { ...@@ -28,7 +28,10 @@ type DAGService interface {
AddRecursive(*Node) error AddRecursive(*Node) error
Get(u.Key) (*Node, error) Get(u.Key) (*Node, error)
Remove(*Node) error Remove(*Node) error
GetKeysAsync(context.Context, *Node) <-chan *Node
// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetDAG(context.Context, *Node) <-chan *Node
} }
func NewDAGService(bs *bserv.BlockService) DAGService { func NewDAGService(bs *bserv.BlockService) DAGService {
...@@ -298,10 +301,10 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) { ...@@ -298,10 +301,10 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) {
return -1, u.ErrNotFound return -1, u.ErrNotFound
} }
// GetKeysAsync will fill out all of the links of the given Node. // GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive // It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order. // all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetKeysAsync(ctx context.Context, root *Node) <-chan *Node { func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
sig := make(chan *Node) sig := make(chan *Node)
go func() { go func() {
var keys []u.Key var keys []u.Key
......
...@@ -40,7 +40,7 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) { ...@@ -40,7 +40,7 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
case ftpb.Data_File: case ftpb.Data_File:
var fetchChan <-chan *mdag.Node var fetchChan <-chan *mdag.Node
if serv != nil { if serv != nil {
fetchChan = serv.GetKeysAsync(context.TODO(), n) fetchChan = serv.GetDAG(context.TODO(), n)
} }
return &DagReader{ return &DagReader{
node: n, node: n,
...@@ -62,6 +62,7 @@ func (dr *DagReader) precalcNextBuf() error { ...@@ -62,6 +62,7 @@ func (dr *DagReader) precalcNextBuf() error {
var nxt *mdag.Node var nxt *mdag.Node
var ok bool var ok bool
// TODO: require non-nil dagservice, use offline bitswap exchange
if dr.serv == nil { if dr.serv == nil {
// Only used when fetchChan is nil, // Only used when fetchChan is nil,
// which only happens when passed in a nil dagservice // which only happens when passed in a nil dagservice
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论