提交 87ecb92f 作者: Jeromy Johnson 提交者: GitHub

Merge pull request #3225 from ipfs/kevina/rm-block-refactor

"block rm": move core functionally into blockstore_util package
package blockstore_util
import (
"fmt"
"io"
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/pin"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)
// RemovedBlock is used to respresent the result of removing a block.
// If a block was removed successfully than the Error string will be
// empty. If a block could not be removed than Error will contain the
// reason the block could not be removed. If the removal was aborted
// due to a fatal error Hash will be be empty, Error will contain the
// reason, and no more results will be sent.
type RemovedBlock struct {
Hash string `json:",omitempty"`
Error string `json:",omitempty"`
}
type RmBlocksOpts struct {
Prefix string
Quiet bool
Force bool
}
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error {
go func() {
defer close(out)
unlocker := blocks.GCLock()
defer unlocker.Unlock()
stillOkay := FilterPinned(pins, out, cids)
for _, c := range stillOkay {
err := blocks.DeleteBlock(key.Key(c.Hash()))
if err != nil && opts.Force && (err == bs.ErrNotFound || err == ds.ErrNotFound) {
// ignore non-existent blocks
} else if err != nil {
out <- &RemovedBlock{Hash: c.String(), Error: err.Error()}
} else if !opts.Quiet {
out <- &RemovedBlock{Hash: c.String()}
}
}
}()
return nil
}
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid {
stillOkay := make([]*cid.Cid, 0, len(cids))
res, err := pins.CheckIfPinned(cids...)
if err != nil {
out <- &RemovedBlock{Error: fmt.Sprintf("pin check failed: %s", err)}
return nil
}
for _, r := range res {
if !r.Pinned() {
stillOkay = append(stillOkay, r.Key)
} else {
out <- &RemovedBlock{
Hash: r.Key.String(),
Error: r.String(),
}
}
}
return stillOkay
}
func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error {
someFailed := false
for res := range in {
r := res.(*RemovedBlock)
if r.Hash == "" && r.Error != "" {
return fmt.Errorf("aborted: %s", r.Error)
} else if r.Error != "" {
someFailed = true
fmt.Fprintf(serr, "cannot remove %s: %s\n", r.Hash, r.Error)
} else {
fmt.Fprintf(sout, "removed %s\n", r.Hash)
}
}
if someFailed {
return fmt.Errorf("some blocks not removed")
}
return nil
}
......@@ -9,12 +9,9 @@ import (
"strings"
"github.com/ipfs/go-ipfs/blocks"
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
util "github.com/ipfs/go-ipfs/blocks/blockstore/util"
cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/pin"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)
......@@ -224,19 +221,15 @@ It takes a list of base58 encoded multihashs to remove.
cids = append(cids, c)
}
outChan := make(chan interface{})
err = util.RmBlocks(n.Blockstore, n.Pinning, outChan, cids, util.RmBlocksOpts{
Quiet: quiet,
Force: force,
})
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
res.SetOutput((<-chan interface{})(outChan))
go func() {
defer close(outChan)
pinning := n.Pinning
err := rmBlocks(n.Blockstore, pinning, outChan, cids, rmBlocksOpts{
quiet: quiet,
force: force,
})
if err != nil {
outChan <- &RemovedBlock{Error: err.Error()}
}
}()
return
},
PostRun: func(req cmds.Request, res cmds.Response) {
if res.Error() != nil {
......@@ -249,79 +242,10 @@ It takes a list of base58 encoded multihashs to remove.
}
res.SetOutput(nil)
someFailed := false
for out := range outChan {
o := out.(*RemovedBlock)
if o.Hash == "" && o.Error != "" {
res.SetError(fmt.Errorf("aborted: %s", o.Error), cmds.ErrNormal)
return
} else if o.Error != "" {
someFailed = true
fmt.Fprintf(res.Stderr(), "cannot remove %s: %s\n", o.Hash, o.Error)
} else {
fmt.Fprintf(res.Stdout(), "removed %s\n", o.Hash)
}
}
if someFailed {
res.SetError(fmt.Errorf("some blocks not removed"), cmds.ErrNormal)
err := util.ProcRmOutput(outChan, res.Stdout(), res.Stderr())
if err != nil {
res.SetError(err, cmds.ErrNormal)
}
},
Type: RemovedBlock{},
}
type RemovedBlock struct {
Hash string `json:",omitempty"`
Error string `json:",omitempty"`
}
type rmBlocksOpts struct {
quiet bool
force bool
}
func rmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts rmBlocksOpts) error {
unlocker := blocks.GCLock()
defer unlocker.Unlock()
stillOkay, err := checkIfPinned(pins, cids, out)
if err != nil {
return fmt.Errorf("pin check failed: %s", err)
}
for _, c := range stillOkay {
err := blocks.DeleteBlock(key.Key(c.Hash()))
if err != nil && opts.force && (err == bs.ErrNotFound || err == ds.ErrNotFound) {
// ignore non-existent blocks
} else if err != nil {
out <- &RemovedBlock{Hash: c.String(), Error: err.Error()}
} else if !opts.quiet {
out <- &RemovedBlock{Hash: c.String()}
}
}
return nil
}
func checkIfPinned(pins pin.Pinner, cids []*cid.Cid, out chan<- interface{}) ([]*cid.Cid, error) {
stillOkay := make([]*cid.Cid, 0, len(cids))
res, err := pins.CheckIfPinned(cids...)
if err != nil {
return nil, err
}
for _, r := range res {
switch r.Mode {
case pin.NotPinned:
stillOkay = append(stillOkay, r.Key)
case pin.Indirect:
out <- &RemovedBlock{
Hash: r.Key.String(),
Error: fmt.Sprintf("pinned via %s", r.Via)}
default:
modeStr, _ := pin.PinModeToString(r.Mode)
out <- &RemovedBlock{
Hash: r.Key.String(),
Error: fmt.Sprintf("pinned: %s", modeStr)}
}
}
return stillOkay, nil
Type: util.RemovedBlock{},
}
......@@ -111,6 +111,26 @@ type Pinned struct {
Via *cid.Cid
}
func (p Pinned) Pinned() bool {
if p.Mode == NotPinned {
return false
} else {
return true
}
}
func (p Pinned) String() string {
switch p.Mode {
case NotPinned:
return "not pinned"
case Indirect:
return fmt.Sprintf("pinned via %s", p.Via)
default:
modeStr, _ := PinModeToString(p.Mode)
return fmt.Sprintf("pinned: %s", modeStr)
}
}
// pinner implements the Pinner interface
type pinner struct {
lock sync.RWMutex
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论