提交 721df367 作者: Kevin Atkinson

Don't use a separate LinkService for DAGService.GetLinks()

Instead make LinkService a part of DAGService.  The LinkService is now
simply an interface that DAGService implements.  Also provide a
GetOfflineLinkService() method that the GC uses to get an offline
instance.

License: MIT
Signed-off-by: 's avatarKevin Atkinson <k@kevina.org>
上级 3899194c
...@@ -98,7 +98,6 @@ type IpfsNode struct { ...@@ -98,7 +98,6 @@ type IpfsNode struct {
Blockstore bstore.GCBlockstore // the block store (lower level) Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks. Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects. DAG merkledag.DAGService // the merkle dag service, get/add objects.
LinkService merkledag.LinkService
Resolver *path.Resolver // the path resolution system Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter Reporter metrics.Reporter
Discovery discovery.Service Discovery discovery.Service
......
...@@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { ...@@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil { if err != nil {
return err return err
} }
...@@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo ...@@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
if err != nil { if err != nil {
return nil, err return nil, err
} }
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) { ...@@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) {
gcstarted := make(chan struct{}) gcstarted := make(chan struct{})
go func() { go func() {
defer close(gcstarted) defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil) gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
if err != nil { if err != nil {
log.Error("GC ERROR:", err) log.Error("GC ERROR:", err)
errs <- err errs <- err
......
...@@ -422,3 +422,7 @@ func (bs *Bitswap) GetWantlist() []key.Key { ...@@ -422,3 +422,7 @@ func (bs *Bitswap) GetWantlist() []key.Key {
} }
return out return out
} }
func (bs *Bitswap) IsOnline() bool {
return true
}
...@@ -22,5 +22,7 @@ type Interface interface { // type Exchanger interface ...@@ -22,5 +22,7 @@ type Interface interface { // type Exchanger interface
// available on the network? // available on the network?
HasBlock(blocks.Block) error HasBlock(blocks.Block) error
IsOnline() bool
io.Closer io.Closer
} }
...@@ -67,3 +67,7 @@ func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan b ...@@ -67,3 +67,7 @@ func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan b
}() }()
return out, nil return out, nil
} }
func (e *offlineExchange) IsOnline() bool {
return false
}
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"sync" "sync"
bserv "github.com/ipfs/go-ipfs/blockservice" bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"context" "context"
...@@ -23,21 +24,21 @@ type DAGService interface { ...@@ -23,21 +24,21 @@ type DAGService interface {
Get(context.Context, *cid.Cid) (*Node, error) Get(context.Context, *cid.Cid) (*Node, error)
Remove(*Node) error Remove(*Node) error
// Return all links for a node, may be more effect than
// calling Get
GetLinks(context.Context, *cid.Cid) ([]*Link, error)
// GetDAG returns, in order, all the single leve child // GetDAG returns, in order, all the single leve child
// nodes of the passed in node. // nodes of the passed in node.
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
Batch() *Batch Batch() *Batch
LinkService
} }
// A LinkService returns the links for a node if they are available
// locally without having to retrieve the block from the datastore.
type LinkService interface { type LinkService interface {
Get(*cid.Cid) ([]*Link, error) // Return all links for a node, may be more effect than
// calling Get in DAGService
GetLinks(context.Context, *cid.Cid) ([]*Link, error)
GetOfflineLinkService() LinkService
} }
func NewDAGService(bs *bserv.BlockService) *dagService { func NewDAGService(bs *bserv.BlockService) *dagService {
...@@ -51,7 +52,6 @@ func NewDAGService(bs *bserv.BlockService) *dagService { ...@@ -51,7 +52,6 @@ func NewDAGService(bs *bserv.BlockService) *dagService {
// able to free some of them when vm pressure is high // able to free some of them when vm pressure is high
type dagService struct { type dagService struct {
Blocks *bserv.BlockService Blocks *bserv.BlockService
LinkService LinkService
} }
// Add adds a node to the dagService, storing the block in the BlockService // Add adds a node to the dagService, storing the block in the BlockService
...@@ -105,12 +105,6 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) { ...@@ -105,12 +105,6 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
} }
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) { func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
if n.LinkService != nil {
links, err := n.LinkService.Get(c)
if err == nil {
return links, nil
}
}
node, err := n.Get(ctx, c) node, err := n.Get(ctx, c)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -118,6 +112,15 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) ...@@ -118,6 +112,15 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error)
return node.Links, nil return node.Links, nil
} }
func (n *dagService) GetOfflineLinkService() LinkService {
if n.Blocks.Exchange.IsOnline() {
bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore))
return NewDAGService(bsrv)
} else {
return n
}
}
func (n *dagService) Remove(nd *Node) error { func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteObject(nd) return n.Blocks.DeleteObject(nd)
} }
...@@ -391,7 +394,7 @@ func legacyCidFromLink(lnk *Link) *cid.Cid { ...@@ -391,7 +394,7 @@ func legacyCidFromLink(lnk *Link) *cid.Cid {
// EnumerateChildren will walk the dag below the given root node and add all // EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set. // unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits? // TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { func EnumerateChildren(ctx context.Context, ds LinkService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error {
for _, lnk := range links { for _, lnk := range links {
c := legacyCidFromLink(lnk) c := legacyCidFromLink(lnk)
if visit(c) { if visit(c) {
......
...@@ -2,8 +2,6 @@ package gc ...@@ -2,8 +2,6 @@ package gc
import ( import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag" dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin" pin "github.com/ipfs/go-ipfs/pin"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
...@@ -27,11 +25,9 @@ var log = logging.Logger("gc") ...@@ -27,11 +25,9 @@ var log = logging.Logger("gc")
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
unlocker := bs.GCLock() unlocker := bs.GCLock()
bsrv := bserv.New(bs, offline.Exchange(bs)) ls = ls.GetOfflineLinkService()
ds := dag.NewDAGService(bsrv)
ds.LinkService = ls
gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots) gcs, err := ColoredSet(ctx, pn, ls, bestEffortRoots)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -72,16 +68,16 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. ...@@ -72,16 +68,16 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
return output, nil return output, nil
} }
func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
for _, c := range roots { for _, c := range roots {
set.Add(key.Key(c.Hash())) set.Add(key.Key(c.Hash()))
links, err := ds.GetLinks(ctx, c) links, err := ls.GetLinks(ctx, c)
if err != nil { if err != nil {
return err return err
} }
// EnumerateChildren recursively walks the dag and adds the keys to the given set // EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, links, func(c *cid.Cid) bool { err = dag.EnumerateChildren(ctx, ls, links, func(c *cid.Cid) bool {
k := key.Key(c.Hash()) k := key.Key(c.Hash())
seen := set.Has(k) seen := set.Has(k)
if seen { if seen {
...@@ -98,16 +94,16 @@ func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots [ ...@@ -98,16 +94,16 @@ func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots [
return nil return nil
} }
func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffortRoots []*cid.Cid) (key.KeySet, error) { func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid) (key.KeySet, error) {
// KeySet currently implemented in memory, in the future, may be bloom filter or // KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory. // disk backed to conserve memory.
gcs := key.NewKeySet() gcs := key.NewKeySet()
err := Descendants(ctx, ds, gcs, pn.RecursiveKeys(), false) err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = Descendants(ctx, ds, gcs, bestEffortRoots, true) err = Descendants(ctx, ls, gcs, bestEffortRoots, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -116,7 +112,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffor ...@@ -116,7 +112,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffor
gcs.Add(key.Key(k.Hash())) gcs.Add(key.Key(k.Hash()))
} }
err = Descendants(ctx, ds, gcs, pn.InternalPins(), false) err = Descendants(ctx, ls, gcs, pn.InternalPins(), false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -521,7 +521,7 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { ...@@ -521,7 +521,7 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) {
} }
} }
func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) { func hasChild(ds mdag.LinkService, links []*mdag.Link, child key.Key) (bool, error) {
for _, lnk := range links { for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash) c := cid.NewCidV0(lnk.Hash)
if key.Key(c.Hash()) == child { if key.Key(c.Hash()) == child {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论