提交 d5331e7d 作者: Brian Tiger Chow

feat(gcr/s) add eventlogs

上级 f17ede83
...@@ -36,6 +36,7 @@ func NewClient(px proxy.Proxy, h host.Host, ps peer.Peerstore, local peer.ID) (* ...@@ -36,6 +36,7 @@ func NewClient(px proxy.Proxy, h host.Host, ps peer.Peerstore, local peer.ID) (*
} }
func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.PeerInfo { func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.PeerInfo {
defer log.EventBegin(ctx, "findProviders", &k).Done()
ch := make(chan peer.PeerInfo) ch := make(chan peer.PeerInfo)
go func() { go func() {
defer close(ch) defer close(ch)
...@@ -58,6 +59,7 @@ func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha ...@@ -58,6 +59,7 @@ func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha
} }
func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error { func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error {
defer log.EventBegin(ctx, "putValue", &k).Done()
r, err := makeRecord(c.peerstore, c.local, k, v) r, err := makeRecord(c.peerstore, c.local, k, v)
if err != nil { if err != nil {
return err return err
...@@ -68,6 +70,7 @@ func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error { ...@@ -68,6 +70,7 @@ func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error {
} }
func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) { func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) {
defer log.EventBegin(ctx, "getValue", &k).Done()
msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0) msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
if err != nil { if err != nil {
...@@ -77,6 +80,7 @@ func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) { ...@@ -77,6 +80,7 @@ func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) {
} }
func (c *Client) Provide(ctx context.Context, k u.Key) error { func (c *Client) Provide(ctx context.Context, k u.Key) error {
defer log.EventBegin(ctx, "provide", &k).Done()
msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0) msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
// FIXME how is connectedness defined for the local node // FIXME how is connectedness defined for the local node
pri := []pb.PeerRoutingInfo{ pri := []pb.PeerRoutingInfo{
...@@ -92,6 +96,7 @@ func (c *Client) Provide(ctx context.Context, k u.Key) error { ...@@ -92,6 +96,7 @@ func (c *Client) Provide(ctx context.Context, k u.Key) error {
} }
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { func (c *Client) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
defer log.EventBegin(ctx, "findPeer", id).Done()
request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
response, err := c.proxy.SendRequest(ctx, request) // hide remote response, err := c.proxy.SendRequest(ctx, request) // hide remote
if err != nil { if err != nil {
...@@ -121,6 +126,7 @@ func makeRecord(ps peer.Peerstore, p peer.ID, k u.Key, v []byte) (*pb.Record, er ...@@ -121,6 +126,7 @@ func makeRecord(ps peer.Peerstore, p peer.ID, k u.Key, v []byte) (*pb.Record, er
} }
func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) { func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
defer log.EventBegin(ctx, "ping", id).Done()
return time.Nanosecond, errors.New("grandcentral routing does not support the ping method") return time.Nanosecond, errors.New("grandcentral routing does not support the ping method")
} }
......
...@@ -13,10 +13,10 @@ import ( ...@@ -13,10 +13,10 @@ import (
errors "github.com/jbenet/go-ipfs/util/debugerror" errors "github.com/jbenet/go-ipfs/util/debugerror"
) )
var log = eventlog.Logger("proxy")
const ProtocolGCR = "/ipfs/grandcentral" const ProtocolGCR = "/ipfs/grandcentral"
var log = eventlog.Logger("grandcentral/proxy")
type Proxy interface { type Proxy interface {
HandleStream(inet.Stream) HandleStream(inet.Stream)
SendMessage(ctx context.Context, m *dhtpb.Message) error SendMessage(ctx context.Context, m *dhtpb.Message) error
...@@ -48,8 +48,15 @@ func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error { ...@@ -48,8 +48,15 @@ func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
return err // NB: returns the last error return err // NB: returns the last error
} }
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) error { func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil { e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
defer func() {
if err != nil {
e.SetError(err)
}
e.Done()
}()
if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
return err return err
} }
s, err := px.Host.NewStream(ProtocolGCR, remote) s, err := px.Host.NewStream(ProtocolGCR, remote)
...@@ -78,8 +85,15 @@ func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.M ...@@ -78,8 +85,15 @@ func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.M
return nil, err // NB: returns the last error return nil, err // NB: returns the last error
} }
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) { func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (_ *dhtpb.Message, err error) {
if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil { e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, m)
defer func() {
if err != nil {
e.SetError(err)
}
e.Done()
}()
if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil {
return nil, err return nil, err
} }
s, err := px.Host.NewStream(ProtocolGCR, remote) s, err := px.Host.NewStream(ProtocolGCR, remote)
...@@ -89,12 +103,12 @@ func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote pe ...@@ -89,12 +103,12 @@ func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote pe
defer s.Close() defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s) w := ggio.NewDelimitedWriter(s)
if err := w.WriteMsg(m); err != nil { if err = w.WriteMsg(m); err != nil {
return nil, err return nil, err
} }
var reply dhtpb.Message var reply dhtpb.Message
if err := r.ReadMsg(&reply); err != nil { if err = r.ReadMsg(&reply); err != nil {
return nil, err return nil, err
} }
// need ctx expiration? // need ctx expiration?
......
...@@ -42,6 +42,8 @@ func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Messag ...@@ -42,6 +42,8 @@ func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Messag
func (s *Server) handleMessage( func (s *Server) handleMessage(
ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) { ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) {
log.EventBegin(ctx, "routingMessageReceived", req, p, s.local).Done() // TODO may need to differentiate between local and remote
// FIXME threw everything into this switch statement to get things going. // FIXME threw everything into this switch statement to get things going.
// Once each operation is well-defined, extract pluggable backend so any // Once each operation is well-defined, extract pluggable backend so any
// database may be used. // database may be used.
...@@ -131,6 +133,7 @@ func putRoutingRecord(ds datastore.Datastore, k util.Key, value *dhtpb.Record) e ...@@ -131,6 +133,7 @@ func putRoutingRecord(ds datastore.Datastore, k util.Key, value *dhtpb.Record) e
} }
func putRoutingProviders(ds datastore.Datastore, k util.Key, providers []*dhtpb.Message_Peer) error { func putRoutingProviders(ds datastore.Datastore, k util.Key, providers []*dhtpb.Message_Peer) error {
log.Event(context.Background(), "putRoutingProviders", &k)
pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", k.String()}) pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", k.String()})
if v, err := ds.Get(pkey); err == nil { if v, err := ds.Get(pkey); err == nil {
if msg, ok := v.([]byte); ok { if msg, ok := v.([]byte); ok {
...@@ -166,6 +169,7 @@ func storeProvidersToPeerstore(ps peer.Peerstore, p peer.ID, providers []*dhtpb. ...@@ -166,6 +169,7 @@ func storeProvidersToPeerstore(ps peer.Peerstore, p peer.ID, providers []*dhtpb.
} }
func getRoutingProviders(local peer.ID, ds datastore.Datastore, k util.Key) ([]*dhtpb.Message_Peer, error) { func getRoutingProviders(local peer.ID, ds datastore.Datastore, k util.Key) ([]*dhtpb.Message_Peer, error) {
log.Event(context.Background(), "getProviders", local, &k)
var providers []*dhtpb.Message_Peer var providers []*dhtpb.Message_Peer
exists, err := ds.Has(k.DsKey()) // TODO store values in a local datastore? exists, err := ds.Has(k.DsKey()) // TODO store values in a local datastore?
if err == nil && exists { if err == nil && exists {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论