提交 277743eb 作者: frrist

Add event logs to bitswap

上级 3f2c7746
...@@ -185,8 +185,14 @@ type blockRequest struct { ...@@ -185,8 +185,14 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context. // deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (b blocks.Block, err error) {
return getBlock(parent, k, bs.GetBlocks) eip := log.EventBegin(parent, "Bitswap.GetBlock", k)
defer func() {
eip.Append(b)
eip.DoneWithErr(err)
}()
b, err = getBlock(parent, k, bs.GetBlocks)
return b, err
} }
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid { func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
...@@ -220,11 +226,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block ...@@ -220,11 +226,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
return nil, errors.New("bitswap is closed") return nil, errors.New("bitswap is closed")
default: default:
} }
promise := bs.notifications.Subscribe(ctx, keys...) //to be completed when the below goroutine completes
ctx = log.EventBeginInContext(ctx, "Bitswap.GetBlocks")
for _, k := range keys { promise := bs.notifications.Subscribe(ctx, keys...)
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}
mses := bs.getNextSessionID() mses := bs.getNextSessionID()
...@@ -240,6 +245,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block ...@@ -240,6 +245,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
remaining := cid.NewSet() remaining := cid.NewSet()
for _, k := range keys { for _, k := range keys {
log.Event(ctx, "Bitswap.GetBlocks.Start", k)
remaining.Add(k) remaining.Add(k)
} }
...@@ -251,6 +257,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block ...@@ -251,6 +257,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
defer func() { defer func() {
// can't just defer this call on its own, arguments are resolved *when* the defer is created // can't just defer this call on its own, arguments are resolved *when* the defer is created
bs.CancelWants(remaining.Keys(), mses) bs.CancelWants(remaining.Keys(), mses)
logging.MaybeFinishEvent(ctx)
}() }()
for { for {
select { select {
...@@ -292,12 +299,26 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { ...@@ -292,12 +299,26 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
if len(cids) == 0 { if len(cids) == 0 {
return return
} }
defer log.EventBegin(bs.wm.ctx, "Bitswap.CancelWants", logging.LoggableF(func() map[string]interface{} {
var wants []string
for c := range cids {
wants = append(wants, cids[c].KeyString())
}
return logging.LoggableMap{
"Session": ses,
"Blocks": wants,
}
})).Done()
bs.wm.CancelWants(context.Background(), cids, nil, ses) bs.wm.CancelWants(context.Background(), cids, nil, ses)
} }
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error { func (bs *Bitswap) HasBlock(blk blocks.Block) (err error) {
eip := log.EventBegin(bs.wm.ctx, "Bitswap.HasBlock", blk)
defer func() {
eip.DoneWithErr(err)
}()
return bs.receiveBlockFrom(blk, "") return bs.receiveBlockFrom(blk, "")
} }
...@@ -359,6 +380,10 @@ func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { ...@@ -359,6 +380,10 @@ func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
atomic.AddUint64(&bs.counters.messagesRecvd, 1) atomic.AddUint64(&bs.counters.messagesRecvd, 1)
eip := log.EventBegin(bs.wm.ctx, "Bitswap.ReceiveMessage", p, incoming)
defer func() {
eip.Done()
}()
// This call records changes to wantlists, blocks received, // This call records changes to wantlists, blocks received,
// and number of bytes transfered. // and number of bytes transfered.
...@@ -420,12 +445,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { ...@@ -420,12 +445,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
// Connected/Disconnected warns bitswap about peer connections // Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) { func (bs *Bitswap) PeerConnected(p peer.ID) {
defer log.EventBegin(bs.wm.ctx, "Bitswap.PeerConnected", p).Done()
bs.wm.Connected(p) bs.wm.Connected(p)
bs.engine.PeerConnected(p) bs.engine.PeerConnected(p)
} }
// Connected/Disconnected warns bitswap about peer connections // Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerDisconnected(p peer.ID) { func (bs *Bitswap) PeerDisconnected(p peer.ID) {
defer log.EventBegin(bs.wm.ctx, "Bitswap.PeerDisonnected", p).Done()
bs.wm.Disconnected(p) bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p) bs.engine.PeerDisconnected(p)
} }
......
...@@ -65,6 +65,13 @@ type Envelope struct { ...@@ -65,6 +65,13 @@ type Envelope struct {
Sent func() Sent func()
} }
func (e *Envelope) Loggable() map[string]interface{} {
return logging.LoggableMap{
"Peer": e.Peer.Pretty(),
"Block": e.Block.String(),
}
}
type Engine struct { type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers. // peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the // Requests are popped from the queue, packaged up, and placed in the
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
...@@ -79,6 +80,7 @@ func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Bl ...@@ -79,6 +80,7 @@ func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Bl
close(out) close(out)
// can't just defer this call on its own, arguments are resolved *when* the defer is created // can't just defer this call on its own, arguments are resolved *when* the defer is created
cfun(remaining.Keys()) cfun(remaining.Keys())
logging.MaybeFinishEvent(ctx)
}() }()
for { for {
select { select {
......
...@@ -6,9 +6,12 @@ import ( ...@@ -6,9 +6,12 @@ import (
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub" pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
) )
var log = logging.Logger("bitswap/notifications")
const bufferSize = 16 const bufferSize = 16
type PubSub interface { type PubSub interface {
...@@ -37,17 +40,22 @@ func (ps *impl) Shutdown() { ...@@ -37,17 +40,22 @@ func (ps *impl) Shutdown() {
// is closed if the |ctx| times out or is cancelled, or after sending len(keys) // is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks. // blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block { func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
blocksCh := make(chan blocks.Block, len(keys)) blocksCh := make(chan blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 { if len(keys) == 0 {
close(blocksCh) close(blocksCh)
return blocksCh return blocksCh
} }
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
sKeys := toStrings(keys)
ctx = log.EventBeginInContext(ctx, "Bitswap.Subscribe", logging.LoggableMap{"Keys": sKeys})
ps.wrapped.AddSubOnceEach(valuesCh, sKeys...)
go func() { go func() {
defer close(blocksCh) defer func() {
defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization close(blocksCh)
ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
logging.MaybeFinishEvent(ctx)
}()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
......
...@@ -302,11 +302,19 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { ...@@ -302,11 +302,19 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) {
// guaranteed on the returned blocks. // guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid) ctx = logging.ContextWithLoggable(ctx, s.uuid)
//TODO(frrist): This could probably be handled in a cleaner way, might require a slight refactor
ctx = log.EventBeginInContext(ctx, "Session.GetBlocks")
return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants) return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
} }
// GetBlock fetches a single block // GetBlock fetches a single block
func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (b blocks.Block, err error) {
eip := log.EventBegin(parent, "Session.GetBlock", k)
defer func() {
eip.DoneWithErr(err)
}()
return getBlock(parent, k, s.GetBlocks) return getBlock(parent, k, s.GetBlocks)
} }
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
peer "gx/ipfs/Qma7H6RW8wRrfZpNSXwxYGcd1E149s42FpWNpDNieSVrnU/go-libp2p-peer" peer "gx/ipfs/Qma7H6RW8wRrfZpNSXwxYGcd1E149s42FpWNpDNieSVrnU/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
) )
...@@ -78,12 +79,43 @@ type msgQueue struct { ...@@ -78,12 +79,43 @@ type msgQueue struct {
// WantBlocks adds the given cids to the wantlist, tracked by the given session // WantBlocks adds the given cids to the wantlist, tracked by the given session
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
defer log.EventBegin(ctx, "WantManager.WantBlocks", logging.LoggableF(func() map[string]interface{} {
var wants []string
for c := range ks {
wants = append(wants, ks[c].KeyString())
}
var targets []string
for p := range peers {
targets = append(targets, peers[p].Pretty())
}
return logging.LoggableMap{
"Session": ses,
"Blocks": wants,
"Targets": targets,
}
})).Done()
log.Infof("want blocks: %s", ks) log.Infof("want blocks: %s", ks)
pm.addEntries(ctx, ks, peers, false, ses) pm.addEntries(ctx, ks, peers, false, ses)
} }
// CancelWants removes the given cids from the wantlist, tracked by the given session // CancelWants removes the given cids from the wantlist, tracked by the given session
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
defer log.EventBegin(ctx, "WantManager.CancleWants", logging.LoggableF(func() map[string]interface{} {
var wants []string
for c := range ks {
wants = append(wants, ks[c].KeyString())
}
var targets []string
for p := range peers {
targets = append(targets, peers[p].Pretty())
}
return logging.LoggableMap{
"Session": ses,
"Blocks": wants,
"Targets": targets,
}
})).Done()
pm.addEntries(context.Background(), ks, peers, true, ses) pm.addEntries(context.Background(), ks, peers, true, ses)
} }
...@@ -117,7 +149,11 @@ func (pm *WantManager) ConnectedPeers() []peer.ID { ...@@ -117,7 +149,11 @@ func (pm *WantManager) ConnectedPeers() []peer.ID {
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure // Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack // throughout the network stack
defer env.Sent() eip := log.EventBegin(ctx, "WantManager.SendBlock", env)
defer func() {
env.Sent()
eip.Done()
}()
pm.sentHistogram.Observe(float64(len(env.Block.RawData()))) pm.sentHistogram.Observe(float64(len(env.Block.RawData())))
...@@ -273,6 +309,7 @@ func (mq *msgQueue) openSender(ctx context.Context) error { ...@@ -273,6 +309,7 @@ func (mq *msgQueue) openSender(ctx context.Context) error {
} }
func (pm *WantManager) Connected(p peer.ID) { func (pm *WantManager) Connected(p peer.ID) {
defer log.EventBegin(pm.ctx, "WantManager.Connected", p).Done()
select { select {
case pm.connectEvent <- peerStatus{peer: p, connect: true}: case pm.connectEvent <- peerStatus{peer: p, connect: true}:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
...@@ -280,6 +317,7 @@ func (pm *WantManager) Connected(p peer.ID) { ...@@ -280,6 +317,7 @@ func (pm *WantManager) Connected(p peer.ID) {
} }
func (pm *WantManager) Disconnected(p peer.ID) { func (pm *WantManager) Disconnected(p peer.ID) {
defer log.EventBegin(pm.ctx, "WantManager.Disconnected", p).Done()
select { select {
case pm.connectEvent <- peerStatus{peer: p, connect: false}: case pm.connectEvent <- peerStatus{peer: p, connect: false}:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论