提交 930196aa 作者: Łukasz Magiera

namecache: prefetch option

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 0bfcf7ea
......@@ -296,7 +296,7 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)
if follow {
n.Namecache = namecache.NewNameCache(ctx, n.Namesys, n.Pinning, n.DAG, n.Blockstore)
n.Namecache = namecache.NewNameCache(ctx, n.Namesys, n.DAG)
n.Namecache, err = namecache.NewPersistentCache(n.Namecache, n.Repo.Datastore())
if err != nil {
return err
......
......@@ -99,7 +99,7 @@ func ApplyChange(ctx context.Context, ds ipld.DAGService, nd *dag.ProtoNode, cs
// 1. two node's links number are greater than 0.
// 2. both of two nodes are ProtoNode.
// Otherwise, it compares the cid and emits a Mod change object.
func Diff(ctx context.Context, ds ipld.DAGService, a, b ipld.Node) ([]*Change, error) {
func Diff(ctx context.Context, ds ipld.NodeGetter, a, b ipld.Node) ([]*Change, error) {
// Base case where both nodes are leaves, just compare
// their CIDs.
if len(a.Links()) == 0 && len(b.Links()) == 0 {
......
......@@ -8,14 +8,14 @@ import (
"sync"
"time"
namesys "github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/core/coreapi/interface"
"github.com/ipfs/go-ipfs/dagutils"
"github.com/ipfs/go-ipfs/namesys"
uio "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/io"
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
ipld "gx/ipfs/QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s/go-ipld-format"
bstore "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore"
path "gx/ipfs/QmWqh9oob7ZHQRwU5CdTqpnC8ip8BEkFNrwXRxeNo5Y7vA/go-path"
resolver "gx/ipfs/QmWqh9oob7ZHQRwU5CdTqpnC8ip8BEkFNrwXRxeNo5Y7vA/go-path/resolver"
"gx/ipfs/QmWqh9oob7ZHQRwU5CdTqpnC8ip8BEkFNrwXRxeNo5Y7vA/go-path"
dag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag"
logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log"
)
......@@ -39,19 +39,17 @@ type NameCache interface {
type nameCache struct {
nsys namesys.NameSystem
dag ipld.NodeGetter
bstore bstore.GCBlockstore
ctx context.Context
follows map[string]func()
mx sync.Mutex
}
func NewNameCache(ctx context.Context, nsys namesys.NameSystem, dag ipld.NodeGetter, bstore bstore.GCBlockstore) NameCache {
func NewNameCache(ctx context.Context, nsys namesys.NameSystem, dag ipld.NodeGetter) NameCache {
return &nameCache{
ctx: ctx,
nsys: nsys,
dag: dag,
bstore: bstore,
follows: make(map[string]func()),
}
}
......@@ -67,7 +65,7 @@ func (nc *nameCache) Follow(name string, prefetch bool, followInterval time.Dura
}
if _, ok := nc.follows[name]; ok {
return fmt.Errorf("Already following %s", name)
return fmt.Errorf("already following %s", name)
}
ctx, cancel := context.WithCancel(nc.ctx)
......@@ -88,7 +86,7 @@ func (nc *nameCache) Unfollow(name string) error {
cancel, ok := nc.follows[name]
if !ok {
return fmt.Errorf("Unknown name %s", name)
return fmt.Errorf("unknown name %s", name)
}
cancel()
......@@ -110,8 +108,9 @@ func (nc *nameCache) ListFollows() []string {
}
func (nc *nameCache) followName(ctx context.Context, name string, prefetch bool, followInterval time.Duration) {
// if cid != nil, we have prefetched data under the node
c, err := nc.resolveAndFetch(ctx, name, prefetch)
emptynode := new(dag.ProtoNode)
c, err := nc.resolveAndUpdate(ctx, name, prefetch, emptynode.Cid())
if err != nil {
log.Errorf("Error following %s: %s", name, err.Error())
}
......@@ -122,11 +121,7 @@ func (nc *nameCache) followName(ctx context.Context, name string, prefetch bool,
for {
select {
case <-ticker.C:
if c != cid.Undef {
c, err = nc.resolveAndUpdate(ctx, name, c)
} else {
c, err = nc.resolveAndFetch(ctx, name, prefetch)
}
c, err = nc.resolveAndUpdate(ctx, name, prefetch, c)
if err != nil {
log.Errorf("Error following %s: %s", name, err.Error())
......@@ -138,47 +133,55 @@ func (nc *nameCache) followName(ctx context.Context, name string, prefetch bool,
}
}
func (nc *nameCache) resolveAndFetch(ctx context.Context, name string, prefetch bool) (cid.Cid, error) {
func (nc *nameCache) resolveAndUpdate(ctx context.Context, name string, prefetch bool, oldcid cid.Cid) (cid.Cid, error) {
ptr, err := nc.resolve(ctx, name)
if err != nil {
return cid.Undef, err
}
if !prefetch {
return cid.Undef, nil
}
c, err := pathToCid(ptr)
newcid, err := pathToCid(ptr)
if err != nil {
return cid.Undef, err
}
defer nc.bstore.PinLock().Unlock()
if newcid.Equals(oldcid) || !prefetch {
return newcid, nil
}
n, err := nc.pathToNode(ctx, ptr)
oldnd, err := nc.dag.Get(ctx, oldcid)
if err != nil {
return cid.Undef, err
}
return c, err
}
func (nc *nameCache) resolveAndUpdate(ctx context.Context, name string, oldcid cid.Cid) (cid.Cid, error) {
ptr, err := nc.resolve(ctx, name)
newnd, err := nc.dag.Get(ctx, newcid)
if err != nil {
return cid.Undef, err
}
newcid, err := pathToCid(ptr)
changes, err := dagutils.Diff(ctx, nc.dag, oldnd, newnd)
if err != nil {
return cid.Undef, err
}
if newcid.Equals(oldcid) {
return oldcid, nil
}
log.Debugf("fetching changes in %s (%s -> %s)", name, oldcid, newcid)
for _, change := range changes {
if change.Type == iface.DiffRemove {
continue
}
toFetch, err := nc.dag.Get(ctx, change.After)
if err != nil {
return cid.Undef, err
}
// TODO: handle prefetching
// just iterate over all nodes
walker := ipld.NewWalker(ctx, ipld.NewNavigableIPLDNode(toFetch, nc.dag))
if err := walker.Iterate(func(node ipld.NavigableNode) error {
return nil
}); err != ipld.EndOfDag {
return cid.Undef, fmt.Errorf("unexpected error when prefetching followed name: %s", err)
}
}
return newcid, err
}
......@@ -196,20 +199,9 @@ func (nc *nameCache) resolve(ctx context.Context, name string) (path.Path, error
log.Debugf("resolved %s to %s", name, p)
// TODO: handle prefetching
return p, nil
}
func pathToCid(p path.Path) (cid.Cid, error) {
return cid.Decode(p.Segments()[1])
}
func (nc *nameCache) pathToNode(ctx context.Context, p path.Path) (ipld.Node, error) {
r := &resolver.Resolver{
DAG: nc.dag,
ResolveOnce: uio.ResolveUnixfsOnce,
}
return r.ResolvePath(ctx, p)
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论