Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
G
go-ipfs
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
jihao
go-ipfs
Commits
b84cbec2
提交
b84cbec2
authored
5月 05, 2016
作者:
Kevin Atkinson
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Make blocks.Block an interface.
License: MIT Signed-off-by:
Kevin Atkinson
<
k@kevina.org
>
上级
fbf745fb
隐藏空白字符变更
内嵌
并排
正在显示
22 个修改的文件
包含
114 行增加
和
98 行删除
+114
-98
blocks.go
blocks/blocks.go
+27
-11
blockstore.go
blocks/blockstore/blockstore.go
+8
-8
blockstore_test.go
blocks/blockstore/blockstore_test.go
+1
-1
write_cache.go
blocks/blockstore/write_cache.go
+4
-4
block_generator.go
blocks/blocksutil/block_generator.go
+3
-3
blockservice.go
blockservice/blockservice.go
+5
-5
blocks_test.go
blockservice/test/blocks_test.go
+3
-3
block.go
core/commands/block.go
+3
-3
bitswap.go
exchange/bitswap/bitswap.go
+10
-10
bitswap_test.go
exchange/bitswap/bitswap_test.go
+2
-2
engine.go
exchange/bitswap/decision/engine.go
+6
-6
engine_test.go
exchange/bitswap/decision/engine_test.go
+1
-1
message.go
exchange/bitswap/message/message.go
+8
-8
notifications.go
exchange/bitswap/notifications/notifications.go
+6
-6
notifications_test.go
exchange/bitswap/notifications/notifications_test.go
+3
-3
network_test.go
exchange/bitswap/testnet/network_test.go
+1
-1
workers.go
exchange/bitswap/workers.go
+1
-1
interface.go
exchange/interface.go
+3
-3
offline.go
exchange/offline/offline.go
+4
-4
rabin_test.go
importer/chunk/rabin_test.go
+2
-2
merkledag.go
merkledag/merkledag.go
+11
-11
bitswap_wo_routing_test.go
test/integration/bitswap_wo_routing_test.go
+2
-2
没有找到文件。
blocks/blocks.go
浏览文件 @
b84cbec2
...
...
@@ -11,40 +11,56 @@ import (
u
"gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
)
type
Block
interface
{
Multihash
()
mh
.
Multihash
Data
()
[]
byte
Key
()
key
.
Key
String
()
string
Loggable
()
map
[
string
]
interface
{}
}
// Block is a singular block of data in ipfs
type
Block
struct
{
M
ultihash
mh
.
Multihash
D
ata
[]
byte
type
Raw
Block
struct
{
m
ultihash
mh
.
Multihash
d
ata
[]
byte
}
// NewBlock creates a Block object from opaque data. It will hash the data.
func
NewBlock
(
data
[]
byte
)
*
Block
{
return
&
Block
{
Data
:
data
,
M
ultihash
:
u
.
Hash
(
data
)}
func
NewBlock
(
data
[]
byte
)
*
Raw
Block
{
return
&
RawBlock
{
data
:
data
,
m
ultihash
:
u
.
Hash
(
data
)}
}
// NewBlockWithHash creates a new block when the hash of the data
// is already known, this is used to save time in situations where
// we are able to be confident that the data is correct
func
NewBlockWithHash
(
data
[]
byte
,
h
mh
.
Multihash
)
(
*
Block
,
error
)
{
func
NewBlockWithHash
(
data
[]
byte
,
h
mh
.
Multihash
)
(
*
Raw
Block
,
error
)
{
if
u
.
Debug
{
chk
:=
u
.
Hash
(
data
)
if
string
(
chk
)
!=
string
(
h
)
{
return
nil
,
errors
.
New
(
"Data did not match given hash!"
)
}
}
return
&
Block
{
Data
:
data
,
Multihash
:
h
},
nil
return
&
RawBlock
{
data
:
data
,
multihash
:
h
},
nil
}
func
(
b
*
RawBlock
)
Multihash
()
mh
.
Multihash
{
return
b
.
multihash
}
func
(
b
*
RawBlock
)
Data
()
[]
byte
{
return
b
.
data
}
// Key returns the block's Multihash as a Key value.
func
(
b
*
Block
)
Key
()
key
.
Key
{
return
key
.
Key
(
b
.
M
ultihash
)
func
(
b
*
Raw
Block
)
Key
()
key
.
Key
{
return
key
.
Key
(
b
.
m
ultihash
)
}
func
(
b
*
Block
)
String
()
string
{
func
(
b
*
Raw
Block
)
String
()
string
{
return
fmt
.
Sprintf
(
"[Block %s]"
,
b
.
Key
())
}
func
(
b
*
Block
)
Loggable
()
map
[
string
]
interface
{}
{
func
(
b
*
Raw
Block
)
Loggable
()
map
[
string
]
interface
{}
{
return
map
[
string
]
interface
{}{
"block"
:
b
.
Key
()
.
String
(),
}
...
...
blocks/blockstore/blockstore.go
浏览文件 @
b84cbec2
...
...
@@ -30,9 +30,9 @@ var ErrNotFound = errors.New("blockstore: block not found")
type
Blockstore
interface
{
DeleteBlock
(
key
.
Key
)
error
Has
(
key
.
Key
)
(
bool
,
error
)
Get
(
key
.
Key
)
(
*
blocks
.
Block
,
error
)
Put
(
*
blocks
.
Block
)
error
PutMany
([]
*
blocks
.
Block
)
error
Get
(
key
.
Key
)
(
blocks
.
Block
,
error
)
Put
(
blocks
.
Block
)
error
PutMany
([]
blocks
.
Block
)
error
AllKeysChan
(
ctx
context
.
Context
)
(
<-
chan
key
.
Key
,
error
)
}
...
...
@@ -73,7 +73,7 @@ type blockstore struct {
gcreqlk
sync
.
Mutex
}
func
(
bs
*
blockstore
)
Get
(
k
key
.
Key
)
(
*
blocks
.
Block
,
error
)
{
func
(
bs
*
blockstore
)
Get
(
k
key
.
Key
)
(
blocks
.
Block
,
error
)
{
maybeData
,
err
:=
bs
.
datastore
.
Get
(
k
.
DsKey
())
if
err
==
ds
.
ErrNotFound
{
return
nil
,
ErrNotFound
...
...
@@ -89,7 +89,7 @@ func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
return
blocks
.
NewBlockWithHash
(
bdata
,
mh
.
Multihash
(
k
))
}
func
(
bs
*
blockstore
)
Put
(
block
*
blocks
.
Block
)
error
{
func
(
bs
*
blockstore
)
Put
(
block
blocks
.
Block
)
error
{
k
:=
block
.
Key
()
.
DsKey
()
// Has is cheaper than Put, so see if we already have it
...
...
@@ -97,10 +97,10 @@ func (bs *blockstore) Put(block *blocks.Block) error {
if
err
==
nil
&&
exists
{
return
nil
// already stored.
}
return
bs
.
datastore
.
Put
(
k
,
block
.
Data
)
return
bs
.
datastore
.
Put
(
k
,
block
.
Data
()
)
}
func
(
bs
*
blockstore
)
PutMany
(
blocks
[]
*
blocks
.
Block
)
error
{
func
(
bs
*
blockstore
)
PutMany
(
blocks
[]
blocks
.
Block
)
error
{
t
,
err
:=
bs
.
datastore
.
Batch
()
if
err
!=
nil
{
return
err
...
...
@@ -112,7 +112,7 @@ func (bs *blockstore) PutMany(blocks []*blocks.Block) error {
continue
}
err
=
t
.
Put
(
k
,
b
.
Data
)
err
=
t
.
Put
(
k
,
b
.
Data
()
)
if
err
!=
nil
{
return
err
}
...
...
blocks/blockstore/blockstore_test.go
浏览文件 @
b84cbec2
...
...
@@ -40,7 +40,7 @@ func TestPutThenGetBlock(t *testing.T) {
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
!
bytes
.
Equal
(
block
.
Data
,
blockFromBlockstore
.
Data
)
{
if
!
bytes
.
Equal
(
block
.
Data
(),
blockFromBlockstore
.
Data
()
)
{
t
.
Fail
()
}
}
...
...
blocks/blockstore/write_cache.go
浏览文件 @
b84cbec2
...
...
@@ -34,11 +34,11 @@ func (w *writecache) Has(k key.Key) (bool, error) {
return
w
.
blockstore
.
Has
(
k
)
}
func
(
w
*
writecache
)
Get
(
k
key
.
Key
)
(
*
blocks
.
Block
,
error
)
{
func
(
w
*
writecache
)
Get
(
k
key
.
Key
)
(
blocks
.
Block
,
error
)
{
return
w
.
blockstore
.
Get
(
k
)
}
func
(
w
*
writecache
)
Put
(
b
*
blocks
.
Block
)
error
{
func
(
w
*
writecache
)
Put
(
b
blocks
.
Block
)
error
{
k
:=
b
.
Key
()
if
_
,
ok
:=
w
.
cache
.
Get
(
k
);
ok
{
return
nil
...
...
@@ -49,8 +49,8 @@ func (w *writecache) Put(b *blocks.Block) error {
return
w
.
blockstore
.
Put
(
b
)
}
func
(
w
*
writecache
)
PutMany
(
bs
[]
*
blocks
.
Block
)
error
{
var
good
[]
*
blocks
.
Block
func
(
w
*
writecache
)
PutMany
(
bs
[]
blocks
.
Block
)
error
{
var
good
[]
blocks
.
Block
for
_
,
b
:=
range
bs
{
if
_
,
ok
:=
w
.
cache
.
Get
(
b
.
Key
());
!
ok
{
good
=
append
(
good
,
b
)
...
...
blocks/blocksutil/block_generator.go
浏览文件 @
b84cbec2
...
...
@@ -10,13 +10,13 @@ type BlockGenerator struct {
seq
int
}
func
(
bg
*
BlockGenerator
)
Next
()
*
blocks
.
Block
{
func
(
bg
*
BlockGenerator
)
Next
()
blocks
.
Block
{
bg
.
seq
++
return
blocks
.
NewBlock
([]
byte
(
string
(
bg
.
seq
)))
}
func
(
bg
*
BlockGenerator
)
Blocks
(
n
int
)
[]
*
blocks
.
Block
{
blocks
:=
make
([]
*
blocks
.
Block
,
0
)
func
(
bg
*
BlockGenerator
)
Blocks
(
n
int
)
[]
blocks
.
Block
{
blocks
:=
make
([]
blocks
.
Block
,
0
)
for
i
:=
0
;
i
<
n
;
i
++
{
b
:=
bg
.
Next
()
blocks
=
append
(
blocks
,
b
)
...
...
blockservice/blockservice.go
浏览文件 @
b84cbec2
...
...
@@ -41,7 +41,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func
(
s
*
BlockService
)
AddBlock
(
b
*
blocks
.
Block
)
(
key
.
Key
,
error
)
{
func
(
s
*
BlockService
)
AddBlock
(
b
blocks
.
Block
)
(
key
.
Key
,
error
)
{
k
:=
b
.
Key
()
err
:=
s
.
Blockstore
.
Put
(
b
)
if
err
!=
nil
{
...
...
@@ -53,7 +53,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) {
return
k
,
nil
}
func
(
s
*
BlockService
)
AddBlocks
(
bs
[]
*
blocks
.
Block
)
([]
key
.
Key
,
error
)
{
func
(
s
*
BlockService
)
AddBlocks
(
bs
[]
blocks
.
Block
)
([]
key
.
Key
,
error
)
{
err
:=
s
.
Blockstore
.
PutMany
(
bs
)
if
err
!=
nil
{
return
nil
,
err
...
...
@@ -71,7 +71,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) {
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func
(
s
*
BlockService
)
GetBlock
(
ctx
context
.
Context
,
k
key
.
Key
)
(
*
blocks
.
Block
,
error
)
{
func
(
s
*
BlockService
)
GetBlock
(
ctx
context
.
Context
,
k
key
.
Key
)
(
blocks
.
Block
,
error
)
{
log
.
Debugf
(
"BlockService GetBlock: '%s'"
,
k
)
block
,
err
:=
s
.
Blockstore
.
Get
(
k
)
if
err
==
nil
{
...
...
@@ -103,8 +103,8 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block,
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func
(
s
*
BlockService
)
GetBlocks
(
ctx
context
.
Context
,
ks
[]
key
.
Key
)
<-
chan
*
blocks
.
Block
{
out
:=
make
(
chan
*
blocks
.
Block
,
0
)
func
(
s
*
BlockService
)
GetBlocks
(
ctx
context
.
Context
,
ks
[]
key
.
Key
)
<-
chan
blocks
.
Block
{
out
:=
make
(
chan
blocks
.
Block
,
0
)
go
func
()
{
defer
close
(
out
)
var
misses
[]
key
.
Key
...
...
blockservice/test/blocks_test.go
浏览文件 @
b84cbec2
...
...
@@ -24,7 +24,7 @@ func TestBlocks(t *testing.T) {
b
:=
blocks
.
NewBlock
([]
byte
(
"beep boop"
))
h
:=
u
.
Hash
([]
byte
(
"beep boop"
))
if
!
bytes
.
Equal
(
b
.
Multihash
,
h
)
{
if
!
bytes
.
Equal
(
b
.
Multihash
()
,
h
)
{
t
.
Error
(
"Block Multihash and data multihash not equal"
)
}
...
...
@@ -54,7 +54,7 @@ func TestBlocks(t *testing.T) {
t
.
Error
(
"Block keys not equal."
)
}
if
!
bytes
.
Equal
(
b
.
Data
,
b2
.
Data
)
{
if
!
bytes
.
Equal
(
b
.
Data
(),
b2
.
Data
()
)
{
t
.
Error
(
"Block data is not equal."
)
}
}
...
...
@@ -79,7 +79,7 @@ func TestGetBlocksSequential(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
time
.
Second
*
50
)
defer
cancel
()
out
:=
servs
[
i
]
.
GetBlocks
(
ctx
,
keys
)
gotten
:=
make
(
map
[
key
.
Key
]
*
blocks
.
Block
)
gotten
:=
make
(
map
[
key
.
Key
]
blocks
.
Block
)
for
blk
:=
range
out
{
if
_
,
ok
:=
gotten
[
blk
.
Key
()];
ok
{
t
.
Fatal
(
"Got duplicate block!"
)
...
...
core/commands/block.go
浏览文件 @
b84cbec2
...
...
@@ -66,7 +66,7 @@ on raw ipfs blocks. It outputs the following to stdout:
res
.
SetOutput
(
&
BlockStat
{
Key
:
b
.
Key
()
.
B58String
(),
Size
:
len
(
b
.
Data
),
Size
:
len
(
b
.
Data
()
),
})
},
Type
:
BlockStat
{},
...
...
@@ -97,7 +97,7 @@ It outputs to stdout, and <key> is a base58 encoded multihash.
return
}
res
.
SetOutput
(
bytes
.
NewReader
(
b
.
Data
))
res
.
SetOutput
(
bytes
.
NewReader
(
b
.
Data
()
))
},
}
...
...
@@ -161,7 +161,7 @@ It reads from stdin, and <key> is a base58 encoded multihash.
Type
:
BlockStat
{},
}
func
getBlockForKey
(
req
cmds
.
Request
,
skey
string
)
(
*
blocks
.
Block
,
error
)
{
func
getBlockForKey
(
req
cmds
.
Request
,
skey
string
)
(
blocks
.
Block
,
error
)
{
n
,
err
:=
req
.
InvocContext
()
.
GetNode
()
if
err
!=
nil
{
return
nil
,
err
...
...
exchange/bitswap/bitswap.go
浏览文件 @
b84cbec2
...
...
@@ -90,7 +90,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
network
:
network
,
findKeys
:
make
(
chan
*
wantlist
.
Entry
,
sizeBatchRequestChan
),
process
:
px
,
newBlocks
:
make
(
chan
*
blocks
.
Block
,
HasBlockBufferSize
),
newBlocks
:
make
(
chan
blocks
.
Block
,
HasBlockBufferSize
),
provideKeys
:
make
(
chan
key
.
Key
,
provideKeysBufferSize
),
wm
:
NewWantManager
(
ctx
,
network
),
}
...
...
@@ -137,7 +137,7 @@ type Bitswap struct {
process
process
.
Process
newBlocks
chan
*
blocks
.
Block
newBlocks
chan
blocks
.
Block
provideKeys
chan
key
.
Key
...
...
@@ -154,7 +154,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func
(
bs
*
Bitswap
)
GetBlock
(
parent
context
.
Context
,
k
key
.
Key
)
(
*
blocks
.
Block
,
error
)
{
func
(
bs
*
Bitswap
)
GetBlock
(
parent
context
.
Context
,
k
key
.
Key
)
(
blocks
.
Block
,
error
)
{
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
...
...
@@ -209,9 +209,9 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func
(
bs
*
Bitswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
key
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
{
func
(
bs
*
Bitswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
key
.
Key
)
(
<-
chan
blocks
.
Block
,
error
)
{
if
len
(
keys
)
==
0
{
out
:=
make
(
chan
*
blocks
.
Block
)
out
:=
make
(
chan
blocks
.
Block
)
close
(
out
)
return
out
,
nil
}
...
...
@@ -251,7 +251,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) {
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func
(
bs
*
Bitswap
)
HasBlock
(
blk
*
blocks
.
Block
)
error
{
func
(
bs
*
Bitswap
)
HasBlock
(
blk
blocks
.
Block
)
error
{
select
{
case
<-
bs
.
process
.
Closing
()
:
return
errors
.
New
(
"bitswap is closed"
)
...
...
@@ -277,7 +277,7 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
return
nil
}
func
(
bs
*
Bitswap
)
tryPutBlock
(
blk
*
blocks
.
Block
,
attempts
int
)
error
{
func
(
bs
*
Bitswap
)
tryPutBlock
(
blk
blocks
.
Block
,
attempts
int
)
error
{
var
err
error
for
i
:=
0
;
i
<
attempts
;
i
++
{
if
err
=
bs
.
blockstore
.
Put
(
blk
);
err
==
nil
{
...
...
@@ -316,7 +316,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg
:=
sync
.
WaitGroup
{}
for
_
,
block
:=
range
iblocks
{
wg
.
Add
(
1
)
go
func
(
b
*
blocks
.
Block
)
{
go
func
(
b
blocks
.
Block
)
{
defer
wg
.
Done
()
if
err
:=
bs
.
updateReceiveCounters
(
b
);
err
!=
nil
{
...
...
@@ -337,7 +337,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
var
ErrAlreadyHaveBlock
=
errors
.
New
(
"already have block"
)
func
(
bs
*
Bitswap
)
updateReceiveCounters
(
b
*
blocks
.
Block
)
error
{
func
(
bs
*
Bitswap
)
updateReceiveCounters
(
b
blocks
.
Block
)
error
{
bs
.
counterLk
.
Lock
()
defer
bs
.
counterLk
.
Unlock
()
bs
.
blocksRecvd
++
...
...
@@ -348,7 +348,7 @@ func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
}
if
err
==
nil
&&
has
{
bs
.
dupBlocksRecvd
++
bs
.
dupDataRecvd
+=
uint64
(
len
(
b
.
Data
))
bs
.
dupDataRecvd
+=
uint64
(
len
(
b
.
Data
()
))
}
if
has
{
...
...
exchange/bitswap/bitswap_test.go
浏览文件 @
b84cbec2
...
...
@@ -85,7 +85,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
t
.
Fatal
(
"Expected to succeed"
)
}
if
!
bytes
.
Equal
(
block
.
Data
,
received
.
Data
)
{
if
!
bytes
.
Equal
(
block
.
Data
(),
received
.
Data
()
)
{
t
.
Fatal
(
"Data doesn't match"
)
}
}
...
...
@@ -218,7 +218,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
}
func
getOrFail
(
bitswap
Instance
,
b
*
blocks
.
Block
,
t
*
testing
.
T
,
wg
*
sync
.
WaitGroup
)
{
func
getOrFail
(
bitswap
Instance
,
b
blocks
.
Block
,
t
*
testing
.
T
,
wg
*
sync
.
WaitGroup
)
{
if
_
,
err
:=
bitswap
.
Blockstore
()
.
Get
(
b
.
Key
());
err
!=
nil
{
_
,
err
:=
bitswap
.
Exchange
.
GetBlock
(
context
.
Background
(),
b
.
Key
())
if
err
!=
nil
{
...
...
exchange/bitswap/decision/engine.go
浏览文件 @
b84cbec2
...
...
@@ -58,7 +58,7 @@ type Envelope struct {
Peer
peer
.
ID
// Block is the payload
Block
*
blocks
.
Block
Block
blocks
.
Block
// A callback to notify the decision queue that the task is complete
Sent
func
()
...
...
@@ -226,13 +226,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}
for
_
,
block
:=
range
m
.
Blocks
()
{
log
.
Debugf
(
"got block %s %d bytes"
,
block
.
Key
(),
len
(
block
.
Data
))
l
.
ReceivedBytes
(
len
(
block
.
Data
))
log
.
Debugf
(
"got block %s %d bytes"
,
block
.
Key
(),
len
(
block
.
Data
()
))
l
.
ReceivedBytes
(
len
(
block
.
Data
()
))
}
return
nil
}
func
(
e
*
Engine
)
addBlock
(
block
*
blocks
.
Block
)
{
func
(
e
*
Engine
)
addBlock
(
block
blocks
.
Block
)
{
work
:=
false
for
_
,
l
:=
range
e
.
ledgerMap
{
...
...
@@ -247,7 +247,7 @@ func (e *Engine) addBlock(block *blocks.Block) {
}
}
func
(
e
*
Engine
)
AddBlock
(
block
*
blocks
.
Block
)
{
func
(
e
*
Engine
)
AddBlock
(
block
blocks
.
Block
)
{
e
.
lock
.
Lock
()
defer
e
.
lock
.
Unlock
()
...
...
@@ -266,7 +266,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
l
:=
e
.
findOrCreate
(
p
)
for
_
,
block
:=
range
m
.
Blocks
()
{
l
.
SentBytes
(
len
(
block
.
Data
))
l
.
SentBytes
(
len
(
block
.
Data
()
))
l
.
wantList
.
Remove
(
block
.
Key
())
e
.
peerRequestQueue
.
Remove
(
block
.
Key
(),
p
)
}
...
...
exchange/bitswap/decision/engine_test.go
浏览文件 @
b84cbec2
...
...
@@ -188,7 +188,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
received
:=
envelope
.
Block
expected
:=
blocks
.
NewBlock
([]
byte
(
k
))
if
received
.
Key
()
!=
expected
.
Key
()
{
return
errors
.
New
(
fmt
.
Sprintln
(
"received"
,
string
(
received
.
Data
),
"expected"
,
string
(
expected
.
Data
)))
return
errors
.
New
(
fmt
.
Sprintln
(
"received"
,
string
(
received
.
Data
()),
"expected"
,
string
(
expected
.
Data
()
)))
}
}
return
nil
...
...
exchange/bitswap/message/message.go
浏览文件 @
b84cbec2
...
...
@@ -22,7 +22,7 @@ type BitSwapMessage interface {
Wantlist
()
[]
Entry
// Blocks returns a slice of unique blocks
Blocks
()
[]
*
blocks
.
Block
Blocks
()
[]
blocks
.
Block
// AddEntry adds an entry to the Wantlist.
AddEntry
(
key
key
.
Key
,
priority
int
)
...
...
@@ -34,7 +34,7 @@ type BitSwapMessage interface {
// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Full
()
bool
AddBlock
(
*
blocks
.
Block
)
AddBlock
(
blocks
.
Block
)
Exportable
Loggable
()
map
[
string
]
interface
{}
...
...
@@ -48,7 +48,7 @@ type Exportable interface {
type
impl
struct
{
full
bool
wantlist
map
[
key
.
Key
]
Entry
blocks
map
[
key
.
Key
]
*
blocks
.
Block
blocks
map
[
key
.
Key
]
blocks
.
Block
}
func
New
(
full
bool
)
BitSwapMessage
{
...
...
@@ -57,7 +57,7 @@ func New(full bool) BitSwapMessage {
func
newMsg
(
full
bool
)
*
impl
{
return
&
impl
{
blocks
:
make
(
map
[
key
.
Key
]
*
blocks
.
Block
),
blocks
:
make
(
map
[
key
.
Key
]
blocks
.
Block
),
wantlist
:
make
(
map
[
key
.
Key
]
Entry
),
full
:
full
,
}
...
...
@@ -96,8 +96,8 @@ func (m *impl) Wantlist() []Entry {
return
out
}
func
(
m
*
impl
)
Blocks
()
[]
*
blocks
.
Block
{
bs
:=
make
([]
*
blocks
.
Block
,
0
,
len
(
m
.
blocks
))
func
(
m
*
impl
)
Blocks
()
[]
blocks
.
Block
{
bs
:=
make
([]
blocks
.
Block
,
0
,
len
(
m
.
blocks
))
for
_
,
block
:=
range
m
.
blocks
{
bs
=
append
(
bs
,
block
)
}
...
...
@@ -129,7 +129,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
}
}
func
(
m
*
impl
)
AddBlock
(
b
*
blocks
.
Block
)
{
func
(
m
*
impl
)
AddBlock
(
b
blocks
.
Block
)
{
m
.
blocks
[
b
.
Key
()]
=
b
}
...
...
@@ -156,7 +156,7 @@ func (m *impl) ToProto() *pb.Message {
})
}
for
_
,
b
:=
range
m
.
Blocks
()
{
pbm
.
Blocks
=
append
(
pbm
.
Blocks
,
b
.
Data
)
pbm
.
Blocks
=
append
(
pbm
.
Blocks
,
b
.
Data
()
)
}
return
pbm
}
...
...
exchange/bitswap/notifications/notifications.go
浏览文件 @
b84cbec2
...
...
@@ -10,8 +10,8 @@ import (
const
bufferSize
=
16
type
PubSub
interface
{
Publish
(
block
*
blocks
.
Block
)
Subscribe
(
ctx
context
.
Context
,
keys
...
key
.
Key
)
<-
chan
*
blocks
.
Block
Publish
(
block
blocks
.
Block
)
Subscribe
(
ctx
context
.
Context
,
keys
...
key
.
Key
)
<-
chan
blocks
.
Block
Shutdown
()
}
...
...
@@ -23,7 +23,7 @@ type impl struct {
wrapped
pubsub
.
PubSub
}
func
(
ps
*
impl
)
Publish
(
block
*
blocks
.
Block
)
{
func
(
ps
*
impl
)
Publish
(
block
blocks
.
Block
)
{
topic
:=
string
(
block
.
Key
())
ps
.
wrapped
.
Pub
(
block
,
topic
)
}
...
...
@@ -35,9 +35,9 @@ func (ps *impl) Shutdown() {
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
func
(
ps
*
impl
)
Subscribe
(
ctx
context
.
Context
,
keys
...
key
.
Key
)
<-
chan
*
blocks
.
Block
{
func
(
ps
*
impl
)
Subscribe
(
ctx
context
.
Context
,
keys
...
key
.
Key
)
<-
chan
blocks
.
Block
{
blocksCh
:=
make
(
chan
*
blocks
.
Block
,
len
(
keys
))
blocksCh
:=
make
(
chan
blocks
.
Block
,
len
(
keys
))
valuesCh
:=
make
(
chan
interface
{},
len
(
keys
))
// provide our own channel to control buffer, prevent blocking
if
len
(
keys
)
==
0
{
close
(
blocksCh
)
...
...
@@ -55,7 +55,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.B
if
!
ok
{
return
}
block
,
ok
:=
val
.
(
*
blocks
.
Block
)
block
,
ok
:=
val
.
(
blocks
.
Block
)
if
!
ok
{
return
}
...
...
exchange/bitswap/notifications/notifications_test.go
浏览文件 @
b84cbec2
...
...
@@ -151,15 +151,15 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t
.
Log
(
"publishing the large number of blocks to the ignored channel must not deadlock"
)
}
func
assertBlockChannelNil
(
t
*
testing
.
T
,
blockChannel
<-
chan
*
blocks
.
Block
)
{
func
assertBlockChannelNil
(
t
*
testing
.
T
,
blockChannel
<-
chan
blocks
.
Block
)
{
_
,
ok
:=
<-
blockChannel
if
ok
{
t
.
Fail
()
}
}
func
assertBlocksEqual
(
t
*
testing
.
T
,
a
,
b
*
blocks
.
Block
)
{
if
!
bytes
.
Equal
(
a
.
Data
,
b
.
Data
)
{
func
assertBlocksEqual
(
t
*
testing
.
T
,
a
,
b
blocks
.
Block
)
{
if
!
bytes
.
Equal
(
a
.
Data
(),
b
.
Data
()
)
{
t
.
Fatal
(
"blocks aren't equal"
)
}
if
a
.
Key
()
!=
b
.
Key
()
{
...
...
exchange/bitswap/testnet/network_test.go
浏览文件 @
b84cbec2
...
...
@@ -44,7 +44,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
// TODO assert that this came from the correct peer and that the message contents are as expected
ok
:=
false
for
_
,
b
:=
range
msgFromResponder
.
Blocks
()
{
if
string
(
b
.
Data
)
==
expectedStr
{
if
string
(
b
.
Data
()
)
==
expectedStr
{
wg
.
Done
()
ok
=
true
}
...
...
exchange/bitswap/workers.go
浏览文件 @
b84cbec2
...
...
@@ -61,7 +61,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
log
.
Event
(
ctx
,
"Bitswap.TaskWorker.Work"
,
logging
.
LoggableMap
{
"ID"
:
id
,
"Target"
:
envelope
.
Peer
.
Pretty
(),
"Block"
:
envelope
.
Block
.
Multihash
.
B58String
(),
"Block"
:
envelope
.
Block
.
Multihash
()
.
B58String
(),
})
bs
.
wm
.
SendBlock
(
ctx
,
envelope
)
...
...
exchange/interface.go
浏览文件 @
b84cbec2
...
...
@@ -13,13 +13,13 @@ import (
// exchange protocol.
type
Interface
interface
{
// type Exchanger interface
// GetBlock returns the block associated with a given key.
GetBlock
(
context
.
Context
,
key
.
Key
)
(
*
blocks
.
Block
,
error
)
GetBlock
(
context
.
Context
,
key
.
Key
)
(
blocks
.
Block
,
error
)
GetBlocks
(
context
.
Context
,
[]
key
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
GetBlocks
(
context
.
Context
,
[]
key
.
Key
)
(
<-
chan
blocks
.
Block
,
error
)
// TODO Should callers be concerned with whether the block was made
// available on the network?
HasBlock
(
*
blocks
.
Block
)
error
HasBlock
(
blocks
.
Block
)
error
io
.
Closer
}
exchange/offline/offline.go
浏览文件 @
b84cbec2
...
...
@@ -23,12 +23,12 @@ type offlineExchange struct {
// GetBlock returns nil to signal that a block could not be retrieved for the
// given key.
// NB: This function may return before the timeout expires.
func
(
e
*
offlineExchange
)
GetBlock
(
_
context
.
Context
,
k
key
.
Key
)
(
*
blocks
.
Block
,
error
)
{
func
(
e
*
offlineExchange
)
GetBlock
(
_
context
.
Context
,
k
key
.
Key
)
(
blocks
.
Block
,
error
)
{
return
e
.
bs
.
Get
(
k
)
}
// HasBlock always returns nil.
func
(
e
*
offlineExchange
)
HasBlock
(
b
*
blocks
.
Block
)
error
{
func
(
e
*
offlineExchange
)
HasBlock
(
b
blocks
.
Block
)
error
{
return
e
.
bs
.
Put
(
b
)
}
...
...
@@ -39,8 +39,8 @@ func (_ *offlineExchange) Close() error {
return
nil
}
func
(
e
*
offlineExchange
)
GetBlocks
(
ctx
context
.
Context
,
ks
[]
key
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
{
out
:=
make
(
chan
*
blocks
.
Block
,
0
)
func
(
e
*
offlineExchange
)
GetBlocks
(
ctx
context
.
Context
,
ks
[]
key
.
Key
)
(
<-
chan
blocks
.
Block
,
error
)
{
out
:=
make
(
chan
blocks
.
Block
,
0
)
go
func
()
{
defer
close
(
out
)
var
misses
[]
key
.
Key
...
...
importer/chunk/rabin_test.go
浏览文件 @
b84cbec2
...
...
@@ -39,10 +39,10 @@ func TestRabinChunking(t *testing.T) {
}
}
func
chunkData
(
t
*
testing
.
T
,
data
[]
byte
)
map
[
key
.
Key
]
*
blocks
.
Block
{
func
chunkData
(
t
*
testing
.
T
,
data
[]
byte
)
map
[
key
.
Key
]
blocks
.
Block
{
r
:=
NewRabin
(
bytes
.
NewReader
(
data
),
1024
*
256
)
blkmap
:=
make
(
map
[
key
.
Key
]
*
blocks
.
Block
)
blkmap
:=
make
(
map
[
key
.
Key
]
blocks
.
Block
)
for
{
blk
,
err
:=
r
.
NextBytes
()
...
...
merkledag/merkledag.go
浏览文件 @
b84cbec2
...
...
@@ -52,13 +52,13 @@ func (n *dagService) Add(nd *Node) (key.Key, error) {
return
""
,
err
}
b
:=
new
(
blocks
.
Block
)
b
.
Data
=
d
b
.
Multihash
,
err
=
nd
.
Multihash
()
mh
,
err
:=
nd
.
Multihash
()
if
err
!=
nil
{
return
""
,
err
}
b
,
_
:=
blocks
.
NewBlockWithHash
(
d
,
mh
)
return
n
.
Blocks
.
AddBlock
(
b
)
}
...
...
@@ -82,7 +82,7 @@ func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
return
nil
,
fmt
.
Errorf
(
"Failed to get block for %s: %v"
,
k
.
B58String
(),
err
)
}
res
,
err
:=
DecodeProtobuf
(
b
.
Data
)
res
,
err
:=
DecodeProtobuf
(
b
.
Data
()
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"Failed to decode Protocol Buffers: %v"
,
err
)
}
...
...
@@ -135,7 +135,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeO
}
return
}
nd
,
err
:=
DecodeProtobuf
(
b
.
Data
)
nd
,
err
:=
DecodeProtobuf
(
b
.
Data
()
)
if
err
!=
nil
{
out
<-
&
NodeOption
{
Err
:
err
}
return
...
...
@@ -316,7 +316,7 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
type
Batch
struct
{
ds
*
dagService
blocks
[]
*
blocks
.
Block
blocks
[]
blocks
.
Block
size
int
MaxSize
int
}
...
...
@@ -327,17 +327,17 @@ func (t *Batch) Add(nd *Node) (key.Key, error) {
return
""
,
err
}
b
:=
new
(
blocks
.
Block
)
b
.
Data
=
d
b
.
Multihash
,
err
=
nd
.
Multihash
()
mh
,
err
:=
nd
.
Multihash
()
if
err
!=
nil
{
return
""
,
err
}
k
:=
key
.
Key
(
b
.
Multihash
)
b
,
_
:=
blocks
.
NewBlockWithHash
(
d
,
mh
)
k
:=
key
.
Key
(
mh
)
t
.
blocks
=
append
(
t
.
blocks
,
b
)
t
.
size
+=
len
(
b
.
Data
)
t
.
size
+=
len
(
b
.
Data
()
)
if
t
.
size
>
t
.
MaxSize
{
return
k
,
t
.
Commit
()
}
...
...
test/integration/bitswap_wo_routing_test.go
浏览文件 @
b84cbec2
...
...
@@ -71,7 +71,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
b
,
err
:=
n
.
Blocks
.
GetBlock
(
ctx
,
block0
.
Key
())
if
err
!=
nil
{
t
.
Error
(
err
)
}
else
if
!
bytes
.
Equal
(
b
.
Data
,
block0
.
Data
)
{
}
else
if
!
bytes
.
Equal
(
b
.
Data
(),
block0
.
Data
()
)
{
t
.
Error
(
"byte comparison fail"
)
}
else
{
log
.
Debug
(
"got block: %s"
,
b
.
Key
())
...
...
@@ -88,7 +88,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
b
,
err
:=
n
.
Blocks
.
GetBlock
(
ctx
,
block1
.
Key
())
if
err
!=
nil
{
t
.
Error
(
err
)
}
else
if
!
bytes
.
Equal
(
b
.
Data
,
block1
.
Data
)
{
}
else
if
!
bytes
.
Equal
(
b
.
Data
(),
block1
.
Data
()
)
{
t
.
Error
(
"byte comparison fail"
)
}
else
{
log
.
Debug
(
"got block: %s"
,
b
.
Key
())
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论