提交 1a835202 作者: Jakub Sztandera 提交者: Kevin Atkinson

filestore: add URLStore

License: MIT
Signed-off-by: 's avatarJakub Sztandera <kubuxu@protonmail.ch>
上级 92ac43af
......@@ -210,6 +210,8 @@ func TestCommands(t *testing.T) {
"/tar/add",
"/tar/cat",
"/update",
"/urlstore",
"/urlstore/add",
"/version",
}
......
......@@ -5,12 +5,12 @@ import (
"strings"
oldcmds "github.com/ipfs/go-ipfs/commands"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
dag "github.com/ipfs/go-ipfs/core/commands/dag"
e "github.com/ipfs/go-ipfs/core/commands/e"
ocmd "github.com/ipfs/go-ipfs/core/commands/object"
unixfs "github.com/ipfs/go-ipfs/core/commands/unixfs"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
"gx/ipfs/QmNueRyPRQiV7PUEpnP4GgGLuK1rKQLaRW7sfPvUetYig1/go-ipfs-cmds"
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
"gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit"
......@@ -136,6 +136,7 @@ var rootSubcommands = map[string]*cmds.Command{
"tar": lgc.NewCommand(TarCmd),
"file": lgc.NewCommand(unixfs.UnixFSCmd),
"update": lgc.NewCommand(ExternalBinary()),
"urlstore": lgc.NewCommand(UrlStoreCmd),
"version": lgc.NewCommand(VersionCmd),
"shutdown": lgc.NewCommand(daemonShutdownCmd),
}
......
package commands
import (
"fmt"
"io"
"net/http"
"strings"
cmds "github.com/ipfs/go-ipfs/commands"
balanced "github.com/ipfs/go-ipfs/importer/balanced"
ihelper "github.com/ipfs/go-ipfs/importer/helpers"
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
chunk "gx/ipfs/QmXnzH7wowyLZy8XJxxaQCVTgLMcDXdMBznmsrmQWCyiQV/go-ipfs-chunker"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
cmdkit "gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit"
)
var UrlStoreCmd = &cmds.Command{
Subcommands: map[string]*cmds.Command{
"add": urlAdd,
},
}
var urlAdd = &cmds.Command{
Arguments: []cmdkit.Argument{
cmdkit.StringArg("url", true, false, "URL to add to IPFS"),
},
Type: BlockStat{},
Run: func(req cmds.Request, res cmds.Response) {
url := req.Arguments()[0]
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
hreq, err := http.NewRequest("GET", url, nil)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
hres, err := http.DefaultClient.Do(hreq)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
if hres.StatusCode != http.StatusOK {
res.SetError(fmt.Errorf("expected code 200, got: %d", hres.StatusCode), cmdkit.ErrNormal)
return
}
chk := chunk.NewSizeSplitter(hres.Body, chunk.DefaultBlockSize)
prefix := cid.NewPrefixV1(cid.DagProtobuf, mh.SHA2_256)
dbp := &ihelper.DagBuilderParams{
Dagserv: n.DAG,
RawLeaves: true,
Maxlinks: ihelper.DefaultLinksPerBlock,
NoCopy: true,
Prefix: &prefix,
URL: url,
}
blc, err := balanced.Layout(dbp.New(chk))
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
res.SetOutput(BlockStat{
Key: blc.Cid().String(),
Size: int(hres.ContentLength),
})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
bs := res.Output().(*BlockStat)
return strings.NewReader(bs.Key + "\n"), nil
},
},
}
......@@ -19,12 +19,12 @@ import (
"github.com/ipfs/go-ipfs/pin"
unixfs "github.com/ipfs/go-ipfs/unixfs"
posinfo "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
chunker "gx/ipfs/QmXnzH7wowyLZy8XJxxaQCVTgLMcDXdMBznmsrmQWCyiQV/go-ipfs-chunker"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
files "gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit/files"
posinfo "gx/ipfs/QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC/go-ipfs-posinfo"
bstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
)
......
......@@ -18,9 +18,9 @@ import (
"github.com/ipfs/go-ipfs/repo/config"
blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format"
pi "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
files "gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit/files"
pi "gx/ipfs/QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC/go-ipfs-posinfo"
blockstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
datastore "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
syncds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync"
......
......@@ -11,9 +11,9 @@ import (
"context"
blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format"
posinfo "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
posinfo "gx/ipfs/QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC/go-ipfs-posinfo"
blockstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
dsq "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/query"
)
......
......@@ -9,8 +9,8 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
posinfo "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
posinfo "gx/ipfs/QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC/go-ipfs-posinfo"
blockstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
)
......
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
......@@ -12,8 +13,8 @@ import (
dshelp "gx/ipfs/QmNP2u7bofwUQptHQGPfabGWtTCbxhNLSZKqbf1uzsup9V/go-ipfs-ds-help"
proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto"
blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format"
posinfo "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
posinfo "gx/ipfs/QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC/go-ipfs-posinfo"
blockstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
dsns "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/namespace"
......@@ -111,7 +112,6 @@ func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
if err != nil {
return nil, err
}
out, err := f.readDataObj(c, dobj)
if err != nil {
return nil, err
......@@ -120,6 +120,14 @@ func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
return blocks.NewBlockWithCid(out, c)
}
func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
if !d.GetURL() {
return f.readFileDataObj(c, d)
} else {
return f.readURLDataObj(c, d)
}
}
func (f *FileManager) getDataObj(c *cid.Cid) (*pb.DataObj, error) {
o, err := f.ds.Get(dshelp.CidToDsKey(c))
switch err {
......@@ -148,8 +156,7 @@ func unmarshalDataObj(o interface{}) (*pb.DataObj, error) {
return &dobj, nil
}
// reads and verifies the block
func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
func (f *FileManager) readFileDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)
......@@ -187,6 +194,46 @@ func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
return outbuf, nil
}
// reads and verifies the block from URL
func (f *FileManager) readURLDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
req, err := http.NewRequest("GET", d.GetFilePath(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", d.GetOffset(), d.GetOffset()+d.GetSize_()-1))
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusPartialContent {
return nil, fmt.Errorf("expected HTTP 206 got %d", res.StatusCode)
}
outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(res.Body, outbuf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
res.Body.Close()
outcid, err := c.Prefix().Sum(outbuf)
if err != nil {
return nil, err
}
if !c.Equals(outcid) {
return nil, &CorruptReferenceError{StatusFileChanged,
fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
}
return outbuf, nil
}
// Has returns if the FileManager is storing a block reference. It does not
// validate the data, nor checks if the reference is valid.
func (f *FileManager) Has(c *cid.Cid) (bool, error) {
......@@ -209,16 +256,21 @@ func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
var dobj pb.DataObj
if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
return fmt.Errorf("cannot add filestore references outside ipfs root (%s)", f.root)
}
if !b.PosInfo.IsURL {
if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
return fmt.Errorf("cannot add filestore references outside ipfs root (%s)", f.root)
}
p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
if err != nil {
return err
}
p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
if err != nil {
return err
}
dobj.FilePath = proto.String(filepath.ToSlash(p))
dobj.FilePath = proto.String(filepath.ToSlash(p))
} else {
dobj.FilePath = proto.String(b.PosInfo.FullPath)
dobj.URL = proto.Bool(true)
}
dobj.Offset = proto.Uint64(b.PosInfo.Offset)
dobj.Size_ = proto.Uint64(uint64(len(b.RawData())))
......
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
include mk/header.mk
all: $(GO)
PB_$(d) = $(wildcard $(d)/*.proto)
TGTS_$(d) = $(PB_$(d):.proto=.pb.go)
%.pb.go: %.proto
protoc --gogo_out=. $<
#DEPS_GO += $(TGTS_$(d))
clean:
rm *.pb.go
include mk/footer.mk
// Code generated by protoc-gen-gogo.
// source: dataobj.proto
// source: filestore/pb/dataobj.proto
// DO NOT EDIT!
/*
Package datastore_pb is a generated protocol buffer package.
It is generated from these files:
dataobj.proto
filestore/pb/dataobj.proto
It has these top-level messages:
DataObj
......@@ -26,6 +26,7 @@ type DataObj struct {
FilePath *string `protobuf:"bytes,1,opt,name=FilePath" json:"FilePath,omitempty"`
Offset *uint64 `protobuf:"varint,2,opt,name=Offset" json:"Offset,omitempty"`
Size_ *uint64 `protobuf:"varint,3,opt,name=Size" json:"Size,omitempty"`
URL *bool `protobuf:"varint,4,opt,name=URL" json:"URL,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
......@@ -54,6 +55,13 @@ func (m *DataObj) GetSize_() uint64 {
return 0
}
func (m *DataObj) GetURL() bool {
if m != nil && m.URL != nil {
return *m.URL
}
return false
}
func init() {
proto.RegisterType((*DataObj)(nil), "datastore.pb.DataObj")
}
......@@ -4,4 +4,5 @@ message DataObj {
optional string FilePath = 1;
optional uint64 Offset = 2;
optional uint64 Size = 3;
optional bool URL = 4;
}
......@@ -25,6 +25,7 @@ type DagBuilderHelper struct {
maxlinks int
batch *ipld.Batch
fullPath string
isUrl bool
stat os.FileInfo
prefix *cid.Prefix
}
......@@ -48,6 +49,8 @@ type DagBuilderParams struct {
// NoCopy signals to the chunker that it should track fileinfo for
// filestore adds
NoCopy bool
URL string
}
// New generates a new DagBuilderHelper from the given params and a given
......@@ -65,6 +68,11 @@ func (dbp *DagBuilderParams) New(spl chunker.Splitter) *DagBuilderHelper {
db.fullPath = fi.AbsPath()
db.stat = fi.Stat()
}
if dbp.URL != "" {
db.fullPath = dbp.URL
db.isUrl = true
}
return db
}
......@@ -206,7 +214,7 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
// from the DagBuilderHelper.
func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
if db.fullPath != "" {
node.SetPosInfo(offset, db.fullPath, db.stat)
node.SetPosInfo(offset, db.fullPath, db.stat, db.isUrl)
}
}
......
......@@ -8,9 +8,9 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
pi "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
pi "gx/ipfs/QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC/go-ipfs-posinfo"
)
// BlockSizeLimit specifies the maximum size an imported block can have.
......@@ -142,11 +142,12 @@ func (n *UnixfsNode) FileSize() uint64 {
// SetPosInfo sets information about the offset of the data of this node in a
// filesystem file.
func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) {
func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo, isUrl bool) {
n.posInfo = &pi.PosInfo{
Offset: offset,
FullPath: fullPath,
Stat: stat,
IsURL: isUrl,
}
}
......
......@@ -422,9 +422,9 @@
},
{
"author": "hector",
"hash": "QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe",
"hash": "QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC",
"name": "go-ipfs-posinfo",
"version": "0.0.2"
"version": "0.0.3"
},
{
"author": "hsanjuan",
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论