提交 5618fed0 作者: Łukasz Magiera 提交者: Steven Allen

coreapi pubsub: better ctx for connectToPubSubPeers

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 18d5999e
......@@ -20,6 +20,7 @@ import (
type PubSubAPI CoreAPI
type pubSubSubscription struct {
cancel context.CancelFunc
subscription *floodsub.Subscription
}
......@@ -75,19 +76,21 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err
}
pubctx, cancel := context.WithCancel(api.node.Context())
if options.Discover {
go func() {
blk, err := api.core().Block().Put(ctx, strings.NewReader("floodsub:"+topic))
blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic))
if err != nil {
log.Error("pubsub discovery: ", err)
return
}
connectToPubSubPeers(ctx, api.node, blk.Path().Cid())
connectToPubSubPeers(pubctx, api.node, blk.Path().Cid())
}()
}
return &pubSubSubscription{sub}, nil
return &pubSubSubscription{cancel, sub}, nil
}
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
......@@ -95,7 +98,7 @@ func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
defer cancel()
provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
var wg sync.WaitGroup
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
......@@ -127,6 +130,7 @@ func (api *PubSubAPI) checkNode() error {
}
func (sub *pubSubSubscription) Close() error {
sub.cancel()
sub.subscription.Cancel()
return nil
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论