提交 0ba7661d 作者: Łukasz Magiera 提交者: Steven Allen

Fix context propagation sortof

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 1acb4503
...@@ -129,6 +129,8 @@ func defaultRepo(dstore repo.Datastore) (repo.Repo, error) { ...@@ -129,6 +129,8 @@ func defaultRepo(dstore repo.Datastore) (repo.Repo, error) {
}, nil }, nil
} }
type MetricsCtx context.Context
// NewNode constructs and returns an IpfsNode using the given cfg. // NewNode constructs and returns an IpfsNode using the given cfg.
func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
if cfg == nil { if cfg == nil {
...@@ -157,9 +159,14 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { ...@@ -157,9 +159,14 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
return cfg return cfg
}) })
metricsCtx := fx.Provide(func() MetricsCtx {
return MetricsCtx(ctx)
})
params := fx.Options( params := fx.Options(
repoOption, repoOption,
cfgOption, cfgOption,
metricsCtx,
) )
storage := fx.Options( storage := fx.Options(
......
...@@ -117,7 +117,7 @@ func datastoreCtor(repo repo.Repo) ds.Datastore { ...@@ -117,7 +117,7 @@ func datastoreCtor(repo repo.Repo) ds.Datastore {
type BaseBlocks bstore.Blockstore type BaseBlocks bstore.Blockstore
func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs BaseBlocks, err error) { func baseBlockstoreCtor(mctx MetricsCtx, repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs BaseBlocks, err error) {
rds := &retry.Datastore{ rds := &retry.Datastore{
Batching: repo.Datastore(), Batching: repo.Datastore(),
Delay: time.Millisecond * 200, Delay: time.Millisecond * 200,
...@@ -135,7 +135,7 @@ func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc ...@@ -135,7 +135,7 @@ func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc
} }
if !bcfg.NilRepo { if !bcfg.NilRepo {
ctx, cancel := context.WithCancel(context.TODO()) //TODO: needed for mertics ctx, cancel := context.WithCancel(mctx)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStop: func(context context.Context) error { OnStop: func(context context.Context) error {
...@@ -395,13 +395,13 @@ type p2pHostOut struct { ...@@ -395,13 +395,13 @@ type p2pHostOut struct {
} }
// TODO: move some of this into params struct // TODO: move some of this into params struct
func p2pHost(lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) { func p2pHost(mctx MetricsCtx, lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) {
opts := []libp2p.Option{libp2p.NoListenAddrs} opts := []libp2p.Option{libp2p.NoListenAddrs}
for _, o := range params.Opts { for _, o := range params.Opts {
opts = append(opts, o...) opts = append(opts, o...)
} }
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(mctx)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { OnStop: func(_ context.Context) error {
cancel() cancel()
...@@ -471,12 +471,12 @@ type p2pRoutingOut struct { ...@@ -471,12 +471,12 @@ type p2pRoutingOut struct {
PSRouter *psrouter.PubsubValueStore //TODO: optional PSRouter *psrouter.PubsubValueStore //TODO: optional
} }
func p2pOnlineRouting(lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) { func p2pOnlineRouting(mctx MetricsCtx, lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) {
out.IpfsRouting = in.BaseRouting out.IpfsRouting = in.BaseRouting
if in.BCfg.getOpt("ipnsps") { if in.BCfg.getOpt("ipnsps") {
out.PSRouter = psrouter.NewPubsubValueStore( out.PSRouter = psrouter.NewPubsubValueStore(
lifecycleCtx(lc), lifecycleCtx(mctx, lc),
in.Host, in.Host,
in.BaseRouting, in.BaseRouting,
in.PubSub, in.PubSub,
...@@ -503,7 +503,7 @@ func p2pOnlineRouting(lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) { ...@@ -503,7 +503,7 @@ func p2pOnlineRouting(lc fx.Lifecycle, in p2pRoutingIn) (out p2pRoutingOut) {
//////////// ////////////
// P2P services // P2P services
func autoNATService(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) error { func autoNATService(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) error {
if !cfg.Swarm.EnableAutoNATService { if !cfg.Swarm.EnableAutoNATService {
return nil return nil
} }
...@@ -512,11 +512,11 @@ func autoNATService(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) err ...@@ -512,11 +512,11 @@ func autoNATService(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host) err
opts = append(opts, libp2p.DefaultTransports, libp2p.Transport(quic.NewTransport)) opts = append(opts, libp2p.DefaultTransports, libp2p.Transport(quic.NewTransport))
} }
_, err := autonat.NewAutoNATService(lifecycleCtx(lc), host, opts...) _, err := autonat.NewAutoNATService(lifecycleCtx(mctx, lc), host, opts...)
return err return err
} }
func pubsubCtor(lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig.Config) (service *pubsub.PubSub, err error) { func pubsubCtor(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig.Config) (service *pubsub.PubSub, err error) {
if !(bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps")) { if !(bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps")) {
return nil, nil // TODO: mark optional return nil, nil // TODO: mark optional
} }
...@@ -534,10 +534,10 @@ func pubsubCtor(lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig ...@@ -534,10 +534,10 @@ func pubsubCtor(lc fx.Lifecycle, host p2phost.Host, bcfg *BuildCfg, cfg *iconfig
case "": case "":
fallthrough fallthrough
case "floodsub": case "floodsub":
service, err = pubsub.NewFloodSub(lifecycleCtx(lc), host, pubsubOptions...) service, err = pubsub.NewFloodSub(lifecycleCtx(mctx, lc), host, pubsubOptions...)
case "gossipsub": case "gossipsub":
service, err = pubsub.NewGossipSub(lifecycleCtx(lc), host, pubsubOptions...) service, err = pubsub.NewGossipSub(lifecycleCtx(mctx, lc), host, pubsubOptions...)
default: default:
err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router) err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router)
...@@ -577,9 +577,9 @@ func dagCtor(bs bserv.BlockService) format.DAGService { ...@@ -577,9 +577,9 @@ func dagCtor(bs bserv.BlockService) format.DAGService {
return merkledag.NewDAGService(bs) return merkledag.NewDAGService(bs)
} }
func onlineExchangeCtor(lc fx.Lifecycle, host p2phost.Host, rt routing.IpfsRouting, bs bstore.GCBlockstore) exchange.Interface { func onlineExchangeCtor(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host, rt routing.IpfsRouting, bs bstore.GCBlockstore) exchange.Interface {
bitswapNetwork := bsnet.NewFromIpfsHost(host, rt) bitswapNetwork := bsnet.NewFromIpfsHost(host, rt)
return bitswap.New(lifecycleCtx(lc), bitswapNetwork, bs) return bitswap.New(lifecycleCtx(mctx, lc), bitswapNetwork, bs)
} }
func onlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *iconfig.Config) (namesys.NameSystem, error) { func onlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *iconfig.Config) (namesys.NameSystem, error) {
...@@ -636,20 +636,20 @@ func (dh *discoveryHandler) HandlePeerFound(p pstore.PeerInfo) { ...@@ -636,20 +636,20 @@ func (dh *discoveryHandler) HandlePeerFound(p pstore.PeerInfo) {
} }
} }
func newDiscoveryHandler(lc fx.Lifecycle, host p2phost.Host) *discoveryHandler { func newDiscoveryHandler(mctx MetricsCtx, lc fx.Lifecycle, host p2phost.Host) *discoveryHandler {
return &discoveryHandler{ return &discoveryHandler{
ctx: lifecycleCtx(lc), ctx: lifecycleCtx(mctx, lc),
host: host, host: host,
} }
} }
func setupDiscovery(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, handler *discoveryHandler) error { func setupDiscovery(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, handler *discoveryHandler) error {
if cfg.Discovery.MDNS.Enabled { if cfg.Discovery.MDNS.Enabled {
mdns := cfg.Discovery.MDNS mdns := cfg.Discovery.MDNS
if mdns.Interval == 0 { if mdns.Interval == 0 {
mdns.Interval = 5 mdns.Interval = 5
} }
service, err := discovery.NewMdnsService(lifecycleCtx(lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag) service, err := discovery.NewMdnsService(lifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag)
if err != nil { if err != nil {
log.Error("mdns error: ", err) log.Error("mdns error: ", err)
return nil return nil
...@@ -659,15 +659,15 @@ func setupDiscovery(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, han ...@@ -659,15 +659,15 @@ func setupDiscovery(lc fx.Lifecycle, cfg *iconfig.Config, host p2phost.Host, han
return nil return nil
} }
func providerQueue(lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) { func providerQueue(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) {
return provider.NewQueue(lifecycleCtx(lc), "provider-v1", repo.Datastore()) return provider.NewQueue(lifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
} }
func providerCtor(lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider { func providerCtor(mctx MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider {
return provider.NewProvider(lifecycleCtx(lc), queue, rt) return provider.NewProvider(lifecycleCtx(mctx, lc), queue, rt)
} }
func reproviderCtor(lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*rp.Reprovider, error) { func reproviderCtor(mctx MetricsCtx, lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*rp.Reprovider, error) {
var keyProvider rp.KeyChanFunc var keyProvider rp.KeyChanFunc
switch cfg.Reprovider.Strategy { switch cfg.Reprovider.Strategy {
...@@ -682,7 +682,7 @@ func reproviderCtor(lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds form ...@@ -682,7 +682,7 @@ func reproviderCtor(lc fx.Lifecycle, cfg *iconfig.Config, bs BaseBlocks, ds form
default: default:
return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
} }
return rp.NewReprovider(lifecycleCtx(lc), rt, keyProvider), nil return rp.NewReprovider(lifecycleCtx(mctx, lc), rt, keyProvider), nil
} }
func reprovider(cfg *iconfig.Config, reprovider *rp.Reprovider) error { func reprovider(cfg *iconfig.Config, reprovider *rp.Reprovider) error {
...@@ -700,7 +700,7 @@ func reprovider(cfg *iconfig.Config, reprovider *rp.Reprovider) error { ...@@ -700,7 +700,7 @@ func reprovider(cfg *iconfig.Config, reprovider *rp.Reprovider) error {
return nil return nil
} }
func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { func files(mctx MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := ds.NewKey("/local/filesroot") dsk := ds.NewKey("/local/filesroot")
pf := func(ctx context.Context, c cid.Cid) error { pf := func(ctx context.Context, c cid.Cid) error {
return repo.Datastore().Put(dsk, c.Bytes()) return repo.Datastore().Put(dsk, c.Bytes())
...@@ -708,7 +708,7 @@ func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, e ...@@ -708,7 +708,7 @@ func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, e
var nd *merkledag.ProtoNode var nd *merkledag.ProtoNode
val, err := repo.Datastore().Get(dsk) val, err := repo.Datastore().Get(dsk)
ctx := lifecycleCtx(lc) ctx := lifecycleCtx(mctx, lc)
switch { switch {
case err == ds.ErrNotFound || val == nil: case err == ds.ErrNotFound || val == nil:
...@@ -748,8 +748,8 @@ func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, e ...@@ -748,8 +748,8 @@ func files(lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, e
// //
// This is a hack which we need because most of our services use contexts in a // This is a hack which we need because most of our services use contexts in a
// wrong way // wrong way
func lifecycleCtx(lc fx.Lifecycle) context.Context { func lifecycleCtx(mctx MetricsCtx, lc fx.Lifecycle) context.Context {
ctx, cancel := context.WithCancel(context.TODO()) // TODO: really wire this context up, things (like metrics) may depend on it ctx, cancel := context.WithCancel(mctx)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { OnStop: func(_ context.Context) error {
cancel() cancel()
...@@ -774,7 +774,7 @@ func (lp *lcProcess) Run(f goprocess.ProcessFunc) { ...@@ -774,7 +774,7 @@ func (lp *lcProcess) Run(f goprocess.ProcessFunc) {
return nil return nil
}, },
OnStop: func(ctx context.Context) error { OnStop: func(ctx context.Context) error {
return (<-proc).Close() // todo: respect ctx return (<-proc).Close() // todo: respect ctx, somehow
}, },
}) })
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论