提交 796db710 作者: Juan Batiz-Benet

Merge pull request #889 from jbenet/ipns/refactor-dagmod

Ipns/refactor dagmod
......@@ -24,6 +24,7 @@ import (
path "github.com/jbenet/go-ipfs/path"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
mod "github.com/jbenet/go-ipfs/unixfs/mod"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
......@@ -211,7 +212,7 @@ type Node struct {
Ipfs *core.IpfsNode
Nd *mdag.Node
dagMod *uio.DagModifier
dagMod *mod.DagModifier
cached *ftpb.Data
}
......@@ -238,7 +239,11 @@ func (s *Node) Attr() fuse.Attr {
size = 0
}
if size == 0 {
size = s.dagMod.Size()
dmsize, err := s.dagMod.Size()
if err != nil {
log.Error(err)
}
size = uint64(dmsize)
}
mode := os.FileMode(0666)
......@@ -344,13 +349,13 @@ func (n *Node) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wri
if n.dagMod == nil {
// Create a DagModifier to allow us to change the existing dag node
dmod, err := uio.NewDagModifier(n.Nd, n.Ipfs.DAG, chunk.DefaultSplitter)
dmod, err := mod.NewDagModifier(ctx, n.Nd, n.Ipfs.DAG, n.Ipfs.Pinning.GetManual(), chunk.DefaultSplitter)
if err != nil {
return err
}
n.dagMod = dmod
}
wrote, err := n.dagMod.WriteAt(req.Data, uint64(req.Offset))
wrote, err := n.dagMod.WriteAt(req.Data, int64(req.Offset))
if err != nil {
return err
}
......
......@@ -7,6 +7,7 @@ import (
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
u "github.com/jbenet/go-ipfs/util"
)
// BlockSizeLimit specifies the maximum size an imported block can have.
......@@ -115,7 +116,11 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
}
// Removes the child node at the given index
func (n *UnixfsNode) RemoveChild(index int) {
// RemoveChild removes the child node at the given index.
// Before unlinking, it releases the child's indirect pin via the
// builder helper's ManualPinner (when one is configured) and drops
// the child's recorded block size from the unixfs metadata.
func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) {
	k := u.Key(n.node.Links[index].Hash)
	// dbh.mp may be nil when no pinner was supplied; skip unpinning then.
	if dbh.mp != nil {
		dbh.mp.RemovePinWithMode(k, pin.Indirect)
	}
	n.ufmt.RemoveBlockSize(index)
	n.node.Links = append(n.node.Links[:index], n.node.Links[index+1:]...)
}
......
......@@ -134,7 +134,7 @@ func appendFillLastChild(ufsn *h.UnixfsNode, depth int, layerFill int, db *h.Dag
}
// Update changed child in parent node
ufsn.RemoveChild(last)
ufsn.RemoveChild(last, db)
err = ufsn.AddChild(nchild, db)
if err != nil {
return err
......@@ -244,6 +244,20 @@ func verifyTDagRec(nd *dag.Node, depth, direct, layerRepeat int, ds dag.DAGServi
return nil
}
// Verify this is a branch node
pbn, err := ft.FromBytes(nd.Data)
if err != nil {
return err
}
if pbn.GetType() != ft.TFile {
return errors.New("expected file as branch node")
}
if len(pbn.Data) > 0 {
return errors.New("branch node should not have data")
}
for i := 0; i < len(nd.Links); i++ {
child, err := nd.Links[i].GetNode(ds)
if err != nil {
......
......@@ -47,6 +47,7 @@ type Pinner interface {
// may not be successful
type ManualPinner interface {
PinWithMode(util.Key, PinMode)
RemovePinWithMode(util.Key, PinMode)
Pinner
}
......@@ -198,6 +199,20 @@ func (p *pinner) IsPinned(key util.Key) bool {
p.indirPin.HasKey(key)
}
// RemovePinWithMode removes the pin of the given mode for key.
// Direct and recursive pins are deleted outright; indirect pins are
// decremented (they appear to be refcounted — see indirPin.Decrement).
// An unknown mode panics, as that indicates a programmer error.
func (p *pinner) RemovePinWithMode(key util.Key, mode PinMode) {
	switch mode {
	case Direct:
		p.directPin.RemoveBlock(key)
	case Indirect:
		p.indirPin.Decrement(key)
	case Recursive:
		p.recursePin.RemoveBlock(key)
	default:
		// programmer error, panic OK
		panic("unrecognized pin type")
	}
}
// LoadPinner loads a pinner and its keysets from the given datastore
func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error) {
p := new(pinner)
......
package io
import (
"bytes"
"errors"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("dagio")
// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv  mdag.DAGService     // store used to fetch and add sub-blocks
	curNode  *mdag.Node          // working copy of the file's root node
	pbdata   *ftpb.Data          // unixfs metadata decoded from curNode.Data
	splitter chunk.BlockSplitter // chunker used to re-split rewritten regions
}
// NewDagModifier wraps an existing dag file node for in-place editing.
// The node is copied up front, so the caller's node is never mutated.
// Returns an error when the node's data is not valid unixfs metadata.
func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
	meta, err := ft.FromBytes(from.Data)
	if err != nil {
		return nil, err
	}

	dm := &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		pbdata:   meta,
		splitter: spl,
	}
	return dm, nil
}
// WriteAt will modify a dag file in place
// NOTE: it currently assumes only a single level of indirection
//
// Layout (as used below): the root's protobuf carries an inline
// "zero block" (pbdata.Data) followed by one link per sub-block,
// with pbdata.Blocksizes[i] holding the payload size of link i.
// Returns the number of bytes from b that were written.
func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
	// Check bounds
	if dm.pbdata.GetFilesize() < offset {
		return 0, errors.New("Attempted to perform write starting past end of file")
	}

	// First need to find where we are writing at
	end := uint64(len(b)) + offset

	// This shouldnt be necessary if we do subblocks sizes properly
	newsize := dm.pbdata.GetFilesize()
	if end > dm.pbdata.GetFilesize() {
		newsize = end
	}

	zeroblocklen := uint64(len(dm.pbdata.Data))
	origlen := len(b)

	// Fast path: the whole write fits inside the inline zero block.
	if end <= zeroblocklen {
		log.Debug("Writing into zero block")
		// Replacing zeroeth data block (embedded in the root node)
		//TODO: check chunking here
		copy(dm.pbdata.Data[offset:], b)
		return len(b), nil
	}

	// Find where write should start
	var traversed uint64
	startsubblk := len(dm.pbdata.Blocksizes)
	if offset < zeroblocklen {
		// Write begins inside the zero block: truncate it at offset and
		// rebuild from the first sub-block onward.
		dm.pbdata.Data = dm.pbdata.Data[:offset]
		startsubblk = 0
	} else {
		// Walk the recorded block sizes until the one containing offset.
		traversed = uint64(zeroblocklen)
		for i, size := range dm.pbdata.Blocksizes {
			if uint64(offset) < traversed+size {
				log.Debugf("Starting mod at block %d. [%d < %d + %d]", i, offset, traversed, size)
				// Here is where we start
				startsubblk = i
				lnk := dm.curNode.Links[i]
				node, err := dm.dagserv.Get(u.Key(lnk.Hash))
				if err != nil {
					return 0, err
				}
				data, err := ft.UnwrapData(node.Data)
				if err != nil {
					return 0, err
				}

				// We have to rewrite the data before our write in this block.
				b = append(data[:offset-traversed], b...)
				break
			}
			traversed += size
		}
		if startsubblk == len(dm.pbdata.Blocksizes) {
			// TODO: Im not sure if theres any case that isnt being handled here.
			// leaving this note here as a future reference in case something breaks
		}
	}

	// Find blocks that need to be overwritten
	var changed []int
	mid := -1
	var midoff uint64
	for i, size := range dm.pbdata.Blocksizes[startsubblk:] {
		if end > traversed {
			changed = append(changed, i+startsubblk)
		} else {
			break
		}
		traversed += size
		if end < traversed {
			// The write ends inside this block; remember it so its
			// trailing bytes can be preserved below.
			mid = i + startsubblk
			midoff = end - (traversed - size)
			break
		}
	}

	// If our write starts in the middle of a block...
	var midlnk *mdag.Link
	if mid >= 0 {
		midlnk = dm.curNode.Links[mid]
		midnode, err := dm.dagserv.Get(u.Key(midlnk.Hash))
		if err != nil {
			return 0, err
		}

		// NOTE: this may have to be changed later when we have multiple
		// layers of indirection
		data, err := ft.UnwrapData(midnode.Data)
		if err != nil {
			return 0, err
		}
		// Keep the unmodified tail of the partially-overwritten block.
		b = append(b, data[midoff:]...)
	}

	// Generate new sub-blocks, and sizes
	subblocks := splitBytes(b, dm.splitter)
	var links []*mdag.Link
	var sizes []uint64
	for _, sb := range subblocks {
		n := &mdag.Node{Data: ft.WrapData(sb)}
		_, err := dm.dagserv.Add(n)
		if err != nil {
			log.Warningf("Failed adding node to DAG service: %s", err)
			return 0, err
		}
		lnk, err := mdag.MakeLink(n)
		if err != nil {
			return 0, err
		}
		links = append(links, lnk)
		sizes = append(sizes, uint64(len(sb)))
	}

	// This is disgusting (and can be rewritten if performance demands)
	// Splice the freshly created links/sizes over the range of changed
	// sub-blocks, or append when the write only extended the file.
	if len(changed) > 0 {
		sechalflink := append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...)
		dm.curNode.Links = append(dm.curNode.Links[:changed[0]], sechalflink...)
		sechalfblks := append(sizes, dm.pbdata.Blocksizes[changed[len(changed)-1]+1:]...)
		dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes[:changed[0]], sechalfblks...)
	} else {
		dm.curNode.Links = append(dm.curNode.Links, links...)
		dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes, sizes...)
	}
	dm.pbdata.Filesize = proto.Uint64(newsize)
	return origlen, nil
}
// Size reports the file size recorded in the root node's unixfs
// metadata. A nil DagModifier reports zero.
func (dm *DagModifier) Size() uint64 {
	if dm != nil {
		return dm.pbdata.GetFilesize()
	}
	return 0
}
// splitBytes uses a splitterFunc to turn a large array of bytes
// into many smaller arrays of bytes
func splitBytes(b []byte, spl chunk.BlockSplitter) [][]byte {
	var pieces [][]byte
	// Drain the splitter's output channel into a slice.
	for piece := range spl.Split(bytes.NewReader(b)) {
		pieces = append(pieces, piece)
	}
	return pieces
}
// GetNode serializes the accumulated unixfs metadata back into the
// working root node and returns a copy of it.
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
	data, err := proto.Marshal(dm.pbdata)
	if err != nil {
		return nil, err
	}
	dm.curNode.Data = data
	return dm.curNode.Copy(), nil
}
package io
import (
"fmt"
"io"
"io/ioutil"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/exchange/offline"
imp "github.com/jbenet/go-ipfs/importer"
"github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// getMockDagServ builds an in-memory DAGService backed by a
// mutex-wrapped map datastore and an offline exchange.
func getMockDagServ(t *testing.T) mdag.DAGService {
	store := sync.MutexWrap(ds.NewMapDatastore())
	blocks := blockstore.NewBlockstore(store)
	bserv, err := bs.New(blocks, offline.Exchange(blocks))
	if err != nil {
		t.Fatal(err)
	}
	return mdag.NewDAGService(bserv)
}
// getNode imports `size` random bytes into dserv as a dag file and
// returns both the raw bytes (read back through a DagReader) and the
// resulting root node.
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
	src := io.LimitReader(u.NewTimeSeededRand(), size)
	root, err := imp.BuildDagFromReader(src, dserv, nil, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}

	rd, err := NewDagReader(context.Background(), root, dserv)
	if err != nil {
		t.Fatal(err)
	}
	data, err := ioutil.ReadAll(rd)
	if err != nil {
		t.Fatal(err)
	}

	return data, root
}
// testModWrite writes `size` random bytes at offset `beg` through the
// modifier, mirrors the same change into `orig`, and verifies the
// dag's re-read contents match the mirror. Returns the updated mirror.
func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
	payload := make([]byte, size)
	u.NewTimeSeededRand().Read(payload)

	// Grow the in-memory mirror when the write extends past its end.
	if beg+size > uint64(len(orig)) {
		orig = append(orig, make([]byte, (beg+size)-uint64(len(orig)))...)
	}
	copy(orig[beg:], payload)

	wrote, err := dm.WriteAt(payload, uint64(beg))
	if err != nil {
		t.Fatal(err)
	}
	if wrote != int(size) {
		t.Fatalf("Mod length not correct! %d != %d", wrote, size)
	}

	nd, err := dm.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	rd, err := NewDagReader(context.Background(), nd, dm.dagserv)
	if err != nil {
		t.Fatal(err)
	}
	actual, err := ioutil.ReadAll(rd)
	if err != nil {
		t.Fatal(err)
	}

	if err := arrComp(actual, orig); err != nil {
		t.Fatal(err)
	}
	return orig
}
// TestDagModifierBasic runs a sequence of writes through a single
// DagModifier — inside the inline zero block, within existing blocks,
// extending the file, and appending — then checks the reported size.
func TestDagModifierBasic(t *testing.T) {
	t.Skip("DAGModifier needs to be fixed to work with indirect blocks.")
	// Quiet noisy subsystems so test output stays readable.
	if err := u.SetLogLevel("blockservice", "critical"); err != nil {
		t.Fatalf("testlog prepare failed: %s", err)
	}
	if err := u.SetLogLevel("merkledag", "critical"); err != nil {
		t.Fatalf("testlog prepare failed: %s", err)
	}
	dserv := getMockDagServ(t)
	b, n := getNode(t, dserv, 50000)

	dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512})
	if err != nil {
		t.Fatal(err)
	}

	// Within zero block
	beg := uint64(15)
	length := uint64(60)

	t.Log("Testing mod within zero block")
	b = testModWrite(t, beg, length, b, dagmod)

	// Within bounds of existing file
	beg = 1000
	length = 4000
	t.Log("Testing mod within bounds of existing file.")
	b = testModWrite(t, beg, length, b, dagmod)

	// Extend bounds
	beg = 49500
	length = 4000

	t.Log("Testing mod that extends file.")
	b = testModWrite(t, beg, length, b, dagmod)

	// "Append"
	beg = uint64(len(b))
	length = 3000
	b = testModWrite(t, beg, length, b, dagmod)

	// Verify reported length
	node, err := dagmod.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	size, err := ft.DataSize(node.Data)
	if err != nil {
		t.Fatal(err)
	}

	// 50000 original + 3500 net growth from the write at 49500
	// + 3000 appended.
	expected := uint64(50000 + 3500 + 3000)
	if size != expected {
		t.Fatalf("Final reported size is incorrect [%d != %d]", size, expected)
	}
}
// TestMultiWrite starts from an empty file and writes 4000 random
// bytes one byte at a time, then verifies the reassembled dag
// contents equal the source buffer.
func TestMultiWrite(t *testing.T) {
	t.Skip("DAGModifier needs to be fixed to work with indirect blocks.")
	dserv := getMockDagServ(t)
	_, n := getNode(t, dserv, 0)

	dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512})
	if err != nil {
		t.Fatal(err)
	}

	data := make([]byte, 4000)
	u.NewTimeSeededRand().Read(data)

	for i := 0; i < len(data); i++ {
		// Each pass appends exactly one byte at offset i.
		// (The inner n shadows the outer node variable.)
		n, err := dagmod.WriteAt(data[i:i+1], uint64(i))
		if err != nil {
			t.Fatal(err)
		}
		if n != 1 {
			t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
		}
	}
	nd, err := dagmod.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	read, err := NewDagReader(context.Background(), nd, dserv)
	if err != nil {
		t.Fatal(err)
	}
	rbuf, err := ioutil.ReadAll(read)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(rbuf, data)
	if err != nil {
		t.Fatal(err)
	}
}
// TestMultiWriteCoal repeatedly rewrites the file from offset zero
// with a growing prefix of `data`, coalescing earlier writes, and
// verifies the final dag content equals the full buffer.
func TestMultiWriteCoal(t *testing.T) {
	t.Skip("Skipping test until DagModifier is fixed")
	dserv := getMockDagServ(t)
	_, n := getNode(t, dserv, 0)

	dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512})
	if err != nil {
		t.Fatal(err)
	}

	data := make([]byte, 4000)
	u.NewTimeSeededRand().Read(data)

	for i := 0; i < len(data); i++ {
		// Each pass rewrites the whole prefix data[:i+1] at offset 0.
		n, err := dagmod.WriteAt(data[:i+1], 0)
		if err != nil {
			t.Fatal(err)
		}
		// Fixed diagnostic: the old message said "(n != 1)", copied
		// from TestMultiWrite; here each pass must write i+1 bytes.
		if n != i+1 {
			t.Fatalf("Somehow wrote the wrong number of bytes! (%d != %d)", n, i+1)
		}
	}

	nd, err := dagmod.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	read, err := NewDagReader(context.Background(), nd, dserv)
	if err != nil {
		t.Fatal(err)
	}
	rbuf, err := ioutil.ReadAll(read)
	if err != nil {
		t.Fatal(err)
	}

	err = arrComp(rbuf, data)
	if err != nil {
		t.Fatal(err)
	}
}
// arrComp reports a descriptive error when two byte slices differ in
// length or content, or nil when they are identical.
func arrComp(a, b []byte) error {
	if len(a) != len(b) {
		return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
	}
	for i := range a {
		if a[i] != b[i] {
			return fmt.Errorf("Arrays differ at index: %d", i)
		}
	}
	return nil
}
......@@ -3,6 +3,7 @@ package io
import (
"bytes"
"errors"
"fmt"
"io"
"os"
......@@ -113,7 +114,7 @@ func (dr *DagReader) precalcNextBuf() error {
pb := new(ftpb.Data)
err = proto.Unmarshal(nxt.Data, pb)
if err != nil {
return err
return fmt.Errorf("incorrectly formatted protobuf: %s", err)
}
switch pb.GetType() {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论