提交 e0bd4a11 作者: Łukasz Magiera

Corenet API: Fix codeclimate issues

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 a3889a71
......@@ -45,6 +45,7 @@ type CorenetStreamsOutput struct {
Streams []CorenetStreamInfoOutput
}
// CorenetCmd is the 'ipfs corenet' command
var CorenetCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Libp2p stream mounting.",
......@@ -55,15 +56,15 @@ Note: this command is experimental and subject to change as usecases and APIs ar
},
Subcommands: map[string]*cmds.Command{
"ls": CorenetLsCmd,
"streams": CorenetStreamsCmd,
"dial": CorenetDialCmd,
"listen": CorenetListenCmd,
"close": CorenetCloseCmd,
"ls": corenetLsCmd,
"streams": corenetStreamsCmd,
"dial": corenetDialCmd,
"listen": corenetListenCmd,
"close": corenetCloseCmd,
},
}
var CorenetLsCmd = &cmds.Command{
var corenetLsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List active application protocol listeners.",
},
......@@ -120,7 +121,7 @@ var CorenetLsCmd = &cmds.Command{
},
}
var CorenetStreamsCmd = &cmds.Command{
var corenetStreamsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List active application protocol streams.",
},
......@@ -184,7 +185,7 @@ var CorenetStreamsCmd = &cmds.Command{
},
}
var CorenetListenCmd = &cmds.Command{
var corenetListenCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Create application protocol listener and proxy to network multiaddr.",
},
......@@ -306,7 +307,7 @@ func startStreaming(stream *corenet.StreamInfo) {
}()
}
var CorenetDialCmd = &cmds.Command{
var corenetDialCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Dial to an application service.",
},
......@@ -427,7 +428,7 @@ func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listene
startStreaming(&stream)
}
var CorenetCloseCmd = &cmds.Command{
var corenetCloseCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Closes an active stream listener or client.",
},
......
......@@ -28,6 +28,7 @@ type AppInfo struct {
Registry *AppRegistry
}
// Close closes the listener. Does not affect child streams
func (c *AppInfo) Close() error {
c.Registry.Deregister(c.Protocol)
c.Closer.Close()
......@@ -39,10 +40,12 @@ type AppRegistry struct {
Apps []*AppInfo
}
// Register registers appInfo in this registry
func (c *AppRegistry) Register(appInfo *AppInfo) {
c.Apps = append(c.Apps, appInfo)
}
// Deregister deregisters protocol handler from this registry
func (c *AppRegistry) Deregister(proto string) {
foundAt := -1
for i, a := range c.Apps {
......
package corenet
// Corenet structure holds information on currently running streams/apps
type Corenet struct {
Apps AppRegistry
Streams StreamRegistry
}
// NewCorenet creates new Corenet struct
func NewCorenet() *Corenet {
return &Corenet{}
}
......@@ -11,12 +11,14 @@ import (
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
type ipfsListener struct {
// IpfsListener holds information on a listener
type IpfsListener struct {
node *core.IpfsNode
conCh chan net.Stream
proto pro.ID
......@@ -24,7 +26,8 @@ type ipfsListener struct {
cancel func()
}
func (il *ipfsListener) Accept() (net.Stream, error) {
// Accept waits for a connection from the listener
func (il *IpfsListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
......@@ -33,16 +36,18 @@ func (il *ipfsListener) Accept() (net.Stream, error) {
}
}
func (il *ipfsListener) Close() error {
// Close closes the listener and removes stream handler
func (il *IpfsListener) Close() error {
il.cancel()
il.node.PeerHost.RemoveStreamHandler(il.proto)
return nil
}
func Listen(nd *core.IpfsNode, protocol string) (*ipfsListener, error) {
// Listen creates new IpfsListener
func Listen(nd *core.IpfsNode, protocol string) (*IpfsListener, error) {
ctx, cancel := context.WithCancel(nd.Context())
list := &ipfsListener{
list := &IpfsListener{
node: nd,
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
......@@ -61,6 +66,7 @@ func Listen(nd *core.IpfsNode, protocol string) (*ipfsListener, error) {
return list, nil
}
// Dial dials to a specified node and protocol
func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) {
ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30)
defer cancel()
......
......@@ -25,6 +25,7 @@ type StreamInfo struct {
Registry *StreamRegistry
}
// Close closes stream endpoints and deregisters it
func (c *StreamInfo) Close() error {
c.Local.Close()
c.Remote.Close()
......@@ -39,12 +40,14 @@ type StreamRegistry struct {
nextID uint64
}
// Register registers a stream to the registry
func (c *StreamRegistry) Register(streamInfo *StreamInfo) {
streamInfo.HandlerID = c.nextID
c.Streams = append(c.Streams, streamInfo)
c.nextID++
}
// Deregister deregisters stream from the registry
func (c *StreamRegistry) Deregister(handlerID uint64) {
foundAt := -1
for i, s := range c.Streams {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论