提交 3ac60574 作者: Łukasz Magiera 提交者: Steven Allen

Separate function to parse BuildCfg into Options

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 fd0c06a8
...@@ -2,154 +2,25 @@ package core ...@@ -2,154 +2,25 @@ package core
import ( import (
"context" "context"
"crypto/rand"
"encoding/base64"
"errors"
"go.uber.org/fx" "go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/core/node"
repo "github.com/ipfs/go-ipfs/repo"
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
cfg "github.com/ipfs/go-ipfs-config"
metrics "github.com/ipfs/go-metrics-interface"
resolver "github.com/ipfs/go-path/resolver"
ci "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
) )
type BuildCfg node.BuildCfg type BuildCfg = node.BuildCfg // Alias for compatibility until we properly refactor the constructor interface
func (cfg *BuildCfg) fillDefaults() error {
if cfg.Repo != nil && cfg.NilRepo {
return errors.New("cannot set a Repo and specify nilrepo at the same time")
}
if cfg.Repo == nil {
var d ds.Datastore
if cfg.NilRepo {
d = ds.NewNullDatastore()
} else {
d = ds.NewMapDatastore()
}
r, err := defaultRepo(dsync.MutexWrap(d))
if err != nil {
return err
}
cfg.Repo = r
}
if cfg.Routing == nil {
cfg.Routing = node.DHTOption
}
if cfg.Host == nil {
cfg.Host = node.DefaultHostOption
}
return nil
}
func defaultRepo(dstore repo.Datastore) (repo.Repo, error) {
c := cfg.Config{}
priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, rand.Reader)
if err != nil {
return nil, err
}
pid, err := peer.IDFromPublicKey(pub)
if err != nil {
return nil, err
}
privkeyb, err := priv.Bytes()
if err != nil {
return nil, err
}
c.Bootstrap = cfg.DefaultBootstrapAddresses
c.Addresses.Swarm = []string{"/ip4/0.0.0.0/tcp/4001"}
c.Identity.PeerID = pid.Pretty()
c.Identity.PrivKey = base64.StdEncoding.EncodeToString(privkeyb)
return &repo.Mock{
D: dstore,
C: c,
}, nil
}
// NewNode constructs and returns an IpfsNode using the given cfg. // NewNode constructs and returns an IpfsNode using the given cfg.
func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
if cfg == nil {
cfg = new(BuildCfg)
}
err := cfg.fillDefaults()
if err != nil {
return nil, err
}
ctx = metrics.CtxScope(ctx, "ipfs")
repoOption := fx.Provide(func(lc fx.Lifecycle) repo.Repo {
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return cfg.Repo.Close()
},
})
return cfg.Repo
})
metricsCtx := fx.Provide(func() node.MetricsCtx {
return node.MetricsCtx(ctx)
})
hostOption := fx.Provide(func() node.HostOption {
return cfg.Host
})
routingOption := fx.Provide(func() node.RoutingOption {
return cfg.Routing
})
params := fx.Options(
repoOption,
hostOption,
routingOption,
metricsCtx,
)
core := fx.Options(
fx.Provide(node.BlockServiceCtor),
fx.Provide(node.DagCtor),
fx.Provide(resolver.NewBasicResolver),
fx.Provide(node.Pinning),
fx.Provide(node.Files),
)
n := &IpfsNode{ n := &IpfsNode{
ctx: ctx, ctx: ctx,
} }
app := fx.New( app := fx.New(
fx.NopLogger, node.IPFS(ctx, cfg),
fx.Provide(baseProcess),
params,
node.Storage((*node.BuildCfg)(cfg)),
node.Identity,
node.IPNS,
node.Networked((*node.BuildCfg)(cfg)),
fx.Invoke(setupSharding),
core,
fx.NopLogger,
fx.Extract(n), fx.Extract(n),
) )
......
package core
import (
"context"
"github.com/jbenet/goprocess"
"go.uber.org/fx"
iconfig "github.com/ipfs/go-ipfs-config"
uio "github.com/ipfs/go-unixfs/io"
)
////////////////////
// libp2p
func setupSharding(cfg *iconfig.Config) {
// TEMP: setting global sharding switch here
uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled
}
func baseProcess(lc fx.Lifecycle) goprocess.Process {
p := goprocess.WithParent(goprocess.Background())
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return p.Close()
},
})
return p
}
package node package node
import ( import (
"crypto/rand"
"encoding/base64"
"errors"
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
cfg "github.com/ipfs/go-ipfs-config"
ci "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo"
) )
...@@ -34,3 +44,61 @@ func (cfg *BuildCfg) getOpt(key string) bool { ...@@ -34,3 +44,61 @@ func (cfg *BuildCfg) getOpt(key string) bool {
return cfg.ExtraOpts[key] return cfg.ExtraOpts[key]
} }
func (cfg *BuildCfg) fillDefaults() error {
if cfg.Repo != nil && cfg.NilRepo {
return errors.New("cannot set a Repo and specify nilrepo at the same time")
}
if cfg.Repo == nil {
var d ds.Datastore
if cfg.NilRepo {
d = ds.NewNullDatastore()
} else {
d = ds.NewMapDatastore()
}
r, err := defaultRepo(dsync.MutexWrap(d))
if err != nil {
return err
}
cfg.Repo = r
}
if cfg.Routing == nil {
cfg.Routing = DHTOption
}
if cfg.Host == nil {
cfg.Host = DefaultHostOption
}
return nil
}
func defaultRepo(dstore repo.Datastore) (repo.Repo, error) {
c := cfg.Config{}
priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, rand.Reader)
if err != nil {
return nil, err
}
pid, err := peer.IDFromPublicKey(pub)
if err != nil {
return nil, err
}
privkeyb, err := priv.Bytes()
if err != nil {
return nil, err
}
c.Bootstrap = cfg.DefaultBootstrapAddresses
c.Addresses.Swarm = []string{"/ip4/0.0.0.0/tcp/4001"}
c.Identity.PeerID = pid.Pretty()
c.Identity.PrivKey = base64.StdEncoding.EncodeToString(privkeyb)
return &repo.Mock{
D: dstore,
C: c,
}, nil
}
package node package node
import ( import (
"context"
offline "github.com/ipfs/go-ipfs-exchange-offline" offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-metrics-interface"
"github.com/ipfs/go-path/resolver"
"go.uber.org/fx" "go.uber.org/fx"
offroute "github.com/ipfs/go-ipfs-routing/offline" offroute "github.com/ipfs/go-ipfs-routing/offline"
"github.com/ipfs/go-ipfs/p2p" "github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
) )
var BaseLibP2P = fx.Options( var BaseLibP2P = fx.Options(
...@@ -35,8 +40,8 @@ func LibP2P(cfg *BuildCfg) fx.Option { ...@@ -35,8 +40,8 @@ func LibP2P(cfg *BuildCfg) fx.Option {
return fx.Options( return fx.Options(
BaseLibP2P, BaseLibP2P,
MaybeProvide(P2PNoSecurity, cfg.DisableEncryptedConnections), maybeProvide(P2PNoSecurity, cfg.DisableEncryptedConnections),
MaybeProvide(Pubsub, cfg.getOpt("pubsub") || cfg.getOpt("ipnsps")), maybeProvide(Pubsub, cfg.getOpt("pubsub") || cfg.getOpt("ipnsps")),
fx.Provide(P2PSmuxTransport(cfg.getOpt("mplex"))), fx.Provide(P2PSmuxTransport(cfg.getOpt("mplex"))),
fx.Provide(P2POnlineRouting(cfg.getOpt("ipnsps"))), fx.Provide(P2POnlineRouting(cfg.getOpt("ipnsps"))),
...@@ -84,6 +89,7 @@ func Online(cfg *BuildCfg) fx.Option { ...@@ -84,6 +89,7 @@ func Online(cfg *BuildCfg) fx.Option {
Providers, Providers,
) )
} }
var Offline = fx.Options( var Offline = fx.Options(
fx.Provide(offline.Exchange), fx.Provide(offline.Exchange),
fx.Provide(OfflineNamesysCtor), fx.Provide(OfflineNamesysCtor),
...@@ -91,6 +97,14 @@ var Offline = fx.Options( ...@@ -91,6 +97,14 @@ var Offline = fx.Options(
fx.Provide(provider.NewOfflineProvider), fx.Provide(provider.NewOfflineProvider),
) )
var Core = fx.Options(
fx.Provide(BlockServiceCtor),
fx.Provide(DagCtor),
fx.Provide(resolver.NewBasicResolver),
fx.Provide(Pinning),
fx.Provide(Files),
)
func Networked(cfg *BuildCfg) fx.Option { func Networked(cfg *BuildCfg) fx.Option {
if cfg.Online { if cfg.Online {
return Online(cfg) return Online(cfg)
...@@ -98,9 +112,57 @@ func Networked(cfg *BuildCfg) fx.Option { ...@@ -98,9 +112,57 @@ func Networked(cfg *BuildCfg) fx.Option {
return Offline return Offline
} }
func MaybeProvide(opt interface{}, enable bool) fx.Option { func IPFS(ctx context.Context, cfg *BuildCfg) fx.Option {
if enable { if cfg == nil {
return fx.Provide(opt) cfg = new(BuildCfg)
}
err := cfg.fillDefaults()
if err != nil {
return fx.Error(err)
} }
return fx.Options()
ctx = metrics.CtxScope(ctx, "ipfs")
repoOption := fx.Provide(func(lc fx.Lifecycle) repo.Repo {
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return cfg.Repo.Close()
},
})
return cfg.Repo
})
metricsCtx := fx.Provide(func() MetricsCtx {
return MetricsCtx(ctx)
})
hostOption := fx.Provide(func() HostOption {
return cfg.Host
})
routingOption := fx.Provide(func() RoutingOption {
return cfg.Routing
})
params := fx.Options(
repoOption,
hostOption,
routingOption,
metricsCtx,
)
return fx.Options(
params,
fx.Provide(baseProcess),
fx.Invoke(setupSharding),
Storage(cfg),
Identity,
IPNS,
Networked(cfg),
Core,
)
} }
...@@ -3,6 +3,8 @@ package node ...@@ -3,6 +3,8 @@ package node
import ( import (
"context" "context"
config "github.com/ipfs/go-ipfs-config"
uio "github.com/ipfs/go-unixfs/io"
"github.com/jbenet/goprocess" "github.com/jbenet/goprocess"
"go.uber.org/fx" "go.uber.org/fx"
) )
...@@ -41,3 +43,25 @@ func (lp *lcProcess) Run(f goprocess.ProcessFunc) { ...@@ -41,3 +43,25 @@ func (lp *lcProcess) Run(f goprocess.ProcessFunc) {
}, },
}) })
} }
func maybeProvide(opt interface{}, enable bool) fx.Option {
if enable {
return fx.Provide(opt)
}
return fx.Options()
}
func setupSharding(cfg *config.Config) {
// TEMP: setting global sharding switch here
uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled
}
func baseProcess(lc fx.Lifecycle) goprocess.Process {
p := goprocess.WithParent(goprocess.Background())
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return p.Close()
},
})
return p
}
...@@ -122,7 +122,7 @@ type Libp2pOpts struct { ...@@ -122,7 +122,7 @@ type Libp2pOpts struct {
Opts []libp2p.Option `group:"libp2p"` Opts []libp2p.Option `group:"libp2p"`
} }
type PNetFingerprint []byte // TODO: find some better place type PNetFingerprint []byte
func P2PPNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) { func P2PPNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) {
swarmkey, err := repo.SwarmKey() swarmkey, err := repo.SwarmKey()
if err != nil || swarmkey == nil { if err != nil || swarmkey == nil {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论