提交 aaaf884c 作者: Juan Benet

Merge pull request #1519 from ipfs/get-fix

Implement http trailers for error handling
......@@ -3,6 +3,7 @@ package http
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
......@@ -29,10 +30,21 @@ type Client interface {
type client struct {
serverAddress string
httpClient http.Client
}
func NewClient(address string) Client {
return &client{address}
// We cannot use the default transport because of a bug in go's connection reuse
// code. It causes random failures in the connection including io.EOF and connection
// refused on 'client.Do'
return &client{
serverAddress: address,
httpClient: http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
},
}
}
func (c *client) Send(req cmds.Request) (cmds.Response, error) {
......@@ -84,20 +96,20 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
// TODO extract string consts?
if fileReader != nil {
httpReq.Header.Set("Content-Type", "multipart/form-data; boundary="+fileReader.Boundary())
httpReq.Header.Set("Content-Disposition", "form-data: name=\"files\"")
httpReq.Header.Set(contentTypeHeader, "multipart/form-data; boundary="+fileReader.Boundary())
httpReq.Header.Set(contentDispHeader, "form-data: name=\"files\"")
} else {
httpReq.Header.Set("Content-Type", "application/octet-stream")
httpReq.Header.Set(contentTypeHeader, applicationOctetStream)
}
version := config.CurrentVersionNumber
httpReq.Header.Set("User-Agent", fmt.Sprintf("/go-ipfs/%s/", version))
httpReq.Header.Set(uaHeader, fmt.Sprintf("/go-ipfs/%s/", version))
ec := make(chan error, 1)
rc := make(chan cmds.Response, 1)
dc := req.Context().Done()
go func() {
httpRes, err := http.DefaultClient.Do(httpReq)
httpRes, err := c.httpClient.Do(httpReq)
if err != nil {
ec <- err
return
......@@ -181,80 +193,44 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
res.SetLength(length)
}
res.SetCloser(httpRes.Body)
rr := &httpResponseReader{httpRes}
res.SetCloser(rr)
if len(httpRes.Header.Get(streamHeader)) > 0 {
// if output is a stream, we can just use the body reader
res.SetOutput(httpRes.Body)
if contentType != applicationJson {
// for all non json output types, just stream back the output
res.SetOutput(rr)
return res, nil
} else if len(httpRes.Header.Get(channelHeader)) > 0 {
// if output is coming from a channel, decode each chunk
outChan := make(chan interface{})
go func() {
dec := json.NewDecoder(httpRes.Body)
outputType := reflect.TypeOf(req.Command().Type)
ctx := req.Context()
for {
var v interface{}
var err error
if outputType != nil {
v = reflect.New(outputType).Interface()
err = dec.Decode(v)
} else {
err = dec.Decode(&v)
}
// since we are just looping reading on the response, the only way to
// know we are 'done' is for the consumer to close the response body.
// doing so doesnt throw an io.EOF, but we want to treat it like one.
if err != nil && strings.Contains(err.Error(), "read on closed response body") {
err = io.EOF
}
if err != nil && err != io.EOF {
log.Error(err)
return
}
select {
case <-ctx.Done():
close(outChan)
return
default:
}
if err == io.EOF {
close(outChan)
return
}
outChan <- v
}
}()
go readStreamedJson(req, rr, outChan)
res.SetOutput((<-chan interface{})(outChan))
return res, nil
}
dec := json.NewDecoder(httpRes.Body)
dec := json.NewDecoder(rr)
// If we ran into an error
if httpRes.StatusCode >= http.StatusBadRequest {
e := cmds.Error{}
if httpRes.StatusCode == http.StatusNotFound {
switch {
case httpRes.StatusCode == http.StatusNotFound:
// handle 404s
e.Message = "Command not found."
e.Code = cmds.ErrClient
} else if contentType == "text/plain" {
case contentType == plainText:
// handle non-marshalled errors
buf := bytes.NewBuffer(nil)
io.Copy(buf, httpRes.Body)
e.Message = string(buf.Bytes())
e.Code = cmds.ErrNormal
} else {
default:
// handle marshalled errors
err = dec.Decode(&e)
if err != nil {
......@@ -264,23 +240,88 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
res.SetError(e, e.Code)
} else {
outputType := reflect.TypeOf(req.Command().Type)
var v interface{}
if outputType != nil {
v = reflect.New(outputType).Interface()
err = dec.Decode(v)
} else {
err = dec.Decode(&v)
return res, nil
}
outputType := reflect.TypeOf(req.Command().Type)
v, err := decodeTypedVal(outputType, dec)
if err != nil && err != io.EOF {
return nil, err
}
res.SetOutput(v)
return res, nil
}
// read json objects off of the given stream, and write the objects out to
// the 'out' channel
func readStreamedJson(req cmds.Request, rr io.Reader, out chan<- interface{}) {
defer close(out)
dec := json.NewDecoder(rr)
outputType := reflect.TypeOf(req.Command().Type)
ctx := req.Context()
for {
v, err := decodeTypedVal(outputType, dec)
if err != nil {
// reading on a closed response body is as good as an io.EOF here
if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
log.Error(err)
}
return
}
if err != nil && err != io.EOF {
return nil, err
select {
case <-ctx.Done():
return
case out <- v:
}
if v != nil {
res.SetOutput(v)
}
}
// decode a value of the given type, if the type is nil, attempt to decode into
// an interface{} anyways
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
var v interface{}
var err error
if t != nil {
v = reflect.New(t).Interface()
err = dec.Decode(v)
} else {
err = dec.Decode(&v)
}
return v, err
}
// httpResponseReader reads from the response body, and checks for an error
// in the http trailer upon EOF, this error if present is returned instead
// of the EOF.
type httpResponseReader struct {
resp *http.Response
}
func (r *httpResponseReader) Read(b []byte) (int, error) {
n, err := r.resp.Body.Read(b)
if err == io.EOF {
_ = r.resp.Body.Close()
trailerErr := r.checkError()
if trailerErr != nil {
return n, trailerErr
}
}
return n, err
}
return res, nil
func (r *httpResponseReader) checkError() error {
if e := r.resp.Trailer.Get(StreamErrHeader); e != "" {
return errors.New(e)
}
return nil
}
func (r *httpResponseReader) Close() error {
return r.resp.Body.Close()
}
package http
import (
"bufio"
"errors"
"fmt"
"io"
......@@ -32,12 +33,17 @@ type Handler struct {
var ErrNotFound = errors.New("404 page not found")
const (
StreamErrHeader = "X-Stream-Error"
streamHeader = "X-Stream-Output"
channelHeader = "X-Chunked-Output"
uaHeader = "User-Agent"
contentTypeHeader = "Content-Type"
contentLengthHeader = "Content-Length"
contentDispHeader = "Content-Disposition"
transferEncodingHeader = "Transfer-Encoding"
applicationJson = "application/json"
applicationOctetStream = "application/octet-stream"
plainText = "text/plain"
)
var mimeTypes = map[string]string{
......@@ -70,6 +76,11 @@ func NewHandler(ctx cmds.Context, root *cmds.Command, allowedOrigin string) *Han
return &Handler{internal, c.Handler(internal)}
}
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Call the CORS handler which wraps the internal handler.
i.corsHandler.ServeHTTP(w, r)
}
func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Debug("Incoming API request: ", r.URL)
......@@ -101,8 +112,8 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// get the node's context to pass into the commands.
node, err := i.ctx.GetNode()
if err != nil {
err = fmt.Errorf("cmds/http: couldn't GetNode(): %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
s := fmt.Sprintf("cmds/http: couldn't GetNode(): %s", err)
http.Error(w, s, http.StatusInternalServerError)
return
}
......@@ -117,46 +128,60 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// call the command
res := i.root.Call(req)
// set the Content-Type based on res output
// now handle responding to the client properly
sendResponse(w, req, res)
}
func guessMimeType(res cmds.Response) (string, error) {
if _, ok := res.Output().(io.Reader); ok {
// we don't set the Content-Type for streams, so that browsers can MIME-sniff the type themselves
// we set this header so clients have a way to know this is an output stream
// (not marshalled command output)
// TODO: set a specific Content-Type if the command response needs it to be a certain type
w.Header().Set(streamHeader, "1")
return "", nil
}
} else {
enc, found, err := req.Option(cmds.EncShort).String()
if err != nil || !found {
w.WriteHeader(http.StatusInternalServerError)
return
}
mime := mimeTypes[enc]
w.Header().Set(contentTypeHeader, mime)
// Try to guess mimeType from the encoding option
enc, found, err := res.Request().Option(cmds.EncShort).String()
if err != nil {
return "", err
}
if !found {
return "", errors.New("no encoding option set")
}
// set the Content-Length from the response length
if res.Length() > 0 {
w.Header().Set(contentLengthHeader, strconv.FormatUint(res.Length(), 10))
return mimeTypes[enc], nil
}
func sendResponse(w http.ResponseWriter, req cmds.Request, res cmds.Response) {
mime, err := guessMimeType(res)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
status := http.StatusOK
// if response contains an error, write an HTTP error status code
if e := res.Error(); e != nil {
if e.Code == cmds.ErrClient {
w.WriteHeader(http.StatusBadRequest)
status = http.StatusBadRequest
} else {
w.WriteHeader(http.StatusInternalServerError)
status = http.StatusInternalServerError
}
// NOTE: The error will actually be written out by the reader below
}
out, err := res.Reader()
if err != nil {
w.Header().Set(contentTypeHeader, "text/plain")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
h := w.Header()
if res.Length() > 0 {
h.Set(contentLengthHeader, strconv.FormatUint(res.Length(), 10))
}
// if output is a channel and user requested streaming channels,
// use chunk copier for the output
_, isChan := res.Output().(chan interface{})
......@@ -165,44 +190,32 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
streamChans, _, _ := req.Option("stream-channels").Bool()
if isChan && streamChans {
// w.WriteString(transferEncodingHeader + ": chunked\r\n")
// w.Header().Set(channelHeader, "1")
// w.WriteHeader(200)
err = copyChunks(applicationJson, w, out)
if err != nil {
log.Debug("copy chunks error: ", err)
if isChan {
h.Set(channelHeader, "1")
if streamChans {
// streaming output from a channel will always be json objects
mime = applicationJson
}
return
}
err = flushCopy(w, out)
if err != nil {
log.Debug("Flush copy returned an error: ", err)
if mime != "" {
h.Set(contentTypeHeader, mime)
}
}
h.Set(streamHeader, "1")
h.Set(transferEncodingHeader, "chunked")
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Call the CORS handler which wraps the internal handler.
i.corsHandler.ServeHTTP(w, r)
}
// flushCopy Copies from an io.Reader to a http.ResponseWriter.
// Flushes chunks over HTTP stream as they are read (if supported by transport).
func flushCopy(w http.ResponseWriter, out io.Reader) error {
if _, ok := w.(http.Flusher); !ok {
return copyChunks("", w, out)
if err := writeResponse(status, w, out); err != nil {
log.Error("error while writing stream", err)
}
_, err := io.Copy(&flushResponse{w}, out)
return err
}
// Copies from an io.Reader to a http.ResponseWriter.
// Flushes chunks over HTTP stream as they are read (if supported by transport).
func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error {
func writeResponse(status int, w http.ResponseWriter, out io.Reader) error {
// hijack the connection so we can write our own chunked output and trailers
hijacker, ok := w.(http.Hijacker)
if !ok {
log.Error("Failed to create hijacker! cannot continue!")
return errors.New("Could not create hijacker")
}
conn, writer, err := hijacker.Hijack()
......@@ -211,29 +224,47 @@ func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error
}
defer conn.Close()
writer.WriteString("HTTP/1.1 200 OK\r\n")
if contentType != "" {
writer.WriteString(contentTypeHeader + ": " + contentType + "\r\n")
// write status
writer.WriteString(fmt.Sprintf("HTTP/1.1 %d %s\r\n", status, http.StatusText(status)))
// Write out headers
w.Header().Write(writer)
// end of headers
writer.WriteString("\r\n")
// write body
streamErr := writeChunks(out, writer)
// close body
writer.WriteString("0\r\n")
// if there was a stream error, write out an error trailer. hopefully
// the client will pick it up!
if streamErr != nil {
writer.WriteString(StreamErrHeader + ": " + sanitizedErrStr(streamErr) + "\r\n")
}
writer.WriteString(transferEncodingHeader + ": chunked\r\n")
writer.WriteString(channelHeader + ": 1\r\n\r\n")
writer.WriteString("\r\n") // close response
writer.Flush()
return streamErr
}
func writeChunks(r io.Reader, w *bufio.ReadWriter) error {
buf := make([]byte, 32*1024)
for {
n, err := out.Read(buf)
n, err := r.Read(buf)
if n > 0 {
length := fmt.Sprintf("%x\r\n", n)
writer.WriteString(length)
w.WriteString(length)
_, err := writer.Write(buf[0:n])
_, err := w.Write(buf[0:n])
if err != nil {
return err
}
writer.WriteString("\r\n")
writer.Flush()
w.WriteString("\r\n")
w.Flush()
}
if err != nil && err != io.EOF {
......@@ -243,25 +274,12 @@ func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error
break
}
}
writer.WriteString("0\r\n\r\n")
writer.Flush()
return nil
}
type flushResponse struct {
W http.ResponseWriter
}
func (fr *flushResponse) Write(buf []byte) (int, error) {
n, err := fr.W.Write(buf)
if err != nil {
return n, err
}
if flusher, ok := fr.W.(http.Flusher); ok {
flusher.Flush()
}
return n, err
func sanitizedErrStr(err error) string {
s := err.Error()
s = strings.Split(s, "\n")[0]
s = strings.Split(s, "\r")[0]
return s
}
......@@ -120,8 +120,7 @@ may also specify the level of compression by specifying '-l=<1-9>'.
bar.Start()
defer bar.Finish()
_, err = io.Copy(file, pbReader)
if err != nil {
if _, err := io.Copy(file, pbReader); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
......@@ -140,10 +139,8 @@ may also specify the level of compression by specifying '-l=<1-9>'.
bar.Start()
defer bar.Finish()
extractor := &tar.Extractor{outPath}
err = extractor.Extract(reader)
if err != nil {
if err := extractor.Extract(reader); err != nil {
res.SetError(err, cmds.ErrNormal)
}
},
......@@ -169,7 +166,7 @@ func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int)
return nil, err
}
return utar.NewReader(p, node.DAG, dagnode, compression)
return utar.NewReader(ctx, p, node.DAG, dagnode, compression)
}
// getZip is equivalent to `ipfs getdag $hash | gzip`
......
......@@ -39,15 +39,13 @@ func (te *Extractor) Extract(reader io.Reader) error {
}
if header.Typeflag == tar.TypeDir {
err = te.extractDir(header, i, exists)
if err != nil {
if err := te.extractDir(header, i, exists); err != nil {
return err
}
continue
}
err = te.extractFile(header, tarReader, i, exists, pathIsDir)
if err != nil {
if err := te.extractFile(header, tarReader, i, exists, pathIsDir); err != nil {
return err
}
}
......
......@@ -9,7 +9,7 @@ import (
"time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mdag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
......@@ -28,7 +28,7 @@ type Reader struct {
err error
}
func NewReader(path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compression int) (*Reader, error) {
func NewReader(ctx cxt.Context, path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compression int) (*Reader, error) {
reader := &Reader{
signalChan: make(chan struct{}),
......@@ -49,12 +49,11 @@ func NewReader(path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compress
// writeToBuf will write the data to the buffer, and will signal when there
// is new data to read
_, filename := gopath.Split(path.String())
go reader.writeToBuf(dagnode, filename, 0)
go reader.writeToBuf(ctx, dagnode, filename, 0)
return reader, nil
}
func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
func (r *Reader) writeToBuf(ctx cxt.Context, dagnode *mdag.Node, path string, depth int) {
pb := new(upb.Data)
err := proto.Unmarshal(dagnode.Data, pb)
if err != nil {
......@@ -80,16 +79,13 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
}
r.flush()
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*60)
defer cancel()
for i, ng := range r.dag.GetDAG(ctx, dagnode) {
childNode, err := ng.Get(ctx)
if err != nil {
r.emitError(err)
return
}
r.writeToBuf(childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
r.writeToBuf(ctx, childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
}
return
}
......@@ -108,7 +104,7 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
}
r.flush()
reader, err := uio.NewDagReader(context.TODO(), dagnode, r.dag)
reader, err := uio.NewDagReader(ctx, dagnode, r.dag)
if err != nil {
r.emitError(err)
return
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论