Commit 43191299 authored by Brian Tiger Chow

Merge pull request #606 from jbenet/fix-424-blockservice-async-HasBlock

fix #424: when adding blocks, provide to exchange asynchronously
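The gist of the change: AddBlock no longer calls Exchange.HasBlock inline. Blocks are handed to a dedicated rate-limited worker that provides them to the exchange in the background, and the BlockService (and the nodes that own it) must now be Closed to shut that worker down. As a rough caller-side sketch of the new worker API, using only names visible in this diff (the offline exchange and in-memory datastore are borrowed from the benchmarks below):

package main

import (
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blocks "github.com/jbenet/go-ipfs/blocks"
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
	worker "github.com/jbenet/go-ipfs/blockservice/worker"
	"github.com/jbenet/go-ipfs/exchange/offline"
)

func main() {
	// An offline exchange over an in-memory datastore stands in for bitswap.
	bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	w := worker.NewWorker(offline.Exchange(bstore), worker.DefaultConfig)
	defer w.Close() // stops the background provide goroutines

	// HasBlock enqueues the block without waiting on the exchange; it only
	// returns an error after the worker has been closed.
	if err := w.HasBlock(blocks.NewBlock([]byte("beep boop"))); err != nil {
		panic(err)
	}
}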
@@ -22,6 +22,7 @@ func TestBlocks(t *testing.T) {
 		t.Error("failed to construct block service", err)
 		return
 	}
+	defer bs.Close()

 	b := blocks.NewBlock([]byte("beep boop"))
 	h := u.Hash([]byte("beep boop"))
@@ -61,6 +62,9 @@ func TestBlocks(t *testing.T) {
 func TestGetBlocksSequential(t *testing.T) {
 	var servs = Mocks(t, 4)
+	for _, s := range servs {
+		defer s.Close()
+	}
 	bg := blocksutil.NewBlockGenerator()
 	blks := bg.Blocks(50)
@@ -73,7 +77,7 @@ func TestGetBlocksSequential(t *testing.T) {
 	t.Log("one instance at a time, get blocks concurrently")
 	for i := 1; i < len(servs); i++ {
-		ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
+		ctx, _ := context.WithTimeout(context.TODO(), time.Second*50)
 		out := servs[i].GetBlocks(ctx, keys)
 		gotten := make(map[u.Key]*blocks.Block)
 		for blk := range out {
...
@@ -8,20 +8,33 @@ import (
 	"fmt"
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-	process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
-	procrl "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	"github.com/jbenet/go-ipfs/blocks/blockstore"
+	worker "github.com/jbenet/go-ipfs/blockservice/worker"
 	exchange "github.com/jbenet/go-ipfs/exchange"
 	u "github.com/jbenet/go-ipfs/util"
 )
+var wc = worker.Config{
+	// When running on a single core, NumWorkers has a harsh negative effect
+	// on throughput (-80% when < 25). Running many more workers appears to
+	// have very little effect on both single and multicore configurations.
+	NumWorkers: 25,
+
+	// On multicore configurations these buffers have little effect on
+	// throughput. On single-core configurations, larger buffers have severe
+	// adverse effects on throughput.
+	ClientBufferSize: 0,
+	WorkerBufferSize: 0,
+}
 var log = u.Logger("blockservice")

 var ErrNotFound = errors.New("blockservice: key not found")

-// MaxExchangeAddWorkers rate limits the number of exchange workers
-var MaxExchangeAddWorkers = 100
-
 // BlockService is a hybrid block datastore. It stores data in a local
 // datastore and may retrieve data from a remote Exchange.
 // It uses an internal `datastore.Datastore` instance to store values.
@@ -30,8 +43,7 @@ type BlockService struct {
 	Blockstore blockstore.Blockstore
 	Exchange   exchange.Interface

-	rateLimiter *procrl.RateLimiter
-	exchangeAdd chan blocks.Block
+	worker *worker.Worker
 }

 // NewBlockService creates a BlockService with given datastore instance.
@@ -43,15 +55,10 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) {
 		log.Warning("blockservice running in local (offline) mode.")
 	}

-	// exchangeAdd is a channel for async workers to add to the exchange.
-	// 100 blocks buffer. not clear what this number should be
-	exchangeAdd := make(chan blocks.Block, 100)
-
 	return &BlockService{
 		Blockstore: bs,
 		Exchange:   rem,
-		exchangeAdd: exchangeAdd,
-		rateLimiter: procrl.NewRateLimiter(process.Background(), MaxExchangeAddWorkers),
+		worker:     worker.NewWorker(rem, wc),
 	}, nil
 }
@@ -63,22 +70,8 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
 	if err != nil {
 		return k, err
 	}
-
-	// this operation rate-limits blockservice operations, so it is
-	// now an async process.
-	if s.Exchange != nil {
-		// LimitedGo will spawn a goroutine but provide proper backpressure.
-		// it will not spawn the goroutine until the ratelimiter's work load
-		// is under the threshold.
-		s.rateLimiter.LimitedGo(func(worker process.Process) {
-			ctx := context.TODO()
-			if err := s.Exchange.HasBlock(ctx, b); err != nil {
-				// suppress error, as the client shouldn't care about bitswap.
-				// the client only cares about the blockstore.Put.
-				log.Errorf("Exchange.HasBlock error: %s", err)
-			}
-		})
+	if err := s.worker.HasBlock(b); err != nil {
+		return "", errors.New("blockservice is closed")
 	}
 	return k, nil
 }
@@ -148,3 +141,8 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block {
 func (s *BlockService) DeleteBlock(k u.Key) error {
 	return s.Blockstore.DeleteBlock(k)
 }
+
+func (s *BlockService) Close() error {
+	log.Debug("blockservice is shutting down...")
+	return s.worker.Close()
+}
package main
import (
"log"
"math"
"testing"
"time"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
worker "github.com/jbenet/go-ipfs/blockservice/worker"
"github.com/jbenet/go-ipfs/exchange/offline"
"github.com/jbenet/go-ipfs/thirdparty/delay"
"github.com/jbenet/go-ipfs/util/datastore2"
)
const kEstRoutingDelay = time.Second
const kBlocksPerOp = 100
func main() {
var bestConfig worker.Config
var quickestNsPerOp int64 = math.MaxInt64
for NumWorkers := 1; NumWorkers < 10; NumWorkers++ {
for ClientBufferSize := 0; ClientBufferSize < 10; ClientBufferSize++ {
for WorkerBufferSize := 0; WorkerBufferSize < 10; WorkerBufferSize++ {
c := worker.Config{
NumWorkers: NumWorkers,
ClientBufferSize: ClientBufferSize,
WorkerBufferSize: WorkerBufferSize,
}
result := testing.Benchmark(BenchmarkWithConfig(c))
if result.NsPerOp() < quickestNsPerOp {
bestConfig = c
quickestNsPerOp = result.NsPerOp()
}
log.Printf("benched %+v \t result: %+v", c, result)
}
}
}
log.Println(bestConfig)
}
func BenchmarkWithConfig(c worker.Config) func(b *testing.B) {
return func(b *testing.B) {
routingDelay := delay.Fixed(0) // during setup
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), routingDelay))
bstore := blockstore.NewBlockstore(dstore)
var testdata []*blocks.Block
var i int64
for i = 0; i < kBlocksPerOp; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(kBlocksPerOp)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := worker.NewWorker(offline.Exchange(bstore), c)
b.StartTimer()
prev := routingDelay.Set(kEstRoutingDelay) // during measured section
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
routingDelay.Set(prev) // to hasten the unmeasured close period
b.StopTimer()
w.Close()
b.StartTimer()
}
}
}
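The grid search above works because testing.Benchmark runs a benchmark function outside of the go test harness and returns a BenchmarkResult that can be compared programmatically, which is what lets main() rank worker.Config candidates by NsPerOp. A stripped-down illustration of that pattern, not part of the PR (the squaring loop is a stand-in for real work):

package main

import (
	"log"
	"testing"
)

func main() {
	// testing.Benchmark calibrates b.N itself and hands back the
	// measurements as a value rather than printed output.
	res := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			_ = i * i // stand-in for the work under test
		}
	})
	log.Printf("%v\t%d ns/op", res, res.NsPerOp())
}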
package worker
import (
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
"github.com/jbenet/go-ipfs/exchange/offline"
)
func BenchmarkHandle10KBlocks(b *testing.B) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
var testdata []*blocks.Block
for i := 0; i < 10000; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(10000)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := NewWorker(offline.Exchange(bstore), Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
})
b.StartTimer()
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
b.StopTimer()
w.Close()
b.StartTimer()
}
}
// TODO FIXME name me
package worker
import (
"container/list"
"errors"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
blocks "github.com/jbenet/go-ipfs/blocks"
exchange "github.com/jbenet/go-ipfs/exchange"
util "github.com/jbenet/go-ipfs/util"
)
var log = util.Logger("blockservice")
var DefaultConfig = Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
}
type Config struct {
// NumWorkers sets the number of background workers that provide blocks to
// the exchange.
NumWorkers int
// ClientBufferSize allows clients of HasBlock to send up to
// |ClientBufferSize| blocks without blocking.
ClientBufferSize int
// WorkerBufferSize can be used in conjunction with NumWorkers to reduce
// communication-coordination within the worker.
WorkerBufferSize int
}
// TODO FIXME name me
type Worker struct {
// added accepts blocks from client
added chan *blocks.Block
exchange exchange.Interface
// workQueue is owned by the client worker
// process manages life-cycle
process process.Process
}
func NewWorker(e exchange.Interface, c Config) *Worker {
if c.NumWorkers < 1 {
c.NumWorkers = 1 // provide a sane default
}
w := &Worker{
exchange: e,
added: make(chan *blocks.Block, c.ClientBufferSize),
process: process.WithParent(process.Background()), // internal management
}
w.start(c)
return w
}
func (w *Worker) HasBlock(b *blocks.Block) error {
select {
case <-w.process.Closed():
return errors.New("blockservice worker is closed")
case w.added <- b:
return nil
}
}
func (w *Worker) Close() error {
log.Debug("blockservice provide worker is shutting down...")
return w.process.Close()
}
func (w *Worker) start(c Config) {
workerChan := make(chan *blocks.Block, c.WorkerBufferSize)
// clientWorker handles incoming blocks from |w.added| and sends to
// |workerChan|. This will never block the client.
w.process.Go(func(proc process.Process) {
defer close(workerChan)
var workQueue BlockList
debugInfo := time.NewTicker(5 * time.Second)
defer debugInfo.Stop()
for {
// take advantage of the fact that sending on nil channel always
// blocks so that a message is only sent if a block exists
sendToWorker := workerChan
nextBlock := workQueue.Pop()
if nextBlock == nil {
sendToWorker = nil
}
select {
// if worker is ready and there's a block to process, send the
// block
case sendToWorker <- nextBlock:
case <-debugInfo.C:
if workQueue.Len() > 0 {
log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
}
case block := <-w.added:
if nextBlock != nil {
workQueue.Push(nextBlock) // missed the chance to send it
}
// if the client sends another block, add it to the queue.
workQueue.Push(block)
case <-proc.Closing():
return
}
}
})
// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
ctx := childContext(proc) // shut down in-progress HasBlock when time to die
limiter := ratelimit.NewRateLimiter(proc, c.NumWorkers)
for {
select {
case <-proc.Closing():
return
case block, ok := <-workerChan:
if !ok {
return
}
limiter.LimitedGo(func(proc process.Process) {
if err := w.exchange.HasBlock(ctx, block); err != nil {
log.Infof("blockservice worker error: %s", err)
}
})
}
}
})
}
type BlockList struct {
list list.List
uniques map[util.Key]*list.Element
}
func (s *BlockList) PushFront(b *blocks.Block) {
if s.uniques == nil {
s.uniques = make(map[util.Key]*list.Element)
}
_, ok := s.uniques[b.Key()]
if !ok {
e := s.list.PushFront(b)
s.uniques[b.Key()] = e
}
}
func (s *BlockList) Push(b *blocks.Block) {
if s.uniques == nil {
s.uniques = make(map[util.Key]*list.Element)
}
_, ok := s.uniques[b.Key()]
if !ok {
e := s.list.PushBack(b)
s.uniques[b.Key()] = e
}
}
func (s *BlockList) Pop() *blocks.Block {
if s.list.Len() == 0 {
return nil
}
e := s.list.Front()
s.list.Remove(e)
b := e.Value.(*blocks.Block)
delete(s.uniques, b.Key())
return b
}
func (s *BlockList) Len() int {
return s.list.Len()
}
// TODO extract
type waitable interface {
Closing() <-chan struct{}
}
// TODO extract
func childContext(w waitable) context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-w.Closing()
cancel()
}()
return ctx
}
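The clientWorker loop above leans on a classic Go idiom: a send or receive on a nil channel blocks forever, so a select case can be switched off by nil-ing its channel. That is how the loop only offers a block to workerChan when the queue is non-empty, without ever blocking the client. A standalone illustration of the trick, not part of the PR:

package main

import "fmt"

func main() {
	in := make(chan int, 3)
	out := make(chan int)
	done := make(chan struct{})
	var queue []int

	// consumer
	go func() {
		defer close(done)
		for v := range out {
			fmt.Println("worker got", v)
		}
	}()

	in <- 1
	in <- 2
	close(in)

	for in != nil || len(queue) > 0 {
		var send chan int // nil unless there is something to send
		var next int
		if len(queue) > 0 {
			send = out
			next = queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // receive on nil blocks too: case now disabled
				continue
			}
			queue = append(queue, v)
		case send <- next: // only enabled when send != nil
			queue = queue[1:]
		}
	}
	close(out)
	<-done
}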
package worker
import (
blocks "github.com/jbenet/go-ipfs/blocks"
"testing"
)
func TestStartClose(t *testing.T) {
numRuns := 50
if testing.Short() {
numRuns = 5
}
for i := 0; i < numRuns; i++ {
w := NewWorker(nil, DefaultConfig)
w.Close()
}
}
func TestQueueDeduplication(t *testing.T) {
numUniqBlocks := 5 // arbitrary
var firstBatch []*blocks.Block
for i := 0; i < numUniqBlocks; i++ {
firstBatch = append(firstBatch, blockFromInt(i))
}
// to get different pointer values and prevent the implementation from
// cheating. The impl must check equality using Key.
var secondBatch []*blocks.Block
for i := 0; i < numUniqBlocks; i++ {
secondBatch = append(secondBatch, blockFromInt(i))
}
var workQueue BlockList
for _, b := range append(firstBatch, secondBatch...) {
workQueue.Push(b)
}
for i := 0; i < numUniqBlocks; i++ {
b := workQueue.Pop()
if b.Key() != firstBatch[i].Key() {
t.Fatal("list is not FIFO")
}
}
if b := workQueue.Pop(); b != nil {
t.Fatal("the workQueue did not de-duplicate the blocks")
}
}
func TestPushPopPushPop(t *testing.T) {
var workQueue BlockList
orig := blockFromInt(1)
dup := blockFromInt(1)
workQueue.PushFront(orig)
workQueue.Pop()
workQueue.Push(dup)
if workQueue.Len() != 1 {
t.Fatal("the block list's internal state is corrupt")
}
}
func blockFromInt(i int) *blocks.Block {
return blocks.NewBlock([]byte(string(i)))
}
@@ -270,6 +270,9 @@ func (n *IpfsNode) teardown() error {
 	if n.Repo != nil {
 		closers = append(closers, n.Repo)
 	}
+	if n.Blocks != nil {
+		closers = append(closers, n.Blocks)
+	}
 	if n.Routing != nil {
 		if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
 			closers = append(closers, dht)
...
@@ -38,8 +38,6 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
 	// update the peer (on valid msgs only)
 	dht.updateFromMessage(ctx, mPeer, pmes)

-	log.Event(ctx, "foo", dht.self, mPeer, pmes)
-
 	// get handler for this msg type.
 	handler := dht.handlerForMsgType(pmes.GetType())
 	if handler == nil {
...
@@ -215,7 +215,7 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) {
 		// run it!
 		_, err := query.Run(ctx, tablepeers)
 		if err != nil {
-			log.Errorf("closestPeers query run error: %s", err)
+			log.Debugf("closestPeers query run error: %s", err)
 		}
 	}()
...
@@ -9,6 +9,7 @@ PACKAGE_DIR = test/epictest
 BUILD_DIR = ./build/bench
 CONTAINER_WORKING_DIR = /go
 CPU_PROF_NAME = cpu.out
+EXTRA_TEST_ARGS =

 all: collect

@@ -22,7 +23,7 @@ build_image:
 	cd $(IPFS_ROOT) && docker build -t $(IMAGE) .

 run_profiler:
-	docker run --name $(CONTAINER) -it --entrypoint go $(IMAGE) test ./src/github.com/jbenet/go-ipfs/$(PACKAGE_DIR) --cpuprofile=$(CPU_PROF_NAME)
+	docker run --name $(CONTAINER) -it --entrypoint go $(IMAGE) test ./src/github.com/jbenet/go-ipfs/$(PACKAGE_DIR) --cpuprofile=$(CPU_PROF_NAME) $(EXTRA_TEST_ARGS)

 clean:
...
@@ -108,10 +108,12 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
 	if err != nil {
 		return err
 	}
+	defer adder.Close()
 	catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf)))
 	if err != nil {
 		return err
 	}
+	defer catter.Close()
 	catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)})
 	adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)})
...
@@ -51,14 +51,17 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
 	if err != nil {
 		return err
 	}
+	defer adder.Close()
 	catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf)))
 	if err != nil {
 		return err
 	}
+	defer catter.Close()
 	bootstrap, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[2], mn.Host(peers[2]), conf)))
 	if err != nil {
 		return err
 	}
+	defer bootstrap.Close()
 	boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
 	adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
 	catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
...