提交 c22b6aa3 作者: Jeromy 提交者: Juan Batiz-Benet

fixing some race conditions

上级 41c124a2
...@@ -3,6 +3,7 @@ package peer ...@@ -3,6 +3,7 @@ package peer
import ( import (
"encoding/hex" "encoding/hex"
"time" "time"
"sync"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-multiaddr"
...@@ -31,7 +32,9 @@ type Map map[u.Key]*Peer ...@@ -31,7 +32,9 @@ type Map map[u.Key]*Peer
type Peer struct { type Peer struct {
ID ID ID ID
Addresses []*ma.Multiaddr Addresses []*ma.Multiaddr
Distance time.Duration
distance time.Duration
distLock sync.RWMutex
} }
// Key returns the ID as a Key (string) for maps. // Key returns the ID as a Key (string) for maps.
...@@ -60,3 +63,13 @@ func (p *Peer) NetAddress(n string) *ma.Multiaddr { ...@@ -60,3 +63,13 @@ func (p *Peer) NetAddress(n string) *ma.Multiaddr {
} }
return nil return nil
} }
func (p *Peer) GetDistance() time.Duration {
return p.distance
}
func (p *Peer) SetDistance(dist time.Duration) {
p.distLock.Lock()
p.distance = dist
p.distLock.Unlock()
}
...@@ -186,6 +186,7 @@ func (dht *IpfsDHT) handleMessages() { ...@@ -186,6 +186,7 @@ func (dht *IpfsDHT) handleMessages() {
case err := <-dht.network.Chan.Errors: case err := <-dht.network.Chan.Errors:
u.DErr("dht err: %s", err) u.DErr("dht err: %s", err)
panic(err)
case <-dht.shutdown: case <-dht.shutdown:
checkTimeouts.Stop() checkTimeouts.Stop()
return return
......
...@@ -48,6 +48,8 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { ...@@ -48,6 +48,8 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
} }
// GetValue searches for the value corresponding to given Key. // GetValue searches for the value corresponding to given Key.
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer var p *peer.Peer
p = s.routes.NearestPeer(convertKey(key)) p = s.routes.NearestPeer(convertKey(key))
...@@ -77,7 +79,11 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { ...@@ -77,7 +79,11 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
if err != nil { if err != nil {
return nil,err return nil,err
} }
return pmes_out.GetValue(), nil if pmes_out.GetSuccess() {
return pmes_out.GetValue(), nil
} else {
return pmes_out.GetValue(), u.ErrSearchIncomplete
}
} }
} }
...@@ -225,7 +231,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { ...@@ -225,7 +231,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
select { select {
case <-response_chan: case <-response_chan:
roundtrip := time.Since(before) roundtrip := time.Since(before)
p.Distance = roundtrip //TODO: This isnt threadsafe p.SetDistance(roundtrip)
u.POut("Ping took %s.", roundtrip.String()) u.POut("Ping took %s.", roundtrip.String())
return nil return nil
case <-tout: case <-tout:
......
...@@ -3,8 +3,10 @@ package dht ...@@ -3,8 +3,10 @@ package dht
import ( import (
"container/list" "container/list"
"sort" "sort"
"sync"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
) )
// RoutingTable defines the routing table. // RoutingTable defines the routing table.
...@@ -13,6 +15,9 @@ type RoutingTable struct { ...@@ -13,6 +15,9 @@ type RoutingTable struct {
// ID of the local peer // ID of the local peer
local ID local ID
// Blanket lock, refine later for better performance
tabLock sync.RWMutex
// kBuckets define all the fingers to other nodes. // kBuckets define all the fingers to other nodes.
Buckets []*Bucket Buckets []*Bucket
bucketsize int bucketsize int
...@@ -29,6 +34,8 @@ func NewRoutingTable(bucketsize int, local_id ID) *RoutingTable { ...@@ -29,6 +34,8 @@ func NewRoutingTable(bucketsize int, local_id ID) *RoutingTable {
// Update adds or moves the given peer to the front of its respective bucket // Update adds or moves the given peer to the front of its respective bucket
// If a peer gets removed from a bucket, it is returned // If a peer gets removed from a bucket, it is returned
func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
peer_id := convertPeerID(p.ID) peer_id := convertPeerID(p.ID)
cpl := xor(peer_id, rt.local).commonPrefixLen() cpl := xor(peer_id, rt.local).commonPrefixLen()
...@@ -88,7 +95,11 @@ func (p peerSorterArr) Less(a, b int) bool { ...@@ -88,7 +95,11 @@ func (p peerSorterArr) Less(a, b int) bool {
// //
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
for e := peerList.Front(); e != nil; e = e.Next() { if peerList == nil {
return peerSorterArr{}
}
e := peerList.Front()
for ; e != nil; {
p := e.Value.(*peer.Peer) p := e.Value.(*peer.Peer)
p_id := convertPeerID(p.ID) p_id := convertPeerID(p.ID)
pd := peerDistance{ pd := peerDistance{
...@@ -96,6 +107,11 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe ...@@ -96,6 +107,11 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe
distance: xor(target, p_id), distance: xor(target, p_id),
} }
peerArr = append(peerArr, &pd) peerArr = append(peerArr, &pd)
if e != nil {
u.POut("list element was nil.")
return peerArr
}
e = e.Next()
} }
return peerArr return peerArr
} }
...@@ -112,6 +128,8 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { ...@@ -112,6 +128,8 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
// Returns a list of the 'count' closest peers to the given ID // Returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
cpl := xor(id, rt.local).commonPrefixLen() cpl := xor(id, rt.local).commonPrefixLen()
// Get bucket at cpl index or last bucket // Get bucket at cpl index or last bucket
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论