提交 d36b6dbd 作者: Łukasz Magiera

reprovider: use goprocess

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 14866308
...@@ -45,6 +45,16 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu ...@@ -45,6 +45,16 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu
func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) { func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
var keyProvider reprovide.KeyChanFunc var keyProvider reprovide.KeyChanFunc
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return nil, err
}
reproviderInterval = dur
}
switch cfg.Reprovider.Strategy { switch cfg.Reprovider.Strategy {
case "all": case "all":
fallthrough fallthrough
...@@ -57,21 +67,11 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config ...@@ -57,21 +67,11 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config
default: default:
return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
} }
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
} }
// Reprovider runs the reprovider service // Reprovider runs the reprovider service
func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error { func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error {
reproviderInterval := kReprovideFrequency lp.Append(reprovider.Run)
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}
reproviderInterval = dur
}
go reprovider.Run(reproviderInterval) // TODO: refactor reprovider to have Start/Stop, use lifecycle
return nil return nil
} }
...@@ -3,13 +3,13 @@ package reprovide ...@@ -3,13 +3,13 @@ package reprovide
import ( import (
"context" "context"
"github.com/ipfs/go-ipfs/pin" pin "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil" cidutil "github.com/ipfs/go-cidutil"
blocks "github.com/ipfs/go-ipfs-blockstore" blocks "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag" merkledag "github.com/ipfs/go-merkledag"
) )
// NewBlockstoreProvider returns key provider using bstore.AllKeysChan // NewBlockstoreProvider returns key provider using bstore.AllKeysChan
......
...@@ -2,67 +2,77 @@ package reprovide ...@@ -2,67 +2,77 @@ package reprovide
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
backoff "github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
cid "github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/ipfs/go-verifcid" "github.com/ipfs/go-verifcid"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
routing "github.com/libp2p/go-libp2p-routing" routing "github.com/libp2p/go-libp2p-routing"
) )
var log = logging.Logger("reprovider") var log = logging.Logger("reprovider")
//KeyChanFunc is function streaming CIDs to pass to content routing // KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
type doneFunc func(error) type doneFunc func(error)
type Reprovider struct { type Reprovider struct {
ctx context.Context ctx context.Context
trigger chan doneFunc trigger chan doneFunc
closing chan struct{}
// The routing system to provide values through // The routing system to provide values through
rsys routing.ContentRouting rsys routing.ContentRouting
keyProvider KeyChanFunc keyProvider KeyChanFunc
tick time.Duration
} }
// NewReprovider creates new Reprovider instance. // NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { func NewReprovider(ctx context.Context, tick time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{ return &Reprovider{
ctx: ctx, ctx: ctx,
trigger: make(chan doneFunc), trigger: make(chan doneFunc),
closing: make(chan struct{}),
rsys: rsys, rsys: rsys,
keyProvider: keyProvider, keyProvider: keyProvider,
tick: tick,
} }
} }
// Run re-provides keys with 'tick' interval or when triggered // Run re-provides keys with 'tick' interval or when triggered
func (rp *Reprovider) Run(tick time.Duration) { func (rp *Reprovider) Run(proc goprocess.Process) {
ctx := goprocessctx.WithProcessClosing(rp.ctx, proc)
defer close(rp.closing)
// dont reprovide immediately. // dont reprovide immediately.
// may have just started the daemon and shutting it down immediately. // may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime. // probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute) after := time.After(time.Minute)
var done doneFunc var done doneFunc
for { for {
if tick == 0 { if rp.tick == 0 {
after = make(chan time.Time) after = make(chan time.Time)
} }
select { select {
case <-rp.ctx.Done(): case <-ctx.Done():
return return
case done = <-rp.trigger: case done = <-rp.trigger:
case <-after: case <-after:
} }
//'mute' the trigger channel so when `ipfs bitswap reprovide` is called // 'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned // a 'reprovider is already running' error is returned
unmute := rp.muteTrigger() unmute := rp.muteTrigger()
err := rp.Reprovide() err := rp.reprovide(ctx)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)
} }
...@@ -73,13 +83,13 @@ func (rp *Reprovider) Run(tick time.Duration) { ...@@ -73,13 +83,13 @@ func (rp *Reprovider) Run(tick time.Duration) {
unmute() unmute()
after = time.After(tick) after = time.After(rp.tick)
} }
} }
// Reprovide registers all keys given by rp.keyProvider to libp2p content routing // reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) Reprovide() error { func (rp *Reprovider) reprovide(ctx context.Context) error {
keychan, err := rp.keyProvider(rp.ctx) keychan, err := rp.keyProvider(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to get key chan: %s", err) return fmt.Errorf("failed to get key chan: %s", err)
} }
...@@ -90,7 +100,7 @@ func (rp *Reprovider) Reprovide() error { ...@@ -90,7 +100,7 @@ func (rp *Reprovider) Reprovide() error {
continue continue
} }
op := func() error { op := func() error {
err := rp.rsys.Provide(rp.ctx, c, true) err := rp.rsys.Provide(ctx, c, true)
if err != nil { if err != nil {
log.Debugf("Failed to provide key: %s", err) log.Debugf("Failed to provide key: %s", err)
} }
...@@ -119,10 +129,12 @@ func (rp *Reprovider) Trigger(ctx context.Context) error { ...@@ -119,10 +129,12 @@ func (rp *Reprovider) Trigger(ctx context.Context) error {
} }
select { select {
case <-rp.closing:
return errors.New("reprovider is closed")
case <-rp.ctx.Done(): case <-rp.ctx.Done():
return context.Canceled return rp.ctx.Err()
case <-ctx.Done(): case <-ctx.Done():
return context.Canceled return ctx.Err()
case rp.trigger <- df: case rp.trigger <- df:
<-progressCtx.Done() <-progressCtx.Done()
return err return err
......
package reprovide_test package reprovide
import ( import (
"context" "context"
"github.com/ipfs/go-ipfs"
"testing" "testing"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
...@@ -34,9 +33,9 @@ func TestReprovide(t *testing.T) { ...@@ -34,9 +33,9 @@ func TestReprovide(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
keyProvider := ipfs.NewBlockstoreProvider(bstore) keyProvider := NewBlockstoreProvider(bstore)
reprov := ipfs.NewReprovider(ctx, clA, keyProvider) reprov := NewReprovider(ctx, 0, clA, keyProvider)
err = reprov.Reprovide() err = reprov.reprovide(ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论