提交 1edc5c0a 作者: Brian Tiger Chow

refactor(core) Close in teardown

This declarative style is simpler to compose than the imperative wiring
up of objects.

+ pass context to StartOnlineServices as parameter. one by one, trying
to remove dependencies on node state so these initialization steps can
be broken down.
上级 deb1bbe7
...@@ -2,6 +2,7 @@ package core ...@@ -2,6 +2,7 @@ package core
import ( import (
"fmt" "fmt"
"io"
"time" "time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -100,11 +101,22 @@ type Mounts struct { ...@@ -100,11 +101,22 @@ type Mounts struct {
type ConfigOption func(ctx context.Context) (*IpfsNode, error) type ConfigOption func(ctx context.Context) (*IpfsNode, error)
func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) { func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) {
ctxg := ctxgroup.WithContext(parent)
ctx := ctxg.Context()
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
ctxg.Close()
}
}()
node, err := option(ctx) node, err := option(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
node.ContextGroup = ctxg
ctxg.SetTeardown(node.teardown)
// Need to make sure it's perfectly clear 1) which variables are expected // Need to make sure it's perfectly clear 1) which variables are expected
// to be initialized at this point, and 2) which variables will be // to be initialized at this point, and 2) which variables will be
...@@ -123,6 +135,7 @@ func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) { ...@@ -123,6 +135,7 @@ func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG) node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG)
} }
node.Resolver = &path.Resolver{DAG: node.DAG} node.Resolver = &path.Resolver{DAG: node.DAG}
success = true
return node, nil return node, nil
} }
...@@ -138,13 +151,6 @@ func Online(r repo.Repo) ConfigOption { ...@@ -138,13 +151,6 @@ func Online(r repo.Repo) ConfigOption {
func Standard(r repo.Repo, online bool) ConfigOption { func Standard(r repo.Repo, online bool) ConfigOption {
return func(ctx context.Context) (n *IpfsNode, err error) { return func(ctx context.Context) (n *IpfsNode, err error) {
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success && n != nil {
n.Close()
}
}()
if r == nil { if r == nil {
return nil, debugerror.Errorf("repo required") return nil, debugerror.Errorf("repo required")
} }
...@@ -158,9 +164,6 @@ func Standard(r repo.Repo, online bool) ConfigOption { ...@@ -158,9 +164,6 @@ func Standard(r repo.Repo, online bool) ConfigOption {
Repo: r, Repo: r,
} }
n.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, n.teardown)
ctx = n.ContextGroup.Context()
// setup Peerstore // setup Peerstore
n.Peerstore = peer.NewPeerstore() n.Peerstore = peer.NewPeerstore()
...@@ -176,20 +179,18 @@ func Standard(r repo.Repo, online bool) ConfigOption { ...@@ -176,20 +179,18 @@ func Standard(r repo.Repo, online bool) ConfigOption {
// setup online services // setup online services
if online { if online {
if err := n.StartOnlineServices(); err != nil { if err := n.StartOnlineServices(ctx); err != nil {
return nil, err // debugerror.Wraps. return nil, err // debugerror.Wraps.
} }
} else { } else {
n.Exchange = offline.Exchange(n.Blockstore) n.Exchange = offline.Exchange(n.Blockstore)
} }
success = true
return n, nil return n, nil
} }
} }
func (n *IpfsNode) StartOnlineServices() error { func (n *IpfsNode) StartOnlineServices(ctx context.Context) error {
ctx := n.Context()
if n.PeerHost != nil { // already online. if n.PeerHost != nil { // already online.
return debugerror.New("node already online") return debugerror.New("node already online")
...@@ -200,7 +201,7 @@ func (n *IpfsNode) StartOnlineServices() error { ...@@ -200,7 +201,7 @@ func (n *IpfsNode) StartOnlineServices() error {
return err return err
} }
peerhost, err := constructPeerHost(ctx, n.ContextGroup, n.Repo.Config(), n.Identity, n.Peerstore) peerhost, err := constructPeerHost(ctx, n.Repo.Config(), n.Identity, n.Peerstore)
if err != nil { if err != nil {
return debugerror.Wrap(err) return debugerror.Wrap(err)
} }
...@@ -210,7 +211,7 @@ func (n *IpfsNode) StartOnlineServices() error { ...@@ -210,7 +211,7 @@ func (n *IpfsNode) StartOnlineServices() error {
n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost) n.Diagnostics = diag.NewDiagnostics(n.Identity, n.PeerHost)
// setup routing service // setup routing service
dhtRouting, err := constructDHTRouting(ctx, n.ContextGroup, n.PeerHost, n.Repo.Datastore()) dhtRouting, err := constructDHTRouting(ctx, n.PeerHost, n.Repo.Datastore())
if err != nil { if err != nil {
return debugerror.Wrap(err) return debugerror.Wrap(err)
} }
...@@ -250,9 +251,27 @@ func (n *IpfsNode) StartOnlineServices() error { ...@@ -250,9 +251,27 @@ func (n *IpfsNode) StartOnlineServices() error {
return nil return nil
} }
// teardown closes children
func (n *IpfsNode) teardown() error { func (n *IpfsNode) teardown() error {
if err := n.Repo.Close(); err != nil { var errs []error
return err closers := []io.Closer{
n.Repo,
}
if n.DHT != nil {
closers = append(closers, n.DHT)
}
if n.PeerHost != nil {
closers = append(closers, n.PeerHost)
}
for _, closer := range closers {
if closer != nil {
if err := closer.Close(); err != nil {
errs = append(errs, err)
}
}
}
if len(errs) > 0 {
return errs[0]
} }
return nil return nil
} }
...@@ -351,7 +370,7 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { ...@@ -351,7 +370,7 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
} }
// isolates the complex initialization steps // isolates the complex initialization steps
func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) { func constructPeerHost(ctx context.Context, cfg *config.Config, id peer.ID, ps peer.Peerstore) (p2phost.Host, error) {
listenAddrs, err := listenAddresses(cfg) listenAddrs, err := listenAddresses(cfg)
if err != nil { if err != nil {
return nil, debugerror.Wrap(err) return nil, debugerror.Wrap(err)
...@@ -369,7 +388,6 @@ func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *con ...@@ -369,7 +388,6 @@ func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *con
if err != nil { if err != nil {
return nil, debugerror.Wrap(err) return nil, debugerror.Wrap(err)
} }
ctxg.AddChildGroup(network.CtxGroup())
peerhost := p2pbhost.New(network) peerhost := p2pbhost.New(network)
// explicitly set these as our listen addrs. // explicitly set these as our listen addrs.
...@@ -384,9 +402,8 @@ func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *con ...@@ -384,9 +402,8 @@ func constructPeerHost(ctx context.Context, ctxg ctxgroup.ContextGroup, cfg *con
return peerhost, nil return peerhost, nil
} }
func constructDHTRouting(ctx context.Context, ctxg ctxgroup.ContextGroup, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) { func constructDHTRouting(ctx context.Context, host p2phost.Host, ds datastore.ThreadSafeDatastore) (*dht.IpfsDHT, error) {
dhtRouting := dht.NewDHT(ctx, host, ds) dhtRouting := dht.NewDHT(ctx, host, ds)
dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord
ctxg.AddChildGroup(dhtRouting)
return dhtRouting, nil return dhtRouting, nil
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论