提交 7ff9f09b 作者: Łukasz Magiera

namesys: Implement async methods

License: MIT
Signed-off-by: 's avatarŁukasz Magiera <magik6k@gmail.com>
上级 86559e9e
......@@ -30,6 +30,7 @@ const (
nocacheOptionName = "nocache"
dhtRecordCountOptionName = "dht-record-count"
dhtTimeoutOptionName = "dht-timeout"
streamOptionName = "stream"
)
var IpnsCmd = &cmds.Command{
......@@ -78,6 +79,7 @@ Resolve the value of a dnslink:
cmdkit.BoolOption(nocacheOptionName, "n", "Do not use cached entries."),
cmdkit.UintOption(dhtRecordCountOptionName, "dhtrc", "Number of records to request for DHT resolution."),
cmdkit.StringOption(dhtTimeoutOptionName, "dhtt", "Max time to collect values during DHT resolution eg \"30s\". Pass 0 for no timeout."),
cmdkit.BoolOption(streamOptionName, "s", "Stream entries as they are found."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
......
......@@ -10,13 +10,21 @@ import (
path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path"
)
type onceResult struct {
value path.Path
ttl time.Duration
err error
}
type resolver interface {
// resolveOnce looks up a name once (without recursion).
resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (value path.Path, ttl time.Duration, err error)
resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (value path.Path, ttl time.Duration, err error)
resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult
}
// resolve is a helper for implementing Resolver.ResolveN using resolveOnce.
func resolve(ctx context.Context, r resolver, name string, options *opts.ResolveOpts, prefixes ...string) (path.Path, error) {
func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) (path.Path, error) {
depth := options.Depth
for {
p, _, err := r.resolveOnce(ctx, name, options)
......@@ -34,23 +42,82 @@ func resolve(ctx context.Context, r resolver, name string, options *opts.Resolve
return p, ErrResolveRecursion
}
matched := false
for _, prefix := range prefixes {
if strings.HasPrefix(p.String(), prefix) {
matched = true
if len(prefixes) == 1 {
name = strings.TrimPrefix(p.String(), prefix)
}
break
}
}
if !matched {
if !strings.HasPrefix(p.String(), prefix) {
return p, nil
}
name = strings.TrimPrefix(p.String(), prefix)
if depth > 1 {
depth--
}
}
}
//TODO:
// - better error handling
func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
resCh := r.resolveOnceAsync(ctx, name, options)
depth := options.Depth
outCh := make(chan Result)
go func() {
defer close(outCh)
var subCh <-chan Result
var cancelSub context.CancelFunc
for {
select {
case res, ok := <-resCh:
if res.err != nil {
outCh <- Result{err: res.err}
return
}
if !ok {
resCh = nil
continue
}
log.Debugf("resolved %s to %s", name, res.value.String())
if strings.HasPrefix(res.value.String(), "/ipfs/") {
outCh <- Result{err: res.err}
continue
}
p := strings.TrimPrefix(res.value.String(), prefix)
if depth == 1 {
outCh <- Result{err: ErrResolveRecursion}
continue
}
subopts := options
if subopts.Depth > 1 {
subopts.Depth--
}
var subCtx context.Context
if subCh != nil {
// Cancel previous recursive resolve since it won't be used anyways
cancelSub()
}
subCtx, cancelSub = context.WithCancel(ctx)
subCh = resolveAsyncDo(subCtx, r, p, subopts, prefix)
case res, ok := <-subCh:
if res.err != nil {
outCh <- Result{err: res.err}
return
}
if !ok {
subCh = nil
continue
}
outCh <- res
case <-ctx.Done():
}
}
}()
return outCh
}
func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
return resolveAsyncDo(ctx, r, name, options, prefix)
}
......@@ -39,7 +39,7 @@ type lookupRes struct {
// resolveOnce implements resolver.
// TXT records for a given domain name should contain a b58
// encoded multihash.
func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) {
func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
segments := strings.SplitN(name, "/", 2)
domain := segments[0]
......@@ -84,6 +84,61 @@ func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options *opt
return p, 0, err
}
func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
out := make(chan onceResult, 1)
segments := strings.SplitN(name, "/", 2)
domain := segments[0]
if !isd.IsDomain(domain) {
out <- onceResult{err: errors.New("not a valid domain name")}
close(out)
return out
}
log.Debugf("DNSResolver resolving %s", domain)
rootChan := make(chan lookupRes, 1)
go workDomain(r, domain, rootChan)
subChan := make(chan lookupRes, 1)
go workDomain(r, "_dnslink."+domain, subChan)
go func() {
defer close(out)
for {
select {
case subRes, ok := <-subChan:
if !ok {
subChan = nil
}
if subRes.error == nil {
select {
case out <- onceResult{value: subRes.path}:
case <-ctx.Done():
}
return
}
case rootRes, ok := <-rootChan:
if !ok {
subChan = nil
}
if rootRes.error == nil {
select {
case out <- onceResult{value: rootRes.path}:
case <-ctx.Done():
}
}
case <-ctx.Done():
return
}
if subChan == nil && rootChan == nil {
return
}
}
}()
return out
}
func workDomain(r *DNSResolver, name string, res chan lookupRes) {
txt, err := r.lookupTXT(name)
......
......@@ -63,6 +63,12 @@ type NameSystem interface {
Publisher
}
// Result is the return type for Resolver.ResolveAsync.
type Result struct {
path path.Path
err error
}
// Resolver is an object capable of resolving names.
type Resolver interface {
......@@ -81,6 +87,11 @@ type Resolver interface {
// users will be fine with this default limit, but if you need to
// adjust the limit you can specify it as an option.
Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (value path.Path, err error)
// ResolveAsync performs recursive name lookup, like Resolve, but it returns
// entries as they are discovered in the DHT. Each returned result is guaranteed
// to be "better" (which usually means newer) than the previous one.
ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result
}
// Publisher is an object capable of publishing particular names.
......
......@@ -64,8 +64,25 @@ func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.Resolv
return resolve(ctx, ns, name, opts.ProcessOpts(options), "/ipns/")
}
func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result {
res := make(chan Result, 1)
if strings.HasPrefix(name, "/ipfs/") {
p, err := path.ParsePath(name)
res <- Result{p, err}
return res
}
if !strings.HasPrefix(name, "/") {
p, err := path.ParsePath("/ipfs/" + name)
res <- Result{p, err}
return res
}
return resolveAsync(ctx, ns, name, opts.ProcessOpts(options), "/ipns/")
}
// resolveOnce implements resolver.
func (ns *mpns) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) {
func (ns *mpns) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
if !strings.HasPrefix(name, "/ipns/") {
name = "/ipns/" + name
}
......@@ -107,6 +124,75 @@ func (ns *mpns) resolveOnce(ctx context.Context, name string, options *opts.Reso
return p, 0, err
}
func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
out := make(chan onceResult, 1)
if !strings.HasPrefix(name, "/ipns/") {
name = "/ipns/" + name
}
segments := strings.SplitN(name, "/", 4)
if len(segments) < 3 || segments[0] != "" {
log.Debugf("invalid name syntax for %s", name)
out <- onceResult{err: ErrResolveFailed}
close(out)
return out
}
key := segments[2]
if p, ok := ns.cacheGet(key); ok {
out <- onceResult{value: p}
close(out)
return out
}
// Resolver selection:
// 1. if it is a multihash resolve through "ipns".
// 2. if it is a domain name, resolve through "dns"
// 3. otherwise resolve through the "proquint" resolver
var res resolver
if _, err := mh.FromB58String(key); err == nil {
res = ns.ipnsResolver
} else if isd.IsDomain(key) {
res = ns.dnsResolver
} else {
res = ns.proquintResolver
}
resCh := res.resolveOnceAsync(ctx, key, options)
var best onceResult
go func() {
defer close(out)
for {
select {
case res, ok := <-resCh:
if !ok {
if best != (onceResult{}) {
ns.cacheSet(key, best.value, best.ttl)
}
return
}
if res.err == nil {
best = res
}
p := res.value
// Attach rest of the path
if len(segments) > 3 {
p, _ = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
}
out <- onceResult{value: p, err: res.err}
case <-ctx.Done():
return
}
}
}()
return out
}
// Publish implements Publisher
func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error {
return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordTTL))
......
......@@ -31,8 +31,8 @@ type ResolveOpts struct {
// DefaultResolveOpts returns the default options for resolving
// an IPNS path
func DefaultResolveOpts() *ResolveOpts {
return &ResolveOpts{
func DefaultResolveOpts() ResolveOpts {
return ResolveOpts{
Depth: DefaultDepthLimit,
DhtRecordCount: 16,
DhtTimeout: time.Minute,
......@@ -65,10 +65,10 @@ func DhtTimeout(timeout time.Duration) ResolveOpt {
}
// ProcessOpts converts an array of ResolveOpt into a ResolveOpts object
func ProcessOpts(opts []ResolveOpt) *ResolveOpts {
func ProcessOpts(opts []ResolveOpt) ResolveOpts {
rsopts := DefaultResolveOpts()
for _, option := range opts {
option(rsopts)
option(&rsopts)
}
return rsopts
}
......@@ -19,7 +19,7 @@ func (r *ProquintResolver) Resolve(ctx context.Context, name string, options ...
}
// resolveOnce implements resolver. Decodes the proquint string.
func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) {
func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
ok, err := proquint.IsProquint(name)
if err != nil || !ok {
return "", 0, errors.New("not a valid proquint string")
......@@ -27,3 +27,17 @@ func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options
// Return a 0 TTL as caching this result is pointless.
return path.FromString(string(proquint.Decode(name))), 0, nil
}
func (r *ProquintResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
out := make(chan onceResult, 1)
defer close(out)
ok, err := proquint.IsProquint(name)
if err != nil || !ok {
out <- onceResult{err: errors.New("not a valid proquint string")}
return out
}
// Return a 0 TTL as caching this result is pointless.
out <- onceResult{value: path.FromString(string(proquint.Decode(name)))}
return out
}
......@@ -42,9 +42,13 @@ func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts
return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/")
}
func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result {
return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/")
}
// resolveOnce implements resolver. Uses the IPFS routing system to
// resolve SFS-like names.
func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) {
func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
log.Debugf("RoutingResolver resolving %s", name)
if options.DhtTimeout != 0 {
......@@ -126,3 +130,132 @@ func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options *op
return p, ttl, nil
}
func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
out := make(chan onceResult, 1)
log.Debugf("RoutingResolver resolving %s", name)
if options.DhtTimeout != 0 {
// Resolution must complete within the timeout
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout)
defer cancel()
}
name = strings.TrimPrefix(name, "/ipns/")
hash, err := mh.FromB58String(name)
if err != nil {
// name should be a multihash. if it isn't, error out here.
log.Debugf("RoutingResolver: bad input hash: [%s]\n", name)
out <- onceResult{err: err}
close(out)
return out
}
pid, err := peer.IDFromBytes(hash)
if err != nil {
log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err)
out <- onceResult{err: err}
close(out)
return out
}
// Name should be the hash of a public key retrievable from ipfs.
// We retrieve the public key here to make certain that it's in the peer
// store before calling GetValue() on the DHT - the DHT will call the
// ipns validator, which in turn will get the public key from the peer
// store to verify the record signature
_, err = routing.GetPublicKey(r.routing, ctx, pid)
if err != nil {
log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err)
out <- onceResult{err: err}
close(out)
return out
}
// Use the routing system to get the name.
// Note that the DHT will call the ipns validator when retrieving
// the value, which in turn verifies the ipns record signature
ipnsKey := ipns.RecordKey(pid)
vals, err := r.routing.(*dht.IpfsDHT).SearchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount)))
if err != nil {
log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err)
out <- onceResult{err: err}
close(out)
return out
}
go func() {
defer close(out)
for {
select {
case val, ok := <-vals:
if !ok {
return
}
entry := new(pb.IpnsEntry)
err = proto.Unmarshal(val, entry)
if err != nil {
log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err)
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
return
}
var p path.Path
// check for old style record:
if valh, err := mh.Cast(entry.GetValue()); err == nil {
// Its an old style multihash record
log.Debugf("encountered CIDv0 ipns entry: %s", valh)
p = path.FromCid(cid.NewCidV0(valh))
} else {
// Not a multihash, probably a new style record
p, err = path.ParsePath(string(entry.GetValue()))
if err != nil {
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
return
}
}
ttl := DefaultResolverCacheTTL
if entry.Ttl != nil {
ttl = time.Duration(*entry.Ttl)
}
switch eol, err := ipns.GetEOL(entry); err {
case ipns.ErrUnrecognizedValidity:
// No EOL.
case nil:
ttEol := eol.Sub(time.Now())
if ttEol < 0 {
// It *was* valid when we first resolved it.
ttl = 0
} else if ttEol < ttl {
ttl = ttEol
}
default:
log.Errorf("encountered error when parsing EOL: %s", err)
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
return
}
select {
case out <- onceResult{value: p, ttl: ttl}:
case <-ctx.Done():
}
case <-ctx.Done():
return
}
}
}()
return out
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论