提交 198aa195 作者: Brian Tiger Chow 提交者: Juan Batiz-Benet

it's not a queue yet but it's okay to name it as such

License: MIT
Signed-off-by: 's avatarBrian Tiger Chow <brian@perfmode.com>
上级 7280aac8
...@@ -26,9 +26,9 @@ type LedgerManager struct { ...@@ -26,9 +26,9 @@ type LedgerManager struct {
lock sync.RWMutex lock sync.RWMutex
ledgerMap ledgerMap ledgerMap ledgerMap
bs bstore.Blockstore bs bstore.Blockstore
// FIXME tasklist isn't threadsafe nor is it protected by a mutex. consider // FIXME taskqueue isn't threadsafe nor is it protected by a mutex. consider
// a way to avoid sharing the tasklist between the worker and the receiver // a way to avoid sharing the taskqueue between the worker and the receiver
tasklist *taskList taskqueue *taskQueue
outbox chan Envelope outbox chan Envelope
workSignal chan struct{} workSignal chan struct{}
} }
...@@ -37,7 +37,7 @@ func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager ...@@ -37,7 +37,7 @@ func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager
lm := &LedgerManager{ lm := &LedgerManager{
ledgerMap: make(ledgerMap), ledgerMap: make(ledgerMap),
bs: bs, bs: bs,
tasklist: newTaskList(), taskqueue: newTaskQueue(),
outbox: make(chan Envelope, 4), // TODO extract constant outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}), workSignal: make(chan struct{}),
} }
...@@ -47,7 +47,7 @@ func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager ...@@ -47,7 +47,7 @@ func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager
func (lm *LedgerManager) taskWorker(ctx context.Context) { func (lm *LedgerManager) taskWorker(ctx context.Context) {
for { for {
nextTask := lm.tasklist.Pop() nextTask := lm.taskqueue.Pop()
if nextTask == nil { if nextTask == nil {
// No tasks in the list? // No tasks in the list?
// Wait until there are! // Wait until there are!
...@@ -124,11 +124,11 @@ func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) er ...@@ -124,11 +124,11 @@ func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) er
for _, e := range m.Wantlist() { for _, e := range m.Wantlist() {
if e.Cancel { if e.Cancel {
l.CancelWant(e.Key) l.CancelWant(e.Key)
lm.tasklist.Cancel(e.Key, p) lm.taskqueue.Cancel(e.Key, p)
} else { } else {
l.Wants(e.Key, e.Priority) l.Wants(e.Key, e.Priority)
newWorkExists = true newWorkExists = true
lm.tasklist.Push(e.Key, e.Priority, p) lm.taskqueue.Push(e.Key, e.Priority, p)
} }
} }
...@@ -138,7 +138,7 @@ func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) er ...@@ -138,7 +138,7 @@ func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) er
for _, l := range lm.ledgerMap { for _, l := range lm.ledgerMap {
if l.WantListContains(block.Key()) { if l.WantListContains(block.Key()) {
newWorkExists = true newWorkExists = true
lm.tasklist.Push(block.Key(), 1, l.Partner) lm.taskqueue.Push(block.Key(), 1, l.Partner)
} }
} }
} }
...@@ -159,7 +159,7 @@ func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error ...@@ -159,7 +159,7 @@ func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error
for _, block := range m.Blocks() { for _, block := range m.Blocks() {
l.SentBytes(len(block.Data)) l.SentBytes(len(block.Data))
l.wantList.Remove(block.Key()) l.wantList.Remove(block.Key())
lm.tasklist.Cancel(block.Key(), p) lm.taskqueue.Cancel(block.Key(), p)
} }
return nil return nil
......
...@@ -8,13 +8,13 @@ import ( ...@@ -8,13 +8,13 @@ import (
// TODO: at some point, the strategy needs to plug in here // TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select // to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy. // tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type taskList struct { type taskQueue struct {
tasks []*Task tasks []*Task
taskmap map[string]*Task taskmap map[string]*Task
} }
func newTaskList() *taskList { func newTaskQueue() *taskQueue {
return &taskList{ return &taskQueue{
taskmap: make(map[string]*Task), taskmap: make(map[string]*Task),
} }
} }
...@@ -27,7 +27,7 @@ type Task struct { ...@@ -27,7 +27,7 @@ type Task struct {
// Push currently adds a new task to the end of the list // Push currently adds a new task to the end of the list
// TODO: make this into a priority queue // TODO: make this into a priority queue
func (tl *taskList) Push(block u.Key, priority int, to peer.Peer) { func (tl *taskQueue) Push(block u.Key, priority int, to peer.Peer) {
if task, ok := tl.taskmap[taskKey(to, block)]; ok { if task, ok := tl.taskmap[taskKey(to, block)]; ok {
// TODO: when priority queue is implemented, // TODO: when priority queue is implemented,
// rearrange this Task // rearrange this Task
...@@ -44,7 +44,7 @@ func (tl *taskList) Push(block u.Key, priority int, to peer.Peer) { ...@@ -44,7 +44,7 @@ func (tl *taskList) Push(block u.Key, priority int, to peer.Peer) {
} }
// Pop 'pops' the next task to be performed. Returns nil no task exists. // Pop 'pops' the next task to be performed. Returns nil no task exists.
func (tl *taskList) Pop() *Task { func (tl *taskQueue) Pop() *Task {
var out *Task var out *Task
for len(tl.tasks) > 0 { for len(tl.tasks) > 0 {
// TODO: instead of zero, use exponential distribution // TODO: instead of zero, use exponential distribution
...@@ -63,7 +63,7 @@ func (tl *taskList) Pop() *Task { ...@@ -63,7 +63,7 @@ func (tl *taskList) Pop() *Task {
} }
// Cancel lazily cancels the sending of a block to a given peer // Cancel lazily cancels the sending of a block to a given peer
func (tl *taskList) Cancel(k u.Key, p peer.Peer) { func (tl *taskQueue) Cancel(k u.Key, p peer.Peer) {
t, ok := tl.taskmap[taskKey(p, k)] t, ok := tl.taskmap[taskKey(p, k)]
if ok { if ok {
t.theirPriority = -1 t.theirPriority = -1
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论