提交 ca4271b7 作者: Hector Sanjuan

Golint: unixfs/io

License: MIT
Signed-off-by: 's avatarHector Sanjuan <hector@protocol.ai>
上级 f074292e
...@@ -10,7 +10,9 @@ type bufDagReader struct { ...@@ -10,7 +10,9 @@ type bufDagReader struct {
*bytes.Reader *bytes.Reader
} }
func NewBufDagReader(b []byte) *bufDagReader { // newBufDagReader returns a DAG reader for the given byte slice.
// BufDagReader is used to read RawNodes.
func newBufDagReader(b []byte) *bufDagReader {
return &bufDagReader{bytes.NewReader(b)} return &bufDagReader{bytes.NewReader(b)}
} }
......
...@@ -14,10 +14,15 @@ import ( ...@@ -14,10 +14,15 @@ import (
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
) )
var ErrIsDir = errors.New("this dag node is a directory") // Common errors
var (
var ErrCantReadSymlinks = errors.New("cannot currently read symlinks") ErrIsDir = errors.New("this dag node is a directory")
ErrCantReadSymlinks = errors.New("cannot currently read symlinks")
)
// A DagReader represents a ReadSeekCloser which offers additional methods
// like Size. Different implementations of readers are used for the different
// types of unixfs/protobuf-encoded nodes.
type DagReader interface { type DagReader interface {
ReadSeekCloser ReadSeekCloser
Size() uint64 Size() uint64
...@@ -25,6 +30,7 @@ type DagReader interface { ...@@ -25,6 +30,7 @@ type DagReader interface {
Offset() int64 Offset() int64
} }
// A ReadSeekCloser implements interfaces to read, write, seek and close.
type ReadSeekCloser interface { type ReadSeekCloser interface {
io.Reader io.Reader
io.Seeker io.Seeker
...@@ -37,7 +43,7 @@ type ReadSeekCloser interface { ...@@ -37,7 +43,7 @@ type ReadSeekCloser interface {
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) { func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) {
switch n := n.(type) { switch n := n.(type) {
case *mdag.RawNode: case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil return newBufDagReader(n.RawData()), nil
case *mdag.ProtoNode: case *mdag.ProtoNode:
pb := new(ftpb.Data) pb := new(ftpb.Data)
if err := proto.Unmarshal(n.Data(), pb); err != nil { if err := proto.Unmarshal(n.Data(), pb); err != nil {
......
...@@ -102,7 +102,7 @@ func TestSeekAndReadLarge(t *testing.T) { ...@@ -102,7 +102,7 @@ func TestSeekAndReadLarge(t *testing.T) {
t.Fatal("seeked read failed") t.Fatal("seeked read failed")
} }
pbdr := reader.(*pbDagReader) pbdr := reader.(*PBDagReader)
var count int var count int
for i, p := range pbdr.promises { for i, p := range pbdr.promises {
if i > 20 && i < 30 { if i > 20 && i < 30 {
......
...@@ -8,8 +8,8 @@ import ( ...@@ -8,8 +8,8 @@ import (
mdag "github.com/ipfs/go-ipfs/merkledag" mdag "github.com/ipfs/go-ipfs/merkledag"
format "github.com/ipfs/go-ipfs/unixfs" format "github.com/ipfs/go-ipfs/unixfs"
hamt "github.com/ipfs/go-ipfs/unixfs/hamt" hamt "github.com/ipfs/go-ipfs/unixfs/hamt"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
) )
...@@ -25,11 +25,14 @@ var UseHAMTSharding = false ...@@ -25,11 +25,14 @@ var UseHAMTSharding = false
// DefaultShardWidth is the default value used for hamt sharding width. // DefaultShardWidth is the default value used for hamt sharding width.
var DefaultShardWidth = 256 var DefaultShardWidth = 256
// Directory allows to work with UnixFS directory nodes, adding and removing
// children. It allows to work with different directory schemes,
// like the classic or the HAMT one.
type Directory struct { type Directory struct {
dserv ipld.DAGService dserv ipld.DAGService
dirnode *mdag.ProtoNode dirnode *mdag.ProtoNode
shard *hamt.HamtShard shard *hamt.Shard
} }
// NewDirectory returns a Directory. It needs a DAGService to add the Children // NewDirectory returns a Directory. It needs a DAGService to add the Children
...@@ -37,7 +40,7 @@ func NewDirectory(dserv ipld.DAGService) *Directory { ...@@ -37,7 +40,7 @@ func NewDirectory(dserv ipld.DAGService) *Directory {
db := new(Directory) db := new(Directory)
db.dserv = dserv db.dserv = dserv
if UseHAMTSharding { if UseHAMTSharding {
s, err := hamt.NewHamtShard(dserv, DefaultShardWidth) s, err := hamt.NewShard(dserv, DefaultShardWidth)
if err != nil { if err != nil {
panic(err) // will only panic if DefaultShardWidth is a bad value panic(err) // will only panic if DefaultShardWidth is a bad value
} }
...@@ -113,7 +116,7 @@ func (d *Directory) AddChild(ctx context.Context, name string, nd ipld.Node) err ...@@ -113,7 +116,7 @@ func (d *Directory) AddChild(ctx context.Context, name string, nd ipld.Node) err
} }
func (d *Directory) switchToSharding(ctx context.Context) error { func (d *Directory) switchToSharding(ctx context.Context) error {
s, err := hamt.NewHamtShard(d.dserv, DefaultShardWidth) s, err := hamt.NewShard(d.dserv, DefaultShardWidth)
if err != nil { if err != nil {
return err return err
} }
...@@ -136,6 +139,7 @@ func (d *Directory) switchToSharding(ctx context.Context) error { ...@@ -136,6 +139,7 @@ func (d *Directory) switchToSharding(ctx context.Context) error {
return nil return nil
} }
// ForEachLink applies the given function to Links in the directory.
func (d *Directory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error { func (d *Directory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
if d.shard == nil { if d.shard == nil {
for _, l := range d.dirnode.Links() { for _, l := range d.dirnode.Links() {
...@@ -149,6 +153,7 @@ func (d *Directory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) e ...@@ -149,6 +153,7 @@ func (d *Directory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) e
return d.shard.ForEachLink(ctx, f) return d.shard.ForEachLink(ctx, f)
} }
// Links returns the all the links in the directory node.
func (d *Directory) Links(ctx context.Context) ([]*ipld.Link, error) { func (d *Directory) Links(ctx context.Context) ([]*ipld.Link, error) {
if d.shard == nil { if d.shard == nil {
return d.dirnode.Links(), nil return d.dirnode.Links(), nil
...@@ -157,6 +162,9 @@ func (d *Directory) Links(ctx context.Context) ([]*ipld.Link, error) { ...@@ -157,6 +162,9 @@ func (d *Directory) Links(ctx context.Context) ([]*ipld.Link, error) {
return d.shard.EnumLinks(ctx) return d.shard.EnumLinks(ctx)
} }
// Find returns the ipld.Node with the given name, if it is contained in this
// directory. Find only searches in the most inmediate links, and not
// recursively in the tree.
func (d *Directory) Find(ctx context.Context, name string) (ipld.Node, error) { func (d *Directory) Find(ctx context.Context, name string) (ipld.Node, error) {
if d.shard == nil { if d.shard == nil {
lnk, err := d.dirnode.GetNodeLink(name) lnk, err := d.dirnode.GetNodeLink(name)
...@@ -179,6 +187,7 @@ func (d *Directory) Find(ctx context.Context, name string) (ipld.Node, error) { ...@@ -179,6 +187,7 @@ func (d *Directory) Find(ctx context.Context, name string) (ipld.Node, error) {
return lnk.GetNode(ctx, d.dserv) return lnk.GetNode(ctx, d.dserv)
} }
// RemoveChild removes the child with the given name.
func (d *Directory) RemoveChild(ctx context.Context, name string) error { func (d *Directory) RemoveChild(ctx context.Context, name string) error {
if d.shard == nil { if d.shard == nil {
return d.dirnode.RemoveNodeLink(name) return d.dirnode.RemoveNodeLink(name)
......
// package unixfs/io implements convenience objects for working with the ipfs // Package io implements convenience objects for working with the ipfs
// unixfs data format. // unixfs data format.
package io package io
...@@ -15,8 +15,8 @@ import ( ...@@ -15,8 +15,8 @@ import (
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
) )
// DagReader provides a way to easily read the data contained in a dag. // PBDagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct { type PBDagReader struct {
serv ipld.NodeGetter serv ipld.NodeGetter
// the node being read // the node being read
...@@ -48,16 +48,16 @@ type pbDagReader struct { ...@@ -48,16 +48,16 @@ type pbDagReader struct {
cancel func() cancel func()
} }
var _ DagReader = (*pbDagReader)(nil) var _ DagReader = (*PBDagReader)(nil)
// NewPBFileReader constructs a new PBFileReader. // NewPBFileReader constructs a new PBFileReader.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *pbDagReader { func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *PBDagReader {
fctx, cancel := context.WithCancel(ctx) fctx, cancel := context.WithCancel(ctx)
curLinks := getLinkCids(n) curLinks := getLinkCids(n)
return &pbDagReader{ return &PBDagReader{
node: n, node: n,
serv: serv, serv: serv,
buf: NewBufDagReader(pb.GetData()), buf: newBufDagReader(pb.GetData()),
promises: make([]*ipld.NodePromise, len(curLinks)), promises: make([]*ipld.NodePromise, len(curLinks)),
links: curLinks, links: curLinks,
ctx: fctx, ctx: fctx,
...@@ -68,7 +68,7 @@ func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ...@@ -68,7 +68,7 @@ func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv
const preloadSize = 10 const preloadSize = 10
func (dr *pbDagReader) preloadNextNodes(ctx context.Context) { func (dr *PBDagReader) preloadNextNodes(ctx context.Context) {
beg := dr.linkPosition beg := dr.linkPosition
end := beg + preloadSize end := beg + preloadSize
if end >= len(dr.links) { if end >= len(dr.links) {
...@@ -82,7 +82,7 @@ func (dr *pbDagReader) preloadNextNodes(ctx context.Context) { ...@@ -82,7 +82,7 @@ func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
// precalcNextBuf follows the next link in line and loads it from the // precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from // DAGService, setting the next buffer to read from
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error { func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error {
if dr.buf != nil { if dr.buf != nil {
dr.buf.Close() // Just to make sure dr.buf.Close() // Just to make sure
dr.buf = nil dr.buf = nil
...@@ -119,7 +119,7 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error { ...@@ -119,7 +119,7 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
dr.buf = NewPBFileReader(dr.ctx, nxt, pb, dr.serv) dr.buf = NewPBFileReader(dr.ctx, nxt, pb, dr.serv)
return nil return nil
case ftpb.Data_Raw: case ftpb.Data_Raw:
dr.buf = NewBufDagReader(pb.GetData()) dr.buf = newBufDagReader(pb.GetData())
return nil return nil
case ftpb.Data_Metadata: case ftpb.Data_Metadata:
return errors.New("shouldnt have had metadata object inside file") return errors.New("shouldnt have had metadata object inside file")
...@@ -145,17 +145,17 @@ func getLinkCids(n ipld.Node) []*cid.Cid { ...@@ -145,17 +145,17 @@ func getLinkCids(n ipld.Node) []*cid.Cid {
} }
// Size return the total length of the data from the DAG structured file. // Size return the total length of the data from the DAG structured file.
func (dr *pbDagReader) Size() uint64 { func (dr *PBDagReader) Size() uint64 {
return dr.pbdata.GetFilesize() return dr.pbdata.GetFilesize()
} }
// Read reads data from the DAG structured file // Read reads data from the DAG structured file
func (dr *pbDagReader) Read(b []byte) (int, error) { func (dr *PBDagReader) Read(b []byte) (int, error) {
return dr.CtxReadFull(dr.ctx, b) return dr.CtxReadFull(dr.ctx, b)
} }
// CtxReadFull reads data from the DAG structured file // CtxReadFull reads data from the DAG structured file
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { func (dr *PBDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if dr.buf == nil { if dr.buf == nil {
if err := dr.precalcNextBuf(ctx); err != nil { if err := dr.precalcNextBuf(ctx); err != nil {
return 0, err return 0, err
...@@ -189,7 +189,8 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { ...@@ -189,7 +189,8 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
} }
} }
func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) { // WriteTo writes to the given writer.
func (dr *PBDagReader) WriteTo(w io.Writer) (int64, error) {
if dr.buf == nil { if dr.buf == nil {
if err := dr.precalcNextBuf(dr.ctx); err != nil { if err := dr.precalcNextBuf(dr.ctx); err != nil {
return 0, err return 0, err
...@@ -220,12 +221,14 @@ func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) { ...@@ -220,12 +221,14 @@ func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
} }
} }
func (dr *pbDagReader) Close() error { // Close closes the reader.
func (dr *PBDagReader) Close() error {
dr.cancel() dr.cancel()
return nil return nil
} }
func (dr *pbDagReader) Offset() int64 { // Offset returns the current reader offset
func (dr *PBDagReader) Offset() int64 {
return dr.offset return dr.offset
} }
...@@ -233,7 +236,7 @@ func (dr *pbDagReader) Offset() int64 { ...@@ -233,7 +236,7 @@ func (dr *pbDagReader) Offset() int64 {
// interface matches standard unix seek // interface matches standard unix seek
// TODO: check if we can do relative seeks, to reduce the amount of dagreader // TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen. // recreations that need to happen.
func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) { func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
switch whence { switch whence {
case io.SeekStart: case io.SeekStart:
if offset < 0 { if offset < 0 {
...@@ -253,16 +256,16 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) { ...@@ -253,16 +256,16 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
if dr.buf != nil { if dr.buf != nil {
dr.buf.Close() dr.buf.Close()
} }
dr.buf = NewBufDagReader(pb.GetData()[offset:]) dr.buf = newBufDagReader(pb.GetData()[offset:])
// start reading links from the beginning // start reading links from the beginning
dr.linkPosition = 0 dr.linkPosition = 0
dr.offset = offset dr.offset = offset
return offset, nil return offset, nil
} else { }
// skip past root block data // skip past root block data
left -= int64(len(pb.Data)) left -= int64(len(pb.Data))
}
// iterate through links and find where we need to be // iterate through links and find where we need to be
for i := 0; i < len(pb.Blocksizes); i++ { for i := 0; i < len(pb.Blocksizes); i++ {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论