Unverified 提交 20e64aeb 作者: Steven Allen 提交者: GitHub

Merge pull request #6493 from MichaelMure/streamed-pin-ls

pin cmd: stream recursive pins
......@@ -65,7 +65,7 @@ The JSON output contains type information.
cmds.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmds.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmds.BoolOption(lsSizeOptionName, "Resolve linked objects to find out their file size.").WithDefault(true),
cmds.BoolOption(lsStreamOptionName, "s", "Enable exprimental streaming of directory entries as they are traversed."),
cmds.BoolOption(lsStreamOptionName, "s", "Enable experimental streaming of directory entries as they are traversed."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
......
......@@ -259,8 +259,9 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
}
const (
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinStreamOptionName = "stream"
)
var listPinCmd = &cmds.Command{
......@@ -313,6 +314,7 @@ Example:
Options: []cmds.Option{
cmds.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."),
cmds.BoolOption(pinStreamOptionName, "s", "Enable streaming of pins as they are discovered."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
......@@ -326,9 +328,7 @@ Example:
}
typeStr, _ := req.Options[pinTypeOptionName].(string)
if err != nil {
return err
}
stream, _ := req.Options[pinStreamOptionName].(bool)
switch typeStr {
case "all", "direct", "indirect", "recursive":
......@@ -337,34 +337,50 @@ Example:
return err
}
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
lgcList := map[string]PinLsType{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type}
return nil
}
}
var keys map[cid.Cid]RefKeyObject
if len(req.Arguments) > 0 {
keys, err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api)
err = pinLsKeys(req, typeStr, n, api, emit)
} else {
keys, err = pinLsAll(req.Context, typeStr, n)
err = pinLsAll(req, typeStr, n, emit)
}
if err != nil {
return err
}
refKeys := make(map[string]RefKeyObject, len(keys))
for k, v := range keys {
refKeys[enc.Encode(k)] = v
if !stream {
return cmds.EmitOnce(res, &PinLsOutputWrapper{
PinLsList: PinLsList{Keys: lgcList},
})
}
return cmds.EmitOnce(res, &RefKeyList{Keys: refKeys})
return nil
},
Type: RefKeyList{},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefKeyList) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)
if stream {
if quiet {
fmt.Fprintf(w, "%s\n", out.PinLsObject.Cid)
} else {
fmt.Fprintf(w, "%s %s\n", out.PinLsObject.Cid, out.PinLsObject.Type)
}
return nil
}
for k, v := range out.Keys {
for k, v := range out.PinLsList.Keys {
if quiet {
fmt.Fprintf(w, "%s\n", k)
} else {
......@@ -377,6 +393,144 @@ Example:
},
}
// PinLsOutputWrapper is the output type of the pin ls command.
// Pin ls needs to output two different type depending on if it's streamed or not.
// We use this to bypass the cmds lib refusing to have interface{}
type PinLsOutputWrapper struct {
PinLsList
PinLsObject
}
// PinLsList is a set of pins with their type
type PinLsList struct {
Keys map[string]PinLsType `json:",omitempty"`
}
// PinLsType contains the type of a pin
type PinLsType struct {
Type string
}
// PinLsObject contains the description of a pin
type PinLsObject struct {
Cid string `json:",omitempty"`
Type string `json:",omitempty"`
}
func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error {
mode, ok := pin.StringToMode(typeStr)
if !ok {
return fmt.Errorf("invalid pin mode '%s'", typeStr)
}
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}
for _, p := range req.Arguments {
c, err := api.ResolvePath(req.Context, path.New(p))
if err != nil {
return err
}
pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return err
}
if !pinned {
return fmt.Errorf("path '%s' is not pinned", p)
}
switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
err = emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: pinType,
Cid: enc.Encode(c.Cid()),
},
})
if err != nil {
return err
}
}
return nil
}
func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}
keys := cid.NewSet()
AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: typeStr,
Cid: enc.Encode(c),
},
})
if err != nil {
return err
}
}
}
return nil
}
if typeStr == "direct" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
var visitErr error
err := dag.EnumerateChildren(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: "indirect",
Cid: enc.Encode(c),
},
})
if err != nil {
visitErr = err
}
}
return r
})
if visitErr != nil {
return visitErr
}
if err != nil {
return err
}
}
}
return nil
}
const (
pinUnpinOptionName = "unpin"
)
......@@ -491,83 +645,6 @@ var verifyPinCmd = &cmds.Command{
},
}
type RefKeyObject struct {
Type string
}
type RefKeyList struct {
Keys map[string]RefKeyObject
}
func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI) (map[cid.Cid]RefKeyObject, error) {
mode, ok := pin.StringToMode(typeStr)
if !ok {
return nil, fmt.Errorf("invalid pin mode '%s'", typeStr)
}
keys := make(map[cid.Cid]RefKeyObject)
for _, p := range args {
c, err := api.ResolvePath(ctx, path.New(p))
if err != nil {
return nil, err
}
pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return nil, err
}
if !pinned {
return nil, fmt.Errorf("path '%s' is not pinned", p)
}
switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
keys[c.Cid()] = RefKeyObject{
Type: pinType,
}
}
return keys, nil
}
func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[cid.Cid]RefKeyObject, error) {
keys := make(map[cid.Cid]RefKeyObject)
AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
for _, c := range keyList {
keys[c] = RefKeyObject{
Type: typeStr,
}
}
}
if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
if err != nil {
return nil, err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
}
return keys, nil
}
// PinVerifyRes is the result returned for each pin checked in "pin verify"
type PinVerifyRes struct {
Cid string
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论