提交 36168542 作者: Łukasz Magiera 提交者: Steven Allen

coreapi: implement dht api

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 a63e224e
......@@ -62,3 +62,8 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
func (api *CoreAPI) Pin() coreiface.PinAPI {
return (*PinAPI)(api)
}
// Dht returns the DhtAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Dht() coreiface.DhtAPI {
return &DhtAPI{api, nil}
}
package coreapi
import (
"context"
"errors"
"fmt"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
dag "github.com/ipfs/go-ipfs/merkledag"
routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing"
notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications"
ipdht "gx/ipfs/QmVSep2WwKcXxMonPASsAJ3nZVjfVMKgMcaSigxKnUWpJv/go-libp2p-kad-dht"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)
var ErrNotDHT = errors.New("routing service is not a DHT")
type DhtAPI struct {
*CoreAPI
*caopts.DhtOptions
}
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (<-chan ma.Multiaddr, error) {
dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}
outChan := make(chan ma.Multiaddr)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)
go func() {
defer close(outChan)
sendAddrs := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
for _, addr := range response.Addrs {
select {
case outChan <- addr:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
}
for event := range events {
if event.Type == notif.FinalPeer {
err := sendAddrs(event.Responses)
if err != nil {
return
}
}
}
}()
go func() {
defer close(events)
pi, err := dht.FindPeer(ctx, peer.ID(p))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
return
}
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.FinalPeer,
Responses: []*pstore.PeerInfo{&pi},
})
}()
return outChan, nil
}
func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.ID, error) {
settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}
dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}
p, err = api.ResolvePath(ctx, p)
if err != nil {
return nil, err
}
c := p.Cid()
numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
}
outChan := make(chan peer.ID)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)
pchan := dht.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)
sendProviders := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
select {
case outChan <- response.ID:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
for event := range events {
if event.Type == notif.Provider {
err := sendProviders(event.Responses)
if err != nil {
return
}
}
}
}()
go func() {
defer close(events)
for p := range pchan {
np := p
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Provider,
Responses: []*pstore.PeerInfo{&np},
})
}
}()
return outChan, nil
}
func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error {
settings, err := caopts.DhtProvideOptions(opts...)
if err != nil {
return err
}
if api.node.Routing == nil {
return errors.New("cannot provide in offline mode")
}
if len(api.node.PeerHost.Network().Conns()) == 0 {
return errors.New("cannot provide, no connected peers")
}
c := path.Cid()
has, err := api.node.Blockstore.Has(c)
if err != nil {
return err
}
if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
}
//TODO: either remove or use
//outChan := make(chan interface{})
//events := make(chan *notif.QueryEvent)
//ctx = notif.RegisterForQueryEvents(ctx, events)
/*go func() {
defer close(outChan)
for range events {
select {
case <-ctx.Done():
return
default:
}
}
}()*/
//defer close(events)
if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c})
} else {
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c})
}
if err != nil {
return err
}
return nil
}
func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
}
return nil
}
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error {
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
}
for _, k := range kset.Keys() {
if provided.Has(k) {
continue
}
err = r.Provide(ctx, k, true)
if err != nil {
return err
}
provided.Add(k)
}
}
return nil
}
......@@ -31,6 +31,9 @@ type CoreAPI interface {
// ObjectAPI returns an implementation of Object API
Object() ObjectAPI
// Dht returns an implementation of Dht API
Dht() DhtAPI
// ResolvePath resolves the path using Unixfs resolver
ResolvePath(context.Context, Path) (ResolvedPath, error)
......
......@@ -17,7 +17,11 @@ type DhtAPI interface {
// FindProviders finds peers in the DHT who can provide a specific value
// given a key.
FindProviders(context.Context, Path) (<-chan peer.ID, error) //TODO: is path the right choice here?
FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan peer.ID, error) //TODO: is path the right choice here?
// WithNumProviders is an option for FindProviders which specifies the
// number of peers to look for. Default is 20
WithNumProviders(numProviders int) options.DhtFindProvidersOption
// Provide announces to the network that you are providing given values
Provide(context.Context, Path, ...options.DhtProvideOption) error
......
......@@ -4,7 +4,12 @@ type DhtProvideSettings struct {
Recursive bool
}
type DhtFindProvidersSettings struct {
NumProviders int
}
type DhtProvideOption func(*DhtProvideSettings) error
type DhtFindProvidersOption func(*DhtFindProvidersSettings) error
func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) {
options := &DhtProvideSettings{
......@@ -20,6 +25,20 @@ func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) {
return options, nil
}
func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersSettings, error) {
options := &DhtFindProvidersSettings{
NumProviders: 20,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
type DhtOptions struct{}
func (api *DhtOptions) WithRecursive(recursive bool) DhtProvideOption {
......@@ -28,3 +47,10 @@ func (api *DhtOptions) WithRecursive(recursive bool) DhtProvideOption {
return nil
}
}
func (api *DhtOptions) WithNumProviders(numProviders int) DhtFindProvidersOption {
return func(settings *DhtFindProvidersSettings) error {
settings.NumProviders = numProviders
return nil
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论