提交 87739b3a 作者: Jeromy Johnson 提交者: Juan Batiz-Benet

a little more work on message handling stuff

上级 3444d41d
package dht
import (
"sync"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
"code.google.com/p/goprotobuf/proto"
"sync"
)
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js
......@@ -16,6 +18,9 @@ type IpfsDHT struct {
network *swarm.Swarm
// local data (TEMPORARY: until we formalize data storage with datastore)
data map[string][]byte
// map of channels waiting for reply messages
listeners map[uint64]chan *swarm.Message
listenLock sync.RWMutex
......@@ -38,22 +43,28 @@ func (dht *IpfsDHT) handleMessages() {
}
// Note: not sure if this is the correct place for this
dht.listenLock.RLock()
ch, ok := dht.listeners[pmes.GetId()]
dht.listenLock.RUnlock()
if ok {
ch <- mes
if pmes.GetResponse() {
dht.listenLock.RLock()
ch, ok := dht.listeners[pmes.GetId()]
dht.listenLock.RUnlock()
if ok {
ch <- mes
}
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
continue
}
//
// Do something else with the messages?
switch pmes.GetType() {
case DHTMessage_ADD_PROVIDER:
case DHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
case DHTMessage_PUT_VALUE:
case DHTMessage_FIND_NODE:
case DHTMessage_ADD_PROVIDER:
case DHTMessage_GET_PROVIDERS:
case DHTMessage_GET_VALUE:
case DHTMessage_PING:
case DHTMessage_PUT_VALUE:
}
case <-dht.shutdown:
......@@ -62,6 +73,44 @@ func (dht *IpfsDHT) handleMessages() {
}
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
val, found := dht.data[pmes.GetKey()]
if found {
isResponse := true
resp := new(DHTMessage)
resp.Response = &isResponse
resp.Id = pmes.Id
resp.Key = pmes.Key
resp.Value = val
} else {
// Find closest node(s) to desired key and reply with that info
// TODO: this will need some other metadata in the protobuf message
// to signal to the querying node that the data its receiving
// is actually a list of other nodes
}
}
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
isResponse := true
resp := new(DHTMessage)
resp.Id = pmes.Id
resp.Response = &isResponse
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(resp.String())
dht.network.Chan.Outgoing <- mes
}
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
......@@ -71,3 +120,9 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
dht.listenLock.Unlock()
return lchan
}
// Stop all communications from this node and shut down
func (dht *IpfsDHT) Halt() {
dht.shutdown <- struct{}{}
dht.network.Close()
}
......@@ -66,11 +66,14 @@ func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error {
}
type DHTMessage struct {
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
XXX_unrecognized []byte `json:"-"`
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
// Unique ID of this message, used to match queries with responses
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
// Signals whether or not this message is a response to another message
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DHTMessage) Reset() { *m = DHTMessage{} }
......@@ -105,6 +108,13 @@ func (m *DHTMessage) GetId() uint64 {
return 0
}
func (m *DHTMessage) GetResponse() bool {
if m != nil && m.Response != nil {
return *m.Response
}
return false
}
func init() {
proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value)
}
......@@ -15,5 +15,10 @@ message DHTMessage {
required MessageType type = 1;
optional string key = 2;
optional bytes value = 3;
// Unique ID of this message, used to match queries with responses
required uint64 id = 4;
// Signals whether or not this message is a response to another message
optional bool response = 5;
}
package dht
import (
"math/rand"
"time"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
"time"
)
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
return 4
return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32())
}
// This file implements the Routing interface for the IpfsDHT struct.
......@@ -68,9 +70,6 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
case resp := <-response_chan:
return resp.Data, nil
}
// Should never be hit
return nil, nil
}
// Value provider layer of indirection.
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论