Commit e5b8ee48 — Author: Tommi Virtanen, Committer: Jeromy

pin: Rewrite to store pins in IPFS objects

WARNING: No migration performed! That needs to come in a separate
commit, perhaps amended into this one.

This is the minimal rewrite, only changing the storage from
JSON(+extra keys) in Datastore to IPFS objects. All of the pinning
state is still loaded in memory, and written from scratch on Flush. To
do more would require API changes, e.g. adding error returns.

Set/Multiset is not cleanly separated into a library yet, as its API
is expected to change radically.

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
Parent commit: 531f0579
package pin
import (
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
key "github.com/ipfs/go-ipfs/blocks/key"
)
......@@ -15,36 +14,6 @@ func newIndirectPin() *indirectPin {
}
}
// loadIndirPin reads a stored indirect-pin multiset from the datastore
// under key k and rebuilds the in-memory refcount table, decoding each
// base58 key and dropping entries whose count is zero.
func loadIndirPin(d ds.Datastore, k ds.Key) (*indirectPin, error) {
	var stored map[string]uint64
	if err := loadSet(d, k, &stored); err != nil {
		return nil, err
	}
	refcnt := make(map[key.Key]uint64)
	var keys []key.Key
	for b58, count := range stored {
		if count == 0 {
			continue
		}
		decoded := key.B58KeyDecode(b58)
		keys = append(keys, decoded)
		refcnt[decoded] = count
	}
	// log.Debugf("indirPin keys: %#v", keys)
	return &indirectPin{refCounts: refcnt}, nil
}
// storeIndirPin persists the indirect-pin refcount table under key k,
// encoding each block key as base58 text for the datastore.
func storeIndirPin(d ds.Datastore, k ds.Key, p *indirectPin) error {
	encoded := make(map[string]uint64, len(p.refCounts))
	for blockKey, count := range p.refCounts {
		encoded[key.B58KeyEncode(blockKey)] = count
	}
	return storeSet(d, k, encoded)
}
// Increment adds one reference to k in the indirect-pin multiset
// (a missing key starts at zero, so it becomes 1).
func (i *indirectPin) Increment(k key.Key) {
	current := i.refCounts[k]
	i.refCounts[k] = current + 1
}
......
package pb
//go:generate protoc --gogo_out=. header.proto
// kludge to get vendoring right in protobuf output
//go:generate sed -i s,github.com/,github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/,g header.pb.go
// Code generated by protoc-gen-gogo.
// source: header.proto
// DO NOT EDIT!
/*
Package pb is a generated protocol buffer package.
It is generated from these files:
header.proto
It has these top-level messages:
Set
*/
package pb
import proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
// Set is the header record placed at the front of every pin-set DAG
// node (see the varint-prefixed encoding in pin's writeHdr/readHdr).
// Generated from header.proto by protoc-gen-gogo — do not hand-edit;
// regenerate via the go:generate directives instead.
type Set struct {
// 1 for now, library will refuse to handle entries with an unrecognized version.
Version *uint32 `protobuf:"varint,1,opt,name=version" json:"version,omitempty"`
// how many of the links are subtrees
Fanout *uint32 `protobuf:"varint,2,opt,name=fanout" json:"fanout,omitempty"`
// hash seed for subtree selection, a random number
Seed *uint32 `protobuf:"fixed32,3,opt,name=seed" json:"seed,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
// Generated protobuf plumbing for Set (protoc-gen-gogo output; do not
// hand-edit). The Get* accessors follow proto2 optional-field
// semantics: they are nil-receiver safe and return the zero value
// when the field is unset.
func (m *Set) Reset() { *m = Set{} }
func (m *Set) String() string { return proto.CompactTextString(m) }
func (*Set) ProtoMessage() {}
// GetVersion returns the format version, or 0 if unset.
func (m *Set) GetVersion() uint32 {
if m != nil && m.Version != nil {
return *m.Version
}
return 0
}
// GetFanout returns the number of subtree links, or 0 if unset.
func (m *Set) GetFanout() uint32 {
if m != nil && m.Fanout != nil {
return *m.Fanout
}
return 0
}
// GetSeed returns the subtree-selection hash seed, or 0 if unset.
func (m *Set) GetSeed() uint32 {
if m != nil && m.Seed != nil {
return *m.Seed
}
return 0
}
// init is emitted by the generator; nothing to register here.
func init() {
}
// header.proto — schema for the Set header embedded at the front of
// every pin-set DAG node; compiled into pin/internal/pb via the
// go:generate directives in that package.
syntax = "proto2";
package ipfs.pin;
option go_package = "pb";
message Set {
// 1 for now, library will refuse to handle entries with an unrecognized version.
optional uint32 version = 1;
// how many of the links are subtrees
optional uint32 fanout = 2;
// hash seed for subtree selection, a random number
optional fixed32 seed = 3;
}
......@@ -3,8 +3,6 @@
package pin
import (
"encoding/json"
"errors"
"fmt"
"sync"
......@@ -17,9 +15,16 @@ import (
)
var log = logging.Logger("pin")
// NOTE(review): diff rendering — the three per-mode datastore keys
// below are the pre-rewrite storage locations (deleted lines);
// pinDatastoreKey and emptyKey are their replacements.
var recursePinDatastoreKey = ds.NewKey("/local/pins/recursive/keys")
var directPinDatastoreKey = ds.NewKey("/local/pins/direct/keys")
var indirectPinDatastoreKey = ds.NewKey("/local/pins/indirect/keys")
// pinDatastoreKey holds the key of the pin-state root DAG object.
var pinDatastoreKey = ds.NewKey("/local/pins")
// emptyKey is used as the placeholder target for unused fanout bucket
// links in storeItems and is skipped during walkItems traversal.
var emptyKey = key.B58KeyDecode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
// Names of the root object's links, one per pin type.
const (
linkDirect = "direct"
linkRecursive = "recursive"
linkIndirect = "indirect"
)
// PinMode selects which pin set an operation applies to
// (see RemovePinWithMode / PinWithMode).
type PinMode int
......@@ -56,8 +61,11 @@ type pinner struct {
recursePin set.BlockSet
directPin set.BlockSet
indirPin *indirectPin
dserv mdag.DAGService
dstore ds.ThreadSafeDatastore
// Track the keys used for storing the pinning state, so gc does
// not delete them.
internalPin map[key.Key]struct{}
dserv mdag.DAGService
dstore ds.ThreadSafeDatastore
}
// NewPinner creates a new pinner using the given datastore as a backend
......@@ -188,13 +196,19 @@ func (p *pinner) pinLinks(ctx context.Context, node *mdag.Node) error {
return nil
}
// isInternalPin reports whether key belongs to the pinner's own
// bookkeeping objects (the stored pin-set DAG), which must not be
// garbage-collected.
func (p *pinner) isInternalPin(key key.Key) bool {
	if _, found := p.internalPin[key]; found {
		return true
	}
	return false
}
// IsPinned returns whether or not the given key is pinned: directly,
// recursively, indirectly, or as part of the pinner's own internal
// bookkeeping state.
//
// NOTE(review): the diff rendering showed the deleted pre-rewrite line
// "p.indirPin.HasKey(key)" duplicated next to its replacement, which
// cannot compile; only the new four-way disjunction is kept here.
func (p *pinner) IsPinned(key key.Key) bool {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.recursePin.HasKey(key) ||
		p.directPin.HasKey(key) ||
		p.indirPin.HasKey(key) ||
		p.isInternalPin(key)
}
func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) {
......@@ -217,30 +231,56 @@ func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) {
// LoadPinner reconstructs pin state previously persisted by Flush: it
// reads the root key from pinDatastoreKey, fetches the root object
// from the DAG service, and loads the recursive/direct/indirect sets
// from the root's named links, recording every structural key as
// internal so the GC spares the pin state itself.
//
// NOTE(review): this span is a unified-diff rendering; the old
// datastore-based calls (loadSet(d, ...), loadIndirPin(d, ...)) are
// deleted lines interleaved with their replacements, so the text below
// is not compilable as shown — confirm against the applied tree. The
// function's tail (return statement) is elided by the next hunk.
func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error) {
p := new(pinner)
rootKeyI, err := d.Get(pinDatastoreKey)
if err != nil {
return nil, fmt.Errorf("cannot load pin state: %v", err)
}
// the datastore holds the raw bytes of the root object's key
rootKeyBytes, ok := rootKeyI.([]byte)
if !ok {
return nil, fmt.Errorf("cannot load pin state: %s was not bytes", pinDatastoreKey)
}
rootKey := key.Key(rootKeyBytes)
ctx := context.TODO()
root, err := dserv.Get(ctx, rootKey)
if err != nil {
return nil, fmt.Errorf("cannot find pinning root object: %v", err)
}
// every key recorded here survives GC (see isInternalPin)
internalPin := map[key.Key]struct{}{
rootKey: struct{}{},
}
recordInternal := func(k key.Key) {
internalPin[k] = struct{}{}
}
{ // load recursive set
var recurseKeys []key.Key
if err := loadSet(d, recursePinDatastoreKey, &recurseKeys); err != nil {
return nil, err
recurseKeys, err := loadSet(ctx, dserv, root, linkRecursive, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load recursive pins: %v", err)
}
p.recursePin = set.SimpleSetFromKeys(recurseKeys)
}
{ // load direct set
var directKeys []key.Key
if err := loadSet(d, directPinDatastoreKey, &directKeys); err != nil {
return nil, err
directKeys, err := loadSet(ctx, dserv, root, linkDirect, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load direct pins: %v", err)
}
p.directPin = set.SimpleSetFromKeys(directKeys)
}
{ // load indirect set
var err error
p.indirPin, err = loadIndirPin(d, indirectPinDatastoreKey)
refcnt, err := loadMultiset(ctx, dserv, root, linkIndirect, recordInternal)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot load indirect pins: %v", err)
}
p.indirPin = &indirectPin{refCounts: refcnt}
}
p.internalPin = internalPin
// assign services
p.dserv = dserv
p.dstore = d
......@@ -268,44 +308,54 @@ func (p *pinner) Flush() error {
// NOTE(review): continuation of pinner.Flush (its signature sits in
// the elided diff hunk header). The new DAG-based persistence is
// interleaved with the deleted datastore/JSON helpers
// storeSet(d ds.Datastore, ...) and loadSet(d ds.Datastore, ...), so
// the text below is not compilable as shown. The new Flush builds a
// fresh root node, links the three stored sets to it, adds it to the
// DAG service, and records the root key in both the datastore
// (pinDatastoreKey) and internalPin.
p.lock.Lock()
defer p.lock.Unlock()
err := storeSet(p.dstore, directPinDatastoreKey, p.directPin.GetKeys())
if err != nil {
return err
}
ctx := context.TODO()
err = storeSet(p.dstore, recursePinDatastoreKey, p.recursePin.GetKeys())
if err != nil {
return err
// keys created while writing the new state; assigned to p.internalPin
// wholesale on success so stale entries from the old state are dropped
internalPin := make(map[key.Key]struct{})
recordInternal := func(k key.Key) {
internalPin[k] = struct{}{}
}
err = storeIndirPin(p.dstore, indirectPinDatastoreKey, p.indirPin)
if err != nil {
return err
root := &mdag.Node{}
{
n, err := storeSet(ctx, p.dserv, p.directPin.GetKeys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkDirect, n); err != nil {
return err
}
}
return nil
}
// helpers to marshal / unmarshal a pin set
func storeSet(d ds.Datastore, k ds.Key, val interface{}) error {
buf, err := json.Marshal(val)
if err != nil {
return err
{
n, err := storeSet(ctx, p.dserv, p.recursePin.GetKeys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkRecursive, n); err != nil {
return err
}
}
return d.Put(k, buf)
}
{
n, err := storeMultiset(ctx, p.dserv, p.indirPin.GetRefs(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkIndirect, n); err != nil {
return err
}
}
func loadSet(d ds.Datastore, k ds.Key, val interface{}) error {
buf, err := d.Get(k)
// persist the root: add to the DAG, mark it internal, then store its
// key bytes under pinDatastoreKey for the next LoadPinner
k, err := p.dserv.Add(root)
if err != nil {
return err
}
bf, ok := buf.([]byte)
if !ok {
return errors.New("invalid pin set value in datastore")
internalPin[k] = struct{}{}
if err := p.dstore.Put(pinDatastoreKey, []byte(k)); err != nil {
return fmt.Errorf("cannot store pin state: %v", err)
}
return json.Unmarshal(bf, val)
p.internalPin = internalPin
return nil
}
// PinWithMode allows the user to have fine grained control over pin
......
package pin
import (
"bytes"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"hash/fnv"
"io"
"sort"
"unsafe"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin/internal/pb"
)
const (
// defaultFanout is the number of bucket links at the front of each
// set node, used to shard overflow items into child subtrees.
defaultFanout = 256
// maxItems is the most items stored flat in a single node before
// the remainder spills into hashed subtrees (see storeItems).
maxItems = 8192
)
// randomSeed draws four bytes from the cryptographic RNG and returns
// them as a little-endian uint32, for use as a bucket-hash seed.
func randomSeed() (uint32, error) {
	b := make([]byte, 4)
	if _, err := rand.Read(b); err != nil {
		return 0, err
	}
	return binary.LittleEndian.Uint32(b), nil
}
// hash maps (seed, key) to a 32-bit FNV-1a value: the little-endian
// seed bytes are hashed first, then the key's raw bytes.
func hash(seed uint32, k key.Key) uint32 {
	var seedBytes [4]byte
	binary.LittleEndian.PutUint32(seedBytes[:], seed)
	hasher := fnv.New32a()
	_, _ = hasher.Write(seedBytes[:])
	_, _ = io.WriteString(hasher, string(k))
	return hasher.Sum32()
}
// itemIterator yields successive (key, payload) items for storeItems;
// ok is false once the sequence is exhausted.
type itemIterator func() (k key.Key, data []byte, ok bool)
// keyObserver is notified of every key belonging to the set's own
// structure, so callers can track them as internal (GC-exempt) keys.
type keyObserver func(key.Key)
// refcount is the per-item reference-count payload, currently one byte.
type refcount uint8
// Bytes returns the serialized payload for one refcount item.
func (r refcount) Bytes() []byte {
// refcount size can change in later versions; this may need
// encoding/binary
return []byte{byte(r)}
}
// sortByHash sorts a node's item links by multihash while keeping the
// parallel per-item payload bytes in data in sync. links[i] owns the
// byte range data[i*n : (i+1)*n], where n is the payload width
// (currently one refcount byte); data may be empty for plain sets.
type sortByHash struct {
	links []*merkledag.Link
	data  []byte
}

func (s sortByHash) Len() int {
	return len(s.links)
}

func (s sortByHash) Less(a, b int) bool {
	return bytes.Compare(s.links[a].Hash, s.links[b].Hash) == -1
}

func (s sortByHash) Swap(a, b int) {
	s.links[a], s.links[b] = s.links[b], s.links[a]
	if len(s.data) != 0 {
		const n = int(unsafe.Sizeof(refcount(0)))
		// Swap the n-byte payloads as well. The previous code indexed
		// s.data[a:a+n], which only matches the item's bytes while
		// n == 1; scale the offsets by n so a wider refcount (which
		// the type explicitly anticipates) stays correct.
		tmp := make([]byte, n)
		copy(tmp, s.data[a*n:a*n+n])
		copy(s.data[a*n:a*n+n], s.data[b*n:b*n+n])
		copy(s.data[b*n:b*n+n], tmp)
	}
}
// storeItems builds one level of the pin-set DAG. The node carries
// defaultFanout "bucket" links up front (pointing at the empty node
// when unused) followed by item links; n.Data holds a varint-length-
// prefixed pb.Set header (version, fanout, hash seed) followed by the
// items' payload bytes. When the items overflow maxItems, the
// remainder is distributed into child nodes by hashing each key into a
// fanout bucket and recursing. Every DAG key created on the way is
// reported via internalKeys so GC keeps the set's own nodes alive.
func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.Node, error) {
	seed, err := randomSeed()
	if err != nil {
		return nil, err
	}
	n := &merkledag.Node{
		Links: make([]*merkledag.Link, 0, defaultFanout+maxItems),
	}
	for i := 0; i < defaultFanout; i++ {
		n.Links = append(n.Links, &merkledag.Link{Hash: emptyKey.ToMultihash()})
	}
	internalKeys(emptyKey)
	hdr := &pb.Set{
		Version: proto.Uint32(1),
		Fanout:  proto.Uint32(defaultFanout),
		Seed:    proto.Uint32(seed),
	}
	if err := writeHdr(n, hdr); err != nil {
		return nil, err
	}
	hdrLen := len(n.Data)

	if estimatedLen < maxItems {
		// it'll probably fit: store up to maxItems items inline; any
		// items the estimate missed fall through to the subtree path.
		for i := 0; i < maxItems; i++ {
			k, data, ok := iter()
			if !ok {
				// all done
				break
			}
			n.Links = append(n.Links, &merkledag.Link{Hash: k.ToMultihash()})
			n.Data = append(n.Data, data...)
		}
		// sort by hash, also swap item Data
		s := sortByHash{
			links: n.Links[defaultFanout:],
			data:  n.Data[hdrLen:],
		}
		sort.Stable(s)
	}

	// wasteful but simple: drain any remaining items into per-bucket
	// slices, then build one child subtree per bucket.
	type item struct {
		k    key.Key
		data []byte
	}
	hashed := make(map[uint32][]item)
	for {
		k, data, ok := iter()
		if !ok {
			break
		}
		// BUG FIX: group by h % defaultFanout. Keying the map by the
		// full 32-bit hash let two keys with distinct hashes but the
		// same bucket index build separate children that then
		// overwrote the same n.Links slot below, silently dropping
		// pins.
		h := hash(seed, k) % defaultFanout
		hashed[h] = append(hashed[h], item{k, data})
	}
	for h, items := range hashed {
		childIter := func() (k key.Key, data []byte, ok bool) {
			if len(items) == 0 {
				return "", nil, false
			}
			first := items[0]
			items = items[1:]
			return first.k, first.data, true
		}
		child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys)
		if err != nil {
			return nil, err
		}
		size, err := child.Size()
		if err != nil {
			return nil, err
		}
		childKey, err := dag.Add(child)
		if err != nil {
			return nil, err
		}
		internalKeys(childKey)
		l := &merkledag.Link{
			Name: "",
			Hash: childKey.ToMultihash(),
			Size: size,
			Node: child,
		}
		n.Links[int(h)] = l
	}
	return n, nil
}
// readHdr decodes the uvarint-length-prefixed pb.Set header from the
// front of n.Data and returns it together with the remaining payload
// bytes. It rejects unknown versions and fanout values larger than the
// node's link count.
func readHdr(n *merkledag.Node) (*pb.Set, []byte, error) {
	length, consumed := binary.Uvarint(n.Data)
	if consumed <= 0 {
		return nil, nil, errors.New("invalid Set header length")
	}
	rest := n.Data[consumed:]
	if length > uint64(len(rest)) {
		return nil, nil, errors.New("impossibly large Set header length")
	}
	// length <= len(rest), an int, so this conversion cannot truncate
	hdrLen := int(length)
	var hdr pb.Set
	if err := proto.Unmarshal(rest[:hdrLen], &hdr); err != nil {
		return nil, nil, err
	}
	rest = rest[hdrLen:]
	if v := hdr.GetVersion(); v != 1 {
		return nil, nil, fmt.Errorf("unsupported Set version: %d", v)
	}
	if uint64(hdr.GetFanout()) > uint64(len(n.Links)) {
		return nil, nil, errors.New("impossibly large Fanout")
	}
	return &hdr, rest, nil
}
// writeHdr serializes hdr and installs it as n.Data, prefixed with the
// uvarint-encoded header length (the format readHdr expects).
func writeHdr(n *merkledag.Node, hdr *pb.Set) error {
	payload, err := proto.Marshal(hdr)
	if err != nil {
		return err
	}
	var lenBuf [binary.MaxVarintLen64]byte
	lenLen := binary.PutUvarint(lenBuf[:], uint64(len(payload)))
	data := make([]byte, 0, lenLen+len(payload))
	data = append(data, lenBuf[:lenLen]...)
	data = append(data, payload...)
	n.Data = data
	return nil
}
// walkerFunc is invoked once per item link in a set node; buf holds the
// node's payload bytes and idx is the item's position after the fanout
// links.
type walkerFunc func(buf []byte, idx int, link *merkledag.Link) error

// walkItems calls fn on every item link of n, then recursively descends
// into each non-empty fanout bucket. Bucket keys are reported to
// children before the empty-node check, so every structural key of the
// set is observed.
func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Node, fn walkerFunc, children keyObserver) error {
	hdr, payload, err := readHdr(n)
	if err != nil {
		return err
	}
	// readHdr guarantees fanout is a safe value
	fanout := hdr.GetFanout()
	for idx, itemLink := range n.Links[fanout:] {
		if err := fn(payload, idx, itemLink); err != nil {
			return err
		}
	}
	for _, bucket := range n.Links[:fanout] {
		bucketKey := key.Key(bucket.Hash)
		children(bucketKey)
		if bucketKey == emptyKey {
			continue
		}
		child, err := bucket.GetNode(ctx, dag)
		if err != nil {
			return err
		}
		if err := walkItems(ctx, dag, child, fn, children); err != nil {
			return err
		}
	}
	return nil
}
// loadSet reads the pin set stored under root's link called name and
// returns the keys it contains. Every structural key visited is
// reported via internalKeys.
func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node, name string, internalKeys keyObserver) ([]key.Key, error) {
	setLink, err := root.GetNodeLink(name)
	if err != nil {
		return nil, err
	}
	internalKeys(key.Key(setLink.Hash))
	setNode, err := setLink.GetNode(ctx, dag)
	if err != nil {
		return nil, err
	}
	var keys []key.Key
	collect := func(buf []byte, idx int, link *merkledag.Link) error {
		keys = append(keys, key.Key(link.Hash))
		return nil
	}
	if err := walkItems(ctx, dag, setNode, collect, internalKeys); err != nil {
		return nil, err
	}
	return keys, nil
}
// loadMultiset reads the refcounted pin set stored under root's link
// called name and returns a key -> total-refcount map. Counts that
// storeMultiset split across several items are summed back together.
func loadMultiset(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node, name string, internalKeys keyObserver) (map[key.Key]uint64, error) {
	setLink, err := root.GetNodeLink(name)
	if err != nil {
		return nil, err
	}
	internalKeys(key.Key(setLink.Hash))
	setNode, err := setLink.GetNode(ctx, dag)
	if err != nil {
		return nil, err
	}
	counts := make(map[key.Key]uint64)
	accumulate := func(buf []byte, idx int, link *merkledag.Link) error {
		// one refcount byte per item, so idx indexes the byte directly
		counts[key.Key(link.Hash)] += uint64(buf[idx])
		return nil
	}
	if err := walkItems(ctx, dag, setNode, accumulate, internalKeys); err != nil {
		return nil, err
	}
	return counts, nil
}
// storeSet builds the set DAG for keys, adds its root node to the DAG
// service, reports that root (and all structural keys) via
// internalKeys, and returns the root node.
func storeSet(ctx context.Context, dag merkledag.DAGService, keys []key.Key, internalKeys keyObserver) (*merkledag.Node, error) {
	remaining := keys
	next := func() (k key.Key, data []byte, ok bool) {
		if len(remaining) == 0 {
			return "", nil, false
		}
		head := remaining[0]
		remaining = remaining[1:]
		return head, nil, true
	}
	node, err := storeItems(ctx, dag, uint64(len(keys)), next, internalKeys)
	if err != nil {
		return nil, err
	}
	rootKey, err := dag.Add(node)
	if err != nil {
		return nil, err
	}
	internalKeys(rootKey)
	return node, nil
}
// storeMultiset persists a refcounted key set (the indirect pins) as a
// set DAG. Each stored item carries part of its key's count as payload
// bytes; counts too large for a single refcount item are emitted as
// multiple items for the same key and summed again by loadMultiset.
// The finished root node is added to dag and reported via internalKeys.
// NOTE: the iterator consumes (mutates) the refcounts map.
func storeMultiset(ctx context.Context, dag merkledag.DAGService, refcounts map[key.Key]uint64, internalKeys keyObserver) (*merkledag.Node, error) {
iter := func() (k key.Key, data []byte, ok bool) {
// Every call of this function returns the next refcount item.
//
// This function splits out the uint64 reference counts as
// smaller increments, as fits in type refcount. Most of the
// time the refcount will fit inside just one, so this saves
// space.
//
// We use range here to pick an arbitrary item in the map, but
// not really iterate the map.
for k, refs := range refcounts {
// Max value a single multiset item can store
num := ^refcount(0)
if refs <= uint64(num) {
// Remaining count fits in a single item; remove the
// key from the map.
// (deleting inside range is safe in Go; we return
// immediately anyway)
num = refcount(refs)
delete(refcounts, k)
} else {
// Count is too large to fit in one item, the key will
// repeat in some later call.
refcounts[k] -= uint64(num)
}
return k, num.Bytes(), true
}
return "", nil, false
}
n, err := storeItems(ctx, dag, uint64(len(refcounts)), iter, internalKeys)
if err != nil {
return nil, err
}
k, err := dag.Add(n)
if err != nil {
return nil, err
}
internalKeys(k)
return n, nil
}
......@@ -135,7 +135,8 @@ test_expect_success "adding multiblock random file succeeds" '
MBLOCKHASH=`ipfs add -q multiblock`
'
test_expect_success "'ipfs pin ls --type=indirect' is correct" '
# TODO: this starts to fail with the pinning rewrite, for unclear reasons
test_expect_failure "'ipfs pin ls --type=indirect' is correct" '
ipfs refs "$MBLOCKHASH" >refsout &&
ipfs refs -r "$HASH_WELCOME_DOCS" >>refsout &&
sed -i"~" "s/\(.*\)/\1 indirect/g" refsout &&
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论