提交 acb9e231 作者: Lucas Molas

pbdagreader: use FSNode instead of protobuf structure

Focus on the UnixFS layer and avoid explicit references to protocol buffers
format (used to serialize objects of that layer). Use the `unixfs.FSNode`
structure, which abstracts away the `unixfs.pb.Data` format.

Replace `PBDagReader` field `ftpb.Data` with `ft.FSNode`, renaming it to `file`
(which is the type of UnixFS object represented in the reader) and changing its
comment removing the "cached" reference, as this structure is not used here as a
cache (`PBDagReader` doesn't modify the DAG, it's read-only). Also, removed
unused `ProtoNode` field to avoid confusions, as it would normally be present if
the `FSNode` was in fact used as a cache of the contents of the `ProtoNode`.

An example of the advantage of shifting the focus from the format to the UnixFS
layer is dropping the use of `len(pb.Blocksizes)` in favor of the clearer
`NumChildren()` abstraction.

Added `BlockSize()` accessor.

License: MIT
Signed-off-by: Lucas Molas <schomatis@gmail.com>
上级 7e8f6c96
...@@ -16,7 +16,6 @@ import ( ...@@ -16,7 +16,6 @@ import (
upb "github.com/ipfs/go-ipfs/unixfs/pb" upb "github.com/ipfs/go-ipfs/unixfs/pb"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
) )
// Writer is a utility structure that helps to write // Writer is a utility structure that helps to write
...@@ -57,12 +56,12 @@ func (w *Writer) writeDir(nd *mdag.ProtoNode, fpath string) error { ...@@ -57,12 +56,12 @@ func (w *Writer) writeDir(nd *mdag.ProtoNode, fpath string) error {
}) })
} }
func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error { func (w *Writer) writeFile(nd *mdag.ProtoNode, fsNode *ft.FSNode, fpath string) error {
if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil { if err := writeFileHeader(w.TarW, fpath, fsNode.FileSize()); err != nil {
return err return err
} }
dagr := uio.NewPBFileReader(w.ctx, nd, pb, w.Dag) dagr := uio.NewPBFileReader(w.ctx, nd, fsNode, w.Dag)
if _, err := dagr.WriteTo(w.TarW); err != nil { if _, err := dagr.WriteTo(w.TarW); err != nil {
return err return err
} }
...@@ -74,12 +73,12 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error ...@@ -74,12 +73,12 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error
func (w *Writer) WriteNode(nd ipld.Node, fpath string) error { func (w *Writer) WriteNode(nd ipld.Node, fpath string) error {
switch nd := nd.(type) { switch nd := nd.(type) {
case *mdag.ProtoNode: case *mdag.ProtoNode:
pb := new(upb.Data) fsNode, err := ft.FSNodeFromBytes(nd.Data())
if err := proto.Unmarshal(nd.Data(), pb); err != nil { if err != nil {
return err return err
} }
switch pb.GetType() { switch fsNode.GetType() {
case upb.Data_Metadata: case upb.Data_Metadata:
fallthrough fallthrough
case upb.Data_Directory, upb.Data_HAMTShard: case upb.Data_Directory, upb.Data_HAMTShard:
...@@ -87,9 +86,9 @@ func (w *Writer) WriteNode(nd ipld.Node, fpath string) error { ...@@ -87,9 +86,9 @@ func (w *Writer) WriteNode(nd ipld.Node, fpath string) error {
case upb.Data_Raw: case upb.Data_Raw:
fallthrough fallthrough
case upb.Data_File: case upb.Data_File:
return w.writeFile(nd, pb, fpath) return w.writeFile(nd, fsNode, fpath)
case upb.Data_Symlink: case upb.Data_Symlink:
return writeSymlinkHeader(w.TarW, string(pb.GetData()), fpath) return writeSymlinkHeader(w.TarW, string(fsNode.GetData()), fpath)
default: default:
return ft.ErrUnrecognizedType return ft.ErrUnrecognizedType
} }
......
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
ftpb "github.com/ipfs/go-ipfs/unixfs/pb" ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
) )
// Common errors // Common errors
...@@ -45,17 +44,17 @@ func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagRe ...@@ -45,17 +44,17 @@ func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagRe
case *mdag.RawNode: case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil return NewBufDagReader(n.RawData()), nil
case *mdag.ProtoNode: case *mdag.ProtoNode:
pb := new(ftpb.Data) fsNode, err := ft.FSNodeFromBytes(n.Data())
if err := proto.Unmarshal(n.Data(), pb); err != nil { if err != nil {
return nil, err return nil, err
} }
switch pb.GetType() { switch fsNode.GetType() {
case ftpb.Data_Directory, ftpb.Data_HAMTShard: case ftpb.Data_Directory, ftpb.Data_HAMTShard:
// Dont allow reading directories // Dont allow reading directories
return nil, ErrIsDir return nil, ErrIsDir
case ftpb.Data_File, ftpb.Data_Raw: case ftpb.Data_File, ftpb.Data_Raw:
return NewPBFileReader(ctx, n, pb, serv), nil return NewPBFileReader(ctx, n, fsNode, serv), nil
case ftpb.Data_Metadata: case ftpb.Data_Metadata:
if len(n.Links()) == 0 { if len(n.Links()) == 0 {
return nil, errors.New("incorrectly formatted metadata object") return nil, errors.New("incorrectly formatted metadata object")
......
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
ftpb "github.com/ipfs/go-ipfs/unixfs/pb" ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
) )
...@@ -19,11 +18,8 @@ import ( ...@@ -19,11 +18,8 @@ import (
type PBDagReader struct { type PBDagReader struct {
serv ipld.NodeGetter serv ipld.NodeGetter
// the node being read // UnixFS file (it should be of type `Data_File` or `Data_Raw` only).
node *mdag.ProtoNode file *ft.FSNode
// cached protobuf structure from node.Data
pbdata *ftpb.Data
// the current data buffer to be read from // the current data buffer to be read from
// will either be a bytes.Reader or a child DagReader // will either be a bytes.Reader or a child DagReader
...@@ -51,18 +47,17 @@ type PBDagReader struct { ...@@ -51,18 +47,17 @@ type PBDagReader struct {
var _ DagReader = (*PBDagReader)(nil) var _ DagReader = (*PBDagReader)(nil)
// NewPBFileReader constructs a new PBFileReader. // NewPBFileReader constructs a new PBFileReader.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *PBDagReader { func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, file *ft.FSNode, serv ipld.NodeGetter) *PBDagReader {
fctx, cancel := context.WithCancel(ctx) fctx, cancel := context.WithCancel(ctx)
curLinks := getLinkCids(n) curLinks := getLinkCids(n)
return &PBDagReader{ return &PBDagReader{
node: n,
serv: serv, serv: serv,
buf: NewBufDagReader(pb.GetData()), buf: NewBufDagReader(file.GetData()),
promises: make([]*ipld.NodePromise, len(curLinks)), promises: make([]*ipld.NodePromise, len(curLinks)),
links: curLinks, links: curLinks,
ctx: fctx, ctx: fctx,
cancel: cancel, cancel: cancel,
pbdata: pb, file: file,
} }
} }
...@@ -105,21 +100,20 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error { ...@@ -105,21 +100,20 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error {
switch nxt := nxt.(type) { switch nxt := nxt.(type) {
case *mdag.ProtoNode: case *mdag.ProtoNode:
pb := new(ftpb.Data) fsNode, err := ft.FSNodeFromBytes(nxt.Data())
err = proto.Unmarshal(nxt.Data(), pb)
if err != nil { if err != nil {
return fmt.Errorf("incorrectly formatted protobuf: %s", err) return fmt.Errorf("incorrectly formatted protobuf: %s", err)
} }
switch pb.GetType() { switch fsNode.GetType() {
case ftpb.Data_Directory, ftpb.Data_HAMTShard: case ftpb.Data_Directory, ftpb.Data_HAMTShard:
// A directory should not exist within a file // A directory should not exist within a file
return ft.ErrInvalidDirLocation return ft.ErrInvalidDirLocation
case ftpb.Data_File: case ftpb.Data_File:
dr.buf = NewPBFileReader(dr.ctx, nxt, pb, dr.serv) dr.buf = NewPBFileReader(dr.ctx, nxt, fsNode, dr.serv)
return nil return nil
case ftpb.Data_Raw: case ftpb.Data_Raw:
dr.buf = NewBufDagReader(pb.GetData()) dr.buf = NewBufDagReader(fsNode.GetData())
return nil return nil
case ftpb.Data_Metadata: case ftpb.Data_Metadata:
return errors.New("shouldnt have had metadata object inside file") return errors.New("shouldnt have had metadata object inside file")
...@@ -146,7 +140,7 @@ func getLinkCids(n ipld.Node) []*cid.Cid { ...@@ -146,7 +140,7 @@ func getLinkCids(n ipld.Node) []*cid.Cid {
// Size return the total length of the data from the DAG structured file. // Size return the total length of the data from the DAG structured file.
func (dr *PBDagReader) Size() uint64 { func (dr *PBDagReader) Size() uint64 {
return dr.pbdata.GetFilesize() return dr.file.FileSize()
} }
// Read reads data from the DAG structured file // Read reads data from the DAG structured file
...@@ -244,17 +238,14 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) { ...@@ -244,17 +238,14 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
return offset, nil return offset, nil
} }
// Grab cached protobuf object (solely to make code look cleaner)
pb := dr.pbdata
// left represents the number of bytes remaining to seek to (from beginning) // left represents the number of bytes remaining to seek to (from beginning)
left := offset left := offset
if int64(len(pb.Data)) >= offset { if int64(len(dr.file.GetData())) >= offset {
// Close current buf to close potential child dagreader // Close current buf to close potential child dagreader
if dr.buf != nil { if dr.buf != nil {
dr.buf.Close() dr.buf.Close()
} }
dr.buf = NewBufDagReader(pb.GetData()[offset:]) dr.buf = NewBufDagReader(dr.file.GetData()[offset:])
// start reading links from the beginning // start reading links from the beginning
dr.linkPosition = 0 dr.linkPosition = 0
...@@ -263,15 +254,15 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) { ...@@ -263,15 +254,15 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
} }
// skip past root block data // skip past root block data
left -= int64(len(pb.Data)) left -= int64(len(dr.file.GetData()))
// iterate through links and find where we need to be // iterate through links and find where we need to be
for i := 0; i < len(pb.Blocksizes); i++ { for i := 0; i < dr.file.NumChildren(); i++ {
if pb.Blocksizes[i] > uint64(left) { if dr.file.BlockSize(i) > uint64(left) {
dr.linkPosition = i dr.linkPosition = i
break break
} else { } else {
left -= int64(pb.Blocksizes[i]) left -= int64(dr.file.BlockSize(i))
} }
} }
...@@ -303,14 +294,14 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) { ...@@ -303,14 +294,14 @@ func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
noffset := dr.offset + offset noffset := dr.offset + offset
return dr.Seek(noffset, io.SeekStart) return dr.Seek(noffset, io.SeekStart)
case io.SeekEnd: case io.SeekEnd:
noffset := int64(dr.pbdata.GetFilesize()) - offset noffset := int64(dr.file.FileSize()) - offset
n, err := dr.Seek(noffset, io.SeekStart) n, err := dr.Seek(noffset, io.SeekStart)
// Return negative number if we can't figure out the file size. Using io.EOF // Return negative number if we can't figure out the file size. Using io.EOF
// for this seems to be good(-enough) solution as it's only returned by // for this seems to be good(-enough) solution as it's only returned by
// precalcNextBuf when we step out of file range. // precalcNextBuf when we step out of file range.
// This is needed for gateway to function properly // This is needed for gateway to function properly
if err == io.EOF && *dr.pbdata.Type == ftpb.Data_File { if err == io.EOF && dr.file.GetType() == ftpb.Data_File {
return -1, nil return -1, nil
} }
return n, err return n, err
......
...@@ -195,6 +195,12 @@ func (n *FSNode) RemoveBlockSize(i int) { ...@@ -195,6 +195,12 @@ func (n *FSNode) RemoveBlockSize(i int) {
n.format.Blocksizes = append(n.format.Blocksizes[:i], n.format.Blocksizes[i+1:]...) n.format.Blocksizes = append(n.format.Blocksizes[:i], n.format.Blocksizes[i+1:]...)
} }
// BlockSize returns the size of the child block at index `i`, as recorded
// in this node's `Blocksizes` list.
// TODO: Evaluate if this function should be bounds checking
// (an out-of-range `i` currently panics on the slice index).
func (n *FSNode) BlockSize(i int) uint64 {
	return n.format.Blocksizes[i]
}
// GetBytes marshals this node as a protobuf message. // GetBytes marshals this node as a protobuf message.
func (n *FSNode) GetBytes() ([]byte, error) { func (n *FSNode) GetBytes() ([]byte, error) {
return proto.Marshal(&n.format) return proto.Marshal(&n.format)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论