提交 9360f5ca 作者: Kevin Atkinson

Store needed parts of IpfsNode in Adder.

This will make it easier to set up a specialized data pipeline.

License: MIT
Signed-off-by: 's avatarKevin Atkinson <k@kevina.org>
上级 dbabcf96
...@@ -141,11 +141,13 @@ You can now refer to the added file in a gateway, like so: ...@@ -141,11 +141,13 @@ You can now refer to the added file in a gateway, like so:
outChan := make(chan interface{}, 8) outChan := make(chan interface{}, 8)
res.SetOutput((<-chan interface{})(outChan)) res.SetOutput((<-chan interface{})(outChan))
fileAdder, err := coreunix.NewAdder(req.Context(), n, outChan) fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
} }
fileAdder.Out = outChan
fileAdder.Chunker = chunker fileAdder.Chunker = chunker
fileAdder.Progress = progress fileAdder.Progress = progress
fileAdder.Hidden = hidden fileAdder.Hidden = hidden
......
...@@ -67,42 +67,46 @@ type AddedObject struct { ...@@ -67,42 +67,46 @@ type AddedObject struct {
Bytes int64 `json:",omitempty"` Bytes int64 `json:",omitempty"`
} }
func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) (*Adder, error) { func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds dag.DAGService) (*Adder, error) {
mr, err := mfs.NewRoot(ctx, n.DAG, newDirNode(), nil) mr, err := mfs.NewRoot(ctx, ds, newDirNode(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Adder{ return &Adder{
mr: mr, mr: mr,
ctx: ctx, ctx: ctx,
node: n, pinning: p,
out: out, blockstore: bs,
Progress: false, dagService: ds,
Hidden: true, Progress: false,
Pin: true, Hidden: true,
Trickle: false, Pin: true,
Wrap: false, Trickle: false,
Chunker: "", Wrap: false,
Chunker: "",
}, nil }, nil
} }
// Internal structure for holding the switches passed to the `add` call // Internal structure for holding the switches passed to the `add` call
type Adder struct { type Adder struct {
ctx context.Context ctx context.Context
node *core.IpfsNode pinning pin.Pinner
out chan interface{} blockstore bstore.GCBlockstore
Progress bool dagService dag.DAGService
Hidden bool Out chan interface{}
Pin bool Progress bool
Trickle bool Hidden bool
Silent bool Pin bool
Wrap bool Trickle bool
Chunker string Silent bool
root *dag.Node Wrap bool
mr *mfs.Root Chunker string
unlocker bs.Unlocker root *dag.Node
tempRoot key.Key mr *mfs.Root
unlocker bs.Unlocker
tempRoot key.Key
} }
// Perform the actual add & pin locally, outputting results to reader // Perform the actual add & pin locally, outputting results to reader
...@@ -114,12 +118,12 @@ func (adder Adder) add(reader io.Reader) (*dag.Node, error) { ...@@ -114,12 +118,12 @@ func (adder Adder) add(reader io.Reader) (*dag.Node, error) {
if adder.Trickle { if adder.Trickle {
return importer.BuildTrickleDagFromReader( return importer.BuildTrickleDagFromReader(
adder.node.DAG, adder.dagService,
chnk, chnk,
) )
} }
return importer.BuildDagFromReader( return importer.BuildDagFromReader(
adder.node.DAG, adder.dagService,
chnk, chnk,
) )
} }
...@@ -137,7 +141,7 @@ func (adder *Adder) RootNode() (*dag.Node, error) { ...@@ -137,7 +141,7 @@ func (adder *Adder) RootNode() (*dag.Node, error) {
// if not wrapping, AND one root file, use that hash as root. // if not wrapping, AND one root file, use that hash as root.
if !adder.Wrap && len(root.Links) == 1 { if !adder.Wrap && len(root.Links) == 1 {
root, err = root.Links[0].GetNode(adder.ctx, adder.node.DAG) root, err = root.Links[0].GetNode(adder.ctx, adder.dagService)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -156,21 +160,21 @@ func (adder *Adder) PinRoot() error { ...@@ -156,21 +160,21 @@ func (adder *Adder) PinRoot() error {
return nil return nil
} }
rnk, err := adder.node.DAG.Add(root) rnk, err := adder.dagService.Add(root)
if err != nil { if err != nil {
return err return err
} }
if adder.tempRoot != "" { if adder.tempRoot != "" {
err := adder.node.Pinning.Unpin(adder.ctx, adder.tempRoot, true) err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
if err != nil { if err != nil {
return err return err
} }
adder.tempRoot = rnk adder.tempRoot = rnk
} }
adder.node.Pinning.PinWithMode(rnk, pin.Recursive) adder.pinning.PinWithMode(rnk, pin.Recursive)
return adder.node.Pinning.Flush() return adder.pinning.Flush()
} }
func (adder *Adder) Finalize() (*dag.Node, error) { func (adder *Adder) Finalize() (*dag.Node, error) {
...@@ -237,7 +241,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error { ...@@ -237,7 +241,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error {
} }
} }
return outputDagnode(adder.out, path, nd) return outputDagnode(adder.Out, path, nd)
} }
// Add builds a merkledag from the a reader, pinning all objects to the local // Add builds a merkledag from the a reader, pinning all objects to the local
...@@ -245,7 +249,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error { ...@@ -245,7 +249,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error {
func Add(n *core.IpfsNode, r io.Reader) (string, error) { func Add(n *core.IpfsNode, r io.Reader) (string, error) {
defer n.Blockstore.PinLock().Unlock() defer n.Blockstore.PinLock().Unlock()
fileAdder, err := NewAdder(n.Context(), n, nil) fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil { if err != nil {
return "", err return "", err
} }
...@@ -277,7 +281,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { ...@@ -277,7 +281,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
} }
defer f.Close() defer f.Close()
fileAdder, err := NewAdder(n.Context(), n, nil) fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil { if err != nil {
return "", err return "", err
} }
...@@ -306,7 +310,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { ...@@ -306,7 +310,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
// the directory, and and error if any. // the directory, and and error if any.
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) { func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) {
file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil) file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
fileAdder, err := NewAdder(n.Context(), n, nil) fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
...@@ -355,14 +359,14 @@ func (adder *Adder) addNode(node *dag.Node, path string) error { ...@@ -355,14 +359,14 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {
} }
if !adder.Silent { if !adder.Silent {
return outputDagnode(adder.out, path, node) return outputDagnode(adder.Out, path, node)
} }
return nil return nil
} }
// Add the given file while respecting the adder. // Add the given file while respecting the adder.
func (adder *Adder) AddFile(file files.File) error { func (adder *Adder) AddFile(file files.File) error {
adder.unlocker = adder.node.Blockstore.PinLock() adder.unlocker = adder.blockstore.PinLock()
defer func() { defer func() {
adder.unlocker.Unlock() adder.unlocker.Unlock()
}() }()
...@@ -388,7 +392,7 @@ func (adder *Adder) addFile(file files.File) error { ...@@ -388,7 +392,7 @@ func (adder *Adder) addFile(file files.File) error {
} }
dagnode := &dag.Node{Data: sdata} dagnode := &dag.Node{Data: sdata}
_, err = adder.node.DAG.Add(dagnode) _, err = adder.dagService.Add(dagnode)
if err != nil { if err != nil {
return err return err
} }
...@@ -401,7 +405,7 @@ func (adder *Adder) addFile(file files.File) error { ...@@ -401,7 +405,7 @@ func (adder *Adder) addFile(file files.File) error {
// progress updates to the client (over the output channel) // progress updates to the client (over the output channel)
var reader io.Reader = file var reader io.Reader = file
if adder.Progress { if adder.Progress {
reader = &progressReader{file: file, out: adder.out} reader = &progressReader{file: file, out: adder.Out}
} }
dagnode, err := adder.add(reader) dagnode, err := adder.add(reader)
...@@ -445,14 +449,14 @@ func (adder *Adder) addDir(dir files.File) error { ...@@ -445,14 +449,14 @@ func (adder *Adder) addDir(dir files.File) error {
} }
func (adder *Adder) maybePauseForGC() error { func (adder *Adder) maybePauseForGC() error {
if adder.node.Blockstore.GCRequested() { if adder.blockstore.GCRequested() {
err := adder.PinRoot() err := adder.PinRoot()
if err != nil { if err != nil {
return err return err
} }
adder.unlocker.Unlock() adder.unlocker.Unlock()
adder.unlocker = adder.node.Blockstore.PinLock() adder.unlocker = adder.blockstore.PinLock()
} }
return nil return nil
} }
......
...@@ -54,10 +54,11 @@ func TestAddGCLive(t *testing.T) { ...@@ -54,10 +54,11 @@ func TestAddGCLive(t *testing.T) {
errs := make(chan error) errs := make(chan error)
out := make(chan interface{}) out := make(chan interface{})
adder, err := NewAdder(context.Background(), node, out) adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
adder.Out = out
dataa := ioutil.NopCloser(bytes.NewBufferString("testfileA")) dataa := ioutil.NopCloser(bytes.NewBufferString("testfileA"))
rfa := files.NewReaderFile("a", "a", dataa, nil) rfa := files.NewReaderFile("a", "a", dataa, nil)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论