Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
G
go-ipfs
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
jihao
go-ipfs
Commits
eca93bc9
Unverified
提交
eca93bc9
authored
7月 03, 2019
作者:
Steven Allen
提交者:
GitHub
7月 03, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #6480 from ipfs/bugs/node-construction-queue-error
Fix node construction queue error
上级
8e5ea5f2
143e4157
隐藏空白字符变更
内嵌
并排
正在显示
2 个修改的文件
包含
38 行增加
和
129 行删除
+38
-129
queue.go
provider/queue/queue.go
+30
-93
queue_test.go
provider/queue/queue_test.go
+8
-36
没有找到文件。
provider/queue/queue.go
浏览文件 @
eca93bc9
...
...
@@ -3,8 +3,7 @@ package queue
import
(
"context"
"fmt"
"strconv"
"strings"
"time"
cid
"github.com/ipfs/go-cid"
datastore
"github.com/ipfs/go-datastore"
...
...
@@ -25,8 +24,6 @@ type Queue struct {
// e.g. provider vs reprovider
name
string
ctx
context
.
Context
tail
uint64
head
uint64
ds
datastore
.
Datastore
// Must be threadsafe
dequeue
chan
cid
.
Cid
enqueue
chan
cid
.
Cid
...
...
@@ -37,16 +34,10 @@ type Queue struct {
// NewQueue creates a queue for cids
func
NewQueue
(
ctx
context
.
Context
,
name
string
,
ds
datastore
.
Datastore
)
(
*
Queue
,
error
)
{
namespaced
:=
namespace
.
Wrap
(
ds
,
datastore
.
NewKey
(
"/"
+
name
+
"/queue/"
))
head
,
tail
,
err
:=
getQueueHeadTail
(
ctx
,
namespaced
)
if
err
!=
nil
{
return
nil
,
err
}
cancelCtx
,
cancel
:=
context
.
WithCancel
(
ctx
)
q
:=
&
Queue
{
name
:
name
,
ctx
:
cancelCtx
,
head
:
head
,
tail
:
tail
,
ds
:
namespaced
,
dequeue
:
make
(
chan
cid
.
Cid
),
enqueue
:
make
(
chan
cid
.
Cid
),
...
...
@@ -77,41 +68,6 @@ func (q *Queue) Dequeue() <-chan cid.Cid {
return
q
.
dequeue
}
// Look for next Cid in the queue and return it. Skip over gaps and mangled data
func
(
q
*
Queue
)
nextEntry
()
(
datastore
.
Key
,
cid
.
Cid
)
{
for
{
if
q
.
head
>=
q
.
tail
{
return
datastore
.
Key
{},
cid
.
Undef
}
key
:=
q
.
queueKey
(
q
.
head
)
value
,
err
:=
q
.
ds
.
Get
(
key
)
if
err
!=
nil
{
if
err
==
datastore
.
ErrNotFound
{
log
.
Warningf
(
"Error missing entry in queue: %s"
,
key
)
}
else
{
log
.
Errorf
(
"Error fetching from queue: %s"
,
err
)
}
q
.
head
++
// move on
continue
}
c
,
err
:=
cid
.
Parse
(
value
)
if
err
!=
nil
{
log
.
Warningf
(
"Error marshalling Cid from queue: "
,
err
)
q
.
head
++
err
=
q
.
ds
.
Delete
(
key
)
if
err
!=
nil
{
log
.
Warningf
(
"Provider queue failed to delete: %s"
,
key
)
}
continue
}
return
key
,
c
}
}
// Run dequeues and enqueues when available.
func
(
q
*
Queue
)
work
()
{
go
func
()
{
...
...
@@ -124,7 +80,26 @@ func (q *Queue) work() {
for
{
if
c
==
cid
.
Undef
{
k
,
c
=
q
.
nextEntry
()
head
,
e
:=
q
.
getQueueHead
()
if
e
!=
nil
{
log
.
Errorf
(
"error querying for head of queue: %s, stopping provider"
,
e
)
return
}
else
if
head
!=
nil
{
k
=
datastore
.
NewKey
(
head
.
Key
)
c
,
e
=
cid
.
Parse
(
head
.
Value
)
if
e
!=
nil
{
log
.
Warningf
(
"error parsing queue entry cid with key (%s), removing it from queue: %s"
,
head
.
Key
,
e
)
err
:=
q
.
ds
.
Delete
(
k
)
if
err
!=
nil
{
log
.
Errorf
(
"error deleting queue entry with key (%s), due to error (%s), stopping provider"
,
head
.
Key
,
err
)
return
}
continue
}
}
else
{
c
=
cid
.
Undef
}
}
// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
...
...
@@ -135,14 +110,13 @@ func (q *Queue) work() {
select
{
case
toQueue
:=
<-
q
.
enqueue
:
nextKey
:=
q
.
queueKey
(
q
.
tail
)
keyPath
:=
fmt
.
Sprintf
(
"%d/%s"
,
time
.
Now
()
.
UnixNano
(),
c
.
String
())
nextKey
:=
datastore
.
NewKey
(
keyPath
)
if
err
:=
q
.
ds
.
Put
(
nextKey
,
toQueue
.
Bytes
());
err
!=
nil
{
log
.
Errorf
(
"Failed to enqueue cid: %s"
,
err
)
continue
}
q
.
tail
++
case
dequeue
<-
c
:
err
:=
q
.
ds
.
Delete
(
k
)
...
...
@@ -151,7 +125,6 @@ func (q *Queue) work() {
continue
}
c
=
cid
.
Undef
q
.
head
++
case
<-
q
.
ctx
.
Done
()
:
return
}
...
...
@@ -159,53 +132,17 @@ func (q *Queue) work() {
}()
}
func
(
q
*
Queue
)
queueKey
(
id
uint64
)
datastore
.
Key
{
s
:=
fmt
.
Sprintf
(
"%016X"
,
id
)
return
datastore
.
NewKey
(
s
)
}
func
getQueueHeadTail
(
ctx
context
.
Context
,
datastore
datastore
.
Datastore
)
(
uint64
,
uint64
,
error
)
{
head
,
err
:=
getQueueHead
(
datastore
)
if
err
!=
nil
{
return
0
,
0
,
err
}
tail
,
err
:=
getQueueTail
(
datastore
)
if
err
!=
nil
{
return
0
,
0
,
err
}
return
head
,
tail
,
nil
}
func
getQueueHead
(
ds
datastore
.
Datastore
)
(
uint64
,
error
)
{
return
getFirstIDByOrder
(
ds
,
query
.
OrderByKey
{})
}
func
getQueueTail
(
ds
datastore
.
Datastore
)
(
uint64
,
error
)
{
tail
,
err
:=
getFirstIDByOrder
(
ds
,
query
.
OrderByKeyDescending
{})
func
(
q
*
Queue
)
getQueueHead
()
(
*
query
.
Result
,
error
)
{
qry
:=
query
.
Query
{
Orders
:
[]
query
.
Order
{
query
.
OrderByKey
{}},
Limit
:
1
}
results
,
err
:=
q
.
ds
.
Query
(
qry
)
if
err
!=
nil
{
return
0
,
err
}
if
tail
>
0
{
tail
++
}
return
tail
,
nil
}
func
getFirstIDByOrder
(
ds
datastore
.
Datastore
,
order
query
.
Order
)
(
uint64
,
error
)
{
q
:=
query
.
Query
{
Orders
:
[]
query
.
Order
{
order
}}
results
,
err
:=
ds
.
Query
(
q
)
if
err
!=
nil
{
return
0
,
err
return
nil
,
err
}
defer
results
.
Close
()
r
,
ok
:=
results
.
NextSync
()
if
!
ok
{
return
0
,
nil
}
trimmed
:=
strings
.
TrimPrefix
(
r
.
Key
,
"/"
)
id
,
err
:=
strconv
.
ParseUint
(
trimmed
,
16
,
64
)
if
err
!=
nil
{
return
0
,
err
return
nil
,
nil
}
return
id
,
nil
return
&
r
,
nil
}
provider/queue/queue_test.go
浏览文件 @
eca93bc9
...
...
@@ -5,9 +5,9 @@ import (
"testing"
"time"
cid
"github.com/ipfs/go-cid"
datastore
"github.com/ipfs/go-datastore"
sync
"github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipfs-blocksutil"
)
...
...
@@ -55,36 +55,6 @@ func TestBasicOperation(t *testing.T) {
assertOrdered
(
cids
,
queue
,
t
)
}
func
TestSparseDatastore
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
defer
ctx
.
Done
()
ds
:=
sync
.
MutexWrap
(
datastore
.
NewMapDatastore
())
queue
,
err
:=
NewQueue
(
ctx
,
"test"
,
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
cids
:=
makeCids
(
10
)
for
_
,
c
:=
range
cids
{
queue
.
Enqueue
(
c
)
}
// remove entries in the middle
err
=
queue
.
ds
.
Delete
(
queue
.
queueKey
(
5
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
err
=
queue
.
ds
.
Delete
(
queue
.
queueKey
(
6
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
expected
:=
append
(
cids
[
:
5
],
cids
[
7
:
]
...
)
assertOrdered
(
expected
,
queue
,
t
)
}
func
TestMangledData
(
t
*
testing
.
T
)
{
ctx
:=
context
.
Background
()
defer
ctx
.
Done
()
...
...
@@ -100,13 +70,15 @@ func TestMangledData(t *testing.T) {
queue
.
Enqueue
(
c
)
}
// remove entries in the middle
err
=
queue
.
ds
.
Put
(
queue
.
queueKey
(
5
),
[]
byte
(
"borked"
))
// put bad data in the queue
queueKey
:=
datastore
.
NewKey
(
"/test/0"
)
err
=
queue
.
ds
.
Put
(
queueKey
,
[]
byte
(
"borked"
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
expected
:=
append
(
cids
[
:
5
],
cids
[
6
:
]
...
)
// expect to only see the valid cids we entered
expected
:=
cids
assertOrdered
(
expected
,
queue
,
t
)
}
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论