Unverified 提交 3c95f65b 作者: Steven Allen 提交者: GitHub

Merge pull request #6785 from ipfs/feat/async-ds

Support Asynchronous Datastores
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
blockservice "github.com/ipfs/go-blockservice" blockservice "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil" cidutil "github.com/ipfs/go-cidutil"
filestore "github.com/ipfs/go-filestore"
bstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore"
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format"
...@@ -96,7 +97,29 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options ...@@ -96,7 +97,29 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
bserv := blockservice.New(addblockstore, exch) // hash security 001 bserv := blockservice.New(addblockstore, exch) // hash security 001
dserv := dag.NewDAGService(bserv) dserv := dag.NewDAGService(bserv)
fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, dserv) // add a sync call to the DagService
// this ensures that data written to the DagService is persisted to the underlying datastore
// TODO: propagate the Sync function from the datastore through the blockstore, blockservice and dagservice
var syncDserv *syncDagService
if settings.OnlyHash {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func() error { return nil },
}
} else {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func() error {
ds := api.repo.Datastore()
if err := ds.Sync(bstore.BlockPrefix); err != nil {
return err
}
return ds.Sync(filestore.FilestorePrefix)
},
}
}
fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, syncDserv)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -272,3 +295,13 @@ func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, set ...@@ -272,3 +295,13 @@ func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, set
func (api *UnixfsAPI) core() *CoreAPI { func (api *UnixfsAPI) core() *CoreAPI {
return (*CoreAPI)(api) return (*CoreAPI)(api)
} }
// syncDagService is used by the Adder to ensure blocks get persisted to the underlying datastore
type syncDagService struct {
ipld.DAGService
syncFn func() error
}
func (s *syncDagService) Sync() error {
return s.syncFn()
}
...@@ -38,6 +38,10 @@ type Link struct { ...@@ -38,6 +38,10 @@ type Link struct {
Size uint64 Size uint64
} }
type syncer interface {
Sync() error
}
// NewAdder Returns a new Adder used for a file add operation. // NewAdder Returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService) (*Adder, error) { func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService) (*Adder, error) {
bufferedDS := ipld.NewBufferedDAG(ctx, ds) bufferedDS := ipld.NewBufferedDAG(ctx, ds)
...@@ -316,6 +320,13 @@ func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) { ...@@ -316,6 +320,13 @@ func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
return nil, err return nil, err
} }
if asyncDagService, ok := adder.dagService.(syncer); ok {
err = asyncDagService.Sync()
if err != nil {
return nil, err
}
}
if !adder.Pin { if !adder.Pin {
return nd, nil return nd, nil
} }
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface" "github.com/ipfs/go-ipfs-exchange-interface"
"github.com/ipfs/go-ipfs-exchange-offline" "github.com/ipfs/go-ipfs-exchange-offline"
...@@ -41,18 +42,39 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf ...@@ -41,18 +42,39 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
// Pinning creates new pinner which tells GC which blocks should be kept // Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) { func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))) internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag) rootDS := repo.Datastore()
syncFn := func() error {
if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
return err
}
return rootDS.Sync(filestore.FilestorePrefix)
}
syncDs := &syncDagService{ds, syncFn}
syncInternalDag := &syncDagService{internalDag, syncFn}
pinning, err := pin.LoadPinner(rootDS, syncDs, syncInternalDag)
if err != nil { if err != nil {
// TODO: we should move towards only running 'NewPinner' explicitly on // TODO: we should move towards only running 'NewPinner' explicitly on
// node init instead of implicitly here as a result of the pinner keys // node init instead of implicitly here as a result of the pinner keys
// not being found in the datastore. // not being found in the datastore.
// this is kinda sketchy and could cause data loss // this is kinda sketchy and could cause data loss
pinning = pin.NewPinner(repo.Datastore(), ds, internalDag) pinning = pin.NewPinner(rootDS, syncDs, syncInternalDag)
} }
return pinning, nil return pinning, nil
} }
// syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore
type syncDagService struct {
format.DAGService
syncFn func() error
}
func (s *syncDagService) Sync() error {
return s.syncFn()
}
// Dag creates new DAGService // Dag creates new DAGService
func Dag(bs blockservice.BlockService) format.DAGService { func Dag(bs blockservice.BlockService) format.DAGService {
return merkledag.NewDAGService(bs) return merkledag.NewDAGService(bs)
...@@ -77,7 +99,18 @@ func OnlineExchange(provide bool) interface{} { ...@@ -77,7 +99,18 @@ func OnlineExchange(provide bool) interface{} {
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := datastore.NewKey("/local/filesroot") dsk := datastore.NewKey("/local/filesroot")
pf := func(ctx context.Context, c cid.Cid) error { pf := func(ctx context.Context, c cid.Cid) error {
return repo.Datastore().Put(dsk, c.Bytes()) rootDS := repo.Datastore()
if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
return err
}
if err := rootDS.Sync(filestore.FilestorePrefix); err != nil {
return err
}
if err := rootDS.Put(dsk, c.Bytes()); err != nil {
return err
}
return rootDS.Sync(dsk)
} }
var nd *merkledag.ProtoNode var nd *merkledag.ProtoNode
......
...@@ -35,7 +35,7 @@ require ( ...@@ -35,7 +35,7 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4 github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-pinner v0.0.2 github.com/ipfs/go-ipfs-pinner v0.0.3
github.com/ipfs/go-ipfs-posinfo v0.0.1 github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.3.0 github.com/ipfs/go-ipfs-provider v0.3.0
github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipfs-routing v0.1.0
......
...@@ -226,8 +226,8 @@ github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN ...@@ -226,8 +226,8 @@ github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN
github.com/ipfs/go-ipfs-files v0.0.4 h1:WzRCivcybUQch/Qh6v8LBRhKtRsjnwyiuOV09mK7mrE= github.com/ipfs/go-ipfs-files v0.0.4 h1:WzRCivcybUQch/Qh6v8LBRhKtRsjnwyiuOV09mK7mrE=
github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4= github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-flags v0.0.1/go.mod h1:RnXBb9WV53GSfTrSDVK61NLTFKvWc60n+K9EgCDh+rA= github.com/ipfs/go-ipfs-flags v0.0.1/go.mod h1:RnXBb9WV53GSfTrSDVK61NLTFKvWc60n+K9EgCDh+rA=
github.com/ipfs/go-ipfs-pinner v0.0.2 h1:KRXt2V0TzoTd3mO1aONSw8C9wnZtl7RLpPruN/XDnlQ= github.com/ipfs/go-ipfs-pinner v0.0.3 h1:ez/yNYYyH1W7DiCF/L29tmp6L7lBO8eqbJtPi2pHicA=
github.com/ipfs/go-ipfs-pinner v0.0.2/go.mod h1:KZGyGAR+yLthGEkG9tuA2zweB7O6auXaJNjX6IbEbOs= github.com/ipfs/go-ipfs-pinner v0.0.3/go.mod h1:s4kFZWLWGDudN8Jyd/GTpt222A12C2snA2+OTdy/7p8=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs= github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A= github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU= github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
......
...@@ -179,7 +179,11 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value pa ...@@ -179,7 +179,11 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value pa
} }
// Put the new record. // Put the new record.
if err := p.ds.Put(IpnsDsKey(id), data); err != nil { key := IpnsDsKey(id)
if err := p.ds.Put(key, data); err != nil {
return nil, err
}
if err := p.ds.Sync(key); err != nil {
return nil, err return nil, err
} }
return entry, nil return entry, nil
......
...@@ -3,6 +3,7 @@ package namesys ...@@ -3,6 +3,7 @@ package namesys
import ( import (
"context" "context"
"crypto/rand" "crypto/rand"
"github.com/ipfs/go-path"
"testing" "testing"
"time" "time"
...@@ -110,3 +111,45 @@ func TestRSAPublisher(t *testing.T) { ...@@ -110,3 +111,45 @@ func TestRSAPublisher(t *testing.T) {
func TestEd22519Publisher(t *testing.T) { func TestEd22519Publisher(t *testing.T) {
testNamekeyPublisher(t, ci.Ed25519, ds.ErrNotFound, false) testNamekeyPublisher(t, ci.Ed25519, ds.ErrNotFound, false)
} }
func TestAsyncDS(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rt := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t))
ds := &checkSyncDS{
Datastore: ds.NewMapDatastore(),
syncKeys: make(map[ds.Key]struct{}),
}
publisher := NewIpnsPublisher(rt, ds)
ipnsFakeID := testutil.RandIdentityOrFatal(t)
ipnsVal, err := path.ParsePath("/ipns/foo.bar")
if err != nil {
t.Fatal(err)
}
if err := publisher.Publish(ctx, ipnsFakeID.PrivateKey(), ipnsVal); err != nil {
t.Fatal(err)
}
ipnsKey := IpnsDsKey(ipnsFakeID.ID())
for k := range ds.syncKeys {
if k.IsAncestorOf(ipnsKey) || k.Equal(ipnsKey) {
return
}
}
t.Fatal("ipns key not synced")
}
type checkSyncDS struct {
ds.Datastore
syncKeys map[ds.Key]struct{}
}
func (d *checkSyncDS) Sync(prefix ds.Key) error {
d.syncKeys[prefix] = struct{}{}
return d.Datastore.Sync(prefix)
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论