Unverified 提交 803e9a7e 作者: Steven Allen 提交者: GitHub

Merge pull request #5962 from ipfs/feat/coreapi-async-ls

Port `ipfs ls` to CoreAPI
......@@ -4,29 +4,22 @@ import (
"fmt"
"io"
"os"
"sort"
"text/tabwriter"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
iface "github.com/ipfs/go-ipfs/core/coreapi/interface"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
unixfs "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs"
uio "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/io"
unixfspb "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/pb"
cmds "gx/ipfs/QmR77mMvvh8mJBBWQmBfQBu8oD38NUN4KE9SL2gDgAQNc6/go-ipfs-cmds"
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
ipld "gx/ipfs/QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s/go-ipld-format"
blockservice "gx/ipfs/QmVKQHuzni68SWByzJgBUCwHvvr4TWiXfutNWWwpZpp4rE/go-blockservice"
offline "gx/ipfs/QmYZwey1thDTynSrvd6qQkX24UpTka6TFhQ2v569UpoqxD/go-ipfs-exchange-offline"
merkledag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag"
cidenc "gx/ipfs/QmdPQx9fvN5ExVwMhRmh7YpCQJzJrFhd1AjVBwJmRMFJeX/go-cidutil/cidenc"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)
// LsLink contains printable data for a single ipld link in ls output
type LsLink struct {
Name, Hash string
Size uint64
Type unixfspb.Data_DataType
Type iface.FileType
}
// LsObject is an element of LsOutput
......@@ -72,11 +65,6 @@ The JSON output contains type information.
cmdkit.BoolOption(lsStreamOptionName, "s", "Enable exprimental streaming of directory entries as they are traversed."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}
api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
......@@ -84,12 +72,7 @@ The JSON output contains type information.
resolveType, _ := req.Options[lsResolveTypeOptionName].(bool)
resolveSize, _ := req.Options[lsSizeOptionName].(bool)
dserv := nd.DAG
if !resolveType && !resolveSize {
offlineexch := offline.Exchange(nd.Blockstore)
bserv := blockservice.New(nd.Blockstore, offlineexch)
dserv = merkledag.NewDAGService(bserv)
}
stream, _ := req.Options[lsStreamOptionName].(bool)
err = req.ParseBodyArgs()
if err != nil {
......@@ -102,91 +85,79 @@ The JSON output contains type information.
return err
}
var dagnodes []ipld.Node
for _, fpath := range paths {
p, err := iface.ParsePath(fpath)
if err != nil {
return err
}
dagnode, err := api.ResolveNode(req.Context, p)
if err != nil {
return err
}
dagnodes = append(dagnodes, dagnode)
}
ng := merkledag.NewSession(req.Context, nd.DAG)
ro := merkledag.NewReadOnlyDagService(ng)
var processLink func(path string, link LsLink) error
var dirDone func(i int)
stream, _ := req.Options[lsStreamOptionName].(bool)
processDir := func() (func(path string, link LsLink) error, func(i int)) {
return func(path string, link LsLink) error {
output := []LsObject{{
Hash: path,
Links: []LsLink{link},
}}
return res.Emit(&LsOutput{output})
}, func(i int) {}
}
done := func() error { return nil }
if !stream {
output := make([]LsObject, len(req.Arguments))
for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}
var links []*ipld.Link
if dir == nil {
links = dagnode.Links()
} else {
links, err = dir.Links(req.Context)
if err != nil {
return err
}
}
outputLinks := make([]LsLink, len(links))
for j, link := range links {
lsLink, err := makeLsLink(req, dserv, resolveType, resolveSize, link, enc)
if err != nil {
return err
processDir = func() (func(path string, link LsLink) error, func(i int)) {
// for each dir
outputLinks := make([]LsLink, 0)
return func(path string, link LsLink) error {
// for each link
outputLinks = append(outputLinks, link)
return nil
}, func(i int) {
// after each dir
sort.Slice(outputLinks, func(i, j int) bool {
return outputLinks[i].Name < outputLinks[j].Name
})
output[i] = LsObject{
Hash: paths[i],
Links: outputLinks,
}
}
outputLinks[j] = *lsLink
}
output[i] = LsObject{
Hash: paths[i],
Links: outputLinks,
}
}
return cmds.EmitOnce(res, &LsOutput{output})
done = func() error {
return cmds.EmitOnce(res, &LsOutput{output})
}
}
for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
for i, fpath := range paths {
p, err := iface.ParsePath(fpath)
if err != nil {
return err
}
var linkResults <-chan unixfs.LinkResult
if dir == nil {
linkResults = makeDagNodeLinkResults(req, dagnode)
} else {
linkResults = dir.EnumLinksAsync(req.Context)
results, err := api.Unixfs().Ls(req.Context, p,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
if err != nil {
return err
}
for linkResult := range linkResults {
if linkResult.Err != nil {
return linkResult.Err
processLink, dirDone = processDir()
for link := range results {
if link.Err != nil {
return link.Err
}
link := linkResult.Link
lsLink, err := makeLsLink(req, dserv, resolveType, resolveSize, link, enc)
if err != nil {
return err
lsLink := LsLink{
Name: link.Link.Name,
Hash: enc.Encode(link.Link.Cid),
Size: link.Size,
Type: link.Type,
}
output := []LsObject{{
Hash: paths[i],
Links: []LsLink{*lsLink},
}}
if err = res.Emit(&LsOutput{output}); err != nil {
if err := processLink(paths[i], lsLink); err != nil {
return err
}
}
dirDone(i)
}
return nil
return done()
},
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
......@@ -219,58 +190,6 @@ The JSON output contains type information.
Type: LsOutput{},
}
func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) <-chan unixfs.LinkResult {
links := dagnode.Links()
linkResults := make(chan unixfs.LinkResult, len(links))
defer close(linkResults)
for _, l := range links {
linkResults <- unixfs.LinkResult{
Link: l,
Err: nil,
}
}
return linkResults
}
func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolveType bool, resolveSize bool, link *ipld.Link, enc cidenc.Encoder) (*LsLink, error) {
t := unixfspb.Data_DataType(-1)
var size uint64
switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
size = link.Size
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context, dserv)
if err == ipld.ErrNotFound && !resolveType && !resolveSize {
// not an error
linkNode = nil
} else if err != nil {
return nil, err
}
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
return nil, err
}
if resolveType {
t = d.Type()
}
if d.Type() == unixfs.TFile && resolveSize {
size = d.FileSize()
}
}
}
return &LsLink{
Name: link.Name,
Hash: enc.Encode(link.Cid),
Size: size,
Type: t,
}, nil
}
func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash string, ignoreBreaks bool) string {
headers, _ := req.Options[lsHeadersOptionNameTime].(bool)
stream, _ := req.Options[lsStreamOptionName].(bool)
......@@ -311,9 +230,9 @@ func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash
s := "%[1]s\t%[3]s\n"
switch {
case link.Type == unixfs.TDirectory && size:
case link.Type == iface.TDirectory && size:
s = "%[1]s\t-\t%[3]s/\n"
case link.Type == unixfs.TDirectory && !size:
case link.Type == iface.TDirectory && !size:
s = "%[1]s\t%[3]s/\n"
case size:
s = "%s\t%v\t%s\n"
......
......@@ -42,7 +42,12 @@ type UnixfsAddSettings struct {
Progress bool
}
type UnixfsLsSettings struct {
ResolveChildren bool
}
type UnixfsAddOption func(*UnixfsAddSettings) error
type UnixfsLsOption func(*UnixfsLsSettings) error
func UnixfsAddOptions(opts ...UnixfsAddOption) (*UnixfsAddSettings, cid.Prefix, error) {
options := &UnixfsAddSettings{
......@@ -122,6 +127,21 @@ func UnixfsAddOptions(opts ...UnixfsAddOption) (*UnixfsAddSettings, cid.Prefix,
return options, prefix, nil
}
func UnixfsLsOptions(opts ...UnixfsLsOption) (*UnixfsLsSettings, error) {
options := &UnixfsLsSettings{
ResolveChildren: true,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
type unixfsOpts struct{}
var Unixfs unixfsOpts
......@@ -290,3 +310,10 @@ func (unixfsOpts) Nocopy(enable bool) UnixfsAddOption {
return nil
}
}
func (unixfsOpts) ResolveChildren(resolve bool) UnixfsLsOption {
return func(settings *UnixfsLsSettings) error {
settings.ResolveChildren = resolve
return nil
}
}
......@@ -754,17 +754,18 @@ func (tp *provider) TestLs(t *testing.T) {
t.Error(err)
}
if len(links) != 1 {
t.Fatalf("expected 1 link, got %d", len(links))
link := (<-links).Link
if link.Size != 23 {
t.Fatalf("expected size = 23, got %d", link.Size)
}
if links[0].Size != 23 {
t.Fatalf("expected size = 23, got %d", links[0].Size)
if link.Name != "name-of-file" {
t.Fatalf("expected name = name-of-file, got %s", link.Name)
}
if links[0].Name != "name-of-file" {
t.Fatalf("expected name = name-of-file, got %s", links[0].Name)
if link.Cid.String() != "QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr" {
t.Fatalf("expected cid = QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr, got %s", link.Cid)
}
if links[0].Cid.String() != "QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr" {
t.Fatalf("expected cid = QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr, got %s", links[0].Cid)
if _, ok := <-links; ok {
t.Errorf("didn't expect a second link")
}
}
......
......@@ -2,11 +2,11 @@ package iface
import (
"context"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs"
ipld "gx/ipfs/QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s/go-ipld-format"
files "gx/ipfs/QmaXvvAVAQ5ABqM5xtjYmV85xmN5MkWAZsX9H9Fwo4FVXp/go-ipfs-files"
"gx/ipfs/QmaXvvAVAQ5ABqM5xtjYmV85xmN5MkWAZsX9H9Fwo4FVXp/go-ipfs-files"
)
type AddEvent struct {
......@@ -16,6 +16,25 @@ type AddEvent struct {
Size string `json:",omitempty"`
}
type FileType int32
const (
TRaw = FileType(unixfs.TRaw)
TFile = FileType(unixfs.TFile)
TDirectory = FileType(unixfs.TDirectory)
TMetadata = FileType(unixfs.TMetadata)
TSymlink = FileType(unixfs.TSymlink)
THAMTShard = FileType(unixfs.THAMTShard)
)
type LsLink struct {
Link *ipld.Link
Size uint64
Type FileType
Err error
}
// UnixfsAPI is the basic interface to immutable files in IPFS
// NOTE: This API is heavily WIP, things are guaranteed to break frequently
type UnixfsAPI interface {
......@@ -30,6 +49,7 @@ type UnixfsAPI interface {
// to operations performed on the returned file
Get(context.Context, Path) (files.Node, error)
// Ls returns the list of links in a directory
Ls(context.Context, Path) ([]*ipld.Link, error)
// Ls returns the list of links in a directory. Links aren't guaranteed to be
// returned in order
Ls(context.Context, Path, ...options.UnixfsLsOption) (<-chan LsLink, error)
}
......@@ -15,11 +15,13 @@ import (
unixfile "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/file"
uio "gx/ipfs/QmQ1JnYpnzkaurjW1yxkQxC2w3K1PorNE1nv1vaP5Le7sq/go-unixfs/io"
mfs "gx/ipfs/QmR66iEqVtNMbbZxTHPY3F6W5QLFqZEDbFD7gzbE9HpYXU/go-mfs"
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
ipld "gx/ipfs/QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s/go-ipld-format"
bstore "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore"
blockservice "gx/ipfs/QmVKQHuzni68SWByzJgBUCwHvvr4TWiXfutNWWwpZpp4rE/go-blockservice"
files "gx/ipfs/QmaXvvAVAQ5ABqM5xtjYmV85xmN5MkWAZsX9H9Fwo4FVXp/go-ipfs-files"
dag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag"
merkledag "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag"
dagtest "gx/ipfs/Qmb2UEG2TAeVrEJSjqsZF7Y2he7wRDkrdt6c3bECxwZf4k/go-merkledag/test"
cidutil "gx/ipfs/QmdPQx9fvN5ExVwMhRmh7YpCQJzJrFhd1AjVBwJmRMFJeX/go-cidutil"
)
......@@ -143,31 +145,95 @@ func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (files.Node, er
// Ls returns the contents of an IPFS or IPNS object(s) at path p, with the format:
// `<link base58 hash> <link size in bytes> <link name>`
func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path) ([]*ipld.Link, error) {
dagnode, err := api.core().ResolveNode(ctx, p)
func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path, opts ...options.UnixfsLsOption) (<-chan coreiface.LsLink, error) {
settings, err := options.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
}
var ndlinks []*ipld.Link
dir, err := uio.NewDirectoryFromNode(api.dag, dagnode)
switch err {
case nil:
l, err := dir.Links(ctx)
ses := api.core().getSession(ctx)
uses := (*UnixfsAPI)(ses)
dagnode, err := ses.ResolveNode(ctx, p)
if err != nil {
return nil, err
}
dir, err := uio.NewDirectoryFromNode(ses.dag, dagnode)
if err == uio.ErrNotADir {
return uses.lsFromLinks(ctx, dagnode.Links(), settings)
}
if err != nil {
return nil, err
}
return uses.lsFromLinksAsync(ctx, dir, settings)
}
func (api *UnixfsAPI) processLink(ctx context.Context, linkres ft.LinkResult, settings *options.UnixfsLsSettings) coreiface.LsLink {
lnk := coreiface.LsLink{
Link: linkres.Link,
Err: linkres.Err,
}
if lnk.Err != nil {
return lnk
}
switch lnk.Link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
lnk.Type = coreiface.TFile
lnk.Size = lnk.Link.Size
case cid.DagProtobuf:
if !settings.ResolveChildren {
break
}
linkNode, err := lnk.Link.GetNode(ctx, api.dag)
if err != nil {
return nil, err
lnk.Err = err
break
}
if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := ft.FSNodeFromBytes(pn.Data())
if err != nil {
lnk.Err = err
break
}
lnk.Type = coreiface.FileType(d.Type())
lnk.Size = d.FileSize()
}
ndlinks = l
case uio.ErrNotADir:
ndlinks = dagnode.Links()
default:
return nil, err
}
links := make([]*ipld.Link, len(ndlinks))
for i, l := range ndlinks {
links[i] = &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}
return lnk
}
func (api *UnixfsAPI) lsFromLinksAsync(ctx context.Context, dir uio.Directory, settings *options.UnixfsLsSettings) (<-chan coreiface.LsLink, error) {
out := make(chan coreiface.LsLink)
go func() {
defer close(out)
for l := range dir.EnumLinksAsync(ctx) {
select {
case out <- api.processLink(ctx, l, settings): //TODO: perf: processing can be done in background and in parallel
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, settings *options.UnixfsLsSettings) (<-chan coreiface.LsLink, error) {
links := make(chan coreiface.LsLink, len(ndlinks))
for _, l := range ndlinks {
lr := ft.LinkResult{Link: &ipld.Link{Name: l.Name, Size: l.Size, Cid: l.Cid}}
links <- api.processLink(ctx, lr, settings) //TODO: can be parallel if settings.Async
}
close(links)
return links, nil
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论