提交 b01663ed 作者: Juan Batiz-Benet

Merge pull request #621 from jbenet/ipfs-get

`ipfs get` Command
package commands
import (
"compress/gzip"
"errors"
"fmt"
"io"
"os"
"strings"
cmds "github.com/jbenet/go-ipfs/commands"
core "github.com/jbenet/go-ipfs/core"
tar "github.com/jbenet/go-ipfs/thirdparty/tar"
utar "github.com/jbenet/go-ipfs/unixfs/tar"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
)
// ErrInvalidCompressionLevel is returned by getCompressOptions when the
// user supplies a gzip compression level outside the valid 1-9 range.
var ErrInvalidCompressionLevel = errors.New("Compression level must be between 1 and 9")
// GetCmd implements `ipfs get`: it fetches the DAG at an IPFS path and
// stores it on disk, either unpacked or as a (optionally gzip-compressed)
// tar archive. Run produces a tar stream; PostRun consumes it client-side.
var GetCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Download IPFS objects",
		ShortDescription: `
Retrieves the object named by <ipfs-path> and stores the data to disk.
By default, the output will be stored at ./<ipfs-path>, but an alternate path
can be specified with '--output=<path>' or '-o=<path>'.
To output a TAR archive instead of unpacked files, use '--archive' or '-a'.
To compress the output with GZIP compression, use '--compress' or '-C'. You
may also specify the level of compression by specifying '-l=<1-9>'.
`,
	},

	Arguments: []cmds.Argument{
		cmds.StringArg("ipfs-path", true, false, "The path to the IPFS object(s) to be outputted").EnableStdin(),
	},
	Options: []cmds.Option{
		cmds.StringOption("output", "o", "The path where output should be stored"),
		cmds.BoolOption("archive", "a", "Output a TAR archive"),
		cmds.BoolOption("compress", "C", "Compress the output with GZIP compression"),
		cmds.IntOption("compression-level", "l", "The level of compression (1-9)"),
	},

	// PreRun validates the compression options so an invalid '-l' value
	// fails fast, before any work is done.
	PreRun: func(req cmds.Request) error {
		_, err := getCompressOptions(req)
		return err
	},

	Run: func(req cmds.Request, res cmds.Response) {
		// Re-validate here as well; presumably Run can execute in a
		// different process than PreRun (daemon) — NOTE(review): confirm
		// against the cmds framework.
		cmplvl, err := getCompressOptions(req)
		if err != nil {
			res.SetError(err, cmds.ErrClient)
			return
		}

		node, err := req.Context().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// The output is always a tar stream; PostRun decides whether to
		// save it verbatim (--archive) or unpack it onto disk.
		reader, err := get(node, req.Arguments()[0], cmplvl)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}
		res.SetOutput(reader)
	},

	// PostRun consumes the tar stream produced by Run and writes it to the
	// local filesystem, showing a byte-count progress bar on stderr.
	PostRun: func(req cmds.Request, res cmds.Response) {
		if res.Output() == nil {
			return
		}
		outReader := res.Output().(io.Reader)
		// Clear the output so the commands machinery doesn't also try to
		// emit the raw stream.
		res.SetOutput(nil)

		// Default the destination to the requested IPFS path.
		outPath, _, _ := req.Option("output").String()
		if len(outPath) == 0 {
			outPath = req.Arguments()[0]
		}

		cmplvl, err := getCompressOptions(req)
		if err != nil {
			res.SetError(err, cmds.ErrClient)
			return
		}

		if archive, _, _ := req.Option("archive").Bool(); archive {
			// Archive mode: save the stream as-is, appending the
			// conventional extensions when the user didn't.
			if !strings.HasSuffix(outPath, ".tar") {
				outPath += ".tar"
			}
			if cmplvl != gzip.NoCompression {
				outPath += ".gz"
			}

			fmt.Printf("Saving archive to %s\n", outPath)
			file, err := os.Create(outPath)
			if err != nil {
				res.SetError(err, cmds.ErrNormal)
				return
			}
			defer file.Close()

			// Total size is unknown up front, so the bar counts bytes
			// rather than showing a percentage.
			bar := pb.New(0).SetUnits(pb.U_BYTES)
			bar.Output = os.Stderr
			pbReader := bar.NewProxyReader(outReader)
			bar.Start()
			defer bar.Finish()

			_, err = io.Copy(file, pbReader)
			if err != nil {
				res.SetError(err, cmds.ErrNormal)
				return
			}
			return
		}

		fmt.Printf("Saving file(s) to %s\n", outPath)

		// TODO: get total length of files
		bar := pb.New(0).SetUnits(pb.U_BYTES)
		bar.Output = os.Stderr

		// wrap the reader with the progress bar proxy reader
		// if the output is compressed, also wrap it in a gzip.Reader
		var reader io.Reader
		if cmplvl != gzip.NoCompression {
			gzipReader, err := gzip.NewReader(outReader)
			if err != nil {
				res.SetError(err, cmds.ErrNormal)
				return
			}
			defer gzipReader.Close()
			reader = bar.NewProxyReader(gzipReader)
		} else {
			reader = bar.NewProxyReader(outReader)
		}
		bar.Start()
		defer bar.Finish()

		// Unpack the tar stream onto disk, rooted at outPath.
		extractor := &tar.Extractor{outPath}
		err = extractor.Extract(reader)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
		}
	},
}
// getCompressOptions parses '--compress' and '--compression-level' into a
// gzip compression level.
//
// It returns:
//   - gzip.NoCompression when '--compress' is not set,
//   - gzip.DefaultCompression when '--compress' is set without a level,
//   - the given level when it is in the 1-9 range,
//   - ErrInvalidCompressionLevel when the level is out of range.
func getCompressOptions(req cmds.Request) (int, error) {
	cmprs, _, _ := req.Option("compress").Bool()
	cmplvl, cmplvlFound, _ := req.Option("compression-level").Int()
	switch {
	case !cmprs:
		return gzip.NoCompression, nil
	case !cmplvlFound:
		return gzip.DefaultCompression, nil
	case cmplvl < 1 || cmplvl > 9:
		return gzip.NoCompression, ErrInvalidCompressionLevel
	}
	// Fix: a valid explicit level previously fell through to
	// gzip.NoCompression, silently disabling compression for '-C -l=<n>'.
	return cmplvl, nil
}
// get builds a tar stream (gzip-compressed when compression is not
// gzip.NoCompression) for the object at the given IPFS path.
func get(node *core.IpfsNode, path string, compression int) (io.Reader, error) {
	r, err := utar.NewReader(path, node.DAG, node.Resolver, compression)
	if err != nil {
		// Return an untyped nil: returning the (nil) *utar.Reader directly
		// would produce a non-nil io.Reader interface wrapping a nil pointer,
		// which callers checking `reader != nil` would mistake for success.
		return nil, err
	}
	return r, nil
}
......@@ -82,6 +82,7 @@ var rootSubcommands = map[string]*cmds.Command{
"config": ConfigCmd,
"dht": DhtCmd,
"diag": DiagCmd,
"get": GetCmd,
"id": IDCmd,
"log": LogCmd,
"ls": LsCmd,
......
#!/bin/sh
#
# Copyright (c) 2015 Matt Bell
# MIT Licensed; see the LICENSE file in this repository.
#

# Sharness tests for the `ipfs get` command: plain fetch, overwrite
# protection, --archive, --compress, and directory handling.

test_description="Test get command"

. lib/test-lib.sh

# `ipfs get` talks to the HTTP API, so a daemon must be running.
test_init_ipfs
test_launch_ipfs_daemon

# Sanity-check the CLI help output.
test_expect_success "'ipfs get --help' succeeds" '
ipfs get --help > actual
'
test_expect_success "'ipfs get --help' output looks good" '
egrep "ipfs get.*<ipfs-path>" actual > /dev/null ||
fsh cat actual
'

# Single file: `ipfs get <hash>` unpacks to ./<hash> and prints a notice.
test_expect_success "ipfs get succeeds" '
echo "Hello Worlds!" > data &&
HASH=`ipfs add -q data` &&
ipfs get $HASH > actual
'
test_expect_success "ipfs get output looks good" '
echo "Saving file(s) to $HASH
" > expected &&
test_cmp expected actual
'
test_expect_success "ipfs get file output looks good" '
test_cmp $HASH data
'

# Extraction must refuse to clobber a preexisting file of the same name.
test_expect_success "ipfs get errors when trying to overwrite a file" '
test_must_fail ipfs get $HASH > actual &&
rm $HASH
'

# --archive (-a): save a tar archive instead of unpacking.
test_expect_success "ipfs get -a succeeds" '
ipfs get $HASH -a > actual
'
test_expect_success "ipfs get -a output looks good" '
echo "Saving archive to $HASH.tar
" > expected &&
test_cmp expected actual
'
test_expect_success "ipfs get -a archive output is valid" '
tar -xf "$HASH".tar &&
test_cmp $HASH data &&
rm $HASH.tar &&
rm $HASH
'

# --archive with --compress (-C): save a gzipped tar archive.
test_expect_success "ipfs get -a -C succeeds" '
ipfs get $HASH -a -C > actual
'
test_expect_success "ipfs get -a -C output looks good" '
echo "Saving archive to $HASH.tar.gz
" > expected &&
test_cmp expected actual
'
test_expect_success "gzipped tar archive output is valid" '
tar -xf "$HASH".tar.gz &&
test_cmp $HASH data &&
rm "$HASH".tar.gz &&
rm $HASH
'

# Directory tree: `ipfs get` should recreate the nested structure on disk.
test_expect_success "ipfs get succeeds (directory)" '
mkdir dir &&
touch dir/a &&
mkdir dir/b &&
echo "Hello, Worlds!" > dir/b/c &&
HASH2=`ipfs add -r -q dir | tail -n 1` &&
ipfs get $HASH2 > actual
'
test_expect_success "ipfs get output looks good (directory)" '
echo "Saving file(s) to $HASH2
" > expected &&
test_cmp expected actual
'
test_expect_success "ipfs get output is valid (directory)" '
test_cmp dir/a "$HASH2"/a &&
test_cmp dir/b/c "$HASH2"/b/c &&
rm -r $HASH2
'

# Gzipped archive of a directory tree.
test_expect_success "ipfs get -a -C succeeds (directory)" '
ipfs get $HASH2 -a -C > actual
'
test_expect_success "ipfs get -a -C output looks good (directory)" '
echo "Saving archive to "$HASH2".tar.gz
" > expected &&
test_cmp expected actual
'
test_expect_success "gzipped tar archive output is valid (directory)" '
tar -xf "$HASH2".tar.gz &&
test_cmp dir/a "$HASH2"/a &&
test_cmp dir/b/c "$HASH2"/b/c &&
rm -r $HASH2
'

test_kill_ipfs_daemon

test_done
package tar
import (
	"archive/tar"
	"errors"
	"io"
	"os"
	fp "path/filepath"
	"strings"
)
type Extractor struct {
Path string
}
func (te *Extractor) Extract(reader io.Reader) error {
tarReader := tar.NewReader(reader)
// Check if the output path already exists, so we know whether we should
// create our output with that name, or if we should put the output inside
// a preexisting directory
exists := true
pathIsDir := false
if stat, err := os.Stat(te.Path); err != nil && os.IsNotExist(err) {
exists = false
} else if err != nil {
return err
} else if stat.IsDir() {
pathIsDir = true
}
// files come recursively in order (i == 0 is root directory)
for i := 0; ; i++ {
header, err := tarReader.Next()
if err != nil && err != io.EOF {
return err
}
if header == nil || err == io.EOF {
break
}
if header.Typeflag == tar.TypeDir {
err = te.extractDir(header, i, exists)
if err != nil {
return err
}
continue
}
err = te.extractFile(header, tarReader, i, exists, pathIsDir)
if err != nil {
return err
}
}
return nil
}
func (te *Extractor) extractDir(h *tar.Header, depth int, exists bool) error {
pathElements := strings.Split(h.Name, "/")
if !exists {
pathElements = pathElements[1:]
}
path := fp.Join(pathElements...)
path = fp.Join(te.Path, path)
if depth == 0 {
// if this is the root root directory, use it as the output path for remaining files
te.Path = path
}
err := os.MkdirAll(path, 0755)
if err != nil {
return err
}
return nil
}
func (te *Extractor) extractFile(h *tar.Header, r *tar.Reader, depth int, exists bool, pathIsDir bool) error {
var path string
if depth == 0 {
// if depth is 0, this is the only file (we aren't 'ipfs get'ing a directory)
switch {
case exists && !pathIsDir:
return os.ErrExist
case exists && pathIsDir:
path = fp.Join(te.Path, h.Name)
case !exists:
path = te.Path
}
} else {
// we are outputting a directory, this file is inside of it
pathElements := strings.Split(h.Name, "/")[1:]
path = fp.Join(pathElements...)
path = fp.Join(te.Path, path)
}
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
_, err = io.Copy(file, r)
if err != nil {
return err
}
return nil
}
package tar
import (
"archive/tar"
"bytes"
"compress/gzip"
"io"
p "path"
mdag "github.com/jbenet/go-ipfs/merkledag"
path "github.com/jbenet/go-ipfs/path"
uio "github.com/jbenet/go-ipfs/unixfs/io"
upb "github.com/jbenet/go-ipfs/unixfs/pb"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)
// Reader streams an IPFS DAG as a tar archive (optionally gzip-compressed)
// and implements io.Reader. A background goroutine (writeToBuf) fills buf
// with tar data, rendezvousing with Read over signalChan so the two sides
// never touch buf at the same time.
type Reader struct {
	buf        bytes.Buffer  // staging buffer: writer side fills it, Read drains it
	closed     bool          // set once the writer goroutine has finished (see close)
	signalChan chan struct{} // unbuffered, used in both directions (see signal/flush)
	dag        mdag.DAGService
	resolver   *path.Resolver
	writer     *tar.Writer
	gzipWriter *gzip.Writer // non-nil only when compression is enabled
	err        error        // first writer-side error, surfaced by Read
}
// NewReader resolves the given IPFS path and returns a Reader that streams
// its contents as a tar archive. compression is a gzip level; pass
// gzip.NoCompression for a plain tar stream. The writer goroutine is
// started last, once the Reader is fully constructed.
func NewReader(path string, dag mdag.DAGService, resolver *path.Resolver, compression int) (*Reader, error) {
	reader := &Reader{
		signalChan: make(chan struct{}),
		dag:        dag,
		resolver:   resolver,
	}

	var err error
	if compression != gzip.NoCompression {
		// Chain tar -> gzip -> buf so the archive is compressed on the fly.
		reader.gzipWriter, err = gzip.NewWriterLevel(&reader.buf, compression)
		if err != nil {
			return nil, err
		}
		reader.writer = tar.NewWriter(reader.gzipWriter)
	} else {
		reader.writer = tar.NewWriter(&reader.buf)
	}

	dagnode, err := resolver.ResolvePath(path)
	if err != nil {
		return nil, err
	}

	// writeToBuf will write the data to the buffer, and will signal when there
	// is new data to read
	go reader.writeToBuf(dagnode, path, 0)
	return reader, nil
}
// writeToBuf recursively writes dagnode (and, for directories, its children)
// into the tar stream, calling flush after each header/chunk so a concurrent
// Read can drain the buffer. depth 0 marks the root call; when its subtree
// is fully written the stream is closed. Runs on the writer goroutine.
func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
	pb := new(upb.Data)
	err := proto.Unmarshal(dagnode.Data, pb)
	if err != nil {
		i.emitError(err)
		return
	}

	if depth == 0 {
		// Root call: close the stream once the whole walk has finished.
		defer i.close()
	}

	if pb.GetType() == upb.Data_Directory {
		err = i.writer.WriteHeader(&tar.Header{
			Name:     path,
			Typeflag: tar.TypeDir,
			Mode:     0777,
			// TODO: set mode, dates, etc. when added to unixFS
		})
		if err != nil {
			i.emitError(err)
			return
		}
		i.flush()

		for _, link := range dagnode.Links {
			childNode, err := link.GetNode(i.dag)
			if err != nil {
				i.emitError(err)
				return
			}
			// NOTE(review): an error emitted inside this recursive call does
			// not stop iteration over the remaining links — confirm intended.
			i.writeToBuf(childNode, p.Join(path, link.Name), depth+1)
		}
		return
	}

	// Leaf: emit a regular-file header followed by the file's bytes.
	err = i.writer.WriteHeader(&tar.Header{
		Name:     path,
		Size:     int64(pb.GetFilesize()),
		Typeflag: tar.TypeReg,
		Mode:     0644,
		// TODO: set mode, dates, etc. when added to unixFS
	})
	if err != nil {
		i.emitError(err)
		return
	}
	i.flush()

	reader, err := uio.NewDagReader(dagnode, i.dag)
	if err != nil {
		i.emitError(err)
		return
	}

	err = i.syncCopy(reader)
	if err != nil {
		i.emitError(err)
		return
	}
}
// Read implements io.Reader. While the stream is open it rendezvouses with
// the writer goroutine over signalChan before draining the buffer and hands
// control back afterwards, so buf is never accessed concurrently. Once the
// writer has closed the stream, Read drains whatever remains and then
// returns io.EOF.
func (i *Reader) Read(p []byte) (int, error) {
	// wait for the goroutine that is writing data to the buffer to tell us
	// there is something to read
	if !i.closed {
		<-i.signalChan
	}

	if i.err != nil {
		return 0, i.err
	}

	if !i.closed {
		// Give control back to the writer once this read is done.
		defer i.signal()
	}

	if i.buf.Len() == 0 {
		if i.closed {
			return 0, io.EOF
		}
		// Nothing buffered yet, but the stream is still being written.
		return 0, nil
	}

	n, err := i.buf.Read(p)
	// Suppress bytes.Buffer's EOF while the stream is still open or while
	// buffered data remains — only the final drain may report io.EOF.
	if err == io.EOF && !i.closed || i.buf.Len() > 0 {
		return n, nil
	}
	return n, err
}
// signal wakes whichever side is blocked on signalChan; because the channel
// is unbuffered, it blocks until the other side receives.
func (i *Reader) signal() {
	i.signalChan <- struct{}{}
}
// flush (writer side) tells Read that buffered data is available, then
// blocks until Read signals back that it has finished with the buffer.
func (i *Reader) flush() {
	i.signal()
	<-i.signalChan
}
// emitError records err for Read to return, then wakes a blocked Read.
// Called only from the writer goroutine; err must be set before signaling.
func (i *Reader) emitError(err error) {
	i.err = err
	i.signal()
}
// close finalizes the tar (and, if present, gzip) streams so their trailers
// land in the buffer, and marks the Reader closed so Read drains the
// remainder and then returns io.EOF. closed is set before signaling so the
// woken Read observes it.
func (i *Reader) close() {
	i.closed = true
	defer i.signal()

	// Close the tar writer first: it flushes into the gzip writer (or buf).
	err := i.writer.Close()
	if err != nil {
		// NOTE(review): emitError signals here and the deferred signal fires
		// again, requiring two Read wake-ups — verify this cannot block.
		i.emitError(err)
		return
	}

	if i.gzipWriter != nil {
		err = i.gzipWriter.Close()
		if err != nil {
			i.emitError(err)
			return
		}
	}
}
// syncCopy streams src into the tar writer in 32 KiB chunks, calling flush
// after every successful write so the consuming Read can drain the buffer
// between chunks. It returns the first write error or any read error other
// than io.EOF, which terminates the copy successfully.
func (i *Reader) syncCopy(src io.Reader) error {
	chunk := make([]byte, 32*1024)
	for {
		n, readErr := src.Read(chunk)
		if n > 0 {
			if _, writeErr := i.writer.Write(chunk[:n]); writeErr != nil {
				return writeErr
			}
			// Hand the freshly written bytes to the reader side.
			i.flush()
		}
		switch {
		case readErr == io.EOF:
			return nil
		case readErr != nil:
			return readErr
		}
	}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论