提交 a037ae6c 作者: Łukasz Magiera

p2p: cleanup listening logic, make dial act like ssh -L

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 eb456441
......@@ -105,8 +105,8 @@ var p2pListenerLsCmd = &cmds.Command{
for _, listener := range n.P2P.Listeners.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: listener.Protocol,
Address: listener.Address.String(),
Protocol: listener.Protocol(),
Address: listener.Address(),
})
}
......@@ -267,7 +267,7 @@ can transparently connect to a p2p service.
return
}
addr, peer, err := ParsePeerParam(req.Arguments()[0])
_, peer, err := ParsePeerParam(req.Arguments()[0])
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
......@@ -284,15 +284,15 @@ can transparently connect to a p2p service.
}
}
listenerInfo, err := n.P2P.Dial(n.Context(), addr, peer, proto, bindAddr)
listenerInfo, err := n.P2P.Dial(n.Context(), peer, proto, bindAddr)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
output := P2PListenerInfoOutput{
Protocol: listenerInfo.Protocol,
Address: listenerInfo.Address.String(),
Protocol: listenerInfo.Protocol(),
Address: listenerInfo.Address(),
}
res.SetOutput(&output)
......@@ -331,7 +331,7 @@ var p2pListenerCloseCmd = &cmds.Command{
}
for _, listener := range n.P2P.Listeners.Listeners {
if !closeAll && listener.Protocol != proto {
if !closeAll && listener.Protocol() != proto {
continue
}
listener.Close()
......
package p2p
import (
"context"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
net "gx/ipfs/QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen/go-libp2p-net"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
manet "gx/ipfs/QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX/go-multiaddr-net"
)
// inboundListener accepts libp2p streams and proxies them to a manet host
type inboundListener struct {
p2p *P2P
// Application proto identifier.
proto string
addr ma.Multiaddr
}
// NewListener creates new p2p listener
func (p2p *P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (Listener, error) {
listenerInfo := &inboundListener{
proto: proto,
}
p2p.peerHost.SetStreamHandler(protocol.ID(proto), func(remote net.Stream) {
local, err := manet.Dial(addr)
if err != nil {
remote.Reset()
return
}
stream := StreamInfo{
Protocol: proto,
LocalPeer: p2p.identity,
LocalAddr: addr,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &p2p.Streams,
}
p2p.Streams.Register(&stream)
stream.startStreaming()
})
p2p.Listeners.Register(listenerInfo)
return listenerInfo, nil
}
func (l *inboundListener) Protocol() string {
return l.proto
}
func (l *inboundListener) Address() string {
return l.addr.String()
}
func (l *inboundListener) Close() error {
l.p2p.peerHost.RemoveStreamHandler(protocol.ID(l.proto))
return l.p2p.Listeners.Deregister(l.proto)
}
package p2p
import (
"context"
"errors"
"time"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
net "gx/ipfs/QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen/go-libp2p-net"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/QmZb7hAgQEhW9dBbzBudU39gCeD4zbe6xafD52LUuF4cUN/go-libp2p-peerstore"
manet "gx/ipfs/QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX/go-multiaddr-net"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
)
// inboundListener accepts libp2p streams and proxies them to a manet host
type outboundListener struct {
ctx context.Context
cancel context.CancelFunc
p2p *P2P
id peer.ID
proto string
peer peer.ID
listener manet.Listener
}
// Dial creates new P2P stream to a remote listener
func (p2p *P2P) Dial(ctx context.Context, peer peer.ID, proto string, bindAddr ma.Multiaddr) (Listener, error) {
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
switch lnet {
case "tcp", "tcp4", "tcp6":
maListener, err := manet.Listen(bindAddr)
if err != nil {
return nil, err
}
listener := &outboundListener{
p2p: p2p,
id: p2p.identity,
proto: proto,
peer: peer,
listener: maListener,
}
go listener.acceptConns()
return listener, nil
default:
return nil, errors.New("unsupported proto: " + lnet)
}
}
func (l *outboundListener) dial() (net.Stream, error) {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*30) //TODO: configurable?
defer cancel()
err := l.p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: l.peer})
if err != nil {
return nil, err
}
return l.p2p.peerHost.NewStream(l.ctx, l.peer, protocol.ID(l.proto))
}
func (l *outboundListener) acceptConns() {
for {
local, err := l.listener.Accept()
if err != nil {
return
}
remote, err := l.dial()
if err != nil {
local.Close()
return
}
stream := StreamInfo{
Protocol: l.proto,
LocalPeer: l.id,
LocalAddr: l.listener.Multiaddr(),
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &l.p2p.Streams,
}
l.p2p.Streams.Register(&stream)
stream.startStreaming()
}
}
func (l *outboundListener) Close() error {
l.listener.Close()
err := l.p2p.Listeners.Deregister(l.proto)
return err
}
func (l *outboundListener) Protocol() string {
return l.proto
}
func (l *outboundListener) Address() string {
return "/ipfs/" + l.peer.String()
}
package p2p
import (
"context"
"errors"
"time"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
p2phost "gx/ipfs/QmeMYW7Nj8jnnEfs9qhm7SxKkoDPUWXu3MsxX6BFwz34tf/go-libp2p-host"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)
// P2P structure holds information on currently running streams/listeners
......@@ -24,6 +16,14 @@ type P2P struct {
peerstore pstore.Peerstore
}
type Listener interface {
Protocol() string
Address() string
// Close closes the listener. Does not affect child streams
Close() error
}
// NewP2P creates new P2P struct
func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *P2P {
return &P2P{
......@@ -33,197 +33,7 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
}
}
func (p2p *P2P) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {
ctx, cancel := context.WithTimeout(ctx2, time.Second*30) //TODO: configurable?
defer cancel()
err := p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
return p2p.peerHost.NewStream(ctx2, p, pro.ID(protocol))
}
// Dial creates new P2P stream to a remote listener
func (p2p *P2P) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*ListenerInfo, error) {
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
listenerInfo := ListenerInfo{
Identity: p2p.identity,
Protocol: proto,
}
remote, err := p2p.newStreamTo(ctx, peer, proto)
if err != nil {
return nil, err
}
switch lnet {
case "tcp", "tcp4", "tcp6":
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Reset(); err2 != nil {
return nil, err2
}
return nil, err
}
listenerInfo.Address = listener.Multiaddr()
listenerInfo.Closer = listener
listenerInfo.Running = true
go p2p.doAccept(&listenerInfo, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return &listenerInfo, nil
}
func (p2p *P2P) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {
defer listener.Close()
local, err := listener.Accept()
if err != nil {
return
}
stream := StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &p2p.Streams,
}
p2p.Streams.Register(&stream)
stream.startStreaming()
}
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
// P2PListener holds information on a listener
type P2PListener struct {
peerHost p2phost.Host
conCh chan net.Stream
proto pro.ID
ctx context.Context
cancel func()
}
// Accept waits for a connection from the listener
func (il *P2PListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
case <-il.ctx.Done():
return nil, il.ctx.Err()
}
}
// Close closes the listener and removes stream handler
func (il *P2PListener) Close() error {
il.cancel()
il.peerHost.RemoveStreamHandler(il.proto)
return nil
}
// Listen creates new P2PListener
func (p2p *P2P) registerStreamHandler(ctx2 context.Context, protocol string) (*P2PListener, error) {
ctx, cancel := context.WithCancel(ctx2)
list := &P2PListener{
peerHost: p2p.peerHost,
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
p2p.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh <- s:
case <-ctx.Done():
s.Reset()
}
})
return list, nil
}
// NewListener creates new p2p listener
func (p2p *P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (*ListenerInfo, error) {
listener, err := p2p.registerStreamHandler(ctx, proto)
if err != nil {
return nil, err
}
listenerInfo := ListenerInfo{
Identity: p2p.identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &p2p.Listeners,
}
go p2p.acceptStreams(&listenerInfo, listener)
p2p.Listeners.Register(&listenerInfo)
return &listenerInfo, nil
}
func (p2p *P2P) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {
for listenerInfo.Running {
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(listenerInfo.Address)
if err != nil {
remote.Reset()
continue
}
stream := StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &p2p.Streams,
}
p2p.Streams.Register(&stream)
stream.startStreaming()
}
p2p.Listeners.Deregister(listenerInfo.Protocol)
}
// CheckProtoExists checks whether a protocol handler is registered to
// CheckProtoExists checks whether a proto handler is registered to
// mux handler
func (p2p *P2P) CheckProtoExists(proto string) bool {
protos := p2p.peerHost.Mux().Protocols()
......
......@@ -10,41 +10,13 @@ import (
net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net"
)
// ListenerInfo holds information on a p2p listener.
type ListenerInfo struct {
// Application protocol identifier.
Protocol string
// Node identity
Identity peer.ID
// Local protocol stream address.
Address ma.Multiaddr
// Local protocol stream listener.
Closer io.Closer
// Flag indicating whether we're still accepting incoming connections, or
// whether this application listener has been shutdown.
Running bool
Registry *ListenerRegistry
}
// Close closes the listener. Does not affect child streams
func (c *ListenerInfo) Close() error {
c.Closer.Close()
err := c.Registry.Deregister(c.Protocol)
return err
}
// ListenerRegistry is a collection of local application protocol listeners.
// ListenerRegistry is a collection of local application proto listeners.
type ListenerRegistry struct {
Listeners []*ListenerInfo
Listeners []Listener
}
// Register registers listenerInfo2 in this registry
func (c *ListenerRegistry) Register(listenerInfo *ListenerInfo) {
func (c *ListenerRegistry) Register(listenerInfo Listener) {
c.Listeners = append(c.Listeners, listenerInfo)
}
......@@ -52,7 +24,7 @@ func (c *ListenerRegistry) Register(listenerInfo *ListenerInfo) {
func (c *ListenerRegistry) Deregister(proto string) error {
foundAt := -1
for i, a := range c.Listeners {
if a.Protocol == proto {
if a.Protocol() == proto {
foundAt = i
break
}
......@@ -120,7 +92,7 @@ func (s *StreamInfo) startStreaming() {
}()
}
// StreamRegistry is a collection of active incoming and outgoing protocol app streams.
// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
Streams []*StreamInfo
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论