提交 c4debb59 作者: Łukasz Magiera

constructor: Add few docstrings

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 456084e6
...@@ -83,6 +83,7 @@ func (cfg *BuildCfg) fillDefaults() error { ...@@ -83,6 +83,7 @@ func (cfg *BuildCfg) fillDefaults() error {
return nil return nil
} }
// options creates fx option group from this build config
func (cfg *BuildCfg) options(ctx context.Context) fx.Option { func (cfg *BuildCfg) options(ctx context.Context) fx.Option {
err := cfg.fillDefaults() err := cfg.fillDefaults()
if err != nil { if err != nil {
......
...@@ -25,7 +25,8 @@ import ( ...@@ -25,7 +25,8 @@ import (
"go.uber.org/fx" "go.uber.org/fx"
) )
func BlockServiceCtor(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { // BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem) bsvc := blockservice.New(bs, rem)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
...@@ -37,6 +38,7 @@ func BlockServiceCtor(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.In ...@@ -37,6 +38,7 @@ func BlockServiceCtor(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.In
return bsvc return bsvc
} }
// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) { func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))) internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag) pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag)
...@@ -51,11 +53,13 @@ func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) ...@@ -51,11 +53,13 @@ func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo)
return pinning, nil return pinning, nil
} }
func DagCtor(bs blockservice.BlockService) format.DAGService { // Dag creates new DAGService
func Dag(bs blockservice.BlockService) format.DAGService {
return merkledag.NewDAGService(bs) return merkledag.NewDAGService(bs)
} }
func OnlineExchangeCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface { // OnlineExchange creates new LibP2P backed block exchange (BitSwap)
func OnlineExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(host, rt) bitswapNetwork := network.NewFromIpfsHost(host, rt)
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs) exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
...@@ -66,6 +70,7 @@ func OnlineExchangeCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host ...@@ -66,6 +70,7 @@ func OnlineExchangeCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host
return exch return exch
} }
// Files loads persisted MFS root
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := datastore.NewKey("/local/filesroot") dsk := datastore.NewKey("/local/filesroot")
pf := func(ctx context.Context, c cid.Cid) error { pf := func(ctx context.Context, c cid.Cid) error {
......
...@@ -51,25 +51,29 @@ func LibP2P(cfg *BuildCfg) fx.Option { ...@@ -51,25 +51,29 @@ func LibP2P(cfg *BuildCfg) fx.Option {
return opts return opts
} }
// Storage groups units which setup datastore based persistence and blockstore layers
func Storage(cfg *BuildCfg) fx.Option { func Storage(cfg *BuildCfg) fx.Option {
return fx.Options( return fx.Options(
fx.Provide(RepoConfig), fx.Provide(RepoConfig),
fx.Provide(DatastoreCtor), fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(cfg.Permanent, cfg.NilRepo)), fx.Provide(BaseBlockstoreCtor(cfg.Permanent, cfg.NilRepo)),
fx.Provide(GcBlockstoreCtor), fx.Provide(GcBlockstoreCtor),
) )
} }
// Identity groups units providing cryptographic identity
var Identity = fx.Options( var Identity = fx.Options(
fx.Provide(PeerID), fx.Provide(PeerID),
fx.Provide(PrivateKey), fx.Provide(PrivateKey),
fx.Provide(libp2p.Peerstore), fx.Provide(libp2p.Peerstore),
) )
// IPNS groups namesys related units
var IPNS = fx.Options( var IPNS = fx.Options(
fx.Provide(RecordValidator), fx.Provide(RecordValidator),
) )
// Providers groups units managing provider routing records
var Providers = fx.Options( var Providers = fx.Options(
fx.Provide(ProviderQueue), fx.Provide(ProviderQueue),
fx.Provide(ProviderCtor), fx.Provide(ProviderCtor),
...@@ -78,10 +82,11 @@ var Providers = fx.Options( ...@@ -78,10 +82,11 @@ var Providers = fx.Options(
fx.Invoke(Reprovider), fx.Invoke(Reprovider),
) )
// Online groups online-only units
func Online(cfg *BuildCfg) fx.Option { func Online(cfg *BuildCfg) fx.Option {
return fx.Options( return fx.Options(
fx.Provide(OnlineExchangeCtor), fx.Provide(OnlineExchange),
fx.Provide(OnlineNamesysCtor), fx.Provide(OnlineNamesys),
fx.Invoke(IpnsRepublisher), fx.Invoke(IpnsRepublisher),
...@@ -92,16 +97,18 @@ func Online(cfg *BuildCfg) fx.Option { ...@@ -92,16 +97,18 @@ func Online(cfg *BuildCfg) fx.Option {
) )
} }
// Offline groups offline alternatives to Online units
var Offline = fx.Options( var Offline = fx.Options(
fx.Provide(offline.Exchange), fx.Provide(offline.Exchange),
fx.Provide(OfflineNamesysCtor), fx.Provide(OfflineNamesys),
fx.Provide(offroute.NewOfflineRouter), fx.Provide(offroute.NewOfflineRouter),
fx.Provide(provider.NewOfflineProvider), fx.Provide(provider.NewOfflineProvider),
) )
// Core groups basic IPFS services
var Core = fx.Options( var Core = fx.Options(
fx.Provide(BlockServiceCtor), fx.Provide(BlockService),
fx.Provide(DagCtor), fx.Provide(Dag),
fx.Provide(resolver.NewBasicResolver), fx.Provide(resolver.NewBasicResolver),
fx.Provide(Pinning), fx.Provide(Pinning),
fx.Provide(Files), fx.Provide(Files),
...@@ -114,6 +121,7 @@ func Networked(cfg *BuildCfg) fx.Option { ...@@ -114,6 +121,7 @@ func Networked(cfg *BuildCfg) fx.Option {
return Offline return Offline
} }
// IPFS builds a group of fx Options based on the passed BuildCfg
func IPFS(ctx context.Context, cfg *BuildCfg) fx.Option { func IPFS(ctx context.Context, cfg *BuildCfg) fx.Option {
if cfg == nil { if cfg == nil {
cfg = new(BuildCfg) cfg = new(BuildCfg)
......
...@@ -50,6 +50,7 @@ func setupSharding(cfg *config.Config) { ...@@ -50,6 +50,7 @@ func setupSharding(cfg *config.Config) {
uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled
} }
// baseProcess creates a goprocess which is closed when the lifecycle signals it to stop
func baseProcess(lc fx.Lifecycle) goprocess.Process { func baseProcess(lc fx.Lifecycle) goprocess.Process {
p := goprocess.WithParent(goprocess.Background()) p := goprocess.WithParent(goprocess.Background())
lc.Append(fx.Hook{ lc.Append(fx.Hook{
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-peer" "github.com/libp2p/go-libp2p-peer"
) )
// PeerID loads peer identity form config
func PeerID(cfg *config.Config) (peer.ID, error) { func PeerID(cfg *config.Config) (peer.ID, error) {
cid := cfg.Identity.PeerID cid := cfg.Identity.PeerID
if cid == "" { if cid == "" {
...@@ -26,6 +27,7 @@ func PeerID(cfg *config.Config) (peer.ID, error) { ...@@ -26,6 +27,7 @@ func PeerID(cfg *config.Config) (peer.ID, error) {
return id, nil return id, nil
} }
// PrivateKey loads the private key from config
func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) { func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) {
if cfg.Identity.PrivKey == "" { if cfg.Identity.PrivKey == "" {
return nil, nil return nil, nil
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
const DefaultIpnsCacheSize = 128 const DefaultIpnsCacheSize = 128
// RecordValidator provides namesys compatible routing record validator
func RecordValidator(ps peerstore.Peerstore) record.Validator { func RecordValidator(ps peerstore.Peerstore) record.Validator {
return record.NamespacedValidator{ return record.NamespacedValidator{
"pk": record.PublicKeyValidator{}, "pk": record.PublicKeyValidator{},
...@@ -26,11 +27,13 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator { ...@@ -26,11 +27,13 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator {
} }
} }
func OfflineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) { // OfflineNamesys creates namesys setup for offline operation
func OfflineNamesys(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) {
return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil
} }
func OnlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) { // OnlineNamesys createn new namesys setup for online operation
func OnlineNamesys(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) {
cs := cfg.Ipns.ResolveCacheSize cs := cfg.Ipns.ResolveCacheSize
if cs == 0 { if cs == 0 {
cs = DefaultIpnsCacheSize cs = DefaultIpnsCacheSize
...@@ -41,6 +44,7 @@ func OnlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Confi ...@@ -41,6 +44,7 @@ func OnlineNamesysCtor(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Confi
return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil
} }
// IpnsRepublisher runs new IPNS republisher service
func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error { func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error {
repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore()) repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore())
......
...@@ -19,10 +19,12 @@ import ( ...@@ -19,10 +19,12 @@ import (
const kReprovideFrequency = time.Hour * 12 const kReprovideFrequency = time.Hour * 12
// ProviderQueue creates new datastore backed provider queue
func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) { func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) {
return provider.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) return provider.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
} }
// ProviderCtor creates new record provider
func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider { func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider {
p := provider.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt) p := provider.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt)
...@@ -39,6 +41,7 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu ...@@ -39,6 +41,7 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu
return p return p
} }
// ReproviderCtor creates new reprovider
func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) { func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
var keyProvider reprovide.KeyChanFunc var keyProvider reprovide.KeyChanFunc
...@@ -57,6 +60,7 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config ...@@ -57,6 +60,7 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil
} }
// Reprovider runs the reprovider service
func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error { func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error {
reproviderInterval := kReprovideFrequency reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" { if cfg.Reprovider.Interval != "" {
...@@ -68,6 +72,6 @@ func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error { ...@@ -68,6 +72,6 @@ func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error {
reproviderInterval = dur reproviderInterval = dur
} }
go reprovider.Run(reproviderInterval) go reprovider.Run(reproviderInterval) // TODO: refactor reprovider to have Start/Stop, use lifecycle
return nil return nil
} }
...@@ -28,16 +28,20 @@ func isTooManyFDError(err error) bool { ...@@ -28,16 +28,20 @@ func isTooManyFDError(err error) bool {
return false return false
} }
// RepoConfig loads configuration from the repo
func RepoConfig(repo repo.Repo) (*config.Config, error) { func RepoConfig(repo repo.Repo) (*config.Config, error) {
return repo.Config() return repo.Config()
} }
func DatastoreCtor(repo repo.Repo) datastore.Datastore { // Datastore provides the datastore
func Datastore(repo repo.Repo) datastore.Datastore {
return repo.Datastore() return repo.Datastore()
} }
// BaseBlocks is the lower level blockstore without GC or Filestore layers
type BaseBlocks blockstore.Blockstore type BaseBlocks blockstore.Blockstore
// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) { func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) { return func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) {
rds := &retrystore.Datastore{ rds := &retrystore.Datastore{
...@@ -82,6 +86,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC ...@@ -82,6 +86,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC
} }
} }
// GcBlockstoreCtor wraps the base blockstore with GC and Filestore layers
func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) { func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
gclocker = blockstore.NewGCLocker() gclocker = blockstore.NewGCLocker()
gcbs = blockstore.NewGCBlockstore(bb, gclocker) gcbs = blockstore.NewGCBlockstore(bb, gclocker)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论