提交 b76581d6 作者: Tommi Virtanen 提交者: Jeromy

fsrepo: Refactor to extract datastore internals

License: MIT
Signed-off-by: 's avatarTommi Virtanen <tv@eagain.net>
上级 dc737440
......@@ -51,7 +51,7 @@ type GCBlockstore interface {
PinLock() func()
}
func NewBlockstore(d ds.ThreadSafeDatastore) *blockstore {
func NewBlockstore(d ds.Datastore) *blockstore {
dd := dsns.Wrap(d, BlockPrefix)
return &blockstore{
datastore: dd,
......
......@@ -63,7 +63,7 @@ func (cfg *BuildCfg) fillDefaults() error {
return nil
}
func defaultRepo(dstore ds.ThreadSafeDatastore) (repo.Repo, error) {
func defaultRepo(dstore repo.Datastore) (repo.Repo, error) {
c := cfg.Config{}
priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, rand.Reader)
if err != nil {
......
......@@ -570,14 +570,14 @@ func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config)
return nil
}
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Datastore) (routing.IpfsRouting, error) {
dhtRouting := dht.NewDHT(ctx, host, dstore)
dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator
dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc
return dhtRouting, nil
}
type RoutingOption func(context.Context, p2phost.Host, ds.ThreadSafeDatastore) (routing.IpfsRouting, error)
type RoutingOption func(context.Context, p2phost.Host, ds.Datastore) (routing.IpfsRouting, error)
type DiscoveryOption func(p2phost.Host) (discovery.Service, error)
......
......@@ -28,7 +28,7 @@ var (
// routing records to the provided datastore. Only routing records are store in
// the datastore.
func SupernodeServer(recordSource ds.ThreadSafeDatastore) core.RoutingOption {
return func(ctx context.Context, ph host.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
return func(ctx context.Context, ph host.Host, dstore ds.Datastore) (routing.IpfsRouting, error) {
server, err := supernode.NewServer(recordSource, ph.Peerstore(), ph.ID())
if err != nil {
return nil, err
......@@ -44,7 +44,7 @@ func SupernodeServer(recordSource ds.ThreadSafeDatastore) core.RoutingOption {
// TODO doc
func SupernodeClient(remotes ...peer.PeerInfo) core.RoutingOption {
return func(ctx context.Context, ph host.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
return func(ctx context.Context, ph host.Host, dstore ds.Datastore) (routing.IpfsRouting, error) {
if len(remotes) < 1 {
return nil, errServersMissing
}
......
......@@ -64,11 +64,11 @@ type pinner struct {
// not delete them.
internalPin map[key.Key]struct{}
dserv mdag.DAGService
dstore ds.ThreadSafeDatastore
dstore ds.Datastore
}
// NewPinner creates a new pinner using the given datastore as a backend
func NewPinner(dstore ds.ThreadSafeDatastore, serv mdag.DAGService) Pinner {
func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {
// Load set from given datastore...
rcset := set.NewSimpleBlockSet()
......@@ -207,7 +207,7 @@ func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) {
}
// LoadPinner loads a pinner and its keysets from the given datastore
func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error) {
func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
p := new(pinner)
rootKeyI, err := d.Get(pinDatastoreKey)
......
package fsrepo
import (
"fmt"
"path"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs"
levelds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount"
ldbopts "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
repo "github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/dir"
)
const (
leveldbDirectory = "datastore"
flatfsDirectory = "blocks"
)
type defaultDatastore struct {
repo.Datastore
// tracked separately for use in Close; do not use directly.
leveldbDS repo.Datastore
metricsBlocks repo.Datastore
metricsLevelDB repo.Datastore
}
func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) {
d := &defaultDatastore{}
leveldbPath := path.Join(r.path, leveldbDirectory)
var err error
// save leveldb reference so it can be neatly closed afterward
d.leveldbDS, err = levelds.NewDatastore(leveldbPath, &levelds.Options{
Compression: ldbopts.NoCompression,
})
if err != nil {
return nil, fmt.Errorf("unable to open leveldb datastore: %v", err)
}
// 4TB of 256kB objects ~=17M objects, splitting that 256-way
// leads to ~66k objects per dir, splitting 256*256-way leads to
// only 256.
//
// The keys seen by the block store have predictable prefixes,
// including "/" from datastore.Key and 2 bytes from multihash. To
// reach a uniform 256-way split, we need approximately 4 bytes of
// prefix.
blocksDS, err := flatfs.New(path.Join(r.path, flatfsDirectory), 4)
if err != nil {
return nil, fmt.Errorf("unable to open flatfs datastore: %v", err)
}
// Add our PeerID to metrics paths to keep them unique
//
// As some tests just pass a zero-value Config to fsrepo.Init,
// cope with missing PeerID.
id := r.config.Identity.PeerID
if id == "" {
// the tests pass in a zero Config; cope with it
id = fmt.Sprintf("uninitialized_%p", r)
}
prefix := "fsrepo." + id + ".datastore."
d.metricsBlocks = measure.New(prefix+"blocks", blocksDS)
d.metricsLevelDB = measure.New(prefix+"leveldb", d.leveldbDS)
mountDS := mount.New([]mount.Mount{
{
Prefix: ds.NewKey("/blocks"),
Datastore: d.metricsBlocks,
},
{
Prefix: ds.NewKey("/"),
Datastore: d.metricsLevelDB,
},
})
// Make sure it's ok to claim the virtual datastore from mount as
// threadsafe. There's no clean way to make mount itself provide
// this information without copy-pasting the code into two
// variants. This is the same dilemma as the `[].byte` attempt at
// introducing const types to Go.
d.Datastore = mountDS
return d, nil
}
func initDefaultDatastore(repoPath string, conf *config.Config) error {
// The actual datastore contents are initialized lazily when Opened.
// During Init, we merely check that the directory is writeable.
leveldbPath := path.Join(repoPath, leveldbDirectory)
if err := dir.Writable(leveldbPath); err != nil {
return fmt.Errorf("datastore: %s", err)
}
flatfsPath := path.Join(repoPath, flatfsDirectory)
if err := dir.Writable(flatfsPath); err != nil {
return fmt.Errorf("datastore: %s", err)
}
return nil
}
var _ repo.Datastore = (*defaultDatastore)(nil)
......@@ -10,12 +10,6 @@ import (
"strings"
"sync"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs"
levelds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount"
ldbopts "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
repo "github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/common"
config "github.com/ipfs/go-ipfs/repo/config"
......@@ -24,7 +18,6 @@ import (
serialize "github.com/ipfs/go-ipfs/repo/fsrepo/serialize"
dir "github.com/ipfs/go-ipfs/thirdparty/dir"
util "github.com/ipfs/go-ipfs/util"
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)
......@@ -56,11 +49,7 @@ func (err NoRepoError) Error() string {
return fmt.Sprintf("no ipfs repo found in %s.\nplease run: ipfs init", err.Path)
}
const (
leveldbDirectory = "datastore"
flatfsDirectory = "blocks"
apiFile = "api"
)
const apiFile = "api"
var (
......@@ -94,7 +83,7 @@ type FSRepo struct {
// the same fsrepo path concurrently
lockfile io.Closer
config *config.Config
ds ds.ThreadSafeDatastore
ds repo.Datastore
}
var _ repo.Repo = (*FSRepo)(nil)
......@@ -247,16 +236,8 @@ func Init(repoPath string, conf *config.Config) error {
return err
}
// The actual datastore contents are initialized lazily when Opened.
// During Init, we merely check that the directory is writeable.
leveldbPath := filepath.Join(repoPath, leveldbDirectory)
if err := dir.Writable(leveldbPath); err != nil {
return fmt.Errorf("datastore: %s", err)
}
flatfsPath := filepath.Join(repoPath, flatfsDirectory)
if err := dir.Writable(flatfsPath); err != nil {
return fmt.Errorf("datastore: %s", err)
if err := initDefaultDatastore(repoPath, conf); err != nil {
return err
}
if err := dir.Writable(filepath.Join(repoPath, "logs")); err != nil {
......@@ -343,59 +324,11 @@ func (r *FSRepo) openConfig() error {
// openDatastore returns an error if the config file is not present.
func (r *FSRepo) openDatastore() error {
leveldbPath := filepath.Join(r.path, leveldbDirectory)
var err error
// save leveldb reference so it can be neatly closed afterward
leveldbDS, err := levelds.NewDatastore(leveldbPath, &levelds.Options{
Compression: ldbopts.NoCompression,
})
d, err := openDefaultDatastore(r)
if err != nil {
return errors.New("unable to open leveldb datastore")
}
// 4TB of 256kB objects ~=17M objects, splitting that 256-way
// leads to ~66k objects per dir, splitting 256*256-way leads to
// only 256.
//
// The keys seen by the block store have predictable prefixes,
// including "/" from datastore.Key and 2 bytes from multihash. To
// reach a uniform 256-way split, we need approximately 4 bytes of
// prefix.
blocksDS, err := flatfs.New(filepath.Join(r.path, flatfsDirectory), 4)
if err != nil {
return errors.New("unable to open flatfs datastore")
return err
}
// Add our PeerID to metrics paths to keep them unique
//
// As some tests just pass a zero-value Config to fsrepo.Init,
// cope with missing PeerID.
id := r.config.Identity.PeerID
if id == "" {
// the tests pass in a zero Config; cope with it
id = fmt.Sprintf("uninitialized_%p", r)
}
prefix := "fsrepo." + id + ".datastore."
metricsBlocks := measure.New(prefix+"blocks", blocksDS)
metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS)
mountDS := mount.New([]mount.Mount{
{
Prefix: ds.NewKey("/blocks"),
Datastore: metricsBlocks,
},
{
Prefix: ds.NewKey("/"),
Datastore: metricsLevelDB,
},
})
// Make sure it's ok to claim the virtual datastore from mount as
// threadsafe. There's no clean way to make mount itself provide
// this information without copy-pasting the code into two
// variants. This is the same dilemma as the `[].byte` attempt at
// introducing const types to Go.
var _ ds.ThreadSafeDatastore = blocksDS
var _ ds.ThreadSafeDatastore = leveldbDS
r.ds = ds2.ClaimThreadSafe{mountDS}
r.ds = d
return nil
}
......@@ -408,15 +341,15 @@ func (r *FSRepo) Close() error {
return errors.New("repo is closed")
}
if err := r.ds.(io.Closer).Close(); err != nil {
return err
}
err := os.Remove(filepath.Join(r.path, apiFile))
if err != nil {
log.Warning("error removing api file: ", err)
}
if err := r.ds.Close(); err != nil {
return err
}
// This code existed in the previous versions, but
// EventlogComponent.Close was never called. Preserving here
// pending further discussion.
......@@ -579,7 +512,7 @@ func (r *FSRepo) SetConfigKey(key string, value interface{}) error {
// Datastore returns a repo-owned datastore. If FSRepo is Closed, return value
// is undefined.
func (r *FSRepo) Datastore() ds.ThreadSafeDatastore {
func (r *FSRepo) Datastore() repo.Datastore {
packageLock.Lock()
d := r.ds
packageLock.Unlock()
......
......@@ -3,7 +3,6 @@ package repo
import (
"errors"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/repo/config"
)
......@@ -12,7 +11,7 @@ var errTODO = errors.New("TODO")
// Mock is not thread-safe
type Mock struct {
C config.Config
D ds.ThreadSafeDatastore
D Datastore
}
func (m *Mock) Config() (*config.Config, error) {
......@@ -32,7 +31,7 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) {
return nil, errTODO
}
func (m *Mock) Datastore() ds.ThreadSafeDatastore { return m.D }
func (m *Mock) Datastore() Datastore { return m.D }
func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil }
......
......@@ -4,7 +4,7 @@ import (
"errors"
"io"
datastore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
config "github.com/ipfs/go-ipfs/repo/config"
)
......@@ -20,7 +20,7 @@ type Repo interface {
SetConfigKey(key string, value interface{}) error
GetConfigKey(key string) (interface{}, error)
Datastore() datastore.ThreadSafeDatastore
Datastore() Datastore
GetStorageUsage() (uint64, error)
// SetAPIAddr sets the API address in the repo.
......@@ -28,3 +28,10 @@ type Repo interface {
io.Closer
}
// Datastore is the interface required from a datastore to be
// acceptable to FSRepo.
type Datastore interface {
ds.Datastore // should be threadsafe, just be careful
io.Closer
}
......@@ -44,7 +44,7 @@ type IpfsDHT struct {
self peer.ID // Local peer (yourself)
peerstore peer.Peerstore // Peer Registry
datastore ds.ThreadSafeDatastore // Local data
datastore ds.Datastore // Local data
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
providers *ProviderManager
......@@ -60,7 +60,7 @@ type IpfsDHT struct {
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *IpfsDHT {
func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.datastore = dstore
dht.self = h.ID()
......
......@@ -47,7 +47,7 @@ func (c *nilclient) Bootstrap(_ context.Context) error {
return nil
}
func ConstructNilRouting(_ context.Context, _ p2phost.Host, _ ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
func ConstructNilRouting(_ context.Context, _ p2phost.Host, _ ds.Datastore) (routing.IpfsRouting, error) {
return &nilclient{}, nil
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论