提交 916fc546 作者: Jeromy

add more comments!

上级 dc2dbdbb
...@@ -87,18 +87,16 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { ...@@ -87,18 +87,16 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
if err != nil { if err != nil {
return 0, err return 0, err
} }
// We have to rewrite the data before our write in this block.
b = append(data[:offset-traversed], b...) b = append(data[:offset-traversed], b...)
break break
} }
traversed += size traversed += size
} }
if startsubblk == len(dm.pbdata.Blocksizes) { if startsubblk == len(dm.pbdata.Blocksizes) {
// TODO: something? // TODO: Im not sure if theres any case that isnt being handled here.
/* // leaving this note here as a future reference in case something breaks
if traversed < offset {
return 0, errors.New("Tried to start write outside bounds of file.")
}
*/
} }
} }
...@@ -157,7 +155,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { ...@@ -157,7 +155,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
sizes = append(sizes, uint64(len(sb))) sizes = append(sizes, uint64(len(sb)))
} }
// This is disgusting // This is disgusting (and can be rewritten if performance demands)
if len(changed) > 0 { if len(changed) > 0 {
sechalflink := append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...) sechalflink := append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...)
dm.curNode.Links = append(dm.curNode.Links[:changed[0]], sechalflink...) dm.curNode.Links = append(dm.curNode.Links[:changed[0]], sechalflink...)
...@@ -172,6 +170,8 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { ...@@ -172,6 +170,8 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
return origlen, nil return origlen, nil
} }
// splitBytes uses a splitterFunc to turn a large array of bytes
// into many smaller arrays of bytes
func splitBytes(b []byte, spl imp.BlockSplitter) [][]byte { func splitBytes(b []byte, spl imp.BlockSplitter) [][]byte {
out := spl.Split(bytes.NewReader(b)) out := spl.Split(bytes.NewReader(b))
var arr [][]byte var arr [][]byte
...@@ -181,6 +181,7 @@ func splitBytes(b []byte, spl imp.BlockSplitter) [][]byte { ...@@ -181,6 +181,7 @@ func splitBytes(b []byte, spl imp.BlockSplitter) [][]byte {
return arr return arr
} }
// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) { func (dm *DagModifier) GetNode() (*mdag.Node, error) {
b, err := proto.Marshal(dm.pbdata) b, err := proto.Marshal(dm.pbdata)
if err != nil { if err != nil {
......
...@@ -29,14 +29,23 @@ func NewDagWriter(ds *dag.DAGService, splitter imp.BlockSplitter) *DagWriter { ...@@ -29,14 +29,23 @@ func NewDagWriter(ds *dag.DAGService, splitter imp.BlockSplitter) *DagWriter {
return dw return dw
} }
// startSplitter manages splitting incoming bytes and
// creating dag nodes from them. Created nodes are stored
// in the DAGService and then released to the GC.
func (dw *DagWriter) startSplitter() { func (dw *DagWriter) startSplitter() {
// Since the splitter functions take a reader (and should!)
// we wrap our byte chan input in a reader
r := util.NewByteChanReader(dw.splChan) r := util.NewByteChanReader(dw.splChan)
blkchan := dw.splitter.Split(r) blkchan := dw.splitter.Split(r)
// First data block is reserved for storage in the root node
first := <-blkchan first := <-blkchan
mbf := new(ft.MultiBlock) mbf := new(ft.MultiBlock)
root := new(dag.Node) root := new(dag.Node)
for blkData := range blkchan { for blkData := range blkchan {
// Store the block size in the root node
mbf.AddBlockSize(uint64(len(blkData))) mbf.AddBlockSize(uint64(len(blkData)))
node := &dag.Node{Data: ft.WrapData(blkData)} node := &dag.Node{Data: ft.WrapData(blkData)}
_, err := dw.dagserv.Add(node) _, err := dw.dagserv.Add(node)
...@@ -45,6 +54,8 @@ func (dw *DagWriter) startSplitter() { ...@@ -45,6 +54,8 @@ func (dw *DagWriter) startSplitter() {
log.Critical("Got error adding created node to dagservice: %s", err) log.Critical("Got error adding created node to dagservice: %s", err)
return return
} }
// Add a link to this node without storing a reference to the memory
err = root.AddNodeLinkClean("", node) err = root.AddNodeLinkClean("", node)
if err != nil { if err != nil {
dw.seterr = err dw.seterr = err
...@@ -52,6 +63,8 @@ func (dw *DagWriter) startSplitter() { ...@@ -52,6 +63,8 @@ func (dw *DagWriter) startSplitter() {
return return
} }
} }
// Generate the root node data
mbf.Data = first mbf.Data = first
data, err := mbf.GetBytes() data, err := mbf.GetBytes()
if err != nil { if err != nil {
...@@ -61,6 +74,7 @@ func (dw *DagWriter) startSplitter() { ...@@ -61,6 +74,7 @@ func (dw *DagWriter) startSplitter() {
} }
root.Data = data root.Data = data
// Add root node to the dagservice
_, err = dw.dagserv.Add(root) _, err = dw.dagserv.Add(root)
if err != nil { if err != nil {
dw.seterr = err dw.seterr = err
...@@ -79,6 +93,9 @@ func (dw *DagWriter) Write(b []byte) (int, error) { ...@@ -79,6 +93,9 @@ func (dw *DagWriter) Write(b []byte) (int, error) {
return len(b), nil return len(b), nil
} }
// Close the splitters input channel and wait for it to finish
// Must be called to finish up splitting, otherwise split method
// will never halt
func (dw *DagWriter) Close() error { func (dw *DagWriter) Close() error {
close(dw.splChan) close(dw.splChan)
<-dw.done <-dw.done
......
...@@ -8,6 +8,10 @@ import ( ...@@ -8,6 +8,10 @@ import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
) )
var ErrMalformedFileFormat = errors.New("malformed data in file format")
var ErrInvalidDirLocation = errors.New("found directory node in unexpected place")
var ErrUnrecognizedType = errors.New("unrecognized node type")
func FromBytes(data []byte) (*PBData, error) { func FromBytes(data []byte) (*PBData, error) {
pbdata := new(PBData) pbdata := new(PBData)
err := proto.Unmarshal(data, pbdata) err := proto.Unmarshal(data, pbdata)
...@@ -26,12 +30,17 @@ func FilePBData(data []byte, totalsize uint64) []byte { ...@@ -26,12 +30,17 @@ func FilePBData(data []byte, totalsize uint64) []byte {
data, err := proto.Marshal(pbfile) data, err := proto.Marshal(pbfile)
if err != nil { if err != nil {
//this really shouldnt happen, i promise // This really shouldnt happen, i promise
// The only failure case for marshal is if required fields
// are not filled out, and they all are. If the proto object
// gets changed and nobody updates this function, the code
// should panic due to programmer error
panic(err) panic(err)
} }
return data return data
} }
// Returns Bytes that represent a Directory
func FolderPBData() []byte { func FolderPBData() []byte {
pbfile := new(PBData) pbfile := new(PBData)
typ := PBData_Directory typ := PBData_Directory
......
...@@ -20,6 +20,8 @@ type DagReader struct { ...@@ -20,6 +20,8 @@ type DagReader struct {
buf *bytes.Buffer buf *bytes.Buffer
} }
// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) { func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) {
pb := new(ft.PBData) pb := new(ft.PBData)
err := proto.Unmarshal(n.Data, pb) err := proto.Unmarshal(n.Data, pb)
...@@ -29,6 +31,7 @@ func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) { ...@@ -29,6 +31,7 @@ func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) {
switch pb.GetType() { switch pb.GetType() {
case ft.PBData_Directory: case ft.PBData_Directory:
// Dont allow reading directories
return nil, ErrIsDir return nil, ErrIsDir
case ft.PBData_File: case ft.PBData_File:
return &DagReader{ return &DagReader{
...@@ -37,12 +40,15 @@ func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) { ...@@ -37,12 +40,15 @@ func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) {
buf: bytes.NewBuffer(pb.GetData()), buf: bytes.NewBuffer(pb.GetData()),
}, nil }, nil
case ft.PBData_Raw: case ft.PBData_Raw:
// Raw block will just be a single level, return a byte buffer
return bytes.NewBuffer(pb.GetData()), nil return bytes.NewBuffer(pb.GetData()), nil
default: default:
panic("Unrecognized node type!") return nil, ft.ErrUnrecognizedType
} }
} }
// Follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error { func (dr *DagReader) precalcNextBuf() error {
if dr.position >= len(dr.node.Links) { if dr.position >= len(dr.node.Links) {
return io.EOF return io.EOF
...@@ -65,7 +71,7 @@ func (dr *DagReader) precalcNextBuf() error { ...@@ -65,7 +71,7 @@ func (dr *DagReader) precalcNextBuf() error {
switch pb.GetType() { switch pb.GetType() {
case ft.PBData_Directory: case ft.PBData_Directory:
panic("Why is there a directory under a file?") return ft.ErrInvalidDirLocation
case ft.PBData_File: case ft.PBData_File:
//TODO: this *should* work, needs testing first //TODO: this *should* work, needs testing first
//return NewDagReader(nxt, dr.serv) //return NewDagReader(nxt, dr.serv)
...@@ -74,11 +80,12 @@ func (dr *DagReader) precalcNextBuf() error { ...@@ -74,11 +80,12 @@ func (dr *DagReader) precalcNextBuf() error {
dr.buf = bytes.NewBuffer(pb.GetData()) dr.buf = bytes.NewBuffer(pb.GetData())
return nil return nil
default: default:
panic("Unrecognized node type!") return ft.ErrUnrecognizedType
} }
} }
func (dr *DagReader) Read(b []byte) (int, error) { func (dr *DagReader) Read(b []byte) (int, error) {
// If no cached buffer, load one
if dr.buf == nil { if dr.buf == nil {
err := dr.precalcNextBuf() err := dr.precalcNextBuf()
if err != nil { if err != nil {
...@@ -87,16 +94,22 @@ func (dr *DagReader) Read(b []byte) (int, error) { ...@@ -87,16 +94,22 @@ func (dr *DagReader) Read(b []byte) (int, error) {
} }
total := 0 total := 0
for { for {
// Attempt to fill bytes from cached buffer
n, err := dr.buf.Read(b[total:]) n, err := dr.buf.Read(b[total:])
total += n total += n
if err != nil { if err != nil {
// EOF is expected
if err != io.EOF { if err != io.EOF {
return total, err return total, err
} }
} }
// If weve read enough bytes, return
if total == len(b) { if total == len(b) {
return total, nil return total, nil
} }
// Otherwise, load up the next block
err = dr.precalcNextBuf() err = dr.precalcNextBuf()
if err != nil { if err != nil {
return total, err return total, err
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论