提交 7139b836 作者: Steven Allen

switch to StreamingSet in cidutil

License: MIT
Signed-off-by: 's avatarSteven Allen <steven@stebalien.com>
上级 db646d04
......@@ -7,10 +7,10 @@ import (
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set"
dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag"
offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline"
cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil"
blockservice "gx/ipfs/QmQLG22wSEStiociTSKQpZAuuaaWoF1B3iKyjPFvWiTQ77/go-blockservice"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing"
......@@ -99,7 +99,7 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
}
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error {
provided := streamingset.NewStreamingSet()
provided := cidutil.NewStreamingSet()
errCh := make(chan error)
go func() {
......
......@@ -4,9 +4,9 @@ import (
"context"
pin "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/thirdparty/streaming-cid-set"
merkledag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag"
cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil"
ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format"
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
blocks "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore"
......@@ -44,8 +44,8 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool)
}
}
func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingset.StreamingSet, error) {
set := streamingset.NewStreamingSet()
func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cidutil.StreamingSet, error) {
set := cidutil.NewStreamingSet()
go func() {
ctx, cancel := context.WithCancel(ctx)
......
package streamingset
import (
"context"
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
)
// StreamingSet is an extension of cid.Set which allows to implement back-pressure
// for the Visit function
type StreamingSet struct {
Set *cid.Set
New chan *cid.Cid
}
// NewStreamingSet initializes and returns new Set.
func NewStreamingSet() *StreamingSet {
return &StreamingSet{
Set: cid.NewSet(),
New: make(chan *cid.Cid),
}
}
// Visitor creates new visitor which adds a Cids to the set and emits them to
// the set.New channel
func (s *StreamingSet) Visitor(ctx context.Context) func(c *cid.Cid) bool {
return func(c *cid.Cid) bool {
if s.Set.Visit(c) {
select {
case s.New <- c:
case <-ctx.Done():
}
return true
}
return false
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论