Unverified 提交 b622e334 作者: Steven Allen 提交者: GitHub

Merge pull request #5674 from overbool/refactor/commands/pin

commands/pin: use new cmds lib
package commands
import (
"bytes"
"context"
"fmt"
"io"
"os"
"time"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
iface "github.com/ipfs/go-ipfs/core/coreapi/interface"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
......@@ -19,8 +19,9 @@ import (
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
bserv "gx/ipfs/QmVPeMNK9DfGLXDZzs2W4RoFWC9Zq1EnLGmLXtYtWrNdcW/go-blockservice"
"gx/ipfs/QmYMQuypUbgsdNHmuCBSUJV6wdQVsBHRivNAp3efHJwZJD/go-verifcid"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
dag "gx/ipfs/QmaDBne4KeY3UepeqSVKYpSmQGa3q9zP6x3LfVF2UjF3Hc/go-merkledag"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)
var PinCmd = &cmds.Command{
......@@ -65,117 +66,117 @@ var addPinCmd = &cmds.Command{
cmdkit.BoolOption(pinProgressOptionName, "Show progress"),
},
Type: AddPinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
api, err := req.InvocContext().GetApi()
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
defer n.Blockstore.PinLock().Unlock()
// set recursive flag
recursive, _, err := req.Option(pinRecursiveOptionName).Bool()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
recursive, _ := req.Options[pinRecursiveOptionName].(bool)
showProgress, _ := req.Options[pinProgressOptionName].(bool)
if err := req.ParseBodyArgs(); err != nil {
return err
}
showProgress, _, _ := req.Option(pinProgressOptionName).Bool()
if !showProgress {
added, err := corerepo.Pin(n, api, req.Context(), req.Arguments(), recursive)
added, err := corerepo.Pin(n, api, req.Context, req.Arguments, recursive)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
res.SetOutput(&AddPinOutput{Pins: cidsToStrings(added)})
return
return cmds.EmitOnce(res, &AddPinOutput{Pins: cidsToStrings(added)})
}
out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))
v := new(dag.ProgressTracker)
ctx := v.DeriveContext(req.Context())
ctx := v.DeriveContext(req.Context)
type pinResult struct {
pins []cid.Cid
err error
}
ch := make(chan pinResult, 1)
go func() {
added, err := corerepo.Pin(n, api, ctx, req.Arguments(), recursive)
added, err := corerepo.Pin(n, api, ctx, req.Arguments, recursive)
ch <- pinResult{pins: added, err: err}
}()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
defer close(out)
for {
select {
case val := <-ch:
if val.err != nil {
res.SetError(val.err, cmdkit.ErrNormal)
return
return val.err
}
if pv := v.Value(); pv != 0 {
out <- &AddPinOutput{Progress: v.Value()}
if err := res.Emit(&AddPinOutput{Progress: v.Value()}); err != nil {
return err
}
}
out <- &AddPinOutput{Pins: cidsToStrings(val.pins)}
return
return res.Emit(&AddPinOutput{Pins: cidsToStrings(val.pins)})
case <-ticker.C:
out <- &AddPinOutput{Progress: v.Value()}
if err := res.Emit(&AddPinOutput{Progress: v.Value()}); err != nil {
return err
}
case <-ctx.Done():
log.Error(ctx.Err())
res.SetError(ctx.Err(), cmdkit.ErrNormal)
return
return ctx.Err()
}
}
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
var added []string
switch out := v.(type) {
case *AddPinOutput:
if out.Pins != nil {
added = out.Pins
} else {
// this can only happen if the progress option is set
fmt.Fprintf(res.Stderr(), "Fetched/Processed %d nodes\r", out.Progress)
}
if res.Error() != nil {
return nil, res.Error()
}
default:
return nil, e.TypeErr(out, v)
}
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *AddPinOutput) error {
rec, found := req.Options["recursive"].(bool)
var pintype string
rec, found, _ := res.Request().Option("recursive").Bool()
if rec || !found {
pintype = "recursively"
} else {
pintype = "directly"
}
buf := new(bytes.Buffer)
for _, k := range added {
fmt.Fprintf(buf, "pinned %s %s\n", k, pintype)
for _, k := range out.Pins {
fmt.Fprintf(w, "pinned %s %s\n", k, pintype)
}
return nil
}),
},
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
for {
v, err := res.Next()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
out, ok := v.(*AddPinOutput)
if !ok {
return e.TypeErr(out, v)
}
if out.Pins == nil {
// this can only happen if the progress option is set
fmt.Fprintf(os.Stderr, "Fetched/Processed %d nodes\r", out.Progress)
} else {
err = re.Emit(out)
if err != nil {
return err
}
}
}
return buf, nil
},
},
}
......@@ -196,52 +197,38 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
cmdkit.BoolOption(pinRecursiveOptionName, "r", "Recursively unpin the object linked to by the specified object(s).").WithDefault(true),
},
Type: PinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
api, err := req.InvocContext().GetApi()
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
// set recursive flag
recursive, _, err := req.Option(pinRecursiveOptionName).Bool()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
recursive, _ := req.Options[pinRecursiveOptionName].(bool)
if err := req.ParseBodyArgs(); err != nil {
return err
}
removed, err := corerepo.Unpin(n, api, req.Context(), req.Arguments(), recursive)
removed, err := corerepo.Unpin(n, api, req.Context, req.Arguments, recursive)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
res.SetOutput(&PinOutput{cidsToStrings(removed)})
return cmds.EmitOnce(res, &PinOutput{cidsToStrings(removed)})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinOutput) error {
for _, k := range out.Pins {
fmt.Fprintf(w, "unpinned %s\n", k)
}
added, ok := v.(*PinOutput)
if !ok {
return nil, e.TypeErr(added, v)
}
buf := new(bytes.Buffer)
for _, k := range added.Pins {
fmt.Fprintf(buf, "unpinned %s\n", k)
}
return buf, nil
},
return nil
}),
},
}
......@@ -301,74 +288,58 @@ Example:
cmdkit.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmdkit.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
api, err := req.InvocContext().GetApi()
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
typeStr, _, err := req.Option(pinTypeOptionName).String()
typeStr, _ := req.Options[pinTypeOptionName].(string)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
switch typeStr {
case "all", "direct", "indirect", "recursive":
default:
err = fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr)
res.SetError(err, cmdkit.ErrClient)
return
return err
}
var keys map[string]RefKeyObject
if len(req.Arguments()) > 0 {
keys, err = pinLsKeys(req.Context(), req.Arguments(), typeStr, n, api)
if len(req.Arguments) > 0 {
keys, err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api)
} else {
keys, err = pinLsAll(req.Context(), typeStr, n)
keys, err = pinLsAll(req.Context, typeStr, n)
}
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
} else {
res.SetOutput(&RefKeyList{Keys: keys})
return err
}
return cmds.EmitOnce(res, &RefKeyList{Keys: keys})
},
Type: RefKeyList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
quiet, _, err := res.Request().Option(pinQuietOptionName).Bool()
if err != nil {
return nil, err
}
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefKeyList) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
keys, ok := v.(*RefKeyList)
if !ok {
return nil, e.TypeErr(keys, v)
}
out := new(bytes.Buffer)
for k, v := range keys.Keys {
for k, v := range out.Keys {
if quiet {
fmt.Fprintf(out, "%s\n", k)
fmt.Fprintf(w, "%s\n", k)
} else {
fmt.Fprintf(out, "%s %s\n", k, v.Type)
fmt.Fprintf(w, "%s %s\n", k, v.Type)
}
}
return out, nil
},
return nil
}),
},
}
......@@ -394,54 +365,36 @@ new pin and removing the old one.
cmdkit.BoolOption(pinUnpinOptionName, "Remove the old pin.").WithDefault(true),
},
Type: PinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
api, err := req.InvocContext().GetApi()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
unpin, _, err := req.Option(pinUnpinOptionName).Bool()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
unpin, _ := req.Options[pinUnpinOptionName].(bool)
from, err := iface.ParsePath(req.Arguments()[0])
from, err := iface.ParsePath(req.Arguments[0])
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
to, err := iface.ParsePath(req.Arguments()[1])
to, err := iface.ParsePath(req.Arguments[1])
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
err = api.Pin().Update(req.Context(), from, to, options.Pin.Unpin(unpin))
err = api.Pin().Update(req.Context, from, to, options.Pin.Unpin(unpin))
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
res.SetOutput(&PinOutput{Pins: []string{from.String(), to.String()}})
return cmds.EmitOnce(res, &PinOutput{Pins: []string{from.String(), to.String()}})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
added, ok := v.(*PinOutput)
if !ok {
return nil, e.TypeErr(added, v)
}
buf := new(bytes.Buffer)
fmt.Fprintf(buf, "updated %s to %s\n", added.Pins[0], added.Pins[1])
return buf, nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinOutput) error {
fmt.Fprintf(w, "updated %s to %s\n", out.Pins[0], out.Pins[1])
return nil
}),
},
}
......@@ -457,51 +410,40 @@ var verifyPinCmd = &cmds.Command{
cmdkit.BoolOption(pinVerboseOptionName, "Also write the hashes of non-broken pins."),
cmdkit.BoolOption(pinQuietOptionName, "q", "Write just hashes of broken pins."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
verbose, _, _ := res.Request().Option(pinVerboseOptionName).Bool()
quiet, _, _ := res.Request().Option(pinQuietOptionName).Bool()
verbose, _ := req.Options[pinVerboseOptionName].(bool)
quiet, _ := req.Options[pinQuietOptionName].(bool)
if verbose && quiet {
res.SetError(fmt.Errorf("the --verbose and --quiet options can not be used at the same time"), cmdkit.ErrNormal)
return fmt.Errorf("the --verbose and --quiet options can not be used at the same time")
}
opts := pinVerifyOpts{
explain: !quiet,
includeOk: verbose,
}
out := pinVerify(req.Context(), n, opts)
out := pinVerify(req.Context, n, opts)
res.SetOutput(out)
return res.Emit(out)
},
Type: PinVerifyRes{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
quiet, _, _ := res.Request().Option(pinQuietOptionName).Bool()
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinVerifyRes) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
out, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
r, ok := out.(*PinVerifyRes)
if !ok {
return nil, e.TypeErr(r, out)
}
buf := &bytes.Buffer{}
if quiet && !r.Ok {
fmt.Fprintf(buf, "%s\n", r.Cid)
if quiet && !out.Ok {
fmt.Fprintf(w, "%s\n", out.Cid)
} else if !quiet {
r.Format(buf)
out.Format(w)
}
return buf, nil
},
return nil
}),
},
}
......
......@@ -135,7 +135,7 @@ var rootSubcommands = map[string]*cmds.Command{
"mount": MountCmd,
"name": name.NameCmd,
"object": ocmd.ObjectCmd,
"pin": lgc.NewCommand(PinCmd),
"pin": PinCmd,
"ping": PingCmd,
"p2p": P2PCmd,
"refs": lgc.NewCommand(RefsCmd),
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论