提交 23d41e08 作者: Jeromy

address comments from CR and fix random failures

License: MIT
Signed-off-by: 's avatarJeromy <jeromyj@gmail.com>
上级 1ce310be
...@@ -30,10 +30,18 @@ type Client interface { ...@@ -30,10 +30,18 @@ type Client interface {
type client struct { type client struct {
serverAddress string serverAddress string
httpClient http.Client
} }
func NewClient(address string) Client { func NewClient(address string) Client {
return &client{address} return &client{
serverAddress: address,
httpClient: http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
},
}
} }
func (c *client) Send(req cmds.Request) (cmds.Response, error) { func (c *client) Send(req cmds.Request) (cmds.Response, error) {
...@@ -85,20 +93,20 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) { ...@@ -85,20 +93,20 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
// TODO extract string consts? // TODO extract string consts?
if fileReader != nil { if fileReader != nil {
httpReq.Header.Set("Content-Type", "multipart/form-data; boundary="+fileReader.Boundary()) httpReq.Header.Set(contentTypeHeader, "multipart/form-data; boundary="+fileReader.Boundary())
httpReq.Header.Set("Content-Disposition", "form-data: name=\"files\"") httpReq.Header.Set(contentDispHeader, "form-data: name=\"files\"")
} else { } else {
httpReq.Header.Set("Content-Type", "application/octet-stream") httpReq.Header.Set(contentTypeHeader, applicationOctetStream)
} }
version := config.CurrentVersionNumber version := config.CurrentVersionNumber
httpReq.Header.Set("User-Agent", fmt.Sprintf("/go-ipfs/%s/", version)) httpReq.Header.Set(uaHeader, fmt.Sprintf("/go-ipfs/%s/", version))
ec := make(chan error, 1) ec := make(chan error, 1)
rc := make(chan cmds.Response, 1) rc := make(chan cmds.Response, 1)
dc := req.Context().Done() dc := req.Context().Done()
go func() { go func() {
httpRes, err := http.DefaultClient.Do(httpReq) httpRes, err := c.httpClient.Do(httpReq)
if err != nil { if err != nil {
ec <- err ec <- err
return return
...@@ -182,24 +190,25 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error ...@@ -182,24 +190,25 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
res.SetLength(length) res.SetLength(length)
} }
res.SetCloser(httpRes.Body) rr := &httpResponseReader{httpRes}
res.SetCloser(rr)
if contentType != "application/json" { if contentType != applicationJson {
// for all non json output types, just stream back the output // for all non json output types, just stream back the output
res.SetOutput(&httpResponseReader{httpRes}) res.SetOutput(rr)
return res, nil return res, nil
} else if len(httpRes.Header.Get(channelHeader)) > 0 { } else if len(httpRes.Header.Get(channelHeader)) > 0 {
// if output is coming from a channel, decode each chunk // if output is coming from a channel, decode each chunk
outChan := make(chan interface{}) outChan := make(chan interface{})
go readStreamedJson(req, httpRes, outChan) go readStreamedJson(req, rr, outChan)
res.SetOutput((<-chan interface{})(outChan)) res.SetOutput((<-chan interface{})(outChan))
return res, nil return res, nil
} }
dec := json.NewDecoder(&httpResponseReader{httpRes}) dec := json.NewDecoder(rr)
// If we ran into an error // If we ran into an error
if httpRes.StatusCode >= http.StatusBadRequest { if httpRes.StatusCode >= http.StatusBadRequest {
...@@ -211,7 +220,7 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error ...@@ -211,7 +220,7 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
e.Message = "Command not found." e.Message = "Command not found."
e.Code = cmds.ErrClient e.Code = cmds.ErrClient
case contentType == "text/plain": case contentType == plainText:
// handle non-marshalled errors // handle non-marshalled errors
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)
io.Copy(buf, httpRes.Body) io.Copy(buf, httpRes.Body)
...@@ -244,9 +253,9 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error ...@@ -244,9 +253,9 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
// read json objects off of the given stream, and write the objects out to // read json objects off of the given stream, and write the objects out to
// the 'out' channel // the 'out' channel
func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- interface{}) { func readStreamedJson(req cmds.Request, rr io.Reader, out chan<- interface{}) {
defer close(out) defer close(out)
dec := json.NewDecoder(&httpResponseReader{httpRes}) dec := json.NewDecoder(rr)
outputType := reflect.TypeOf(req.Command().Type) outputType := reflect.TypeOf(req.Command().Type)
ctx := req.Context() ctx := req.Context()
...@@ -254,9 +263,7 @@ func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- inter ...@@ -254,9 +263,7 @@ func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- inter
for { for {
v, err := decodeTypedVal(outputType, dec) v, err := decodeTypedVal(outputType, dec)
if err != nil { if err != nil {
// since we are just looping reading on the response, the only way to // reading on a closed response body is as good as an io.EOF here
// know we are 'done' is for the consumer to close the response body.
// doing so doesnt throw an io.EOF, but we want to treat it like one.
if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) { if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
log.Error(err) log.Error(err)
} }
...@@ -268,7 +275,6 @@ func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- inter ...@@ -268,7 +275,6 @@ func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- inter
return return
case out <- v: case out <- v:
} }
} }
} }
......
...@@ -36,10 +36,14 @@ const ( ...@@ -36,10 +36,14 @@ const (
StreamErrHeader = "X-Stream-Error" StreamErrHeader = "X-Stream-Error"
streamHeader = "X-Stream-Output" streamHeader = "X-Stream-Output"
channelHeader = "X-Chunked-Output" channelHeader = "X-Chunked-Output"
uaHeader = "User-Agent"
contentTypeHeader = "Content-Type" contentTypeHeader = "Content-Type"
contentLengthHeader = "Content-Length" contentLengthHeader = "Content-Length"
contentDispHeader = "Content-Disposition"
transferEncodingHeader = "Transfer-Encoding" transferEncodingHeader = "Transfer-Encoding"
applicationJson = "application/json" applicationJson = "application/json"
applicationOctetStream = "application/octet-stream"
plainText = "text/plain"
) )
var mimeTypes = map[string]string{ var mimeTypes = map[string]string{
...@@ -156,7 +160,7 @@ func sendResponse(w http.ResponseWriter, req cmds.Request, res cmds.Response) { ...@@ -156,7 +160,7 @@ func sendResponse(w http.ResponseWriter, req cmds.Request, res cmds.Response) {
return return
} }
status := 200 status := http.StatusOK
// if response contains an error, write an HTTP error status code // if response contains an error, write an HTTP error status code
if e := res.Error(); e != nil { if e := res.Error(); e != nil {
if e.Code == cmds.ErrClient { if e.Code == cmds.ErrClient {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论