提交 b9e5cfaa 作者: Jeromy

merkledag FetchGraph and EnumerateChildren

This commit improves (fixes) the FetchGraph call for recursively
fetching every descendant node of a given merkledag node. This operation
should be the simplest way of ensuring that you have replicated a dag
locally.

This commit also implements a method in the merkledag package called
EnumerateChildren, this method is used to get a set of the keys of every
descendant node of the given node. All keys found are noted in the
passed in KeySet, which may in the future be implemented on disk to
avoid excessive memory consumption.

License: MIT
Signed-off-by: 's avatarJeromy <jeromyj@gmail.com>
上级 da0d48e6
...@@ -90,7 +90,7 @@ type IpfsNode struct { ...@@ -90,7 +90,7 @@ type IpfsNode struct {
// Services // Services
Peerstore peer.Peerstore // storage for other Peer instances Peerstore peer.Peerstore // storage for other Peer instances
Blockstore bstore.Blockstore // the block store (lower level) Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks. Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects. DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system Resolver *path.Resolver // the path resolution system
......
...@@ -3,7 +3,6 @@ package merkledag ...@@ -3,7 +3,6 @@ package merkledag
import ( import (
"fmt" "fmt"
"sync"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
...@@ -121,41 +120,86 @@ func (n *dagService) Remove(nd *Node) error { ...@@ -121,41 +120,86 @@ func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteBlock(k) return n.Blocks.DeleteBlock(k)
} }
// FetchGraph asynchronously fetches all nodes that are children of the given // FetchGraph fetches all nodes that are children of the given node
// node, and returns a channel that may be waited upon for the fetch to complete func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} { toprocess := make(chan []key.Key, 8)
log.Warning("Untested.") nodes := make(chan *Node, 8)
var wg sync.WaitGroup errs := make(chan error, 1)
done := make(chan struct{})
for _, l := range root.Links { ctx, cancel := context.WithCancel(ctx)
wg.Add(1) defer cancel()
go func(lnk *Link) { defer close(toprocess)
// Signal child is done on way out go fetchNodes(ctx, serv, toprocess, nodes, errs)
defer wg.Done()
select { nodes <- root
case <-ctx.Done(): live := 1
return
for {
select {
case nd, ok := <-nodes:
if !ok {
return nil
} }
nd, err := lnk.GetNode(ctx, serv) var keys []key.Key
if err != nil { for _, lnk := range nd.Links {
log.Debug(err) keys = append(keys, key.Key(lnk.Hash))
return
} }
keys = dedupeKeys(keys)
// Wait for children to finish // keep track of open request, when zero, we're done
<-FetchGraph(ctx, nd, serv) live += len(keys) - 1
}(l)
if live == 0 {
return nil
}
if len(keys) > 0 {
select {
case toprocess <- keys:
case <-ctx.Done():
return ctx.Err()
}
}
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
} }
}
go func() { func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
wg.Wait() defer close(out)
done <- struct{}{} for {
}() select {
case ks, ok := <-in:
if !ok {
return
}
return done ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}
select {
case out <- nd:
case <-ctx.Done():
return
}
}(g)
}
}
}
} }
// FindLinks searches this nodes links for the given key, // FindLinks searches this nodes links for the given key,
...@@ -318,3 +362,24 @@ func (t *Batch) Commit() error { ...@@ -318,3 +362,24 @@ func (t *Batch) Commit() error {
t.size = 0 t.size = 0
return err return err
} }
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
for _, lnk := range root.Links {
k := key.Key(lnk.Hash)
if !set.Has(k) {
set.Add(k)
child, err := ds.Get(ctx, k)
if err != nil {
return err
}
err = EnumerateChildren(ctx, ds, child, set)
if err != nil {
return err
}
}
}
return nil
}
...@@ -130,7 +130,7 @@ func SubtestNodeStat(t *testing.T, n *Node) { ...@@ -130,7 +130,7 @@ func SubtestNodeStat(t *testing.T, n *Node) {
} }
if expected != *actual { if expected != *actual {
t.Errorf("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual) t.Error("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual)
} else { } else {
fmt.Printf("n.Stat correct: %s\n", actual) fmt.Printf("n.Stat correct: %s\n", actual)
} }
...@@ -232,7 +232,6 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { ...@@ -232,7 +232,6 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
} }
} }
} }
func TestRecursiveAdd(t *testing.T) { func TestRecursiveAdd(t *testing.T) {
a := &Node{Data: []byte("A")} a := &Node{Data: []byte("A")}
b := &Node{Data: []byte("B")} b := &Node{Data: []byte("B")}
...@@ -298,3 +297,79 @@ func TestCantGet(t *testing.T) { ...@@ -298,3 +297,79 @@ func TestCantGet(t *testing.T) {
t.Fatal("expected err not found, got: ", err) t.Fatal("expected err not found, got: ", err)
} }
} }
func TestFetchGraph(t *testing.T) {
bsi := bstest.Mocks(t, 1)[0]
ds := NewDAGService(bsi)
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512}
root, err := imp.BuildDagFromReader(read, ds, spl, nil)
if err != nil {
t.Fatal(err)
}
err = FetchGraph(context.TODO(), root, ds)
if err != nil {
t.Fatal(err)
}
}
func TestFetchGraphOther(t *testing.T) {
var dservs []DAGService
for _, bsi := range bstest.Mocks(t, 2) {
dservs = append(dservs, NewDAGService(bsi))
}
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512}
root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil)
if err != nil {
t.Fatal(err)
}
err = FetchGraph(context.TODO(), root, dservs[1])
if err != nil {
t.Fatal(err)
}
}
func TestEnumerateChildren(t *testing.T) {
bsi := bstest.Mocks(t, 1)
ds := NewDAGService(bsi[0])
spl := &chunk.SizeSplitter{512}
read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024)
root, err := imp.BuildDagFromReader(read, ds, spl, nil)
if err != nil {
t.Fatal(err)
}
ks := key.NewKeySet()
err = EnumerateChildren(context.Background(), ds, root, ks)
if err != nil {
t.Fatal(err)
}
var traverse func(n *Node)
traverse = func(n *Node) {
// traverse dag and check
for _, lnk := range n.Links {
k := key.Key(lnk.Hash)
if !ks.Has(k) {
t.Fatal("missing key in set!")
}
child, err := ds.Get(context.Background(), k)
if err != nil {
t.Fatal(err)
}
traverse(child)
}
}
traverse(root)
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论