提交 c4c66539 作者: Jeromy

add global config switch for sharding

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
上级 e8764345
......@@ -18,6 +18,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
uio "github.com/ipfs/go-ipfs/unixfs/io"
ci "gx/ipfs/QmPGxZ1DP2w45WcogpW1h43BvseXbfke9N91qotpoQcUeS/go-libp2p-crypto"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
......@@ -175,6 +176,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}
// TEMP: setting global sharding switch here
uio.UseHAMTSharding = conf.Experimental.ShardingEnabled
opts.HasBloomFilterSize = conf.Datastore.BloomFilterSize
if !cfg.Permament {
opts.HasBloomFilterSize = 0
......
......@@ -105,11 +105,24 @@ The JSON output contains type information.
output := make([]LsObject, len(req.Arguments()))
for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(nd.DAG, dagnode)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
links, err := dir.Links()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
output[i] = LsObject{
Hash: paths[i],
Links: make([]LsLink, len(dagnode.Links())),
Links: make([]LsLink, len(links)),
}
for j, link := range dagnode.Links() {
for j, link := range links {
t := unixfspb.Data_DataType(-1)
linkNode, err := link.GetNode(req.Context(), dserv)
......
......@@ -190,15 +190,18 @@ func (adder *Adder) PinRoot() error {
func (adder *Adder) Finalize() (node.Node, error) {
root := adder.mr.GetValue()
// cant just call adder.RootNode() here as we need the name for printing
rootNode, err := root.GetNode()
err := root.Flush()
if err != nil {
return nil, err
}
var name string
if !adder.Wrap {
name = rootNode.Links()[0].Name
children, err := root.(*mfs.Directory).ListNames()
if err != nil {
return nil, err
}
name = children[0]
dir, ok := adder.mr.GetValue().(*mfs.Directory)
if !ok {
......
......@@ -59,7 +59,7 @@ func NewDirectory(ctx context.Context, name string, node node.Node, parent child
// closeChild updates the child by the given name to the dag node 'nd'
// and changes its own dag node
func (d *Directory) closeChild(name string, nd *dag.ProtoNode, sync bool) error {
func (d *Directory) closeChild(name string, nd node.Node, sync bool) error {
mynd, err := d.closeChildUpdate(name, nd, sync)
if err != nil {
return err
......@@ -72,7 +72,7 @@ func (d *Directory) closeChild(name string, nd *dag.ProtoNode, sync bool) error
}
// closeChildUpdate is the portion of closeChild that needs to be locked around
func (d *Directory) closeChildUpdate(name string, nd *dag.ProtoNode, sync bool) (*dag.ProtoNode, error) {
func (d *Directory) closeChildUpdate(name string, nd node.Node, sync bool) (*dag.ProtoNode, error) {
d.lock.Lock()
defer d.lock.Unlock()
......@@ -329,13 +329,10 @@ func (d *Directory) Unlink(name string) error {
}
func (d *Directory) Flush() error {
d.lock.Lock()
nd, err := d.flushCurrentNode()
nd, err := d.GetNode()
if err != nil {
d.lock.Unlock()
return err
}
d.lock.Unlock()
return d.parent.closeChild(d.name, nd, true)
}
......
......@@ -752,7 +752,7 @@ func TestMfsHugeDir(t *testing.T) {
defer cancel()
_, rt := setupRoot(ctx, t)
for i := 0; i < 100000; i++ {
for i := 0; i < 10000; i++ {
err := Mkdir(rt, fmt.Sprintf("/dir%d", i), false, false)
if err != nil {
t.Fatal(err)
......
......@@ -12,6 +12,7 @@ package mfs
import (
"context"
"errors"
"fmt"
"sync"
"time"
......@@ -30,7 +31,7 @@ var log = logging.Logger("mfs")
var ErrIsDirectory = errors.New("error: is a directory")
type childCloser interface {
closeChild(string, *dag.ProtoNode, bool) error
closeChild(string, node.Node, bool) error
}
type NodeType int
......@@ -87,7 +88,7 @@ func NewRoot(parent context.Context, ds dag.DAGService, node *dag.ProtoNode, pf
}
switch pbn.GetType() {
case ft.TDirectory:
case ft.TDirectory, ft.THAMTShard:
rval, err := NewDirectory(parent, node.String(), node, root, ds)
if err != nil {
return nil, err
......@@ -101,7 +102,7 @@ func NewRoot(parent context.Context, ds dag.DAGService, node *dag.ProtoNode, pf
}
root.val = fi
default:
panic("unrecognized! (NYI)")
return nil, fmt.Errorf("unrecognized unixfs type: %s", pbn.GetType())
}
return root, nil
}
......@@ -124,7 +125,7 @@ func (kr *Root) Flush() error {
// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *Root) closeChild(name string, nd *dag.ProtoNode, sync bool) error {
func (kr *Root) closeChild(name string, nd node.Node, sync bool) error {
c, err := kr.dserv.Add(nd)
if err != nil {
return err
......
......@@ -2,4 +2,5 @@ package config
// Experiments groups opt-in flags for features that are still
// experimental and disabled by default.
type Experiments struct {
// FilestoreEnabled toggles the experimental filestore feature.
FilestoreEnabled bool
// ShardingEnabled is the global switch for HAMT directory sharding;
// at node setup it is copied into uio.UseHAMTSharding.
ShardingEnabled bool
}
......@@ -207,20 +207,6 @@ test_add_named_pipe() {
'
}
# test_add_sharded_dir: creates a 2000-file directory, adds it recursively,
# and checks that the root hash matches the known HAMT-sharded value.
test_add_sharded_dir() {
mkdir testdata
for i in `seq 2000`
do
echo $i > testdata/file$i
done
test_expect_success "ipfs add on very large directory succeeds" '
ipfs add -r -q testdata | tail -n1 > sharddir_out &&
echo QmSCJD1KYLhVVHqBK3YyXuoEqHt7vggyJhzoFYbT8v1XYL > sharddir_exp &&
test_cmp sharddir_exp sharddir_out
'
}
test_add_pwd_is_symlink() {
test_expect_success "ipfs add -r adds directory content when ./ is symlink" '
mkdir hellodir &&
......@@ -453,8 +439,6 @@ test_kill_ipfs_daemon
test_add_cat_file
test_add_sharded_dir
test_add_cat_raw
test_expect_success "ipfs add --only-hash succeeds" '
......@@ -475,8 +459,6 @@ test_launch_ipfs_daemon --offline
test_add_cat_file
test_add_sharded_dir
test_kill_ipfs_daemon
test_done
......@@ -51,9 +51,9 @@ test_sharding() {
ipfs files mkdir /foo
'
test_expect_success "can make 1100 files in a directory" '
test_expect_success "can make 100 files in a directory" '
printf "" > list_exp_raw
for i in `seq 1100`
for i in `seq 100`
do
echo $i | ipfs files write --create /foo/file$i
echo file$i >> list_exp_raw
......@@ -71,6 +71,12 @@ test_sharding() {
echo "65" > file_exp &&
test_cmp file_out file_exp
'
test_expect_success "output object was really sharded" '
ipfs files stat --hash /foo > expected_foo_hash &&
echo QmPkwLJTYZRGPJ8Lazr9qPdrLmswPtUjaDbEpmR9jEh1se > actual_foo_hash &&
test_cmp expected_foo_hash actual_foo_hash
'
}
test_files_api() {
......@@ -508,7 +514,7 @@ test_files_api() {
}
# test offline and online
#test_files_api
test_files_api
test_expect_success "clean up objects from previous test run" '
ipfs repo gc
......@@ -518,6 +524,14 @@ test_launch_ipfs_daemon
ONLINE=1 # set online flag so tests can easily tell
test_files_api
test_kill_ipfs_daemon
test_expect_success "enable sharding in config" '
ipfs config --json Experimental.ShardingEnabled true
'
test_launch_ipfs_daemon
test_sharding
test_kill_ipfs_daemon
test_done
#!/bin/sh
#
# Copyright (c) 2014 Christian Couder
# MIT Licensed; see the LICENSE file in this repository.
#
# Verifies the global Experimental.ShardingEnabled config switch: the same
# large directory must produce the unsharded root hash while the flag is
# off and the HAMT-sharded root hash once it is turned on.
test_description="Test global enable sharding flag"
. lib/test-lib.sh
# Build a directory with 2000 entries, enough to be sharded once the
# experimental flag is enabled.
test_expect_success "set up test data" '
mkdir testdata
for i in `seq 2000`
do
echo $i > testdata/file$i
done
'
# test_add_large_dir <expected-root-hash>
# Adds testdata recursively and compares the resulting root hash against
# the expected value for the current sharding mode.
test_add_large_dir() {
exphash="$1"
test_expect_success "ipfs add on very large directory succeeds" '
ipfs add -r -q testdata | tail -n1 > sharddir_out &&
echo "$exphash" > sharddir_exp &&
test_cmp sharddir_exp sharddir_out
'
}
test_init_ipfs
# Sharding disabled (default): both offline and daemon adds yield the
# classic unsharded directory root.
UNSHARDED="QmavrTrQG4VhoJmantURAYuw3bowq3E2WcvP36NRQDAC1N"
test_add_large_dir "$UNSHARDED"
test_launch_ipfs_daemon
test_add_large_dir "$UNSHARDED"
test_kill_ipfs_daemon
test_expect_success "enable sharding" '
ipfs config --json Experimental.ShardingEnabled true
'
# Sharding enabled: the same input must now hash to the HAMT-sharded
# root, again both offline and with the daemon running.
SHARDED="QmSCJD1KYLhVVHqBK3YyXuoEqHt7vggyJhzoFYbT8v1XYL"
test_add_large_dir "$SHARDED"
test_launch_ipfs_daemon
test_add_large_dir "$SHARDED"
test_kill_ipfs_daemon
# Regardless of on-disk layout, listing both roots must show identical
# directory contents.
test_expect_success "sharded and unsharded output look the same" '
ipfs ls "$SHARDED" | sort > sharded_out &&
ipfs ls "$UNSHARDED" | sort > unsharded_out &&
test_cmp sharded_out unsharded_out
'
test_done
......@@ -17,6 +17,10 @@ import (
// result in the node being restructured into a sharded object.
var ShardSplitThreshold = 1000
// UseHAMTSharding is a global flag that signifies whether or not to use the
// HAMT sharding scheme for directory creation
var UseHAMTSharding = false
// DefaultShardWidth is the default value used for hamt sharding width.
var DefaultShardWidth = 256
......@@ -31,7 +35,15 @@ type Directory struct {
func NewDirectory(dserv mdag.DAGService) *Directory {
db := new(Directory)
db.dserv = dserv
db.dirnode = format.EmptyDirNode()
if UseHAMTSharding {
s, err := hamt.NewHamtShard(dserv, DefaultShardWidth)
if err != nil {
panic(err) // will only panic if DefaultShardWidth is a bad value
}
db.shard = s
} else {
db.dirnode = format.EmptyDirNode()
}
return db
}
......@@ -70,7 +82,7 @@ func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, erro
// AddChild adds a (name, key)-pair to the root node.
func (d *Directory) AddChild(ctx context.Context, name string, nd node.Node) error {
if d.shard == nil {
if len(d.dirnode.Links()) < ShardSplitThreshold {
if !UseHAMTSharding {
_ = d.dirnode.RemoveNodeLink(name)
return d.dirnode.AddNodeLinkClean(name, nd)
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论