提交 60cd0f1c 作者: Jeromy 提交者: Juan Batiz-Benet

some dht cleanup, and make DHTs take a master context

上级 f10b4bd8
...@@ -141,7 +141,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { ...@@ -141,7 +141,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
diagnostics = diag.NewDiagnostics(local, net, diagService) diagnostics = diag.NewDiagnostics(local, net, diagService)
diagService.SetHandler(diagnostics) diagService.SetHandler(diagnostics)
route = dht.NewDHT(local, peerstore, net, dhtService, d) route = dht.NewDHT(ctx, local, peerstore, net, dhtService, d)
// TODO(brian): perform this inside NewDHT factory method // TODO(brian): perform this inside NewDHT factory method
dhtService.SetHandler(route) // wire the handler to the service. dhtService.SetHandler(route) // wire the handler to the service.
......
...@@ -53,16 +53,19 @@ type IpfsDHT struct { ...@@ -53,16 +53,19 @@ type IpfsDHT struct {
//lock to make diagnostics work better //lock to make diagnostics work better
diaglock sync.Mutex diaglock sync.Mutex
ctx context.Context
} }
// NewDHT creates a new DHT object with the given peer as the 'local' host // NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT) dht := new(IpfsDHT)
dht.network = net dht.network = net
dht.sender = sender dht.sender = sender
dht.datastore = dstore dht.datastore = dstore
dht.self = p dht.self = p
dht.peerstore = ps dht.peerstore = ps
dht.ctx = ctx
dht.providers = NewProviderManager(p.ID) dht.providers = NewProviderManager(p.ID)
...@@ -71,6 +74,8 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende ...@@ -71,6 +74,8 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000) dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.birth = time.Now() dht.birth = time.Now()
go dht.PingRoutine(time.Second * 10)
return dht return dht
} }
...@@ -137,7 +142,6 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N ...@@ -137,7 +142,6 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
// get handler for this msg type. // get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType()) handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil { if handler == nil {
// TODO handle/log err
log.Error("got back nil handler from handlerForMsgType") log.Error("got back nil handler from handlerForMsgType")
return nil return nil
} }
...@@ -350,7 +354,7 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) { ...@@ -350,7 +354,7 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
byt, ok := v.([]byte) byt, ok := v.([]byte)
if !ok { if !ok {
return byt, errors.New("value stored in datastore not []byte") return nil, errors.New("value stored in datastore not []byte")
} }
return byt, nil return byt, nil
} }
...@@ -533,6 +537,27 @@ func (dht *IpfsDHT) loadProvidableKeys() error { ...@@ -533,6 +537,27 @@ func (dht *IpfsDHT) loadProvidableKeys() error {
return nil return nil
} }
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
tick := time.Tick(t)
for {
select {
case <-tick:
id := make([]byte, 16)
rand.Read(id)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers {
ctx, _ := context.WithTimeout(dht.ctx, time.Second*5)
err := dht.Ping(ctx, p)
if err != nil {
log.Error("Ping error: %s", err)
}
}
case <-dht.ctx.Done():
return
}
}
}
// Bootstrap builds up list of peers by requesting random peer IDs // Bootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap(ctx context.Context) { func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
id := make([]byte, 16) id := make([]byte, 16)
......
...@@ -2,6 +2,7 @@ package dht ...@@ -2,6 +2,7 @@ package dht
import ( import (
"encoding/json" "encoding/json"
"fmt"
"time" "time"
) )
...@@ -29,12 +30,16 @@ func (l *logDhtRPC) EndLog() { ...@@ -29,12 +30,16 @@ func (l *logDhtRPC) EndLog() {
func (l *logDhtRPC) Print() { func (l *logDhtRPC) Print() {
b, err := json.Marshal(l) b, err := json.Marshal(l)
if err != nil { if err != nil {
log.Debug(err.Error()) log.Debug("Error marshaling logDhtRPC object: %s", err)
} else { } else {
log.Debug(string(b)) log.Debug(string(b))
} }
} }
func (l *logDhtRPC) String() string {
return fmt.Sprintf("DHT RPC: %s took %s, success = %s", l.Type, l.Duration, l.Success)
}
func (l *logDhtRPC) EndAndPrint() { func (l *logDhtRPC) EndAndPrint() {
l.EndLog() l.EndLog()
l.Print() l.Print()
......
...@@ -5,9 +5,7 @@ import ( ...@@ -5,9 +5,7 @@ import (
"fmt" "fmt"
"time" "time"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
...@@ -32,8 +30,6 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { ...@@ -32,8 +30,6 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
return dht.handleGetProviders return dht.handleGetProviders
case Message_PING: case Message_PING:
return dht.handlePing return dht.handlePing
case Message_DIAGNOSTIC:
return dht.handleDiagnostic
default: default:
return nil return nil
} }
...@@ -211,53 +207,3 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er ...@@ -211,53 +207,3 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er
func (dht *IpfsDHT) Halt() { func (dht *IpfsDHT) Halt() {
dht.providers.Halt() dht.providers.Halt()
} }
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
for _, ps := range seq {
_, err := msg.FromObject(ps, pmes)
if err != nil {
log.Error("handleDiagnostics error creating message: %v\n", err)
continue
}
// dht.sender.SendRequest(context.TODO(), mes)
}
return nil, errors.New("not yet ported back")
// buf := new(bytes.Buffer)
// di := dht.getDiagInfo()
// buf.Write(di.Marshal())
//
// // NOTE: this shouldnt be a hardcoded value
// after := time.After(time.Second * 20)
// count := len(seq)
// for count > 0 {
// select {
// case <-after:
// //Timeout, return what we have
// goto out
// case reqResp := <-listenChan:
// pmesOut := new(Message)
// err := proto.Unmarshal(reqResp.Data, pmesOut)
// if err != nil {
// // It broke? eh, whatever, keep going
// continue
// }
// buf.Write(reqResp.Data)
// count--
// }
// }
//
// out:
// resp := Message{
// Type: Message_DIAGNOSTIC,
// ID: pmes.GetId(),
// Value: buf.Bytes(),
// Response: true,
// }
//
// mes := swarm.NewMessage(p, resp.ToProtobuf())
// dht.netChan.Outgoing <- mes
}
// Code generated by protoc-gen-gogo. // Code generated by protoc-gen-go.
// source: messages.proto // source: messages.proto
// DO NOT EDIT! // DO NOT EDIT!
...@@ -13,13 +13,11 @@ It has these top-level messages: ...@@ -13,13 +13,11 @@ It has these top-level messages:
*/ */
package dht package dht
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" import proto "code.google.com/p/goprotobuf/proto"
import json "encoding/json"
import math "math" import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf var _ = math.Inf
type Message_MessageType int32 type Message_MessageType int32
...@@ -31,7 +29,6 @@ const ( ...@@ -31,7 +29,6 @@ const (
Message_GET_PROVIDERS Message_MessageType = 3 Message_GET_PROVIDERS Message_MessageType = 3
Message_FIND_NODE Message_MessageType = 4 Message_FIND_NODE Message_MessageType = 4
Message_PING Message_MessageType = 5 Message_PING Message_MessageType = 5
Message_DIAGNOSTIC Message_MessageType = 6
) )
var Message_MessageType_name = map[int32]string{ var Message_MessageType_name = map[int32]string{
...@@ -41,7 +38,6 @@ var Message_MessageType_name = map[int32]string{ ...@@ -41,7 +38,6 @@ var Message_MessageType_name = map[int32]string{
3: "GET_PROVIDERS", 3: "GET_PROVIDERS",
4: "FIND_NODE", 4: "FIND_NODE",
5: "PING", 5: "PING",
6: "DIAGNOSTIC",
} }
var Message_MessageType_value = map[string]int32{ var Message_MessageType_value = map[string]int32{
"PUT_VALUE": 0, "PUT_VALUE": 0,
...@@ -50,7 +46,6 @@ var Message_MessageType_value = map[string]int32{ ...@@ -50,7 +46,6 @@ var Message_MessageType_value = map[string]int32{
"GET_PROVIDERS": 3, "GET_PROVIDERS": 3,
"FIND_NODE": 4, "FIND_NODE": 4,
"PING": 5, "PING": 5,
"DIAGNOSTIC": 6,
} }
func (x Message_MessageType) Enum() *Message_MessageType { func (x Message_MessageType) Enum() *Message_MessageType {
...@@ -72,7 +67,7 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error { ...@@ -72,7 +67,7 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error {
type Message struct { type Message struct {
// defines what type of message it is. // defines what type of message it is.
Type *Message_MessageType `protobuf:"varint,1,req,name=type,enum=dht.Message_MessageType" json:"type,omitempty"` Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.Message_MessageType" json:"type,omitempty"`
// defines what coral cluster level this query/response belongs to. // defines what coral cluster level this query/response belongs to.
ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"` ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"`
// Used to specify the key associated with this message. // Used to specify the key associated with this message.
...@@ -137,8 +132,8 @@ func (m *Message) GetProviderPeers() []*Message_Peer { ...@@ -137,8 +132,8 @@ func (m *Message) GetProviderPeers() []*Message_Peer {
} }
type Message_Peer struct { type Message_Peer struct {
Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"` Addr *string `protobuf:"bytes,2,opt,name=addr" json:"addr,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
} }
......
...@@ -10,16 +10,15 @@ message Message { ...@@ -10,16 +10,15 @@ message Message {
GET_PROVIDERS = 3; GET_PROVIDERS = 3;
FIND_NODE = 4; FIND_NODE = 4;
PING = 5; PING = 5;
DIAGNOSTIC = 6;
} }
message Peer { message Peer {
required string id = 1; optional string id = 1;
required string addr = 2; optional string addr = 2;
} }
// defines what type of message it is. // defines what type of message it is.
required MessageType type = 1; optional MessageType type = 1;
// defines what coral cluster level this query/response belongs to. // defines what coral cluster level this query/response belongs to.
optional int32 clusterLevelRaw = 10; optional int32 clusterLevelRaw = 10;
......
package dht package dht
import ( import (
"bytes"
"encoding/json"
"sync" "sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -62,6 +60,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { ...@@ -62,6 +60,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
routeLevel := 0 routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize) closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 { if closest == nil || len(closest) == 0 {
log.Warning("Got no peers back from routing table!")
return nil, nil return nil, nil
} }
...@@ -282,33 +281,3 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error { ...@@ -282,33 +281,3 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error {
log.Info("ping %s end (err = %s)", p, err) log.Info("ping %s end (err = %s)", p, err)
return err return err
} }
func (dht *IpfsDHT) getDiagnostic(ctx context.Context) ([]*diagInfo, error) {
log.Info("Begin Diagnostic")
peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
var out []*diagInfo
query := newQuery(dht.self.Key(), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
pmes := newMessage(Message_DIAGNOSTIC, "", 0)
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
return nil, err
}
dec := json.NewDecoder(bytes.NewBuffer(rpmes.GetValue()))
for {
di := new(diagInfo)
err := dec.Decode(di)
if err != nil {
break
}
out = append(out, di)
}
return &dhtQueryResult{success: true}, nil
})
_, err := query.Run(ctx, peers)
return out, err
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论