提交 aa7494b0 作者: keks 提交者: Steven Allen

repo gc: use cmds2

License: MIT
Signed-off-by: 's avatarkeks <keks@cryptoscope.co>
上级 caf855b4
......@@ -38,7 +38,7 @@ var RepoCmd = &cmds.Command{
Subcommands: map[string]*cmds.Command{
"stat": repoStatCmd,
"gc": lgc.NewCommand(repoGcCmd),
"gc": repoGcCmd,
"fsck": lgc.NewCommand(RepoFsckCmd),
"version": lgc.NewCommand(repoVersionCmd),
"verify": lgc.NewCommand(repoVerifyCmd),
......@@ -51,7 +51,7 @@ type GcResult struct {
Error string `json:",omitempty"`
}
var repoGcCmd = &oldcmds.Command{
var repoGcCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Perform a garbage collection sweep on the repo.",
ShortDescription: `
......@@ -64,87 +64,63 @@ order to reclaim hard disk space.
cmdkit.BoolOption("stream-errors", "Stream errors."),
cmdkit.BoolOption("quiet", "q", "Write minimal output."),
},
Run: func(req oldcmds.Request, res oldcmds.Response) {
n, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
streamErrors, _, _ := res.Request().Option("stream-errors").Bool()
gcOutChan := corerepo.GarbageCollectAsync(n, req.Context())
outChan := make(chan interface{})
res.SetOutput(outChan)
go func() {
defer close(outChan)
if streamErrors {
errs := false
for res := range gcOutChan {
if res.Error != nil {
select {
case outChan <- &GcResult{Error: res.Error.Error()}:
case <-req.Context().Done():
return
}
errs = true
} else {
select {
case outChan <- &GcResult{Key: res.KeyRemoved}:
case <-req.Context().Done():
return
}
}
}
if errs {
res.SetError(errors.New("encountered errors during gc run"), cmdkit.ErrNormal)
}
} else {
err := corerepo.CollectResult(req.Context(), gcOutChan, func(k cid.Cid) {
select {
case outChan <- &GcResult{Key: k}:
case <-req.Context().Done():
}
})
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
streamErrors, _ := req.Options["stream-errors"].(bool)
gcOutChan := corerepo.GarbageCollectAsync(n, req.Context)
if streamErrors {
errs := false
for res := range gcOutChan {
if res.Error != nil {
re.Emit(&GcResult{Error: res.Error.Error()})
errs = true
} else {
re.Emit(&GcResult{Key: res.KeyRemoved})
}
}
}()
},
Type: GcResult{},
Marshalers: oldcmds.MarshalerMap{
oldcmds.Text: func(res oldcmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
if errs {
return errors.New("encountered errors during gc run")
}
quiet, _, err := res.Request().Option("quiet").Bool()
} else {
err := corerepo.CollectResult(req.Context, gcOutChan, func(k cid.Cid) {
re.Emit(&GcResult{Key: k})
})
if err != nil {
return nil, err
return err
}
}
return nil
},
Type: GcResult{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
quiet, _ := req.Options["quiet"].(bool)
obj, ok := v.(*GcResult)
if !ok {
return nil, e.TypeErr(obj, v)
return e.TypeErr(obj, v)
}
if obj.Error != "" {
fmt.Fprintf(res.Stderr(), "Error: %s\n", obj.Error)
return nil, nil
_, err := fmt.Fprintf(w, "Error: %s\n", obj.Error)
return err
}
msg := obj.Key.String() + "\n"
if !quiet {
msg = "removed " + msg
prefix := "removed "
if quiet {
prefix = ""
}
return bytes.NewBufferString(msg), nil
},
_, err := fmt.Fprintf(w, "%s%s\n", prefix, obj.Key)
return err
}),
},
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论