提交 6334e193 作者: Juan Batiz-Benet

make net work with new stream + mux

上级 e5e2d59f
...@@ -15,12 +15,7 @@ import ( ...@@ -15,12 +15,7 @@ import (
// Map maps Keys (Peer.IDs) to Connections. // Map maps Keys (Peer.IDs) to Connections.
type Map map[u.Key]Conn type Map map[u.Key]Conn
// Conn is a generic message-based Peer-to-Peer connection. type PeerConn interface {
type Conn interface {
// ID is an identifier unique to this connection.
ID() string
// LocalMultiaddr is the Multiaddr on this side // LocalMultiaddr is the Multiaddr on this side
LocalMultiaddr() ma.Multiaddr LocalMultiaddr() ma.Multiaddr
...@@ -32,8 +27,16 @@ type Conn interface { ...@@ -32,8 +27,16 @@ type Conn interface {
// RemotePeer is the Peer on the remote side // RemotePeer is the Peer on the remote side
RemotePeer() peer.Peer RemotePeer() peer.Peer
}
// Conn is a generic message-based Peer-to-Peer connection.
type Conn interface {
PeerConn
// ID is an identifier unique to this connection.
ID() string
// net.Conn, cause duplicates. // can't just say "net.Conn" cause we have duplicate methods.
LocalAddr() net.Addr LocalAddr() net.Addr
RemoteAddr() net.Addr RemoteAddr() net.Addr
SetDeadline(t time.Time) error SetDeadline(t time.Time) error
......
package net package net
import ( import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" "io"
conn "github.com/jbenet/go-ipfs/net/conn" conn "github.com/jbenet/go-ipfs/net/conn"
msg "github.com/jbenet/go-ipfs/net/message" swarm "github.com/jbenet/go-ipfs/net/swarm2"
mux "github.com/jbenet/go-ipfs/net/mux"
srv "github.com/jbenet/go-ipfs/net/service"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
) )
// Network is the interface IPFS uses for connecting to the world. // Stream represents a bidirectional channel between two agents in
type Network interface { // the IPFS network. "agent" is as granular as desired, potentially
ctxc.ContextCloser // being a "request -> reply" pair, or whole protocols.
// Streams are backed by SPDY streams underneath the hood.
// Listen handles incoming connections on given Multiaddr. type Stream interface {
// Listen(*ma.Muliaddr) error io.Reader
// TODO: for now, only listen on addrs in local peer when initializing. io.Writer
io.Closer
// LocalPeer returns the local peer associated with this network
LocalPeer() peer.Peer // Conn returns the connection this stream is part of.
Conn() Conn
}
// DialPeer attempts to establish a connection to a given peer // StreamHandler is the function protocols who wish to listen to
DialPeer(context.Context, peer.Peer) error // incoming streams must implement.
type StreamHandler func(Stream)
// ClosePeer connection to peer // Conn is a connection to a remote peer. It multiplexes streams.
ClosePeer(peer.Peer) error // Usually there is no need to use a Conn directly, but it may
// be useful to get information about the peer on the other side:
// stream.Conn().RemotePeer()
type Conn interface {
conn.PeerConn
// Connectedness returns a state signaling connection capabilities // NewStream constructs a new Stream directly connected to p.
Connectedness(peer.Peer) Connectedness NewStream(p peer.Peer) (Stream, error)
}
// GetProtocols returns the protocols registered in the network. // Mux provides simple stream multixplexing.
GetProtocols() *mux.ProtocolMap // It helps you precisely when:
// * You have many streams
// * You have function handlers
//
// It contains the handlers for each protocol accepted.
// It dispatches handlers for streams opened by remote peers.
//
// We use a totally ad-hoc encoding:
// <1 byte length in bytes><string name>
// So "bitswap" is 0x0762697473776170
//
// NOTE: only the dialer specifies this muxing line.
// This is because we're using Streams :)
//
// WARNING: this datastructure IS NOT threadsafe.
// do not modify it once the network is using it.
type Mux struct {
Default StreamHandler // handles unknown protocols.
Handlers map[string]StreamHandler
}
// GetPeerList returns the list of peers currently connected in this network. // Network is the interface IPFS uses for connecting to the world.
GetPeerList() []peer.Peer // It dials and listens for connections. it uses a Swarm to pool
// connnections (see swarm pkg, and peerstream.Swarm). Connections
// are encrypted with a TLS-like protocol.
type Network interface {
Dialer
io.Closer
// GetConnections returns the list of connections currently open in this network. // NewStream returns a new stream to given peer p.
GetConnections() []conn.Conn // If there is no connection to p, attempts to create one.
NewStream(p peer.Peer) (Stream, error)
// GetBandwidthTotals returns the total number of bytes passed through // Swarm returns the connection Swarm
// the network since it was instantiated Swarm() *swarm.Swarm
GetBandwidthTotals() (uint64, uint64)
// GetMessageCounts returns the total number of messages passed through // BandwidthTotals returns the total number of bytes passed through
// the network since it was instantiated // the network since it was instantiated
GetMessageCounts() (uint64, uint64) BandwidthTotals() (uint64, uint64)
// SendMessage sends given Message out
SendMessage(msg.NetMessage) error
// ListenAddresses returns a list of addresses at which this network listens. // ListenAddresses returns a list of addresses at which this network listens.
ListenAddresses() []ma.Multiaddr ListenAddresses() []ma.Multiaddr
...@@ -61,15 +89,6 @@ type Network interface { ...@@ -61,15 +89,6 @@ type Network interface {
InterfaceListenAddresses() ([]ma.Multiaddr, error) InterfaceListenAddresses() ([]ma.Multiaddr, error)
} }
// Sender interface for network services.
type Sender srv.Sender
// Handler interface for network services.
type Handler srv.Handler
// Service interface for network resources.
type Service srv.Service
// Dialer represents a service that can dial out to peers // Dialer represents a service that can dial out to peers
// (this is usually just a Network, but other services may not need the whole // (this is usually just a Network, but other services may not need the whole
// stack, and thus it becomes easier to mock) // stack, and thus it becomes easier to mock)
......
package mux package net
import ( import (
"errors" "errors"
...@@ -6,36 +6,12 @@ import ( ...@@ -6,36 +6,12 @@ import (
"io" "io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
swarm "github.com/jbenet/go-ipfs/net/swarm2"
eventlog "github.com/jbenet/go-ipfs/util/eventlog" eventlog "github.com/jbenet/go-ipfs/util/eventlog"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
) )
var log = eventlog.Logger("mux2") var log = eventlog.Logger("mux2")
// Mux provides simple stream multixplexing.
// It helps you precisely when:
// * You have many streams
// * You have function handlers
//
// We use a totally ad-hoc encoding:
//
// <1 byte length in bytes><string name>
//
// So "bitswap" is 0x0762697473776170
//
// NOTE: only the dialer specifies this muxing line.
// This is because we're using Streams :)
//
// WARNING: this datastructure IS NOT threadsafe.
// do not modify it once it's begun serving.
type Mux struct {
Default StreamHandler
Handlers map[string]StreamHandler
}
type StreamHandler func(s *swarm.Stream)
// NextName reads the stream and returns the next protocol name // NextName reads the stream and returns the next protocol name
// according to the muxer encoding. // according to the muxer encoding.
func (m *Mux) NextName(s io.Reader) (string, error) { func (m *Mux) NextName(s io.Reader) (string, error) {
...@@ -78,7 +54,7 @@ func (m *Mux) NextHandler(s io.Reader) (string, StreamHandler, error) { ...@@ -78,7 +54,7 @@ func (m *Mux) NextHandler(s io.Reader) (string, StreamHandler, error) {
} }
// Handle reads the next name off the Stream, and calls a function // Handle reads the next name off the Stream, and calls a function
func (m *Mux) Handle(s *swarm.Stream) { func (m *Mux) Handle(s Stream) {
ctx := context.Background() ctx := context.Background()
name, handler, err := m.NextHandler(s) name, handler, err := m.NextHandler(s)
......
package mux package net
import ( import (
"bytes" "bytes"
"testing" "testing"
swarm "github.com/jbenet/go-ipfs/net/swarm2"
) )
var testCases = map[string]string{ var testCases = map[string]string{
...@@ -30,8 +28,8 @@ func TestHandler(t *testing.T) { ...@@ -30,8 +28,8 @@ func TestHandler(t *testing.T) {
outs := make(chan string, 10) outs := make(chan string, 10)
h := func(n string) func(s *swarm.Stream) { h := func(n string) func(s Stream) {
return func(s *swarm.Stream) { return func(s Stream) {
outs <- n outs <- n
} }
} }
......
...@@ -2,66 +2,112 @@ ...@@ -2,66 +2,112 @@
package net package net
import ( import (
conn "github.com/jbenet/go-ipfs/net/conn" swarm "github.com/jbenet/go-ipfs/net/swarm2"
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
swarm "github.com/jbenet/go-ipfs/net/swarm"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util" util "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
) )
// IpfsNetwork implements the Network interface, type stream swarm.Stream
type IpfsNetwork struct {
// local peer func (s *stream) SwarmStream() *swarm.Stream {
local peer.Peer return (*swarm.Stream)(s)
}
// protocol multiplexing // Conn returns the connection this stream is part of.
muxer *mux.Muxer func (s *stream) Conn() Conn {
c := s.SwarmStream().Conn()
return (*conn_)(c)
}
// peer connection multiplexing // Conn returns the connection this stream is part of.
swarm *swarm.Swarm func (s *stream) Close() error {
return s.SwarmStream().Close()
}
// network context closer // Read reads bytes from a stream.
ctxc.ContextCloser func (s *stream) Read(p []byte) (n int, err error) {
return s.SwarmStream().Read(p)
} }
// NewIpfsNetwork is the structure that implements the network interface // Write writes bytes to a stream, flushing for each call.
func NewIpfsNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, func (s *stream) Write(p []byte) (n int, err error) {
peers peer.Peerstore, pmap *mux.ProtocolMap) (*IpfsNetwork, error) { return s.SwarmStream().Write(p)
}
in := &IpfsNetwork{ type conn_ swarm.Conn
local: local,
muxer: mux.NewMuxer(ctx, *pmap),
ContextCloser: ctxc.NewContextCloser(ctx, nil),
}
var err error func (c *conn_) SwarmConn() *swarm.Conn {
in.swarm, err = swarm.NewSwarm(ctx, listen, local, peers) return (*swarm.Conn)(c)
}
func (c *conn_) NewStream(p peer.Peer) (Stream, error) {
s, err := (*swarm.Conn)(c).NewStream()
if err != nil { if err != nil {
in.Close()
return nil, err return nil, err
} }
return (*stream)(s), nil
}
in.AddCloserChild(in.swarm) // LocalMultiaddr is the Multiaddr on this side
in.AddCloserChild(in.muxer) func (c *conn_) LocalMultiaddr() ma.Multiaddr {
return c.SwarmConn().LocalMultiaddr()
}
// remember to wire components together. // LocalPeer is the Peer on our side of the connection
in.muxer.Pipe.ConnectTo(in.swarm.Pipe) func (c *conn_) LocalPeer() peer.Peer {
return c.SwarmConn().LocalPeer()
}
return in, nil // RemoteMultiaddr is the Multiaddr on the remote side
func (c *conn_) RemoteMultiaddr() ma.Multiaddr {
return c.SwarmConn().RemoteMultiaddr()
} }
// Listen handles incoming connections on given Multiaddr. // RemotePeer is the Peer on the remote side
// func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {} func (c *conn_) RemotePeer() peer.Peer {
return c.SwarmConn().RemotePeer()
}
// network implements the Network interface,
type network struct {
local peer.Peer // local peer
mux Mux // protocol multiplexing
swarm *swarm.Swarm // peer connection multiplexing
cg ctxgroup.ContextGroup // for Context closing
}
// NewConn is the structure that implements the network interface
func NewConn(ctx context.Context, listen []ma.Multiaddr, local peer.Peer,
peers peer.Peerstore, m Mux) (*network, error) {
s, err := swarm.NewSwarm(ctx, listen, local, peers)
if err != nil {
return nil, err
}
n := &network{
local: local,
swarm: s,
mux: m,
cg: ctxgroup.WithContext(ctx),
}
s.SetStreamHandler(func(s *swarm.Stream) {
m.Handle((*stream)(s))
})
n.cg.AddChildGroup(s.CtxGroup())
return n, nil
}
// DialPeer attempts to establish a connection to a given peer. // DialPeer attempts to establish a connection to a given peer.
// Respects the context. // Respects the context.
func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error { func (n *network) DialPeer(ctx context.Context, p peer.Peer) error {
err := util.ContextDo(ctx, func() error { err := util.ContextDo(ctx, func() error {
_, err := n.swarm.Dial(p) _, err := n.swarm.Dial(p)
return err return err
...@@ -70,72 +116,39 @@ func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error { ...@@ -70,72 +116,39 @@ func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error {
} }
// LocalPeer the network's LocalPeer // LocalPeer the network's LocalPeer
func (n *IpfsNetwork) LocalPeer() peer.Peer { func (n *network) LocalPeer() peer.Peer {
return n.swarm.LocalPeer() return n.swarm.LocalPeer()
} }
// ClosePeer connection to peer // ClosePeer connection to peer
func (n *IpfsNetwork) ClosePeer(p peer.Peer) error { func (n *network) ClosePeer(p peer.Peer) error {
return n.swarm.CloseConnection(p) return n.swarm.CloseConnection(p)
} }
// IsConnected returns whether a connection to given peer exists. // BandwidthTotals returns the total amount of bandwidth transferred
func (n *IpfsNetwork) IsConnected(p peer.Peer) bool { func (n *network) BandwidthTotals() (in uint64, out uint64) {
return n.swarm.GetConnection(p.ID()) != nil // need to implement this. probably best to do it in swarm this time.
} // need a "metrics" object
return 0, 0
// GetProtocols returns the protocols registered in the network.
func (n *IpfsNetwork) GetProtocols() *mux.ProtocolMap {
// copy over because this map should be read only.
pmap := mux.ProtocolMap{}
for id, proto := range n.muxer.Protocols {
pmap[id] = proto
}
return &pmap
}
// SendMessage sends given Message out
func (n *IpfsNetwork) SendMessage(m msg.NetMessage) error {
n.swarm.Outgoing <- m
return nil
}
// GetPeerList returns the networks list of connected peers
func (n *IpfsNetwork) GetPeerList() []peer.Peer {
return n.swarm.GetPeerList()
}
// GetConnections returns the networks list of open connections
func (n *IpfsNetwork) GetConnections() []conn.Conn {
return n.swarm.Connections()
}
// GetBandwidthTotals returns the total amount of bandwidth transferred
func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) {
return n.muxer.GetBandwidthTotals()
}
// GetBandwidthTotals returns the total amount of messages transferred
func (n *IpfsNetwork) GetMessageCounts() (in uint64, out uint64) {
return n.muxer.GetMessageCounts()
} }
// ListenAddresses returns a list of addresses at which this network listens. // ListenAddresses returns a list of addresses at which this network listens.
func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr { func (n *network) ListenAddresses() []ma.Multiaddr {
return n.swarm.ListenAddresses() return n.swarm.ListenAddresses()
} }
// InterfaceListenAddresses returns a list of addresses at which this network // InterfaceListenAddresses returns a list of addresses at which this network
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to // listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces. // use the known local interfaces.
func (n *IpfsNetwork) InterfaceListenAddresses() ([]ma.Multiaddr, error) { func (n *network) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return n.swarm.InterfaceListenAddresses() return swarm.InterfaceListenAddresses(n.swarm)
} }
// Connectedness returns a state signaling connection capabilities // Connectedness returns a state signaling connection capabilities
// For now only returns Connecter || NotConnected. Expand into more later. // For now only returns Connecter || NotConnected. Expand into more later.
func (n *IpfsNetwork) Connectedness(p peer.Peer) Connectedness { func (n *network) Connectedness(p peer.Peer) Connectedness {
if n.swarm.GetConnection(p.ID()) != nil { c := n.swarm.ConnectionsToPeer(p)
if c != nil && len(c) < 1 {
return Connected return Connected
} }
return NotConnected return NotConnected
......
...@@ -50,6 +50,11 @@ func (s *Swarm) teardown() error { ...@@ -50,6 +50,11 @@ func (s *Swarm) teardown() error {
return s.swarm.Close() return s.swarm.Close()
} }
// CtxGroup returns the Context Group of the swarm
func (s *Swarm) CtxGroup() ctxgroup.ContextGroup {
return s.cg
}
// Close stops the Swarm. // Close stops the Swarm.
func (s *Swarm) Close() error { func (s *Swarm) Close() error {
return s.cg.Close() return s.cg.Close()
...@@ -106,8 +111,8 @@ func (s *Swarm) CloseConnection(p peer.Peer) error { ...@@ -106,8 +111,8 @@ func (s *Swarm) CloseConnection(p peer.Peer) error {
return nil return nil
} }
// GetPeerList returns a copy of the set of peers swarm is connected to. // Peers returns a copy of the set of peers swarm is connected to.
func (s *Swarm) GetPeerList() []peer.Peer { func (s *Swarm) Peers() []peer.Peer {
conns := s.Connections() conns := s.Connections()
seen := make(map[peer.Peer]struct{}) seen := make(map[peer.Peer]struct{})
......
...@@ -22,19 +22,23 @@ func (s *Stream) Conn() *Conn { ...@@ -22,19 +22,23 @@ func (s *Stream) Conn() *Conn {
return (*Conn)(s.Stream().Conn()) return (*Conn)(s.Stream().Conn())
} }
// Write writes bytes to a stream, calling write data for each call. // Wait waits for the stream to receive a reply.
func (s *Stream) Wait() error { func (s *Stream) Wait() error {
return s.Stream().Wait() return s.Stream().Wait()
} }
// Read reads bytes from a stream.
func (s *Stream) Read(p []byte) (n int, err error) { func (s *Stream) Read(p []byte) (n int, err error) {
return s.Stream().Read(p) return s.Stream().Read(p)
} }
// Write writes bytes to a stream, flushing for each call.
func (s *Stream) Write(p []byte) (n int, err error) { func (s *Stream) Write(p []byte) (n int, err error) {
return s.Stream().Write(p) return s.Stream().Write(p)
} }
// Close closes the stream, indicating this side is finished
// with the stream.
func (s *Stream) Close() error { func (s *Stream) Close() error {
return s.Stream().Close() return s.Stream().Close()
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论