提交 c77ed6d2 作者: Jeromy 提交者: Juan Batiz-Benet

fix up FindProvidersAsync

上级 c894b1d2
...@@ -368,8 +368,8 @@ func (dht *IpfsDHT) Update(p *peer.Peer) { ...@@ -368,8 +368,8 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
// after some deadline of inactivity. // after some deadline of inactivity.
} }
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) { func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routingTables { for _, table := range dht.routingTables {
p := table.Find(id) p := table.Find(id)
if p != nil { if p != nil {
...@@ -465,7 +465,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) { ...@@ -465,7 +465,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
p, _ := dht.peerstore.Get(id) p, _ := dht.peerstore.Get(id)
if p == nil { if p == nil {
p, _ = dht.Find(id) p, _ = dht.FindLocal(id)
if p != nil { if p != nil {
panic("somehow peer not getting into peerstore") panic("somehow peer not getting into peerstore")
} }
......
...@@ -227,13 +227,16 @@ func TestProvides(t *testing.T) { ...@@ -227,13 +227,16 @@ func TestProvides(t *testing.T) {
time.Sleep(time.Millisecond * 60) time.Sleep(time.Millisecond * 60)
ctxT, _ := context.WithTimeout(context.Background(), time.Second) ctxT, _ := context.WithTimeout(context.Background(), time.Second)
provs, err := dhts[0].FindProviders(ctxT, u.Key("hello")) provchan := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 1)
if err != nil {
t.Fatal(err)
}
if len(provs) != 1 { after := time.After(time.Second)
t.Fatal("Didnt get back providers") select {
case prov := <-provchan:
if prov == nil {
t.Fatal("Got back nil provider")
}
case <-after:
t.Fatal("Did not get a provider back.")
} }
} }
......
...@@ -3,6 +3,7 @@ package dht ...@@ -3,6 +3,7 @@ package dht
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -117,26 +118,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { ...@@ -117,26 +118,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
return nil return nil
} }
// NB: not actually async. Used to keep the interface consistent while the
// actual async method, FindProvidersAsync2 is under construction
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer { func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
ch := make(chan *peer.Peer)
providers, err := dht.FindProviders(ctx, key)
if err != nil {
close(ch)
return ch
}
go func() {
defer close(ch)
for _, p := range providers {
ch <- p
}
}()
return ch
}
// FIXME: there's a bug here!
func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
peerOut := make(chan *peer.Peer, count) peerOut := make(chan *peer.Peer, count)
go func() { go func() {
ps := newPeerSet() ps := newPeerSet()
...@@ -151,9 +133,12 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in ...@@ -151,9 +133,12 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in
} }
} }
wg := new(sync.WaitGroup)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers { for _, pp := range peers {
wg.Add(1)
go func(p *peer.Peer) { go func(p *peer.Peer) {
defer wg.Done()
pmes, err := dht.findProvidersSingle(ctx, p, key, 0) pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
if err != nil { if err != nil {
log.Error("%s", err) log.Error("%s", err)
...@@ -162,7 +147,8 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in ...@@ -162,7 +147,8 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in
dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut) dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut)
}(pp) }(pp)
} }
wg.Wait()
close(peerOut)
}() }()
return peerOut return peerOut
} }
...@@ -186,61 +172,16 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet ...@@ -186,61 +172,16 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet
} }
} }
// FindProviders searches for peers who can provide the value for given key.
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) {
// get closest peer
log.Debug("Find providers for: '%s'", key)
p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
log.Warning("Got no nearest peer for find providers: '%s'", key)
return nil, nil
}
for level := 0; level < len(dht.routingTables); {
// attempt retrieving providers
pmes, err := dht.findProvidersSingle(ctx, p, key, level)
if err != nil {
return nil, err
}
// handle providers
provs := pmes.GetProviderPeers()
if provs != nil {
log.Debug("Got providers back from findProviders call!")
return dht.addProviders(key, provs), nil
}
log.Debug("Didnt get providers, just closer peers.")
closer := pmes.GetCloserPeers()
if len(closer) == 0 {
level++
continue
}
np, err := dht.peerFromInfo(closer[0])
if err != nil {
log.Debug("no peerFromInfo")
level++
continue
}
p = np
}
return nil, u.ErrNotFound
}
// Find specific Peer // Find specific Peer
// FindPeer searches for a peer with given ID. // FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) { func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) {
// Check if were already connected to them // Check if were already connected to them
p, _ := dht.Find(id) p, _ := dht.FindLocal(id)
if p != nil { if p != nil {
return p, nil return p, nil
} }
// @whyrusleeping why is this here? doesn't the dht.Find above cover it?
routeLevel := 0 routeLevel := 0
p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id)) p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
if p == nil { if p == nil {
...@@ -277,7 +218,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error ...@@ -277,7 +218,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) { func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) {
// Check if were already connected to them // Check if were already connected to them
p, _ := dht.Find(id) p, _ := dht.FindLocal(id)
if p != nil { if p != nil {
return p, nil return p, nil
} }
......
...@@ -26,11 +26,7 @@ type IpfsRouting interface { ...@@ -26,11 +26,7 @@ type IpfsRouting interface {
// Announce that this node can provide value for given key // Announce that this node can provide value for given key
Provide(context.Context, u.Key) error Provide(context.Context, u.Key) error
// FindProviders searches for peers who can provide the value for given key.
FindProviders(context.Context, u.Key) ([]*peer.Peer, error)
// Find specific Peer // Find specific Peer
// FindPeer searches for a peer with given ID. // FindPeer searches for a peer with given ID.
FindPeer(context.Context, peer.ID) (*peer.Peer, error) FindPeer(context.Context, peer.ID) (*peer.Peer, error)
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论