提交 a2bba212 作者: Jeromy Johnson 提交者: GitHub

Merge pull request #3106 from ipfs/feat/cmds/dht-provide

cmds: implement ipfs dht provide command
...@@ -9,12 +9,15 @@ import ( ...@@ -9,12 +9,15 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
dag "github.com/ipfs/go-ipfs/merkledag"
notif "github.com/ipfs/go-ipfs/notifications" notif "github.com/ipfs/go-ipfs/notifications"
path "github.com/ipfs/go-ipfs/path" path "github.com/ipfs/go-ipfs/path"
routing "github.com/ipfs/go-ipfs/routing"
ipdht "github.com/ipfs/go-ipfs/routing/dht" ipdht "github.com/ipfs/go-ipfs/routing/dht"
pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore" pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )
var ErrNotDHT = errors.New("routing service is not a DHT") var ErrNotDHT = errors.New("routing service is not a DHT")
...@@ -31,6 +34,7 @@ var DhtCmd = &cmds.Command{ ...@@ -31,6 +34,7 @@ var DhtCmd = &cmds.Command{
"findpeer": findPeerDhtCmd, "findpeer": findPeerDhtCmd,
"get": getValueDhtCmd, "get": getValueDhtCmd,
"put": putValueDhtCmd, "put": putValueDhtCmd,
"provide": provideRefDhtCmd,
}, },
} }
...@@ -227,6 +231,160 @@ var findProvidersDhtCmd = &cmds.Command{ ...@@ -227,6 +231,160 @@ var findProvidersDhtCmd = &cmds.Command{
Type: notif.QueryEvent{}, Type: notif.QueryEvent{},
} }
var provideRefDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Announce to the network that you are providing given values.",
},
Arguments: []cmds.Argument{
cmds.StringArg("key", true, true, "The key[s] to send provide records for.").EnableStdin(),
},
Options: []cmds.Option{
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
cmds.BoolOption("recursive", "r", "Recursively provide entire graph.").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if n.Routing == nil {
res.SetError(errNotOnline, cmds.ErrNormal)
return
}
rec, _, _ := req.Option("recursive").Bool()
var keys []key.Key
for _, arg := range req.Arguments() {
k := key.B58KeyDecode(arg)
if k == "" {
res.SetError(fmt.Errorf("incorrectly formatted key: ", arg), cmds.ErrNormal)
return
}
has, err := n.Blockstore.Has(k)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !has {
res.SetError(fmt.Errorf("block %s not found locally, cannot provide", k), cmds.ErrNormal)
return
}
keys = append(keys, k)
}
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))
events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)
go func() {
defer close(outChan)
for e := range events {
outChan <- e
}
}()
go func() {
defer close(events)
var err error
if rec {
err = provideKeysRec(ctx, n.Routing, n.DAG, keys)
} else {
err = provideKeys(ctx, n.Routing, keys)
}
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
verbose, _, _ := res.Request().Option("v").Bool()
pfm := pfuncMap{
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
if verbose {
fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID)
}
},
}
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
printEvent(obj, buf, verbose, pfm)
return buf, nil
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
},
},
Type: notif.QueryEvent{},
}
func provideKeys(ctx context.Context, r routing.IpfsRouting, keys []key.Key) error {
for _, k := range keys {
err := r.Provide(ctx, k)
if err != nil {
return err
}
}
return nil
}
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGService, keys []key.Key) error {
provided := make(map[key.Key]struct{})
for _, k := range keys {
kset := key.NewKeySet()
node, err := dserv.Get(ctx, k)
if err != nil {
return err
}
err = dag.EnumerateChildrenAsync(ctx, dserv, node, kset)
if err != nil {
return err
}
for _, k := range kset.Keys() {
if _, ok := provided[k]; ok {
continue
}
err = r.Provide(ctx, k)
if err != nil {
return err
}
provided[k] = struct{}{}
}
}
return nil
}
var findPeerDhtCmd = &cmds.Command{ var findPeerDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{ Helptext: cmds.HelpText{
Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.", Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.",
......
...@@ -263,6 +263,10 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error { ...@@ -263,6 +263,10 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
go func(p peer.ID) { go func(p peer.ID) {
defer wg.Done() defer wg.Done()
log.Debugf("putProvider(%s, %s)", key, p) log.Debugf("putProvider(%s, %s)", key, p)
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.FinalPeer,
ID: p,
})
err := dht.sendMessage(ctx, p, mes) err := dht.sendMessage(ctx, p, mes)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)
...@@ -272,6 +276,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error { ...@@ -272,6 +276,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
wg.Wait() wg.Wait()
return nil return nil
} }
func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) { func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) {
pi := pstore.PeerInfo{ pi := pstore.PeerInfo{
ID: dht.self, ID: dht.self,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论