Unverified 提交 a98a7155 作者: Steven Allen 提交者: GitHub

Merge pull request #5671 from overbool/refactor/commands/dht

commands/dht: use new cmds lib
package commands package commands
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"time" "time"
cmds "github.com/ipfs/go-ipfs/commands" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag"
path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format" ipld "gx/ipfs/QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o/go-ipld-format"
cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
dag "gx/ipfs/QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc/go-merkledag"
path "gx/ipfs/QmT3rzed1ppXefourpmoZ7tyVQfsGPQZ1pHDngLmCvXxd3/go-path"
peer "gx/ipfs/QmTRhk7cgjUf2gfQ3p2M9KPECNZEW9XUrmHcFCgog4cPgB/go-libp2p-peer" peer "gx/ipfs/QmTRhk7cgjUf2gfQ3p2M9KPECNZEW9XUrmHcFCgog4cPgB/go-libp2p-peer"
pstore "gx/ipfs/QmTTJcDL3gsnGDALjh2fDGg1onGRUdVgNL2hU2WEZcVrMX/go-libp2p-peerstore" pstore "gx/ipfs/QmTTJcDL3gsnGDALjh2fDGg1onGRUdVgNL2hU2WEZcVrMX/go-libp2p-peerstore"
b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58" b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58"
routing "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing" routing "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing"
notif "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing/notifications" notif "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing/notifications"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
) )
var ErrNotDHT = errors.New("routing service is not a DHT") var ErrNotDHT = errors.New("routing service is not a DHT")
...@@ -60,32 +59,28 @@ var queryDhtCmd = &cmds.Command{ ...@@ -60,32 +59,28 @@ var queryDhtCmd = &cmds.Command{
Options: []cmdkit.Option{ Options: []cmdkit.Option{
cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."),
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := req.InvocContext().GetNode() nd, err := cmdenv.GetNode(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if n.DHT == nil { if nd.DHT == nil {
res.SetError(ErrNotDHT, cmdkit.ErrNormal) return ErrNotDHT
return
} }
id, err := peer.IDB58Decode(req.Arguments()[0]) id, err := peer.IDB58Decode(req.Arguments[0])
if err != nil { if err != nil {
res.SetError(cmds.ClientError("invalid peer ID"), cmdkit.ErrClient) return cmds.ClientError("invalid peer ID")
return
} }
ctx, cancel := context.WithCancel(req.Context()) ctx, cancel := context.WithCancel(req.Context)
ctx, events := notif.RegisterForQueryEvents(ctx) ctx, events := notif.RegisterForQueryEvents(ctx)
closestPeers, err := n.DHT.GetClosestPeers(ctx, string(id)) closestPeers, err := nd.DHT.GetClosestPeers(ctx, string(id))
if err != nil { if err != nil {
cancel() cancel()
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
go func() { go func() {
...@@ -98,22 +93,16 @@ var queryDhtCmd = &cmds.Command{ ...@@ -98,22 +93,16 @@ var queryDhtCmd = &cmds.Command{
} }
}() }()
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))
go func() {
defer close(outChan)
for e := range events { for e := range events {
select { if err := res.Emit(e); err != nil {
case outChan <- e: return err
case <-req.Context().Done():
return
} }
} }
}()
return nil
}, },
Marshalers: cmds.MarshalerMap{ Encoders: cmds.EncoderMap{
cmds.Text: func() cmds.Marshaler { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
pfm := pfuncMap{ pfm := pfuncMap{
notif.PeerResponse: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { notif.PeerResponse: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
for _, p := range obj.Responses { for _, p := range obj.Responses {
...@@ -121,25 +110,10 @@ var queryDhtCmd = &cmds.Command{ ...@@ -121,25 +110,10 @@ var queryDhtCmd = &cmds.Command{
} }
}, },
} }
verbose, _ := req.Options[dhtVerboseOptionName].(bool)
return func(res cmds.Response) (io.Reader, error) { printEvent(out, w, verbose, pfm)
v, err := unwrapOutput(res.Output()) return nil
if err != nil { }),
return nil, err
}
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, e.TypeErr(obj, v)
}
verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool()
buf := new(bytes.Buffer)
printEvent(obj, buf, verbose, pfm)
return buf, nil
}
}(),
}, },
Type: notif.QueryEvent{}, Type: notif.QueryEvent{},
} }
...@@ -161,52 +135,34 @@ var findProvidersDhtCmd = &cmds.Command{ ...@@ -161,52 +135,34 @@ var findProvidersDhtCmd = &cmds.Command{
cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."),
cmdkit.IntOption(numProvidersOptionName, "n", "The number of providers to find.").WithDefault(20), cmdkit.IntOption(numProvidersOptionName, "n", "The number of providers to find.").WithDefault(20),
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := req.InvocContext().GetNode() n, err := cmdenv.GetNode(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if n.Routing == nil { if n.Routing == nil {
res.SetError(ErrNotOnline, cmdkit.ErrNormal) return ErrNotOnline
return
} }
numProviders, _, err := res.Request().Option(numProvidersOptionName).Int() numProviders, _ := req.Options[numProvidersOptionName].(int)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if numProviders < 1 { if numProviders < 1 {
res.SetError(fmt.Errorf("number of providers must be greater than 0"), cmdkit.ErrNormal) return fmt.Errorf("number of providers must be greater than 0")
return
} }
c, err := cid.Parse(req.Arguments()[0]) c, err := cid.Parse(req.Arguments[0])
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
outChan := make(chan interface{}) ctx, cancel := context.WithCancel(req.Context)
res.SetOutput((<-chan interface{})(outChan))
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx) ctx, events := notif.RegisterForQueryEvents(ctx)
pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context().Done():
return
}
}
}()
go func() { go func() {
defer cancel() defer cancel()
...@@ -218,9 +174,16 @@ var findProvidersDhtCmd = &cmds.Command{ ...@@ -218,9 +174,16 @@ var findProvidersDhtCmd = &cmds.Command{
}) })
} }
}() }()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
return nil
}, },
Marshalers: cmds.MarshalerMap{ Encoders: cmds.EncoderMap{
cmds.Text: func() func(cmds.Response) (io.Reader, error) { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
pfm := pfuncMap{ pfm := pfuncMap{
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
if verbose { if verbose {
...@@ -241,23 +204,11 @@ var findProvidersDhtCmd = &cmds.Command{ ...@@ -241,23 +204,11 @@ var findProvidersDhtCmd = &cmds.Command{
}, },
} }
return func(res cmds.Response) (io.Reader, error) { verbose, _ := req.Options[dhtVerboseOptionName].(bool)
verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() printEvent(out, w, verbose, pfm)
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, e.TypeErr(obj, v)
}
buf := new(bytes.Buffer) return nil
printEvent(obj, buf, verbose, pfm) }),
return buf, nil
}
}(),
}, },
Type: notif.QueryEvent{}, Type: notif.QueryEvent{},
} }
...@@ -278,71 +229,51 @@ var provideRefDhtCmd = &cmds.Command{ ...@@ -278,71 +229,51 @@ var provideRefDhtCmd = &cmds.Command{
cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."),
cmdkit.BoolOption(recursiveOptionName, "r", "Recursively provide entire graph."), cmdkit.BoolOption(recursiveOptionName, "r", "Recursively provide entire graph."),
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := req.InvocContext().GetNode() nd, err := cmdenv.GetNode(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if n.Routing == nil { if nd.Routing == nil {
res.SetError(ErrNotOnline, cmdkit.ErrNormal) return ErrNotOnline
return
} }
if len(n.PeerHost.Network().Conns()) == 0 { if len(nd.PeerHost.Network().Conns()) == 0 {
res.SetError(errors.New("cannot provide, no connected peers"), cmdkit.ErrNormal) return errors.New("cannot provide, no connected peers")
return
} }
rec, _, _ := req.Option(recursiveOptionName).Bool() rec, _ := req.Options[recursiveOptionName].(bool)
var cids []cid.Cid var cids []cid.Cid
for _, arg := range req.Arguments() { for _, arg := range req.Arguments {
c, err := cid.Decode(arg) c, err := cid.Decode(arg)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
has, err := n.Blockstore.Has(c) has, err := nd.Blockstore.Has(c)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if !has { if !has {
res.SetError(fmt.Errorf("block %s not found locally, cannot provide", c), cmdkit.ErrNormal) return fmt.Errorf("block %s not found locally, cannot provide", c)
return
} }
cids = append(cids, c) cids = append(cids, c)
} }
outChan := make(chan interface{}) ctx, cancel := context.WithCancel(req.Context)
res.SetOutput((<-chan interface{})(outChan))
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx) ctx, events := notif.RegisterForQueryEvents(ctx)
go func() { go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context().Done():
return
}
}
}()
go func() {
defer cancel() defer cancel()
var err error var err error
if rec { if rec {
err = provideKeysRec(ctx, n.Routing, n.DAG, cids) err = provideKeysRec(ctx, nd.Routing, nd.DAG, cids)
} else { } else {
err = provideKeys(ctx, n.Routing, cids) err = provideKeys(ctx, nd.Routing, cids)
} }
if err != nil { if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
...@@ -351,9 +282,17 @@ var provideRefDhtCmd = &cmds.Command{ ...@@ -351,9 +282,17 @@ var provideRefDhtCmd = &cmds.Command{
}) })
} }
}() }()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
return nil
}, },
Marshalers: cmds.MarshalerMap{ Encoders: cmds.EncoderMap{
cmds.Text: func() func(res cmds.Response) (io.Reader, error) { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
pfm := pfuncMap{ pfm := pfuncMap{
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
if verbose { if verbose {
...@@ -362,22 +301,11 @@ var provideRefDhtCmd = &cmds.Command{ ...@@ -362,22 +301,11 @@ var provideRefDhtCmd = &cmds.Command{
}, },
} }
return func(res cmds.Response) (io.Reader, error) { verbose, _ := req.Options[dhtVerboseOptionName].(bool)
verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() printEvent(out, w, verbose, pfm)
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, e.TypeErr(obj, v)
}
buf := new(bytes.Buffer) return nil
printEvent(obj, buf, verbose, pfm) }),
return buf, nil
}
}(),
}, },
Type: notif.QueryEvent{}, Type: notif.QueryEvent{},
} }
...@@ -430,44 +358,27 @@ var findPeerDhtCmd = &cmds.Command{ ...@@ -430,44 +358,27 @@ var findPeerDhtCmd = &cmds.Command{
Options: []cmdkit.Option{ Options: []cmdkit.Option{
cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."),
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := req.InvocContext().GetNode() nd, err := cmdenv.GetNode(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if n.Routing == nil { if nd.Routing == nil {
res.SetError(ErrNotOnline, cmdkit.ErrNormal) return ErrNotOnline
return
} }
pid, err := peer.IDB58Decode(req.Arguments()[0]) pid, err := peer.IDB58Decode(req.Arguments[0])
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
outChan := make(chan interface{}) ctx, cancel := context.WithCancel(req.Context)
res.SetOutput((<-chan interface{})(outChan))
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx) ctx, events := notif.RegisterForQueryEvents(ctx)
go func() { go func() {
defer close(outChan)
for v := range events {
select {
case outChan <- v:
case <-req.Context().Done():
}
}
}()
go func() {
defer cancel() defer cancel()
pi, err := n.Routing.FindPeer(ctx, pid) pi, err := nd.Routing.FindPeer(ctx, pid)
if err != nil { if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
...@@ -481,9 +392,17 @@ var findPeerDhtCmd = &cmds.Command{ ...@@ -481,9 +392,17 @@ var findPeerDhtCmd = &cmds.Command{
Responses: []*pstore.PeerInfo{&pi}, Responses: []*pstore.PeerInfo{&pi},
}) })
}() }()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
return nil
}, },
Marshalers: cmds.MarshalerMap{ Encoders: cmds.EncoderMap{
cmds.Text: func() func(cmds.Response) (io.Reader, error) { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
pfm := pfuncMap{ pfm := pfuncMap{
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
pi := obj.Responses[0] pi := obj.Responses[0]
...@@ -493,24 +412,10 @@ var findPeerDhtCmd = &cmds.Command{ ...@@ -493,24 +412,10 @@ var findPeerDhtCmd = &cmds.Command{
}, },
} }
return func(res cmds.Response) (io.Reader, error) { verbose, _ := req.Options[dhtVerboseOptionName].(bool)
verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() printEvent(out, w, verbose, pfm)
v, err := unwrapOutput(res.Output()) return nil
if err != nil { }),
return nil, err
}
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, e.TypeErr(obj, v)
}
buf := new(bytes.Buffer)
printEvent(obj, buf, verbose, pfm)
return buf, nil
}
}(),
}, },
Type: notif.QueryEvent{}, Type: notif.QueryEvent{},
} }
...@@ -535,43 +440,27 @@ Different key types can specify other 'best' rules. ...@@ -535,43 +440,27 @@ Different key types can specify other 'best' rules.
Options: []cmdkit.Option{ Options: []cmdkit.Option{
cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."),
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := req.InvocContext().GetNode() nd, err := cmdenv.GetNode(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if n.Routing == nil { if nd.Routing == nil {
res.SetError(ErrNotOnline, cmdkit.ErrNormal) return ErrNotOnline
return
} }
dhtkey, err := escapeDhtKey(req.Arguments()[0]) dhtkey, err := escapeDhtKey(req.Arguments[0])
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
outChan := make(chan interface{}) ctx, cancel := context.WithCancel(req.Context)
res.SetOutput((<-chan interface{})(outChan))
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx) ctx, events := notif.RegisterForQueryEvents(ctx)
go func() { go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context().Done():
}
}
}()
go func() {
defer cancel() defer cancel()
val, err := n.Routing.GetValue(ctx, dhtkey) val, err := nd.Routing.GetValue(ctx, dhtkey)
if err != nil { if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
...@@ -584,9 +473,17 @@ Different key types can specify other 'best' rules. ...@@ -584,9 +473,17 @@ Different key types can specify other 'best' rules.
}) })
} }
}() }()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
return nil
}, },
Marshalers: cmds.MarshalerMap{ Encoders: cmds.EncoderMap{
cmds.Text: func() func(cmds.Response) (io.Reader, error) { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
pfm := pfuncMap{ pfm := pfuncMap{
notif.Value: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { notif.Value: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
if verbose { if verbose {
...@@ -597,25 +494,11 @@ Different key types can specify other 'best' rules. ...@@ -597,25 +494,11 @@ Different key types can specify other 'best' rules.
}, },
} }
return func(res cmds.Response) (io.Reader, error) { verbose, _ := req.Options[dhtVerboseOptionName].(bool)
verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool() printEvent(out, w, verbose, pfm)
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, e.TypeErr(obj, v)
}
buf := new(bytes.Buffer) return nil
}),
printEvent(obj, buf, verbose, pfm)
return buf, nil
}
}(),
}, },
Type: notif.QueryEvent{}, Type: notif.QueryEvent{},
} }
...@@ -649,46 +532,29 @@ NOTE: A value may not exceed 2048 bytes. ...@@ -649,46 +532,29 @@ NOTE: A value may not exceed 2048 bytes.
Options: []cmdkit.Option{ Options: []cmdkit.Option{
cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."), cmdkit.BoolOption("verbose", dhtVerboseOptionName, "Print extra information."),
}, },
Run: func(req cmds.Request, res cmds.Response) { Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := req.InvocContext().GetNode() nd, err := cmdenv.GetNode(env)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
if n.Routing == nil { if nd.Routing == nil {
res.SetError(ErrNotOnline, cmdkit.ErrNormal) return ErrNotOnline
return
} }
key, err := escapeDhtKey(req.Arguments()[0]) key, err := escapeDhtKey(req.Arguments[0])
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) return err
return
} }
outChan := make(chan interface{}) data := req.Arguments[1]
res.SetOutput((<-chan interface{})(outChan))
data := req.Arguments()[1] ctx, cancel := context.WithCancel(req.Context)
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx) ctx, events := notif.RegisterForQueryEvents(ctx)
go func() { go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context().Done():
return
}
}
}()
go func() {
defer cancel() defer cancel()
err := n.Routing.PutValue(ctx, key, []byte(data)) err := nd.Routing.PutValue(ctx, key, []byte(data))
if err != nil { if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
...@@ -696,9 +562,17 @@ NOTE: A value may not exceed 2048 bytes. ...@@ -696,9 +562,17 @@ NOTE: A value may not exceed 2048 bytes.
}) })
} }
}() }()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
return nil
}, },
Marshalers: cmds.MarshalerMap{ Encoders: cmds.EncoderMap{
cmds.Text: func() func(cmds.Response) (io.Reader, error) { cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
pfm := pfuncMap{ pfm := pfuncMap{
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) { notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
if verbose { if verbose {
...@@ -710,23 +584,12 @@ NOTE: A value may not exceed 2048 bytes. ...@@ -710,23 +584,12 @@ NOTE: A value may not exceed 2048 bytes.
}, },
} }
return func(res cmds.Response) (io.Reader, error) { verbose, _ := req.Options[dhtVerboseOptionName].(bool)
verbose, _, _ := res.Request().Option(dhtVerboseOptionName).Bool()
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, e.TypeErr(obj, v)
}
buf := new(bytes.Buffer) printEvent(out, w, verbose, pfm)
printEvent(obj, buf, verbose, pfm)
return buf, nil return nil
} }),
}(),
}, },
Type: notif.QueryEvent{}, Type: notif.QueryEvent{},
} }
......
...@@ -125,7 +125,7 @@ var rootSubcommands = map[string]*cmds.Command{ ...@@ -125,7 +125,7 @@ var rootSubcommands = map[string]*cmds.Command{
"bootstrap": lgc.NewCommand(BootstrapCmd), "bootstrap": lgc.NewCommand(BootstrapCmd),
"config": lgc.NewCommand(ConfigCmd), "config": lgc.NewCommand(ConfigCmd),
"dag": dag.DagCmd, "dag": dag.DagCmd,
"dht": lgc.NewCommand(DhtCmd), "dht": DhtCmd,
"diag": lgc.NewCommand(DiagCmd), "diag": lgc.NewCommand(DiagCmd),
"dns": DNSCmd, "dns": DNSCmd,
"id": IDCmd, "id": IDCmd,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论