提交 2462bf91 作者: Overbool 提交者: Steven Allen

commands/filestore: use res.Emit directly

License: MIT
Signed-off-by: 's avatarOverbool <overbool.xu@gmail.com>
上级 013de54d
package commands package commands
import ( import (
"context"
"fmt" "fmt"
"io" "io"
...@@ -56,11 +55,7 @@ The output is: ...@@ -56,11 +55,7 @@ The output is:
} }
args := req.Arguments args := req.Arguments
if len(args) > 0 { if len(args) > 0 {
out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes { return listByArgs(res, fs, args)
return filestore.List(fs, c)
})
return res.Emit(out)
} }
fileOrder, _ := req.Options[fileOrderOptionName].(bool) fileOrder, _ := req.Options[fileOrderOptionName].(bool)
...@@ -69,8 +64,17 @@ The output is: ...@@ -69,8 +64,17 @@ The output is:
return err return err
} }
out := listResToChan(req.Context, next) for {
return res.Emit(out) r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
}
return nil
}, },
PostRun: cmds.PostRunMap{ PostRun: cmds.PostRunMap{
cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError { cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError {
...@@ -122,10 +126,7 @@ For ERROR entries the error will also be printed to stderr. ...@@ -122,10 +126,7 @@ For ERROR entries the error will also be printed to stderr.
} }
args := req.Arguments args := req.Arguments
if len(args) > 0 { if len(args) > 0 {
out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes { return listByArgs(res, fs, args)
return filestore.Verify(fs, c)
})
return res.Emit(out)
} }
fileOrder, _ := req.Options[fileOrderOptionName].(bool) fileOrder, _ := req.Options[fileOrderOptionName].(bool)
...@@ -133,8 +134,18 @@ For ERROR entries the error will also be printed to stderr. ...@@ -133,8 +134,18 @@ For ERROR entries the error will also be printed to stderr.
if err != nil { if err != nil {
return err return err
} }
out := listResToChan(req.Context, next)
return res.Emit(out) for {
r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
}
return nil
}, },
Encoders: cmds.EncoderMap{ Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *filestore.ListRes) error { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *filestore.ListRes) error {
...@@ -162,29 +173,19 @@ var dupsFileStore = &cmds.Command{ ...@@ -162,29 +173,19 @@ var dupsFileStore = &cmds.Command{
return err return err
} }
out := make(chan interface{}, 128)
go func() {
defer close(out)
for cid := range ch { for cid := range ch {
have, err := fs.MainBlockstore().Has(cid) have, err := fs.MainBlockstore().Has(cid)
if err != nil { if err != nil {
select { return res.Emit(&RefWrapper{Err: err.Error()})
case out <- &RefWrapper{Err: err.Error()}:
case <-req.Context.Done():
}
return
} }
if have { if have {
select { if err := res.Emit(&RefWrapper{Ref: cid.String()}); err != nil {
case out <- &RefWrapper{Ref: cid.String()}: return err
case <-req.Context.Done():
return
} }
} }
} }
}()
return res.Emit(out) return nil
}, },
Encoders: cmds.EncoderMap{ Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error {
...@@ -212,49 +213,24 @@ func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, e ...@@ -212,49 +213,24 @@ func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, e
return n, fs, err return n, fs, err
} }
func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} { func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for {
r := next()
if r == nil {
return
}
select {
case out <- r:
case <-ctx.Done():
return
}
}
}()
return out
}
func perKeyActionToChan(ctx context.Context, args []string, action func(cid.Cid) *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for _, arg := range args { for _, arg := range args {
c, err := cid.Decode(arg) c, err := cid.Decode(arg)
if err != nil { if err != nil {
select { ret := &filestore.ListRes{
case out <- &filestore.ListRes{
Status: filestore.StatusOtherError, Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err), ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}:
case <-ctx.Done():
} }
if err := res.Emit(ret); err != nil {
return err
}
continue continue
} }
r := action(c) r := filestore.Verify(fs, c)
select { if err := res.Emit(r); err != nil {
case out <- r: return err
case <-ctx.Done():
return
} }
} }
}()
return out return nil
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论