提交 8d98d4b4 作者: Jeromy 提交者: Juan Batiz-Benet

making connections between nodes get closer to working

上级 61f13ea7
...@@ -4,12 +4,12 @@ package identify ...@@ -4,12 +4,12 @@ package identify
import ( import (
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util"
) )
// Perform initial communication with this peer to share node ID's and // Perform initial communication with this peer to share node ID's and
// initiate communication // initiate communication
func Handshake(self *peer.Peer, conn *swarm.Conn) error { func Handshake(self, remote *peer.Peer, in, out chan []byte) error {
// temporary: // temporary:
// put your own id in a 16byte buffer and send that over to // put your own id in a 16byte buffer and send that over to
...@@ -17,12 +17,10 @@ func Handshake(self *peer.Peer, conn *swarm.Conn) error { ...@@ -17,12 +17,10 @@ func Handshake(self *peer.Peer, conn *swarm.Conn) error {
// Once that trade is finished, the handshake is complete and // Once that trade is finished, the handshake is complete and
// both sides should 'trust' each other // both sides should 'trust' each other
id := make([]byte, 16) out <- self.ID
copy(id, self.ID) resp := <-in
remote.ID = peer.ID(resp)
conn.Outgoing.MsgChan <- id u.DOut("Got node id: %s", string(remote.ID))
resp := <-conn.Incoming.MsgChan
conn.Peer.ID = peer.ID(resp)
return nil return nil
} }
...@@ -55,6 +55,10 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) { ...@@ -55,6 +55,10 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
return dht, nil return dht, nil
} }
func (dht *IpfsDHT) Start() {
go dht.handleMessages()
}
// Connect to a new peer at the given address // Connect to a new peer at the given address
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error { func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
peer := new(peer.Peer) peer := new(peer.Peer)
...@@ -65,24 +69,26 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error { ...@@ -65,24 +69,26 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
return err return err
} }
err = identify.Handshake(dht.self, conn) err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
if err != nil { if err != nil {
return err return err
} }
dht.network.StartConn(conn.Peer.Key(), conn) dht.network.StartConn(conn)
// TODO: Add this peer to our routing table // TODO: Add this peer to our routing table
return nil return nil
} }
// Read in all messages from swarm and handle them appropriately // Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch // NOTE: this function is just a quick sketch
func (dht *IpfsDHT) handleMessages() { func (dht *IpfsDHT) handleMessages() {
u.DOut("Being message handling routine")
for { for {
select { select {
case mes := <-dht.network.Chan.Incoming: case mes := <-dht.network.Chan.Incoming:
u.DOut("recieved message from swarm.")
pmes := new(DHTMessage) pmes := new(DHTMessage)
err := proto.Unmarshal(mes.Data, pmes) err := proto.Unmarshal(mes.Data, pmes)
if err != nil { if err != nil {
...@@ -118,6 +124,8 @@ func (dht *IpfsDHT) handleMessages() { ...@@ -118,6 +124,8 @@ func (dht *IpfsDHT) handleMessages() {
dht.handleFindNode(mes.Peer, pmes) dht.handleFindNode(mes.Peer, pmes)
} }
case err := <-dht.network.Chan.Errors:
panic(err)
case <-dht.shutdown: case <-dht.shutdown:
return return
} }
...@@ -158,10 +166,6 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { ...@@ -158,10 +166,6 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
} }
} }
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
isResponse := true isResponse := true
resp := new(DHTMessage) resp := new(DHTMessage)
...@@ -172,6 +176,18 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { ...@@ -172,6 +176,18 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
dht.network.Chan.Outgoing <-swarm.NewMessage(p, []byte(resp.String())) dht.network.Chan.Outgoing <-swarm.NewMessage(p, []byte(resp.String()))
} }
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
// Register a handler for a specific message ID, used for getting replies // Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message) // to certain messages (i.e. response to a GET_VALUE message)
...@@ -202,6 +218,8 @@ func (dht *IpfsDHT) Halt() { ...@@ -202,6 +218,8 @@ func (dht *IpfsDHT) Halt() {
// Ping a node, log the time it took // Ping a node, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) { func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) {
// Thoughts: maybe this should accept an ID and do a peer lookup? // Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
id := GenerateMessageID() id := GenerateMessageID()
mes_type := DHTMessage_PING mes_type := DHTMessage_PING
pmes := new(DHTMessage) pmes := new(DHTMessage)
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-multiaddr"
ident "github.com/jbenet/go-ipfs/identify"
) )
// Message represents a packet of information sent to or received from a // Message represents a packet of information sent to or received from a
...@@ -109,27 +110,21 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { ...@@ -109,27 +110,21 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
// Handle getting ID from this peer and adding it into the map // Handle getting ID from this peer and adding it into the map
func (s *Swarm) handleNewConn(nconn net.Conn) { func (s *Swarm) handleNewConn(nconn net.Conn) {
p := MakePeerFromConn(nconn) p := new(peer.Peer)
var addr *ma.Multiaddr
//naddr := nconn.RemoteAddr()
//addr := ma.FromDialArgs(naddr.Network(), naddr.String())
conn := &Conn{ conn := &Conn{
Peer: p, Peer: p,
Addr: addr, Addr: nil,
Conn: nconn, Conn: nconn,
} }
newConnChans(conn) newConnChans(conn)
go s.fanIn(conn)
}
// Negotiate with peer for its ID and create a peer object err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
// TODO: this might belong in the peer package if err != nil {
func MakePeerFromConn(conn net.Conn) *peer.Peer { panic(err)
panic("Not yet implemented.") }
s.StartConn(conn)
} }
// Close closes a swarm. // Close closes a swarm.
...@@ -170,14 +165,19 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { ...@@ -170,14 +165,19 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
return nil, err return nil, err
} }
s.StartConn(k, conn) s.StartConn(conn)
return conn, nil return conn, nil
} }
func (s *Swarm) StartConn(k u.Key, conn *Conn) { func (s *Swarm) StartConn(conn *Conn) {
if conn == nil {
panic("tried to start nil Conn!")
}
u.DOut("Starting connection: %s", string(conn.Peer.ID))
// add to conns // add to conns
s.connsLock.Lock() s.connsLock.Lock()
s.conns[k] = conn s.conns[conn.Peer.Key()] = conn
s.connsLock.Unlock() s.connsLock.Unlock()
// kick off reader goroutine // kick off reader goroutine
...@@ -191,6 +191,7 @@ func (s *Swarm) fanOut() { ...@@ -191,6 +191,7 @@ func (s *Swarm) fanOut() {
case <-s.Chan.Close: case <-s.Chan.Close:
return // told to close. return // told to close.
case msg, ok := <-s.Chan.Outgoing: case msg, ok := <-s.Chan.Outgoing:
u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key())
if !ok { if !ok {
return return
} }
...@@ -198,14 +199,17 @@ func (s *Swarm) fanOut() { ...@@ -198,14 +199,17 @@ func (s *Swarm) fanOut() {
s.connsLock.RLock() s.connsLock.RLock()
conn, found := s.conns[msg.Peer.Key()] conn, found := s.conns[msg.Peer.Key()]
s.connsLock.RUnlock() s.connsLock.RUnlock()
if !found { if !found {
e := fmt.Errorf("Sent msg to peer without open conn: %v", e := fmt.Errorf("Sent msg to peer without open conn: %v",
msg.Peer) msg.Peer)
s.Chan.Errors <- e s.Chan.Errors <- e
continue
} }
// queue it in the connection's buffer // queue it in the connection's buffer
conn.Outgoing.MsgChan <- msg.Data conn.Outgoing.MsgChan <- msg.Data
u.DOut("fanOut: message off.")
} }
} }
} }
...@@ -225,6 +229,7 @@ Loop: ...@@ -225,6 +229,7 @@ Loop:
break Loop break Loop
case data, ok := <-conn.Incoming.MsgChan: case data, ok := <-conn.Incoming.MsgChan:
u.DOut("fanIn: got message from incoming channel.")
if !ok { if !ok {
e := fmt.Errorf("Error retrieving from conn: %v", conn) e := fmt.Errorf("Error retrieving from conn: %v", conn)
s.Chan.Errors <- e s.Chan.Errors <- e
...@@ -234,6 +239,7 @@ Loop: ...@@ -234,6 +239,7 @@ Loop:
// wrap it for consumers. // wrap it for consumers.
msg := &Message{Peer: conn.Peer, Data: data} msg := &Message{Peer: conn.Peer, Data: data}
s.Chan.Incoming <- msg s.Chan.Incoming <- msg
u.DOut("fanIn: message off.")
} }
} }
......
...@@ -41,12 +41,12 @@ func TildeExpansion(filename string) (string, error) { ...@@ -41,12 +41,12 @@ func TildeExpansion(filename string) (string, error) {
// PErr is a shorthand printing function to output to Stderr. // PErr is a shorthand printing function to output to Stderr.
func PErr(format string, a ...interface{}) { func PErr(format string, a ...interface{}) {
fmt.Fprintf(os.Stderr, format, a...) fmt.Fprintf(os.Stderr, format + "\n", a...)
} }
// POut is a shorthand printing function to output to Stdout. // POut is a shorthand printing function to output to Stdout.
func POut(format string, a ...interface{}) { func POut(format string, a ...interface{}) {
fmt.Fprintf(os.Stdout, format, a...) fmt.Fprintf(os.Stdout, format + "\n", a...)
} }
// DErr is a shorthand debug printing function to output to Stderr. // DErr is a shorthand debug printing function to output to Stderr.
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论