mirror of
https://github.com/cloudflare/cloudflared.git
synced 2026-06-23 04:10:20 +00:00
chore: Fix warnings
Fixing warnings in cloudflared before making any further changes.
This commit is contained in:
@@ -17,8 +17,7 @@ import (
|
|||||||
// Websocket is used to carry data via WS binary frames over the tunnel from client to the origin
|
// Websocket is used to carry data via WS binary frames over the tunnel from client to the origin
|
||||||
// This implements the functions for glider proxy (sock5) and the carrier interface
|
// This implements the functions for glider proxy (sock5) and the carrier interface
|
||||||
type Websocket struct {
|
type Websocket struct {
|
||||||
log *zerolog.Logger
|
log *zerolog.Logger
|
||||||
isSocks bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWSConnection returns a new connection object
|
// NewWSConnection returns a new connection object
|
||||||
@@ -36,7 +35,7 @@ func (ws *Websocket) ServeStream(options *StartOptions, conn io.ReadWriter) erro
|
|||||||
ws.log.Err(err).Str(LogFieldOriginURL, options.OriginURL).Msg("failed to connect to origin")
|
ws.log.Err(err).Str(LogFieldOriginURL, options.OriginURL).Msg("failed to connect to origin")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer wsConn.Close()
|
defer func() { _ = wsConn.Close() }()
|
||||||
|
|
||||||
stream.Pipe(wsConn, conn, ws.log)
|
stream.Pipe(wsConn, conn, ws.log)
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -146,8 +146,8 @@ func wsEchoEndpoint(w ResponseWriter, r *http.Request) error {
|
|||||||
case <-wsCtx.Done():
|
case <-wsCtx.Done():
|
||||||
case <-r.Context().Done():
|
case <-r.Context().Done():
|
||||||
}
|
}
|
||||||
readPipe.Close()
|
_ = readPipe.Close()
|
||||||
writePipe.Close()
|
_ = writePipe.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
originConn := &echoPipe{reader: readPipe, writer: writePipe}
|
originConn := &echoPipe{reader: readPipe, writer: writePipe}
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ func (tc *tcpConnection) Stream(_ context.Context, tunnelConn io.ReadWriter, _ *
|
|||||||
|
|
||||||
func (tc *tcpConnection) Write(b []byte) (int, error) {
|
func (tc *tcpConnection) Write(b []byte) (int, error) {
|
||||||
if tc.writeTimeout > 0 {
|
if tc.writeTimeout > 0 {
|
||||||
if err := tc.Conn.SetWriteDeadline(time.Now().Add(tc.writeTimeout)); err != nil {
|
if err := tc.SetWriteDeadline(time.Now().Add(tc.writeTimeout)); err != nil {
|
||||||
tc.logger.Err(err).Msg("Error setting write deadline for TCP connection")
|
tc.logger.Err(err).Msg("Error setting write deadline for TCP connection")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gobwas/ws/wsutil"
|
"github.com/gobwas/ws/wsutil"
|
||||||
gorillaWS "github.com/gorilla/websocket"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/net/proxy"
|
"golang.org/x/net/proxy"
|
||||||
@@ -61,7 +60,7 @@ func TestStreamTCPConnection(t *testing.T) {
|
|||||||
})
|
})
|
||||||
errGroup.Go(func() error {
|
errGroup.Go(func() error {
|
||||||
echoTCPOrigin(t, originConn)
|
echoTCPOrigin(t, originConn)
|
||||||
originConn.Close()
|
_ = originConn.Close()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -88,7 +87,7 @@ func TestDefaultStreamWSOverTCPConnection(t *testing.T) {
|
|||||||
})
|
})
|
||||||
errGroup.Go(func() error {
|
errGroup.Go(func() error {
|
||||||
echoTCPOrigin(t, originConn)
|
echoTCPOrigin(t, originConn)
|
||||||
originConn.Close()
|
_ = originConn.Close()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -117,14 +116,14 @@ func TestSocksStreamWSOverTCPConnection(t *testing.T) {
|
|||||||
for _, status := range statusCodes {
|
for _, status := range statusCodes {
|
||||||
handler := func(w http.ResponseWriter, r *http.Request) {
|
handler := func(w http.ResponseWriter, r *http.Request) {
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
require.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
require.Equal(t, []byte(sendMessage), body)
|
assert.Equal(t, []byte(sendMessage), body)
|
||||||
|
|
||||||
require.Equal(t, echoHeaderIncomingValue, r.Header.Get(echoHeaderName))
|
assert.Equal(t, echoHeaderIncomingValue, r.Header.Get(echoHeaderName))
|
||||||
w.Header().Set(echoHeaderName, echoHeaderReturnValue)
|
w.Header().Set(echoHeaderName, echoHeaderReturnValue)
|
||||||
|
|
||||||
w.WriteHeader(status)
|
w.WriteHeader(status)
|
||||||
w.Write([]byte(echoMessage))
|
_, _ = w.Write([]byte(echoMessage))
|
||||||
}
|
}
|
||||||
origin := httptest.NewServer(http.HandlerFunc(handler))
|
origin := httptest.NewServer(http.HandlerFunc(handler))
|
||||||
defer origin.Close()
|
defer origin.Close()
|
||||||
@@ -156,7 +155,7 @@ func TestSocksStreamWSOverTCPConnection(t *testing.T) {
|
|||||||
errGroup.Go(func() error {
|
errGroup.Go(func() error {
|
||||||
wsForwarderInConn, err := wsForwarderListener.Accept()
|
wsForwarderInConn, err := wsForwarderListener.Accept()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer wsForwarderInConn.Close()
|
defer func() { _ = wsForwarderInConn.Close() }()
|
||||||
|
|
||||||
stream.Pipe(wsForwarderInConn, &wsEyeball{wsForwarderOutConn}, TestLogger)
|
stream.Pipe(wsForwarderInConn, &wsEyeball{wsForwarderOutConn}, TestLogger)
|
||||||
return nil
|
return nil
|
||||||
@@ -171,20 +170,22 @@ func TestSocksStreamWSOverTCPConnection(t *testing.T) {
|
|||||||
|
|
||||||
// Request URL doesn't matter because the transport is using eyeballDialer to connectq
|
// Request URL doesn't matter because the transport is using eyeballDialer to connectq
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", "http://test-socks-stream.com", bytes.NewBuffer([]byte(sendMessage)))
|
req, err := http.NewRequestWithContext(ctx, "GET", "http://test-socks-stream.com", bytes.NewBuffer([]byte(sendMessage)))
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer func() { _ = req.Body.Close() }()
|
||||||
req.Header.Set(echoHeaderName, echoHeaderIncomingValue)
|
req.Header.Set(echoHeaderName, echoHeaderIncomingValue)
|
||||||
|
|
||||||
resp, err := transport.RoundTrip(req)
|
resp, err := transport.RoundTrip(req)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
assert.Equal(t, status, resp.StatusCode)
|
assert.Equal(t, status, resp.StatusCode)
|
||||||
require.Equal(t, echoHeaderReturnValue, resp.Header.Get(echoHeaderName))
|
require.Equal(t, echoHeaderReturnValue, resp.Header.Get(echoHeaderName))
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, []byte(echoMessage), body)
|
require.Equal(t, []byte(echoMessage), body)
|
||||||
|
|
||||||
wsForwarderOutConn.Close()
|
_ = wsForwarderOutConn.Close()
|
||||||
edgeConn.Close()
|
_ = edgeConn.Close()
|
||||||
tcpOverWSConn.Close()
|
_ = tcpOverWSConn.Close()
|
||||||
|
|
||||||
require.NoError(t, errGroup.Wait())
|
require.NoError(t, errGroup.Wait())
|
||||||
}
|
}
|
||||||
@@ -205,7 +206,7 @@ func TestWsConnReturnsBeforeStreamReturns(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
time.Sleep(time.Millisecond * 10)
|
time.Sleep(time.Millisecond * 10)
|
||||||
// Simulate losing connection to origin
|
// Simulate losing connection to origin
|
||||||
originConn.Close()
|
_ = originConn.Close()
|
||||||
}()
|
}()
|
||||||
ctx := context.WithValue(r.Context(), websocket.PingPeriodContextKey, time.Microsecond)
|
ctx := context.WithValue(r.Context(), websocket.PingPeriodContextKey, time.Microsecond)
|
||||||
tcpOverWSConn.Stream(ctx, eyeballConn, TestLogger)
|
tcpOverWSConn.Stream(ctx, eyeballConn, TestLogger)
|
||||||
@@ -221,11 +222,13 @@ func TestWsConnReturnsBeforeStreamReturns(t *testing.T) {
|
|||||||
for i := 0; i < 50; i++ {
|
for i := 0; i < 50; i++ {
|
||||||
eyeballConn, edgeConn := net.Pipe()
|
eyeballConn, edgeConn := net.Pipe()
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodConnect, server.URL, edgeConn)
|
req, err := http.NewRequestWithContext(ctx, http.MethodConnect, server.URL, edgeConn)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer func() { _ = req.Body.Close() }()
|
||||||
|
|
||||||
resp, err := client.Transport.RoundTrip(req)
|
resp, err := client.Transport.RoundTrip(req)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, resp.StatusCode, http.StatusOK)
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
errGroup.Go(func() error {
|
errGroup.Go(func() error {
|
||||||
for {
|
for {
|
||||||
@@ -261,60 +264,18 @@ func echoWSEyeball(t *testing.T, conn net.Conn) {
|
|||||||
assert.NoError(t, conn.Close())
|
assert.NoError(t, conn.Close())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if !assert.NoError(t, wsutil.WriteClientBinary(conn, testMessage)) {
|
require.NoError(t, wsutil.WriteClientBinary(conn, testMessage))
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
readMsg, err := wsutil.ReadServerBinary(conn)
|
readMsg, err := wsutil.ReadServerBinary(conn)
|
||||||
if !assert.NoError(t, err) {
|
require.NoError(t, err)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, testResponse, readMsg)
|
assert.Equal(t, testResponse, readMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func echoWSOrigin(t *testing.T, expectMessages bool) *httptest.Server {
|
|
||||||
var upgrader = gorillaWS.Upgrader{
|
|
||||||
ReadBufferSize: 10,
|
|
||||||
WriteBufferSize: 10,
|
|
||||||
}
|
|
||||||
|
|
||||||
ws := func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
header := make(http.Header)
|
|
||||||
for k, vs := range r.Header {
|
|
||||||
if k == "Test-Cloudflared-Echo" {
|
|
||||||
header[k] = vs
|
|
||||||
}
|
|
||||||
}
|
|
||||||
conn, err := upgrader.Upgrade(w, r, header)
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
sawMessage := false
|
|
||||||
for {
|
|
||||||
messageType, p, err := conn.ReadMessage()
|
|
||||||
if err != nil {
|
|
||||||
if expectMessages && !sawMessage {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
assert.Equal(t, testMessage, p)
|
|
||||||
sawMessage = true
|
|
||||||
if err := conn.WriteMessage(messageType, testResponse); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTLSServer starts the server in another thread
|
|
||||||
return httptest.NewTLSServer(http.HandlerFunc(ws))
|
|
||||||
}
|
|
||||||
|
|
||||||
func echoTCPOrigin(t *testing.T, conn net.Conn) {
|
func echoTCPOrigin(t *testing.T, conn net.Conn) {
|
||||||
readBuffer := make([]byte, len(testMessage))
|
readBuffer := make([]byte, len(testMessage))
|
||||||
_, err := conn.Read(readBuffer)
|
_, err := conn.Read(readBuffer)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, testMessage, readBuffer)
|
assert.Equal(t, testMessage, readBuffer)
|
||||||
|
|
||||||
|
|||||||
+8
-8
@@ -137,7 +137,7 @@ func (p *Proxy) ProxyHTTP(
|
|||||||
p.proxyLocalRequest(originProxy, w, req, isWebsocket)
|
p.proxyLocalRequest(originProxy, w, req, isWebsocket)
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unrecognized service: %s, %t", rule.Service, originProxy)
|
return fmt.Errorf("unrecognized service: %s, %t", rule.Service, originProxy)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -193,7 +193,7 @@ func (p *Proxy) proxyHTTPRequest(
|
|||||||
) error {
|
) error {
|
||||||
roundTripReq := tr.Request
|
roundTripReq := tr.Request
|
||||||
if isWebsocket {
|
if isWebsocket {
|
||||||
roundTripReq = tr.Clone(tr.Request.Context())
|
roundTripReq = tr.Clone(tr.Context())
|
||||||
roundTripReq.Header.Set("Connection", "Upgrade")
|
roundTripReq.Header.Set("Connection", "Upgrade")
|
||||||
roundTripReq.Header.Set("Upgrade", "websocket")
|
roundTripReq.Header.Set("Upgrade", "websocket")
|
||||||
roundTripReq.Header.Set("Sec-Websocket-Version", "13")
|
roundTripReq.Header.Set("Sec-Websocket-Version", "13")
|
||||||
@@ -203,7 +203,7 @@ func (p *Proxy) proxyHTTPRequest(
|
|||||||
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
|
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
|
||||||
if disableChunkedEncoding {
|
if disableChunkedEncoding {
|
||||||
roundTripReq.TransferEncoding = []string{"gzip", "deflate"}
|
roundTripReq.TransferEncoding = []string{"gzip", "deflate"}
|
||||||
cLength, err := strconv.Atoi(tr.Request.Header.Get("Content-Length"))
|
cLength, err := strconv.Atoi(tr.Header.Get("Content-Length"))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
roundTripReq.ContentLength = int64(cLength)
|
roundTripReq.ContentLength = int64(cLength)
|
||||||
}
|
}
|
||||||
@@ -228,7 +228,7 @@ func (p *Proxy) proxyHTTPRequest(
|
|||||||
}
|
}
|
||||||
|
|
||||||
tracing.EndWithStatusCode(ttfbSpan, resp.StatusCode)
|
tracing.EndWithStatusCode(ttfbSpan, resp.StatusCode)
|
||||||
defer resp.Body.Close()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
headers := make(http.Header, len(resp.Header))
|
headers := make(http.Header, len(resp.Header))
|
||||||
// copy headers
|
// copy headers
|
||||||
@@ -249,11 +249,11 @@ func (p *Proxy) proxyHTTPRequest(
|
|||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("internal error: unsupported connection type")
|
return errors.New("internal error: unsupported connection type")
|
||||||
}
|
}
|
||||||
defer rwc.Close()
|
defer func() { _ = rwc.Close() }()
|
||||||
|
|
||||||
eyeballStream := &bidirectionalStream{
|
eyeballStream := &bidirectionalStream{
|
||||||
writer: w,
|
writer: w,
|
||||||
reader: tr.Request.Body,
|
reader: tr.Body,
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.Pipe(eyeballStream, rwc, logger)
|
stream.Pipe(eyeballStream, rwc, logger)
|
||||||
@@ -292,7 +292,7 @@ func (p *Proxy) proxyStream(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
connectSpan.End()
|
connectSpan.End()
|
||||||
defer originConn.Close()
|
defer func() { _ = originConn.Close() }()
|
||||||
logger.Debug().Msg("origin connection established")
|
logger.Debug().Msg("origin connection established")
|
||||||
|
|
||||||
encodedSpans := tr.GetSpans()
|
encodedSpans := tr.GetSpans()
|
||||||
@@ -331,7 +331,7 @@ func (p *Proxy) proxyTCPStream(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
connectSpan.End()
|
connectSpan.End()
|
||||||
defer originConn.Close()
|
defer func() { _ = originConn.Close() }()
|
||||||
logger.Debug().Msg("origin connection established")
|
logger.Debug().Msg("origin connection established")
|
||||||
|
|
||||||
encodedSpans := tr.GetSpans()
|
encodedSpans := tr.GetSpans()
|
||||||
|
|||||||
+4
-5
@@ -67,7 +67,6 @@ func (s *bidirectionalStreamStatus) wait(maxWaitForSecondStream time.Duration) e
|
|||||||
|
|
||||||
// Only wait for second stream to finish if maxWait is greater than zero
|
// Only wait for second stream to finish if maxWait is greater than zero
|
||||||
if maxWaitForSecondStream > 0 {
|
if maxWaitForSecondStream > 0 {
|
||||||
|
|
||||||
timer := time.NewTimer(maxWaitForSecondStream)
|
timer := time.NewTimer(maxWaitForSecondStream)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
@@ -87,14 +86,14 @@ func (s *bidirectionalStreamStatus) isAnyDone() bool {
|
|||||||
|
|
||||||
// Pipe copies copy data to & from provided io.ReadWriters.
|
// Pipe copies copy data to & from provided io.ReadWriters.
|
||||||
func Pipe(tunnelConn, originConn io.ReadWriter, log *zerolog.Logger) {
|
func Pipe(tunnelConn, originConn io.ReadWriter, log *zerolog.Logger) {
|
||||||
PipeBidirectional(NopCloseWriterAdapter(tunnelConn), NopCloseWriterAdapter(originConn), 0, log)
|
_ = PipeBidirectional(NopCloseWriterAdapter(tunnelConn), NopCloseWriterAdapter(originConn), 0, log)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PipeBidirectional copies data two BidirectionStreams. It is a special case of Pipe where it receives a concept that allows for Read and Write side to be closed independently.
|
// PipeBidirectional copies data to two unidirectional streams. It is a special case of Pipe where it receives a concept that allows for Read and Write side to be closed independently.
|
||||||
// The main difference is that when piping data from a reader to a writer, if EOF is read, then this implementation propagates the EOF signal to the destination/writer by closing the write side of the
|
// The main difference is that when piping data from a reader to a writer, if EOF is read, then this implementation propagates the EOF signal to the destination/writer by closing the write side of the
|
||||||
// Bidirectional Stream.
|
// Bidirectional Stream.
|
||||||
// Finally, depending on once EOF is ready from one of the provided streams, the other direction of streaming data will have a configured time period to also finish, otherwise,
|
// Finally, depending on once EOF is ready from one of the provided streams, the other direction of streaming data will have a configured time period to also finish, otherwise,
|
||||||
// the method will return immediately with a timeout error. It is however, the responsability of the caller to close the associated streams in both ends in order to free all the resources/go-routines.
|
// the method will return immediately with a timeout error. It is however, the responsibility of the caller to close the associated streams in both ends in order to free all the resources/go-routines.
|
||||||
func PipeBidirectional(downstream, upstream Stream, maxWaitForSecondStream time.Duration, log *zerolog.Logger) error {
|
func PipeBidirectional(downstream, upstream Stream, maxWaitForSecondStream time.Duration, log *zerolog.Logger) error {
|
||||||
status := newBiStreamStatus()
|
status := newBiStreamStatus()
|
||||||
|
|
||||||
@@ -129,7 +128,7 @@ func unidirectionalStream(dst WriterCloser, src Reader, dir string, status *bidi
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
defer dst.CloseWrite()
|
defer func() { _ = dst.CloseWrite() }()
|
||||||
|
|
||||||
_, err := copyData(dst, src, dir)
|
_, err := copyData(dst, src, dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user