提交 f03efbd2 作者: Hector Sanjuan 提交者: Steven Allen

Use go-libp2p-http

License: MIT
Signed-off-by: 's avatarHector Sanjuan <hector@protocol.ai>
上级 d30c41a5
package corehttp package corehttp
import ( import (
"bufio"
"fmt" "fmt"
"io"
"net" "net"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url"
"strings" "strings"
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" p2phttp "gx/ipfs/QmcLYfmHLsaVRKGMZQovwEYhHAjWtRjg1Lij3pnzw5UkRD/go-libp2p-http"
inet "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net"
) )
// ProxyOption is an endpoint for proxying a HTTP request to another ipfs peer // ProxyOption is an endpoint for proxying a HTTP request to another ipfs peer
...@@ -27,23 +25,24 @@ func ProxyOption() ServeOption { ...@@ -27,23 +25,24 @@ func ProxyOption() ServeOption {
return return
} }
// open connect to peer target, err := url.Parse(fmt.Sprintf("libp2p://%s/%s", parsedRequest.target, parsedRequest.httpPath))
stream, err := ipfsNode.P2P.PeerHost.NewStream(request.Context(), parsedRequest.target, protocol.ID("/x/"+parsedRequest.name))
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed to open stream '%v' to target peer '%v'", parsedRequest.name, parsedRequest.target) handleError(w, "Failed to parse url", err, 400)
handleError(w, msg, err, 500)
return return
} }
//send proxy request and response to client
newReverseHTTPProxy(parsedRequest, stream).ServeHTTP(w, request) rt := p2phttp.NewTransport(ipfsNode.P2P.PeerHost, p2phttp.ProtocolOption(parsedRequest.name))
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.Transport = rt
proxy.ServeHTTP(w, request)
}) })
return mux, nil return mux, nil
} }
} }
type proxyRequest struct { type proxyRequest struct {
target peer.ID target string
name string name protocol.ID
httpPath string // path to send to the proxy-host httpPath string // path to send to the proxy-host
} }
...@@ -57,13 +56,7 @@ func parseRequest(request *http.Request) (*proxyRequest, error) { ...@@ -57,13 +56,7 @@ func parseRequest(request *http.Request) (*proxyRequest, error) {
return nil, fmt.Errorf("Invalid request path '%s'", path) return nil, fmt.Errorf("Invalid request path '%s'", path)
} }
peerID, err := peer.IDB58Decode(split[3]) return &proxyRequest{split[3], protocol.ID(split[4]), "/" + split[5]}, nil
if err != nil {
return nil, err
}
return &proxyRequest{peerID, split[4], "/" + split[5]}, nil
} }
func handleError(w http.ResponseWriter, msg string, err error, code int) { func handleError(w http.ResponseWriter, msg string, err error, code int) {
...@@ -71,62 +64,3 @@ func handleError(w http.ResponseWriter, msg string, err error, code int) { ...@@ -71,62 +64,3 @@ func handleError(w http.ResponseWriter, msg string, err error, code int) {
fmt.Fprintf(w, "%s: %s\n", msg, err) fmt.Fprintf(w, "%s: %s\n", msg, err)
log.Warningf("server error: %s: %s", err) log.Warningf("server error: %s: %s", err)
} }
func newReverseHTTPProxy(req *proxyRequest, streamToPeer inet.Stream) *httputil.ReverseProxy {
director := func(r *http.Request) {
r.URL.Path = req.httpPath //the scheme etc. doesn't matter
}
return &httputil.ReverseProxy{
Director: director,
Transport: &roundTripper{streamToPeer}}
}
type roundTripper struct {
stream inet.Stream
}
// we wrap the response body and close the stream
// only when it's closed.
type respBody struct {
io.ReadCloser
stream inet.Stream
}
// Closes the response's body and the connection.
func (rb *respBody) Close() error {
if err := rb.stream.Close(); err != nil {
rb.stream.Reset()
} else {
go inet.AwaitEOF(rb.stream)
}
return rb.ReadCloser.Close()
}
func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
sendRequest := func() {
err := req.Write(rt.stream)
if err != nil {
rt.stream.Close()
}
if req.Body != nil {
req.Body.Close()
}
}
//send request while reading response
go sendRequest()
s := bufio.NewReader(rt.stream)
resp, err := http.ReadResponse(s, req)
if err != nil {
return resp, err
}
resp.Body = &respBody{
ReadCloser: resp.Body,
stream: rt.stream,
}
return resp, nil
}
...@@ -592,6 +592,12 @@ ...@@ -592,6 +592,12 @@
"hash": "QmTqLBwme9BusYWdACqL62NFb8WV2Q72gXLsQVfC7vmCr4", "hash": "QmTqLBwme9BusYWdACqL62NFb8WV2Q72gXLsQVfC7vmCr4",
"name": "iptb-plugins", "name": "iptb-plugins",
"version": "1.0.5" "version": "1.0.5"
},
{
"author": "hsanjuan",
"hash": "QmcLYfmHLsaVRKGMZQovwEYhHAjWtRjg1Lij3pnzw5UkRD",
"name": "go-libp2p-http",
"version": "1.1.8"
} }
], ],
"gxVersion": "0.10.0", "gxVersion": "0.10.0",
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论