提交 479761ec 作者: Jeromy

change batch fetching methods of dagserv

License: MIT
Signed-off-by: 's avatarJeromy <jeromyj@gmail.com>
上级 2600a029
...@@ -233,7 +233,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) { ...@@ -233,7 +233,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
} }
var count int var count int
for i, ng := range rw.DAG.GetDAG(rw.Ctx, n) { for i, ng := range dag.GetDAG(rw.Ctx, rw.DAG, n) {
lk := key.Key(n.Links[i].Hash) lk := key.Key(n.Links[i].Hash)
if rw.skip(lk) { if rw.skip(lk) {
continue continue
......
...@@ -24,8 +24,7 @@ type DAGService interface { ...@@ -24,8 +24,7 @@ type DAGService interface {
// GetDAG returns, in order, all the single leve child // GetDAG returns, in order, all the single leve child
// nodes of the passed in node. // nodes of the passed in node.
GetDAG(context.Context, *Node) []NodeGetter GetMany(context.Context, []key.Key) (<-chan *Node, <-chan error)
GetNodes(context.Context, []key.Key) []NodeGetter
Batch() *Batch Batch() *Batch
} }
...@@ -146,21 +145,52 @@ func FindLinks(links []key.Key, k key.Key, start int) []int { ...@@ -146,21 +145,52 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
return out return out
} }
func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) {
out := make(chan *Node)
errs := make(chan error, 1)
blocks := ds.Blocks.GetBlocks(ctx, keys)
go func() {
defer close(out)
defer close(errs)
for {
select {
case b, ok := <-blocks:
if !ok {
return
}
nd, err := Decoded(b.Data)
if err != nil {
errs <- err
return
}
select {
case out <- nd:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out, errs
}
// GetDAG will fill out all of the links of the given Node. // GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive // It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order. // all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter { func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
var keys []key.Key var keys []key.Key
for _, lnk := range root.Links { for _, lnk := range root.Links {
keys = append(keys, key.Key(lnk.Hash)) keys = append(keys, key.Key(lnk.Hash))
} }
return ds.GetNodes(ctx, keys) return GetNodes(ctx, ds, keys)
} }
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding // GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys // to the key with the same index as the passed in keys
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter { func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
// Early out if no work to do // Early out if no work to do
if len(keys) == 0 { if len(keys) == 0 {
...@@ -178,26 +208,29 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter ...@@ -178,26 +208,29 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys) nodechan, errchan := ds.GetMany(ctx, dedupedKeys)
for count := 0; count < len(keys); { for count := 0; count < len(keys); {
select { select {
case blk, ok := <-blkchan: case nd, ok := <-nodechan:
if !ok { if !ok {
return return
} }
nd, err := Decoded(blk.Data) k, err := nd.Key()
if err != nil { if err != nil {
// NB: can happen with improperly formatted input data log.Error("Failed to get node key: ", err)
log.Debug("Got back bad block!") continue
return
} }
is := FindLinks(keys, blk.Key(), 0)
is := FindLinks(keys, k, 0)
for _, i := range is { for _, i := range is {
count++ count++
sendChans[i] <- nd sendChans[i] <- nd
} }
case err := <-errchan:
log.Error("error fetching: ", err)
return
case <-ctx.Done(): case <-ctx.Done():
return return
} }
...@@ -389,9 +422,10 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out cha ...@@ -389,9 +422,10 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out cha
} }
for ks := range in { for ks := range in {
ng := ds.GetNodes(ctx, ks) ng := GetNodes(ctx, ds, ks)
for _, g := range ng { for _, g := range ng {
go get(g) go get(g)
} }
} }
} }
...@@ -39,7 +39,7 @@ func (w *Writer) writeDir(nd *mdag.Node, fpath string) error { ...@@ -39,7 +39,7 @@ func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
return err return err
} }
for i, ng := range w.Dag.GetDAG(w.ctx, nd) { for i, ng := range mdag.GetDAG(w.ctx, w.Dag, nd) {
child, err := ng.Get(w.ctx) child, err := ng.Get(w.ctx)
if err != nil { if err != nil {
return err return err
......
...@@ -90,7 +90,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag ...@@ -90,7 +90,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader { func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
fctx, cancel := context.WithCancel(ctx) fctx, cancel := context.WithCancel(ctx)
promises := serv.GetDAG(fctx, n) promises := mdag.GetDAG(fctx, serv, n)
return &DagReader{ return &DagReader{
node: n, node: n,
serv: serv, serv: serv,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论