提交 9c6228d1 作者: Juan Batiz-Benet

bitswap and dht: lots of debugging logs

上级 bb8886f3
......@@ -108,6 +108,7 @@ type bitswap struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
log := log.Prefix("bitswap(%s).GetBlock(%s)", bs.self, k)
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
......@@ -120,10 +121,12 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
log.Event(ctx, "GetBlockRequestBegin", &k)
log.Debugf("GetBlockRequestBegin")
defer func() {
cancelFunc()
log.Event(ctx, "GetBlockRequestEnd", &k)
log.Debugf("GetBlockRequestEnd")
}()
promise, err := bs.GetBlocks(ctx, []u.Key{k})
......@@ -263,12 +266,16 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
}
func (bs *bitswap) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap(%s).taskWorker", bs.self)
for {
select {
case <-ctx.Done():
log.Debugf("exiting")
return
case envelope := <-bs.engine.Outbox():
log.Debugf("message to %s sending...", envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
log.Debugf("message to %s sent", envelope.Peer)
}
}
}
......
......@@ -91,6 +91,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
}
func (e *Engine) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap.Engine.taskWorker")
for {
nextTask := e.peerRequestQueue.Pop()
if nextTask == nil {
......@@ -98,11 +99,16 @@ func (e *Engine) taskWorker(ctx context.Context) {
// Wait until there are!
select {
case <-ctx.Done():
log.Debugf("exiting: %s", ctx.Err())
return
case <-e.workSignal:
log.Debugf("woken up")
}
continue
}
log := log.Prefix("%s", nextTask)
log.Debugf("processing")
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
log.Warning("engine: task exists to send block, but block is not in blockstore")
......@@ -113,10 +119,12 @@ func (e *Engine) taskWorker(ctx context.Context) {
m := bsmsg.New()
m.AddBlock(block)
// TODO: maybe add keys from our wantlist?
log.Debugf("sending...")
select {
case <-ctx.Done():
return
case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
log.Debugf("sent")
}
}
}
......@@ -140,7 +148,7 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
log := log.Prefix("Engine.MessageReceived(%s)", p)
log := log.Prefix("bitswap.Engine.MessageReceived(%s)", p)
log.Debugf("enter")
defer log.Debugf("exit")
......
package decision
import (
"fmt"
"sync"
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
......@@ -30,6 +31,10 @@ type task struct {
Trash bool
}
func (t *task) String() string {
return fmt.Sprintf("<Task %s, %s, %v>", t.Target, t.Entry.Key, t.Trash)
}
// Push currently adds a new task to the end of the list
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.ID) {
tl.lock.Lock()
......
......@@ -2,15 +2,17 @@ package network
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
routing "github.com/jbenet/go-ipfs/routing"
util "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = util.Logger("bitswap_network")
var log = eventlog.Logger("bitswap_network")
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork {
......@@ -41,13 +43,23 @@ func (bsnet *impl) SendMessage(
p peer.ID,
outgoing bsmsg.BitSwapMessage) error {
log := log.Prefix("bitswap net SendMessage to %s", p)
log.Debug("opening stream")
s, err := bsnet.host.NewStream(ProtocolBitswap, p)
if err != nil {
return err
}
defer s.Close()
return outgoing.ToNet(s)
log.Debug("sending")
if err := outgoing.ToNet(s); err != nil {
log.Errorf("error: %s", err)
return err
}
log.Debug("sent")
return err
}
func (bsnet *impl) SendRequest(
......@@ -55,18 +67,30 @@ func (bsnet *impl) SendRequest(
p peer.ID,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
log.Debugf("bsnet SendRequest to %s", p)
log := log.Prefix("bitswap net SendRequest to %s", p)
log.Debug("opening stream")
s, err := bsnet.host.NewStream(ProtocolBitswap, p)
if err != nil {
return nil, err
}
defer s.Close()
log.Debug("sending")
if err := outgoing.ToNet(s); err != nil {
log.Errorf("error: %s", err)
return nil, err
}
return bsmsg.FromNet(s)
log.Debug("sent, now receiveing")
incoming, err := bsmsg.FromNet(s)
if err != nil {
log.Errorf("error: %s", err)
return incoming, err
}
log.Debug("received")
return incoming, nil
}
func (bsnet *impl) SetDelegate(r Receiver) {
......@@ -106,11 +130,12 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
received, err := bsmsg.FromNet(s)
if err != nil {
go bsnet.receiver.ReceiveError(err)
log.Errorf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
return
}
p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("bsnet handleNewStream from %s", s.Conn().RemotePeer())
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiver.ReceiveMessage(ctx, p, received)
}
......@@ -148,7 +148,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
}
if closest == nil {
log.Warningf("handleFindPeer: could not find anything.")
log.Warningf("%s handleFindPeer %s: could not find anything.", dht.self, p)
return resp, nil
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论