提交 bba2d05c 作者: Łukasz Magiera

p2p: cleanup after listener iface split

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 dd48b823
...@@ -4,11 +4,9 @@ import ( ...@@ -4,11 +4,9 @@ import (
"errors" "errors"
"sync" "sync"
net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host"
) )
// ListenerLocal listens for connections and proxies them to a target // ListenerLocal listens for connections and proxies them to a target
...@@ -31,40 +29,12 @@ type ListenersLocal struct { ...@@ -31,40 +29,12 @@ type ListenersLocal struct {
starting map[string]struct{} starting map[string]struct{}
} }
func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenersLocal { func newListenerRegistry(id peer.ID) *ListenersLocal {
reg := &ListenersLocal{ reg := &ListenersLocal{
Listeners: map[string]ListenerLocal{}, Listeners: map[string]ListenerLocal{},
starting: map[string]struct{}{}, starting: map[string]struct{}{},
} }
addr, err := ma.NewMultiaddr(maPrefix + id.Pretty())
if err != nil {
panic(err)
}
host.SetStreamHandlerMatch("/x/", func(p string) bool {
reg.RLock()
defer reg.RUnlock()
for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && string(l.Protocol()) == p {
return true
}
}
return false
}, func(stream net.Stream) {
reg.RLock()
defer reg.RUnlock()
for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() {
go l.(*remoteListener).handleStream(stream)
return
}
}
})
return reg return reg
} }
......
...@@ -27,7 +27,7 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) ...@@ -27,7 +27,7 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
peerHost: peerHost, peerHost: peerHost,
peerstore: peerstore, peerstore: peerstore,
ListenersLocal: newListenerRegistry(identity, peerHost), ListenersLocal: newListenerRegistry(identity),
ListenersP2P: newListenerP2PRegistry(identity, peerHost), ListenersP2P: newListenerP2PRegistry(identity, peerHost),
Streams: &StreamRegistry{ Streams: &StreamRegistry{
......
...@@ -12,12 +12,13 @@ import ( ...@@ -12,12 +12,13 @@ import (
) )
// Listener listens for connections and proxies them to a target // Listener listens for connections and proxies them to a target
type P2PListener interface { type ListenerP2P interface {
Protocol() protocol.ID Protocol() protocol.ID
ListenAddress() ma.Multiaddr ListenAddress() ma.Multiaddr
TargetAddress() ma.Multiaddr TargetAddress() ma.Multiaddr
start() error start() error
handleStream(remote net.Stream)
// Close closes the listener. Does not affect child streams // Close closes the listener. Does not affect child streams
Close() error Close() error
...@@ -27,13 +28,13 @@ type P2PListener interface { ...@@ -27,13 +28,13 @@ type P2PListener interface {
type ListenersP2P struct { type ListenersP2P struct {
sync.RWMutex sync.RWMutex
Listeners map[protocol.ID]ListenerLocal Listeners map[protocol.ID]ListenerP2P
starting map[protocol.ID]struct{} starting map[protocol.ID]struct{}
} }
func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P { func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P {
reg := &ListenersP2P{ reg := &ListenersP2P{
Listeners: map[protocol.ID]ListenerLocal{}, Listeners: map[protocol.ID]ListenerP2P{},
starting: map[protocol.ID]struct{}{}, starting: map[protocol.ID]struct{}{},
} }
...@@ -59,7 +60,7 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P { ...@@ -59,7 +60,7 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P {
for _, l := range reg.Listeners { for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() { if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() {
go l.(*remoteListener).handleStream(stream) go l.handleStream(stream)
return return
} }
} }
...@@ -69,7 +70,7 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P { ...@@ -69,7 +70,7 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P {
} }
// Register registers listenerInfo into this registry and starts it // Register registers listenerInfo into this registry and starts it
func (r *ListenersP2P) Register(l ListenerLocal) error { func (r *ListenersP2P) Register(l ListenerP2P) error {
r.Lock() r.Lock()
k := l.Protocol() k := l.Protocol()
......
...@@ -23,7 +23,7 @@ type remoteListener struct { ...@@ -23,7 +23,7 @@ type remoteListener struct {
} }
// ForwardRemote creates new p2p listener // ForwardRemote creates new p2p listener
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (P2PListener, error) { func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (ListenerP2P, error) {
listener := &remoteListener{ listener := &remoteListener{
p2p: p2p, p2p: p2p,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论