提交 18d2a2d8 作者: Jeromy Johnson 提交者: GitHub

Merge pull request #3943 from magik6k/feat/corenet2

Implemented experimental ptp(corenet) interface
package commands
import (
"bytes"
"errors"
"fmt"
"io"
"strconv"
"text/tabwriter"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
)
// PTPListenerInfoOutput is output type of ls command
type PTPListenerInfoOutput struct {
Protocol string
Address string
}
// PTPStreamInfoOutput is output type of streams command
type PTPStreamInfoOutput struct {
HandlerID string
Protocol string
LocalPeer string
LocalAddress string
RemotePeer string
RemoteAddress string
}
// PTPLsOutput is output type of ls command
type PTPLsOutput struct {
Listeners []PTPListenerInfoOutput
}
// PTPStreamsOutput is output type of streams command
type PTPStreamsOutput struct {
Streams []PTPStreamInfoOutput
}
// PTPCmd is the 'ipfs ptp' command
var PTPCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Libp2p stream mounting.",
ShortDescription: `
Create and use tunnels to remote peers over libp2p
Note: this command is experimental and subject to change as usecases and APIs are refined`,
},
Subcommands: map[string]*cmds.Command{
"listener": ptpListenerCmd,
"stream": ptpStreamCmd,
},
}
// ptpListenerCmd is the 'ipfs ptp listener' command
var ptpListenerCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "P2P listener management.",
ShortDescription: "Create and manage listener p2p endpoints",
},
Subcommands: map[string]*cmds.Command{
"ls": ptpListenerLsCmd,
"open": ptpListenerListenCmd,
"close": ptpListenerCloseCmd,
},
}
// ptpStreamCmd is the 'ipfs ptp stream' command
var ptpStreamCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "P2P stream management.",
ShortDescription: "Create and manage p2p streams",
},
Subcommands: map[string]*cmds.Command{
"ls": ptpStreamLsCmd,
"dial": ptpStreamDialCmd,
"close": ptpStreamCloseCmd,
},
}
var ptpListenerLsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List active p2p listeners.",
},
Options: []cmds.Option{
cmds.BoolOption("headers", "v", "Print table headers (HandlerID, Protocol, Local, Remote).").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
output := &PTPLsOutput{}
for _, listener := range n.PTP.Listeners.Listeners {
output.Listeners = append(output.Listeners, PTPListenerInfoOutput{
Protocol: listener.Protocol,
Address: listener.Address.String(),
})
}
res.SetOutput(output)
},
Type: PTPLsOutput{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
headers, _, _ := res.Request().Option("headers").Bool()
list, _ := res.Output().(*PTPLsOutput)
buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, listener := range list.Listeners {
if headers {
fmt.Fprintln(w, "Address\tProtocol")
}
fmt.Fprintf(w, "%s\t%s\n", listener.Address, listener.Protocol)
}
w.Flush()
return buf, nil
},
},
}
var ptpStreamLsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List active p2p streams.",
},
Options: []cmds.Option{
cmds.BoolOption("headers", "v", "Print table headers (HagndlerID, Protocol, Local, Remote).").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
output := &PTPStreamsOutput{}
for _, s := range n.PTP.Streams.Streams {
output.Streams = append(output.Streams, PTPStreamInfoOutput{
HandlerID: strconv.FormatUint(s.HandlerID, 10),
Protocol: s.Protocol,
LocalPeer: s.LocalPeer.Pretty(),
LocalAddress: s.LocalAddr.String(),
RemotePeer: s.RemotePeer.Pretty(),
RemoteAddress: s.RemoteAddr.String(),
})
}
res.SetOutput(output)
},
Type: PTPStreamsOutput{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
headers, _, _ := res.Request().Option("headers").Bool()
list, _ := res.Output().(*PTPStreamsOutput)
buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, stream := range list.Streams {
if headers {
fmt.Fprintln(w, "HandlerID\tProtocol\tLocal\tRemote")
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.LocalAddress, stream.RemotePeer)
}
w.Flush()
return buf, nil
},
},
}
var ptpListenerListenCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Forward p2p connections to a network multiaddr.",
ShortDescription: `
Register a p2p connection handler and forward the connections to a specified address.
Note that the connections originate from the ipfs daemon process.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("Protocol", true, false, "Protocol identifier."),
cmds.StringArg("Address", true, false, "Request handling application address."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
proto := "/ptp/" + req.Arguments()[0]
if n.PTP.CheckProtoExists(proto) {
res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal)
return
}
addr, err := ma.NewMultiaddr(req.Arguments()[1])
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
_, err = n.PTP.NewListener(n.Context(), proto, addr)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Successful response.
res.SetOutput(&PTPListenerInfoOutput{
Protocol: proto,
Address: addr.String(),
})
},
}
var ptpStreamDialCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Dial to a p2p listener.",
ShortDescription: `
Establish a new connection to a peer service.
When a connection is made to a peer service the ipfs daemon will setup one time
TCP listener and return it's bind port, this way a dialing application can
transparently connect to a p2p service.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("Peer", true, false, "Remote peer to connect to"),
cmds.StringArg("Protocol", true, false, "Protocol identifier."),
cmds.StringArg("BindAddress", false, false, "Address to listen for connection/s (default: /ip4/127.0.0.1/tcp/0)."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
addr, peer, err := ParsePeerParam(req.Arguments()[0])
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
proto := "/ptp/" + req.Arguments()[1]
bindAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
if len(req.Arguments()) == 3 {
bindAddr, err = ma.NewMultiaddr(req.Arguments()[2])
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
listenerInfo, err := n.PTP.Dial(n.Context(), addr, peer, proto, bindAddr)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
output := PTPListenerInfoOutput{
Protocol: listenerInfo.Protocol,
Address: listenerInfo.Address.String(),
}
res.SetOutput(&output)
},
}
var ptpListenerCloseCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Close active p2p listener.",
},
Arguments: []cmds.Argument{
cmds.StringArg("Protocol", false, false, "P2P listener protocol"),
},
Options: []cmds.Option{
cmds.BoolOption("all", "a", "Close all listeners.").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
closeAll, _, _ := req.Option("all").Bool()
var proto string
if !closeAll {
if len(req.Arguments()) == 0 {
res.SetError(errors.New("no protocol name specified"), cmds.ErrNormal)
return
}
proto = "/ptp/" + req.Arguments()[0]
}
for _, listener := range n.PTP.Listeners.Listeners {
if !closeAll && listener.Protocol != proto {
continue
}
listener.Close()
if !closeAll {
break
}
}
},
}
var ptpStreamCloseCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Close active p2p stream.",
},
Arguments: []cmds.Argument{
cmds.StringArg("HandlerID", false, false, "Stream HandlerID"),
},
Options: []cmds.Option{
cmds.BoolOption("all", "a", "Close all streams.").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
closeAll, _, _ := req.Option("all").Bool()
var handlerID uint64
if !closeAll {
if len(req.Arguments()) == 0 {
res.SetError(errors.New("no HandlerID specified"), cmds.ErrNormal)
return
}
handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
for _, stream := range n.PTP.Streams.Streams {
if !closeAll && handlerID != stream.HandlerID {
continue
}
stream.Close()
if !closeAll {
break
}
}
},
}
func getNode(req cmds.Request) (*core.IpfsNode, error) {
n, err := req.InvocContext().GetNode()
if err != nil {
return nil, err
}
config, err := n.Repo.Config()
if err != nil {
return nil, err
}
if !config.Experimental.Libp2pStreamMounting {
return nil, errors.New("libp2p stream mounting not enabled")
}
if !n.OnlineMode() {
return nil, errNotOnline
}
return n, nil
}
......@@ -47,6 +47,7 @@ ADVANCED COMMANDS
pin Pin objects to local storage
repo Manipulate the IPFS repository
stats Various operational stats
ptp Libp2p stream mounting
filestore Manage the filestore (experimental)
NETWORK COMMANDS
......@@ -113,6 +114,7 @@ var rootSubcommands = map[string]*cmds.Command{
"object": ocmd.ObjectCmd,
"pin": PinCmd,
"ping": PingCmd,
"ptp": PTPCmd,
"pubsub": PubsubCmd,
"refs": RefsCmd,
"repo": RepoCmd,
......
......@@ -35,6 +35,7 @@ import (
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
ptp "github.com/ipfs/go-ipfs/ptp"
repo "github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
nilrouting "github.com/ipfs/go-ipfs/routing/none"
......@@ -130,6 +131,7 @@ type IpfsNode struct {
IpnsRepub *ipnsrp.Republisher
Floodsub *floodsub.PubSub
PTP *ptp.PTP
proc goprocess.Process
ctx context.Context
......@@ -245,6 +247,8 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
n.PTP = ptp.NewPTP(n.Identity, n.PeerHost, n.Peerstore)
// setup local discovery
if do != nil {
service, err := do(ctx, n.PeerHost)
......
package corenet
import (
"time"
context "context"
core "github.com/ipfs/go-ipfs/core"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
type ipfsListener struct {
conCh chan net.Stream
proto pro.ID
ctx context.Context
cancel func()
}
func (il *ipfsListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
case <-il.ctx.Done():
return nil, il.ctx.Err()
}
}
func (il *ipfsListener) Close() error {
il.cancel()
// TODO: unregister handler from peerhost
return nil
}
func Listen(nd *core.IpfsNode, protocol string) (*ipfsListener, error) {
ctx, cancel := context.WithCancel(nd.Context())
list := &ipfsListener{
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
nd.PeerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh <- s:
case <-ctx.Done():
s.Close()
}
})
return list, nil
}
func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) {
ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30)
defer cancel()
err := nd.PeerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
return nd.PeerHost.NewStream(nd.Context(), p, pro.ID(protocol))
}
package ptp
import (
"context"
"errors"
"time"
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
p2phost "gx/ipfs/QmUywuGNZoUKV8B9iyvup9bPkLiMrhTsyVMkeSXW5VxAfC/go-libp2p-host"
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
// PTP structure holds information on currently running streams/listeners
type PTP struct {
Listeners ListenerRegistry
Streams StreamRegistry
identity peer.ID
peerHost p2phost.Host
peerstore pstore.Peerstore
}
// NewPTP creates new PTP struct
func NewPTP(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *PTP {
return &PTP{
identity: identity,
peerHost: peerHost,
peerstore: peerstore,
}
}
func (ptp *PTP) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {
ctx, cancel := context.WithTimeout(ctx2, time.Second*30) //TODO: configurable?
defer cancel()
err := ptp.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
return ptp.peerHost.NewStream(ctx2, p, pro.ID(protocol))
}
// Dial creates new P2P stream to a remote listener
func (ptp *PTP) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*ListenerInfo, error) {
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
listenerInfo := ListenerInfo{
Identity: ptp.identity,
Protocol: proto,
}
remote, err := ptp.newStreamTo(ctx, peer, proto)
if err != nil {
return nil, err
}
switch lnet {
case "tcp", "tcp4", "tcp6":
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Close(); err2 != nil {
return nil, err2
}
return nil, err
}
listenerInfo.Address = listener.Multiaddr()
listenerInfo.Closer = listener
listenerInfo.Running = true
go ptp.doAccept(&listenerInfo, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return &listenerInfo, nil
}
func (ptp *PTP) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {
defer listener.Close()
local, err := listener.Accept()
if err != nil {
return
}
stream := StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &ptp.Streams,
}
ptp.Streams.Register(&stream)
stream.startStreaming()
}
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
// P2PListener holds information on a listener
type P2PListener struct {
peerHost p2phost.Host
conCh chan net.Stream
proto pro.ID
ctx context.Context
cancel func()
}
// Accept waits for a connection from the listener
func (il *P2PListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
case <-il.ctx.Done():
return nil, il.ctx.Err()
}
}
// Close closes the listener and removes stream handler
func (il *P2PListener) Close() error {
il.cancel()
il.peerHost.RemoveStreamHandler(il.proto)
return nil
}
// Listen creates new P2PListener
func (ptp *PTP) registerStreamHandler(ctx2 context.Context, protocol string) (*P2PListener, error) {
ctx, cancel := context.WithCancel(ctx2)
list := &P2PListener{
peerHost: ptp.peerHost,
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
ptp.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh <- s:
case <-ctx.Done():
s.Close()
}
})
return list, nil
}
// NewListener creates new ptp listener
func (ptp *PTP) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (*ListenerInfo, error) {
listener, err := ptp.registerStreamHandler(ctx, proto)
if err != nil {
return nil, err
}
listenerInfo := ListenerInfo{
Identity: ptp.identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &ptp.Listeners,
}
go ptp.acceptStreams(&listenerInfo, listener)
ptp.Listeners.Register(&listenerInfo)
return &listenerInfo, nil
}
func (ptp *PTP) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {
for listenerInfo.Running {
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(listenerInfo.Address)
if err != nil {
remote.Close()
continue
}
stream := StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &ptp.Streams,
}
ptp.Streams.Register(&stream)
stream.startStreaming()
}
ptp.Listeners.Deregister(listenerInfo.Protocol)
}
// CheckProtoExists checks whether a protocol handler is registered to
// mux handler
func (ptp *PTP) CheckProtoExists(proto string) bool {
protos := ptp.peerHost.Mux().Protocols()
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}
package ptp
import (
"fmt"
"io"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
// ListenerInfo holds information on a p2p listener.
type ListenerInfo struct {
// Application protocol identifier.
Protocol string
// Node identity
Identity peer.ID
// Local protocol stream address.
Address ma.Multiaddr
// Local protocol stream listener.
Closer io.Closer
// Flag indicating whether we're still accepting incoming connections, or
// whether this application listener has been shutdown.
Running bool
Registry *ListenerRegistry
}
// Close closes the listener. Does not affect child streams
func (c *ListenerInfo) Close() error {
c.Closer.Close()
err := c.Registry.Deregister(c.Protocol)
return err
}
// ListenerRegistry is a collection of local application protocol listeners.
type ListenerRegistry struct {
Listeners []*ListenerInfo
}
// Register registers listenerInfo2 in this registry
func (c *ListenerRegistry) Register(listenerInfo *ListenerInfo) {
c.Listeners = append(c.Listeners, listenerInfo)
}
// Deregister removes p2p listener from this registry
func (c *ListenerRegistry) Deregister(proto string) error {
foundAt := -1
for i, a := range c.Listeners {
if a.Protocol == proto {
foundAt = i
break
}
}
if foundAt != -1 {
c.Listeners = append(c.Listeners[:foundAt], c.Listeners[foundAt+1:]...)
return nil
}
return fmt.Errorf("failed to deregister proto %s", proto)
}
// StreamInfo holds information on active incoming and outgoing p2p streams.
type StreamInfo struct {
HandlerID uint64
Protocol string
LocalPeer peer.ID
LocalAddr ma.Multiaddr
RemotePeer peer.ID
RemoteAddr ma.Multiaddr
Local io.ReadWriteCloser
Remote io.ReadWriteCloser
Registry *StreamRegistry
}
// Close closes stream endpoints and deregisters it
func (s *StreamInfo) Close() error {
s.Local.Close()
s.Remote.Close()
s.Registry.Deregister(s.HandlerID)
return nil
}
func (s *StreamInfo) startStreaming() {
go func() {
io.Copy(s.Local, s.Remote)
s.Close()
}()
go func() {
io.Copy(s.Remote, s.Local)
s.Close()
}()
}
// StreamRegistry is a collection of active incoming and outgoing protocol app streams.
type StreamRegistry struct {
Streams []*StreamInfo
nextID uint64
}
// Register registers a stream to the registry
func (c *StreamRegistry) Register(streamInfo *StreamInfo) {
streamInfo.HandlerID = c.nextID
c.Streams = append(c.Streams, streamInfo)
c.nextID++
}
// Deregister deregisters stream from the registry
func (c *StreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.Streams {
if s.HandlerID == handlerID {
foundAt = i
break
}
}
if foundAt != -1 {
c.Streams = append(c.Streams[:foundAt], c.Streams[foundAt+1:]...)
}
}
package config
type Experiments struct {
FilestoreEnabled bool
ShardingEnabled bool
FilestoreEnabled bool
ShardingEnabled bool
Libp2pStreamMounting bool
}
......@@ -22,6 +22,10 @@ $(d)/go-timeout: test/dependencies/go-timeout
$(go-build)
TGTS_$(d) += $(d)/go-timeout
$(d)/ma-pipe-unidir: test/dependencies/ma-pipe-unidir
$(go-build)
TGTS_$(d) += $(d)/ma-pipe-unidir
TGTS_GX_$(d) := hang-fds iptb
TGTS_GX_$(d) := $(addprefix $(d)/,$(TGTS_GX_$(d)))
......
The MIT License (MIT)
Copyright (c) 2017 Łukasz Magiera <magik6k@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
package main
import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
)
const USAGE = "ma-pipe-unidir [-l|--listen] [--pidFile=path] [-h|--help] <send|recv> <multiaddr>\n"
type Opts struct {
Listen bool
PidFile string
}
func app() int {
opts := Opts{}
flag.BoolVar(&opts.Listen, "l", false, "")
flag.BoolVar(&opts.Listen, "listen", false, "")
flag.StringVar(&opts.PidFile, "pidFile", "", "")
flag.Usage = func() {
fmt.Print(USAGE)
}
flag.Parse()
args := flag.Args()
if len(args) < 2 { // <mode> <addr>
fmt.Print(USAGE)
return 1
}
mode := args[0]
addr := args[1]
if mode != "send" && mode != "recv" {
fmt.Print(USAGE)
return 1
}
if len(opts.PidFile) > 0 {
data := []byte(strconv.Itoa(os.Getpid()))
err := ioutil.WriteFile(opts.PidFile, data, 0644)
if err != nil {
return 1
}
defer os.Remove(opts.PidFile)
}
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return 1
}
var conn manet.Conn
if opts.Listen {
listener, err := manet.Listen(maddr)
if err != nil {
return 1
}
conn, err = listener.Accept()
if err != nil {
return 1
}
} else {
var err error
conn, err = manet.Dial(maddr)
if err != nil {
return 1
}
}
defer conn.Close()
switch mode {
case "recv":
io.Copy(os.Stdout, conn)
case "send":
io.Copy(conn, os.Stdin)
default:
return 1
}
return 0
}
func main() {
os.Exit(app())
}
......@@ -7,7 +7,7 @@ T_$(d) = $(sort $(wildcard $(d)/t[0-9][0-9][0-9][0-9]-*.sh))
DEPS_$(d) := test/bin/random test/bin/multihash test/bin/pollEndpoint \
test/bin/iptb test/bin/go-sleep test/bin/random-files \
test/bin/go-timeout test/bin/hang-fds
test/bin/go-timeout test/bin/hang-fds test/bin/ma-pipe-unidir
DEPS_$(d) += cmd/ipfs/ipfs
DEPS_$(d) += $(d)/clean-test-results
DEPS_$(d) += $(SHARNESS_$(d))
......
#!/bin/sh
test_description="Test experimental ptp commands"
. lib/test-lib.sh
# start iptb + wait for peering
test_expect_success 'init iptb' '
iptb init -n 2 --bootstrap=none --port=0
'
test_expect_success 'generate test data' '
echo "ABCDEF" > test0.bin &&
echo "012345" > test1.bin
'
startup_cluster 2
test_expect_success 'peer ids' '
PEERID_0=$(iptb get id 0) &&
PEERID_1=$(iptb get id 1)
'
test_expect_success "test ports are closed" '
(! (netstat -ln | grep "LISTEN" | grep ":10101 ")) &&
(! (netstat -ln | grep "LISTEN" | grep ":10102 "))
'
test_must_fail 'fail without config option being enabled' '
ipfsi 0 ptp stream ls
'
test_expect_success "enable filestore config setting" '
ipfsi 0 config --json Experimental.Libp2pStreamMounting true
ipfsi 1 config --json Experimental.Libp2pStreamMounting true
'
test_expect_success 'start ptp listener' '
ipfsi 0 ptp listener open ptp-test /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log
'
test_expect_success 'Test server to client communications' '
ma-pipe-unidir --listen send /ip4/127.0.0.1/tcp/10101 < test0.bin &
SERVER_PID=$!
ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log &&
ma-pipe-unidir recv /ip4/127.0.0.1/tcp/10102 > client.out &&
wait $SERVER_PID
'
test_expect_success 'Test client to server communications' '
ma-pipe-unidir --listen recv /ip4/127.0.0.1/tcp/10101 > server.out &
SERVER_PID=$!
ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log &&
ma-pipe-unidir send /ip4/127.0.0.1/tcp/10102 < test1.bin
wait $SERVER_PID
'
test_expect_success 'server to client output looks good' '
test_cmp client.out test0.bin
'
test_expect_success 'client to server output looks good' '
test_cmp server.out test1.bin
'
test_expect_success "'ipfs listener ptp ls' succeeds" '
echo "/ip4/127.0.0.1/tcp/10101 /ptp/ptp-test" > expected &&
ipfsi 0 ptp listener ls > actual
'
test_expect_success "'ipfs ptp listener ls' output looks good" '
test_cmp expected actual
'
test_expect_success "Cannot re-register app handler" '
(! ipfsi 0 ptp listener open ptp-test /ip4/127.0.0.1/tcp/10101)
'
test_expect_success "'ipfs ptp stream ls' output is empty" '
ipfsi 0 ptp stream ls > actual &&
test_must_be_empty actual
'
test_expect_success "Setup: Idle stream" '
ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 &
ipfsi 1 ptp stream dial $PEERID_0 ptp-test /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log &&
ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 &
go-sleep 500ms &&
kill -0 $(cat listener.pid) && kill -0 $(cat client.pid)
'
test_expect_success "'ipfs ptp stream ls' succeeds" '
echo "2 /ptp/ptp-test /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected
ipfsi 0 ptp stream ls > actual
'
test_expect_success "'ipfs ptp stream ls' output looks good" '
test_cmp expected actual
'
test_expect_success "'ipfs ptp stream close' closes stream" '
ipfsi 0 ptp stream close 2 &&
ipfsi 0 ptp stream ls > actual &&
[ ! -f listener.pid ] && [ ! -f client.pid ] &&
test_must_be_empty actual
'
test_expect_success "'ipfs ptp listener close' closes app handler" '
ipfsi 0 ptp listener close ptp-test &&
ipfsi 0 ptp listener ls > actual &&
test_must_be_empty actual
'
test_expect_success "Setup: Idle stream(2)" '
ma-pipe-unidir --listen --pidFile=listener.pid recv /ip4/127.0.0.1/tcp/10101 &
ipfsi 0 ptp listener open ptp-test2 /ip4/127.0.0.1/tcp/10101 2>&1 > listener-stdouterr.log &&
ipfsi 1 ptp stream dial $PEERID_0 ptp-test2 /ip4/127.0.0.1/tcp/10102 2>&1 > dialer-stdouterr.log &&
ma-pipe-unidir --pidFile=client.pid recv /ip4/127.0.0.1/tcp/10102 &
go-sleep 500ms &&
kill -0 $(cat listener.pid) && kill -0 $(cat client.pid)
'
test_expect_success "'ipfs ptp stream ls' succeeds(2)" '
echo "3 /ptp/ptp-test2 /ip4/127.0.0.1/tcp/10101 $PEERID_1" > expected
ipfsi 0 ptp stream ls > actual
test_cmp expected actual
'
test_expect_success "'ipfs ptp listener close -a' closes app handlers" '
ipfsi 0 ptp listener close -a &&
ipfsi 0 ptp listener ls > actual &&
test_must_be_empty actual
'
test_expect_success "'ipfs ptp stream close -a' closes streams" '
ipfsi 0 ptp stream close -a &&
ipfsi 0 ptp stream ls > actual &&
[ ! -f listener.pid ] && [ ! -f client.pid ] &&
test_must_be_empty actual
'
test_expect_success "'ipfs ptp listener close' closes app numeric handlers" '
ipfsi 0 ptp listener open 1234 /ip4/127.0.0.1/tcp/10101 &&
ipfsi 0 ptp listener close 1234 &&
ipfsi 0 ptp listener ls > actual &&
test_must_be_empty actual
'
test_expect_success 'stop iptb' '
iptb stop
'
test_done
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论