提交 2487c99a 作者: Łukasz Magiera

p2p: refactor local/remote

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 48d83be0
...@@ -22,7 +22,8 @@ import ( ...@@ -22,7 +22,8 @@ import (
// P2PListenerInfoOutput is output type of ls command // P2PListenerInfoOutput is output type of ls command
type P2PListenerInfoOutput struct { type P2PListenerInfoOutput struct {
Protocol string Protocol string
Address string ListenAddress string
TargetAddress string
} }
// P2PStreamInfoOutput is output type of streams command // P2PStreamInfoOutput is output type of streams command
...@@ -130,7 +131,7 @@ func forwardRemote(ctx context.Context, p *p2p.P2P, proto string, target string) ...@@ -130,7 +131,7 @@ func forwardRemote(ctx context.Context, p *p2p.P2P, proto string, target string)
} }
// TODO: return some info // TODO: return some info
_, err = p.NewListener(ctx, proto, addr) _, err = p.ForwardRemote(ctx, proto, addr)
return err return err
} }
...@@ -151,7 +152,7 @@ func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto st ...@@ -151,7 +152,7 @@ func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto st
} }
// TODO: return some info // TODO: return some info
_, err = p.Dial(ctx, peer, proto, bindAddr) _, err = p.ForwardLocal(ctx, peer, proto, bindAddr)
return err return err
} }
...@@ -207,7 +208,8 @@ var p2pListenerLsCmd = &cmds.Command{ ...@@ -207,7 +208,8 @@ var p2pListenerLsCmd = &cmds.Command{
for _, listener := range n.P2P.Listeners.Listeners { for _, listener := range n.P2P.Listeners.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{ output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: listener.Protocol(), Protocol: listener.Protocol(),
Address: listener.Address(), ListenAddress: listener.ListenAddress(),
TargetAddress: listener.TargetAddress(),
}) })
} }
...@@ -227,10 +229,10 @@ var p2pListenerLsCmd = &cmds.Command{ ...@@ -227,10 +229,10 @@ var p2pListenerLsCmd = &cmds.Command{
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, listener := range list.Listeners { for _, listener := range list.Listeners {
if headers { if headers {
fmt.Fprintln(w, "Address\tProtocol") fmt.Fprintln(w, "Protocol\tListen Address\tTarget Address")
} }
fmt.Fprintf(w, "%s\t%s\n", listener.Address, listener.Protocol) fmt.Fprintf(w, "%s\t%s\t%s\n", listener.Protocol, listener.ListenAddress, listener.TargetAddress)
} }
w.Flush() w.Flush()
...@@ -330,7 +332,7 @@ Note that the connections originate from the ipfs daemon process. ...@@ -330,7 +332,7 @@ Note that the connections originate from the ipfs daemon process.
return return
} }
_, err = n.P2P.NewListener(n.Context(), proto, addr) _, err = n.P2P.ForwardRemote(n.Context(), proto, addr)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) res.SetError(err, cmdkit.ErrNormal)
return return
...@@ -339,7 +341,7 @@ Note that the connections originate from the ipfs daemon process. ...@@ -339,7 +341,7 @@ Note that the connections originate from the ipfs daemon process.
// Successful response. // Successful response.
res.SetOutput(&P2PListenerInfoOutput{ res.SetOutput(&P2PListenerInfoOutput{
Protocol: proto, Protocol: proto,
Address: addr.String(), TargetAddress: addr.String(),
}) })
}, },
} }
...@@ -389,7 +391,7 @@ can transparently connect to a p2p service. ...@@ -389,7 +391,7 @@ can transparently connect to a p2p service.
} }
} }
listenerInfo, err := n.P2P.Dial(n.Context(), peer, proto, bindAddr) listenerInfo, err := n.P2P.ForwardLocal(n.Context(), peer, proto, bindAddr)
if err != nil { if err != nil {
res.SetError(err, cmdkit.ErrNormal) res.SetError(err, cmdkit.ErrNormal)
return return
...@@ -397,7 +399,7 @@ can transparently connect to a p2p service. ...@@ -397,7 +399,7 @@ can transparently connect to a p2p service.
output := P2PListenerInfoOutput{ output := P2PListenerInfoOutput{
Protocol: listenerInfo.Protocol(), Protocol: listenerInfo.Protocol(),
Address: listenerInfo.Address(), ListenAddress: listenerInfo.ListenAddress(),
} }
res.SetOutput(&output) res.SetOutput(&output)
......
...@@ -2,7 +2,8 @@ package p2p ...@@ -2,7 +2,8 @@ package p2p
type Listener interface { type Listener interface {
Protocol() string Protocol() string
Address() string ListenAddress() string
TargetAddress() string
// Close closes the listener. Does not affect child streams // Close closes the listener. Does not affect child streams
Close() error Close() error
......
...@@ -12,8 +12,8 @@ import ( ...@@ -12,8 +12,8 @@ import (
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
) )
// outboundListener accepts libp2p streams and proxies them to a manet host // localListener manet streams and proxies them to libp2p services
type outboundListener struct { type localListener struct {
ctx context.Context ctx context.Context
p2p *P2P p2p *P2P
...@@ -25,14 +25,14 @@ type outboundListener struct { ...@@ -25,14 +25,14 @@ type outboundListener struct {
listener manet.Listener listener manet.Listener
} }
// Dial creates new P2P stream to a remote listener // ForwardLocal creates new P2P stream to a remote listener
func (p2p *P2P) Dial(ctx context.Context, peer peer.ID, proto string, bindAddr ma.Multiaddr) (Listener, error) { func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto string, bindAddr ma.Multiaddr) (Listener, error) {
maListener, err := manet.Listen(bindAddr) maListener, err := manet.Listen(bindAddr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
listener := &outboundListener{ listener := &localListener{
ctx: ctx, ctx: ctx,
p2p: p2p, p2p: p2p,
...@@ -50,7 +50,7 @@ func (p2p *P2P) Dial(ctx context.Context, peer peer.ID, proto string, bindAddr m ...@@ -50,7 +50,7 @@ func (p2p *P2P) Dial(ctx context.Context, peer peer.ID, proto string, bindAddr m
return listener, nil return listener, nil
} }
func (l *outboundListener) dial() (net.Stream, error) { func (l *localListener) dial() (net.Stream, error) {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*30) //TODO: configurable? ctx, cancel := context.WithTimeout(l.ctx, time.Second*30) //TODO: configurable?
defer cancel() defer cancel()
...@@ -62,7 +62,7 @@ func (l *outboundListener) dial() (net.Stream, error) { ...@@ -62,7 +62,7 @@ func (l *outboundListener) dial() (net.Stream, error) {
return l.p2p.peerHost.NewStream(l.ctx, l.peer, protocol.ID(l.proto)) return l.p2p.peerHost.NewStream(l.ctx, l.peer, protocol.ID(l.proto))
} }
func (l *outboundListener) acceptConns() { func (l *localListener) acceptConns() {
for { for {
local, err := l.listener.Accept() local, err := l.listener.Accept()
if err != nil { if err != nil {
...@@ -95,16 +95,20 @@ func (l *outboundListener) acceptConns() { ...@@ -95,16 +95,20 @@ func (l *outboundListener) acceptConns() {
} }
} }
func (l *outboundListener) Close() error { func (l *localListener) Close() error {
l.listener.Close() l.listener.Close()
l.p2p.Listeners.Deregister(l.proto) l.p2p.Listeners.Deregister(l.proto)
return nil return nil
} }
func (l *outboundListener) Protocol() string { func (l *localListener) Protocol() string {
return l.proto return l.proto
} }
func (l *outboundListener) Address() string { func (l *localListener) ListenAddress() string {
return "/ipfs/" + l.peer.String() return l.listener.Multiaddr().String()
}
func (l *localListener) TargetAddress() string {
return "/ipfs/" + l.peer.Pretty()
} }
...@@ -9,8 +9,8 @@ import ( ...@@ -9,8 +9,8 @@ import (
manet "gx/ipfs/QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX/go-multiaddr-net" manet "gx/ipfs/QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX/go-multiaddr-net"
) )
// inboundListener accepts libp2p streams and proxies them to a manet host // remoteListener accepts libp2p streams and proxies them to a manet host
type inboundListener struct { type remoteListener struct {
p2p *P2P p2p *P2P
// Application proto identifier. // Application proto identifier.
...@@ -20,15 +20,17 @@ type inboundListener struct { ...@@ -20,15 +20,17 @@ type inboundListener struct {
addr ma.Multiaddr addr ma.Multiaddr
} }
// NewListener creates new p2p listener // ForwardRemote creates new p2p listener
func (p2p *P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (Listener, error) { func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiaddr) (Listener, error) {
listenerInfo := &inboundListener{ listenerInfo := &remoteListener{
p2p: p2p, p2p: p2p,
proto: proto, proto: proto,
addr: addr, addr: addr,
} }
p2p.Listeners.Register(listenerInfo)
p2p.peerHost.SetStreamHandler(protocol.ID(proto), func(remote net.Stream) { p2p.peerHost.SetStreamHandler(protocol.ID(proto), func(remote net.Stream) {
local, err := manet.Dial(addr) local, err := manet.Dial(addr)
if err != nil { if err != nil {
...@@ -55,20 +57,22 @@ func (p2p *P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr ...@@ -55,20 +57,22 @@ func (p2p *P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr
stream.startStreaming() stream.startStreaming()
}) })
p2p.Listeners.Register(listenerInfo)
return listenerInfo, nil return listenerInfo, nil
} }
func (l *inboundListener) Protocol() string { func (l *remoteListener) Protocol() string {
return l.proto return l.proto
} }
func (l *inboundListener) Address() string { func (l *remoteListener) ListenAddress() string {
return "/ipfs"
}
func (l *remoteListener) TargetAddress() string {
return l.addr.String() return l.addr.String()
} }
func (l *inboundListener) Close() error { func (l *remoteListener) Close() error {
l.p2p.peerHost.RemoveStreamHandler(protocol.ID(l.proto)) l.p2p.peerHost.RemoveStreamHandler(protocol.ID(l.proto))
l.p2p.Listeners.Deregister(l.proto) l.p2p.Listeners.Deregister(l.proto)
return nil return nil
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论