提交 bb601845 作者: Michael Muré 提交者: Michael Muré

pin cmd: stream recursive pins

Add a --stream flag to stream the results instead of
accumulating the final result in memory.

This is a rework of https://github.com/ipfs/go-ipfs/pull/5005
上级 810cb607
...@@ -259,8 +259,9 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.) ...@@ -259,8 +259,9 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
} }
const ( const (
pinTypeOptionName = "type" pinTypeOptionName = "type"
pinQuietOptionName = "quiet" pinQuietOptionName = "quiet"
pinStreamOptionName = "stream"
) )
var listPinCmd = &cmds.Command{ var listPinCmd = &cmds.Command{
...@@ -313,6 +314,7 @@ Example: ...@@ -313,6 +314,7 @@ Example:
Options: []cmds.Option{ Options: []cmds.Option{
cmds.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"), cmds.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."), cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."),
cmds.BoolOption(pinStreamOptionName, "s", "Don't buffer pins before sending."),
}, },
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env) n, err := cmdenv.GetNode(env)
...@@ -326,9 +328,7 @@ Example: ...@@ -326,9 +328,7 @@ Example:
} }
typeStr, _ := req.Options[pinTypeOptionName].(string) typeStr, _ := req.Options[pinTypeOptionName].(string)
if err != nil { stream, _ := req.Options[pinStreamOptionName].(bool)
return err
}
switch typeStr { switch typeStr {
case "all", "direct", "indirect", "recursive": case "all", "direct", "indirect", "recursive":
...@@ -337,34 +337,50 @@ Example: ...@@ -337,34 +337,50 @@ Example:
return err return err
} }
enc, err := cmdenv.GetCidEncoder(req) // For backward compatibility, we accumulate the pins in the same output type as before.
if err != nil { emit := res.Emit
return err lgcList := map[string]RefObject{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.RefKeyObject.Cid] = RefObject{Type: obj.RefKeyObject.Type}
return nil
}
} }
var keys map[cid.Cid]RefKeyObject
if len(req.Arguments) > 0 { if len(req.Arguments) > 0 {
keys, err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api) err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api, emit)
} else { } else {
keys, err = pinLsAll(req.Context, typeStr, n) err = pinLsAll(req.Context, typeStr, n, emit)
} }
if err != nil { if err != nil {
return err return err
} }
refKeys := make(map[string]RefKeyObject, len(keys)) if !stream {
for k, v := range keys { return cmds.EmitOnce(res, &PinLsOutputWrapper{
refKeys[enc.Encode(k)] = v RefKeyList: RefKeyList{Keys: lgcList},
})
} }
return cmds.EmitOnce(res, &RefKeyList{Keys: refKeys}) return nil
}, },
Type: RefKeyList{}, Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{ Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefKeyList) error { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool) quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)
if stream {
if quiet {
fmt.Fprintf(w, "%s\n", out.RefKeyObject.Cid)
} else {
fmt.Fprintf(w, "%s %s\n", out.RefKeyObject.Cid, out.RefKeyObject.Type)
}
return nil
}
for k, v := range out.Keys { for k, v := range out.RefKeyList.Keys {
if quiet { if quiet {
fmt.Fprintf(w, "%s\n", k) fmt.Fprintf(w, "%s\n", k)
} else { } else {
...@@ -492,35 +508,44 @@ var verifyPinCmd = &cmds.Command{ ...@@ -492,35 +508,44 @@ var verifyPinCmd = &cmds.Command{
} }
type RefKeyObject struct { type RefKeyObject struct {
Cid string
Type string
}
type RefObject struct {
Type string Type string
} }
type RefKeyList struct { type RefKeyList struct {
Keys map[string]RefKeyObject Keys map[string]RefObject
} }
func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI) (map[cid.Cid]RefKeyObject, error) { // Pin ls needs to output two different type depending on if it's streamed or not.
// We use this to bypass the cmds lib refusing to have interface{}
type PinLsOutputWrapper struct {
RefKeyList
RefKeyObject
}
func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error {
mode, ok := pin.StringToMode(typeStr) mode, ok := pin.StringToMode(typeStr)
if !ok { if !ok {
return nil, fmt.Errorf("invalid pin mode '%s'", typeStr) return fmt.Errorf("invalid pin mode '%s'", typeStr)
} }
keys := make(map[cid.Cid]RefKeyObject)
for _, p := range args { for _, p := range args {
c, err := api.ResolvePath(ctx, path.New(p)) c, err := api.ResolvePath(ctx, path.New(p))
if err != nil { if err != nil {
return nil, err return err
} }
pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode) pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil { if err != nil {
return nil, err return err
} }
if !pinned { if !pinned {
return nil, fmt.Errorf("path '%s' is not pinned", p) return fmt.Errorf("path '%s' is not pinned", p)
} }
switch pinType { switch pinType {
...@@ -528,44 +553,82 @@ func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsN ...@@ -528,44 +553,82 @@ func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsN
default: default:
pinType = "indirect through " + pinType pinType = "indirect through " + pinType
} }
keys[c.Cid()] = RefKeyObject{
Type: pinType, err = emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: pinType,
Cid: c.Cid().String(),
},
})
if err != nil {
return err
} }
} }
return keys, nil return nil
} }
func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[cid.Cid]RefKeyObject, error) { func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
keys := cid.NewSet()
keys := make(map[cid.Cid]RefKeyObject)
AddToResultKeys := func(keyList []cid.Cid, typeStr string) { AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList { for _, c := range keyList {
keys[c] = RefKeyObject{ if keys.Visit(c) {
Type: typeStr, err := emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: typeStr,
Cid: c.String(),
},
})
if err != nil {
return err
}
} }
} }
return nil
} }
if typeStr == "direct" || typeStr == "all" { if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(n.Pinning.DirectKeys(), "direct") err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
if err != nil {
return err
}
} }
if typeStr == "indirect" || typeStr == "all" { if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() { for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit) var visitErr error
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: "indirect",
Cid: c.String(),
},
})
if err != nil {
visitErr = err
}
}
return r
})
if visitErr != nil {
return visitErr
}
if err != nil { if err != nil {
return nil, err return err
} }
} }
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
} }
return keys, nil return nil
} }
// PinVerifyRes is the result returned for each pin checked in "pin verify" // PinVerifyRes is the result returned for each pin checked in "pin verify"
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论