提交 3899194c 作者: Kevin Atkinson

Add DAGService.GetLinks() method and use it in the GC and elsewhere.

This method will use the (also new) LinkService if it is available to
retrieving just the links for a MerkleDAG without necessary having to
retrieve the underlying block.

For now the main benefit is that the pinner will not break when a block
becomes invalid due to a change in the backing file.  This is possible
because the metadata for a block (that includes the Links) is stored
separately and thus always available even if the backing file changes.

License: MIT
Signed-off-by: 's avatarKevin Atkinson <k@kevina.org>
上级 67a1b3e1
......@@ -328,12 +328,11 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
nd, err := n.DAG.Get(ctx, k)
links, err := n.DAG.GetLinks(ctx, k)
if err != nil {
return nil, err
}
err = dag.EnumerateChildren(n.Context(), n.DAG, nd, set.Visit, false)
err = dag.EnumerateChildren(n.Context(), n.DAG, links, set.Visit, false)
if err != nil {
return nil, err
}
......
......@@ -94,14 +94,15 @@ type IpfsNode struct {
PrivateKey ic.PrivKey // the local node's private Key
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
LinkService merkledag.LinkService
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root
// Online
PeerHost p2phost.Host // the network host (server+client)
......
......@@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots)
if err != nil {
return err
}
......@@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
if err != nil {
return nil, err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots)
if err != nil {
return nil, err
}
......
......@@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) {
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
......@@ -162,7 +162,7 @@ func TestAddGCLive(t *testing.T) {
}
set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG, root, set.Visit, false)
err = dag.EnumerateChildren(ctx, node.DAG, root.Links, set.Visit, false)
if err != nil {
t.Fatal(err)
}
......
......@@ -23,6 +23,10 @@ type DAGService interface {
Get(context.Context, *cid.Cid) (*Node, error)
Remove(*Node) error
// Return all links for a node, may be more effect than
// calling Get
GetLinks(context.Context, *cid.Cid) ([]*Link, error)
// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
......@@ -30,8 +34,14 @@ type DAGService interface {
Batch() *Batch
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
// A LinkService returns the links for a node if they are available
// locally without having to retrieve the block from the datastore.
type LinkService interface {
Get(*cid.Cid) ([]*Link, error)
}
func NewDAGService(bs *bserv.BlockService) *dagService {
return &dagService{Blocks: bs}
}
// dagService is an IPFS Merkle DAG service.
......@@ -40,7 +50,8 @@ func NewDAGService(bs *bserv.BlockService) DAGService {
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type dagService struct {
Blocks *bserv.BlockService
Blocks *bserv.BlockService
LinkService LinkService
}
// Add adds a node to the dagService, storing the block in the BlockService
......@@ -93,6 +104,20 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
return res, nil
}
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
if n.LinkService != nil {
links, err := n.LinkService.Get(c)
if err == nil {
return links, nil
}
}
node, err := n.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links, nil
}
func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteObject(nd)
}
......@@ -366,11 +391,11 @@ func legacyCidFromLink(lnk *Link) *cid.Cid {
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool, bestEffort bool) error {
for _, lnk := range root.Links {
func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error {
for _, lnk := range links {
c := legacyCidFromLink(lnk)
if visit(c) {
child, err := ds.Get(ctx, c)
children, err := ds.GetLinks(ctx, c)
if err != nil {
if bestEffort && err == ErrNotFound {
continue
......@@ -378,7 +403,7 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit fun
return err
}
}
err = EnumerateChildren(ctx, ds, child, visit, bestEffort)
err = EnumerateChildren(ctx, ds, children, visit, bestEffort)
if err != nil {
return err
}
......
......@@ -241,7 +241,7 @@ func TestFetchGraph(t *testing.T) {
offline_ds := NewDAGService(bs)
err = EnumerateChildren(context.Background(), offline_ds, root, func(_ *cid.Cid) bool { return true }, false)
err = EnumerateChildren(context.Background(), offline_ds, root.Links, func(_ *cid.Cid) bool { return true }, false)
if err != nil {
t.Fatal(err)
}
......@@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) {
}
set := cid.NewSet()
err = EnumerateChildren(context.Background(), ds, root, set.Visit, false)
err = EnumerateChildren(context.Background(), ds, root.Links, set.Visit, false)
if err != nil {
t.Fatal(err)
}
......
......@@ -24,11 +24,12 @@ var log = logging.Logger("gc")
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
unlocker := bs.GCLock()
bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
ds.LinkService = ls
gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots)
if err != nil {
......@@ -74,13 +75,13 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo
func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
for _, c := range roots {
set.Add(key.Key(c.Hash()))
nd, err := ds.Get(ctx, c)
links, err := ds.GetLinks(ctx, c)
if err != nil {
return err
}
// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, func(c *cid.Cid) bool {
err = dag.EnumerateChildren(ctx, ds, links, func(c *cid.Cid) bool {
k := key.Key(c.Hash())
seen := set.Has(k)
if seen {
......
......@@ -279,12 +279,12 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode PinMode) (string, bool, error
// Default is Indirect
for _, rc := range p.recursePin.Keys() {
rnd, err := p.dserv.Get(context.Background(), rc)
links, err := p.dserv.GetLinks(context.Background(), rc)
if err != nil {
return "", false, err
}
has, err := hasChild(p.dserv, rnd, k)
has, err := hasChild(p.dserv, links, k)
if err != nil {
return "", false, err
}
......@@ -317,11 +317,11 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) {
// Now walk all recursive pins to check for indirect pins
var checkChildren func(*cid.Cid, *cid.Cid) error
checkChildren = func(rk, parentKey *cid.Cid) error {
parent, err := p.dserv.Get(context.Background(), parentKey)
links, err := p.dserv.GetLinks(context.Background(), parentKey)
if err != nil {
return err
}
for _, lnk := range parent.Links {
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)
if toCheck.Has(c) {
......@@ -521,19 +521,19 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) {
}
}
func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) {
for _, lnk := range root.Links {
func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) {
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)
if key.Key(c.Hash()) == child {
return true, nil
}
nd, err := ds.Get(context.Background(), c)
children, err := ds.GetLinks(context.Background(), c)
if err != nil {
return false, err
}
has, err := hasChild(ds, nd, child)
has, err := hasChild(ds, children, child)
if err != nil {
return false, err
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论