提交 80c73f26 作者: Jeromy

Add in some more notifications to help profile queries

License: MIT
Signed-off-by: 's avatarJeromy <jeromyj@gmail.com>
上级 261cdee4
...@@ -18,6 +18,8 @@ const ( ...@@ -18,6 +18,8 @@ const (
QueryError QueryError
Provider Provider
Value Value
AddingPeer
DialingPeer
) )
type QueryEvent struct { type QueryEvent struct {
......
...@@ -79,6 +79,8 @@ type dhtQueryRunner struct { ...@@ -79,6 +79,8 @@ type dhtQueryRunner struct {
rateLimit chan struct{} // processing semaphore rateLimit chan struct{} // processing semaphore
log logging.EventLogger log logging.EventLogger
runCtx context.Context
proc process.Process proc process.Process
sync.RWMutex sync.RWMutex
} }
...@@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner { ...@@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) { func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
r.log = log r.log = log
r.runCtx = ctx
if len(peers) == 0 { if len(peers) == 0 {
log.Warning("Running query with no peers!") log.Warning("Running query with no peers!")
...@@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) { ...@@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
return return
} }
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.AddingPeer,
ID: next,
})
r.peersRemaining.Increment(1) r.peersRemaining.Increment(1)
select { select {
case r.peersToQuery.EnqChan <- next: case r.peersToQuery.EnqChan <- next:
...@@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { ...@@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// make sure we're connected to the peer. // make sure we're connected to the peer.
// FIXME abstract away into the network layer // FIXME abstract away into the network layer
if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
log.Infof("not connected. dialing.") log.Error("not connected. dialing.")
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.DialingPeer,
ID: p,
})
// while we dial, we do not take up a rate limit. this is to allow // while we dial, we do not take up a rate limit. this is to allow
// forward progress during potentially very high latency dials. // forward progress during potentially very high latency dials.
r.rateLimit <- struct{}{} r.rateLimit <- struct{}{}
...@@ -231,9 +244,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { ...@@ -231,9 +244,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
if err := r.query.dht.host.Connect(ctx, pi); err != nil { if err := r.query.dht.host.Connect(ctx, pi); err != nil {
log.Debugf("Error connecting: %s", err) log.Debugf("Error connecting: %s", err)
notif.PublishQueryEvent(ctx, &notif.QueryEvent{ notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.QueryError, Type: notif.QueryError,
Extra: err.Error(), Extra: err.Error(),
ID: p,
}) })
r.Lock() r.Lock()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论