提交 db228e10 作者: Jeromy

pin: use separate dagservice for storing pinsets

License: MIT
Signed-off-by: 's avatarJeromy <why@ipfs.io>
上级 7276fd84
......@@ -171,13 +171,15 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
n.Blocks = bserv.New(n.Blockstore, n.Exchange)
n.DAG = dag.NewDAGService(n.Blocks)
n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG)
internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore)))
n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicity on
// node init instead of implicitly here as a result of the pinner keys
// not being found in the datastore.
// this is kinda sketchy and could cause data loss
n.Pinning = pin.NewPinner(n.Repo.Datastore(), n.DAG)
n.Pinning = pin.NewPinner(n.Repo.Datastore(), n.DAG, internalDag)
}
n.Resolver = &path.Resolver{DAG: n.DAG}
......
......@@ -10,7 +10,6 @@ import (
"sync"
"testing"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
bserv "github.com/ipfs/go-ipfs/blockservice"
bstest "github.com/ipfs/go-ipfs/blockservice/test"
......@@ -19,31 +18,11 @@ import (
chunk "github.com/ipfs/go-ipfs/importer/chunk"
. "github.com/ipfs/go-ipfs/merkledag"
dstest "github.com/ipfs/go-ipfs/merkledag/test"
"github.com/ipfs/go-ipfs/pin"
uio "github.com/ipfs/go-ipfs/unixfs/io"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dssync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
type dagservAndPinner struct {
ds DAGService
mp pin.Pinner
}
func getDagservAndPinner(t *testing.T) dagservAndPinner {
db := dssync.MutexWrap(ds.NewMapDatastore())
bs := bstore.NewBlockstore(db)
blockserv := bserv.New(bs, offline.Exchange(bs))
dserv := NewDAGService(blockserv)
mpin := pin.NewPinner(db, dserv)
return dagservAndPinner{
ds: dserv,
mp: mpin,
}
}
func TestNode(t *testing.T) {
n1 := NodeWithData([]byte("beep"))
......@@ -254,7 +233,7 @@ func TestEmptyKey(t *testing.T) {
}
func TestCantGet(t *testing.T) {
dsp := getDagservAndPinner(t)
ds := dstest.Mock()
a := NodeWithData([]byte("A"))
k, err := a.Key()
......@@ -262,7 +241,7 @@ func TestCantGet(t *testing.T) {
t.Fatal(err)
}
_, err = dsp.ds.Get(context.Background(), k)
_, err = ds.Get(context.Background(), k)
if !strings.Contains(err.Error(), "not found") {
t.Fatal("expected err not found, got: ", err)
}
......
......@@ -110,15 +110,14 @@ type pinner struct {
// not delete them.
internalPin map[key.Key]struct{}
dserv mdag.DAGService
internal mdag.DAGService // dagservice used to store internal objects
dstore ds.Datastore
}
// NewPinner creates a new pinner using the given datastore as a backend
func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {
func NewPinner(dstore ds.Datastore, serv, internal mdag.DAGService) Pinner {
// Load set from given datastore...
rcset := set.NewSimpleBlockSet()
dirset := set.NewSimpleBlockSet()
return &pinner{
......@@ -126,6 +125,7 @@ func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {
directPin: dirset,
dserv: serv,
dstore: dstore,
internal: internal,
}
}
......@@ -344,7 +344,7 @@ func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) {
}
// LoadPinner loads a pinner and its keysets from the given datastore
func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
func LoadPinner(d ds.Datastore, dserv, internal mdag.DAGService) (Pinner, error) {
p := new(pinner)
rootKeyI, err := d.Get(pinDatastoreKey)
......@@ -361,7 +361,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
root, err := dserv.Get(ctx, rootKey)
root, err := internal.Get(ctx, rootKey)
if err != nil {
return nil, fmt.Errorf("cannot find pinning root object: %v", err)
}
......@@ -374,7 +374,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
}
{ // load recursive set
recurseKeys, err := loadSet(ctx, dserv, root, linkRecursive, recordInternal)
recurseKeys, err := loadSet(ctx, internal, root, linkRecursive, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load recursive pins: %v", err)
}
......@@ -382,7 +382,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
}
{ // load direct set
directKeys, err := loadSet(ctx, dserv, root, linkDirect, recordInternal)
directKeys, err := loadSet(ctx, internal, root, linkDirect, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load direct pins: %v", err)
}
......@@ -394,6 +394,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
// assign services
p.dserv = dserv
p.dstore = d
p.internal = internal
return p, nil
}
......@@ -422,7 +423,7 @@ func (p *pinner) Flush() error {
root := &mdag.Node{}
{
n, err := storeSet(ctx, p.dserv, p.directPin.GetKeys(), recordInternal)
n, err := storeSet(ctx, p.internal, p.directPin.GetKeys(), recordInternal)
if err != nil {
return err
}
......@@ -432,7 +433,7 @@ func (p *pinner) Flush() error {
}
{
n, err := storeSet(ctx, p.dserv, p.recursePin.GetKeys(), recordInternal)
n, err := storeSet(ctx, p.internal, p.recursePin.GetKeys(), recordInternal)
if err != nil {
return err
}
......@@ -442,12 +443,12 @@ func (p *pinner) Flush() error {
}
// add the empty node, its referenced by the pin sets but never created
_, err := p.dserv.Add(new(mdag.Node))
_, err := p.internal.Add(new(mdag.Node))
if err != nil {
return err
}
k, err := p.dserv.Add(root)
k, err := p.internal.Add(root)
if err != nil {
return err
}
......
......@@ -45,7 +45,7 @@ func TestPinnerBasic(t *testing.T) {
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv)
p := NewPinner(dstore, dserv, dserv)
a, ak := randNode()
_, err := dserv.Add(a)
......@@ -133,7 +133,7 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}
np, err := LoadPinner(dstore, dserv)
np, err := LoadPinner(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
......@@ -154,7 +154,7 @@ func TestDuplicateSemantics(t *testing.T) {
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv)
p := NewPinner(dstore, dserv, dserv)
a, _ := randNode()
_, err := dserv.Add(a)
......@@ -187,7 +187,7 @@ func TestFlush(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p := NewPinner(dstore, dserv)
p := NewPinner(dstore, dserv, dserv)
_, k := randNode()
p.PinWithMode(k, Recursive)
......@@ -204,7 +204,7 @@ func TestPinRecursiveFail(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p := NewPinner(dstore, dserv)
p := NewPinner(dstore, dserv, dserv)
a, _ := randNode()
b, _ := randNode()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论