提交 2d94b784 作者: Jeromy

rewrite FindProvidersAsync

上级 beb70c94
...@@ -182,7 +182,6 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *p ...@@ -182,7 +182,6 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *p
providers = append(providers, dht.self) providers = append(providers, dht.self)
} }
// if we've got providers, send thos those.
if providers != nil && len(providers) > 0 { if providers != nil && len(providers) > 0 {
resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, providers) resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, providers)
} }
......
...@@ -138,11 +138,14 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { ...@@ -138,11 +138,14 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer { func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
log.Event(ctx, "findProviders", &key) log.Event(ctx, "findProviders", &key)
peerOut := make(chan peer.Peer, count) peerOut := make(chan peer.Peer, count)
go func() { go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
return peerOut
}
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) {
defer close(peerOut) defer close(peerOut)
ps := newPeerSet() ps := newPeerSet()
// TODO may want to make this function async to hide latency
provs := dht.providers.GetProviders(ctx, key) provs := dht.providers.GetProviders(ctx, key)
for _, p := range provs { for _, p := range provs {
count-- count--
...@@ -158,23 +161,55 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int ...@@ -158,23 +161,55 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
} }
} }
var wg sync.WaitGroup // setup the Query
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
for _, pp := range peers {
wg.Add(1)
go func(p peer.Peer) {
defer wg.Done()
pmes, err := dht.findProvidersSingle(ctx, p, key, 0) pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
if err != nil { if err != nil {
log.Error(err) return nil, err
return }
provs, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetProviderPeers())
for _, err := range errs {
if err != nil {
log.Warning(err)
} }
dht.addPeerListAsync(ctx, key, pmes.GetProviderPeers(), ps, count, peerOut)
}(pp)
} }
wg.Wait()
}() // Add unique providers from request, up to 'count'
return peerOut for _, prov := range provs {
if ps.Contains(prov) {
continue
}
select {
case peerOut <- prov:
case <-ctx.Done():
log.Error("Context timed out sending more providers")
return nil, ctx.Err()
}
ps.Add(prov)
if ps.Size() >= count {
return &dhtQueryResult{success: true}, nil
}
}
// Give closer peers back to the query to be queried
closer := pmes.GetCloserPeers()
clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer)
for _, err := range errs {
if err != nil {
log.Warning(err)
}
}
return &dhtQueryResult{closerPeers: clpeers}, nil
})
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
_, err := query.Run(ctx, peers)
if err != nil {
log.Errorf("FindProviders Query error: %s", err)
}
} }
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论