提交 f756088d 作者: Brian Tiger Chow

fix(routing/dht) _always_ close chan on exit of FindProvidersAsync

the important change here is that within FindProvidersAsync, the channel
is closed using a `defer`. This ensures the channel is always closed,
regardless of the path taken to exit.

+ misc cleanup

cc @whyrusleeping @jbenet

License: MIT
Signed-off-by: 's avatarBrian Tiger Chow <brian@perfmode.com>
上级 c80a7941
......@@ -194,24 +194,21 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa
start := time.Now()
log.Event(ctx, "sentMessage", dht.self, p, pmes)
rmes, err := dht.sender.SendRequest(ctx, mes)
rmes, err := dht.sender.SendRequest(ctx, mes) // respect?
if err != nil {
return nil, err
}
if rmes == nil {
return nil, errors.New("no response to request")
}
log.Event(ctx, "sentMessage", dht.self, p, pmes)
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)
rmes.Peer().SetLatency(time.Since(start))
rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
return rpmes, nil
}
......
......@@ -129,21 +129,27 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
log.Event(ctx, "findProviders", &key)
peerOut := make(chan peer.Peer, count)
go func() {
defer close(peerOut)
ps := newPeerSet()
// TODO may want to make this function async to hide latency
provs := dht.providers.GetProviders(key)
for _, p := range provs {
count--
// NOTE: assuming that this list of peers is unique
ps.Add(p)
peerOut <- p
select {
case peerOut <- p:
case <-ctx.Done():
return
}
if count <= 0 {
return
}
}
wg := new(sync.WaitGroup)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers {
var wg sync.WaitGroup
for _, pp := range dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) {
wg.Add(1)
go func(p peer.Peer) {
defer wg.Done()
......@@ -156,16 +162,16 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
}(pp)
}
wg.Wait()
close(peerOut)
}()
return peerOut
}
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
done := make(chan struct{})
var wg sync.WaitGroup
for _, pbp := range peers {
wg.Add(1)
go func(mp *pb.Message_Peer) {
defer func() { done <- struct{}{} }()
defer wg.Done()
// construct new peer
p, err := dht.ensureConnectedToPeer(ctx, mp)
if err != nil {
......@@ -179,15 +185,17 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M
dht.providers.AddProvider(k, p)
if ps.AddIfSmallerThan(p, count) {
out <- p
select {
case out <- p:
case <-ctx.Done():
return
}
} else if ps.Size() >= count {
return
}
}(pbp)
}
for _ = range peers {
<-done
}
wg.Wait()
}
// Find specific Peer
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论