提交 67bd041b 作者: Juan Batiz-Benet 提交者: Brian Tiger Chow

got everything to build

上级 551c4093
......@@ -142,7 +142,6 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.
Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())
// get handler for this msg type.
var resp *Message
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
return nil, errors.New("Recieved invalid message type")
......@@ -190,6 +189,27 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
return rpmes, nil
}
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, key string, value []byte) error {
pmes := newMessage(Message_PUT_VALUE, string(key), 0)
pmes.Value = value
mes, err := msg.FromObject(p, pmes)
if err != nil {
return err
}
return dht.sender.SendMessage(ctx, mes)
}
func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error {
pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
mes, err := msg.FromObject(p, pmes)
if err != nil {
return err
}
return dht.sender.SendMessage(ctx, mes)
}
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
key u.Key, level int) ([]byte, []*peer.Peer, error) {
......@@ -406,6 +426,12 @@ func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
id := peer.ID(pbp.GetId())
// continue if it's ourselves
if id.Equal(dht.self.ID) {
return nil, errors.New("found self")
}
p, _ := dht.peerstore.Get(id)
if p == nil {
p, _ = dht.Find(id)
......
......@@ -36,3 +36,8 @@ func (l *logDhtRPC) Print() {
u.DOut(string(b))
}
}
func (l *logDhtRPC) EndAndPrint() {
l.EndLog()
l.Print()
}
......@@ -10,7 +10,6 @@ import (
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
)
......@@ -38,18 +37,6 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
}
}
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
typ := Message_PUT_VALUE
pmes := newMessage(Message_PUT_VALUE, string(key), 0)
pmes.Value = value
mes, err := msg.FromObject(p, pmes)
if err != nil {
return err
}
return dht.sender.SendMessage(context.TODO(), mes)
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
u.DOut("handleGetValue for key: %s\n", pmes.GetKey())
......@@ -205,7 +192,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, err
seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
for _, ps := range seq {
mes, err := msg.FromObject(ps, pmes)
_, err := msg.FromObject(ps, pmes)
if err != nil {
u.PErr("handleDiagnostics error creating message: %v\n", err)
continue
......
package dht
import (
peer "github.com/jbenet/go-ipfs/peer"
queue "github.com/jbenet/go-ipfs/peer/queue"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
type dhtQuery struct {
// a PeerQueue
peers queue.PeerQueue
// the function to execute per peer
qfunc queryFunc
}
// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, *peer.Peer) (interface{}, []*peer.Peer, error)
func (q *dhtQuery) Run(ctx context.Context, concurrency int) (interface{}, error) {
// get own cancel function to signal when we've found the value
ctx, cancel := context.WithCancel(ctx)
// the variable waiting to be populated upon success
var result interface{}
// chanQueue is how workers receive their work
chanQueue := queue.NewChanQueue(ctx, q.peers)
// worker
worker := func() {
for {
select {
case p := <-chanQueue.DeqChan:
val, closer, err := q.qfunc(ctx, p)
if err != nil {
u.PErr("error running query: %v\n", err)
continue
}
if val != nil {
result = val
cancel() // signal we're done.
return
}
if closer != nil {
for _, p := range closer {
select {
case chanQueue.EnqChan <- p:
case <-ctx.Done():
return
}
}
}
case <-ctx.Done():
return
}
}
}
// launch all workers
for i := 0; i < concurrency; i++ {
go worker()
}
// wait until we're done. yep.
select {
case <-ctx.Done():
}
if result != nil {
return result, nil
}
return nil, ctx.Err()
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论