提交 91a79bc2 作者: Juan Batiz-Benet

ratelimiter: fixing rate limiter use

Use of the ratelimiter should be conscious of the ratelimiter's
potential closing. any loops that add work to ratelimiter
should (a) only do so if the rate limiter is not closed,
or (b) prevent limiter while work is added
(i.e. use limiter.Go(addWorkHere))
上级 1d01c035
...@@ -117,11 +117,10 @@ func (w *Worker) start(c Config) { ...@@ -117,11 +117,10 @@ func (w *Worker) start(c Config) {
} }
}) })
// reads from |workerChan| until process closes // reads from |workerChan| until w.process closes
w.process.Go(func(proc process.Process) { limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
limiter.Go(func(proc process.Process) {
ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die
limiter := ratelimit.NewRateLimiter(process.Background(), c.NumWorkers)
defer limiter.Close()
for { for {
select { select {
case <-proc.Closing(): case <-proc.Closing():
......
...@@ -385,20 +385,23 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote ...@@ -385,20 +385,23 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
go func() { go func() {
// rate limiting just in case. at most 10 addrs at once. // rate limiting just in case. at most 10 addrs at once.
limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10) limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10)
limiter.Go(func(worker process.Process) {
// permute addrs so we try different sets first each time. // permute addrs so we try different sets first each time.
for _, i := range rand.Perm(len(remoteAddrs)) { for _, i := range rand.Perm(len(remoteAddrs)) {
select { select {
case <-foundConn: // if one of them succeeded already case <-foundConn: // if one of them succeeded already
break break
default: case <-worker.Closing(): // our context was cancelled
break
default:
}
workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.LimitedGo(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
} }
})
workerAddr := remoteAddrs[i] // shadow variable to avoid race
limiter.Go(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}
}() }()
// wair fot the results. // wair fot the results.
......
...@@ -120,18 +120,27 @@ func (n *Notifier) StopNotify(e Notifiee) { ...@@ -120,18 +120,27 @@ func (n *Notifier) StopNotify(e Notifiee) {
// hooks into your object that block you accidentally. // hooks into your object that block you accidentally.
func (n *Notifier) NotifyAll(notify func(Notifiee)) { func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock() n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used. defer n.mu.Unlock()
for notifiee := range n.nots {
if n.nots == nil { // so that zero-value is ready to be used.
return
}
if n.lim == nil { // no rate limit // no rate limiting.
go notify(notifiee) if n.lim == nil {
} else { for notifiee := range n.nots {
notifiee := notifiee // rebind for data races go notify(notifiee)
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
} }
return
} }
n.mu.Unlock()
// with rate limiting.
n.lim.Go(func(worker process.Process) {
for notifiee := range n.nots {
notifiee := notifiee // rebind for loop data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
})
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论