提交 447f1efb 作者: Jeromy Johnson

Merge pull request #1009 from ipfs/refactor/taskqueue

refactor task queue to have queues per peer
...@@ -55,6 +55,9 @@ type Envelope struct { ...@@ -55,6 +55,9 @@ type Envelope struct {
Peer peer.ID Peer peer.ID
// Message is the payload // Message is the payload
Message bsmsg.BitSwapMessage Message bsmsg.BitSwapMessage
// A callback to notify the decision queue that the task is complete
Sent func()
} }
type Engine struct { type Engine struct {
...@@ -132,12 +135,19 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { ...@@ -132,12 +135,19 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
block, err := e.bs.Get(nextTask.Entry.Key) block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil { if err != nil {
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done()
continue continue
} }
m := bsmsg.New() // TODO: maybe add keys from our wantlist? m := bsmsg.New() // TODO: maybe add keys from our wantlist?
m.AddBlock(block) m.AddBlock(block)
return &Envelope{Peer: nextTask.Target, Message: m}, nil return &Envelope{
Peer: nextTask.Target,
Message: m,
Sent: nextTask.Done,
}, nil
} }
} }
......
...@@ -21,53 +21,84 @@ type peerRequestQueue interface { ...@@ -21,53 +21,84 @@ type peerRequestQueue interface {
func newPRQ() peerRequestQueue { func newPRQ() peerRequestQueue {
return &prq{ return &prq{
taskMap: make(map[string]*peerRequestTask), taskMap: make(map[string]*peerRequestTask),
taskQueue: pq.New(wrapCmp(V1)), partners: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare),
} }
} }
// verify interface implementation
var _ peerRequestQueue = &prq{} var _ peerRequestQueue = &prq{}
// TODO: at some point, the strategy needs to plug in here // TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select // to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy. // tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type prq struct { type prq struct {
lock sync.Mutex lock sync.Mutex
taskQueue pq.PQ pQueue pq.PQ
taskMap map[string]*peerRequestTask taskMap map[string]*peerRequestTask
partners map[peer.ID]*activePartner
} }
// Push currently adds a new peerRequestTask to the end of the list // Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
tl.lock.Lock() tl.lock.Lock()
defer tl.lock.Unlock() defer tl.lock.Unlock()
partner, ok := tl.partners[to]
if !ok {
partner = &activePartner{taskQueue: pq.New(wrapCmp(V1))}
tl.pQueue.Push(partner)
tl.partners[to] = partner
}
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok { if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
task.Entry.Priority = entry.Priority task.Entry.Priority = entry.Priority
tl.taskQueue.Update(task.index) partner.taskQueue.Update(task.index)
return return
} }
task := &peerRequestTask{ task := &peerRequestTask{
Entry: entry, Entry: entry,
Target: to, Target: to,
created: time.Now(), created: time.Now(),
Done: func() {
partner.TaskDone()
tl.lock.Lock()
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
} }
tl.taskQueue.Push(task)
partner.taskQueue.Push(task)
tl.taskMap[task.Key()] = task tl.taskMap[task.Key()] = task
partner.requests++
tl.pQueue.Update(partner.Index())
} }
// Pop 'pops' the next task to be performed. Returns nil if no task exists. // Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *prq) Pop() *peerRequestTask { func (tl *prq) Pop() *peerRequestTask {
tl.lock.Lock() tl.lock.Lock()
defer tl.lock.Unlock() defer tl.lock.Unlock()
if tl.pQueue.Len() == 0 {
return nil
}
partner := tl.pQueue.Pop().(*activePartner)
var out *peerRequestTask var out *peerRequestTask
for tl.taskQueue.Len() > 0 { for partner.taskQueue.Len() > 0 {
out = tl.taskQueue.Pop().(*peerRequestTask) out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key()) delete(tl.taskMap, out.Key())
if out.trash { if out.trash {
out = nil
continue // discarding tasks that have been removed continue // discarding tasks that have been removed
} }
partner.StartTask()
partner.requests--
break // and return |out| break // and return |out|
} }
tl.pQueue.Push(partner)
return out return out
} }
...@@ -80,13 +111,19 @@ func (tl *prq) Remove(k u.Key, p peer.ID) { ...@@ -80,13 +111,19 @@ func (tl *prq) Remove(k u.Key, p peer.ID) {
// simply mark it as trash, so it'll be dropped when popped off the // simply mark it as trash, so it'll be dropped when popped off the
// queue. // queue.
t.trash = true t.trash = true
// having canceled a block, we now account for that in the given partner
tl.partners[p].requests--
} }
tl.lock.Unlock() tl.lock.Unlock()
} }
type peerRequestTask struct { type peerRequestTask struct {
Entry wantlist.Entry Entry wantlist.Entry
Target peer.ID // required Target peer.ID
// A callback to signal that this task has been completed
Done func()
// trash is a book-keeping field // trash is a book-keeping field
trash bool trash bool
...@@ -100,10 +137,12 @@ func (t *peerRequestTask) Key() string { ...@@ -100,10 +137,12 @@ func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Key) return taskKey(t.Target, t.Entry.Key)
} }
// Index implements pq.Elem
func (t *peerRequestTask) Index() int { func (t *peerRequestTask) Index() int {
return t.index return t.index
} }
// SetIndex implements pq.Elem
func (t *peerRequestTask) SetIndex(i int) { func (t *peerRequestTask) SetIndex(i int) {
t.index = i t.index = i
} }
...@@ -132,3 +171,65 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool { ...@@ -132,3 +171,65 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
return f(a.(*peerRequestTask), b.(*peerRequestTask)) return f(a.(*peerRequestTask), b.(*peerRequestTask))
} }
} }
// activePartner tracks the scheduling state for a single peer in the
// peer request queue.
type activePartner struct {
	// activelk guards active, which is updated from outside the
	// peerRequestQueue's lock (via the StartTask/TaskDone callbacks).
	activelk sync.Mutex
	// active is the number of blocks this peer is currently being sent.
	active int

	// requests is the number of blocks this peer is currently requesting.
	// It need not be locked separately, as it is only modified while the
	// peerRequestQueue's lock is held.
	requests int

	// index is book-keeping for the pq.Elem (priority queue) interface.
	index int

	// taskQueue is the priority queue of tasks belonging to this peer.
	taskQueue pq.PQ
}
// partnerCompare implements pq.ElemComparator: it reports whether
// partner a should be served before partner b.
func partnerCompare(a, b pq.Elem) bool {
	pa, pb := a.(*activePartner), b.(*activePartner)

	// A partner with no blocks in its wantlist sorts last; checking both
	// sides keeps the ordering stable when neither has pending requests.
	switch {
	case pa.requests == 0:
		return false
	case pb.requests == 0:
		return true
	default:
		// NOTE(review): active is read here without taking activelk —
		// presumably safe because comparisons happen under the prq
		// lock, but worth confirming.
		return pa.active < pb.active
	}
}
// StartTask records that a task for this partner has begun executing,
// incrementing its active count under activelk.
func (p *activePartner) StartTask() {
	p.activelk.Lock()
	defer p.activelk.Unlock()
	p.active++
}
// TaskDone records that a task for this partner has finished,
// decrementing its active count under activelk. It panics if more
// tasks are reported finished than were ever started.
func (p *activePartner) TaskDone() {
	p.activelk.Lock()
	defer p.activelk.Unlock()
	p.active--
	if p.active < 0 {
		panic("more tasks finished than started!")
	}
}
// Index implements pq.Elem by returning this partner's current
// position in the priority queue.
func (p *activePartner) Index() int {
	return p.index
}
// SetIndex implements pq.Elem; the priority queue calls it to record
// this partner's new position after a reorder.
func (p *activePartner) SetIndex(i int) {
	p.index = i
}
...@@ -47,10 +47,73 @@ func TestPushPop(t *testing.T) { ...@@ -47,10 +47,73 @@ func TestPushPop(t *testing.T) {
prq.Remove(util.Key(consonant), partner) prq.Remove(util.Key(consonant), partner)
} }
for _, expected := range vowels { var out []string
received := prq.Pop().Entry.Key for {
if received != util.Key(expected) { received := prq.Pop()
t.Fatal("received", string(received), "expected", string(expected)) if received == nil {
break
}
out = append(out, string(received.Entry.Key))
}
// Entries popped should already be in correct order
for i, expected := range vowels {
if out[i] != expected {
t.Fatal("received", out[i], "expected", expected)
}
}
}
// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
prq := newPRQ()
a := testutil.RandPeerIDFatal(t)
b := testutil.RandPeerIDFatal(t)
c := testutil.RandPeerIDFatal(t)
d := testutil.RandPeerIDFatal(t)
// Have each push some blocks
for i := 0; i < 5; i++ {
prq.Push(wantlist.Entry{Key: util.Key(i)}, a)
prq.Push(wantlist.Entry{Key: util.Key(i)}, b)
prq.Push(wantlist.Entry{Key: util.Key(i)}, c)
prq.Push(wantlist.Entry{Key: util.Key(i)}, d)
}
// now, pop off four entries, there should be one from each
var targets []string
var tasks []*peerRequestTask
for i := 0; i < 4; i++ {
t := prq.Pop()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
}
expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()}
sort.Strings(expected)
sort.Strings(targets)
t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}
// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
for blockI := 0; blockI < 4; blockI++ {
for i := 0; i < 4; i++ {
// its okay to mark the same task done multiple times here (JUST FOR TESTING)
tasks[i].Done()
ntask := prq.Pop()
if ntask.Target != tasks[i].Target {
t.Fatal("Expected task from peer with lowest active count")
}
} }
} }
} }
...@@ -51,6 +51,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { ...@@ -51,6 +51,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
} }
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message) bs.send(ctx, envelope.Peer, envelope.Message)
envelope.Sent()
case <-ctx.Done(): case <-ctx.Done():
return return
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论