提交 c5e7273c 作者: Jeromy

refactor to allow use of mes_listener outside of dht

上级 05b80afc
package bitswap
import (
"code.google.com/p/goprotobuf/proto"
blocks "github.com/jbenet/go-ipfs/blocks"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
......@@ -27,12 +29,18 @@ type BitSwap struct {
peer *peer.Peer
// net holds the connections to all peers.
net swarm.Network
net swarm.Network
meschan *swarm.Chan
// datastore is the local database
// Ledgers of known
datastore ds.Datastore
// routing interface for communication
routing routing.IpfsRouting
listener *swarm.MesListener
// partners is a map of currently active bitswap relationships.
// The Ledger has the peer.ID, and the peer connection works through net.
// Ledgers of known relationships (active or inactive) stored in datastore.
......@@ -44,27 +52,71 @@ type BitSwap struct {
// wantList is the set of keys we want values for. a map for fast lookups.
wantList KeySet
haltChan chan struct{}
}
// NewBitSwap creates a new BitSwap instance. It does not check its parameters.
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore) *BitSwap {
return &BitSwap{
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
bs := &BitSwap{
peer: p,
net: net,
datastore: d,
partners: LedgerMap{},
wantList: KeySet{},
routing: r,
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}),
}
go bs.handleMessages()
return bs
}
// GetBlock attempts to retrieve a particular block from peers, within timeout.
func (s *BitSwap) GetBlock(k u.Key, timeout time.Time) (
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Time) (
*blocks.Block, error) {
begin := time.Now()
_, err := bs.routing.FindProviders(k, timeout)
if err != nil {
u.PErr("GetBlock error: %s\n", err)
return
}
tleft := timeout.Sub(time.Now().Sub(begin))
return nil, errors.New("not implemented")
}
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
mes := new(PBMessage)
mes.Id = proto.Uint64(swarm.GenerateID())
mes.Key = proto.String(k)
typ := PBMessage_GET_BLOCK
mes.Type = &typ
after := time.After(timeout)
resp := bs.listener.Listen(mes.GetId(), 1, timeout)
smes := swarm.NewMessage(p, mes)
bs.meschan.Outgoing <- smes
select {
case resp_mes := <-resp:
case <-after:
u.PErr("getBlock for '%s' timed out.", k)
return nil, u.ErrTimeout
}
}
// HaveBlock announces the existance of a block to BitSwap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (s *BitSwap) HaveBlock(k u.Key) (*blocks.Block, error) {
func (bs *BitSwap) HaveBlock(k u.Key) (*blocks.Block, error) {
return nil, errors.New("not implemented")
}
func (bs *BitSwap) handleMessages() {
for {
select {
case mes := bs.meschan.Incoming:
case <-bs.haltChan:
}
}
}
// Code generated by protoc-gen-go.
// source: message.proto
// DO NOT EDIT!
/*
Package bitswap is a generated protocol buffer package.
It is generated from these files:
message.proto
It has these top-level messages:
PBMessage
*/
package bitswap
import proto "code.google.com/p/goprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type PBMessage_MessageType int32
const (
PBMessage_GET_BLOCK PBMessage_MessageType = 0
)
var PBMessage_MessageType_name = map[int32]string{
0: "GET_BLOCK",
}
var PBMessage_MessageType_value = map[string]int32{
"GET_BLOCK": 0,
}
func (x PBMessage_MessageType) Enum() *PBMessage_MessageType {
p := new(PBMessage_MessageType)
*p = x
return p
}
func (x PBMessage_MessageType) String() string {
return proto.EnumName(PBMessage_MessageType_name, int32(x))
}
func (x *PBMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(PBMessage_MessageType_value, data, "PBMessage_MessageType")
if err != nil {
return err
}
*x = PBMessage_MessageType(value)
return nil
}
type PBMessage struct {
Type *PBMessage_MessageType `protobuf:"varint,1,req,enum=bitswap.PBMessage_MessageType" json:"Type,omitempty"`
Id *uint64 `protobuf:"varint,2,req,name=id" json:"id,omitempty"`
Key *string `protobuf:"bytes,3,req,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBMessage) Reset() { *m = PBMessage{} }
func (m *PBMessage) String() string { return proto.CompactTextString(m) }
func (*PBMessage) ProtoMessage() {}
func (m *PBMessage) GetType() PBMessage_MessageType {
if m != nil && m.Type != nil {
return *m.Type
}
return PBMessage_GET_BLOCK
}
func (m *PBMessage) GetId() uint64 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *PBMessage) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
func (m *PBMessage) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *PBMessage) GetResponse() bool {
if m != nil && m.Response != nil {
return *m.Response
}
return false
}
func init() {
proto.RegisterEnum("bitswap.PBMessage_MessageType", PBMessage_MessageType_name, PBMessage_MessageType_value)
}
package bitswap;
message PBMessage {
enum MessageType {
GET_BLOCK = 0;
}
required MessageType Type = 1;
required uint64 id = 2;
required string key = 3;
optional bytes value = 4;
optional bool response = 5;
}
......@@ -2,7 +2,7 @@ package dht
import (
"bytes"
"errors"
"fmt"
"sync"
"time"
......@@ -49,7 +49,7 @@ type IpfsDHT struct {
diaglock sync.Mutex
// listener is a server to register to listen for responses to messages
listener *mesListener
listener *swarm.MesListener
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
......@@ -66,7 +66,7 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.listener = newMesListener()
dht.listener = swarm.NewMesListener()
dht.birth = time.Now()
return dht
}
......@@ -89,7 +89,7 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
// NOTE: this should be done better...
err = dht.Ping(npeer, time.Second*2)
if err != nil {
return nil, errors.New("failed to ping newly connected peer\n")
return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
}
dht.Update(npeer)
......@@ -132,19 +132,19 @@ func (dht *IpfsDHT) handleMessages() {
pmes.GetId(), mes.Peer.ID.Pretty())
switch pmes.GetType() {
case PBDHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
go dht.handleGetValue(mes.Peer, pmes)
case PBDHTMessage_PUT_VALUE:
dht.handlePutValue(mes.Peer, pmes)
go dht.handlePutValue(mes.Peer, pmes)
case PBDHTMessage_FIND_NODE:
dht.handleFindPeer(mes.Peer, pmes)
go dht.handleFindPeer(mes.Peer, pmes)
case PBDHTMessage_ADD_PROVIDER:
dht.handleAddProvider(mes.Peer, pmes)
go dht.handleAddProvider(mes.Peer, pmes)
case PBDHTMessage_GET_PROVIDERS:
dht.handleGetProviders(mes.Peer, pmes)
go dht.handleGetProviders(mes.Peer, pmes)
case PBDHTMessage_PING:
dht.handlePing(mes.Peer, pmes)
go dht.handlePing(mes.Peer, pmes)
case PBDHTMessage_DIAGNOSTIC:
dht.handleDiagnostic(mes.Peer, pmes)
go dht.handleDiagnostic(mes.Peer, pmes)
default:
u.PErr("Recieved invalid message type")
}
......@@ -162,7 +162,7 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er
Type: PBDHTMessage_PUT_VALUE,
Key: key,
Value: value,
ID: GenerateMessageID(),
ID: swarm.GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
......@@ -242,6 +242,7 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
resp := Message{
Type: pmes.GetType(),
Response: true,
......@@ -328,6 +329,8 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
func (dht *IpfsDHT) Halt() {
dht.shutdown <- struct{}{}
dht.network.Close()
dht.providers.Halt()
dht.listener.Halt()
}
// NOTE: not yet finished, low priority
......@@ -424,7 +427,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
Type: PBDHTMessage_GET_VALUE,
Key: string(key),
Value: []byte{byte(level)},
ID: GenerateMessageID(),
ID: swarm.GenerateMessageID(),
}
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
......@@ -539,7 +542,7 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
pmes := Message{
Type: PBDHTMessage_FIND_NODE,
Key: string(id),
ID: GenerateMessageID(),
ID: swarm.GenerateMessageID(),
Value: []byte{byte(level)},
}
......@@ -575,7 +578,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time
pmes := Message{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
ID: GenerateMessageID(),
ID: swarm.GenerateMessageID(),
Value: []byte{byte(level)},
}
......
......@@ -153,7 +153,7 @@ func TestGetFailures(t *testing.T) {
req := Message{
Type: PBDHTMessage_GET_VALUE,
Key: "hello",
ID: GenerateMessageID(),
ID: swarm.GenerateMessageID(),
Value: []byte{0},
}
fn.Chan.Incoming <- swarm.NewMessage(other, req.ToProtobuf())
......
......@@ -81,3 +81,7 @@ func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer {
pm.getprovs <- gp
return <-gp.resp
}
func (pm *ProviderManager) Halt() {
pm.halt <- struct{}{}
}
......@@ -4,8 +4,6 @@ import (
"bytes"
"encoding/json"
"errors"
"math/rand"
"sync"
"time"
proto "code.google.com/p/goprotobuf/proto"
......@@ -18,21 +16,6 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6
// We put the 'K' in kademlia!
var KValue = 10
// Its in the paper, i swear
var AlphaValue = 3
// GenerateMessageID creates and returns a new message ID
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
return (uint64(rand.Uint32()) << 32) | uint64(rand.Uint32())
}
// This file implements the Routing interface for the IpfsDHT struct.
// Basic Put/Get
......@@ -64,60 +47,6 @@ func (dht *IpfsDHT) PutValue(key u.Key, value []byte) {
}
}
// A counter for incrementing a variable across multiple threads
type counter struct {
n int
mut sync.RWMutex
}
func (c *counter) Increment() {
c.mut.Lock()
c.n++
c.mut.Unlock()
}
func (c *counter) Decrement() {
c.mut.Lock()
c.n--
c.mut.Unlock()
}
func (c *counter) Size() int {
c.mut.RLock()
defer c.mut.RUnlock()
return c.n
}
type peerSet struct {
ps map[string]bool
lk sync.RWMutex
}
func newPeerSet() *peerSet {
ps := new(peerSet)
ps.ps = make(map[string]bool)
return ps
}
func (ps *peerSet) Add(p *peer.Peer) {
ps.lk.Lock()
ps.ps[string(p.ID)] = true
ps.lk.Unlock()
}
func (ps *peerSet) Contains(p *peer.Peer) bool {
ps.lk.RLock()
_, ok := ps.ps[string(p.ID)]
ps.lk.RUnlock()
return ok
}
func (ps *peerSet) Size() int {
ps.lk.RLock()
defer ps.lk.RUnlock()
return len(ps.ps)
}
// GetValue searches for the value corresponding to given Key.
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
......@@ -159,9 +88,13 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
count := 0
go func() {
defer close(procPeer)
for {
select {
case p := <-npeerChan:
case p, ok := <-npeerChan:
if !ok {
return
}
count++
if count >= KValue {
errChan <- u.ErrNotFound
......@@ -171,8 +104,11 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
procPeer <- p
default:
if c.Size() == 0 {
errChan <- u.ErrNotFound
if c.Size() <= 0 {
select {
case errChan <- u.ErrNotFound:
default:
}
return
}
}
......@@ -180,20 +116,22 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
}()
process := func() {
defer c.Decrement()
for p := range procPeer {
if p == nil {
c.Decrement()
return
}
val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
if err != nil {
u.DErr("%v\n", err.Error())
c.Decrement()
continue
}
if val != nil {
valChan <- val
c.Decrement()
select {
case valChan <- val:
default:
u.DOut("Wasnt the first to return the value!")
}
return
}
......@@ -347,7 +285,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
pmes := Message{ID: GenerateMessageID(), Type: PBDHTMessage_PING}
pmes := Message{ID: swarm.GenerateMessageID(), Type: PBDHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
......@@ -363,7 +301,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
return nil
case <-tout:
// Timed out, think about removing peer from network
u.DOut("Ping peer timed out.")
u.DOut("[%s] Ping peer [%s] timed out.", dht.self.ID.Pretty(), p.ID.Pretty())
dht.listener.Unlisten(pmes.ID)
return u.ErrTimeout
}
......@@ -377,7 +315,7 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
// TODO: Add timeout to this struct so nodes know when to return
pmes := Message{
Type: PBDHTMessage_DIAGNOSTIC,
ID: GenerateMessageID(),
ID: swarm.GenerateMessageID(),
}
listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2)
......
package dht
import (
"sync"
peer "github.com/jbenet/go-ipfs/peer"
)
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6
// We put the 'K' in kademlia!
var KValue = 10
// Its in the paper, i swear
var AlphaValue = 3
// A counter for incrementing a variable across multiple threads
type counter struct {
n int
mut sync.Mutex
}
func (c *counter) Increment() {
c.mut.Lock()
c.n++
c.mut.Unlock()
}
func (c *counter) Decrement() {
c.mut.Lock()
c.n--
c.mut.Unlock()
}
func (c *counter) Size() (s int) {
c.mut.Lock()
s = c.n
c.mut.Unlock()
return
}
type peerSet struct {
ps map[string]bool
lk sync.RWMutex
}
func newPeerSet() *peerSet {
ps := new(peerSet)
ps.ps = make(map[string]bool)
return ps
}
func (ps *peerSet) Add(p *peer.Peer) {
ps.lk.Lock()
ps.ps[string(p.ID)] = true
ps.lk.Unlock()
}
func (ps *peerSet) Contains(p *peer.Peer) bool {
ps.lk.RLock()
_, ok := ps.ps[string(p.ID)]
ps.lk.RUnlock()
return ok
}
func (ps *peerSet) Size() int {
ps.lk.RLock()
defer ps.lk.RUnlock()
return len(ps.ps)
}
package dht
package swarm
import (
"math/rand"
"sync"
"time"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
)
type mesListener struct {
type MesListener struct {
listeners map[uint64]*listenInfo
haltchan chan struct{}
unlist chan uint64
......@@ -16,10 +16,15 @@ type mesListener struct {
send chan *respMes
}
// GenerateMessageID creates and returns a new message ID
func GenerateMessageID() uint64 {
return (uint64(rand.Uint32()) << 32) | uint64(rand.Uint32())
}
// The listen info struct holds information about a message that is being waited for
type listenInfo struct {
// Responses matching the listen ID will be sent through resp
resp chan *swarm.Message
resp chan *Message
// count is the number of responses to listen for
count int
......@@ -36,8 +41,8 @@ type listenInfo struct {
id uint64
}
func newMesListener() *mesListener {
ml := new(mesListener)
func NewMesListener() *MesListener {
ml := new(MesListener)
ml.haltchan = make(chan struct{})
ml.listeners = make(map[uint64]*listenInfo)
ml.nlist = make(chan *listenInfo, 16)
......@@ -47,37 +52,37 @@ func newMesListener() *mesListener {
return ml
}
func (ml *mesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message {
func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *Message {
li := new(listenInfo)
li.count = count
li.eol = time.Now().Add(timeout)
li.resp = make(chan *swarm.Message, count)
li.resp = make(chan *Message, count)
li.id = id
ml.nlist <- li
return li.resp
}
func (ml *mesListener) Unlisten(id uint64) {
func (ml *MesListener) Unlisten(id uint64) {
ml.unlist <- id
}
type respMes struct {
id uint64
mes *swarm.Message
mes *Message
}
func (ml *mesListener) Respond(id uint64, mes *swarm.Message) {
func (ml *MesListener) Respond(id uint64, mes *Message) {
ml.send <- &respMes{
id: id,
mes: mes,
}
}
func (ml *mesListener) Halt() {
func (ml *MesListener) Halt() {
ml.haltchan <- struct{}{}
}
func (ml *mesListener) run() {
func (ml *MesListener) run() {
for {
select {
case <-ml.haltchan:
......
package dht
package swarm
import (
"testing"
"time"
"github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/swarm"
peer "github.com/jbenet/go-ipfs/peer"
)
// Ensure that the Message Listeners basic functionality works
func TestMesListenerBasic(t *testing.T) {
ml := newMesListener()
ml := NewMesListener()
a := GenerateMessageID()
resp := ml.Listen(a, 1, time.Minute)
pmes := new(swarm.PBWrapper)
pmes := new(PBWrapper)
pmes.Message = []byte("Hello")
pmes.Type = new(swarm.PBWrapper_MessageType)
mes := swarm.NewMessage(new(peer.Peer), pmes)
pmes.Type = new(PBWrapper_MessageType)
mes := NewMessage(new(peer.Peer), pmes)
go ml.Respond(a, mes)
......
......@@ -25,15 +25,18 @@ type PBWrapper_MessageType int32
const (
PBWrapper_TEST PBWrapper_MessageType = 0
PBWrapper_DHT_MESSAGE PBWrapper_MessageType = 1
PBWrapper_BITSWAP PBWrapper_MessageType = 2
)
var PBWrapper_MessageType_name = map[int32]string{
0: "TEST",
1: "DHT_MESSAGE",
2: "BITSWAP",
}
var PBWrapper_MessageType_value = map[string]int32{
"TEST": 0,
"DHT_MESSAGE": 1,
"BITSWAP": 2,
}
func (x PBWrapper_MessageType) Enum() *PBWrapper_MessageType {
......
......@@ -4,6 +4,7 @@ message PBWrapper {
enum MessageType {
TEST = 0;
DHT_MESSAGE = 1;
BITSWAP = 2;
}
required MessageType Type = 1;
......
......@@ -90,6 +90,7 @@ type Swarm struct {
local *peer.Peer
listeners []net.Listener
haltroute chan struct{}
}
// NewSwarm constructs a Swarm, with a Chan.
......@@ -101,6 +102,7 @@ func NewSwarm(local *peer.Peer) *Swarm {
filterChans: make(map[PBWrapper_MessageType]*Chan),
toFilter: make(chan *Message, 32),
newFilters: make(chan *newFilterInfo),
haltroute: make(chan struct{}),
}
go s.routeMessages()
go s.fanOut()
......@@ -203,6 +205,12 @@ func (s *Swarm) Close() {
for _, list := range s.listeners {
list.Close()
}
s.haltroute <- struct{}{}
for _, filter := range s.filterChans {
filter.Close <- true
}
}
// Dial connects to a peer.
......@@ -347,6 +355,8 @@ func (s *Swarm) routeMessages() {
go s.muxChan(nch, gchan.Type)
}
gchan.resp <- nch
case <-s.haltroute:
return
}
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论