提交 84ccfe70 作者: Jeromy

Merge branch 'feat/get-faster'

...@@ -60,20 +60,20 @@ func NewReader(path path.Path, dag mdag.DAGService, resolver *path.Resolver, com ...@@ -60,20 +60,20 @@ func NewReader(path path.Path, dag mdag.DAGService, resolver *path.Resolver, com
return reader, nil return reader, nil
} }
func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) { func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
pb := new(upb.Data) pb := new(upb.Data)
err := proto.Unmarshal(dagnode.Data, pb) err := proto.Unmarshal(dagnode.Data, pb)
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
if depth == 0 { if depth == 0 {
defer i.close() defer r.close()
} }
if pb.GetType() == upb.Data_Directory { if pb.GetType() == upb.Data_Directory {
err = i.writer.WriteHeader(&tar.Header{ err = r.writer.WriteHeader(&tar.Header{
Name: path, Name: path,
Typeflag: tar.TypeDir, Typeflag: tar.TypeDir,
Mode: 0777, Mode: 0777,
...@@ -81,23 +81,25 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) { ...@@ -81,23 +81,25 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
// TODO: set mode, dates, etc. when added to unixFS // TODO: set mode, dates, etc. when added to unixFS
}) })
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
i.flush() r.flush()
for _, link := range dagnode.Links { ctx, _ := context.WithTimeout(context.TODO(), time.Second*60)
childNode, err := link.GetNode(i.dag)
for i, ng := range r.dag.GetDAG(ctx, dagnode) {
childNode, err := ng.Get()
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
i.writeToBuf(childNode, gopath.Join(path, link.Name), depth+1) r.writeToBuf(childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
} }
return return
} }
err = i.writer.WriteHeader(&tar.Header{ err = r.writer.WriteHeader(&tar.Header{
Name: path, Name: path,
Size: int64(pb.GetFilesize()), Size: int64(pb.GetFilesize()),
Typeflag: tar.TypeReg, Typeflag: tar.TypeReg,
...@@ -106,95 +108,95 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) { ...@@ -106,95 +108,95 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
// TODO: set mode, dates, etc. when added to unixFS // TODO: set mode, dates, etc. when added to unixFS
}) })
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
i.flush() r.flush()
reader, err := uio.NewDagReader(context.TODO(), dagnode, i.dag) reader, err := uio.NewDagReader(context.TODO(), dagnode, r.dag)
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
err = i.syncCopy(reader) err = r.syncCopy(reader)
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
} }
func (i *Reader) Read(p []byte) (int, error) { func (r *Reader) Read(p []byte) (int, error) {
// wait for the goroutine that is writing data to the buffer to tell us // wait for the goroutine that is writing data to the buffer to tell us
// there is something to read // there is something to read
if !i.closed { if !r.closed {
<-i.signalChan <-r.signalChan
} }
if i.err != nil { if r.err != nil {
return 0, i.err return 0, r.err
} }
if !i.closed { if !r.closed {
defer i.signal() defer r.signal()
} }
if i.buf.Len() == 0 { if r.buf.Len() == 0 {
if i.closed { if r.closed {
return 0, io.EOF return 0, io.EOF
} }
return 0, nil return 0, nil
} }
n, err := i.buf.Read(p) n, err := r.buf.Read(p)
if err == io.EOF && !i.closed || i.buf.Len() > 0 { if err == io.EOF && !r.closed || r.buf.Len() > 0 {
return n, nil return n, nil
} }
return n, err return n, err
} }
func (i *Reader) signal() { func (r *Reader) signal() {
i.signalChan <- struct{}{} r.signalChan <- struct{}{}
} }
func (i *Reader) flush() { func (r *Reader) flush() {
i.signal() r.signal()
<-i.signalChan <-r.signalChan
} }
func (i *Reader) emitError(err error) { func (r *Reader) emitError(err error) {
i.err = err r.err = err
i.signal() r.signal()
} }
func (i *Reader) close() { func (r *Reader) close() {
i.closed = true r.closed = true
defer i.signal() defer r.signal()
err := i.writer.Close() err := r.writer.Close()
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
if i.gzipWriter != nil { if r.gzipWriter != nil {
err = i.gzipWriter.Close() err = r.gzipWriter.Close()
if err != nil { if err != nil {
i.emitError(err) r.emitError(err)
return return
} }
} }
} }
func (i *Reader) syncCopy(reader io.Reader) error { func (r *Reader) syncCopy(reader io.Reader) error {
buf := make([]byte, 32*1024) buf := make([]byte, 32*1024)
for { for {
nr, err := reader.Read(buf) nr, err := reader.Read(buf)
if nr > 0 { if nr > 0 {
_, err := i.writer.Write(buf[:nr]) _, err := r.writer.Write(buf[:nr])
if err != nil { if err != nil {
return err return err
} }
i.flush() r.flush()
} }
if err == io.EOF { if err == io.EOF {
break break
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论