提交 217611c2 作者: Brian Tiger Chow

use raw primitives

上级 589ed37b
...@@ -10,19 +10,27 @@ import ( ...@@ -10,19 +10,27 @@ import (
"time" "time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random" datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
random "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blockservice "github.com/jbenet/go-ipfs/blockservice" blockservice "github.com/jbenet/go-ipfs/blockservice"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
importer "github.com/jbenet/go-ipfs/importer" importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk" chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag" merkledag "github.com/jbenet/go-ipfs/merkledag"
net "github.com/jbenet/go-ipfs/net"
mocknet "github.com/jbenet/go-ipfs/net/mock" mocknet "github.com/jbenet/go-ipfs/net/mock"
path "github.com/jbenet/go-ipfs/path" path "github.com/jbenet/go-ipfs/path"
mockrouting "github.com/jbenet/go-ipfs/routing/mock" peer "github.com/jbenet/go-ipfs/peer"
dht "github.com/jbenet/go-ipfs/routing/dht"
uio "github.com/jbenet/go-ipfs/unixfs/io" uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util" util "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/datastore2"
errors "github.com/jbenet/go-ipfs/util/debugerror" errors "github.com/jbenet/go-ipfs/util/debugerror"
delay "github.com/jbenet/go-ipfs/util/delay"
) )
const kSeed = 1 const kSeed = 1
...@@ -34,7 +42,7 @@ func Test1KBInstantaneous(t *testing.T) { ...@@ -34,7 +42,7 @@ func Test1KBInstantaneous(t *testing.T) {
BlockstoreLatency: 0, BlockstoreLatency: 0,
} }
if err := AddCatBytes(RandomBytes(1*KB), conf); err != nil { if err := AddCatBytes(RandomBytes(100*MB), conf); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
...@@ -88,55 +96,83 @@ func RandomBytes(n int64) []byte { ...@@ -88,55 +96,83 @@ func RandomBytes(n int64) []byte {
return data.Bytes() return data.Bytes()
} }
type instance struct {
ID peer.ID
Network net.Network
Blockstore blockstore.Blockstore
Datastore datastore.ThreadSafeDatastore
DHT *dht.IpfsDHT
Exchange exchange.Interface
BitSwapNetwork bsnet.BitSwapNetwork
datastoreDelay delay.D
}
func AddCatBytes(data []byte, conf Config) error { func AddCatBytes(data []byte, conf Config) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
mn := mocknet.New(ctx) const numPeers = 2
// defer mn.Close() FIXME does mocknet require clean-up instances := make(map[peer.ID]*instance, numPeers)
// create network
mn, err := mocknet.FullMeshLinked(ctx, numPeers)
if err != nil {
return errors.Wrap(err)
}
mn.SetLinkDefaults(mocknet.LinkOptions{ mn.SetLinkDefaults(mocknet.LinkOptions{
Latency: conf.NetworkLatency, Latency: conf.NetworkLatency,
// TODO add to conf. This is tricky because we want 0 values to be functional. // TODO add to conf. This is tricky because we want 0 values to be functional.
Bandwidth: math.MaxInt32, Bandwidth: math.MaxInt32,
}) })
dhtNetwork := mockrouting.NewDHTNetwork(mn) for _, p := range mn.Peers() {
net, err := tn.StreamNet(ctx, mn, dhtNetwork) instances[p] = &instance{
if err != nil { ID: p,
return errors.Wrap(err) Network: mn.Net(p),
}
}
// create dht network
for _, p := range mn.Peers() {
dsDelay := delay.Fixed(conf.BlockstoreLatency)
instances[p].Datastore = sync.MutexWrap(datastore2.WithDelay(datastore.NewMapDatastore(), dsDelay))
instances[p].datastoreDelay = dsDelay
}
for _, p := range mn.Peers() {
instances[p].DHT = dht.NewDHT(ctx, p, instances[p].Network, instances[p].Datastore)
} }
sessionGenerator := bitswap.NewSessionGenerator(net) // create two bitswap network clients
defer sessionGenerator.Close() for _, p := range mn.Peers() {
instances[p].BitSwapNetwork = bsnet.NewFromIpfsNetwork(instances[p].Network, instances[p].DHT)
adder := sessionGenerator.Next() }
catter := sessionGenerator.Next() for _, p := range mn.Peers() {
// catter.Routing.Update(context.TODO(), adder.Peer) const kWriteCacheElems = 100
const alwaysSendToPeer = true
peers := mn.Peers() adapter := instances[p].BitSwapNetwork
if len(peers) != 2 { dstore := instances[p].Datastore
return errors.New("peers not in network") instances[p].Blockstore, err = blockstore.WriteCached(blockstore.NewBlockstore(dstore), kWriteCacheElems)
} if err != nil {
return err
for _, i := range peers {
for _, j := range peers {
if i == j {
continue
}
if _, err := mn.LinkPeers(i, j); err != nil {
return err
}
if err := mn.ConnectPeers(i, j); err != nil {
return err
}
} }
instances[p].Exchange = bitswap.New(ctx, p, adapter, instances[p].Blockstore, alwaysSendToPeer)
} }
var peers []peer.ID
for _, p := range mn.Peers() {
peers = append(peers, p)
}
adder := instances[peers[0]]
catter := instances[peers[1]]
catter.SetBlockstoreLatency(conf.BlockstoreLatency) // bootstrap the DHTs
adder.DHT.Connect(ctx, catter.ID)
catter.DHT.Connect(ctx, adder.ID)
adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation adder.datastoreDelay.Set(0) // disable blockstore latency during add operation
keyAdded, err := add(adder, bytes.NewReader(data)) keyAdded, err := add(adder, bytes.NewReader(data))
if err != nil { if err != nil {
return err return err
} }
adder.SetBlockstoreLatency(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait adder.datastoreDelay.Set(conf.BlockstoreLatency) // add some blockstore delay to make the catter wait
readerCatted, err := cat(catter, keyAdded) readerCatted, err := cat(catter, keyAdded)
if err != nil { if err != nil {
...@@ -152,8 +188,8 @@ func AddCatBytes(data []byte, conf Config) error { ...@@ -152,8 +188,8 @@ func AddCatBytes(data []byte, conf Config) error {
return nil return nil
} }
func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) { func cat(catter *instance, k util.Key) (io.Reader, error) {
catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore(), catter.Exchange}) catterdag := merkledag.NewDAGService(&blockservice.BlockService{catter.Blockstore, catter.Exchange})
nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String()) nodeCatted, err := (&path.Resolver{catterdag}).ResolvePath(k.String())
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -161,10 +197,10 @@ func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) { ...@@ -161,10 +197,10 @@ func cat(catter bitswap.Instance, k util.Key) (io.Reader, error) {
return uio.NewDagReader(nodeCatted, catterdag) return uio.NewDagReader(nodeCatted, catterdag)
} }
func add(adder bitswap.Instance, r io.Reader) (util.Key, error) { func add(adder *instance, r io.Reader) (util.Key, error) {
nodeAdded, err := importer.BuildDagFromReader( nodeAdded, err := importer.BuildDagFromReader(
r, r,
merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore(), adder.Exchange}), merkledag.NewDAGService(&blockservice.BlockService{adder.Blockstore, adder.Exchange}),
nil, nil,
chunk.DefaultSplitter, chunk.DefaultSplitter,
) )
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论