提交 c49dcffc 作者: Jeromy

Allow for gc during adds

License: MIT
Signed-off-by: 's avatarJeromy <jeromyj@gmail.com>
上级 be092053
...@@ -5,6 +5,7 @@ package blockstore ...@@ -5,6 +5,7 @@ package blockstore
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsns "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace" dsns "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
...@@ -49,6 +50,10 @@ type GCBlockstore interface { ...@@ -49,6 +50,10 @@ type GCBlockstore interface {
// at the same time, but no GC should not happen simulatenously. // at the same time, but no GC should not happen simulatenously.
// Reading during Pinning is safe, and requires no lock. // Reading during Pinning is safe, and requires no lock.
PinLock() func() PinLock() func()
// GcRequested returns true if GCLock has been called and is waiting to
// take the lock
GCRequested() bool
} }
func NewBlockstore(d ds.Batching) *blockstore { func NewBlockstore(d ds.Batching) *blockstore {
...@@ -63,7 +68,9 @@ func NewBlockstore(d ds.Batching) *blockstore { ...@@ -63,7 +68,9 @@ func NewBlockstore(d ds.Batching) *blockstore {
type blockstore struct { type blockstore struct {
datastore ds.Batching datastore ds.Batching
lk sync.RWMutex lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
} }
func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) { func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
...@@ -192,7 +199,9 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { ...@@ -192,7 +199,9 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
} }
func (bs *blockstore) GCLock() func() { func (bs *blockstore) GCLock() func() {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock() bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return bs.lk.Unlock return bs.lk.Unlock
} }
...@@ -200,3 +209,7 @@ func (bs *blockstore) PinLock() func() { ...@@ -200,3 +209,7 @@ func (bs *blockstore) PinLock() func() {
bs.lk.RLock() bs.lk.RLock()
return bs.lk.RUnlock return bs.lk.RUnlock
} }
func (bs *blockstore) GCRequested() bool {
return atomic.LoadInt32(&bs.gcreq) > 0
}
...@@ -66,3 +66,7 @@ func (w *writecache) GCLock() func() { ...@@ -66,3 +66,7 @@ func (w *writecache) GCLock() func() {
func (w *writecache) PinLock() func() { func (w *writecache) PinLock() func() {
return w.blockstore.(GCBlockstore).PinLock() return w.blockstore.(GCBlockstore).PinLock()
} }
func (w *writecache) GCRequested() bool {
return w.blockstore.(GCBlockstore).GCRequested()
}
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
bserv "github.com/ipfs/go-ipfs/blockservice" bserv "github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/exchange/offline" "github.com/ipfs/go-ipfs/exchange/offline"
importer "github.com/ipfs/go-ipfs/importer" importer "github.com/ipfs/go-ipfs/importer"
...@@ -99,6 +100,8 @@ type Adder struct { ...@@ -99,6 +100,8 @@ type Adder struct {
Chunker string Chunker string
root *dag.Node root *dag.Node
mr *mfs.Root mr *mfs.Root
unlock func()
tempRoot key.Key
} }
// Perform the actual add & pin locally, outputting results to reader // Perform the actual add & pin locally, outputting results to reader
...@@ -157,6 +160,14 @@ func (params *Adder) PinRoot() error { ...@@ -157,6 +160,14 @@ func (params *Adder) PinRoot() error {
return err return err
} }
if params.tempRoot != "" {
err := params.node.Pinning.Unpin(params.ctx, params.tempRoot, true)
if err != nil {
return err
}
params.tempRoot = rnk
}
params.node.Pinning.PinWithMode(rnk, pin.Recursive) params.node.Pinning.PinWithMode(rnk, pin.Recursive)
return params.node.Pinning.Flush() return params.node.Pinning.Flush()
} }
...@@ -256,7 +267,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { ...@@ -256,7 +267,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
return "", err return "", err
} }
err = fileAdder.AddFile(f) err = fileAdder.addFile(f)
if err != nil { if err != nil {
return "", err return "", err
} }
...@@ -289,7 +300,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No ...@@ -289,7 +300,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No
unlock := n.Blockstore.PinLock() unlock := n.Blockstore.PinLock()
defer unlock() defer unlock()
err = fileAdder.AddFile(file) err = fileAdder.addFile(file)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
...@@ -330,12 +341,24 @@ func (params *Adder) addNode(node *dag.Node, path string) error { ...@@ -330,12 +341,24 @@ func (params *Adder) addNode(node *dag.Node, path string) error {
// Add the given file while respecting the params. // Add the given file while respecting the params.
func (params *Adder) AddFile(file files.File) error { func (params *Adder) AddFile(file files.File) error {
params.unlock = params.node.Blockstore.PinLock()
defer params.unlock()
return params.addFile(file)
}
func (adder *Adder) addFile(file files.File) error {
err := adder.maybePauseForGC()
if err != nil {
return err
}
switch { switch {
case files.IsHidden(file) && !params.Hidden: case files.IsHidden(file) && !adder.Hidden:
log.Debugf("%s is hidden, skipping", file.FileName()) log.Debugf("%s is hidden, skipping", file.FileName())
return &hiddenFileError{file.FileName()} return &hiddenFileError{file.FileName()}
case file.IsDirectory(): case file.IsDirectory():
return params.addDir(file) return adder.addDir(file)
} }
// case for symlink // case for symlink
...@@ -346,29 +369,29 @@ func (params *Adder) AddFile(file files.File) error { ...@@ -346,29 +369,29 @@ func (params *Adder) AddFile(file files.File) error {
} }
dagnode := &dag.Node{Data: sdata} dagnode := &dag.Node{Data: sdata}
_, err = params.node.DAG.Add(dagnode) _, err = adder.node.DAG.Add(dagnode)
if err != nil { if err != nil {
return err return err
} }
return params.addNode(dagnode, s.FileName()) return adder.addNode(dagnode, s.FileName())
} }
// case for regular file // case for regular file
// if the progress flag was specified, wrap the file so that we can send // if the progress flag was specified, wrap the file so that we can send
// progress updates to the client (over the output channel) // progress updates to the client (over the output channel)
var reader io.Reader = file var reader io.Reader = file
if params.Progress { if adder.Progress {
reader = &progressReader{file: file, out: params.out} reader = &progressReader{file: file, out: adder.out}
} }
dagnode, err := params.add(reader) dagnode, err := adder.add(reader)
if err != nil { if err != nil {
return err return err
} }
// patch it into the root // patch it into the root
return params.addNode(dagnode, file.FileName()) return adder.addNode(dagnode, file.FileName())
} }
func (params *Adder) addDir(dir files.File) error { func (params *Adder) addDir(dir files.File) error {
...@@ -388,7 +411,7 @@ func (params *Adder) addDir(dir files.File) error { ...@@ -388,7 +411,7 @@ func (params *Adder) addDir(dir files.File) error {
break break
} }
err = params.AddFile(file) err = params.addFile(file)
if _, ok := err.(*hiddenFileError); ok { if _, ok := err.(*hiddenFileError); ok {
// hidden file error, skip file // hidden file error, skip file
continue continue
...@@ -400,6 +423,19 @@ func (params *Adder) addDir(dir files.File) error { ...@@ -400,6 +423,19 @@ func (params *Adder) addDir(dir files.File) error {
return nil return nil
} }
func (adder *Adder) maybePauseForGC() error {
if adder.node.Blockstore.GCRequested() {
err := adder.PinRoot()
if err != nil {
return err
}
adder.unlock()
adder.unlock = adder.node.Blockstore.PinLock()
}
return nil
}
// outputDagnode sends dagnode info over the output channel // outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.Node) error { func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
if out == nil { if out == nil {
......
...@@ -24,7 +24,6 @@ var log = logging.Logger("gc") ...@@ -24,7 +24,6 @@ var log = logging.Logger("gc")
// deletes any block that is not found in the marked set. // deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) { func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) {
unlock := bs.GCLock() unlock := bs.GCLock()
defer unlock()
bsrv := bserv.New(bs, offline.Exchange(bs)) bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv) ds := dag.NewDAGService(bsrv)
...@@ -42,6 +41,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key. ...@@ -42,6 +41,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.
output := make(chan key.Key) output := make(chan key.Key)
go func() { go func() {
defer close(output) defer close(output)
defer unlock()
for { for {
select { select {
case k, ok := <-keychan: case k, ok := <-keychan:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论