提交 dc0fbfd3 作者: Juan Batiz-Benet 提交者: Brian Tiger Chow

added some logging

上级 a114e9cd
...@@ -111,7 +111,12 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { ...@@ -111,7 +111,12 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
r.addPeerToQuery(p, nil) // don't have access to self here... r.addPeerToQuery(p, nil) // don't have access to self here...
} }
// wait until we're done. yep. // go do this thing.
go r.spawnWorkers()
// so workers are working.
// wait until they're done.
select { select {
case <-r.peersRemaining.Done(): case <-r.peersRemaining.Done():
r.cancel() // ran all and nothing. cancel all outstanding workers. r.cancel() // ran all and nothing. cancel all outstanding workers.
...@@ -158,6 +163,8 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { ...@@ -158,6 +163,8 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
r.peersSeen[next.Key()] = next r.peersSeen[next.Key()] = next
r.Unlock() r.Unlock()
u.POut("adding peer to query: %v\n", next.ID.Pretty())
// do this after unlocking to prevent possible deadlocks. // do this after unlocking to prevent possible deadlocks.
r.peersRemaining.Increment(1) r.peersRemaining.Increment(1)
select { select {
...@@ -166,8 +173,9 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { ...@@ -166,8 +173,9 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
} }
} }
func (r *dhtQueryRunner) spawnWorkers(p *peer.Peer) { func (r *dhtQueryRunner) spawnWorkers() {
for { for {
select { select {
case <-r.peersRemaining.Done(): case <-r.peersRemaining.Done():
return return
...@@ -175,13 +183,19 @@ func (r *dhtQueryRunner) spawnWorkers(p *peer.Peer) { ...@@ -175,13 +183,19 @@ func (r *dhtQueryRunner) spawnWorkers(p *peer.Peer) {
case <-r.ctx.Done(): case <-r.ctx.Done():
return return
case p := <-r.peersToQuery.DeqChan: case p, more := <-r.peersToQuery.DeqChan:
if !more {
return // channel closed.
}
u.POut("spawning worker for: %v\n", p.ID.Pretty())
go r.queryPeer(p) go r.queryPeer(p)
} }
} }
} }
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) { func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
u.POut("spawned worker for: %v\n", p.ID.Pretty())
// make sure we rate limit concurrency. // make sure we rate limit concurrency.
select { select {
case <-r.rateLimit: case <-r.rateLimit:
...@@ -190,27 +204,33 @@ func (r *dhtQueryRunner) queryPeer(p *peer.Peer) { ...@@ -190,27 +204,33 @@ func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
return return
} }
u.POut("running worker for: %v\n", p.ID.Pretty())
// finally, run the query against this peer // finally, run the query against this peer
res, err := r.query.qfunc(r.ctx, p) res, err := r.query.qfunc(r.ctx, p)
if err != nil { if err != nil {
u.POut("ERROR worker for: %v %v\n", p.ID.Pretty(), err)
r.Lock() r.Lock()
r.errs = append(r.errs, err) r.errs = append(r.errs, err)
r.Unlock() r.Unlock()
} else if res.success { } else if res.success {
u.POut("SUCCESS worker for: %v\n", p.ID.Pretty(), res)
r.Lock() r.Lock()
r.result = res r.result = res
r.Unlock() r.Unlock()
r.cancel() // signal to everyone that we're done. r.cancel() // signal to everyone that we're done.
} else if res.closerPeers != nil { } else if res.closerPeers != nil {
u.POut("PEERS CLOSER -- worker for: %v\n", p.ID.Pretty())
for _, next := range res.closerPeers { for _, next := range res.closerPeers {
r.addPeerToQuery(next, p) r.addPeerToQuery(next, p)
} }
} }
// signal we're done proccessing peer p // signal we're done proccessing peer p
u.POut("completing worker for: %v\n", p.ID.Pretty())
r.peersRemaining.Decrement(1) r.peersRemaining.Decrement(1)
r.rateLimit <- struct{}{} r.rateLimit <- struct{}{}
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论