提交 7302c3ab 作者: Kevin Atkinson

Provide support for raw leaves in DAG modifier.

License: MIT
Signed-off-by: 's avatarKevin Atkinson <k@kevina.org>
上级 189cf0cb
...@@ -120,8 +120,8 @@ func (db *DagBuilderHelper) NewUnixfsNode() *UnixfsNode { ...@@ -120,8 +120,8 @@ func (db *DagBuilderHelper) NewUnixfsNode() *UnixfsNode {
return n return n
} }
// NewUnixfsBlock creates a new Unixfs node to represent a raw data block // newUnixfsBlock creates a new Unixfs node to represent a raw data block
func (db *DagBuilderHelper) NewUnixfsBlock() *UnixfsNode { func (db *DagBuilderHelper) newUnixfsBlock() *UnixfsNode {
n := &UnixfsNode{ n := &UnixfsNode{
node: new(dag.ProtoNode), node: new(dag.ProtoNode),
ufmt: &ft.FSNode{Type: ft.TRaw}, ufmt: &ft.FSNode{Type: ft.TRaw},
...@@ -181,7 +181,7 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) { ...@@ -181,7 +181,7 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
}, nil }, nil
} }
} else { } else {
blk := db.NewUnixfsBlock() blk := db.newUnixfsBlock()
blk.SetData(data) blk.SetData(data)
return blk, nil return blk, nil
} }
......
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
func TestBasicRead(t *testing.T) { func TestBasicRead(t *testing.T) {
dserv := testu.GetDAGServ() dserv := testu.GetDAGServ()
inbuf, node := testu.GetRandomNode(t, dserv, 1024) inbuf, node := testu.GetRandomNode(t, dserv, 1024, testu.ProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background()) ctx, closer := context.WithCancel(context.Background())
defer closer() defer closer()
...@@ -44,7 +44,7 @@ func TestSeekAndRead(t *testing.T) { ...@@ -44,7 +44,7 @@ func TestSeekAndRead(t *testing.T) {
inbuf[i] = byte(i) inbuf[i] = byte(i)
} }
node := testu.GetNode(t, dserv, inbuf) node := testu.GetNode(t, dserv, inbuf, testu.ProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background()) ctx, closer := context.WithCancel(context.Background())
defer closer() defer closer()
...@@ -84,7 +84,7 @@ func TestRelativeSeek(t *testing.T) { ...@@ -84,7 +84,7 @@ func TestRelativeSeek(t *testing.T) {
} }
inbuf[1023] = 1 // force the reader to be 1024 bytes inbuf[1023] = 1 // force the reader to be 1024 bytes
node := testu.GetNode(t, dserv, inbuf) node := testu.GetNode(t, dserv, inbuf, testu.ProtoBufLeaves)
reader, err := NewDagReader(ctx, node, dserv) reader, err := NewDagReader(ctx, node, dserv)
if err != nil { if err != nil {
...@@ -160,7 +160,7 @@ func TestBadPBData(t *testing.T) { ...@@ -160,7 +160,7 @@ func TestBadPBData(t *testing.T) {
func TestMetadataNode(t *testing.T) { func TestMetadataNode(t *testing.T) {
dserv := testu.GetDAGServ() dserv := testu.GetDAGServ()
rdata, rnode := testu.GetRandomNode(t, dserv, 512) rdata, rnode := testu.GetRandomNode(t, dserv, 512, testu.ProtoBufLeaves)
_, err := dserv.Add(rnode) _, err := dserv.Add(rnode)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -203,7 +203,7 @@ func TestMetadataNode(t *testing.T) { ...@@ -203,7 +203,7 @@ func TestMetadataNode(t *testing.T) {
func TestWriteTo(t *testing.T) { func TestWriteTo(t *testing.T) {
dserv := testu.GetDAGServ() dserv := testu.GetDAGServ()
inbuf, node := testu.GetRandomNode(t, dserv, 1024) inbuf, node := testu.GetRandomNode(t, dserv, 1024, testu.ProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background()) ctx, closer := context.WithCancel(context.Background())
defer closer() defer closer()
...@@ -225,7 +225,7 @@ func TestWriteTo(t *testing.T) { ...@@ -225,7 +225,7 @@ func TestWriteTo(t *testing.T) {
func TestReaderSzie(t *testing.T) { func TestReaderSzie(t *testing.T) {
dserv := testu.GetDAGServ() dserv := testu.GetDAGServ()
size := int64(1024) size := int64(1024)
_, node := testu.GetRandomNode(t, dserv, size) _, node := testu.GetRandomNode(t, dserv, size, testu.ProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background()) ctx, closer := context.WithCancel(context.Background())
defer closer() defer closer()
......
...@@ -40,6 +40,8 @@ type DagModifier struct { ...@@ -40,6 +40,8 @@ type DagModifier struct {
curWrOff uint64 curWrOff uint64
wrBuf *bytes.Buffer wrBuf *bytes.Buffer
RawLeaves bool
read uio.DagReader read uio.DagReader
} }
...@@ -113,17 +115,7 @@ func (dm *DagModifier) expandSparse(size int64) error { ...@@ -113,17 +115,7 @@ func (dm *DagModifier) expandSparse(size int64) error {
return err return err
} }
_, err = dm.dagserv.Add(nnode) _, err = dm.dagserv.Add(nnode)
if err != nil { return err
return err
}
pbnnode, ok := nnode.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
}
dm.curNode = pbnnode
return nil
} }
// Write continues writing to the dag at the current offset // Write continues writing to the dag at the current offset
...@@ -149,26 +141,28 @@ func (dm *DagModifier) Write(b []byte) (int, error) { ...@@ -149,26 +141,28 @@ func (dm *DagModifier) Write(b []byte) (int, error) {
return n, nil return n, nil
} }
var ErrNoRawYet = fmt.Errorf("currently only fully support protonodes in the dagmodifier")
// Size returns the Filesize of the node // Size returns the Filesize of the node
func (dm *DagModifier) Size() (int64, error) { func (dm *DagModifier) Size() (int64, error) {
switch nd := dm.curNode.(type) { fileSize, err := fileSize(dm.curNode)
if err != nil {
return 0, err
}
if dm.wrBuf != nil && int64(dm.wrBuf.Len())+int64(dm.writeStart) > int64(fileSize) {
return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
}
return int64(fileSize), nil
}
func fileSize(n node.Node) (uint64, error) {
switch nd := n.(type) {
case *mdag.ProtoNode: case *mdag.ProtoNode:
pbn, err := ft.FromBytes(nd.Data()) f, err := ft.FromBytes(nd.Data())
if err != nil { if err != nil {
return 0, err return 0, err
} }
if dm.wrBuf != nil && uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() { return f.GetFilesize(), nil
return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
}
return int64(pbn.GetFilesize()), nil
case *mdag.RawNode: case *mdag.RawNode:
if dm.wrBuf != nil { return uint64(len(nd.RawData())), nil
return 0, ErrNoRawYet
}
sz, err := nd.Size()
return int64(sz), err
default: default:
return 0, ErrNotUnixfs return 0, ErrNotUnixfs
} }
...@@ -196,36 +190,22 @@ func (dm *DagModifier) Sync() error { ...@@ -196,36 +190,22 @@ func (dm *DagModifier) Sync() error {
return err return err
} }
nd, err := dm.dagserv.Get(dm.ctx, thisc) dm.curNode, err = dm.dagserv.Get(dm.ctx, thisc)
if err != nil { if err != nil {
return err return err
} }
pbnd, ok := nd.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
}
dm.curNode = pbnd
// need to write past end of current dag // need to write past end of current dag
if !done { if !done {
nd, err := dm.appendData(dm.curNode, dm.splitter(dm.wrBuf)) dm.curNode, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
if err != nil { if err != nil {
return err return err
} }
_, err = dm.dagserv.Add(nd) _, err = dm.dagserv.Add(dm.curNode)
if err != nil { if err != nil {
return err return err
} }
pbnode, ok := nd.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
}
dm.curNode = pbnode
} }
dm.writeStart += uint64(buflen) dm.writeStart += uint64(buflen)
...@@ -238,43 +218,86 @@ func (dm *DagModifier) Sync() error { ...@@ -238,43 +218,86 @@ func (dm *DagModifier) Sync() error {
// returns the new key of the passed in node and whether or not all the data in the reader // returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed. // has been consumed.
func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) { func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
node, ok := n.(*mdag.ProtoNode) // If we've reached a leaf node.
if !ok { if len(n.Links()) == 0 {
return nil, false, ErrNoRawYet switch nd0 := n.(type) {
} case *mdag.ProtoNode:
f, err := ft.FromBytes(nd0.Data())
if err != nil {
return nil, false, err
}
f, err := ft.FromBytes(node.Data()) n, err := data.Read(f.Data[offset:])
if err != nil { if err != nil && err != io.EOF {
return nil, false, err return nil, false, err
} }
// If we've reached a leaf node. // Update newly written node..
if len(node.Links()) == 0 { b, err := proto.Marshal(f)
n, err := data.Read(f.Data[offset:]) if err != nil {
if err != nil && err != io.EOF { return nil, false, err
return nil, false, err }
}
// Update newly written node.. nd := new(mdag.ProtoNode)
b, err := proto.Marshal(f) nd.SetData(b)
if err != nil { k, err := dm.dagserv.Add(nd)
return nil, false, err if err != nil {
} return nil, false, err
}
nd := new(mdag.ProtoNode) // Hey look! we're done!
nd.SetData(b) var done bool
k, err := dm.dagserv.Add(nd) if n < len(f.Data[offset:]) {
if err != nil { done = true
return nil, false, err }
}
return k, done, nil
case *mdag.RawNode:
origData := nd0.RawData()
bytes := make([]byte, len(origData))
// Hey look! we're done! // copy orig data up to offset
var done bool copy(bytes, origData[:offset])
if n < len(f.Data[offset:]) {
done = true // copy in new data
n, err := data.Read(bytes[offset:])
if err != nil && err != io.EOF {
return nil, false, err
}
// copy remaining data
offsetPlusN := int(offset) + n
if offsetPlusN < len(origData) {
copy(bytes[offsetPlusN:], origData[offsetPlusN:])
}
nd, err := mdag.NewRawNodeWPrefix(bytes, nd0.Cid().Prefix())
if err != nil {
return nil, false, err
}
k, err := dm.dagserv.Add(nd)
if err != nil {
return nil, false, err
}
// Hey look! we're done!
var done bool
if n < len(bytes[offset:]) {
done = true
}
return k, done, nil
} }
}
return k, done, nil node, ok := n.(*mdag.ProtoNode)
if !ok {
return nil, false, ErrNotUnixfs
}
f, err := ft.FromBytes(node.Data())
if err != nil {
return nil, false, err
} }
var cur uint64 var cur uint64
...@@ -287,12 +310,7 @@ func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*c ...@@ -287,12 +310,7 @@ func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*c
return nil, false, err return nil, false, err
} }
childpb, ok := child.(*mdag.ProtoNode) k, sdone, err := dm.modifyDag(child, offset-cur, data)
if !ok {
return nil, false, mdag.ErrNotProtobuf
}
k, sdone, err := dm.modifyDag(childpb, offset-cur, data)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
...@@ -323,14 +341,13 @@ func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*c ...@@ -323,14 +341,13 @@ func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*c
// appendData appends the blocks from the given chan to the end of this dag // appendData appends the blocks from the given chan to the end of this dag
func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) { func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {
switch nd := nd.(type) { switch nd := nd.(type) {
case *mdag.ProtoNode: case *mdag.ProtoNode, *mdag.RawNode:
dbp := &help.DagBuilderParams{ dbp := &help.DagBuilderParams{
Dagserv: dm.dagserv, Dagserv: dm.dagserv,
Maxlinks: help.DefaultLinksPerBlock, Maxlinks: help.DefaultLinksPerBlock,
RawLeaves: dm.RawLeaves,
} }
return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl)) return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl))
case *mdag.RawNode:
return nil, fmt.Errorf("appending to raw node types not yet supported")
default: default:
return nil, ErrNotUnixfs return nil, ErrNotUnixfs
} }
...@@ -478,26 +495,30 @@ func (dm *DagModifier) Truncate(size int64) error { ...@@ -478,26 +495,30 @@ func (dm *DagModifier) Truncate(size int64) error {
} }
// dagTruncate truncates the given node to 'size' and returns the modified Node // dagTruncate truncates the given node to 'size' and returns the modified Node
func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) { func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (node.Node, error) {
nd, ok := n.(*mdag.ProtoNode) if len(n.Links()) == 0 {
if !ok { switch nd := n.(type) {
return nil, ErrNoRawYet case *mdag.ProtoNode:
} // TODO: this can likely be done without marshaling and remarshaling
pbn, err := ft.FromBytes(nd.Data())
if len(nd.Links()) == 0 { if err != nil {
// TODO: this can likely be done without marshaling and remarshaling return nil, err
pbn, err := ft.FromBytes(nd.Data()) }
if err != nil { nd.SetData(ft.WrapData(pbn.Data[:size]))
return nil, err return nd, nil
case *mdag.RawNode:
return mdag.NewRawNodeWPrefix(nd.RawData()[:size], nd.Cid().Prefix())
} }
}
nd.SetData(ft.WrapData(pbn.Data[:size])) nd, ok := n.(*mdag.ProtoNode)
return nd, nil if !ok {
return nil, ErrNotUnixfs
} }
var cur uint64 var cur uint64
end := 0 end := 0
var modified *mdag.ProtoNode var modified node.Node
ndata := new(ft.FSNode) ndata := new(ft.FSNode)
for i, lnk := range nd.Links() { for i, lnk := range nd.Links() {
child, err := lnk.GetNode(ctx, ds) child, err := lnk.GetNode(ctx, ds)
...@@ -505,19 +526,14 @@ func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGServi ...@@ -505,19 +526,14 @@ func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGServi
return nil, err return nil, err
} }
childpb, ok := child.(*mdag.ProtoNode) childsize, err := fileSize(child)
if !ok {
return nil, err
}
childsize, err := ft.DataSize(childpb.Data())
if err != nil { if err != nil {
return nil, err return nil, err
} }
// found the child we want to cut // found the child we want to cut
if size < cur+childsize { if size < cur+childsize {
nchild, err := dagTruncate(ctx, childpb, size-cur, ds) nchild, err := dagTruncate(ctx, child, size-cur, ds)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -8,8 +8,9 @@ import ( ...@@ -8,8 +8,9 @@ import (
"io/ioutil" "io/ioutil"
"testing" "testing"
imp "github.com/ipfs/go-ipfs/importer"
"github.com/ipfs/go-ipfs/importer/chunk" "github.com/ipfs/go-ipfs/importer/chunk"
h "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
mdag "github.com/ipfs/go-ipfs/merkledag" mdag "github.com/ipfs/go-ipfs/merkledag"
mdagmock "github.com/ipfs/go-ipfs/merkledag/test" mdagmock "github.com/ipfs/go-ipfs/merkledag/test"
ft "github.com/ipfs/go-ipfs/unixfs" ft "github.com/ipfs/go-ipfs/unixfs"
...@@ -28,9 +29,23 @@ func GetDAGServ() mdag.DAGService { ...@@ -28,9 +29,23 @@ func GetDAGServ() mdag.DAGService {
return mdagmock.Mock() return mdagmock.Mock()
} }
func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) node.Node { type UseRawLeaves bool
const (
ProtoBufLeaves UseRawLeaves = false
RawLeaves UseRawLeaves = true
)
func GetNode(t testing.TB, dserv mdag.DAGService, data []byte, rawLeaves UseRawLeaves) node.Node {
in := bytes.NewReader(data) in := bytes.NewReader(data)
node, err := imp.BuildTrickleDagFromReader(dserv, SizeSplitterGen(500)(in))
dbp := h.DagBuilderParams{
Dagserv: dserv,
Maxlinks: h.DefaultLinksPerBlock,
RawLeaves: bool(rawLeaves),
}
node, err := trickle.TrickleLayout(dbp.New(SizeSplitterGen(500)(in)))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -38,18 +53,18 @@ func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) node.Node { ...@@ -38,18 +53,18 @@ func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) node.Node {
return node return node
} }
func GetEmptyNode(t testing.TB, dserv mdag.DAGService) node.Node { func GetEmptyNode(t testing.TB, dserv mdag.DAGService, rawLeaves UseRawLeaves) node.Node {
return GetNode(t, dserv, []byte{}) return GetNode(t, dserv, []byte{}, rawLeaves)
} }
func GetRandomNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, node.Node) { func GetRandomNode(t testing.TB, dserv mdag.DAGService, size int64, rawLeaves UseRawLeaves) ([]byte, node.Node) {
in := io.LimitReader(u.NewTimeSeededRand(), size) in := io.LimitReader(u.NewTimeSeededRand(), size)
buf, err := ioutil.ReadAll(in) buf, err := ioutil.ReadAll(in)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
node := GetNode(t, dserv, buf) node := GetNode(t, dserv, buf, rawLeaves)
return buf, node return buf, node
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论