提交 a865fde2 作者: Łukasz Magiera

reprovider: make reprovide cmd error if reprovider is active

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 0f692baf
...@@ -14,10 +14,11 @@ import ( ...@@ -14,10 +14,11 @@ import (
var log = logging.Logger("reprovider") var log = logging.Logger("reprovider")
type keyChanFunc func(context.Context) (<-chan *cid.Cid, error) type keyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type doneFunc func(error)
type Reprovider struct { type Reprovider struct {
ctx context.Context ctx context.Context
trigger chan context.CancelFunc trigger chan doneFunc
// The routing system to provide values through // The routing system to provide values through
rsys routing.ContentRouting rsys routing.ContentRouting
...@@ -29,7 +30,7 @@ type Reprovider struct { ...@@ -29,7 +30,7 @@ type Reprovider struct {
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider { func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider {
return &Reprovider{ return &Reprovider{
ctx: ctx, ctx: ctx,
trigger: make(chan context.CancelFunc), trigger: make(chan doneFunc),
rsys: rsys, rsys: rsys,
keyProvider: keyProvider, keyProvider: keyProvider,
...@@ -42,7 +43,7 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { ...@@ -42,7 +43,7 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
// may have just started the daemon and shutting it down immediately. // may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime. // probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute) after := time.After(time.Minute)
var done context.CancelFunc var done doneFunc
for { for {
select { select {
case <-rp.ctx.Done(): case <-rp.ctx.Done():
...@@ -51,14 +52,19 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) { ...@@ -51,14 +52,19 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
case <-after: case <-after:
} }
unmute := rp.muteTrigger()
err := rp.Reprovide() err := rp.Reprovide()
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)
} }
if done != nil { if done != nil {
done() done(err)
} }
unmute()
after = time.After(tick) after = time.After(tick)
} }
} }
...@@ -93,13 +99,36 @@ func (rp *Reprovider) Reprovide() error { ...@@ -93,13 +99,36 @@ func (rp *Reprovider) Reprovide() error {
func (rp *Reprovider) Trigger(ctx context.Context) error { func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx) progressCtx, done := context.WithCancel(ctx)
var err error
df := func(e error) {
err = e
done()
}
select { select {
case <-rp.ctx.Done(): case <-rp.ctx.Done():
return context.Canceled return context.Canceled
case <-ctx.Done(): case <-ctx.Done():
return context.Canceled return context.Canceled
case rp.trigger <- done: case rp.trigger <- df:
<-progressCtx.Done() <-progressCtx.Done()
return nil return err
} }
} }
func (rp *Reprovider) muteTrigger() context.CancelFunc {
ctx, cf := context.WithCancel(rp.ctx)
go func() {
defer cf()
for {
select {
case <-ctx.Done():
return
case done := <-rp.trigger:
done(fmt.Errorf("reprovider is already running"))
}
}
}()
return cf
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论