Commit d765c14e by Jeromy

refactor task queue to have queues per peer

Parent 9c3f6570
...@@ -55,6 +55,9 @@ type Envelope struct { ...@@ -55,6 +55,9 @@ type Envelope struct {
Peer peer.ID Peer peer.ID
// Message is the payload // Message is the payload
Message bsmsg.BitSwapMessage Message bsmsg.BitSwapMessage
// A callback to notify the decision queue that the task is complete
Sent func()
} }
type Engine struct { type Engine struct {
...@@ -137,7 +140,11 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { ...@@ -137,7 +140,11 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
m := bsmsg.New() // TODO: maybe add keys from our wantlist? m := bsmsg.New() // TODO: maybe add keys from our wantlist?
m.AddBlock(block) m.AddBlock(block)
return &Envelope{Peer: nextTask.Target, Message: m}, nil return &Envelope{
Peer: nextTask.Target,
Message: m,
Sent: nextTask.Done,
}, nil
} }
} }
......
...@@ -22,7 +22,8 @@ type peerRequestQueue interface { ...@@ -22,7 +22,8 @@ type peerRequestQueue interface {
func newPRQ() peerRequestQueue { func newPRQ() peerRequestQueue {
return &prq{ return &prq{
taskMap: make(map[string]*peerRequestTask), taskMap: make(map[string]*peerRequestTask),
taskQueue: pq.New(wrapCmp(V1)), partners: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare),
} }
} }
...@@ -33,41 +34,72 @@ var _ peerRequestQueue = &prq{} ...@@ -33,41 +34,72 @@ var _ peerRequestQueue = &prq{}
// tasks (on getnext). For now, we are assuming a dumb/nice strategy. // tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type prq struct { type prq struct {
lock sync.Mutex lock sync.Mutex
taskQueue pq.PQ pQueue pq.PQ
taskMap map[string]*peerRequestTask taskMap map[string]*peerRequestTask
partners map[peer.ID]*activePartner
} }
// Push currently adds a new peerRequestTask to the end of the list // Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
tl.lock.Lock() tl.lock.Lock()
defer tl.lock.Unlock() defer tl.lock.Unlock()
partner, ok := tl.partners[to]
if !ok {
partner = &activePartner{taskQueue: pq.New(wrapCmp(V1))}
tl.pQueue.Push(partner)
tl.partners[to] = partner
}
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok { if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
task.Entry.Priority = entry.Priority task.Entry.Priority = entry.Priority
tl.taskQueue.Update(task.index) partner.taskQueue.Update(task.index)
return return
} }
task := &peerRequestTask{ task := &peerRequestTask{
Entry: entry, Entry: entry,
Target: to, Target: to,
created: time.Now(), created: time.Now(),
Done: func() {
partner.TaskDone()
tl.lock.Lock()
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
} }
tl.taskQueue.Push(task)
partner.taskQueue.Push(task)
tl.taskMap[task.Key()] = task tl.taskMap[task.Key()] = task
partner.requests++
tl.pQueue.Update(partner.Index())
} }
// Pop 'pops' the next task to be performed. Returns nil if no task exists. // Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *prq) Pop() *peerRequestTask { func (tl *prq) Pop() *peerRequestTask {
tl.lock.Lock() tl.lock.Lock()
defer tl.lock.Unlock() defer tl.lock.Unlock()
if tl.pQueue.Len() == 0 {
return nil
}
pElem := tl.pQueue.Pop()
if pElem == nil {
return nil
}
partner := pElem.(*activePartner)
var out *peerRequestTask var out *peerRequestTask
for tl.taskQueue.Len() > 0 { for partner.taskQueue.Len() > 0 {
out = tl.taskQueue.Pop().(*peerRequestTask) out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key()) delete(tl.taskMap, out.Key())
if out.trash { if out.trash {
continue // discarding tasks that have been removed continue // discarding tasks that have been removed
} }
break // and return |out| break // and return |out|
} }
partner.StartTask()
partner.requests--
tl.pQueue.Push(partner)
return out return out
} }
...@@ -80,13 +112,16 @@ func (tl *prq) Remove(k u.Key, p peer.ID) { ...@@ -80,13 +112,16 @@ func (tl *prq) Remove(k u.Key, p peer.ID) {
// simply mark it as trash, so it'll be dropped when popped off the // simply mark it as trash, so it'll be dropped when popped off the
// queue. // queue.
t.trash = true t.trash = true
tl.partners[p].requests--
} }
tl.lock.Unlock() tl.lock.Unlock()
} }
type peerRequestTask struct { type peerRequestTask struct {
Entry wantlist.Entry Entry wantlist.Entry
Target peer.ID // required Target peer.ID
Done func()
// trash in a book-keeping field // trash in a book-keeping field
trash bool trash bool
...@@ -132,3 +167,55 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool { ...@@ -132,3 +167,55 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
return f(a.(*peerRequestTask), b.(*peerRequestTask)) return f(a.(*peerRequestTask), b.(*peerRequestTask))
} }
} }
// activePartner tracks the per-peer scheduling state: how many sends
// are in flight, how many blocks the peer still wants, and the peer's
// slot in the partner priority queue.
type activePartner struct {
	// lk guards active, which may be touched from task Done callbacks
	lk sync.Mutex
	// active is the number of blocks this peer is currently being sent
	active int
	// requests is the number of blocks this peer is currently requesting
	requests int
	// index is this partner's position in the prq partner priority queue
	index int
	// priority queue of this peer's outstanding tasks
	taskQueue pq.PQ
}
// partnerCompare orders partners in the prq priority queue: partners
// requesting nothing sort last; among the rest, the partner with the
// fewest blocks currently being sent comes first.
func partnerCompare(a, b pq.Elem) bool {
	pa, pb := a.(*activePartner), b.(*activePartner)

	// having no blocks in their wantlist means lowest priority
	switch {
	case pa.requests == 0:
		return false
	case pb.requests == 0:
		return true
	default:
		return pa.active < pb.active
	}
}
// StartTask records that one more block send to this partner is in
// flight.
func (p *activePartner) StartTask() {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.active++
}
// TaskDone is invoked (via a task's Done callback) when a block send to
// this partner completes; it decrements the in-flight count.
func (p *activePartner) TaskDone() {
	p.lk.Lock()
	p.active--
	// Going negative means Done was called more times than StartTask —
	// a programmer error, so fail loudly.
	if p.active < 0 {
		panic("more tasks finished than started!")
	}
	p.lk.Unlock()
}
// Index returns this partner's position in the prq partner priority
// queue (part of the pq.Elem interface).
func (p *activePartner) Index() int {
	return p.index
}
// SetIndex is called by the priority queue to record this partner's new
// position after a re-heapify (part of the pq.Elem interface).
func (p *activePartner) SetIndex(i int) {
	p.index = i
}
...@@ -47,10 +47,68 @@ func TestPushPop(t *testing.T) { ...@@ -47,10 +47,68 @@ func TestPushPop(t *testing.T) {
prq.Remove(util.Key(consonant), partner) prq.Remove(util.Key(consonant), partner)
} }
for _, expected := range vowels { var out []string
received := prq.Pop().Entry.Key for {
if received != util.Key(expected) { received := prq.Pop()
t.Fatal("received", string(received), "expected", string(expected)) if received == nil {
break
}
out = append(out, string(received.Entry.Key))
}
// Entries popped should already be in correct order
for i, expected := range vowels {
if out[i] != expected {
t.Fatal("received", out[i], "expected", expected)
}
}
}
// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
prq := newPRQ()
a := testutil.RandPeerIDFatal(t)
b := testutil.RandPeerIDFatal(t)
c := testutil.RandPeerIDFatal(t)
d := testutil.RandPeerIDFatal(t)
// Have each push some blocks
for i := 0; i < 5; i++ {
prq.Push(wantlist.Entry{Key: util.Key(i)}, a)
prq.Push(wantlist.Entry{Key: util.Key(i)}, b)
prq.Push(wantlist.Entry{Key: util.Key(i)}, c)
prq.Push(wantlist.Entry{Key: util.Key(i)}, d)
}
// now, pop off four entries, there should be one from each
var targets []string
var tasks []*peerRequestTask
for i := 0; i < 4; i++ {
t := prq.Pop()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
}
expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()}
sort.Strings(expected)
sort.Strings(targets)
t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
} }
// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
tasks[0].Done()
ntask := prq.Pop()
if ntask.Target != tasks[0].Target {
t.Fatal("Expected task from peer with lowest active count")
} }
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论