提交 391b78a2 作者: Jeromy Johnson 提交者: GitHub

Merge pull request #3273 from ipfs/fix/pin-fail

fix bug in pinsets and add a stress test for the scenario
...@@ -2,15 +2,14 @@ package pin ...@@ -2,15 +2,14 @@ package pin
import ( import (
"bytes" "bytes"
"context"
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"sort" "sort"
"unsafe"
"context"
"github.com/ipfs/go-ipfs/merkledag" "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin/internal/pb" "github.com/ipfs/go-ipfs/pin/internal/pb"
"gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
...@@ -19,8 +18,11 @@ import ( ...@@ -19,8 +18,11 @@ import (
) )
const ( const (
// defaultFanout specifies the default number of fan-out links per layer
defaultFanout = 256 defaultFanout = 256
maxItems = 8192
// maxItems is the maximum number of items that will fit in a single bucket
maxItems = 8192
) )
func randomSeed() (uint32, error) { func randomSeed() (uint32, error) {
...@@ -40,36 +42,12 @@ func hash(seed uint32, c *cid.Cid) uint32 { ...@@ -40,36 +42,12 @@ func hash(seed uint32, c *cid.Cid) uint32 {
return h.Sum32() return h.Sum32()
} }
type itemIterator func() (c *cid.Cid, data []byte, ok bool) type itemIterator func() (c *cid.Cid, ok bool)
type keyObserver func(*cid.Cid) type keyObserver func(*cid.Cid)
// refcount is the marshaled representation of a single reference count.
// The layout is valid for set format version 1 and may change in a later
// version, for example if many links end up with a refcount above 255.
//
// Any replacement format has to keep two properties:
//
//   - every marshaled value has the same fixed size, equal to
//     unsafe.Sizeof(refcount(0))
//   - byte order is handled inside the refcount methods, which may then
//     need encoding/binary
type refcount uint8

// Bytes returns the marshaled form of r: a one-byte slice holding its value.
func (r refcount) Bytes() []byte {
	out := make([]byte, 1)
	out[0] = byte(r)
	return out
}

// ReadFromIdx overwrites r with the idx'th refcount stored in buf, which is
// assumed to be a sequence of refcount.Bytes results. An idx outside buf
// panics, as with any out-of-range slice index.
func (r *refcount) ReadFromIdx(buf []byte, idx int) {
	raw := buf[idx]
	*r = refcount(raw)
}
type sortByHash struct { type sortByHash struct {
links []*merkledag.Link links []*merkledag.Link
data []byte
} }
func (s sortByHash) Len() int { func (s sortByHash) Len() int {
...@@ -82,13 +60,6 @@ func (s sortByHash) Less(a, b int) bool { ...@@ -82,13 +60,6 @@ func (s sortByHash) Less(a, b int) bool {
func (s sortByHash) Swap(a, b int) { func (s sortByHash) Swap(a, b int) {
s.links[a], s.links[b] = s.links[b], s.links[a] s.links[a], s.links[b] = s.links[b], s.links[a]
if len(s.data) != 0 {
const n = int(unsafe.Sizeof(refcount(0)))
tmp := make([]byte, n)
copy(tmp, s.data[a*n:a*n+n])
copy(s.data[a*n:a*n+n], s.data[b*n:b*n+n])
copy(s.data[b*n:b*n+n], tmp)
}
} }
func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.Node, error) { func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.Node, error) {
...@@ -96,13 +67,15 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint ...@@ -96,13 +67,15 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
if err != nil { if err != nil {
return nil, err return nil, err
} }
n := &merkledag.Node{
Links: make([]*merkledag.Link, 0, defaultFanout+maxItems), n := &merkledag.Node{Links: make([]*merkledag.Link, 0, defaultFanout+maxItems)}
}
for i := 0; i < defaultFanout; i++ { for i := 0; i < defaultFanout; i++ {
n.Links = append(n.Links, &merkledag.Link{Hash: emptyKey.Hash()}) n.Links = append(n.Links, &merkledag.Link{Hash: emptyKey.Hash()})
} }
// add emptyKey to our set of internal pinset objects
internalKeys(emptyKey) internalKeys(emptyKey)
hdr := &pb.Set{ hdr := &pb.Set{
Version: proto.Uint32(1), Version: proto.Uint32(1),
Fanout: proto.Uint32(defaultFanout), Fanout: proto.Uint32(defaultFanout),
...@@ -111,97 +84,106 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint ...@@ -111,97 +84,106 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
if err := writeHdr(n, hdr); err != nil { if err := writeHdr(n, hdr); err != nil {
return nil, err return nil, err
} }
hdrLen := len(n.Data())
if estimatedLen < maxItems { if estimatedLen < maxItems {
// it'll probably fit // it'll probably fit
for i := 0; i < maxItems; i++ { for i := 0; i < maxItems; i++ {
k, data, ok := iter() k, ok := iter()
if !ok { if !ok {
// all done // all done
break break
} }
n.Links = append(n.Links, &merkledag.Link{Hash: k.Hash()}) n.Links = append(n.Links, &merkledag.Link{Hash: k.Hash()})
n.SetData(append(n.Data(), data...))
} }
// sort by hash, also swap item Data // sort by hash, also swap item Data
s := sortByHash{ s := sortByHash{
links: n.Links[defaultFanout:], links: n.Links[defaultFanout:],
data: n.Data()[hdrLen:],
} }
sort.Stable(s) sort.Stable(s)
} }
// wasteful but simple hashed := make([][]*cid.Cid, defaultFanout)
type item struct {
c *cid.Cid
data []byte
}
hashed := make(map[uint32][]item)
for { for {
k, data, ok := iter() // This loop essentially enumerates every single item in the set
// and maps them all into a set of buckets. Each bucket will be recursively
// turned into its own sub-set, and so on down the chain. Each sub-set
// gets added to the dagservice, and put into its place in the parent set
// node's links array.
//
// Previously, the bucket was selected by taking the full uint32 hash of
// the input key + seed. This was erroneous, as we would later be assigning
// the created sub-sets into an array of length 256 by the modulus of the
// uint32 hash value with 256. This resulted in overwriting existing sub-sets
// and losing pins. The fix (a few lines down from this comment) is to
// map the hash value down to the 8 bit keyspace here while creating the
// buckets. This way, we avoid any overlapping later on.
k, ok := iter()
if !ok { if !ok {
break break
} }
h := hash(seed, k) h := hash(seed, k) % defaultFanout
hashed[h] = append(hashed[h], item{k, data}) hashed[h] = append(hashed[h], k)
} }
for h, items := range hashed { for h, items := range hashed {
childIter := func() (c *cid.Cid, data []byte, ok bool) { if len(items) == 0 {
if len(items) == 0 { // recursion base case
return nil, nil, false continue
}
first := items[0]
items = items[1:]
return first.c, first.data, true
} }
childIter := getCidListIterator(items)
// recursively create a pinset from the items for this bucket index
child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys) child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
size, err := child.Size() size, err := child.Size()
if err != nil { if err != nil {
return nil, err return nil, err
} }
childKey, err := dag.Add(child) childKey, err := dag.Add(child)
if err != nil { if err != nil {
return nil, err return nil, err
} }
internalKeys(childKey) internalKeys(childKey)
l := &merkledag.Link{
Name: "", // overwrite the 'empty key' in the existing links array
n.Links[h] = &merkledag.Link{
Hash: childKey.Hash(), Hash: childKey.Hash(),
Size: size, Size: size,
} }
n.Links[int(h%defaultFanout)] = l
} }
return n, nil return n, nil
} }
func readHdr(n *merkledag.Node) (*pb.Set, []byte, error) { func readHdr(n *merkledag.Node) (*pb.Set, error) {
hdrLenRaw, consumed := binary.Uvarint(n.Data()) hdrLenRaw, consumed := binary.Uvarint(n.Data())
if consumed <= 0 { if consumed <= 0 {
return nil, nil, errors.New("invalid Set header length") return nil, errors.New("invalid Set header length")
} }
buf := n.Data()[consumed:]
if hdrLenRaw > uint64(len(buf)) { pbdata := n.Data()[consumed:]
return nil, nil, errors.New("impossibly large Set header length") if hdrLenRaw > uint64(len(pbdata)) {
return nil, errors.New("impossibly large Set header length")
} }
// as hdrLenRaw was <= an int, we now know it fits in an int // as hdrLenRaw was <= an int, we now know it fits in an int
hdrLen := int(hdrLenRaw) hdrLen := int(hdrLenRaw)
var hdr pb.Set var hdr pb.Set
if err := proto.Unmarshal(buf[:hdrLen], &hdr); err != nil { if err := proto.Unmarshal(pbdata[:hdrLen], &hdr); err != nil {
return nil, nil, err return nil, err
} }
buf = buf[hdrLen:]
if v := hdr.GetVersion(); v != 1 { if v := hdr.GetVersion(); v != 1 {
return nil, nil, fmt.Errorf("unsupported Set version: %d", v) return nil, fmt.Errorf("unsupported Set version: %d", v)
} }
if uint64(hdr.GetFanout()) > uint64(len(n.Links)) { if uint64(hdr.GetFanout()) > uint64(len(n.Links)) {
return nil, nil, errors.New("impossibly large Fanout") return nil, errors.New("impossibly large Fanout")
} }
return &hdr, buf, nil return &hdr, nil
} }
func writeHdr(n *merkledag.Node, hdr *pb.Set) error { func writeHdr(n *merkledag.Node, hdr *pb.Set) error {
...@@ -209,24 +191,31 @@ func writeHdr(n *merkledag.Node, hdr *pb.Set) error { ...@@ -209,24 +191,31 @@ func writeHdr(n *merkledag.Node, hdr *pb.Set) error {
if err != nil { if err != nil {
return err return err
} }
n.SetData(make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData)))
written := binary.PutUvarint(n.Data(), uint64(len(hdrData))) // make enough space for the length prefix and the marshalled header data
n.SetData(n.Data()[:written]) data := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData))
n.SetData(append(n.Data(), hdrData...))
// write the uvarint length of the header data
uvarlen := binary.PutUvarint(data, uint64(len(hdrData)))
// append the actual protobuf data *after* the length value we wrote
data = append(data[:uvarlen], hdrData...)
n.SetData(data)
return nil return nil
} }
type walkerFunc func(buf []byte, idx int, link *merkledag.Link) error type walkerFunc func(idx int, link *merkledag.Link) error
func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Node, fn walkerFunc, children keyObserver) error { func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Node, fn walkerFunc, children keyObserver) error {
hdr, buf, err := readHdr(n) hdr, err := readHdr(n)
if err != nil { if err != nil {
return err return err
} }
// readHdr guarantees fanout is a safe value // readHdr guarantees fanout is a safe value
fanout := hdr.GetFanout() fanout := hdr.GetFanout()
for i, l := range n.Links[fanout:] { for i, l := range n.Links[fanout:] {
if err := fn(buf, i, l); err != nil { if err := fn(i, l); err != nil {
return err return err
} }
} }
...@@ -262,7 +251,7 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node ...@@ -262,7 +251,7 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node
} }
var res []*cid.Cid var res []*cid.Cid
walk := func(buf []byte, idx int, link *merkledag.Link) error { walk := func(idx int, link *merkledag.Link) error {
res = append(res, cid.NewCidV0(link.Hash)) res = append(res, cid.NewCidV0(link.Hash))
return nil return nil
} }
...@@ -272,40 +261,21 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node ...@@ -272,40 +261,21 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node
return res, nil return res, nil
} }
func loadMultiset(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node, name string, internalKeys keyObserver) (map[key.Key]uint64, error) { func getCidListIterator(cids []*cid.Cid) itemIterator {
l, err := root.GetNodeLink(name) return func() (c *cid.Cid, ok bool) {
if err != nil {
return nil, fmt.Errorf("Failed to get link %s: %v", name, err)
}
c := cid.NewCidV0(l.Hash)
internalKeys(c)
n, err := l.GetNode(ctx, dag)
if err != nil {
return nil, fmt.Errorf("Failed to get node from link %s: %v", name, err)
}
refcounts := make(map[key.Key]uint64)
walk := func(buf []byte, idx int, link *merkledag.Link) error {
var r refcount
r.ReadFromIdx(buf, idx)
refcounts[key.Key(link.Hash)] += uint64(r)
return nil
}
if err := walkItems(ctx, dag, n, walk, internalKeys); err != nil {
return nil, err
}
return refcounts, nil
}
func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) {
iter := func() (c *cid.Cid, data []byte, ok bool) {
if len(cids) == 0 { if len(cids) == 0 {
return nil, nil, false return nil, false
} }
first := cids[0] first := cids[0]
cids = cids[1:] cids = cids[1:]
return first, nil, true return first, true
} }
}
func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) {
iter := getCidListIterator(cids)
n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys) n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys)
if err != nil { if err != nil {
return nil, err return nil, err
......
package pin package pin
import "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" import (
"context"
"fmt"
"os"
"testing"
func ignoreKeys(key.Key) {} dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
func copyMap(m map[key.Key]uint16) map[key.Key]uint64 { cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
c := make(map[key.Key]uint64, len(m)) )
for k, v := range m {
c[k] = uint64(v) func ignoreCids(_ *cid.Cid) {}
func TestSet(t *testing.T) {
ds := mdtest.Mock()
limit := 10000 // 10000 reproduces the pinloss issue fairly reliably
if os.Getenv("STRESS_IT_OUT_YO") != "" {
limit = 10000000
}
var inputs []*cid.Cid
for i := 0; i < limit; i++ {
c, err := ds.Add(dag.NodeWithData([]byte(fmt.Sprint(i))))
if err != nil {
t.Fatal(err)
}
inputs = append(inputs, c)
}
out, err := storeSet(context.Background(), ds, inputs, ignoreCids)
if err != nil {
t.Fatal(err)
}
// weird wrapper node because loadSet expects us to pass an
// object pointing to multiple named sets
setroot := &dag.Node{}
err = setroot.AddNodeLinkClean("foo", out)
if err != nil {
t.Fatal(err)
}
outset, err := loadSet(context.Background(), ds, setroot, "foo", ignoreCids)
if err != nil {
t.Fatal(err)
}
if len(outset) != limit {
t.Fatal("got wrong number", len(outset), limit)
}
seen := cid.NewSet()
for _, c := range outset {
seen.Add(c)
}
for _, c := range inputs {
if !seen.Has(c) {
t.Fatalf("expected to have %s, didnt find it")
}
} }
return c
} }
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论