提交 8830aae9 作者: Jeromy Johnson 提交者: GitHub

Merge pull request #3091 from ipfs/feat/temp-err-retries

datastore: blockstore should retry when it encounters temp errors
......@@ -4,6 +4,9 @@ import (
"crypto/rand"
"encoding/base64"
"errors"
"os"
"syscall"
"time"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
......@@ -14,12 +17,13 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dsync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
dsync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
ci "gx/ipfs/QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP/go-libp2p-crypto"
retry "gx/ipfs/QmY6UVhgS2ZxhbM5qU23Fnz3daJwfyAuNErd3StmVofnAU/retry-datastore"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
......@@ -127,14 +131,30 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
return n, nil
}
func isTooManyFDError(err error) bool {
perr, ok := err.(*os.PathError)
if ok && perr.Err == syscall.EMFILE {
return true
}
return false
}
func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
// setup local peer ID (private key is loaded in online setup)
if err := n.loadID(); err != nil {
return err
}
rds := &retry.Datastore{
Batching: n.Repo.Datastore(),
Delay: time.Millisecond * 200,
Retries: 6,
TempErrFunc: isTooManyFDError,
}
var err error
bs := bstore.NewBlockstore(n.Repo.Datastore())
bs := bstore.NewBlockstore(rds)
opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
......
......@@ -265,7 +265,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
default:
}
err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times
err := bs.blockstore.Put(blk)
if err != nil {
log.Errorf("Error writing block to datastore: %s", err)
return err
......@@ -284,18 +284,6 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return nil
}
func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
var err error
for i := 0; i < attempts; i++ {
if err = bs.blockstore.Put(blk); err == nil {
break
}
time.Sleep(time.Millisecond * time.Duration(400*(i+1)))
}
return err
}
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
......
......@@ -24,8 +24,12 @@ import (
// well under varying conditions
const kNetworkDelay = 0 * time.Millisecond
func getVirtualNetwork() tn.Network {
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}
func TestClose(t *testing.T) {
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......
......@@ -195,6 +195,18 @@
"hash": "QmaeHSCBd9XjXxmgHEiKkHtLcMCb2eZsPLKT7bHgBfBkqw",
"name": "go-is-domain",
"version": "1.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmY6UVhgS2ZxhbM5qU23Fnz3daJwfyAuNErd3StmVofnAU",
"name": "retry-datastore",
"version": "1.1.0"
},
{
"author": "whyrusleeping",
"hash": "QmdjfJJFxgqqR9skVZDmgiGrbKomSqxpaw12rjLNim5NYR",
"name": "failstore",
"version": "1.0.0"
}
],
"gxVersion": "0.4.0",
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论