提交 ef6e9cf2 作者: hannahhoward

feat(commands): --stream option for ls

Convert LS Command to use current cmds lib
Update LS Command to support streaming
Rebase fixes

License: MIT
Signed-off-by: 's avatarhannahhoward <hannah@hannahhoward.net>
上级 31099e88
package commands package commands
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"text/tabwriter" "text/tabwriter"
cmds "github.com/ipfs/go-ipfs/commands" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e" e "github.com/ipfs/go-ipfs/core/commands/e"
iface "github.com/ipfs/go-ipfs/core/coreapi/interface" iface "github.com/ipfs/go-ipfs/core/coreapi/interface"
...@@ -16,29 +15,40 @@ import ( ...@@ -16,29 +15,40 @@ import (
unixfspb "gx/ipfs/QmUnHNqhSB1JgzVCxL1Kz3yb4bdyB4q1Z9AD5AUBVmt3fZ/go-unixfs/pb" unixfspb "gx/ipfs/QmUnHNqhSB1JgzVCxL1Kz3yb4bdyB4q1Z9AD5AUBVmt3fZ/go-unixfs/pb"
blockservice "gx/ipfs/QmVDTbzzTwnuBwNbJdhW3u7LoBQp46bezm9yp4z1RoEepM/go-blockservice" blockservice "gx/ipfs/QmVDTbzzTwnuBwNbJdhW3u7LoBQp46bezm9yp4z1RoEepM/go-blockservice"
offline "gx/ipfs/QmYZwey1thDTynSrvd6qQkX24UpTka6TFhQ2v569UpoqxD/go-ipfs-exchange-offline" offline "gx/ipfs/QmYZwey1thDTynSrvd6qQkX24UpTka6TFhQ2v569UpoqxD/go-ipfs-exchange-offline"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
merkledag "gx/ipfs/QmcGt25mrjuB2kKW2zhPbXVZNHc4yoTDQ65NA8m6auP2f1/go-merkledag" merkledag "gx/ipfs/QmcGt25mrjuB2kKW2zhPbXVZNHc4yoTDQ65NA8m6auP2f1/go-merkledag"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format" ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
) )
// LsLink contains printable data for a single ipld link in ls output
type LsLink struct { type LsLink struct {
Name, Hash string Name, Hash string
Size uint64 Size uint64
Type unixfspb.Data_DataType Type unixfspb.Data_DataType
} }
// LsObject is an element of LsOutput
// It can represent a whole directory, a directory header, one or more links,
// Or a the end of a directory
type LsObject struct { type LsObject struct {
Hash string Hash string
Links []LsLink Links []LsLink
HasHeader bool
HasLinks bool
HasFooter bool
} }
// LsOutput is a set of printable data for directories
type LsOutput struct { type LsOutput struct {
MultipleFolders bool
Objects []LsObject Objects []LsObject
} }
const ( const (
lsHeadersOptionNameTime = "headers" lsHeadersOptionNameTime = "headers"
lsResolveTypeOptionName = "resolve-type" lsResolveTypeOptionName = "resolve-type"
lsStreamOptionName = "stream"
) )
var LsCmd = &cmds.Command{ var LsCmd = &cmds.Command{
...@@ -60,32 +70,20 @@ The JSON output contains type information. ...@@ -60,32 +70,20 @@ The JSON output contains type information.
Options: []cmdkit.Option{ Options: []cmdkit.Option{
cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."), cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true), cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmdkit.BoolOption(lsStreamOptionName, "s", "Stream directory entries as they are found."),
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := req.InvocContext().GetNode() nd, err := cmdenv.GetNode(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
api, err := req.InvocContext().GetApi() api, err := cmdenv.GetApi(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
}
// get options early -> exit early in case of error
if _, _, err := req.Option(lsHeadersOptionNameTime).Bool(); err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
resolve, _, err := req.Option(lsResolveTypeOptionName).Bool()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
} }
resolve, _ := req.Options[lsResolveTypeOptionName].(bool)
dserv := nd.DAG dserv := nd.DAG
if !resolve { if !resolve {
offlineexch := offline.Exchange(nd.Blockstore) offlineexch := offline.Exchange(nd.Blockstore)
...@@ -93,125 +91,204 @@ The JSON output contains type information. ...@@ -93,125 +91,204 @@ The JSON output contains type information.
dserv = merkledag.NewDAGService(bserv) dserv = merkledag.NewDAGService(bserv)
} }
paths := req.Arguments() err = req.ParseBodyArgs()
if err != nil {
return err
}
paths := req.Arguments
var dagnodes []ipld.Node var dagnodes []ipld.Node
for _, fpath := range paths { for _, fpath := range paths {
p, err := iface.ParsePath(fpath) p, err := iface.ParsePath(fpath)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
dagnode, err := api.ResolveNode(req.Context(), p) dagnode, err := api.ResolveNode(req.Context, p)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
dagnodes = append(dagnodes, dagnode) dagnodes = append(dagnodes, dagnode)
} }
ng := merkledag.NewSession(req.Context, nd.DAG)
output := make([]LsObject, len(req.Arguments()))
ng := merkledag.NewSession(req.Context(), nd.DAG)
ro := merkledag.NewReadOnlyDagService(ng) ro := merkledag.NewReadOnlyDagService(ng)
stream, _ := req.Options[lsStreamOptionName].(bool)
multipleFolders := len(req.Arguments) > 1
if !stream {
output := make([]LsObject, len(req.Arguments))
for i, dagnode := range dagnodes { for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode) dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir { if err != nil && err != uio.ErrNotADir {
res.SetError(fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err), cmdkit.ErrNormal) return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
return
} }
var links []*ipld.Link var links []*ipld.Link
if dir == nil { if dir == nil {
links = dagnode.Links() links = dagnode.Links()
} else { } else {
links, err = dir.Links(req.Context()) links, err = dir.Links(req.Context)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
} }
outputLinks := make([]LsLink, len(links))
for j, link := range links {
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
outputLinks[j] = *lsLink
}
output[i] = newFullDirectoryLsObject(paths[i], outputLinks)
}
output[i] = LsObject{ return cmds.EmitOnce(res, &LsOutput{multipleFolders, output})
Hash: paths[i],
Links: make([]LsLink, len(links)),
} }
for j, link := range links { for i, dagnode := range dagnodes {
t := unixfspb.Data_DataType(-1) dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}
switch link.Cid.Type() { var linkResults <-chan unixfs.LinkResult
case cid.Raw: if dir == nil {
// No need to check with raw leaves linkResults = makeDagNodeLinkResults(req, dagnode)
t = unixfs.TFile } else {
case cid.DagProtobuf: linkResults = dir.EnumLinksAsync(req.Context)
linkNode, err := link.GetNode(req.Context(), dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
} }
if pn, ok := linkNode.(*merkledag.ProtoNode); ok { output := make([]LsObject, 1)
d, err := unixfs.FSNodeFromBytes(pn.Data()) outputLinks := make([]LsLink, 1)
if err != nil {
res.SetError(err, cmdkit.ErrNormal) output[0] = newDirectoryHeaderLsObject(paths[i])
return if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return nil
} }
t = d.Type() for linkResult := range linkResults {
if linkResult.Err != nil {
return linkResult.Err
} }
link := linkResult.Link
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
} }
output[i].Links[j] = LsLink{ outputLinks[0] = *lsLink
Name: link.Name, output[0] = newDirectoryLinksLsObject(outputLinks)
Hash: link.Cid.String(), if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
Size: link.Size, return err
Type: t,
} }
} }
output[0] = newDirectoryFooterLsObject()
if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return err
} }
res.SetOutput(&LsOutput{output})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
} }
return nil
headers, _, _ := res.Request().Option(lsHeadersOptionNameTime).Bool() },
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
headers, _ := req.Options[lsHeadersOptionNameTime].(bool)
output, ok := v.(*LsOutput) output, ok := v.(*LsOutput)
if !ok { if !ok {
return nil, e.TypeErr(output, v) return e.TypeErr(output, v)
} }
buf := new(bytes.Buffer) tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, object := range output.Objects { for _, object := range output.Objects {
if len(output.Objects) > 1 { if object.HasHeader {
fmt.Fprintf(w, "%s:\n", object.Hash) if output.MultipleFolders {
fmt.Fprintf(tw, "%s:\n", object.Hash)
} }
if headers { if headers {
fmt.Fprintln(w, "Hash\tSize\tName") fmt.Fprintln(tw, "Hash\tSize\tName")
} }
}
if object.HasLinks {
for _, link := range object.Links { for _, link := range object.Links {
if link.Type == unixfs.TDirectory { if link.Type == unixfs.TDirectory {
link.Name += "/" link.Name += "/"
} }
fmt.Fprintf(w, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
fmt.Fprintf(tw, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
} }
if len(output.Objects) > 1 {
fmt.Fprintln(w)
} }
if object.HasFooter {
if output.MultipleFolders {
fmt.Fprintln(tw)
} }
w.Flush() }
}
return buf, nil tw.Flush()
}, return nil
}),
}, },
Type: LsOutput{}, Type: LsOutput{},
} }
func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) <-chan unixfs.LinkResult {
linkResults := make(chan unixfs.LinkResult)
go func() {
defer close(linkResults)
for _, l := range dagnode.Links() {
select {
case linkResults <- unixfs.LinkResult{
Link: l,
Err: nil,
}:
case <-req.Context.Done():
return
}
}
}()
return linkResults
}
func newFullDirectoryLsObject(hash string, links []LsLink) LsObject {
return LsObject{hash, links, true, true, true}
}
func newDirectoryHeaderLsObject(hash string) LsObject {
return LsObject{hash, nil, true, false, false}
}
func newDirectoryLinksLsObject(links []LsLink) LsObject {
return LsObject{"", links, false, true, false}
}
func newDirectoryFooterLsObject() LsObject {
return LsObject{"", nil, false, false, true}
}
func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolve bool, link *ipld.Link) (*LsLink, error) {
t := unixfspb.Data_DataType(-1)
switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context, dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
return nil, err
}
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
return nil, err
}
t = d.Type()
}
}
return &LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
}, nil
}
...@@ -3,7 +3,6 @@ package commands ...@@ -3,7 +3,6 @@ package commands
import ( import (
"errors" "errors"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
dag "github.com/ipfs/go-ipfs/core/commands/dag" dag "github.com/ipfs/go-ipfs/core/commands/dag"
name "github.com/ipfs/go-ipfs/core/commands/name" name "github.com/ipfs/go-ipfs/core/commands/name"
ocmd "github.com/ipfs/go-ipfs/core/commands/object" ocmd "github.com/ipfs/go-ipfs/core/commands/object"
...@@ -127,7 +126,7 @@ var rootSubcommands = map[string]*cmds.Command{ ...@@ -127,7 +126,7 @@ var rootSubcommands = map[string]*cmds.Command{
"id": IDCmd, "id": IDCmd,
"key": KeyCmd, "key": KeyCmd,
"log": LogCmd, "log": LogCmd,
"ls": lgc.NewCommand(LsCmd), "ls": LsCmd,
"mount": MountCmd, "mount": MountCmd,
"name": name.NameCmd, "name": name.NameCmd,
"object": ocmd.ObjectCmd, "object": ocmd.ObjectCmd,
...@@ -165,7 +164,7 @@ var rootROSubcommands = map[string]*cmds.Command{ ...@@ -165,7 +164,7 @@ var rootROSubcommands = map[string]*cmds.Command{
}, },
"get": GetCmd, "get": GetCmd,
"dns": DNSCmd, "dns": DNSCmd,
"ls": lgc.NewCommand(LsCmd), "ls": LsCmd,
"name": { "name": {
Subcommands: map[string]*cmds.Command{ Subcommands: map[string]*cmds.Command{
"resolve": name.IpnsCmd, "resolve": name.IpnsCmd,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论