提交 32a68c6f 作者: Juan Batiz-Benet

Merge pull request #686 from jbenet/exchange-with-connected

return connected peers as providers
......@@ -85,6 +85,9 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
return ioutil.NopCloser(nil), nil
}
// make a signal to wait for one bootstrap round to complete.
doneWithRound := make(chan struct{})
// the periodic bootstrap function -- the connection supervisor
periodic := func(worker goprocess.Process) {
ctx := procctx.WithProcessClosing(context.Background(), worker)
......@@ -94,6 +97,8 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Debugf("%s bootstrap error: %s", n.Identity, err)
}
<-doneWithRound
}
// kick off the node's periodic bootstrapping
......@@ -109,6 +114,8 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
// add dht bootstrap proc as a child, so it is closed automatically when we are.
proc.AddChild(dbproc)
doneWithRound <- struct{}{}
close(doneWithRound) // it no longer blocks periodic
return proc, nil
}
......
......@@ -36,23 +36,6 @@ func TestClose(t *testing.T) {
bitswap.Exchange.GetBlock(context.Background(), block.Key())
}
func TestGetBlockTimeout(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
g := NewTestSessionGenerator(net)
defer g.Close()
self := g.Next()
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
block := blocks.NewBlock([]byte("block"))
_, err := self.Exchange.GetBlock(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
}
}
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
......@@ -244,6 +227,7 @@ func TestSendToWantingPeer(t *testing.T) {
func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks")
......
......@@ -99,7 +99,19 @@ func (bsnet *impl) SetDelegate(r Receiver) {
// FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
out := make(chan peer.ID)
// Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts
// precisely tracking which peers provide certain keys. This optimization
// would be misleading. In the long run, this may not be the most
// appropriate place for this optimization, but it won't cause any harm in
// the short term.
connectedPeers := bsnet.host.Network().Peers()
out := make(chan peer.ID, len(connectedPeers)) // just enough buffer for these connectedPeers
for _, id := range connectedPeers {
out <- id
}
go func() {
defer close(out)
providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
......
......@@ -6,6 +6,7 @@ import (
"time"
peer "github.com/jbenet/go-ipfs/p2p/peer"
ci "github.com/jbenet/go-ipfs/util/testutil/ci"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
......@@ -45,10 +46,14 @@ func TestSimultOpen(t *testing.T) {
func TestSimultOpenMany(t *testing.T) {
// t.Skip("very very slow")
t.Parallel()
addrs := 20
SubtestSwarm(t, addrs, 10)
rounds := 10
if ci.IsRunning() {
addrs = 10
rounds = 5
}
SubtestSwarm(t, addrs, rounds)
}
func TestSimultOpenFewStress(t *testing.T) {
......
package epictest
import (
"bytes"
"testing"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/blocks"
"github.com/jbenet/go-ipfs/core"
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
errors "github.com/jbenet/go-ipfs/util/debugerror"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestBitswapWithoutRouting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const numPeers = 4
// create network
mn, err := mocknet.FullMeshLinked(ctx, numPeers)
if err != nil {
t.Fatal(errors.Wrap(err))
}
peers := mn.Peers()
if len(peers) < numPeers {
t.Fatal(errors.New("test initialization error"))
}
// set the routing latency to infinity.
conf := testutil.LatencyConfig{RoutingLatency: (525600 * time.Minute)}
var nodes []*core.IpfsNode
for _, p := range peers {
n, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(p, mn.Host(p), conf)))
if err != nil {
t.Fatal(err)
}
defer n.Close()
nodes = append(nodes, n)
}
// connect them
for _, n1 := range nodes {
for _, n2 := range nodes {
if n1 == n2 {
continue
}
log.Debug("connecting to other hosts")
p2 := n2.PeerHost.Peerstore().PeerInfo(n2.PeerHost.ID())
if err := n1.PeerHost.Connect(ctx, p2); err != nil {
t.Fatal(err)
}
}
}
// add blocks to each before
log.Debug("adding block.")
block0 := blocks.NewBlock([]byte("block0"))
block1 := blocks.NewBlock([]byte("block1"))
// put 1 before
if err := nodes[0].Blockstore.Put(block0); err != nil {
t.Fatal(err)
}
// get it out.
for i, n := range nodes {
// skip first because block not in its exchange. will hang.
if i == 0 {
continue
}
log.Debugf("%d %s get block.", i, n.Identity)
b, err := n.Exchange.GetBlock(ctx, block0.Key())
if err != nil {
t.Error(err)
} else if !bytes.Equal(b.Data, block0.Data) {
t.Error("byte comparison fail")
} else {
log.Debug("got block: %s", b.Key())
}
}
// put 1 after
if err := nodes[1].Blockstore.Put(block1); err != nil {
t.Fatal(err)
}
// get it out.
for _, n := range nodes {
b, err := n.Exchange.GetBlock(ctx, block1.Key())
if err != nil {
t.Error(err)
} else if !bytes.Equal(b.Data, block1.Data) {
t.Error("byte comparison fail")
} else {
log.Debug("got block: %s", b.Key())
}
}
}
......@@ -5,6 +5,7 @@ import (
"io"
"math"
"testing"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
core "github.com/jbenet/go-ipfs/core"
......@@ -16,7 +17,7 @@ import (
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestThreeLeggedCat(t *testing.T) {
func TestThreeLeggedCat1KBInstantaneous(t *testing.T) {
conf := testutil.LatencyConfig{
NetworkLatency: 0,
RoutingLatency: 0,
......@@ -27,6 +28,38 @@ func TestThreeLeggedCat(t *testing.T) {
}
}
func TestThreeLeggedCatDegenerateSlowBlockstore(t *testing.T) {
SkipUnlessEpic(t)
conf := testutil.LatencyConfig{BlockstoreLatency: 50 * time.Millisecond}
if err := RunThreeLeggedCat(RandomBytes(1*unit.KB), conf); err != nil {
t.Fatal(err)
}
}
func TestThreeLeggedCatDegenerateSlowNetwork(t *testing.T) {
SkipUnlessEpic(t)
conf := testutil.LatencyConfig{NetworkLatency: 400 * time.Millisecond}
if err := RunThreeLeggedCat(RandomBytes(1*unit.KB), conf); err != nil {
t.Fatal(err)
}
}
func TestThreeLeggedCatDegenerateSlowRouting(t *testing.T) {
SkipUnlessEpic(t)
conf := testutil.LatencyConfig{RoutingLatency: 400 * time.Millisecond}
if err := RunThreeLeggedCat(RandomBytes(1*unit.KB), conf); err != nil {
t.Fatal(err)
}
}
func TestThreeLeggedCat100MBMacbookCoastToCoast(t *testing.T) {
SkipUnlessEpic(t)
conf := testutil.LatencyConfig{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
if err := RunThreeLeggedCat(RandomBytes(1*unit.KB), conf); err != nil {
t.Fatal(err)
}
}
func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......@@ -47,6 +80,11 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
if len(peers) < numPeers {
return errors.New("test initialization error")
}
bootstrap, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[2], mn.Host(peers[2]), conf)))
if err != nil {
return err
}
defer bootstrap.Close()
adder, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[0], mn.Host(peers[0]), conf)))
if err != nil {
return err
......@@ -57,11 +95,6 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
return err
}
defer catter.Close()
bootstrap, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[2], mn.Host(peers[2]), conf)))
if err != nil {
return err
}
defer bootstrap.Close()
bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
bcfg := core.BootstrapConfigWithPeers([]peer.PeerInfo{bis})
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论