提交 6f9a9a5b 作者: Lars Gierth

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>

WIP

License: MIT
Signed-off-by: 's avatarLars Gierth <larsg@systemli.org>
上级 de0bbb00
package main
import (
"context"
"fmt"
"strings"
"sync"
"time"
p2pdiscovery "gx/ipfs/QmPL3AKtiaQyYpchZceXBZhZ3MSnoGqJvLZrc7fzDTTQdJ/go-libp2p/p2p/discovery"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
routing "gx/ipfs/QmPmFeQ5oY5G6M7aBWggi5phxEPXwsQntE1DFcUzETULdp/go-libp2p-routing"
p2pcrypto "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
p2pnet "gx/ipfs/QmSTaEYUgDe1r581hxyd2u9582Hgp3KX4wGwYbRqz2u9Qh/go-libp2p-net"
record "gx/ipfs/QmSb4B8ZAAj5ALe9LjfzPyF8Ma6ezC1NTnDF2JQPUJxEXb/go-libp2p-record"
dht "gx/ipfs/QmSteomMgXnSQxLEY5UpxmkYAd8QF9JuLLeLYBokTHxFru/go-libp2p-kad-dht"
dhtopts "gx/ipfs/QmSteomMgXnSQxLEY5UpxmkYAd8QF9JuLLeLYBokTHxFru/go-libp2p-kad-dht/opts"
floodsub "gx/ipfs/QmTcC9Qx2adsdGguNpqZ6dJK7MMsH8sf3yfxZxG3bSwKet/go-libp2p-floodsub"
dns "gx/ipfs/QmWchsfMt9Re1CQaiHqPQC1DrZ9bkpa6n229dRYkGyLXNh/dns"
peerstore "gx/ipfs/QmWtCpWB39Rzc2xTB75MKorsxNpo3TyecTEN24CJ3KVohE/go-libp2p-peerstore"
ipns "gx/ipfs/QmX72XT6sSQRkNHKcAFLM2VqB3B4bWPetgWnHY8LgsUVeT/go-ipns"
ipnspb "gx/ipfs/QmX72XT6sSQRkNHKcAFLM2VqB3B4bWPetgWnHY8LgsUVeT/go-ipns/pb"
maddr "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
datastore "gx/ipfs/QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D/go-datastore"
p2ppeer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
proto "gx/ipfs/QmdxUuburamoF6zF9qjeQC4WYcWGbWuRmdLacMEsW8ioD8/gogo-protobuf/proto"
multibase "gx/ipfs/QmekxXDhCxCJRNuzmHreuaT3BsuJcsjcXWNrtV9C8DRHtd/go-multibase"
p2phost "gx/ipfs/Qmf5yHzmWAyHSJRPAmZzfk3Yd7icydBLi7eec5741aov7v/go-libp2p-host"
)
type Daemon struct {
Context context.Context
Host p2phost.Host
Routing routing.IpfsRouting
Discovery p2pdiscovery.Service
PubSub *floodsub.PubSub
Updates *floodsub.Subscription
entries map[p2ppeer.ID]*ipnspb.IpnsEntry
entriesMutex sync.RWMutex
}
func NewDaemon(ctx context.Context, host p2phost.Host) (*Daemon, error) {
d := &Daemon{Context: ctx, Host: host}
pubsub, err := floodsub.NewGossipSub(ctx, host)
if err != nil {
return nil, err
}
interval := 5 * time.Second
discovery, err := p2pdiscovery.NewMdnsService(ctx, host, interval, p2pdiscovery.ServiceTag)
if err != nil {
return nil, err
}
discovery.RegisterNotifee(d)
ds := datastore.NewMapDatastore()
validator := record.NamespacedValidator{
// "pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{KeyBook: host.Peerstore()},
}
dht, err := dht.New(ctx, host, dhtopts.Datastore(ds), dhtopts.Validator(validator))
if err != nil {
return nil, err
}
d.PubSub = pubsub
d.Discovery = discovery
d.Routing = dht
d.entries = map[p2ppeer.ID]*ipnspb.IpnsEntry{}
return d, nil
}
func (d *Daemon) Bootstrap(ctx context.Context, addrs []string, topic string) error {
if err := d.BootstrapNetwork(ctx, addrs); err != nil {
return err
}
return d.Subscribe(topic)
}
func (d *Daemon) BootstrapNetwork(ctx context.Context, addrs []string) error {
for _, a := range addrs {
pinfo, err := peerstore.InfoFromP2pAddr(maddr.StringCast(a))
if err != nil {
return err
}
if err = d.Host.Connect(ctx, *pinfo); err != nil {
return err
}
fmt.Printf("connected: /p2p/%s\n", pinfo.ID.Pretty())
}
return nil
}
func (d *Daemon) HandlePeerFound(pinfo peerstore.PeerInfo) {
connectTimeout := 10 * time.Second
ctx, cancel := context.WithTimeout(d.Context, connectTimeout)
defer cancel()
connected := d.Host.Network().Connectedness(pinfo.ID) == p2pnet.Connected
if connected {
return
}
fmt.Printf("found: /p2p/%s %+v\n", pinfo.ID.Pretty(), pinfo.Addrs)
if err := d.Host.Connect(ctx, pinfo); err == nil {
fmt.Printf("connected: /p2p/%s\n", pinfo.ID.Pretty())
}
}
func (d *Daemon) AnnouncePubsub(ctx context.Context, topic string) error {
timeout := 120 * time.Second
cid := blocks.NewBlock([]byte("floodsub:" + topic)).Cid()
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if err := d.Routing.Provide(ctx, cid, true); err != nil {
return err
}
return nil
}
func (d *Daemon) MaintainPubsub(ctx context.Context, topic string) error {
searchMax := 10
searchTimeout := 30 * time.Second
connectTimeout := 10 * time.Second
cid := blocks.NewBlock([]byte("floodsub:" + topic)).Cid()
sctx, cancel := context.WithTimeout(ctx, searchTimeout)
defer cancel()
provs := d.Routing.FindProvidersAsync(sctx, cid, searchMax)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi peerstore.PeerInfo) {
defer wg.Done()
ctx, cancel2 := context.WithTimeout(ctx, connectTimeout)
defer cancel2()
err := d.Host.Connect(ctx, pi)
if err != nil {
return
}
}(p)
}
wg.Wait()
return nil
}
func (d *Daemon) Subscribe(topic string) error {
if err := d.PubSub.RegisterTopicValidator(topic, d.validateMessage); err != nil {
return err
}
sub, err := d.PubSub.Subscribe(topic)
if err != nil {
return err
}
d.Updates = sub
return nil
}
func (d *Daemon) validateMessage(ctx context.Context, msg *floodsub.Message) bool {
return true
}
func (d *Daemon) ReceiveUpdates(ctx context.Context) {
validator := ipns.Validator{}
for {
msg, err := d.Updates.Next(ctx)
if err != nil {
// fmt.Printf("update: updates.next: %s\n", err)
continue
}
entry := new(ipnspb.IpnsEntry)
err = proto.Unmarshal(msg.Data, entry)
if err != nil {
// fmt.Printf("update: unmarshal: %s\n", err)
continue
}
pubkey, err := p2pcrypto.UnmarshalPublicKey(entry.GetPubKey())
if err != nil {
// fmt.Printf("update: pubkey: %s\n", err)
continue
}
peerid, err := p2ppeer.IDFromPublicKey(pubkey)
if err != nil {
// fmt.Printf("update: peerid: %s\n", err)
continue
}
err = validator.Validate("/ipns/"+string(peerid), msg.Data)
if err != nil {
// fmt.Printf("update: validate: %s\n", err)
continue
}
// store entry
d.entriesMutex.Lock()
d.entries[peerid] = entry
d.entriesMutex.Unlock()
fmt.Printf("update: /ipns/%s => %s\n", peerid.Pretty(), entry.GetValue())
}
}
func (d *Daemon) GetEntry(peerid p2ppeer.ID) (*ipnspb.IpnsEntry, bool) {
d.entriesMutex.RLock()
defer d.entriesMutex.RUnlock()
entry, ok := d.entries[peerid]
return entry, ok
}
func (d *Daemon) StartDNS(ctx context.Context, address, network string) {
handler := &dnsServer{getEntry: d.GetEntry, network: network}
err := dns.ListenAndServe(address, network, handler)
if err != nil {
fmt.Printf("dns server: %s\n", err)
}
}
type dnsServer struct {
getEntry func(p2ppeer.ID) (*ipnspb.IpnsEntry, bool)
network string
}
func (dnsserv *dnsServer) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
fmt.Printf("dns(%s): request: %+v\n", dnsserv.network, r.Question)
if len(r.Question) == 0 {
// fmt.Printf("dns(%s): no question asked\n", dnsserv.network)
return
}
q := r.Question[0]
if q.Qtype != dns.TypeTXT {
// fmt.Printf("dns(%s): i only speak TXT\n", dnsserv.network)
return
}
labels := dns.SplitDomainName(q.Name)
peercid, err := cid.Decode(labels[0])
if err != nil {
// fmt.Printf("dns(%s): cid error: %s\n", dnsserv.network, err)
return
}
peerid, err := p2ppeer.IDFromBytes(peercid.Hash())
if err != nil {
// fmt.Printf("dns(%s): peerid error: %s\n", dnsserv.network, err)
return
}
hdr := dns.RR_Header{Ttl: 1, Class: dns.ClassINET, Rrtype: dns.TypeTXT}
hdr.Name = strings.Join(labels, ".") + "."
entry, ok := dnsserv.getEntry(peerid)
if !ok {
m := new(dns.Msg)
m.SetRcode(r, dns.RcodeSuccess)
m.Answer = []dns.RR{&dns.TXT{Hdr: hdr, Txt: []string{"ipns="}}}
m.Authoritative = true
w.WriteMsg(m)
fmt.Printf("dns(%s): nxdomain: /ipns/%s\n", dnsserv.network, peerid.Pretty())
return
}
m := new(dns.Msg)
m.SetReply(r)
if e := m.IsEdns0(); e != nil {
// fmt.Printf("dns(%s): edns0: 4096 bytes\n", dnsserv.network)
m.SetEdns0(4096, e.Do())
} else if dnsserv.network == "udp" {
m.Truncated = true
}
m.Authoritative = true
data, err := proto.Marshal(entry)
if err != nil {
// fmt.Printf("dns(%s): protobuf error: %s\n", dnsserv.network, err)
return
}
bigtxt := "ipns=" + multibase.MustNewEncoder(multibase.Base32).Encode(data)
biglen := len(bigtxt)
txt := []string{}
for biglen > 0 {
pos := 254
if biglen < 254 {
pos = biglen
}
txt = append(txt, bigtxt[:pos])
bigtxt = bigtxt[pos:]
biglen = len(bigtxt)
}
m.Answer = []dns.RR{&dns.TXT{Hdr: hdr, Txt: txt}}
m.SetRcode(r, dns.RcodeSuccess)
w.WriteMsg(m)
fmt.Printf("dns(%s): ok: /ipns/%s\n", dnsserv.network, peerid.Pretty())
}
package main
// /ipns/QmWjFr2JgC1rirRGMeLYxR3EEjCjK34DaRRHuz2PLVpTgb
// bafkreid4vaai4tak4nf5xqrvsridzlkg5ry26y3tyadk4avsje3z5zljta.ipns.name
import (
"context"
"fmt"
"os"
"time"
libp2p "gx/ipfs/QmPL3AKtiaQyYpchZceXBZhZ3MSnoGqJvLZrc7fzDTTQdJ/go-libp2p"
manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
)
const topic = "/ipns/.well-known/all"
var bootstrap = []string{
"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",
"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",
"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",
"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if len(os.Args) == 1 {
fmt.Printf("usage: ipns-dns <dns-listen-addr> (e.g. ipns-dns 127.0.0.1:53)\n")
os.Exit(1)
}
dnsAddress := os.Args[1]
host, err := libp2p.New(ctx)
if err != nil {
panic(err)
}
for _, ma := range host.Network().ListenAddresses() {
fmt.Printf("listening: %s/p2p/%s\n", ma.String(), host.ID().Pretty())
}
d, err := NewDaemon(ctx, host)
if err != nil {
panic(err)
}
if err = d.Bootstrap(ctx, bootstrap, topic); err != nil {
panic(err)
}
fmt.Printf("bootstrapped: ok\n")
go d.ReceiveUpdates(ctx)
go d.StartDNS(ctx, dnsAddress, "udp")
go d.StartDNS(ctx, dnsAddress, "tcp")
go func() {
for {
fmt.Printf("announcing pubsub...\n")
d.AnnouncePubsub(ctx, topic)
fmt.Printf("announcing pubsub: done\n")
time.Sleep(30 * time.Second)
}
}()
go func() {
for {
fmt.Printf("maintaining pubsub...\n")
d.MaintainPubsub(ctx, topic)
fmt.Printf("maintaining pubsub: done\n")
time.Sleep(30 * time.Second)
}
}()
go func() {
for range time.Tick(10 * time.Second) {
conns := host.Network().Conns()
local := 0
for _, conn := range conns {
if manet.IsIPLoopback(conn.RemoteMultiaddr()) {
local++
}
}
fmt.Printf("peers: total %d, local %d, topic %d\n",
len(conns), local, len(d.PubSub.ListPeers(topic)))
}
}()
select {}
}
package dnspubsub
import (
"context"
"fmt"
"net"
"strings"
"time"
namesys "github.com/ipfs/go-ipfs/namesys"
namesysopt "github.com/ipfs/go-ipfs/namesys/opts"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
multihash "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
p2pcrypto "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
floodsub "gx/ipfs/QmTcC9Qx2adsdGguNpqZ6dJK7MMsH8sf3yfxZxG3bSwKet/go-libp2p-floodsub"
ipns "gx/ipfs/QmX72XT6sSQRkNHKcAFLM2VqB3B4bWPetgWnHY8LgsUVeT/go-ipns"
ipnspb "gx/ipfs/QmX72XT6sSQRkNHKcAFLM2VqB3B4bWPetgWnHY8LgsUVeT/go-ipns/pb"
path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"
proto "gx/ipfs/QmdxUuburamoF6zF9qjeQC4WYcWGbWuRmdLacMEsW8ioD8/gogo-protobuf/proto"
multibase "gx/ipfs/QmekxXDhCxCJRNuzmHreuaT3BsuJcsjcXWNrtV9C8DRHtd/go-multibase"
)
type Namesys struct {
PubSub *floodsub.PubSub
DNS *net.Resolver
Topic string
}
func NewNamesys(pubsub *floodsub.PubSub, resolver *net.Resolver, topic string) Namesys {
return Namesys{pubsub, resolver, topic}
}
func (n Namesys) Resolve(ctx context.Context, namepath string, opts ...namesysopt.ResolveOpt) (path.Path, error) {
if !strings.HasPrefix(namepath, "/ipns/") {
return "", fmt.Errorf("not an ipns name: %s", namepath)
}
peerid, err := multihash.FromB58String(strings.Split(namepath, "/")[2])
if err != nil {
return "", fmt.Errorf("failed to decode PeerID: %s", err)
}
peercid := cid.NewCidV1(cid.Raw, peerid)
peeridb32 := peercid.Encode(multibase.MustNewEncoder(multibase.Base32))
domainname := peeridb32 + ".ipns.name"
fmt.Printf("dns: lookup: TXT %s\n", domainname)
records, err := n.DNS.LookupTXT(ctx, domainname)
if err != nil {
return "", err
}
str := strings.Join(records, "")
// fmt.Printf("dns: record: %+v\n", str)
if !strings.HasPrefix(str, "ipns=") || len(str) <= 5 {
return "", fmt.Errorf("dns: not a ipns= record")
}
// fmt.Printf("dns: multibase\n")
_, pb, err := multibase.Decode(str[5:])
if err != nil {
return "", fmt.Errorf("dns: multibase error: %s", err)
}
// fmt.Printf("dns: protobuf\n")
entry := new(ipnspb.IpnsEntry)
err = proto.Unmarshal(pb, entry)
if err != nil {
return "", fmt.Errorf("dns: protobuf error: %s", err)
}
// fmt.Printf("dns: path\n")
p, err := path.ParsePath(string(entry.GetValue()))
if err != nil {
return "", fmt.Errorf("dns: path error: %s", err)
}
// fmt.Printf("dns: return %s\n", p)
return p, nil
}
func (n Namesys) ResolveAsync(ctx context.Context, name string, opts ...namesysopt.ResolveOpt) <-chan namesys.Result {
res := make(chan namesys.Result, 1)
path, err := n.Resolve(ctx, name, opts...)
res <- namesys.Result{path, err}
close(res)
return res
}
func (n Namesys) Publish(ctx context.Context, name p2pcrypto.PrivKey, value path.Path) error {
arbitraryEOL := time.Now().Add(24 * time.Hour)
return n.PublishWithEOL(ctx, name, value, arbitraryEOL)
}
func (n Namesys) PublishWithEOL(ctx context.Context, privkey p2pcrypto.PrivKey, value path.Path, eol time.Time) error {
seqNo := 0
entry, err := ipns.Create(privkey, []byte(value), uint64(seqNo), eol)
if err != nil {
return err
}
if err = ipns.EmbedPublicKey(privkey.GetPublic(), entry); err != nil {
return err
}
data, err := proto.Marshal(entry)
if err != nil {
return err
}
if err = n.PubSub.Publish(n.Topic, data); err != nil {
return err
}
return nil
}
......@@ -16,11 +16,13 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"strings"
"time"
version "github.com/ipfs/go-ipfs"
dnspubsub "github.com/ipfs/go-ipfs/cmd/ipns-dns/namesys"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore"
mount "github.com/ipfs/go-ipfs/fuse/mount"
......@@ -581,13 +583,24 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, routingOptio
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
n.Exchange = bitswap.New(ctx, bitswapNetwork, n.Blockstore)
size, err := n.getCacheSize()
if err != nil {
return err
}
// size, err := n.getCacheSize()
// if err != nil {
// return err
// }
// setup name system
n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
// n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
// dnsdial := func(ctx context.Context, network, address string) (net.Conn, error) {
// dnsdialer := &net.Dialer{}
// host, _, err := net.SplitHostPort(address)
// if err != nil {
// return nil, err
// }
// return dnsdialer.DialContext(ctx, network, host+":4053")
// }
// dnsresolver := &net.Resolver{Dial: dnsdial}
dnsresolver := net.DefaultResolver
n.Namesys = dnspubsub.NewNamesys(n.Floodsub, dnsresolver, "/ipns/.well-known/all")
// setup ipns republishing
return n.setupIpnsRepublisher()
......
......@@ -605,6 +605,12 @@
"hash": "QmRmMbeY5QC5iMsuW16wchtFt8wmYTv2suWb8t9MV8dsxm",
"name": "go-libp2p-autonat-svc",
"version": "1.0.5"
},
{
"author": "whyrusleeping",
"hash": "QmWchsfMt9Re1CQaiHqPQC1DrZ9bkpa6n229dRYkGyLXNh",
"name": "dns",
"version": "0.0.0"
}
],
"gxVersion": "0.10.0",
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论