提交 ba1b196d 作者: Juan Batiz-Benet

Merge pull request #939 from jbenet/ipns/fuse

refactor ipns fuse to utilize ipnsfs
......@@ -254,7 +254,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
return count, err
}
nd, err := ng.Get()
nd, err := ng.Get(rw.Ctx)
if err != nil {
return count, err
}
......
......@@ -5,16 +5,19 @@ package ipns
import (
"bytes"
"crypto/rand"
"fmt"
"io/ioutil"
mrand "math/rand"
"os"
"sync"
"testing"
"time"
fstest "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs/fstestutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
racedet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
core "github.com/jbenet/go-ipfs/core"
u "github.com/jbenet/go-ipfs/util"
nsfs "github.com/jbenet/go-ipfs/ipnsfs"
ci "github.com/jbenet/go-ipfs/util/testutil/ci"
)
......@@ -30,6 +33,13 @@ func randBytes(size int) []byte {
return b
}
func mkdir(t *testing.T, path string) {
err := os.Mkdir(path, os.ModeDir)
if err != nil {
t.Fatal(err)
}
}
func writeFile(t *testing.T, size int, path string) []byte {
return writeFileData(t, randBytes(size), path)
}
......@@ -57,6 +67,48 @@ func writeFileData(t *testing.T, data []byte, path string) []byte {
return data
}
func verifyFile(t *testing.T, path string, data []byte) {
fi, err := os.Open(path)
if err != nil {
t.Fatal(err)
}
defer fi.Close()
buf := make([]byte, 1024)
offset := 0
for {
n, err := fi.Read(buf)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf[:n], data[offset:offset+n]) {
t.Fatal("Data not equal")
}
if n < len(buf) {
break
}
offset += n
}
}
// checkExists fails the test if nothing exists at path.
func checkExists(t *testing.T, path string) {
	if _, err := os.Stat(path); err != nil {
		t.Fatal(err)
	}
}
// closeMount unmounts mnt, first absorbing any panic raised by the test
// body so the unmount still runs.
//
// NOTE: recover() only has effect because callers invoke this via
// `defer closeMount(mnt)` — it must remain a directly-deferred function;
// calling it inline would make the recover() a no-op.
func closeMount(mnt *fstest.Mount) {
	if err := recover(); err != nil {
		log.Error("Recovered panic")
		log.Error(err)
	}
	mnt.Close()
}
func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.Mount) {
maybeSkipFuseTests(t)
......@@ -66,6 +118,13 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
if err != nil {
t.Fatal(err)
}
ipnsfs, err := nsfs.NewFilesystem(context.TODO(), node.DAG, node.Namesys, node.Pinning, node.PrivateKey)
if err != nil {
t.Fatal(err)
}
node.IpnsFs = ipnsfs
}
fs, err := NewFileSystem(node, node.PrivateKey, "")
......@@ -80,17 +139,36 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
return node, mnt
}
// TestIpnsLocalLink checks that the mount's "local" entry is a symlink
// pointing at this node's own peer ID.
func TestIpnsLocalLink(t *testing.T) {
	node, mnt := setupIpnsTest(t, nil)
	defer mnt.Close()

	localPath := mnt.Dir + "/local"
	if _, err := os.Stat(localPath); err != nil {
		t.Fatal(err)
	}

	target, err := os.Readlink(localPath)
	if err != nil {
		t.Fatal(err)
	}
	if target != node.Identity.Pretty() {
		t.Fatal("Link invalid")
	}
}
// Test writing a file and reading it back
func TestIpnsBasicIO(t *testing.T) {
t.Skip("Skipping until DAGModifier can be fixed.")
if testing.Short() {
t.SkipNow()
}
_, mnt := setupIpnsTest(t, nil)
defer mnt.Close()
defer closeMount(mnt)
fname := mnt.Dir + "/local/testfile"
data := writeFile(t, 12345, fname)
data := writeFile(t, 10, fname)
rbuf, err := ioutil.ReadFile(fname)
if err != nil {
......@@ -104,7 +182,6 @@ func TestIpnsBasicIO(t *testing.T) {
// Test to make sure file changes persist over mounts of ipns
func TestFilePersistence(t *testing.T) {
t.Skip("Skipping until DAGModifier can be fixed.")
if testing.Short() {
t.SkipNow()
}
......@@ -113,11 +190,9 @@ func TestFilePersistence(t *testing.T) {
fname := "/local/atestfile"
data := writeFile(t, 127, mnt.Dir+fname)
// Wait for publish: TODO: make publish happen faster in tests
time.Sleep(time.Millisecond * 40)
mnt.Close()
t.Log("Closed, opening new fs")
node, mnt = setupIpnsTest(t, node)
defer mnt.Close()
......@@ -131,9 +206,45 @@ func TestFilePersistence(t *testing.T) {
}
}
// TestMultipleDirs exercises nested directory and file creation, then
// remounts and checks everything persisted.
func TestMultipleDirs(t *testing.T) {
	node, mnt := setupIpnsTest(t, nil)

	t.Log("make a top level dir")
	dir1 := "/local/test1"
	dir1Path := mnt.Dir + dir1
	mkdir(t, dir1Path)
	checkExists(t, dir1Path)

	t.Log("write a file in it")
	file1 := dir1Path + "/file1"
	data1 := writeFile(t, 4000, file1)
	verifyFile(t, file1, data1)

	t.Log("sub directory")
	dir2 := dir1Path + "/dir2"
	mkdir(t, dir2)
	checkExists(t, dir2)

	t.Log("file in that subdirectory")
	file2 := dir2 + "/file2"
	data2 := writeFile(t, 5000, file2)
	verifyFile(t, file2, data2)

	mnt.Close()
	t.Log("closing mount, then restarting")

	_, mnt = setupIpnsTest(t, node)
	checkExists(t, mnt.Dir+dir1)
	verifyFile(t, mnt.Dir+dir1+"/file1", data1)
	verifyFile(t, mnt.Dir+dir1+"/dir2/file2", data2)
	mnt.Close()
}
// Test to make sure the filesystem reports file sizes correctly
func TestFileSizeReporting(t *testing.T) {
t.Skip("Skipping until DAGModifier can be fixed.")
if testing.Short() {
t.SkipNow()
}
......@@ -155,7 +266,6 @@ func TestFileSizeReporting(t *testing.T) {
// Test to make sure you cant create multiple entries with the same name
func TestDoubleEntryFailure(t *testing.T) {
t.Skip("Skipping until DAGModifier can be fixed.")
if testing.Short() {
t.SkipNow()
}
......@@ -175,7 +285,6 @@ func TestDoubleEntryFailure(t *testing.T) {
}
func TestAppendFile(t *testing.T) {
t.Skip("Skipping until DAGModifier can be fixed.")
if testing.Short() {
t.SkipNow()
}
......@@ -216,8 +325,126 @@ func TestAppendFile(t *testing.T) {
}
}
// TestConcurrentWrites stress-tests the mount: several worker goroutines
// each write many random files under /local, then all contents are
// verified after the workers finish.
func TestConcurrentWrites(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	_, mnt := setupIpnsTest(t, nil)
	defer mnt.Close()

	nactors := 4
	filesPerActor := 400
	fileSize := 2000

	// data[actor][file] records what each worker wrote, for verification.
	// Note it is sized before the race-detector adjustment below, so it
	// may be larger than needed — harmless, the extra slots stay unused.
	data := make([][][]byte, nactors)

	// Scale the workload down under the race detector to keep runtime sane.
	if racedet.WithRace() {
		nactors = 2
		filesPerActor = 50
	}

	wg := sync.WaitGroup{}
	for i := 0; i < nactors; i++ {
		data[i] = make([][]byte, filesPerActor)
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			for j := 0; j < filesPerActor; j++ {
				// NOTE(review): writeFile calls t.Fatal on failure from this
				// worker goroutine; testing.T.Fatal must only be called from
				// the test's own goroutine — consider t.Error + return. TODO.
				out := writeFile(t, fileSize, mnt.Dir+fmt.Sprintf("/local/%dFILE%d", n, j))
				data[n][j] = out
			}
		}(i)
	}
	wg.Wait()

	// All workers done; verify every file reads back exactly as written.
	for i := 0; i < nactors; i++ {
		for j := 0; j < filesPerActor; j++ {
			verifyFile(t, mnt.Dir+fmt.Sprintf("/local/%dFILE%d", i, j), data[i][j])
		}
	}
}
// TestFSThrash hammers the mounted filesystem with concurrent directory
// and file creation (files are placed under randomly chosen existing
// directories), then verifies every written file reads back intact.
func TestFSThrash(t *testing.T) {
	// files maps full path -> bytes written, for post-run verification.
	files := make(map[string][]byte)

	if testing.Short() {
		t.SkipNow()
	}
	_, mnt := setupIpnsTest(t, nil)
	defer mnt.Close()

	base := mnt.Dir + "/local"
	dirs := []string{base}
	dirlock := sync.RWMutex{} // guards dirs
	filelock := sync.Mutex{}  // guards files

	ndirWorkers := 2
	nfileWorkers := 2

	ndirs := 100
	nfiles := 200

	wg := sync.WaitGroup{}

	// Spawn off workers to make directories
	for i := 0; i < ndirWorkers; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for j := 0; j < ndirs; j++ {
				// Pick a random existing directory to nest the new one under.
				dirlock.RLock()
				n := mrand.Intn(len(dirs))
				dir := dirs[n]
				dirlock.RUnlock()

				newDir := fmt.Sprintf("%s/dir%d-%d", dir, worker, j)
				err := os.Mkdir(newDir, os.ModeDir)
				if err != nil {
					// NOTE(review): t.Fatal from a non-test goroutine is
					// unsupported by the testing package — TODO confirm/fix.
					t.Fatal(err)
				}

				dirlock.Lock()
				dirs = append(dirs, newDir)
				dirlock.Unlock()
			}
		}(i)
	}

	// Spawn off workers to make files
	for i := 0; i < nfileWorkers; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for j := 0; j < nfiles; j++ {
				// Random parent directory, same scheme as the dir workers.
				dirlock.RLock()
				n := mrand.Intn(len(dirs))
				dir := dirs[n]
				dirlock.RUnlock()

				newFileName := fmt.Sprintf("%s/file%d-%d", dir, worker, j)

				data := writeFile(t, 2000+mrand.Intn(5000), newFileName)
				filelock.Lock()
				files[newFileName] = data
				filelock.Unlock()
			}
		}(i)
	}

	wg.Wait()

	// Verify every recorded file contains exactly the bytes written.
	for name, data := range files {
		out, err := ioutil.ReadFile(name)
		if err != nil {
			t.Fatal(err)
		}

		if !bytes.Equal(data, out) {
			t.Fatal("Data didnt match")
		}
	}
}
/*
func TestFastRepublish(t *testing.T) {
t.Skip("Skipping until DAGModifier can be fixed.")
if testing.Short() {
t.SkipNow()
}
......@@ -319,10 +546,11 @@ func TestFastRepublish(t *testing.T) {
close(closed)
}
*/
// Test writing a medium sized file one byte at a time
func TestMultiWrite(t *testing.T) {
t.Skip("Skipping until DAGModifier can be fixed.")
if testing.Short() {
t.SkipNow()
}
......
// +build !nofuse
package ipns
import "time"
// Republisher debounces publish requests for a node: a signal on Publish
// starts a short quiet-period timer that is restarted by further signals,
// bounded by a long overall timeout (see Run).
type Republisher struct {
	TimeoutLong  time.Duration // hard upper bound before publishing
	TimeoutShort time.Duration // quiet period after the last signal
	Publish      chan struct{} // signal that something changed
	node         *Node         // node whose root gets republished
}
// NewRepublisher creates a Republisher for n using the given short
// (quiet-period) and long (hard-deadline) timeouts.
func NewRepublisher(n *Node, tshort, tlong time.Duration) *Republisher {
	rp := new(Republisher)
	rp.TimeoutShort = tshort
	rp.TimeoutLong = tlong
	rp.Publish = make(chan struct{})
	rp.node = n
	return rp
}
// Run services Publish signals. After a signal arrives it waits for a
// quiet period of TimeoutShort — restarting that wait on every further
// signal — but never longer than TimeoutLong from the first signal, then
// republishes the node's root. Run returns when Publish is closed.
//
// Changes vs. previous version: `for _ = range` replaced by the idiomatic
// `for range`, and the `goto wait` retry replaced by an equivalent
// labeled loop; the debounce behavior is unchanged.
func (np *Republisher) Run() {
	for range np.Publish {
		quick := time.After(np.TimeoutShort)
		longer := time.After(np.TimeoutLong)

	wait:
		for {
			select {
			case <-quick:
				break wait
			case <-longer:
				break wait
			case <-np.Publish:
				// Another change arrived: restart the short debounce
				// window but keep the original long deadline.
				quick = time.After(np.TimeoutShort)
			}
		}

		log.Info("Publishing Changes!")
		err := np.node.republishRoot()
		if err != nil {
			log.Critical("republishRoot error: %s", err)
		}
	}
}
......@@ -51,13 +51,20 @@ func (fi *File) Read(b []byte) (int, error) {
return fi.mod.Read(b)
}
// CtxReadFull reads into the given buffer from the current offset,
// passing ctx through so the underlying read can be cancelled.
// (Previous comment incorrectly named the method "Read".)
func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	fi.Lock()
	defer fi.Unlock()
	return fi.mod.CtxReadFull(ctx, b)
}
// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *File) Close() error {
fi.Lock()
defer fi.Unlock()
if fi.hasChanges {
err := fi.mod.Flush()
err := fi.mod.Sync()
if err != nil {
return err
}
......@@ -80,11 +87,11 @@ func (fi *File) Close() error {
return nil
}
// Flush flushes the changes in the file to disk
func (fi *File) Flush() error {
// Sync flushes the changes in the file to disk by delegating to the
// underlying DagModifier's Sync, under the file's lock.
func (fi *File) Sync() error {
	fi.Lock()
	defer fi.Unlock()
	return fi.mod.Sync()
}
// Seek implements io.Seeker
......
......@@ -257,10 +257,10 @@ type nodePromise struct {
// from its internal channels, subsequent calls will return the
// cached node.
type NodeGetter interface {
	// Get returns the promised node. It presumably blocks until the node
	// is available or the given context is cancelled — confirm against
	// the nodePromise implementation.
	Get(context.Context) (*Node, error)
}
func (np *nodePromise) Get() (*Node, error) {
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
if np.cache != nil {
return np.cache, nil
}
......@@ -270,6 +270,8 @@ func (np *nodePromise) Get() (*Node, error) {
np.cache = blk
case <-np.ctx.Done():
return nil, np.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
}
return np.cache, nil
}
......@@ -87,6 +87,7 @@ func (l *Link) GetNode(serv DAGService) (*Node, error) {
// AddNodeLink adds a link to another node.
func (n *Node) AddNodeLink(name string, that *Node) error {
n.encoded = nil
lnk, err := MakeLink(that)
if err != nil {
return err
......@@ -101,6 +102,7 @@ func (n *Node) AddNodeLink(name string, that *Node) error {
// AddNodeLink adds a link to another node. without keeping a reference to
// the child node
func (n *Node) AddNodeLinkClean(name string, that *Node) error {
n.encoded = nil
lnk, err := MakeLink(that)
if err != nil {
return err
......@@ -113,6 +115,7 @@ func (n *Node) AddNodeLinkClean(name string, that *Node) error {
// Remove a link on this node by the given name
func (n *Node) RemoveNodeLink(name string) error {
n.encoded = nil
for i, l := range n.Links {
if l.Name == name {
n.Links = append(n.Links[:i], n.Links[i+1:]...)
......
......@@ -177,7 +177,7 @@ func (p *pinner) pinLinks(node *mdag.Node) error {
defer cancel()
for _, ng := range p.dserv.GetDAG(ctx, node) {
subnode, err := ng.Get()
subnode, err := ng.Get(ctx)
if err != nil {
// TODO: Maybe just log and continue?
return err
......
......@@ -100,12 +100,13 @@ func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv md
// precalcNextBuf follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
dr.buf.Close() // Just to make sure
if dr.linkPosition >= len(dr.promises) {
return io.EOF
}
nxt, err := dr.promises[dr.linkPosition].Get()
nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
return err
}
......@@ -141,6 +142,11 @@ func (dr *DagReader) Size() int64 {
// Read reads data from the DAG structured file. It delegates to
// CtxReadFull using the reader's own stored context for cancellation.
func (dr *DagReader) Read(b []byte) (int, error) {
	return dr.CtxReadFull(dr.ctx, b)
}
// CtxReadFull reads data from the DAG structured file
func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
// If no cached buffer, load one
total := 0
for {
......@@ -161,7 +167,7 @@ func (dr *DagReader) Read(b []byte) (int, error) {
}
// Otherwise, load up the next block
err = dr.precalcNextBuf()
err = dr.precalcNextBuf(ctx)
if err != nil {
return total, err
}
......@@ -183,7 +189,7 @@ func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
}
// Otherwise, load up the next block
err = dr.precalcNextBuf()
err = dr.precalcNextBuf(dr.ctx)
if err != nil {
if err == io.EOF {
return total, nil
......@@ -239,7 +245,7 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
}
// start sub-block request
err := dr.precalcNextBuf()
err := dr.precalcNextBuf(dr.ctx)
if err != nil {
return 0, err
}
......
......@@ -80,7 +80,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
}
}
err = dm.Flush()
err = dm.Sync()
if err != nil {
return 0, err
}
......@@ -133,7 +133,7 @@ func (dm *DagModifier) Write(b []byte) (int, error) {
}
dm.curWrOff += uint64(n)
if dm.wrBuf.Len() > writebufferSize {
err := dm.Flush()
err := dm.Sync()
if err != nil {
return n, err
}
......@@ -156,8 +156,8 @@ func (dm *DagModifier) Size() (int64, error) {
return int64(pbn.GetFilesize()), nil
}
// Flush writes changes to this dag to disk
func (dm *DagModifier) Flush() error {
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
// No buffer? Nothing to do
if dm.wrBuf == nil {
return nil
......@@ -315,39 +315,60 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No
// Read reads from the dag starting at the current offset, flushing any
// buffered writes first via readPrep and advancing the offset by the
// number of bytes consumed.
func (dm *DagModifier) Read(b []byte) (int, error) {
	if err := dm.readPrep(); err != nil {
		return 0, err
	}

	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}
// readPrep prepares the modifier for reading: it flushes buffered writes
// so reads observe them, and lazily constructs the internal DagReader,
// seeking it to the current write offset so reading continues from where
// writing left off.
func (dm *DagModifier) readPrep() error {
	// Sync first so the reader sees all buffered writes.
	err := dm.Sync()
	if err != nil {
		return err
	}

	if dm.read == nil {
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
		if err != nil {
			return err
		}

		// Position the new reader at the current offset; a short seek
		// means the reader could not reach that position.
		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
			return err
		}

		if i != int64(dm.curWrOff) {
			return ErrSeekFail
		}

		// Keep the cancel func so the reader can be torn down later.
		dm.readCancel = cancel
		dm.read = dr
	}
	return nil
}
// CtxReadFull reads data from this dag starting at the current offset,
// using ctx to allow cancellation of the underlying reads. It advances
// the offset by the number of bytes read.
// (Previous comment incorrectly described the method as "Read".)
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
	dm.curWrOff += uint64(n)
	return n, err
}
// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
err := dm.Flush()
err := dm.Sync()
if err != nil {
return nil, err
}
......@@ -360,7 +381,7 @@ func (dm *DagModifier) HasChanges() bool {
}
func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
err := dm.Flush()
err := dm.Sync()
if err != nil {
return 0, err
}
......@@ -389,7 +410,7 @@ func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
}
func (dm *DagModifier) Truncate(size int64) error {
err := dm.Flush()
err := dm.Sync()
if err != nil {
return err
}
......
......@@ -246,7 +246,7 @@ func TestMultiWriteAndFlush(t *testing.T) {
if n != 1 {
t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
}
err = dagmod.Flush()
err = dagmod.Sync()
if err != nil {
t.Fatal(err)
}
......
......@@ -90,7 +90,7 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
defer cancel()
for i, ng := range r.dag.GetDAG(ctx, dagnode) {
childNode, err := ng.Get()
childNode, err := ng.Get(ctx)
if err != nil {
r.emitError(err)
return
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论