提交 4a176d14 作者: Brian Tiger Chow

Merge pull request #570 from jbenet/feat/eventbegin

Feat/eventbegin
...@@ -120,12 +120,11 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -120,12 +120,11 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
ctx, cancelFunc := context.WithCancel(parent) ctx, cancelFunc := context.WithCancel(parent)
ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
log.Event(ctx, "GetBlockRequestBegin", &k) defer log.EventBegin(ctx, "GetBlockRequest", &k).Done()
log.Debugf("GetBlockRequestBegin") log.Debugf("GetBlockRequestBegin")
defer func() { defer func() {
cancelFunc() cancelFunc()
log.Event(ctx, "GetBlockRequestEnd", &k)
log.Debugf("GetBlockRequestEnd") log.Debugf("GetBlockRequestEnd")
}() }()
......
...@@ -81,7 +81,8 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e ...@@ -81,7 +81,8 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e
} }
log.Debugf("handshake: %s <--start--> %s", s.localPeer, s.remotePeer) log.Debugf("handshake: %s <--start--> %s", s.localPeer, s.remotePeer)
log.Event(ctx, "secureHandshakeStart", s.localPeer) defer log.EventBegin(ctx, "secureHandshake", s.localPeer).Done()
s.local.permanentPubKey = s.localKey.GetPublic() s.local.permanentPubKey = s.localKey.GetPublic()
myPubKeyBytes, err := s.local.permanentPubKey.Bytes() myPubKeyBytes, err := s.local.permanentPubKey.Bytes()
if err != nil { if err != nil {
......
...@@ -196,6 +196,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, ...@@ -196,6 +196,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
// getValueSingle simply performs the get value RPC with the given parameters // getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
key u.Key) (*pb.Message, error) { key u.Key) (*pb.Message, error) {
defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0) pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
return dht.sendRequest(ctx, p, pmes) return dht.sendRequest(ctx, p, pmes)
...@@ -265,11 +266,15 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo { ...@@ -265,11 +266,15 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) { func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
return dht.sendRequest(ctx, p, pmes) return dht.sendRequest(ctx, p, pmes)
} }
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) { func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) {
defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0) pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
return dht.sendRequest(ctx, p, pmes) return dht.sendRequest(ctx, p, pmes)
} }
......
...@@ -39,6 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { ...@@ -39,6 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
} }
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handleGetValue", p).Done()
log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey()) log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
// setup response // setup response
...@@ -114,6 +115,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -114,6 +115,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// Store a value in this peer local storage // Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handlePutValue", p).Done()
dskey := u.Key(pmes.GetKey()).DsKey() dskey := u.Key(pmes.GetKey()).DsKey()
if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil { if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil {
...@@ -137,6 +139,7 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) ( ...@@ -137,6 +139,7 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (
} }
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handleFindPeer", p).Done()
resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
var closest []peer.ID var closest []peer.ID
...@@ -166,6 +169,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -166,6 +169,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
} }
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handleGetProviders", p).Done()
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
key := u.Key(pmes.GetKey()) key := u.Key(pmes.GetKey())
...@@ -211,6 +215,7 @@ type providerInfo struct { ...@@ -211,6 +215,7 @@ type providerInfo struct {
} }
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handleAddProvider", p).Done()
key := u.Key(pmes.GetKey()) key := u.Key(pmes.GetKey())
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key) log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
......
...@@ -122,10 +122,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { ...@@ -122,10 +122,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// Provide makes this node announce that it can provide a value for the given key // Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
log := dht.log().Prefix("Provide(%s)", key) log := dht.log().Prefix("Provide(%s)", key)
log.Debugf("start", key)
log.Event(ctx, "provideBegin", &key) defer log.EventBegin(ctx, "provide", &key).Done()
defer log.Debugf("end", key)
defer log.Event(ctx, "provideEnd", &key)
// add self locally // add self locally
dht.providers.AddProvider(key, dht.self) dht.providers.AddProvider(key, dht.self)
...@@ -163,6 +161,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn ...@@ -163,6 +161,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers // Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key // to the given key
func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) {
e := log.EventBegin(ctx, "getClosestPeers", &key)
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
if len(tablepeers) == 0 { if len(tablepeers) == 0 {
return nil, errors.Wrap(kb.ErrLookupFailure) return nil, errors.Wrap(kb.ErrLookupFailure)
...@@ -204,6 +203,7 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer ...@@ -204,6 +203,7 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer
go func() { go func() {
defer close(out) defer close(out)
defer e.Done()
// run it! // run it!
_, err := query.Run(ctx, tablepeers) _, err := query.Run(ctx, tablepeers)
if err != nil { if err != nil {
...@@ -242,10 +242,8 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int ...@@ -242,10 +242,8 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) {
log := dht.log().Prefix("FindProviders(%s)", key) log := dht.log().Prefix("FindProviders(%s)", key)
defer log.EventBegin(ctx, "findProvidersAsync", &key).Done()
defer close(peerOut) defer close(peerOut)
defer log.Event(ctx, "findProviders end", &key)
log.Debug("begin")
defer log.Debug("begin")
ps := pset.NewLimited(count) ps := pset.NewLimited(count)
provs := dht.providers.GetProviders(ctx, key) provs := dht.providers.GetProviders(ctx, key)
...@@ -314,6 +312,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co ...@@ -314,6 +312,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
// FindPeer searches for a peer with given ID. // FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
defer log.EventBegin(ctx, "FindPeer", id).Done()
// Check if were already connected to them // Check if were already connected to them
if pi := dht.FindLocal(id); pi.ID != "" { if pi := dht.FindLocal(id); pi.ID != "" {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论