提交 0721a589 作者: Brian Tiger Chow

fix(gcr/s) proto marshaling bugs

上级 a88621ae
package grandcentral
import (
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
......@@ -48,36 +50,17 @@ func (s *Server) handleMessage(
switch req.GetType() {
case dhtpb.Message_GET_VALUE:
dskey := util.Key(req.GetKey()).DsKey()
val, err := s.datastore.Get(dskey)
rawRecord, err := getRoutingRecord(s.datastore, util.Key(req.GetKey()))
if err != nil {
log.Debug(errors.Wrap(err))
return "", nil
}
rawRecord, ok := val.([]byte)
if !ok {
log.Debugf("datastore had non byte-slice value for %v", dskey)
return "", nil
}
if err := proto.Unmarshal(rawRecord, response.Record); err != nil {
log.Debug("failed to unmarshal dht record from datastore")
return "", nil
}
response.Record = rawRecord
// TODO before merging: if we know any providers for the requested value, return those.
return p, response
case dhtpb.Message_PUT_VALUE:
// TODO before merging: verifyRecord(req.GetRecord())
data, err := proto.Marshal(req.GetRecord())
if err != nil {
log.Debug(err)
return "", nil
}
dskey := util.Key(req.GetKey()).DsKey()
if err := s.datastore.Put(dskey, data); err != nil {
log.Debug(err)
return "", nil
}
putRoutingRecord(s.datastore, util.Key(req.GetKey()), req.GetRecord())
return p, req // TODO before merging: verify that we should return record
case dhtpb.Message_FIND_NODE:
......@@ -92,51 +75,19 @@ func (s *Server) handleMessage(
return p.ID, response
case dhtpb.Message_ADD_PROVIDER:
for _, provider := range req.GetProviderPeers() {
providerID := peer.ID(provider.GetId())
if providerID != p {
log.Debugf("provider message came from third-party %s", p)
continue
}
for _, maddr := range provider.Addresses() {
// FIXME do we actually want to store to peerstore
s.peerstore.AddAddr(p, maddr, peer.TempAddrTTL)
}
}
var providers []dhtpb.Message_Peer
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", req.GetKey()})
if v, err := s.datastore.Get(pkey); err == nil {
if protopeers, ok := v.([]dhtpb.Message_Peer); ok {
providers = append(providers, protopeers...)
}
}
if err := s.datastore.Put(pkey, providers); err != nil {
log.Debug(err)
storeProvidersToPeerstore(s.peerstore, p, req.GetProviderPeers())
if err := putRoutingProviders(s.datastore, util.Key(req.GetKey()), req.GetProviderPeers()); err != nil {
return "", nil
}
return "", nil
case dhtpb.Message_GET_PROVIDERS:
dskey := util.Key(req.GetKey()).DsKey()
exists, err := s.datastore.Has(dskey)
if err == nil && exists {
pri := []dhtpb.PeerRoutingInfo{
dhtpb.PeerRoutingInfo{
// Connectedness: TODO how is connectedness defined for the local node
PeerInfo: peer.PeerInfo{ID: s.local},
},
}
response.ProviderPeers = append(response.ProviderPeers, dhtpb.PeerRoutingInfosToPBPeers(pri)...)
}
// FIXME(btc) is this how we want to persist this data?
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", req.GetKey()})
if v, err := s.datastore.Get(pkey); err == nil {
if protopeers, ok := v.([]dhtpb.Message_Peer); ok {
for _, p := range protopeers {
response.ProviderPeers = append(response.ProviderPeers, &p)
}
}
providers, err := getRoutingProviders(s.local, s.datastore, util.Key(req.GetKey()))
if err != nil {
return "", nil
}
response.ProviderPeers = providers
return p, response
case dhtpb.Message_PING:
......@@ -148,3 +99,94 @@ func (s *Server) handleMessage(
var _ proxy.RequestHandler = &Server{}
var _ proxy.Proxy = &Server{}
func getRoutingRecord(ds datastore.Datastore, k util.Key) (*dhtpb.Record, error) {
dskey := k.DsKey()
val, err := ds.Get(dskey)
if err != nil {
return nil, errors.Wrap(err)
}
recordBytes, ok := val.([]byte)
if !ok {
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}
var record dhtpb.Record
if err := proto.Unmarshal(recordBytes, &record); err != nil {
return nil, errors.New("failed to unmarshal dht record from datastore")
}
return &record, nil
}
func putRoutingRecord(ds datastore.Datastore, k util.Key, value *dhtpb.Record) error {
data, err := proto.Marshal(value)
if err != nil {
return err
}
dskey := k.DsKey()
// TODO namespace
if err := ds.Put(dskey, data); err != nil {
return err
}
return nil
}
func putRoutingProviders(ds datastore.Datastore, k util.Key, providers []*dhtpb.Message_Peer) error {
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", k.String()})
if v, err := ds.Get(pkey); err == nil {
if msg, ok := v.([]byte); ok {
var protomsg dhtpb.Message
if err := proto.Unmarshal(msg, &protomsg); err != nil {
log.Error("failed to unmarshal routing provider record. programmer error")
} else {
providers = append(providers, protomsg.ProviderPeers...)
}
}
}
var protomsg dhtpb.Message
protomsg.ProviderPeers = providers
data, err := proto.Marshal(&protomsg)
if err != nil {
return err
}
return ds.Put(pkey, data)
}
func storeProvidersToPeerstore(ps peer.Peerstore, p peer.ID, providers []*dhtpb.Message_Peer) {
for _, provider := range providers {
providerID := peer.ID(provider.GetId())
if providerID != p {
log.Errorf("provider message came from third-party %s", p)
continue
}
for _, maddr := range provider.Addresses() {
// as a router, we want to store addresses for peers who have provided
ps.AddAddr(p, maddr, peer.AddressTTL)
}
}
}
func getRoutingProviders(local peer.ID, ds datastore.Datastore, k util.Key) ([]*dhtpb.Message_Peer, error) {
var providers []*dhtpb.Message_Peer
exists, err := ds.Has(k.DsKey()) // TODO store values in a local datastore?
if err == nil && exists {
pri := []dhtpb.PeerRoutingInfo{
dhtpb.PeerRoutingInfo{
// Connectedness: TODO how is connectedness defined for the local node
PeerInfo: peer.PeerInfo{ID: local},
},
}
providers = append(providers, dhtpb.PeerRoutingInfosToPBPeers(pri)...)
}
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", k.String()}) // TODO key fmt
if v, err := ds.Get(pkey); err == nil {
if data, ok := v.([]byte); ok {
var msg dhtpb.Message
if err := proto.Unmarshal(data, &msg); err != nil {
return nil, err
}
providers = append(providers, msg.GetProviderPeers()...)
}
}
return providers, nil
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论