Commit 5d90aa2a authored by Hector Sanjuan

Docs: golint-ify "importers" module

This fixes all golint warnings in the importers module, adding
documentation and module descriptions.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Parent 3f2c7746
@@ -148,10 +148,10 @@ func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
 	}
 	if adder.Trickle {
-		return trickle.TrickleLayout(params.New(chnk))
+		return trickle.Layout(params.New(chnk))
 	}
-	return balanced.BalancedLayout(params.New(chnk))
+	return balanced.Layout(params.New(chnk))
 }
 
 // RootNode returns the root node of the Added.
...
+// Package balanced provides methods to build balanced DAGs.
+// In a balanced DAG, nodes are added to a single root
+// until the maximum number of links is reached (with leaves
+// being at depth 0). Then a new root is created, which points to the
+// old root, incorporates a new child, and proceeds to be
+// filled up (linked) to more leaves. In all cases, the Data (chunks)
+// is stored only at the leaves, with the rest of the nodes only
+// storing links to their children.
+//
+// In a balanced DAG, nodes fill their link capacity before
+// new ones are created, thus depth only increases when the
+// current tree is completely full.
+//
+// Balanced DAGs are general-purpose DAGs in which all leaves
+// are at the same distance from the root.
 package balanced
 
 import (
@@ -8,8 +23,11 @@ import (
 	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
 )
 
-func BalancedLayout(db *h.DagBuilderHelper) (ipld.Node, error) {
-	var offset uint64 = 0
+// Layout builds a balanced DAG. Data is stored at the leaves
+// and depth only increases when the tree is full, that is, when
+// the root node has reached the maximum number of links.
+func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
+	var offset uint64
 	var root *h.UnixfsNode
 	for level := 0; !db.Done(); level++ {
...
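As a reviewer's aside: the renamed `balanced.Layout` is driven exactly like the `BuildDagFromReader` helper further down this diff. A minimal sketch under that assumption (`ds` and `r` are presumed to come from elsewhere; `buildBalanced` is a hypothetical name, not part of this commit):

```go
package example

import (
	"io"

	bal "github.com/ipfs/go-ipfs/importer/balanced"
	"github.com/ipfs/go-ipfs/importer/chunk"
	h "github.com/ipfs/go-ipfs/importer/helpers"

	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// buildBalanced is a hypothetical caller, mirroring BuildDagFromReader.
func buildBalanced(ds ipld.DAGService, r io.Reader) (ipld.Node, error) {
	dbp := h.DagBuilderParams{
		Dagserv:  ds,                     // destination DAGService
		Maxlinks: h.DefaultLinksPerBlock, // links per intermediate node
	}
	// Chunk r with the default splitter and lay the chunks out
	// as a balanced DAG.
	return bal.Layout(dbp.New(chunk.DefaultSplitter(r)))
}
```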
@@ -8,6 +8,9 @@ import (
 	"strings"
 )
 
+// FromString returns a Splitter depending on the given string:
+// it supports "default" (""), "size-{size}", "rabin", "rabin-{blocksize}" and
+// "rabin-{min}-{avg}-{max}".
 func FromString(r io.Reader, chunker string) (Splitter, error) {
 	switch {
 	case chunker == "" || chunker == "default":
...
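For illustration, the forms the new comment documents could be exercised like this; the sizes are arbitrary example values, not defaults taken from this module:

```go
// Each string selects a different Splitter for r (an io.Reader).
for _, s := range []string{
	"",             // same as "default"
	"size-262144",  // fixed-size chunks of 262144 bytes
	"rabin",        // Rabin fingerprinting with default sizes
	"rabin-262144", // Rabin with the given block size
	"rabin-131072-262144-393216", // Rabin with min, avg and max sizes
} {
	spl, err := chunk.FromString(r, s)
	if err != nil {
		panic(err) // example only
	}
	_ = spl
}
```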
@@ -7,13 +7,18 @@ import (
 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/chunker"
 )
 
+// IpfsRabinPoly is the irreducible polynomial of degree 53 used for Rabin
+// fingerprinting.
 var IpfsRabinPoly = chunker.Pol(17437180132763653)
 
+// Rabin implements the Splitter interface and splits content with Rabin
+// fingerprints.
 type Rabin struct {
 	r      *chunker.Chunker
 	reader io.Reader
 }
 
+// NewRabin creates a new Rabin splitter with the given
+// average block size.
 func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
 	min := avgBlkSize / 3
 	max := avgBlkSize + (avgBlkSize / 2)
@@ -21,6 +26,8 @@ func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
 	return NewRabinMinMax(r, min, avgBlkSize, max)
 }
 
+// NewRabinMinMax returns a new Rabin splitter which uses
+// the given min, average and max block sizes.
 func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
 	h := fnv.New32a()
 	ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)
@@ -31,6 +38,7 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
 	}
 }
 
+// NextBytes reads the next bytes from the reader and returns a slice.
 func (r *Rabin) NextBytes() ([]byte, error) {
 	ch, err := r.r.Next()
 	if err != nil {
@@ -40,6 +48,7 @@ func (r *Rabin) NextBytes() ([]byte, error) {
 	return ch.Data, nil
 }
 
+// Reader returns the io.Reader associated to this Splitter.
 func (r *Rabin) Reader() io.Reader {
 	return r.reader
 }
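A consumption sketch for the splitter documented above, assuming `f` is an `io.Reader`, the enclosing function returns an error, and the underlying chunker signals end of input with `io.EOF` (this hunk does not show the terminating error, so that is an assumption):

```go
spl := chunk.NewRabin(f, 256*1024) // ~256 KiB average chunk size
for {
	b, err := spl.NextBytes()
	if err == io.EOF { // assumed end-of-input signal
		break
	}
	if err != nil {
		return err
	}
	fmt.Printf("chunk of %d bytes\n", len(b))
}
```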
@@ -68,7 +68,7 @@ func TestRabinChunkReuse(t *testing.T) {
 	ch2 := chunkData(t, data)
 
 	var extra int
-	for k, _ := range ch2 {
+	for k := range ch2 {
 		_, ok := ch1[k]
 		if !ok {
 			extra++
...
-// package chunk implements streaming block splitters
+// Package chunk implements streaming block splitters.
+// Splitters read data from a reader and provide byte slices (chunks).
+// The size and contents of these slices depend on the splitting method
+// used.
 package chunk
 
 import (
@@ -10,25 +13,34 @@ import (
 
 var log = logging.Logger("chunk")
 
+// DefaultBlockSize is the chunk size that splitters produce (or aim to).
 var DefaultBlockSize int64 = 1024 * 256
 
+// A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
+// that can be used to build DAG nodes.
 type Splitter interface {
 	Reader() io.Reader
 	NextBytes() ([]byte, error)
 }
 
+// SplitterGen is a splitter generator, given a reader.
 type SplitterGen func(r io.Reader) Splitter
 
+// DefaultSplitter returns a SizeSplitter with the DefaultBlockSize.
 func DefaultSplitter(r io.Reader) Splitter {
	return NewSizeSplitter(r, DefaultBlockSize)
 }
 
+// SizeSplitterGen returns a SplitterGen function which will create
+// a splitter with the given size when called.
 func SizeSplitterGen(size int64) SplitterGen {
 	return func(r io.Reader) Splitter {
 		return NewSizeSplitter(r, size)
 	}
 }
 
+// Chan returns a channel that receives each of the chunks produced
+// by a splitter, along with another one for errors.
 func Chan(s Splitter) (<-chan []byte, <-chan error) {
 	out := make(chan []byte)
 	errs := make(chan error, 1)
@@ -56,6 +68,7 @@ type sizeSplitterv2 struct {
 	err  error
 }
 
+// NewSizeSplitter returns a new size-based Splitter with the given block size.
 func NewSizeSplitter(r io.Reader, size int64) Splitter {
 	return &sizeSplitterv2{
 		r:    r,
@@ -63,6 +76,7 @@ func NewSizeSplitter(r io.Reader, size int64) Splitter {
 	}
 }
 
+// NextBytes produces a new chunk.
 func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
 	if ss.err != nil {
 		return nil, ss.err
@@ -85,6 +99,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
 	}
 }
 
+// Reader returns the io.Reader associated to this Splitter.
 func (ss *sizeSplitterv2) Reader() io.Reader {
 	return ss.r
 }
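A sketch of draining `Chan`. That `out` is closed when splitting finishes, and that `errs` (buffered with capacity 1) may never be closed, are assumptions read off the channel construction above rather than a documented contract; `r` is assumed to exist:

```go
out, errs := chunk.Chan(chunk.NewSizeSplitter(r, chunk.DefaultBlockSize))
for b := range out { // assumes out is closed by the splitting goroutine
	fmt.Printf("chunk: %d bytes\n", len(b))
}
// Non-blocking receive, since errs may hold zero or one error
// and is never closed (assumption).
select {
case err := <-errs:
	fmt.Println("split failed:", err)
default:
}
```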
@@ -29,6 +29,8 @@ type DagBuilderHelper struct {
 	prefix     *cid.Prefix
 }
 
+// DagBuilderParams wraps configuration options to create a DagBuilderHelper
+// from a chunk.Splitter.
 type DagBuilderParams struct {
 	// Maximum number of links per intermediate node
 	Maxlinks int
@@ -48,8 +50,8 @@ type DagBuilderParams struct {
 	NoCopy bool
 }
 
-// Generate a new DagBuilderHelper from the given params, which data source comes
-// from chunks object
+// New generates a new DagBuilderHelper from the given params and a given
+// chunk.Splitter as data source.
 func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
 	db := &DagBuilderHelper{
 		dserv: dbp.Dagserv,
@@ -94,16 +96,15 @@ func (db *DagBuilderHelper) Done() bool {
 
 // Next returns the next chunk of data to be inserted into the dag
 // if it returns nil, that signifies that the stream is at an end, and
-// that the current building operation should finish
+// that the current building operation should finish.
 func (db *DagBuilderHelper) Next() ([]byte, error) {
 	db.prepareNext() // idempotent
 	d := db.nextData
 	db.nextData = nil // signal we've consumed it
 	if db.recvdErr != nil {
 		return nil, db.recvdErr
-	} else {
-		return d, nil
 	}
+	return d, nil
 }
 
 // GetDagServ returns the dagservice object this Helper is using
@@ -132,8 +133,7 @@ func (db *DagBuilderHelper) newUnixfsBlock() *UnixfsNode {
 }
 
-// FillNodeLayer will add datanodes as children to the give node until
-// at most db.indirSize ndoes are added
-//
+// FillNodeLayer will add datanodes as children to the given node until
+// at most db.indirSize nodes are added.
 func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
 
 	// while we have room AND we're not done
@@ -151,6 +151,9 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
 	return nil
 }
 
+// GetNextDataNode builds a UnixfsNode with the data obtained from the
+// Splitter, given the constraints (BlockSizeLimit, RawLeaves) specified
+// when creating the DagBuilderHelper.
 func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
 	data, err := db.Next()
 	if err != nil {
@@ -171,29 +174,31 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
 				rawnode: dag.NewRawNode(data),
 				raw:     true,
 			}, nil
-		} else {
-			rawnode, err := dag.NewRawNodeWPrefix(data, *db.prefix)
-			if err != nil {
-				return nil, err
-			}
-			return &UnixfsNode{
-				rawnode: rawnode,
-				raw:     true,
-			}, nil
 		}
-	} else {
-		blk := db.newUnixfsBlock()
-		blk.SetData(data)
-		return blk, nil
+		rawnode, err := dag.NewRawNodeWPrefix(data, *db.prefix)
+		if err != nil {
+			return nil, err
+		}
+		return &UnixfsNode{
+			rawnode: rawnode,
+			raw:     true,
+		}, nil
 	}
+
+	blk := db.newUnixfsBlock()
+	blk.SetData(data)
+	return blk, nil
 }
 
+// SetPosInfo sets the offset information of a node using the fullpath and stat
+// from the DagBuilderHelper.
 func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
 	if db.fullPath != "" {
 		node.SetPosInfo(offset, db.fullPath, db.stat)
 	}
 }
 
+// Add sends a node to the DAGService and returns it.
 func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
 	dn, err := node.GetDagNode()
 	if err != nil {
@@ -208,10 +213,15 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
 	return dn, nil
 }
 
+// Maxlinks returns the configured maximum number of links
+// for nodes built with this helper.
 func (db *DagBuilderHelper) Maxlinks() int {
 	return db.maxlinks
 }
 
+// Close has the DAGService perform a batch Commit operation.
+// It should be called at the end of the building process to make
+// sure all data is persisted.
 func (db *DagBuilderHelper) Close() error {
 	return db.batch.Commit()
 }
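Taken together, the methods documented in this file are the whole surface a layout needs. A deliberately naive, hypothetical flat layout, for illustration only; it ignores Maxlinks, which the real balanced and trickle layouts respect, and is not part of this commit:

```go
// flatLayout hangs every chunk directly off a single root, using only
// the DagBuilderHelper methods documented above.
func flatLayout(db *h.DagBuilderHelper) (ipld.Node, error) {
	root := db.NewUnixfsNode()
	for !db.Done() {
		child, err := db.GetNextDataNode()
		if err != nil {
			return nil, err
		}
		if err := root.AddChild(child, db); err != nil {
			return nil, err
		}
	}
	nd, err := db.Add(root) // send the root to the DAGService
	if err != nil {
		return nil, err
	}
	return nd, db.Close() // commit the batch
}
```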
@@ -70,7 +70,8 @@ func (n *UnixfsNode) NumChildren() int {
 	return n.ufmt.NumChildren()
 }
 
-// Set replaces this UnixfsNode with another UnixfsNode
+// Set replaces the current UnixfsNode with another one. It performs
+// a shallow copy.
 func (n *UnixfsNode) Set(other *UnixfsNode) {
 	n.node = other.node
 	n.raw = other.raw
@@ -97,7 +98,7 @@ func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds ipld.DAGService) (*
 
 // AddChild adds the given UnixfsNode as a child of the receiver.
-// The passed in DagBuilderHelper is used to store the child node an
-// pin it locally so it doesnt get lost
+// The passed in DagBuilderHelper is used to store the child node and
+// pin it locally so it doesn't get lost.
 func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
 	n.ufmt.AddBlockSize(child.FileSize())
@@ -118,16 +119,20 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
 	return err
 }
 
-// RemoveChild removes the child node at the given index
+// RemoveChild deletes the child node at the given index.
 func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) {
 	n.ufmt.RemoveBlockSize(index)
 	n.node.SetLinks(append(n.node.Links()[:index], n.node.Links()[index+1:]...))
 }
 
+// SetData stores data in this node.
 func (n *UnixfsNode) SetData(data []byte) {
 	n.ufmt.Data = data
 }
 
+// FileSize returns the total file size of this tree (including children).
+// In the case of raw nodes, it returns the length of the
+// raw data.
 func (n *UnixfsNode) FileSize() uint64 {
 	if n.raw {
 		return uint64(len(n.rawnode.RawData()))
@@ -135,6 +140,8 @@ func (n *UnixfsNode) FileSize() uint64 {
 	return n.ufmt.FileSize()
 }
 
+// SetPosInfo sets information about the offset of the data of this node in a
+// filesystem file.
 func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) {
 	n.posInfo = &pi.PosInfo{
 		Offset: offset,
@@ -144,7 +151,7 @@ func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo
 }
 
 // GetDagNode fills out the proper formatting for the unixfs node
-// inside of a DAG node and returns the dag node
+// inside of a DAG node and returns the dag node.
 func (n *UnixfsNode) GetDagNode() (ipld.Node, error) {
 	nd, err := n.getBaseDagNode()
 	if err != nil {
...
@@ -6,17 +6,18 @@ import (
 	"fmt"
 	"os"
 
+	"gx/ipfs/QmQp2a2Hhb7F6eK2A5hN8f9aJy4mtkEikL9Zj4cgB7d1dD/go-ipfs-cmdkit/files"
+	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
+
 	bal "github.com/ipfs/go-ipfs/importer/balanced"
 	"github.com/ipfs/go-ipfs/importer/chunk"
 	h "github.com/ipfs/go-ipfs/importer/helpers"
 	trickle "github.com/ipfs/go-ipfs/importer/trickle"
-
-	"gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files"
-	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
 )
 // BuildDagFromFile builds a DAG from the given file, writing created blocks to
-// disk as they are created
+// disk as they are created.
 func BuildDagFromFile(fpath string, ds ipld.DAGService) (ipld.Node, error) {
 	stat, err := os.Lstat(fpath)
 	if err != nil {
@@ -36,23 +37,24 @@ func BuildDagFromFile(fpath string, ds ipld.DAGService) (ipld.Node, error) {
 	return BuildDagFromReader(ds, chunk.DefaultSplitter(f))
 }
 
-// BuildDagFromReader builds a DAG from the chunks returned by the given chunk
-// splitter.
+// BuildDagFromReader creates a DAG given a DAGService and a Splitter
+// implementation (a Splitter wraps an io.Reader), using a balanced layout.
 func BuildDagFromReader(ds ipld.DAGService, spl chunk.Splitter) (ipld.Node, error) {
 	dbp := h.DagBuilderParams{
 		Dagserv:  ds,
 		Maxlinks: h.DefaultLinksPerBlock,
 	}
 
-	return bal.BalancedLayout(dbp.New(spl))
+	return bal.Layout(dbp.New(spl))
 }
 
-// BuildTrickleDagFromReader is similar to BuildDagFromReader but uses the trickle layout.
+// BuildTrickleDagFromReader creates a DAG given a DAGService and a Splitter
+// implementation (a Splitter wraps an io.Reader), using a trickle layout.
 func BuildTrickleDagFromReader(ds ipld.DAGService, spl chunk.Splitter) (ipld.Node, error) {
 	dbp := h.DagBuilderParams{
 		Dagserv:  ds,
 		Maxlinks: h.DefaultLinksPerBlock,
 	}
 
-	return trickle.TrickleLayout(dbp.New(spl))
+	return trickle.Layout(dbp.New(spl))
 }
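End-to-end, the file-level entry point reads as below. That this file's package is named `importer` is an assumption (the package clause is outside this hunk), the path is a placeholder, and `ds` is presumed to be an `ipld.DAGService` obtained elsewhere:

```go
// Sketch: import a file into a DAG using the default balanced layout.
nd, err := importer.BuildDagFromFile("/tmp/example.bin", ds)
if err != nil {
	return err
}
fmt.Println("root cid:", nd.Cid())
```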
@@ -39,7 +39,7 @@ func buildTestDag(ds ipld.DAGService, spl chunk.Splitter, rawLeaves UseRawLeaves
 		RawLeaves: bool(rawLeaves),
 	}
 
-	nd, err := TrickleLayout(dbp.New(spl))
+	nd, err := Layout(dbp.New(spl))
 	if err != nil {
 		return nil, err
 	}
@@ -503,7 +503,7 @@ func testAppend(t *testing.T, rawLeaves UseRawLeaves) {
 	r := bytes.NewReader(should[nbytes/2:])
 	ctx := context.Background()
 
-	nnode, err := TrickleAppend(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500)))
+	nnode, err := Append(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500)))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -564,7 +564,7 @@ func testMultipleAppends(t *testing.T, rawLeaves UseRawLeaves) {
 	ctx := context.Background()
 	for i := 0; i < len(should); i++ {
-		nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(should[i:i+1]))))
+		nnode, err := Append(ctx, nd, dbp.New(spl(bytes.NewReader(should[i:i+1]))))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -612,12 +612,12 @@ func TestAppendSingleBytesToEmpty(t *testing.T) {
 	spl := chunk.SizeSplitterGen(500)
 	ctx := context.Background()
 
-	nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(data[:1]))))
+	nnode, err := Append(ctx, nd, dbp.New(spl(bytes.NewReader(data[:1]))))
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	nnode, err = TrickleAppend(ctx, nnode, dbp.New(spl(bytes.NewReader(data[1:]))))
+	nnode, err = Append(ctx, nnode, dbp.New(spl(bytes.NewReader(data[1:]))))
 	if err != nil {
 		t.Fatal(err)
 	}
...
+// Package trickle provides methods to build trickle DAGs.
+// In this type of DAG, non-leaf nodes are first filled
+// with data leaves, and then incorporate "layers" of subtrees
+// as additional links.
+//
+// Each layer is a trickle sub-tree and is limited by an increasing
+// maximum depth. Thus, a node's first layer
+// can only hold leaves (depth 1), but subsequent layers can grow deeper.
+// By default, this module places 4 nodes per layer (that is, 4 subtrees
+// of the same maximum depth before increasing it).
+//
+// Trickle DAGs are very good for sequentially reading data, as the
+// first data leaves are directly reachable from the root and those
+// coming next are always nearby. They are
+// suited for things like streaming applications.
 package trickle
 
 import (
@@ -18,7 +33,10 @@ import (
 // improves seek speeds.
 const layerRepeat = 4
 
-func TrickleLayout(db *h.DagBuilderHelper) (ipld.Node, error) {
+// Layout builds a new DAG with the trickle format using the provided
+// DagBuilderHelper. See the module's description for a more detailed
+// explanation.
+func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
 	root := db.NewUnixfsNode()
 	if err := db.FillNodeLayer(root); err != nil {
 		return nil, err
@@ -68,17 +86,17 @@ func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error
 	return nil
 }
 
-// TrickleAppend appends the data in `db` to the dag, using the Trickledag format
-func TrickleAppend(ctx context.Context, basen ipld.Node, db *h.DagBuilderHelper) (out ipld.Node, err_out error) {
+// Append appends the data in `db` to the dag, using the trickledag format.
+func Append(ctx context.Context, basen ipld.Node, db *h.DagBuilderHelper) (out ipld.Node, errOut error) {
 	base, ok := basen.(*dag.ProtoNode)
 	if !ok {
 		return nil, dag.ErrNotProtobuf
 	}
 
 	defer func() {
-		if err_out == nil {
+		if errOut == nil {
 			if err := db.Close(); err != nil {
-				err_out = err
+				errOut = err
 			}
 		}
 	}()
@@ -148,7 +166,7 @@ func appendFillLastChild(ctx context.Context, ufsn *h.UnixfsNode, depth int, lay
 	}
 
 	// Fill out last child (may not be full tree)
-	nchild, err := trickleAppendRec(ctx, lastChild, db, depth-1)
+	nchild, err := appendRec(ctx, lastChild, db, depth-1)
 	if err != nil {
 		return err
 	}
@@ -179,8 +197,8 @@ func appendFillLastChild(ctx context.Context, ufsn *h.UnixfsNode, depth int, lay
 	return nil
 }
 
-// recursive call for TrickleAppend
-func trickleAppendRec(ctx context.Context, ufsn *h.UnixfsNode, db *h.DagBuilderHelper, depth int) (*h.UnixfsNode, error) {
+// recursive call for Append
+func appendRec(ctx context.Context, ufsn *h.UnixfsNode, db *h.DagBuilderHelper, depth int) (*h.UnixfsNode, error) {
 	if depth == 0 || db.Done() {
 		return ufsn, nil
 	}
@@ -337,7 +355,7 @@ func verifyTDagRec(n ipld.Node, depth int, p VerifyParams) error {
 		// Recursive trickle dags
 		rdepth := ((i - p.Direct) / p.LayerRepeat) + 1
 		if rdepth >= depth && depth > 0 {
-			return errors.New("Child dag was too deep!")
+			return errors.New("child dag was too deep")
 		}
 		err := verifyTDagRec(child, rdepth, p)
 		if err != nil {
...
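The rename is otherwise mechanical, as the DagModifier hunk below shows. For context, a hedged sketch of appending to an existing trickle DAG from outside the package (`ctx`, `ds`, `nd` and `r` are assumed to exist):

```go
dbp := h.DagBuilderParams{
	Dagserv:  ds,
	Maxlinks: h.DefaultLinksPerBlock,
}
// nd must be protobuf-backed; otherwise Append returns dag.ErrNotProtobuf.
nnode, err := trickle.Append(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500)))
if err != nil {
	return err
}
fmt.Println("new root:", nnode.Cid())
```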
@@ -174,6 +174,8 @@ func (n *FSNode) GetBytes() ([]byte, error) {
 	return proto.Marshal(pbn)
 }
 
+// FileSize returns the total size of this tree. That is, the size of
+// the data in this node plus the size of all its children.
 func (n *FSNode) FileSize() uint64 {
 	return uint64(len(n.Data)) + n.subtotal
 }
...
@@ -362,7 +362,7 @@ func (dm *DagModifier) appendData(nd ipld.Node, spl chunk.Splitter) (ipld.Node,
 			Prefix:    &dm.Prefix,
 			RawLeaves: dm.RawLeaves,
 		}
-		return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl))
+		return trickle.Append(dm.ctx, nd, dbp.New(spl))
 	default:
 		return nil, ErrNotUnixfs
 	}
...
@@ -63,7 +63,7 @@ func GetNode(t testing.TB, dserv ipld.DAGService, data []byte, opts NodeOpts) ip
 		RawLeaves: opts.RawLeavesUsed,
 	}
 
-	node, err := trickle.TrickleLayout(dbp.New(SizeSplitterGen(500)(in)))
+	node, err := trickle.Layout(dbp.New(SizeSplitterGen(500)(in)))
 	if err != nil {
 		t.Fatal(err)
 	}
...