提交 b63cbf53 作者: Juan Batiz-Benet

updated peerstream

上级 b01663ed
...@@ -165,7 +165,7 @@ ...@@ -165,7 +165,7 @@
}, },
{ {
"ImportPath": "github.com/jbenet/go-peerstream", "ImportPath": "github.com/jbenet/go-peerstream",
"Rev": "530b09b2300da11cc19f479289be5d014c146581" "Rev": "bbe2a6461aa80ee25fd87eccf35bd54bac7f788d"
}, },
{ {
"ImportPath": "github.com/jbenet/go-random", "ImportPath": "github.com/jbenet/go-random",
......
language: go language: go
go: go:
- 1.2
- 1.3 - 1.3
- 1.4 - 1.4
- release - release
- tip - tip
script: script:
- go test -race -cpu=5 -v ./... - go test -race -cpu=5 ./...
...@@ -122,7 +122,11 @@ func (c *Conn) Close() error { ...@@ -122,7 +122,11 @@ func (c *Conn) Close() error {
// close underlying connection // close underlying connection
c.swarm.removeConn(c) c.swarm.removeConn(c)
return c.pstConn.Close() err := c.pstConn.Close()
c.swarm.notifyAll(func(n Notifiee) {
n.Disconnected(c)
})
return err
} }
// ConnsWithGroup narrows down a set of connections to those in a given group. // ConnsWithGroup narrows down a set of connections to those in a given group.
...@@ -198,6 +202,9 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) { ...@@ -198,6 +202,9 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) {
s.StreamHandler()(stream) // call our handler s.StreamHandler()(stream) // call our handler
}) })
s.notifyAll(func(n Notifiee) {
n.Connected(c)
})
return c, nil return c, nil
} }
...@@ -228,6 +235,10 @@ func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream { ...@@ -228,6 +235,10 @@ func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream {
c.streams[stream] = struct{}{} c.streams[stream] = struct{}{}
s.streamLock.Unlock() s.streamLock.Unlock()
c.streamLock.Unlock() c.streamLock.Unlock()
s.notifyAll(func(n Notifiee) {
n.OpenedStream(stream)
})
return stream return stream
} }
...@@ -241,7 +252,11 @@ func (s *Swarm) removeStream(stream *Stream) error { ...@@ -241,7 +252,11 @@ func (s *Swarm) removeStream(stream *Stream) error {
s.streamLock.Unlock() s.streamLock.Unlock()
stream.conn.streamLock.Unlock() stream.conn.streamLock.Unlock()
return stream.pstStream.Close() err := stream.pstStream.Close()
s.notifyAll(func(n Notifiee) {
n.ClosedStream(stream)
})
return err
} }
func (s *Swarm) removeConn(conn *Conn) { func (s *Swarm) removeConn(conn *Conn) {
......
...@@ -39,6 +39,10 @@ type Swarm struct { ...@@ -39,6 +39,10 @@ type Swarm struct {
streamHandler StreamHandler // receives Streams initiated remotely streamHandler StreamHandler // receives Streams initiated remotely
selectConn SelectConn // default SelectConn function selectConn SelectConn // default SelectConn function
// notification listeners
notifiees map[Notifiee]struct{}
notifieeLock sync.RWMutex
closed chan struct{} closed chan struct{}
} }
...@@ -48,6 +52,7 @@ func NewSwarm(t pst.Transport) *Swarm { ...@@ -48,6 +52,7 @@ func NewSwarm(t pst.Transport) *Swarm {
streams: make(map[*Stream]struct{}), streams: make(map[*Stream]struct{}),
conns: make(map[*Conn]struct{}), conns: make(map[*Conn]struct{}),
listeners: make(map[*Listener]struct{}), listeners: make(map[*Listener]struct{}),
notifiees: make(map[Notifiee]struct{}),
selectConn: SelectRandomConn, selectConn: SelectRandomConn,
streamHandler: NoOpStreamHandler, streamHandler: NoOpStreamHandler,
connHandler: NoOpConnHandler, connHandler: NoOpConnHandler,
...@@ -361,3 +366,37 @@ func (s *Swarm) connGarbageCollect() { ...@@ -361,3 +366,37 @@ func (s *Swarm) connGarbageCollect() {
} }
} }
} }
// Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(n Notifiee) {
s.notifieeLock.Lock()
s.notifiees[n] = struct{}{}
s.notifieeLock.Unlock()
}
// StopNotify unregisters Notifiee fromr receiving signals
func (s *Swarm) StopNotify(n Notifiee) {
s.notifieeLock.Lock()
delete(s.notifiees, n)
s.notifieeLock.Unlock()
}
// notifyAll runs the notification function on all Notifiees
func (s *Swarm) notifyAll(notification func(n Notifiee)) {
s.notifieeLock.RLock()
for n := range s.notifiees {
// make sure we dont block
// and they dont block each other.
go notification(n)
}
s.notifieeLock.RUnlock()
}
// Notifiee is an interface for an object wishing to receive
// notifications from a Swarm
type Notifiee interface {
Connected(*Conn) // called when a connection opened
Disconnected(*Conn) // called when a connection closed
OpenedStream(*Stream) // called when a stream opened
ClosedStream(*Stream) // called when a stream closed
}
...@@ -350,6 +350,8 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, ...@@ -350,6 +350,8 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
for _, a := range swarms { for _, a := range swarms {
for _, b := range swarms { for _, b := range swarms {
wg.Add(1) wg.Add(1)
a := a // race
b := b // race
go rateLimit(func() { go rateLimit(func() {
defer wg.Done() defer wg.Done()
openConnsAndRW(a, b) openConnsAndRW(a, b)
...@@ -370,6 +372,10 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, ...@@ -370,6 +372,10 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
t.Error(err) t.Error(err)
} }
for _, s := range swarms {
s.Close()
}
} }
func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) { func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论