提交 ccc576b6 作者: Łukasz Magiera 提交者: Steven Allen

More constructor fixes

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 65d8fad0
...@@ -162,6 +162,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { ...@@ -162,6 +162,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
storage := fx.Options( storage := fx.Options(
fx.Provide(repoConfig), fx.Provide(repoConfig),
fx.Provide(datastoreCtor),
fx.Provide(baseBlockstoreCtor), fx.Provide(baseBlockstoreCtor),
fx.Provide(gcBlockstoreCtor), fx.Provide(gcBlockstoreCtor),
) )
...@@ -169,23 +170,39 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { ...@@ -169,23 +170,39 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
ident := fx.Options( ident := fx.Options(
fx.Provide(identity), fx.Provide(identity),
fx.Provide(privateKey), fx.Provide(privateKey),
fx.Provide(peerstore),
) )
ipns := fx.Options( ipns := fx.Options(
fx.Provide(recordValidator), fx.Provide(recordValidator),
) )
providers := fx.Options(
fx.Provide(providerQueue),
fx.Provide(providerCtor),
fx.Provide(reproviderCtor),
fx.Invoke(reprovider),
fx.Invoke(provider.Provider.Run),
)
online := fx.Options( online := fx.Options(
fx.Provide(onlineExchangeCtor), fx.Provide(onlineExchangeCtor),
fx.Provide(onlineNamesysCtor), fx.Provide(onlineNamesysCtor),
fx.Invoke(ipnsRepublisher), fx.Invoke(ipnsRepublisher),
fx.Invoke(provider.Provider.Run),
fx.Provide(p2p.NewP2P),
ipfsp2p,
providers,
) )
if !cfg.Online { if !cfg.Online {
online = fx.Options( online = fx.Options(
fx.Provide(offline.Exchange), fx.Provide(offline.Exchange),
fx.Provide(offlineNamesysCtor), fx.Provide(offlineNamesysCtor),
fx.Provide(offroute.NewOfflineRouter),
fx.Provide(provider.NewOfflineProvider),
) )
} }
...@@ -197,13 +214,6 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { ...@@ -197,13 +214,6 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
fx.Provide(files), fx.Provide(files),
) )
providers := fx.Options(
fx.Provide(providerQueue),
fx.Provide(providerCtor),
fx.Provide(reproviderCtor),
fx.Invoke(reprovider),
)
n := &IpfsNode{ n := &IpfsNode{
ctx: ctx, ctx: ctx,
} }
...@@ -212,21 +222,18 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { ...@@ -212,21 +222,18 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
params, params,
storage, storage,
ident, ident,
ipfsp2p,
ipns, ipns,
online, online,
fx.Invoke(setupSharding), fx.Invoke(setupSharding),
core, core,
providers,
fx.Provide(p2p.NewP2P),
fx.Extract(n), fx.Extract(n),
) )
n.IsOnline = cfg.Online n.IsOnline = cfg.Online
n.app = app
/* n := &IpfsNode{ /* n := &IpfsNode{
IsOnline: cfg.Online, IsOnline: cfg.Online,
......
...@@ -20,6 +20,8 @@ import ( ...@@ -20,6 +20,8 @@ import (
"strings" "strings"
"time" "time"
"go.uber.org/fx"
version "github.com/ipfs/go-ipfs" version "github.com/ipfs/go-ipfs"
rp "github.com/ipfs/go-ipfs/exchange/reprovide" rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore" filestore "github.com/ipfs/go-ipfs/filestore"
...@@ -101,10 +103,10 @@ type IpfsNode struct { ...@@ -101,10 +103,10 @@ type IpfsNode struct {
Pinning pin.Pinner // the pinning manager Pinning pin.Pinner // the pinning manager
Mounts Mounts `optional:"true"` // current mount state, if any. Mounts Mounts `optional:"true"` // current mount state, if any.
PrivateKey ic.PrivKey // the local node's private Key PrivateKey ic.PrivKey // the local node's private Key
PNetFingerprint PNetFingerprint // fingerprint of private network PNetFingerprint PNetFingerprint `optional:"true"` // fingerprint of private network
// Services // Services
Peerstore pstore.Peerstore // storage for other Peer instances Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level) Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore // the filestore blockstore Filestore *filestore.Filestore // the filestore blockstore
BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping
...@@ -112,30 +114,32 @@ type IpfsNode struct { ...@@ -112,30 +114,32 @@ type IpfsNode struct {
Blocks bserv.BlockService // the block service, get/add blocks. Blocks bserv.BlockService // the block service, get/add blocks.
DAG ipld.DAGService // the merkle dag service, get/add objects. DAG ipld.DAGService // the merkle dag service, get/add objects.
Resolver *resolver.Resolver // the path resolution system Resolver *resolver.Resolver // the path resolution system
Reporter metrics.Reporter Reporter metrics.Reporter `optional:"true"`
Discovery discovery.Service `optional:"true"` Discovery discovery.Service `optional:"true"`
FilesRoot *mfs.Root FilesRoot *mfs.Root
RecordValidator record.Validator RecordValidator record.Validator
// Online // Online
PeerHost p2phost.Host // the network host (server+client) PeerHost p2phost.Host `optional:"true"` // the network host (server+client)
Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Routing routing.IpfsRouting `optional:"true"` // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap) Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes Namesys namesys.NameSystem // the name system, resolves paths to hashes
Provider provider.Provider // the value provider system Provider provider.Provider // the value provider system
Reprovider *rp.Reprovider // the value reprovider system Reprovider *rp.Reprovider `optional:"true"` // the value reprovider system
IpnsRepub *ipnsrp.Republisher `optional:"true"` IpnsRepub *ipnsrp.Republisher `optional:"true"`
AutoNAT *autonat.AutoNATService `optional:"true"` AutoNAT *autonat.AutoNATService `optional:"true"`
PubSub *pubsub.PubSub PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore PSRouter *psrouter.PubsubValueStore `optional:"true"`
DHT *dht.IpfsDHT DHT *dht.IpfsDHT `optional:"true"`
P2P *p2p.P2P P2P *p2p.P2P `optional:"true"`
proc goprocess.Process proc goprocess.Process //TODO: remove
ctx context.Context ctx context.Context
app *fx.App
// Flags // Flags
IsOnline bool `optional:"true"` // Online is set when networking is enabled. IsOnline bool `optional:"true"` // Online is set when networking is enabled.
IsDaemon bool `optional:"true"` // Daemon is set when running on a long-running daemon. IsDaemon bool `optional:"true"` // Daemon is set when running on a long-running daemon.
...@@ -648,9 +652,9 @@ func (n *IpfsNode) Process() goprocess.Process { ...@@ -648,9 +652,9 @@ func (n *IpfsNode) Process() goprocess.Process {
return n.proc return n.proc
} }
// Close calls Close() on the Process object // Close calls Close() on the App object
func (n *IpfsNode) Close() error { func (n *IpfsNode) Close() error {
return n.proc.Close() return n.app.Stop(n.ctx)
} }
// Context returns the IpfsNode context // Context returns the IpfsNode context
......
...@@ -111,6 +111,10 @@ func privateKey(cfg *iconfig.Config, id peer.ID) (ic.PrivKey, error) { ...@@ -111,6 +111,10 @@ func privateKey(cfg *iconfig.Config, id peer.ID) (ic.PrivKey, error) {
return sk, nil return sk, nil
} }
func datastoreCtor(repo repo.Repo) ds.Datastore {
return repo.Datastore()
}
func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs bstore.Blockstore, err error) { func baseBlockstoreCtor(repo repo.Repo, cfg *iconfig.Config, bcfg *BuildCfg, lc fx.Lifecycle) (bs bstore.Blockstore, err error) {
rds := &retry.Datastore{ rds := &retry.Datastore{
Batching: repo.Datastore(), Batching: repo.Datastore(),
...@@ -180,8 +184,6 @@ func recordValidator(ps pstore.Peerstore) record.Validator { ...@@ -180,8 +184,6 @@ func recordValidator(ps pstore.Peerstore) record.Validator {
// libp2p // libp2p
var ipfsp2p = fx.Options( var ipfsp2p = fx.Options(
fx.Provide(peerstore),
fx.Provide(p2pAddrFilters), fx.Provide(p2pAddrFilters),
fx.Provide(p2pBandwidthCounter), fx.Provide(p2pBandwidthCounter),
fx.Provide(p2pPNet), fx.Provide(p2pPNet),
...@@ -411,6 +413,9 @@ func p2pHost(lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) { ...@@ -411,6 +413,9 @@ func p2pHost(lc fx.Lifecycle, params p2pHostIn) (out p2pHostOut, err error) {
})) }))
out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...) out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...)
if err != nil {
return p2pHostOut{}, err
}
// this code is necessary just for tests: mock network constructions // this code is necessary just for tests: mock network constructions
// ignore the libp2p constructor options that actually construct the routing! // ignore the libp2p constructor options that actually construct the routing!
...@@ -758,14 +763,14 @@ func lifecycleCtx(lc fx.Lifecycle) context.Context { ...@@ -758,14 +763,14 @@ func lifecycleCtx(lc fx.Lifecycle) context.Context {
} }
func lcGoProc(lc fx.Lifecycle, processFunc goprocess.ProcessFunc) { func lcGoProc(lc fx.Lifecycle, processFunc goprocess.ProcessFunc) {
proc := goprocess.Background() proc := make(chan goprocess.Process, 1)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
proc.Go(processFunc) proc <- goprocess.Go(processFunc)
return nil return nil
}, },
OnStop: func(ctx context.Context) error { OnStop: func(ctx context.Context) error {
return proc.Close() // todo: respect ctx return (<-proc).Close() // todo: respect ctx
}, },
}) })
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论