提交 3ae70217 作者: Juan Batiz-Benet 提交者: Brian Tiger Chow

getFromPeerList and peerFromInfo

上级 e4e02108
...@@ -225,13 +225,14 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, ...@@ -225,13 +225,14 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
continue continue
} }
np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr) // check if we already have this peer.
if err != nil { pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
u.PErr("%v\n", err.Error()) if pr == nil {
continue pr = &peer.Peer{ID: peer.ID(pb.GetId())}
dht.peerstore.Put(pr)
} }
pr.AddAddress(addr) // idempotent
peers = append(peers, np) peers = append(peers, pr)
} }
if len(peers) > 0 { if len(peers) > 0 {
...@@ -257,33 +258,26 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer, ...@@ -257,33 +258,26 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
// from someone what do we do with it? Connect to each of them? randomly pick // from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a // one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it? // successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration, func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*Message_PBPeer, level int) ([]byte, error) { peerlist []*Message_Peer, level int) ([]byte, error) {
for _, pinfo := range peerlist {
p, _ := dht.Find(peer.ID(pinfo.GetId()))
if p == nil {
maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
if err != nil {
u.PErr("getValue error: %s\n", err)
continue
}
p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr) for _, pinfo := range peerlist {
if err != nil { p, err := dht.peerFromInfo(pinfo)
u.PErr("getValue error: %s\n", err) if err != nil {
continue u.DErr("getFromPeers error: %s\n", err)
} continue
} }
pmes, err := dht.getValueSingle(p, key, timeout, level)
pmes, err := dht.getValueSingle(ctx, p, key, level)
if err != nil { if err != nil {
u.DErr("getFromPeers error: %s\n", err) u.DErr("getFromPeers error: %s\n", err)
continue continue
} }
dht.providers.AddProvider(key, p)
// Make sure it was a successful get if value := pmes.GetValue(); value != nil {
if pmes.GetSuccess() && pmes.Value != nil { // Success! We were given the value
return pmes.GetValue(), nil dht.providers.AddProvider(key, p)
return value, nil
} }
} }
return nil, u.ErrNotFound return nil, u.ErrNotFound
...@@ -463,13 +457,32 @@ func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer { ...@@ -463,13 +457,32 @@ func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
return closer return closer
} }
func (dht *IpfsDHT) peerFromInfo(pbp *Message_PBPeer) (*peer.Peer, error) { func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
maddr, err := ma.NewMultiaddr(pbp.GetAddr())
if err != nil { id := peer.ID(pbp.GetId())
return nil, err p, _ := dht.peerstore.Get(id)
if p == nil {
p, _ = dht.Find(id)
if p != nil {
panic("somehow peer not getting into peerstore")
}
}
if p == nil {
maddr, err := ma.NewMultiaddr(pbp.GetAddr())
if err != nil {
return nil, err
}
// create new Peer
p := &peer.Peer{ID: id}
p.AddAddress(maddr)
dht.peerstore.Put(pr)
} }
return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr) // dial connection
err = dht.network.Dial(p)
return p, err
} }
func (dht *IpfsDHT) loadProvidableKeys() error { func (dht *IpfsDHT) loadProvidableKeys() error {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论