Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
G
go-ipfs
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
jihao
go-ipfs
Commits
6956dad5
Unverified
提交
6956dad5
authored
2月 15, 2018
作者:
Whyrusleeping
提交者:
GitHub
2月 15, 2018
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4712 from ipfs/extract/godep-pubsub
Point briantigerchow/pubsub GoDep'ed module to the gx'ed version
上级
476ad38e
6950d068
隐藏空白字符变更
内嵌
并排
正在显示
6 个修改的文件
包含
8 行增加
和
519 行删除
+8
-519
Godeps.json
Godeps/Godeps.json
+0
-4
README.md
..._workspace/src/github.com/briantigerchow/pubsub/README.md
+0
-30
pubsub.go
..._workspace/src/github.com/briantigerchow/pubsub/pubsub.go
+0
-235
pubsub_test.go
...space/src/github.com/briantigerchow/pubsub/pubsub_test.go
+0
-247
notifications.go
exchange/bitswap/notifications/notifications.go
+2
-3
package.json
package.json
+6
-0
没有找到文件。
Godeps/Godeps.json
浏览文件 @
6956dad5
...
...
@@ -10,10 +10,6 @@
"Rev"
:
"e4fcc9a2c7567d1c42861deebeb483315d222262"
},
{
"ImportPath"
:
"github.com/briantigerchow/pubsub"
,
"Rev"
:
"39ce5f556423a4c7223b370fa17a3bbd75b2d197"
},
{
"ImportPath"
:
"github.com/camlistore/lock"
,
"Rev"
:
"ae27720f340952636b826119b58130b9c1a847a0"
},
...
...
Godeps/_workspace/src/github.com/briantigerchow/pubsub/README.md
deleted
100644 → 0
浏览文件 @
476ad38e
Install pubsub with,
go get github.com/tuxychandru/pubsub
View the
[
API Documentation
](
http://godoc.org/github.com/tuxychandru/pubsub
)
.
## License
Copyright (c) 2013, Chandra Sekar S
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1.
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2.
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub.go
deleted
100644 → 0
浏览文件 @
476ad38e
// Copyright 2013, Chandra Sekar S. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the README.md file.
// Package pubsub implements a simple multi-topic pub-sub
// library.
//
// Topics must be strings and messages of any type can be
// published. A topic can have any number of subcribers and
// all of them receive messages published on the topic.
package
pubsub
type
operation
int
const
(
sub
operation
=
iota
subOnce
subOnceEach
pub
unsub
unsubAll
closeTopic
shutdown
)
// PubSub is a collection of topics.
type
PubSub
struct
{
cmdChan
chan
cmd
capacity
int
}
type
cmd
struct
{
op
operation
topics
[]
string
ch
chan
interface
{}
msg
interface
{}
}
// New creates a new PubSub and starts a goroutine for handling operations.
// The capacity of the channels created by Sub and SubOnce will be as specified.
func
New
(
capacity
int
)
*
PubSub
{
ps
:=
&
PubSub
{
make
(
chan
cmd
),
capacity
}
go
ps
.
start
()
return
ps
}
// Sub returns a channel on which messages published on any of
// the specified topics can be received.
func
(
ps
*
PubSub
)
Sub
(
topics
...
string
)
chan
interface
{}
{
return
ps
.
sub
(
sub
,
topics
...
)
}
// SubOnce is similar to Sub, but only the first message published, after subscription,
// on any of the specified topics can be received.
func
(
ps
*
PubSub
)
SubOnce
(
topics
...
string
)
chan
interface
{}
{
return
ps
.
sub
(
subOnce
,
topics
...
)
}
// SubOnceEach returns a channel on which callers receive, at most, one message
// for each topic.
func
(
ps
*
PubSub
)
SubOnceEach
(
topics
...
string
)
chan
interface
{}
{
return
ps
.
sub
(
subOnceEach
,
topics
...
)
}
func
(
ps
*
PubSub
)
sub
(
op
operation
,
topics
...
string
)
chan
interface
{}
{
ch
:=
make
(
chan
interface
{},
ps
.
capacity
)
ps
.
cmdChan
<-
cmd
{
op
:
op
,
topics
:
topics
,
ch
:
ch
}
return
ch
}
// AddSub adds subscriptions to an existing channel.
func
(
ps
*
PubSub
)
AddSub
(
ch
chan
interface
{},
topics
...
string
)
{
ps
.
cmdChan
<-
cmd
{
op
:
sub
,
topics
:
topics
,
ch
:
ch
}
}
// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach
// behavior.
func
(
ps
*
PubSub
)
AddSubOnceEach
(
ch
chan
interface
{},
topics
...
string
)
{
ps
.
cmdChan
<-
cmd
{
op
:
subOnceEach
,
topics
:
topics
,
ch
:
ch
}
}
// Pub publishes the given message to all subscribers of
// the specified topics.
func
(
ps
*
PubSub
)
Pub
(
msg
interface
{},
topics
...
string
)
{
ps
.
cmdChan
<-
cmd
{
op
:
pub
,
topics
:
topics
,
msg
:
msg
}
}
// Unsub unsubscribes the given channel from the specified
// topics. If no topic is specified, it is unsubscribed
// from all topics.
func
(
ps
*
PubSub
)
Unsub
(
ch
chan
interface
{},
topics
...
string
)
{
if
len
(
topics
)
==
0
{
ps
.
cmdChan
<-
cmd
{
op
:
unsubAll
,
ch
:
ch
}
return
}
ps
.
cmdChan
<-
cmd
{
op
:
unsub
,
topics
:
topics
,
ch
:
ch
}
}
// Close closes all channels currently subscribed to the specified topics.
// If a channel is subscribed to multiple topics, some of which is
// not specified, it is not closed.
func
(
ps
*
PubSub
)
Close
(
topics
...
string
)
{
ps
.
cmdChan
<-
cmd
{
op
:
closeTopic
,
topics
:
topics
}
}
// Shutdown closes all subscribed channels and terminates the goroutine.
func
(
ps
*
PubSub
)
Shutdown
()
{
ps
.
cmdChan
<-
cmd
{
op
:
shutdown
}
}
func
(
ps
*
PubSub
)
start
()
{
reg
:=
registry
{
topics
:
make
(
map
[
string
]
map
[
chan
interface
{}]
subtype
),
revTopics
:
make
(
map
[
chan
interface
{}]
map
[
string
]
bool
),
}
loop
:
for
cmd
:=
range
ps
.
cmdChan
{
if
cmd
.
topics
==
nil
{
switch
cmd
.
op
{
case
unsubAll
:
reg
.
removeChannel
(
cmd
.
ch
)
case
shutdown
:
break
loop
}
continue
loop
}
for
_
,
topic
:=
range
cmd
.
topics
{
switch
cmd
.
op
{
case
sub
:
reg
.
add
(
topic
,
cmd
.
ch
,
stNorm
)
case
subOnce
:
reg
.
add
(
topic
,
cmd
.
ch
,
stOnceAny
)
case
subOnceEach
:
reg
.
add
(
topic
,
cmd
.
ch
,
stOnceEach
)
case
pub
:
reg
.
send
(
topic
,
cmd
.
msg
)
case
unsub
:
reg
.
remove
(
topic
,
cmd
.
ch
)
case
closeTopic
:
reg
.
removeTopic
(
topic
)
}
}
}
for
topic
,
chans
:=
range
reg
.
topics
{
for
ch
,
_
:=
range
chans
{
reg
.
remove
(
topic
,
ch
)
}
}
}
// registry maintains the current subscription state. It's not
// safe to access a registry from multiple goroutines simultaneously.
type
registry
struct
{
topics
map
[
string
]
map
[
chan
interface
{}]
subtype
revTopics
map
[
chan
interface
{}]
map
[
string
]
bool
}
type
subtype
int
const
(
stOnceAny
=
iota
stOnceEach
stNorm
)
func
(
reg
*
registry
)
add
(
topic
string
,
ch
chan
interface
{},
st
subtype
)
{
if
reg
.
topics
[
topic
]
==
nil
{
reg
.
topics
[
topic
]
=
make
(
map
[
chan
interface
{}]
subtype
)
}
reg
.
topics
[
topic
][
ch
]
=
st
if
reg
.
revTopics
[
ch
]
==
nil
{
reg
.
revTopics
[
ch
]
=
make
(
map
[
string
]
bool
)
}
reg
.
revTopics
[
ch
][
topic
]
=
true
}
func
(
reg
*
registry
)
send
(
topic
string
,
msg
interface
{})
{
for
ch
,
st
:=
range
reg
.
topics
[
topic
]
{
ch
<-
msg
switch
st
{
case
stOnceAny
:
for
topic
:=
range
reg
.
revTopics
[
ch
]
{
reg
.
remove
(
topic
,
ch
)
}
case
stOnceEach
:
reg
.
remove
(
topic
,
ch
)
}
}
}
func
(
reg
*
registry
)
removeTopic
(
topic
string
)
{
for
ch
:=
range
reg
.
topics
[
topic
]
{
reg
.
remove
(
topic
,
ch
)
}
}
func
(
reg
*
registry
)
removeChannel
(
ch
chan
interface
{})
{
for
topic
:=
range
reg
.
revTopics
[
ch
]
{
reg
.
remove
(
topic
,
ch
)
}
}
func
(
reg
*
registry
)
remove
(
topic
string
,
ch
chan
interface
{})
{
if
_
,
ok
:=
reg
.
topics
[
topic
];
!
ok
{
return
}
if
_
,
ok
:=
reg
.
topics
[
topic
][
ch
];
!
ok
{
return
}
delete
(
reg
.
topics
[
topic
],
ch
)
delete
(
reg
.
revTopics
[
ch
],
topic
)
if
len
(
reg
.
topics
[
topic
])
==
0
{
delete
(
reg
.
topics
,
topic
)
}
if
len
(
reg
.
revTopics
[
ch
])
==
0
{
close
(
ch
)
delete
(
reg
.
revTopics
,
ch
)
}
}
Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub_test.go
deleted
100644 → 0
浏览文件 @
476ad38e
// Copyright 2013, Chandra Sekar S. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the README.md file.
package
pubsub
import
(
check
"gopkg.in/check.v1"
"runtime"
"testing"
"time"
)
var
_
=
check
.
Suite
(
new
(
Suite
))
func
Test
(
t
*
testing
.
T
)
{
check
.
TestingT
(
t
)
}
type
Suite
struct
{}
func
(
s
*
Suite
)
TestSub
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch1
:=
ps
.
Sub
(
"t1"
)
ch2
:=
ps
.
Sub
(
"t1"
)
ch3
:=
ps
.
Sub
(
"t2"
)
ps
.
Pub
(
"hi"
,
"t1"
)
c
.
Check
(
<-
ch1
,
check
.
Equals
,
"hi"
)
c
.
Check
(
<-
ch2
,
check
.
Equals
,
"hi"
)
ps
.
Pub
(
"hello"
,
"t2"
)
c
.
Check
(
<-
ch3
,
check
.
Equals
,
"hello"
)
ps
.
Shutdown
()
_
,
ok
:=
<-
ch1
c
.
Check
(
ok
,
check
.
Equals
,
false
)
_
,
ok
=
<-
ch2
c
.
Check
(
ok
,
check
.
Equals
,
false
)
_
,
ok
=
<-
ch3
c
.
Check
(
ok
,
check
.
Equals
,
false
)
}
func
(
s
*
Suite
)
TestSubOnce
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
SubOnce
(
"t1"
)
ps
.
Pub
(
"hi"
,
"t1"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hi"
)
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestAddSub
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch1
:=
ps
.
Sub
(
"t1"
)
ch2
:=
ps
.
Sub
(
"t2"
)
ps
.
Pub
(
"hi1"
,
"t1"
)
c
.
Check
(
<-
ch1
,
check
.
Equals
,
"hi1"
)
ps
.
Pub
(
"hi2"
,
"t2"
)
c
.
Check
(
<-
ch2
,
check
.
Equals
,
"hi2"
)
ps
.
AddSub
(
ch1
,
"t2"
,
"t3"
)
ps
.
Pub
(
"hi3"
,
"t2"
)
c
.
Check
(
<-
ch1
,
check
.
Equals
,
"hi3"
)
c
.
Check
(
<-
ch2
,
check
.
Equals
,
"hi3"
)
ps
.
Pub
(
"hi4"
,
"t3"
)
c
.
Check
(
<-
ch1
,
check
.
Equals
,
"hi4"
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestUnsub
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
Sub
(
"t1"
)
ps
.
Pub
(
"hi"
,
"t1"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hi"
)
ps
.
Unsub
(
ch
,
"t1"
)
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestUnsubAll
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch1
:=
ps
.
Sub
(
"t1"
,
"t2"
,
"t3"
)
ch2
:=
ps
.
Sub
(
"t1"
,
"t3"
)
ps
.
Unsub
(
ch1
)
m
,
ok
:=
<-
ch1
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Pub
(
"hi"
,
"t1"
)
m
,
ok
=
<-
ch2
c
.
Check
(
m
,
check
.
Equals
,
"hi"
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestClose
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch1
:=
ps
.
Sub
(
"t1"
)
ch2
:=
ps
.
Sub
(
"t1"
)
ch3
:=
ps
.
Sub
(
"t2"
)
ch4
:=
ps
.
Sub
(
"t3"
)
ps
.
Pub
(
"hi"
,
"t1"
)
ps
.
Pub
(
"hello"
,
"t2"
)
c
.
Check
(
<-
ch1
,
check
.
Equals
,
"hi"
)
c
.
Check
(
<-
ch2
,
check
.
Equals
,
"hi"
)
c
.
Check
(
<-
ch3
,
check
.
Equals
,
"hello"
)
ps
.
Close
(
"t1"
,
"t2"
)
_
,
ok
:=
<-
ch1
c
.
Check
(
ok
,
check
.
Equals
,
false
)
_
,
ok
=
<-
ch2
c
.
Check
(
ok
,
check
.
Equals
,
false
)
_
,
ok
=
<-
ch3
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Pub
(
"welcome"
,
"t3"
)
c
.
Check
(
<-
ch4
,
check
.
Equals
,
"welcome"
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestUnsubAfterClose
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
Sub
(
"t1"
)
defer
func
()
{
ps
.
Unsub
(
ch
,
"t1"
)
ps
.
Shutdown
()
}()
ps
.
Close
(
"t1"
)
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
}
func
(
s
*
Suite
)
TestShutdown
(
c
*
check
.
C
)
{
start
:=
runtime
.
NumGoroutine
()
New
(
10
)
.
Shutdown
()
time
.
Sleep
(
1
)
c
.
Check
(
runtime
.
NumGoroutine
()
-
start
,
check
.
Equals
,
1
)
}
func
(
s
*
Suite
)
TestMultiSub
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
Sub
(
"t1"
,
"t2"
)
ps
.
Pub
(
"hi"
,
"t1"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hi"
)
ps
.
Pub
(
"hello"
,
"t2"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hello"
)
ps
.
Shutdown
()
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
}
func
(
s
*
Suite
)
TestMultiSubOnce
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
SubOnce
(
"t1"
,
"t2"
)
ps
.
Pub
(
"hi"
,
"t1"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hi"
)
ps
.
Pub
(
"hello"
,
"t2"
)
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestMultiSubOnceEach
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
SubOnceEach
(
"t1"
,
"t2"
)
ps
.
Pub
(
"hi"
,
"t1"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hi"
)
ps
.
Pub
(
"hi!"
,
"t1"
)
// ignored
ps
.
Pub
(
"hello"
,
"t2"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hello"
)
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestMultiPub
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch1
:=
ps
.
Sub
(
"t1"
)
ch2
:=
ps
.
Sub
(
"t2"
)
ps
.
Pub
(
"hi"
,
"t1"
,
"t2"
)
c
.
Check
(
<-
ch1
,
check
.
Equals
,
"hi"
)
c
.
Check
(
<-
ch2
,
check
.
Equals
,
"hi"
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestMultiUnsub
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
Sub
(
"t1"
,
"t2"
,
"t3"
)
ps
.
Unsub
(
ch
,
"t1"
)
ps
.
Pub
(
"hi"
,
"t1"
)
ps
.
Pub
(
"hello"
,
"t2"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hello"
)
ps
.
Unsub
(
ch
,
"t2"
,
"t3"
)
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Shutdown
()
}
func
(
s
*
Suite
)
TestMultiClose
(
c
*
check
.
C
)
{
ps
:=
New
(
1
)
ch
:=
ps
.
Sub
(
"t1"
,
"t2"
)
ps
.
Pub
(
"hi"
,
"t1"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hi"
)
ps
.
Close
(
"t1"
)
ps
.
Pub
(
"hello"
,
"t2"
)
c
.
Check
(
<-
ch
,
check
.
Equals
,
"hello"
)
ps
.
Close
(
"t2"
)
_
,
ok
:=
<-
ch
c
.
Check
(
ok
,
check
.
Equals
,
false
)
ps
.
Shutdown
()
}
exchange/bitswap/notifications/notifications.go
浏览文件 @
6956dad5
...
...
@@ -4,10 +4,9 @@ import (
"context"
"sync"
blocks
"gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
pubsub
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
cid
"gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
pubsub
"gx/ipfs/QmdbxjQWogRCHRaxhhGnYdT1oQJzL9GdqSKzCdqWr85AP2/pubsub"
blocks
"gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
)
const
bufferSize
=
16
...
...
package.json
浏览文件 @
6956dad5
...
...
@@ -563,6 +563,12 @@
"hash"
:
"QmTVDM4LCSUMFNQzbDLL9zQwp8usE6QHymFdh3h8vL9v6b"
,
"name"
:
"go-ipfs-blockstore"
,
"version"
:
"0.0.1"
},
{
"author"
:
"why"
,
"hash"
:
"QmdbxjQWogRCHRaxhhGnYdT1oQJzL9GdqSKzCdqWr85AP2"
,
"name"
:
"pubsub"
,
"version"
:
"1.0.0"
}
],
"gxVersion"
:
"0.10.0"
,
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论