提交 678db4fa 作者: Jeromy

more work on bitswap and other code cleanup

上级 91e4675c
*.swp
.ipfsconfig
*.out
*.test
...@@ -62,6 +62,15 @@ Guidelines: ...@@ -62,6 +62,15 @@ Guidelines:
- if you'd like to work on ipfs part-time (20+ hrs/wk) or full-time (40+ hrs/wk), contact [@jbenet](https://github.com/jbenet) - if you'd like to work on ipfs part-time (20+ hrs/wk) or full-time (40+ hrs/wk), contact [@jbenet](https://github.com/jbenet)
- have fun! - have fun!
## Todo
Ipfs is still under heavy development; there is a lot to be done!
- [ ] Finish Bitswap
- [ ] Connect fuse interface to Blockservice
- [ ] Write tests for bitswap
- [ ] Come up with more TODO items
## Development Dependencies ## Development Dependencies
If you make changes to the protocol buffers, you will need to install the [protoc compiler](https://code.google.com/p/protobuf/downloads/list). If you make changes to the protocol buffers, you will need to install the [protoc compiler](https://code.google.com/p/protobuf/downloads/list).
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing" routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
swarm "github.com/jbenet/go-ipfs/swarm" swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
...@@ -36,7 +37,7 @@ type BitSwap struct { ...@@ -36,7 +37,7 @@ type BitSwap struct {
datastore ds.Datastore datastore ds.Datastore
// routing interface for communication // routing interface for communication
routing routing.IpfsRouting routing *dht.IpfsDHT
listener *swarm.MesListener listener *swarm.MesListener
...@@ -63,7 +64,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR ...@@ -63,7 +64,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
datastore: d, datastore: d,
partners: LedgerMap{}, partners: LedgerMap{},
wantList: KeySet{}, wantList: KeySet{},
routing: r, routing: r.(*dht.IpfsDHT),
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}), haltChan: make(chan struct{}),
} }
...@@ -76,32 +77,32 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR ...@@ -76,32 +77,32 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
*blocks.Block, error) { *blocks.Block, error) {
begin := time.Now() begin := time.Now()
provs, err := bs.routing.FindProviders(k, timeout)
if err != nil {
u.PErr("GetBlock error: %s\n", err)
return nil, err
}
tleft := timeout - time.Now().Sub(begin) tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
valchan := make(chan []byte) valchan := make(chan []byte)
after := time.After(tleft) after := time.After(tleft)
for _, p := range provs {
go func(pr *peer.Peer) { // TODO: when the data is received, shut down this for loop
ledger := bs.GetLedger(pr.Key()) go func() {
blk, err := bs.getBlock(k, pr, tleft) for p := range provs_ch {
if err != nil { go func(pr *peer.Peer) {
u.PErr("%v\n", err) ledger := bs.GetLedger(pr.Key())
return blk, err := bs.getBlock(k, pr, tleft)
} if err != nil {
// NOTE: this credits everyone who sends us a block, u.PErr("%v\n", err)
// even if we dont use it return
ledger.ReceivedBytes(uint64(len(blk))) }
select { // NOTE: this credits everyone who sends us a block,
case valchan <- blk: // even if we dont use it
default: ledger.ReceivedBytes(uint64(len(blk)))
} select {
}(p) case valchan <- blk:
} default:
}
}(p)
}
}()
select { select {
case blkdata := <-valchan: case blkdata := <-valchan:
...@@ -213,3 +214,7 @@ func (bs *BitSwap) GetLedger(k u.Key) *Ledger { ...@@ -213,3 +214,7 @@ func (bs *BitSwap) GetLedger(k u.Key) *Ledger {
bs.partners[k] = l bs.partners[k] = l
return l return l
} }
// Halt signals the BitSwap run loop to stop by sending on haltChan.
// Note: this send blocks until the run loop receives it; the receiver
// is assumed to be live whenever Halt is called.
func (bs *BitSwap) Halt() {
	var stop struct{}
	bs.haltChan <- stop
}
package blocks package blockservice
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"testing"
ds "github.com/jbenet/datastore.go" ds "github.com/jbenet/datastore.go"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
"testing"
) )
func TestBlocks(t *testing.T) { func TestBlocks(t *testing.T) {
...@@ -17,7 +19,7 @@ func TestBlocks(t *testing.T) { ...@@ -17,7 +19,7 @@ func TestBlocks(t *testing.T) {
return return
} }
b, err := NewBlock([]byte("beep boop")) b, err := blocks.NewBlock([]byte("beep boop"))
if err != nil { if err != nil {
t.Error("failed to construct block", err) t.Error("failed to construct block", err)
return return
......
...@@ -2,10 +2,11 @@ package importer ...@@ -2,10 +2,11 @@ package importer
import ( import (
"fmt" "fmt"
dag "github.com/jbenet/go-ipfs/merkledag"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
dag "github.com/jbenet/go-ipfs/merkledag"
) )
// BlockSizeLimit specifies the maximum size an imported block can have. // BlockSizeLimit specifies the maximum size an imported block can have.
...@@ -23,12 +24,16 @@ func NewDagFromReader(r io.Reader, size int64) (*dag.Node, error) { ...@@ -23,12 +24,16 @@ func NewDagFromReader(r io.Reader, size int64) (*dag.Node, error) {
// todo: block-splitting based on rabin fingerprinting // todo: block-splitting based on rabin fingerprinting
// todo: block-splitting with user-defined function // todo: block-splitting with user-defined function
// todo: block-splitting at all. :P // todo: block-splitting at all. :P
// todo: write mote todos
// totally just trusts the reported size. fix later. // totally just trusts the reported size. fix later.
if size > BlockSizeLimit { // 1 MB limit for now. if size > BlockSizeLimit { // 1 MB limit for now.
return nil, ErrSizeLimitExceeded return nil, ErrSizeLimitExceeded
} }
// Ensure that we dont get stuck reading way too much data
r = io.LimitReader(r, BlockSizeLimit)
// we're doing it live! // we're doing it live!
buf, err := ioutil.ReadAll(r) buf, err := ioutil.ReadAll(r)
if err != nil { if err != nil {
...@@ -52,7 +57,7 @@ func NewDagFromFile(fpath string) (*dag.Node, error) { ...@@ -52,7 +57,7 @@ func NewDagFromFile(fpath string) (*dag.Node, error) {
} }
if stat.IsDir() { if stat.IsDir() {
return nil, fmt.Errorf("`fpath` is a directory") return nil, fmt.Errorf("`%s` is a directory", fpath)
} }
f, err := os.Open(fpath) f, err := os.Open(fpath)
......
...@@ -22,7 +22,7 @@ import ( ...@@ -22,7 +22,7 @@ import (
// PutValue adds value corresponding to given Key. // PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT // This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) { func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
complete := make(chan struct{}) complete := make(chan struct{})
count := 0 count := 0
for _, route := range dht.routingTables { for _, route := range dht.routingTables {
...@@ -45,6 +45,7 @@ func (dht *IpfsDHT) PutValue(key u.Key, value []byte) { ...@@ -45,6 +45,7 @@ func (dht *IpfsDHT) PutValue(key u.Key, value []byte) {
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
<-complete <-complete
} }
return nil
} }
// GetValue searches for the value corresponding to given Key. // GetValue searches for the value corresponding to given Key.
...@@ -183,6 +184,59 @@ func (dht *IpfsDHT) Provide(key u.Key) error { ...@@ -183,6 +184,59 @@ func (dht *IpfsDHT) Provide(key u.Key) error {
return nil return nil
} }
// FindProvidersAsync returns a channel on which up to count providers
// for key are delivered as they are discovered. Locally-known providers
// are sent first; if fewer than count are known, the nearest peers in
// the routing table are queried concurrently.
//
// NOTE(review): the returned channel is never closed and the spawned
// goroutines are not bounded by timeout on the send side — callers must
// apply their own deadline when ranging over the channel.
func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Duration) chan *peer.Peer {
	peerOut := make(chan *peer.Peer, count)
	go func() {
		ps := newPeerSet()
		// Serve providers we already know about before touching the network.
		provs := dht.providers.GetProviders(key)
		for _, p := range provs {
			count--
			// NOTE: assuming that the list of peers is unique
			ps.Add(p)
			peerOut <- p
			if count <= 0 {
				return
			}
		}

		peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
		for _, pp := range peers {
			// Pass pp as an argument: a closure capturing the range
			// variable directly would make every goroutine see the same
			// (last) peer — the classic pre-Go-1.22 loop-capture bug.
			go func(pp *peer.Peer) {
				pmes, err := dht.findProvidersSingle(pp, key, 0, timeout)
				if err != nil {
					u.PErr("%v\n", err)
					return
				}
				dht.addPeerListAsync(key, pmes.GetPeers(), ps, count, peerOut)
			}(pp)
		}
	}()
	return peerOut
}
// addPeerListAsync converts a list of protobuf peer records into live
// connections, records each one as a provider for k, and forwards newly
// seen peers on out until the peer set reaches count entries.
//
// TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*PBDHTMessage_PBPeer, ps *peerSet, count int, out chan *peer.Peer) {
	for _, record := range peers {
		addr, err := ma.NewMultiaddr(record.GetAddr())
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}

		pr, err := dht.network.GetConnection(peer.ID(record.GetId()), addr)
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}

		dht.providers.AddProvider(k, pr)
		if ps.AddIfSmallerThan(pr, count) {
			out <- pr
		} else if ps.Size() >= count {
			// Set is full; no point processing the rest of the list.
			return
		}
	}
}
// FindProviders searches for peers who can provide the value for given key. // FindProviders searches for peers who can provide the value for given key.
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
ll := startNewRPC("FindProviders") ll := startNewRPC("FindProviders")
......
...@@ -40,6 +40,7 @@ func (c *counter) Size() (s int) { ...@@ -40,6 +40,7 @@ func (c *counter) Size() (s int) {
return return
} }
// peerSet is a threadsafe set of peers
type peerSet struct { type peerSet struct {
ps map[string]bool ps map[string]bool
lk sync.RWMutex lk sync.RWMutex
...@@ -69,3 +70,14 @@ func (ps *peerSet) Size() int { ...@@ -69,3 +70,14 @@ func (ps *peerSet) Size() int {
defer ps.lk.RUnlock() defer ps.lk.RUnlock()
return len(ps.ps) return len(ps.ps)
} }
// AddIfSmallerThan inserts p into the set only while the set currently
// holds fewer than maxsize entries. It reports whether p was newly added
// (false if p was already present or the set is at capacity).
func (ps *peerSet) AddIfSmallerThan(p *peer.Peer, maxsize int) bool {
	ps.lk.Lock()
	defer ps.lk.Unlock()

	key := string(p.ID)
	if _, present := ps.ps[key]; present || len(ps.ps) >= maxsize {
		return false
	}
	ps.ps[key] = true
	return true
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论