Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
G
go-ipfs
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
jihao
go-ipfs
Commits
3e9717aa
提交
3e9717aa
authored
6月 06, 2016
作者:
Jeromy
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
reuse streams in the dht networking code
License: MIT Signed-off-by:
Jeromy
<
why@ipfs.io
>
上级
385055d2
隐藏空白字符变更
内嵌
并排
正在显示
3 个修改的文件
包含
134 行增加
和
70 行删除
+134
-70
dht.go
routing/dht/dht.go
+4
-0
dht_net.go
routing/dht/dht_net.go
+128
-66
ext_test.go
routing/dht/ext_test.go
+2
-4
没有找到文件。
routing/dht/dht.go
浏览文件 @
3e9717aa
...
...
@@ -58,6 +58,9 @@ type IpfsDHT struct {
ctx
context
.
Context
proc
goprocess
.
Process
strmap
map
[
peer
.
ID
]
*
messageSender
smlk
sync
.
Mutex
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
...
...
@@ -77,6 +80,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT {
return
nil
})
dht
.
strmap
=
make
(
map
[
peer
.
ID
]
*
messageSender
)
dht
.
ctx
=
ctx
h
.
SetStreamHandler
(
ProtocolDHT
,
dht
.
handleNewStream
)
...
...
routing/dht/dht_net.go
浏览文件 @
3e9717aa
package
dht
import
(
"
errors
"
"
sync
"
"time"
pb
"github.com/ipfs/go-ipfs/routing/dht/pb"
...
...
@@ -27,40 +27,42 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
w
:=
ggio
.
NewDelimitedWriter
(
cw
)
mPeer
:=
s
.
Conn
()
.
RemotePeer
()
// receive msg
pmes
:=
new
(
pb
.
Message
)
if
err
:=
r
.
ReadMsg
(
pmes
);
err
!=
nil
{
log
.
Debugf
(
"Error unmarshaling data: %s"
,
err
)
return
}
// update the peer (on valid msgs only)
dht
.
updateFromMessage
(
ctx
,
mPeer
,
pmes
)
// get handler for this msg type.
handler
:=
dht
.
handlerForMsgType
(
pmes
.
GetType
())
if
handler
==
nil
{
log
.
Debug
(
"got back nil handler from handlerForMsgType"
)
return
}
// dispatch handler.
rpmes
,
err
:=
handler
(
ctx
,
mPeer
,
pmes
)
if
err
!=
nil
{
log
.
Debugf
(
"handle message error: %s"
,
err
)
return
}
// if nil response, return it before serializing
if
rpmes
==
nil
{
log
.
Debug
(
"Got back nil response from request."
)
return
}
// send out response msg
if
err
:=
w
.
WriteMsg
(
rpmes
);
err
!=
nil
{
log
.
Debugf
(
"send response error: %s"
,
err
)
return
for
{
// receive msg
pmes
:=
new
(
pb
.
Message
)
if
err
:=
r
.
ReadMsg
(
pmes
);
err
!=
nil
{
log
.
Debugf
(
"Error unmarshaling data: %s"
,
err
)
return
}
// update the peer (on valid msgs only)
dht
.
updateFromMessage
(
ctx
,
mPeer
,
pmes
)
// get handler for this msg type.
handler
:=
dht
.
handlerForMsgType
(
pmes
.
GetType
())
if
handler
==
nil
{
log
.
Debug
(
"got back nil handler from handlerForMsgType"
)
return
}
// dispatch handler.
rpmes
,
err
:=
handler
(
ctx
,
mPeer
,
pmes
)
if
err
!=
nil
{
log
.
Debugf
(
"handle message error: %s"
,
err
)
return
}
// if nil response, return it before serializing
if
rpmes
==
nil
{
log
.
Debug
(
"Got back nil response from request."
)
continue
}
// send out response msg
if
err
:=
w
.
WriteMsg
(
rpmes
);
err
!=
nil
{
log
.
Debugf
(
"send response error: %s"
,
err
)
return
}
}
return
...
...
@@ -70,32 +72,14 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// measure the RTT for latency measurements.
func
(
dht
*
IpfsDHT
)
sendRequest
(
ctx
context
.
Context
,
p
peer
.
ID
,
pmes
*
pb
.
Message
)
(
*
pb
.
Message
,
error
)
{
log
.
Debugf
(
"%s DHT starting stream"
,
dht
.
self
)
s
,
err
:=
dht
.
host
.
NewStream
(
ctx
,
ProtocolDHT
,
p
)
if
err
!=
nil
{
return
nil
,
err
}
defer
s
.
Close
()
cr
:=
ctxio
.
NewReader
(
ctx
,
s
)
// ok to use. we defer close stream in this func
cw
:=
ctxio
.
NewWriter
(
ctx
,
s
)
// ok to use. we defer close stream in this func
r
:=
ggio
.
NewDelimitedReader
(
cr
,
inet
.
MessageSizeMax
)
w
:=
ggio
.
NewDelimitedWriter
(
cw
)
ms
:=
dht
.
messageSenderForPeer
(
p
)
start
:=
time
.
Now
()
if
err
:=
w
.
WriteMsg
(
pmes
);
err
!=
nil
{
return
nil
,
err
}
log
.
Event
(
ctx
,
"dhtSentMessage"
,
dht
.
self
,
p
,
pmes
)
rpmes
:=
new
(
pb
.
Message
)
if
err
:=
r
.
ReadMsg
(
rpmes
);
err
!=
nil
{
rpmes
,
err
:=
ms
.
SendRequest
(
ctx
,
pmes
)
if
err
!=
nil
{
return
nil
,
err
}
if
rpmes
==
nil
{
return
nil
,
errors
.
New
(
"no response to request"
)
}
// update the peer (on valid msgs only)
dht
.
updateFromMessage
(
ctx
,
p
,
rpmes
)
...
...
@@ -108,17 +92,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
// sendMessage sends out a message
func
(
dht
*
IpfsDHT
)
sendMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
pmes
*
pb
.
Message
)
error
{
log
.
Debugf
(
"%s DHT starting stream"
,
dht
.
self
)
s
,
err
:=
dht
.
host
.
NewStream
(
ctx
,
ProtocolDHT
,
p
)
if
err
!=
nil
{
return
err
}
defer
s
.
Close
()
ms
:=
dht
.
messageSenderForPeer
(
p
)
cw
:=
ctxio
.
NewWriter
(
ctx
,
s
)
// ok to use. we defer close stream in this func
w
:=
ggio
.
NewDelimitedWriter
(
cw
)
if
err
:=
w
.
WriteMsg
(
pmes
);
err
!=
nil
{
if
err
:=
ms
.
SendMessage
(
ctx
,
pmes
);
err
!=
nil
{
return
err
}
log
.
Event
(
ctx
,
"dhtSentMessage"
,
dht
.
self
,
p
,
pmes
)
...
...
@@ -129,3 +105,89 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Me
dht
.
Update
(
ctx
,
p
)
return
nil
}
func
(
dht
*
IpfsDHT
)
messageSenderForPeer
(
p
peer
.
ID
)
*
messageSender
{
dht
.
smlk
.
Lock
()
defer
dht
.
smlk
.
Unlock
()
ms
,
ok
:=
dht
.
strmap
[
p
]
if
!
ok
{
ms
=
dht
.
newMessageSender
(
p
)
dht
.
strmap
[
p
]
=
ms
}
return
ms
}
type
messageSender
struct
{
s
inet
.
Stream
r
ggio
.
ReadCloser
w
ggio
.
WriteCloser
lk
sync
.
Mutex
p
peer
.
ID
dht
*
IpfsDHT
}
func
(
dht
*
IpfsDHT
)
newMessageSender
(
p
peer
.
ID
)
*
messageSender
{
return
&
messageSender
{
p
:
p
,
dht
:
dht
}
}
func
(
ms
*
messageSender
)
prep
()
error
{
if
ms
.
s
!=
nil
{
return
nil
}
nstr
,
err
:=
ms
.
dht
.
host
.
NewStream
(
ms
.
dht
.
ctx
,
ProtocolDHT
,
ms
.
p
)
if
err
!=
nil
{
return
err
}
ms
.
r
=
ggio
.
NewDelimitedReader
(
nstr
,
inet
.
MessageSizeMax
)
ms
.
w
=
ggio
.
NewDelimitedWriter
(
nstr
)
ms
.
s
=
nstr
return
nil
}
func
(
ms
*
messageSender
)
SendMessage
(
ctx
context
.
Context
,
pmes
*
pb
.
Message
)
error
{
ms
.
lk
.
Lock
()
defer
ms
.
lk
.
Unlock
()
if
err
:=
ms
.
prep
();
err
!=
nil
{
return
err
}
err
:=
ms
.
w
.
WriteMsg
(
pmes
)
if
err
!=
nil
{
ms
.
s
.
Close
()
ms
.
s
=
nil
return
err
}
return
nil
}
func
(
ms
*
messageSender
)
SendRequest
(
ctx
context
.
Context
,
pmes
*
pb
.
Message
)
(
*
pb
.
Message
,
error
)
{
ms
.
lk
.
Lock
()
defer
ms
.
lk
.
Unlock
()
if
err
:=
ms
.
prep
();
err
!=
nil
{
return
nil
,
err
}
err
:=
ms
.
w
.
WriteMsg
(
pmes
)
if
err
!=
nil
{
ms
.
s
.
Close
()
ms
.
s
=
nil
return
nil
,
err
}
log
.
Event
(
ctx
,
"dhtSentMessage"
,
ms
.
dht
.
self
,
ms
.
p
,
pmes
)
mes
:=
new
(
pb
.
Message
)
err
=
ms
.
r
.
ReadMsg
(
mes
)
if
err
!=
nil
{
ms
.
s
.
Close
()
ms
.
s
=
nil
return
nil
,
err
}
return
mes
,
nil
}
routing/dht/ext_test.go
浏览文件 @
3e9717aa
...
...
@@ -2,7 +2,6 @@ package dht
import
(
"io"
"io/ioutil"
"math/rand"
"testing"
"time"
...
...
@@ -40,8 +39,7 @@ func TestGetFailures(t *testing.T) {
// Reply with failures to every message
hosts
[
1
]
.
SetStreamHandler
(
ProtocolDHT
,
func
(
s
inet
.
Stream
)
{
defer
s
.
Close
()
io
.
Copy
(
ioutil
.
Discard
,
s
)
s
.
Close
()
})
// This one should time out
...
...
@@ -51,7 +49,7 @@ func TestGetFailures(t *testing.T) {
err
=
merr
[
0
]
}
if
err
.
Error
()
!=
"process closing"
{
if
err
!=
io
.
EOF
{
t
.
Fatal
(
"Got different error than we expected"
,
err
)
}
}
else
{
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论