提交 c0cc9511 作者: Juan Batiz-Benet

updated goprocess

上级 708e47fc
......@@ -156,7 +156,7 @@
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "162148a58668ca38b0b8f0459ccc6ca88e32f1f4"
"Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8"
},
{
"ImportPath": "github.com/kr/binarydist",
......
......@@ -113,7 +113,7 @@ type Process interface {
// }
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc)
Go(f ProcessFunc) Process
// Close ends the process. Close blocks until the process has completely
// shut down, and any teardown has run _exactly once_. The returned error
......@@ -121,6 +121,10 @@ type Process interface {
// If the process has already been closed, Close returns immediately.
Close() error
// CloseAfterChildren calls Close _after_ its children have Closed
// normally (i.e. it _does not_ attempt to close them).
CloseAfterChildren() error
// Closing is a signal to wait upon. The returned channel is closed
// _after_ Close has been called at least once, but teardown may or may
// not be done yet. The primary use case of Closing is for children who
......@@ -167,7 +171,19 @@ var nilProcessFunc = func(Process) {}
//
// This is because having the process you
func Go(f ProcessFunc) Process {
return GoChild(Background(), f)
// return GoChild(Background(), f)
// we use two processes, one for communication, and
// one for ensuring we wait on the function (unclosable from the outside).
p := newProcess(nil)
waitFor := newProcess(nil)
p.WaitFor(waitFor) // prevent p from closing
go func() {
f(p)
waitFor.Close() // allow p to close.
p.Close() // ensure p closes.
}()
return p
}
// GoChild is like Go, but it registers the returned Process as a child of parent,
......
......@@ -289,7 +289,6 @@ func TestAddChild(t *testing.T) {
func TestGoChildrenClose(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
var bWait = make(chan struct{})
var cWait = make(chan struct{})
......@@ -335,10 +334,85 @@ func TestGoChildrenClose(t *testing.T) {
go a.Close()
testNone(t, Q)
bWait <- struct{}{} // relase b
go b.Close()
testNone(t, Q)
cWait <- struct{}{} // relase c
<-c.Closed()
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
eWait <- struct{}{} // release e
<-e.Closed()
testStrs(t, Q, "e")
dWait <- struct{}{} // releasse d
<-d.Closed()
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
}
func TestCloseAfterChildren(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
a = WithParent(Background())
a.Go(func(p Process) {
b = p
b.Go(func(p Process) {
c = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
})
a.Go(func(p Process) {
d = p
d.Go(func(p Process) {
e = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
})
<-ready
<-ready
<-ready
<-ready
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
aDone := make(chan struct{})
bDone := make(chan struct{})
testNone(t, Q)
go func() {
a.CloseAfterChildren()
aDone <- struct{}{}
}()
testNone(t, Q)
go func() {
b.CloseAfterChildren()
bDone <- struct{}{}
}()
testNone(t, Q)
c.Close()
<-bDone
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
......@@ -346,6 +420,7 @@ func TestGoChildrenClose(t *testing.T) {
testStrs(t, Q, "e")
d.Close()
<-aDone
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
......@@ -354,11 +429,7 @@ func TestGoChildrenClose(t *testing.T) {
func TestBackground(t *testing.T) {
// test it hangs indefinitely:
b := Background()
go b.Close()
go func() {
b.Close()
}()
select {
case <-b.Closing():
......
......@@ -9,6 +9,7 @@ type process struct {
children []Process // process to close with us
waitfors []Process // process to only wait for
teardown TeardownFunc // called to run the teardown logic.
waiting chan struct{} // closed when CloseAfterChildrenClosed is called.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeErr error // error to return to clients of Close()
......@@ -73,13 +74,18 @@ func (p *process) AddChild(child Process) {
p.Unlock()
}
func (p *process) Go(f ProcessFunc) {
func (p *process) Go(f ProcessFunc) Process {
child := newProcess(nil)
p.AddChild(child)
waitFor := newProcess(nil)
child.WaitFor(waitFor) // prevent child from closing
go func() {
f(child)
child.Close() // close to tear down.
waitFor.Close() // allow child to close.
child.Close() // close to tear down.
}()
return child
}
// Close is the external close function.
......@@ -125,3 +131,46 @@ func (p *process) doClose() {
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
}
// We will only wait on the children we have now.
// We will not wait on children added subsequently.
// this may change in the future.
func (p *process) CloseAfterChildren() error {
p.Lock()
select {
case <-p.Closed():
p.Unlock()
return p.Close() // get error. safe, after p.Closed()
case <-p.waiting: // already called it.
p.Unlock()
<-p.Closed()
return p.Close() // get error. safe, after p.Closed()
default:
}
p.Unlock()
// here only from one goroutine.
nextToWaitFor := func() Process {
p.Lock()
defer p.Unlock()
for _, e := range p.waitfors {
select {
case <-e.Closed():
default:
return e
}
}
return nil
}
// wait for all processes we're waiting for are closed.
// the semantics here are simple: we will _only_ close
// if there are no processes currently waiting for.
for next := nextToWaitFor(); next != nil; next = nextToWaitFor() {
<-next.Closed()
}
// YAY! we're done. close
return p.Close()
}
......@@ -45,29 +45,14 @@ func NewRateLimiter(parent process.Process, limit int) *RateLimiter {
func (rl *RateLimiter) LimitedGo(f process.ProcessFunc) {
<-rl.limiter
rl.Go(func(child process.Process) {
p := rl.Go(f)
// call the function as rl.Go would.
f(child)
// this close is here because the child may have spawned
// children of its own, and our rate limiter should capture that.
// we have two options:
// * this approach (which is what process.Go itself does), or
// * spawn another goroutine that waits on <-child.Closed()
//
// go func() {
// <-child.Closed()
// rl.limiter <- struct{}{}
// }()
//
// This approach saves a goroutine. It is fine to call child.Close()
// multiple times.
child.Close()
// after it's done.
// this <-closed() is here because the child may have spawned
// children of its own, and our rate limiter should capture that.
go func() {
<-p.Closed()
rl.limiter <- struct{}{}
})
}()
}
// LimitChan returns a rate-limiting channel. it is the usual, simple,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论