提交 058edaff 作者: Łukasz Magiera

p2p: make registries thread safer

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 df6540e0
package p2p package p2p
import (
"sync"
)
type Listener interface { type Listener interface {
Protocol() string Protocol() string
ListenAddress() string ListenAddress() string
...@@ -18,16 +22,23 @@ type listenerKey struct { ...@@ -18,16 +22,23 @@ type listenerKey struct {
// ListenerRegistry is a collection of local application proto listeners. // ListenerRegistry is a collection of local application proto listeners.
type ListenerRegistry struct { type ListenerRegistry struct {
Listeners map[listenerKey]Listener Listeners map[listenerKey]Listener
lk *sync.Mutex
} }
// Register registers listenerInfo in this registry // Register registers listenerInfo in this registry
func (c *ListenerRegistry) Register(l Listener) { func (r *ListenerRegistry) Register(l Listener) {
c.Listeners[getListenerKey(l)] = l r.lk.Lock()
defer r.lk.Unlock()
r.Listeners[getListenerKey(l)] = l
} }
// Deregister removes p2p listener from this registry // Deregister removes p2p listener from this registry
func (c *ListenerRegistry) Deregister(k listenerKey) { func (r *ListenerRegistry) Deregister(k listenerKey) {
delete(c.Listeners, k) r.lk.Lock()
defer r.lk.Unlock()
delete(r.Listeners, k)
} }
func getListenerKey(l Listener) listenerKey { func getListenerKey(l Listener) listenerKey {
......
package p2p package p2p
import ( import (
"sync"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore" pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host" p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer" peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
...@@ -25,9 +27,11 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) ...@@ -25,9 +27,11 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
Listeners: &ListenerRegistry{ Listeners: &ListenerRegistry{
Listeners: map[listenerKey]Listener{}, Listeners: map[listenerKey]Listener{},
lk: &sync.Mutex{},
}, },
Streams: &StreamRegistry{ Streams: &StreamRegistry{
Streams: map[uint64]*Stream{}, Streams: map[uint64]*Stream{},
lk: &sync.Mutex{},
}, },
} }
} }
......
...@@ -38,7 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad ...@@ -38,7 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad
return return
} }
stream := Stream{ stream := &Stream{
Protocol: proto, Protocol: proto,
OriginAddr: remote.Conn().RemoteMultiaddr(), OriginAddr: remote.Conn().RemoteMultiaddr(),
...@@ -50,7 +50,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad ...@@ -50,7 +50,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad
Registry: p2p.Streams, Registry: p2p.Streams,
} }
p2p.Streams.Register(&stream) p2p.Streams.Register(stream)
stream.startStreaming() stream.startStreaming()
}) })
......
...@@ -2,6 +2,7 @@ package p2p ...@@ -2,6 +2,7 @@ package p2p
import ( import (
"io" "io"
"sync"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
net "gx/ipfs/QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen/go-libp2p-net" net "gx/ipfs/QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen/go-libp2p-net"
...@@ -58,18 +59,25 @@ func (s *Stream) startStreaming() { ...@@ -58,18 +59,25 @@ func (s *Stream) startStreaming() {
// StreamRegistry is a collection of active incoming and outgoing proto app streams. // StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct { type StreamRegistry struct {
Streams map[uint64]*Stream Streams map[uint64]*Stream
lk *sync.Mutex
nextId uint64 nextId uint64
} }
// Register registers a stream to the registry // Register registers a stream to the registry
func (c *StreamRegistry) Register(streamInfo *Stream) { func (r *StreamRegistry) Register(streamInfo *Stream) {
streamInfo.Id = c.nextId r.lk.Lock()
c.Streams[c.nextId] = streamInfo defer r.lk.Unlock()
c.nextId++
streamInfo.Id = r.nextId
r.Streams[r.nextId] = streamInfo
r.nextId++
} }
// Deregister deregisters stream from the registry // Deregister deregisters stream from the registry
func (c *StreamRegistry) Deregister(streamId uint64) { func (r *StreamRegistry) Deregister(streamId uint64) {
delete(c.Streams, streamId) r.lk.Lock()
defer r.lk.Unlock()
delete(r.Streams, streamId)
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论