Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
G
go-ipfs
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
jihao
go-ipfs
Commits
b1bb16f0
提交
b1bb16f0
authored
7月 07, 2015
作者:
Juan Batiz-Benet
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1455 from ipfs/bitswap-events
add in some events to bitswap to emit worker information
上级
f6f9cae4
ff1bf305
隐藏空白字符变更
内嵌
并排
正在显示
2 个修改的文件
包含
27 行增加
和
5 行删除
+27
-5
bitswap.go
exchange/bitswap/bitswap.go
+9
-1
workers.go
exchange/bitswap/workers.go
+18
-4
没有找到文件。
exchange/bitswap/bitswap.go
浏览文件 @
b1bb16f0
...
@@ -150,7 +150,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, e
...
@@ -150,7 +150,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, e
ctx
,
cancelFunc
:=
context
.
WithCancel
(
parent
)
ctx
,
cancelFunc
:=
context
.
WithCancel
(
parent
)
ctx
=
eventlog
.
ContextWithLoggable
(
ctx
,
eventlog
.
Uuid
(
"GetBlockRequest"
))
ctx
=
eventlog
.
ContextWithLoggable
(
ctx
,
eventlog
.
Uuid
(
"GetBlockRequest"
))
defer
log
.
EventBegin
(
ctx
,
"GetBlockRequest"
,
&
k
)
.
Done
()
log
.
Event
(
ctx
,
"Bitswap.GetBlockRequest.Start"
,
&
k
)
defer
log
.
Event
(
ctx
,
"Bitswap.GetBlockRequest.End"
,
&
k
)
defer
func
()
{
defer
func
()
{
cancelFunc
()
cancelFunc
()
...
@@ -200,6 +201,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block
...
@@ -200,6 +201,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block
}
}
promise
:=
bs
.
notifications
.
Subscribe
(
ctx
,
keys
...
)
promise
:=
bs
.
notifications
.
Subscribe
(
ctx
,
keys
...
)
for
_
,
k
:=
range
keys
{
log
.
Event
(
ctx
,
"Bitswap.GetBlockRequest.Start"
,
&
k
)
}
bs
.
wm
.
WantBlocks
(
keys
)
bs
.
wm
.
WantBlocks
(
keys
)
req
:=
&
blockRequest
{
req
:=
&
blockRequest
{
...
@@ -310,6 +315,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
...
@@ -310,6 +315,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
return
return
}
}
k
:=
b
.
Key
()
log
.
Event
(
ctx
,
"Bitswap.GetBlockRequest.End"
,
&
k
)
log
.
Debugf
(
"got block %s from %s (%d,%d)"
,
b
,
p
,
brecvd
,
bdup
)
log
.
Debugf
(
"got block %s from %s (%d,%d)"
,
b
,
p
,
brecvd
,
bdup
)
hasBlockCtx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
hasBlockTimeout
)
hasBlockCtx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
hasBlockTimeout
)
if
err
:=
bs
.
HasBlock
(
hasBlockCtx
,
b
);
err
!=
nil
{
if
err
:=
bs
.
HasBlock
(
hasBlockCtx
,
b
);
err
!=
nil
{
...
...
exchange/bitswap/workers.go
浏览文件 @
b1bb16f0
...
@@ -7,7 +7,9 @@ import (
...
@@ -7,7 +7,9 @@ import (
process
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
process
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
context
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key
"github.com/ipfs/go-ipfs/blocks/key"
key
"github.com/ipfs/go-ipfs/blocks/key"
eventlog
"github.com/ipfs/go-ipfs/thirdparty/eventlog"
)
)
var
TaskWorkerCount
=
8
var
TaskWorkerCount
=
8
...
@@ -36,8 +38,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
...
@@ -36,8 +38,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up workers to handle requests from other nodes for the data on this node
// Start up workers to handle requests from other nodes for the data on this node
for
i
:=
0
;
i
<
TaskWorkerCount
;
i
++
{
for
i
:=
0
;
i
<
TaskWorkerCount
;
i
++
{
i
:=
i
px
.
Go
(
func
(
px
process
.
Process
)
{
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
taskWorker
(
ctx
)
bs
.
taskWorker
(
ctx
,
i
)
})
})
}
}
...
@@ -55,15 +58,18 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
...
@@ -55,15 +58,18 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// consider increasing number if providing blocks bottlenecks
// consider increasing number if providing blocks bottlenecks
// file transfers
// file transfers
for
i
:=
0
;
i
<
provideWorkers
;
i
++
{
for
i
:=
0
;
i
<
provideWorkers
;
i
++
{
i
:=
i
px
.
Go
(
func
(
px
process
.
Process
)
{
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
provideWorker
(
ctx
)
bs
.
provideWorker
(
ctx
,
i
)
})
})
}
}
}
}
func
(
bs
*
Bitswap
)
taskWorker
(
ctx
context
.
Context
)
{
func
(
bs
*
Bitswap
)
taskWorker
(
ctx
context
.
Context
,
id
int
)
{
idmap
:=
eventlog
.
LoggableMap
{
"ID"
:
id
}
defer
log
.
Info
(
"bitswap task worker shutting down..."
)
defer
log
.
Info
(
"bitswap task worker shutting down..."
)
for
{
for
{
log
.
Event
(
ctx
,
"Bitswap.TaskWorker.Loop"
,
idmap
)
select
{
select
{
case
nextEnvelope
:=
<-
bs
.
engine
.
Outbox
()
:
case
nextEnvelope
:=
<-
bs
.
engine
.
Outbox
()
:
select
{
select
{
...
@@ -71,6 +77,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
...
@@ -71,6 +77,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
if
!
ok
{
if
!
ok
{
continue
continue
}
}
log
.
Event
(
ctx
,
"Bitswap.TaskWorker.Work"
,
eventlog
.
LoggableMap
{
"ID"
:
id
,
"Target"
:
envelope
.
Peer
.
Pretty
(),
"Block"
:
envelope
.
Block
.
Multihash
.
B58String
()})
bs
.
wm
.
SendBlock
(
ctx
,
envelope
)
bs
.
wm
.
SendBlock
(
ctx
,
envelope
)
case
<-
ctx
.
Done
()
:
case
<-
ctx
.
Done
()
:
...
@@ -82,10 +89,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
...
@@ -82,10 +89,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
}
}
}
}
func
(
bs
*
Bitswap
)
provideWorker
(
ctx
context
.
Context
)
{
func
(
bs
*
Bitswap
)
provideWorker
(
ctx
context
.
Context
,
id
int
)
{
idmap
:=
eventlog
.
LoggableMap
{
"ID"
:
id
}
for
{
for
{
log
.
Event
(
ctx
,
"Bitswap.ProvideWorker.Loop"
,
idmap
)
select
{
select
{
case
k
,
ok
:=
<-
bs
.
provideKeys
:
case
k
,
ok
:=
<-
bs
.
provideKeys
:
log
.
Event
(
ctx
,
"Bitswap.ProvideWorker.Work"
,
idmap
,
&
k
)
if
!
ok
{
if
!
ok
{
log
.
Debug
(
"provideKeys channel closed"
)
log
.
Debug
(
"provideKeys channel closed"
)
return
return
...
@@ -139,6 +149,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
...
@@ -139,6 +149,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
defer
log
.
Info
(
"bitswap client worker shutting down..."
)
defer
log
.
Info
(
"bitswap client worker shutting down..."
)
for
{
for
{
log
.
Event
(
parent
,
"Bitswap.ProviderConnector.Loop"
)
select
{
select
{
case
req
:=
<-
bs
.
findKeys
:
case
req
:=
<-
bs
.
findKeys
:
keys
:=
req
.
keys
keys
:=
req
.
keys
...
@@ -146,6 +157,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
...
@@ -146,6 +157,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
log
.
Warning
(
"Received batch request for zero blocks"
)
log
.
Warning
(
"Received batch request for zero blocks"
)
continue
continue
}
}
log
.
Event
(
parent
,
"Bitswap.ProviderConnector.Work"
,
eventlog
.
LoggableMap
{
"Keys"
:
keys
})
// NB: Optimization. Assumes that providers of key[0] are likely to
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// be able to provide for all keys. This currently holds true in most
...
@@ -174,6 +186,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
...
@@ -174,6 +186,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
defer
tick
.
Stop
()
defer
tick
.
Stop
()
for
{
for
{
log
.
Event
(
ctx
,
"Bitswap.Rebroadcast.idle"
)
select
{
select
{
case
<-
tick
.
C
:
case
<-
tick
.
C
:
n
:=
bs
.
wm
.
wl
.
Len
()
n
:=
bs
.
wm
.
wl
.
Len
()
...
@@ -181,6 +194,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
...
@@ -181,6 +194,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
log
.
Debug
(
n
,
"keys in bitswap wantlist"
)
log
.
Debug
(
n
,
"keys in bitswap wantlist"
)
}
}
case
<-
broadcastSignal
.
C
:
// resend unfulfilled wantlist keys
case
<-
broadcastSignal
.
C
:
// resend unfulfilled wantlist keys
log
.
Event
(
ctx
,
"Bitswap.Rebroadcast.active"
)
entries
:=
bs
.
wm
.
wl
.
Entries
()
entries
:=
bs
.
wm
.
wl
.
Entries
()
if
len
(
entries
)
>
0
{
if
len
(
entries
)
>
0
{
bs
.
connectToProviders
(
ctx
,
entries
)
bs
.
connectToProviders
(
ctx
,
entries
)
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论