提交 6bf33ad6 作者: Jeromy 提交者: Juan Batiz-Benet

WIP: super awesome bitswap cleanup fixtime

上级 ef967cee
...@@ -4,7 +4,6 @@ package bitswap ...@@ -4,7 +4,6 @@ package bitswap
import ( import (
"errors" "errors"
"fmt"
"math" "math"
"sync" "sync"
"time" "time"
...@@ -23,7 +22,6 @@ import ( ...@@ -23,7 +22,6 @@ import (
"github.com/ipfs/go-ipfs/thirdparty/delay" "github.com/ipfs/go-ipfs/thirdparty/delay"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
pset "github.com/ipfs/go-ipfs/util/peerset" // TODO move this to peerstore
) )
var log = eventlog.Logger("bitswap") var log = eventlog.Logger("bitswap")
...@@ -45,9 +43,7 @@ const ( ...@@ -45,9 +43,7 @@ const (
provideWorkers = 4 provideWorkers = 4
) )
var ( var rebroadcastDelay = delay.Fixed(time.Second * 10)
rebroadcastDelay = delay.Fixed(time.Second * 10)
)
// New initializes a BitSwap instance that communicates over the provided // New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network // BitSwapNetwork. This function registers the returned instance as the network
...@@ -86,14 +82,13 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, ...@@ -86,14 +82,13 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif, notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network, network: network,
wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan *blockRequest, sizeBatchRequestChan), batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize), newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key), provideKeys: make(chan u.Key),
pm: NewPeerManager(network), wm: NewWantManager(network),
} }
go bs.pm.Run(ctx) go bs.wm.Run(ctx)
network.SetDelegate(bs) network.SetDelegate(bs)
// Start up bitswaps async worker routines // Start up bitswaps async worker routines
...@@ -112,7 +107,7 @@ type Bitswap struct { ...@@ -112,7 +107,7 @@ type Bitswap struct {
// the peermanager manages sending messages to peers in a way that // the peermanager manages sending messages to peers in a way that
// wont block bitswap operation // wont block bitswap operation
pm *PeerManager wm *WantManager
// blockstore is the local database // blockstore is the local database
// NB: ensure threadsafety // NB: ensure threadsafety
...@@ -127,8 +122,6 @@ type Bitswap struct { ...@@ -127,8 +122,6 @@ type Bitswap struct {
engine *decision.Engine engine *decision.Engine
wantlist *wantlist.ThreadSafe
process process.Process process process.Process
newBlocks chan *blocks.Block newBlocks chan *blocks.Block
...@@ -233,60 +226,21 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -233,60 +226,21 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return err return err
} }
bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk) bs.notifications.Publish(blk)
select { select {
case bs.newBlocks <- blk: case bs.newBlocks <- blk:
// send block off to be reprovided
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }
return nil return nil
} }
func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {
set := pset.New()
loop:
for {
select {
case peerToQuery, ok := <-peers:
if !ok {
break loop
}
if !set.TryAdd(peerToQuery) { //Do once per peer
continue
}
bs.pm.Send(peerToQuery, m)
case <-ctx.Done():
return nil
}
}
return nil
}
func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
entries := bs.wantlist.Entries()
if len(entries) == 0 {
return nil
}
message := bsmsg.New()
message.SetFull(true)
for _, wanted := range entries {
message.AddEntry(wanted.Key, wanted.Priority)
}
return bs.sendWantlistMsgToPeers(ctx, message, peers)
}
func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
// prepare a channel to hand off to sendWantlistToPeers
sendToPeers := make(chan peer.ID)
// Get providers for all entries in wantlist (could take a while) // Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, e := range entries { for _, e := range entries {
...@@ -298,97 +252,61 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli ...@@ -298,97 +252,61 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
defer cancel() defer cancel()
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers { for prov := range providers {
sendToPeers <- prov go func(p peer.ID) {
bs.network.ConnectTo(ctx, p)
}(prov)
} }
}(e.Key) }(e.Key)
} }
go func() { wg.Wait() // make sure all our children do finish.
wg.Wait() // make sure all our children do finish.
close(sendToPeers)
}()
err := bs.sendWantlistToPeers(ctx, sendToPeers)
if err != nil {
log.Debugf("sendWantlistToPeers error: %s", err)
}
} }
// TODO(brian): handle errors func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
// This call records changes to wantlists, blocks received, // This call records changes to wantlists, blocks received,
// and number of bytes transfered. // and number of bytes transfered.
bs.engine.MessageReceived(p, incoming) bs.engine.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused. // TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger // Should only track *useful* messages in ledger
if len(incoming.Blocks()) == 0 {
return
}
// quickly send out cancels, reduces chances of duplicate block receives
var keys []u.Key var keys []u.Key
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
keys = append(keys, block.Key())
}
bs.wm.CancelWants(keys)
for _, block := range incoming.Blocks() {
bs.blocksRecvd++ bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has { if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
bs.dupBlocksRecvd++ bs.dupBlocksRecvd++
} }
log.Debugf("got block %s from %s", block, p) log.Debugf("got block %s from %s", block, p)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil { if err := bs.HasBlock(hasBlockCtx, block); err != nil {
return fmt.Errorf("ReceiveMessage HasBlock error: %s", err) log.Warningf("ReceiveMessage HasBlock error: %s", err)
} }
cancel() cancel()
keys = append(keys, block.Key())
} }
bs.cancelBlocks(ctx, keys)
return nil
} }
// Connected/Disconnected warns bitswap about peer connections // Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) { func (bs *Bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker?? // TODO: add to clientWorker??
bs.pm.Connected(p) bs.wm.Connected(p)
peers := make(chan peer.ID, 1)
peers <- p
close(peers)
err := bs.sendWantlistToPeers(context.TODO(), peers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
} }
// Connected/Disconnected warns bitswap about peer connections // Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerDisconnected(p peer.ID) { func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.pm.Disconnected(p) bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p) bs.engine.PeerDisconnected(p)
} }
func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
message := bsmsg.New()
message.SetFull(false)
for _, k := range bkeys {
log.Debug("cancel block: %s", k)
message.Cancel(k)
}
bs.pm.Broadcast(message)
return
}
func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
message := bsmsg.New()
message.SetFull(false)
for i, k := range bkeys {
message.AddEntry(k, kMaxPriority-i)
}
bs.pm.Broadcast(message)
}
func (bs *Bitswap) ReceiveError(err error) { func (bs *Bitswap) ReceiveError(err error) {
log.Debugf("Bitswap ReceiveError: %s", err) log.Debugf("Bitswap ReceiveError: %s", err)
// TODO log the network error // TODO log the network error
...@@ -401,7 +319,7 @@ func (bs *Bitswap) Close() error { ...@@ -401,7 +319,7 @@ func (bs *Bitswap) Close() error {
func (bs *Bitswap) GetWantlist() []u.Key { func (bs *Bitswap) GetWantlist() []u.Key {
var out []u.Key var out []u.Key
for _, e := range bs.wantlist.Entries() { for _, e := range bs.wm.wl.Entries() {
out = append(out, e.Key) out = append(out, e.Key)
} }
return out return out
......
...@@ -120,6 +120,16 @@ func TestLargeFile(t *testing.T) { ...@@ -120,6 +120,16 @@ func TestLargeFile(t *testing.T) {
PerformDistributionTest(t, numInstances, numBlocks) PerformDistributionTest(t, numInstances, numBlocks)
} }
func TestLargeFileTwoPeers(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
numInstances := 2
numBlocks := 100
PerformDistributionTest(t, numInstances, numBlocks)
}
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
if testing.Short() { if testing.Short() {
t.SkipNow() t.SkipNow()
...@@ -129,8 +139,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -129,8 +139,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
defer sg.Close() defer sg.Close()
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks")
instances := sg.Instances(numInstances) instances := sg.Instances(numInstances)
blocks := bg.Blocks(numBlocks) blocks := bg.Blocks(numBlocks)
...@@ -238,7 +246,7 @@ func TestBasicBitswap(t *testing.T) { ...@@ -238,7 +246,7 @@ func TestBasicBitswap(t *testing.T) {
defer sg.Close() defer sg.Close()
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks") t.Log("Test a one node trying to get one block from another")
instances := sg.Instances(2) instances := sg.Instances(2)
blocks := bg.Blocks(1) blocks := bg.Blocks(1)
......
...@@ -92,7 +92,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ...@@ -92,7 +92,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
bs: bs, bs: bs,
peerRequestQueue: newPRQ(), peerRequestQueue: newPRQ(),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer), outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}), workSignal: make(chan struct{}, 1),
} }
go e.taskWorker(ctx) go e.taskWorker(ctx)
return e return e
...@@ -156,7 +156,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { ...@@ -156,7 +156,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return &Envelope{ return &Envelope{
Peer: nextTask.Target, Peer: nextTask.Target,
Block: block, Block: block,
Sent: nextTask.Done, Sent: func() {
nextTask.Done()
select {
case e.workSignal <- struct{}{}:
// work completing may mean that our queue will provide new
// work to be done.
default:
}
},
}, nil }, nil
} }
} }
...@@ -202,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { ...@@ -202,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() { for _, entry := range m.Wantlist() {
if entry.Cancel { if entry.Cancel {
log.Debugf("cancel %s", entry.Key) log.Errorf("cancel %s", entry.Key)
l.CancelWant(entry.Key) l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p) e.peerRequestQueue.Remove(entry.Key, p)
} else { } else {
log.Debugf("wants %s - %d", entry.Key, entry.Priority) log.Errorf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists { if exists, err := e.bs.Has(entry.Key); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p) e.peerRequestQueue.Push(entry.Entry, p)
......
...@@ -51,12 +51,6 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { ...@@ -51,12 +51,6 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
tl.partners[to] = partner tl.partners[to] = partner
} }
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
return
}
partner.activelk.Lock() partner.activelk.Lock()
defer partner.activelk.Unlock() defer partner.activelk.Unlock()
_, ok = partner.activeBlocks[entry.Key] _, ok = partner.activeBlocks[entry.Key]
...@@ -64,6 +58,12 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { ...@@ -64,6 +58,12 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
return return
} }
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
return
}
task := &peerRequestTask{ task := &peerRequestTask{
Entry: entry, Entry: entry,
Target: to, Target: to,
...@@ -220,6 +220,12 @@ func partnerCompare(a, b pq.Elem) bool { ...@@ -220,6 +220,12 @@ func partnerCompare(a, b pq.Elem) bool {
if pb.requests == 0 { if pb.requests == 0 {
return true return true
} }
if pa.active == pb.active {
// sorting by taskQueue.Len() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled entries in the queue resulting in a memory leak
return pa.taskQueue.Len() > pb.taskQueue.Len()
}
return pa.active < pb.active return pa.active < pb.active
} }
......
...@@ -33,7 +33,7 @@ type Receiver interface { ...@@ -33,7 +33,7 @@ type Receiver interface {
ReceiveMessage( ReceiveMessage(
ctx context.Context, ctx context.Context,
sender peer.ID, sender peer.ID,
incoming bsmsg.BitSwapMessage) error incoming bsmsg.BitSwapMessage)
ReceiveError(error) ReceiveError(error)
......
...@@ -7,28 +7,36 @@ import ( ...@@ -7,28 +7,36 @@ import (
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
) )
type PeerManager struct { type WantManager struct {
receiver bsnet.Receiver receiver bsnet.Receiver
incoming chan *msgPair incoming chan []*bsmsg.Entry
connect chan peer.ID
// notification channel for new peers connecting
connect chan peer.ID
// notification channel for peers disconnecting
disconnect chan peer.ID disconnect chan peer.ID
peers map[peer.ID]*msgQueue peers map[peer.ID]*msgQueue
wl *wantlist.Wantlist
network bsnet.BitSwapNetwork network bsnet.BitSwapNetwork
} }
func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager { func NewWantManager(network bsnet.BitSwapNetwork) *WantManager {
return &PeerManager{ return &WantManager{
incoming: make(chan *msgPair, 10), incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10), connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10),
peers: make(map[peer.ID]*msgQueue), peers: make(map[peer.ID]*msgQueue),
wl: wantlist.New(),
network: network, network: network,
} }
} }
...@@ -53,37 +61,68 @@ type msgQueue struct { ...@@ -53,37 +61,68 @@ type msgQueue struct {
done chan struct{} done chan struct{}
} }
func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) { func (pm *WantManager) WantBlocks(ks []u.Key) {
log.Error("WANT: ", ks)
pm.addEntries(ks, false)
}
func (pm *WantManager) CancelWants(ks []u.Key) {
log.Error("CANCEL: ", ks)
pm.addEntries(ks, true)
}
func (pm *WantManager) addEntries(ks []u.Key, cancel bool) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.Entry{
Key: k,
Priority: kMaxPriority - i,
},
})
}
pm.incoming <- entries
}
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure // Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack // throughout the network stack
defer env.Sent() defer env.Sent()
msg := bsmsg.New() msg := bsmsg.New()
msg.AddBlock(env.Block) msg.AddBlock(env.Block)
msg.SetFull(false)
err := pm.network.SendMessage(ctx, env.Peer, msg) err := pm.network.SendMessage(ctx, env.Peer, msg)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
} }
func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue {
_, ok := pm.peers[p] _, ok := pm.peers[p]
if ok { if ok {
// TODO: log an error? // TODO: log an error?
return nil return nil
} }
mq := new(msgQueue) mq := newMsgQueue(p)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1) // new peer, we will want to give them our full wantlist
mq.p = p fullwantlist := bsmsg.New()
for _, e := range pm.wl.Entries() {
fullwantlist.AddEntry(e.Key, e.Priority)
}
fullwantlist.SetFull(true)
mq.out = fullwantlist
mq.work <- struct{}{}
pm.peers[p] = mq pm.peers[p] = mq
go pm.runQueue(ctx, mq) go pm.runQueue(ctx, mq)
return mq return mq
} }
func (pm *PeerManager) stopPeerHandler(p peer.ID) { func (pm *WantManager) stopPeerHandler(p peer.ID) {
pq, ok := pm.peers[p] pq, ok := pm.peers[p]
if !ok { if !ok {
// TODO: log error? // TODO: log error?
...@@ -94,32 +133,38 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) { ...@@ -94,32 +133,38 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) {
delete(pm.peers, p) delete(pm.peers, p)
} }
func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) {
for { for {
select { select {
case <-mq.work: // there is work to be done case <-mq.work: // there is work to be done
// TODO: this might not need to be done every time, figure out
// a good heuristic
err := pm.network.ConnectTo(ctx, mq.p) err := pm.network.ConnectTo(ctx, mq.p)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
// TODO: cant connect, what now? // TODO: cant connect, what now?
} }
// grab outgoin message // grab outgoing message
mq.outlk.Lock() mq.outlk.Lock()
wlm := mq.out wlm := mq.out
mq.out = nil mq.out = nil
mq.outlk.Unlock() mq.outlk.Unlock()
if wlm != nil && !wlm.Empty() { // no message or empty message, continue
// send wantlist updates if wlm == nil {
err = pm.network.SendMessage(ctx, mq.p, wlm) log.Error("nil wantlist")
if err != nil { continue
log.Error("bitswap send error: ", err) }
// TODO: what do we do if this fails? if wlm.Empty() {
} log.Error("empty wantlist")
continue
}
// send wantlist updates
err = pm.network.SendMessage(ctx, mq.p, wlm)
if err != nil {
log.Error("bitswap send error: ", err)
// TODO: what do we do if this fails?
} }
case <-mq.done: case <-mq.done:
return return
...@@ -127,46 +172,38 @@ func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { ...@@ -127,46 +172,38 @@ func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) {
} }
} }
func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) { func (pm *WantManager) Connected(p peer.ID) {
if len(msg.Blocks()) > 0 {
panic("no blocks here!")
}
pm.incoming <- &msgPair{to: to, msg: msg}
}
func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) {
pm.incoming <- &msgPair{msg: msg}
}
func (pm *PeerManager) Connected(p peer.ID) {
pm.connect <- p pm.connect <- p
} }
func (pm *PeerManager) Disconnected(p peer.ID) { func (pm *WantManager) Disconnected(p peer.ID) {
pm.disconnect <- p pm.disconnect <- p
} }
// TODO: use goprocess here once i trust it // TODO: use goprocess here once i trust it
func (pm *PeerManager) Run(ctx context.Context) { func (pm *WantManager) Run(ctx context.Context) {
for { for {
select { select {
case msgp := <-pm.incoming: case entries := <-pm.incoming:
// Broadcast message to all if recipient not set msg := bsmsg.New()
if msgp.to == "" { msg.SetFull(false)
for _, p := range pm.peers { // add changes to our wantlist
p.addMessage(msgp.msg) for _, e := range entries {
if e.Cancel {
pm.wl.Remove(e.Key)
msg.Cancel(e.Key)
} else {
pm.wl.Add(e.Key, e.Priority)
msg.AddEntry(e.Key, e.Priority)
} }
continue
} }
p, ok := pm.peers[msgp.to] // broadcast those wantlist changes
if !ok { for _, p := range pm.peers {
//TODO: decide, drop message? or dial? p.addMessage(msg)
p = pm.startPeerHandler(ctx, msgp.to)
} }
p.addMessage(msgp.msg)
case p := <-pm.connect: case p := <-pm.connect:
pm.startPeerHandler(ctx, p) pm.startPeerHandler(ctx, p)
case p := <-pm.disconnect: case p := <-pm.disconnect:
...@@ -177,6 +214,15 @@ func (pm *PeerManager) Run(ctx context.Context) { ...@@ -177,6 +214,15 @@ func (pm *PeerManager) Run(ctx context.Context) {
} }
} }
func newMsgQueue(p peer.ID) *msgQueue {
mq := new(msgQueue)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1)
mq.p = p
return mq
}
func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
mq.outlk.Lock() mq.outlk.Lock()
defer func() { defer func() {
...@@ -187,6 +233,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { ...@@ -187,6 +233,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
} }
}() }()
if msg.Full() {
log.Error("GOt FULL MESSAGE")
}
// if we have no message held, or the one we are given is full // if we have no message held, or the one we are given is full
// overwrite the one we are holding // overwrite the one we are holding
if mq.out == nil || msg.Full() { if mq.out == nil || msg.Full() {
...@@ -199,8 +249,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { ...@@ -199,8 +249,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
// one passed in // one passed in
for _, e := range msg.Wantlist() { for _, e := range msg.Wantlist() {
if e.Cancel { if e.Cancel {
log.Error("add message cancel: ", e.Key, mq.p)
mq.out.Cancel(e.Key) mq.out.Cancel(e.Key)
} else { } else {
log.Error("add message want: ", e.Key, mq.p)
mq.out.AddEntry(e.Key, e.Priority) mq.out.AddEntry(e.Key, e.Priority)
} }
} }
......
...@@ -29,19 +29,17 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -29,19 +29,17 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
responder.SetDelegate(lambda(func( responder.SetDelegate(lambda(func(
ctx context.Context, ctx context.Context,
fromWaiter peer.ID, fromWaiter peer.ID,
msgFromWaiter bsmsg.BitSwapMessage) error { msgFromWaiter bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New() msgToWaiter := bsmsg.New()
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
waiter.SendMessage(ctx, fromWaiter, msgToWaiter) waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
return nil
})) }))
waiter.SetDelegate(lambda(func( waiter.SetDelegate(lambda(func(
ctx context.Context, ctx context.Context,
fromResponder peer.ID, fromResponder peer.ID,
msgFromResponder bsmsg.BitSwapMessage) error { msgFromResponder bsmsg.BitSwapMessage) {
// TODO assert that this came from the correct peer and that the message contents are as expected // TODO assert that this came from the correct peer and that the message contents are as expected
ok := false ok := false
...@@ -54,9 +52,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -54,9 +52,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
if !ok { if !ok {
t.Fatal("Message not received from the responder") t.Fatal("Message not received from the responder")
} }
return nil
})) }))
messageSentAsync := bsmsg.New() messageSentAsync := bsmsg.New()
...@@ -71,7 +67,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -71,7 +67,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
} }
type receiverFunc func(ctx context.Context, p peer.ID, type receiverFunc func(ctx context.Context, p peer.ID,
incoming bsmsg.BitSwapMessage) error incoming bsmsg.BitSwapMessage)
// lambda returns a Receiver instance given a receiver function // lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver { func lambda(f receiverFunc) bsnet.Receiver {
...@@ -81,12 +77,12 @@ func lambda(f receiverFunc) bsnet.Receiver { ...@@ -81,12 +77,12 @@ func lambda(f receiverFunc) bsnet.Receiver {
} }
type lambdaImpl struct { type lambdaImpl struct {
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage)
} }
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p peer.ID, incoming bsmsg.BitSwapMessage) error { p peer.ID, incoming bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming) lam.f(ctx, p, incoming)
} }
func (lam *lambdaImpl) ReceiveError(err error) { func (lam *lambdaImpl) ReceiveError(err error) {
......
...@@ -72,7 +72,8 @@ func (n *network) deliver( ...@@ -72,7 +72,8 @@ func (n *network) deliver(
n.delay.Wait() n.delay.Wait()
return r.ReceiveMessage(context.TODO(), from, message) r.ReceiveMessage(context.TODO(), from, message)
return nil
} }
type networkClient struct { type networkClient struct {
......
...@@ -42,9 +42,11 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { ...@@ -42,9 +42,11 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
} }
// Start up a worker to manage periodically resending our wantlist out to peers // Start up a worker to manage periodically resending our wantlist out to peers
px.Go(func(px process.Process) { /*
bs.rebroadcastWorker(ctx) px.Go(func(px process.Process) {
}) bs.rebroadcastWorker(ctx)
})
*/
// Start up a worker to manage sending out provides messages // Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) { px.Go(func(px process.Process) {
...@@ -72,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { ...@@ -72,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
continue continue
} }
bs.pm.SendBlock(ctx, envelope) bs.wm.SendBlock(ctx, envelope)
case <-ctx.Done(): case <-ctx.Done():
return return
} }
...@@ -146,30 +148,19 @@ func (bs *Bitswap) clientWorker(parent context.Context) { ...@@ -146,30 +148,19 @@ func (bs *Bitswap) clientWorker(parent context.Context) {
log.Warning("Received batch request for zero blocks") log.Warning("Received batch request for zero blocks")
continue continue
} }
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
done := make(chan struct{}) bs.wm.WantBlocks(keys)
go func() {
bs.wantNewBlocks(req.ctx, keys)
close(done)
}()
// NB: Optimization. Assumes that providers of key[0] are likely to // NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most // be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true. // every situation. Later, this assumption may not hold as true.
child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout) child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers) for p := range providers {
if err != nil { go bs.network.ConnectTo(req.ctx, p)
log.Debugf("error sending wantlist: %s", err)
} }
cancel() cancel()
// Wait for wantNewBlocks to finish
<-done
case <-parent.Done(): case <-parent.Done():
return return
} }
...@@ -180,22 +171,24 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ...@@ -180,22 +171,24 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent) ctx, cancel := context.WithCancel(parent)
defer cancel() defer cancel()
broadcastSignal := time.After(rebroadcastDelay.Get()) broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
tick := time.Tick(10 * time.Second) defer broadcastSignal.Stop()
tick := time.NewTicker(10 * time.Second)
defer tick.Stop()
for { for {
select { select {
case <-tick: case <-tick.C:
n := bs.wantlist.Len() n := bs.wm.wl.Len()
if n > 0 { if n > 0 {
log.Debug(n, "keys in bitswap wantlist") log.Debug(n, "keys in bitswap wantlist")
} }
case <-broadcastSignal: // resend unfulfilled wantlist keys case <-broadcastSignal.C: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries() entries := bs.wm.wl.Entries()
if len(entries) > 0 { if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries) bs.connectToProviders(ctx, entries)
} }
broadcastSignal = time.After(rebroadcastDelay.Get())
case <-parent.Done(): case <-parent.Done():
return return
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论