mirror of
https://github.com/ultravioletrs/cocos.git
synced 2026-06-23 04:10:25 +00:00
a3265bc346
* feat: Introduce computation runner, log forwarder, ingress, and egress proxy services. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: Update Go environment variable parsing and build system to use new architecture and repository. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: Update package sources to `sammyoina/cocos-ai` at a specific commit, add log-forwarder pre-start hook, and rename proxy binaries. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * chore: Update build system references to a specific commit and enhance logging for service connections and message processing. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * build: Update package source repositories and versions, migrate client logging to slog, and adjust ingress/egress proxy build and install steps. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * debug stuck Signed-off-by: Sammy Oina <sammyoina@gmail.com> * debug Signed-off-by: Sammy Oina <sammyoina@gmail.com> * debug Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: add HTTP/2 support to egress proxy and update build system to use specific commit hashes Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: enhance egress proxy CONNECT handling, update package sources, and add gRPC test utility Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: Update build system for various services to a specific commit from a new repository, change agent gRPC port to 7001, and add a gRPC test client. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: Migrate agent-internal gRPC communication to Unix sockets, set ingress proxy to port 7002, and update build hashes. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * refactor: Remove standalone ingress-proxy systemd service and update component versions. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * fix: Prevent computation re-initialization in agent and update component versions across several packages. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: update package versions and enable h2c support in ingress proxy. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: refactor ingress proxy to support HTTP/2 over Unix sockets and update component versions. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: Update build system package sources to `ultravioletrs/cocos` and reduce agent logging verbosity. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * refactor: improve error handling in proxy commands and remove unused gRPC test Signed-off-by: Sammy Oina <sammyoina@gmail.com> * test: add mock service state return value in handleRunReqChunks test Signed-off-by: Sammy Oina <sammyoina@gmail.com> * feat: add comprehensive tests for service and proxy components Signed-off-by: Sammy Oina <sammyoina@gmail.com> * fix linter Signed-off-by: Sammy Oina <sammyoina@gmail.com> * improve coverage Signed-off-by: Sammy Oina <sammyoina@gmail.com> * test: add gRPC client and ingress adapter tests, and update egress proxy tests. Signed-off-by: Sammy Oina <sammyoina@gmail.com> * improve coverage Signed-off-by: Sammy Oina <sammyoina@gmail.com> --------- Signed-off-by: Sammy Oina <sammyoina@gmail.com>
796 lines
22 KiB
Go
796 lines
22 KiB
Go
// Copyright (c) Ultraviolet
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
package egress
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestProxyHTTP(t *testing.T) {
|
|
// 1. Start a backend server
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write([]byte("backend response")); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
// 2. Start Proxy
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
proxy := NewProxy(logger, ":0")
|
|
|
|
// Listen on a random port
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
// waiting for server start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// 3. Make request via proxy
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
}
|
|
|
|
resp, err := client.Get(backend.URL)
|
|
require.NoError(t, err)
|
|
defer resp.Body.Close()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "backend response", string(body))
|
|
assert.Equal(t, http.StatusOK, resp.StatusCode)
|
|
}
|
|
|
|
func TestProxyConnect(t *testing.T) {
|
|
// 1. Start a backend TLS server
|
|
backend := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write([]byte("secure backend response")); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
// 2. Start Proxy
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
// Listen on a random port
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// 3. Configure client to use proxy
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTPS_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTPS_PROXY")
|
|
|
|
client := backend.Client() // This client trusts the test cert
|
|
// But we need to update its transport proxy
|
|
if transport, ok := client.Transport.(*http.Transport); ok {
|
|
transport.Proxy = http.ProxyFromEnvironment
|
|
} else {
|
|
// Create new transport if needed, but backend.Client() returns transport with TLS config
|
|
tr := &http.Transport{
|
|
TLSClientConfig: client.Transport.(*http.Transport).TLSClientConfig,
|
|
Proxy: http.ProxyFromEnvironment,
|
|
}
|
|
client.Transport = tr
|
|
}
|
|
|
|
resp, err := client.Get(backend.URL)
|
|
require.NoError(t, err)
|
|
defer resp.Body.Close()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "secure backend response", string(body))
|
|
}
|
|
|
|
// TestProxyHTTP2 tests HTTP/2 requests through the proxy.
|
|
func TestProxyHTTP2(t *testing.T) {
|
|
// 1. Start a backend server
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write([]byte("http2 response")); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
// 2. Start Proxy
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// 3. Make HTTP/2 request via proxy
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
ForceAttemptHTTP2: true,
|
|
},
|
|
}
|
|
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
// This will be an HTTP/1.1 request unless explicitly configured for HTTP/2
|
|
resp, err := client.Get(backend.URL)
|
|
if err == nil {
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
assert.Equal(t, "http2 response", string(body))
|
|
}
|
|
}
|
|
|
|
// TestProxyHeaderHandling tests that headers are properly handled.
|
|
func TestProxyHeaderHandling(t *testing.T) {
|
|
// Start a backend server that echoes headers
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("X-Custom-Header", "custom-value")
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write([]byte(r.Header.Get("X-Request-Header"))); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
// Start Proxy
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Make request with custom headers
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
req, _ := http.NewRequest("GET", backend.URL, nil)
|
|
req.Header.Set("X-Request-Header", "request-value")
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
}
|
|
|
|
resp, err := client.Do(req)
|
|
if err == nil {
|
|
defer resp.Body.Close()
|
|
assert.Equal(t, "custom-value", resp.Header.Get("X-Custom-Header"))
|
|
}
|
|
}
|
|
|
|
// TestProxyWithDifferentMethods tests different HTTP methods.
|
|
func TestProxyWithDifferentMethods(t *testing.T) {
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write([]byte(r.Method)); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
methods := []string{"GET", "POST", "PUT", "DELETE"}
|
|
for _, method := range methods {
|
|
req, _ := http.NewRequest(method, backend.URL, nil)
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
}
|
|
|
|
resp, err := client.Do(req)
|
|
if err == nil {
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
assert.Equal(t, method, string(body))
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestProxyErrorHandling tests error handling in the proxy.
|
|
func TestProxyErrorHandling(t *testing.T) {
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Try to connect to a non-existent backend
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
}
|
|
|
|
// This should fail because the backend doesn't exist
|
|
resp, err := client.Get("http://nonexistent.example.com:99999")
|
|
if err != nil {
|
|
return
|
|
}
|
|
if resp != nil {
|
|
defer resp.Body.Close()
|
|
// Status should be error
|
|
assert.NotEqual(t, http.StatusOK, resp.StatusCode)
|
|
}
|
|
}
|
|
|
|
// TestProxyWithLargeBody tests proxy with large response body.
|
|
func TestProxyWithLargeBody(t *testing.T) {
|
|
largeBody := make([]byte, 1024*1024) // 1MB
|
|
for i := range largeBody {
|
|
largeBody[i] = byte(i % 256)
|
|
}
|
|
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write(largeBody); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
}
|
|
|
|
resp, err := client.Get(backend.URL)
|
|
require.NoError(t, err)
|
|
defer resp.Body.Close()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, len(largeBody), len(body))
|
|
}
|
|
|
|
// TestCopyHeader tests the copyHeader utility function.
|
|
func TestCopyHeader(t *testing.T) {
|
|
src := http.Header{}
|
|
src.Add("X-Custom-Header", "value1")
|
|
src.Add("X-Custom-Header", "value2")
|
|
src.Add("Content-Type", "application/json")
|
|
|
|
dst := http.Header{}
|
|
copyHeader(dst, src)
|
|
|
|
assert.Equal(t, []string{"value1", "value2"}, dst["X-Custom-Header"])
|
|
assert.Equal(t, []string{"application/json"}, dst["Content-Type"])
|
|
}
|
|
|
|
// TestDelHopHeaders tests the delHopHeaders utility function.
|
|
func TestDelHopHeaders(t *testing.T) {
|
|
header := http.Header{}
|
|
header.Set("Connection", "keep-alive")
|
|
header.Set("Keep-Alive", "timeout=5")
|
|
header.Set("Proxy-Authenticate", "Basic")
|
|
header.Set("Proxy-Authorization", "Bearer token")
|
|
header.Set("Te", "trailers")
|
|
header.Set("Trailers", "X-Custom")
|
|
header.Set("Transfer-Encoding", "chunked")
|
|
header.Set("Upgrade", "websocket")
|
|
header.Set("X-Custom-Header", "should-remain")
|
|
|
|
delHopHeaders(header)
|
|
|
|
// Hop-by-hop headers should be removed
|
|
assert.Empty(t, header.Get("Connection"))
|
|
assert.Empty(t, header.Get("Keep-Alive"))
|
|
assert.Empty(t, header.Get("Proxy-Authenticate"))
|
|
assert.Empty(t, header.Get("Proxy-Authorization"))
|
|
assert.Empty(t, header.Get("Te"))
|
|
assert.Empty(t, header.Get("Trailers"))
|
|
assert.Empty(t, header.Get("Transfer-Encoding"))
|
|
assert.Empty(t, header.Get("Upgrade"))
|
|
|
|
// Custom headers should remain
|
|
assert.Equal(t, "should-remain", header.Get("X-Custom-Header"))
|
|
}
|
|
|
|
// TestProxyConnectDialTimeout tests CONNECT with dial timeout.
|
|
func TestProxyConnectDialTimeout(t *testing.T) {
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Try to CONNECT to a non-routable address (should timeout)
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTPS_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTPS_PROXY")
|
|
|
|
// Create a client with very short timeout
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
Timeout: 2 * time.Second,
|
|
}
|
|
|
|
// This should fail because 192.0.2.1 is a TEST-NET address (non-routable)
|
|
_, err = client.Get("https://192.0.2.1:9999/test")
|
|
assert.Error(t, err)
|
|
}
|
|
|
|
// TestProxyHTTPWithRedirect tests HTTP proxy handling redirects.
|
|
func TestProxyHTTPWithRedirect(t *testing.T) {
|
|
// Create a backend that redirects
|
|
redirectCount := 0
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if redirectCount == 0 {
|
|
redirectCount++
|
|
http.Redirect(w, r, "/redirected", http.StatusFound)
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write([]byte("redirected response")); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
}
|
|
|
|
resp, err := client.Get(backend.URL)
|
|
require.NoError(t, err)
|
|
defer resp.Body.Close()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "redirected response", string(body))
|
|
}
|
|
|
|
// TestProxyStopContext tests proxy stop with context.
|
|
func TestProxyStopContext(t *testing.T) {
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
err = proxy.Stop(ctx)
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
// TestProxyPipeWithRealConnections tests the pipe function with real TCP connections.
|
|
func TestProxyPipeWithRealConnections(t *testing.T) {
|
|
// Create two connected TCP connections
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
defer listener.Close()
|
|
|
|
// Channel to receive the server connection
|
|
serverConnChan := make(chan net.Conn, 1)
|
|
go func() {
|
|
conn, err := listener.Accept()
|
|
if err == nil {
|
|
serverConnChan <- conn
|
|
}
|
|
}()
|
|
|
|
// Create client connection
|
|
clientConn, err := net.Dial("tcp", listener.Addr().String())
|
|
require.NoError(t, err)
|
|
defer clientConn.Close()
|
|
|
|
// Get server connection
|
|
serverConn := <-serverConnChan
|
|
defer serverConn.Close()
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
proxy := NewProxy(logger, ":0")
|
|
|
|
// Test data transfer
|
|
testData := []byte("test data for pipe")
|
|
|
|
// Start pipe in goroutine
|
|
go proxy.pipe(clientConn, serverConn)
|
|
|
|
// Write from client
|
|
_, err = clientConn.Write(testData)
|
|
require.NoError(t, err)
|
|
|
|
// Read from server
|
|
buf := make([]byte, len(testData))
|
|
if err := serverConn.SetReadDeadline(time.Now().Add(1 * time.Second)); err != nil {
|
|
t.Logf("Failed to set read deadline: %v", err)
|
|
}
|
|
n, err := serverConn.Read(buf)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, testData, buf[:n])
|
|
|
|
// Close connections to trigger pipe completion
|
|
clientConn.Close()
|
|
serverConn.Close()
|
|
|
|
// Give pipe time to complete
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
// TestProxyHTTP2ErrorPath tests HTTP/2 error handler.
|
|
func TestProxyHTTP2ErrorPath(t *testing.T) {
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Create a request that will trigger HTTP/2 handling
|
|
req, err := http.NewRequest("GET", "http://"+ln.Addr().String()+"/test", nil)
|
|
require.NoError(t, err)
|
|
|
|
// Force HTTP/2 by setting the request protocol
|
|
req.ProtoMajor = 2
|
|
req.ProtoMinor = 0
|
|
req.Host = "nonexistent.invalid:9999" // This should cause an error
|
|
|
|
// Create a response recorder
|
|
rr := httptest.NewRecorder()
|
|
|
|
// Call the handler directly to test HTTP/2 error path
|
|
proxy.server.Handler.ServeHTTP(rr, req)
|
|
|
|
// Should get an error response
|
|
assert.Equal(t, http.StatusBadGateway, rr.Code)
|
|
}
|
|
|
|
// TestNewProxyHTTP2ConfigWarning tests NewProxy when HTTP/2 configuration might fail.
|
|
func TestNewProxyHTTP2ConfigWarning(t *testing.T) {
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
|
|
// Create proxy - HTTP/2 configuration should succeed normally
|
|
proxy := NewProxy(logger, ":0")
|
|
|
|
assert.NotNil(t, proxy)
|
|
assert.NotNil(t, proxy.transport)
|
|
assert.True(t, proxy.transport.ForceAttemptHTTP2)
|
|
}
|
|
|
|
// TestProxyHandleHTTPError tests HTTP handler error path.
|
|
func TestProxyHandleHTTPError(t *testing.T) {
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTP_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTP_PROXY")
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
},
|
|
Timeout: 2 * time.Second,
|
|
}
|
|
|
|
// Try to connect to invalid backend
|
|
resp, err := client.Get("http://invalid.backend.test:99999/test")
|
|
if err == nil {
|
|
defer resp.Body.Close()
|
|
// Should get error status
|
|
assert.NotEqual(t, http.StatusOK, resp.StatusCode)
|
|
}
|
|
// Either error or bad gateway response is acceptable
|
|
}
|
|
|
|
// TestProxyConnectWriteError tests CONNECT with write error after hijacking.
|
|
func TestProxyConnectWriteError(t *testing.T) {
|
|
// This test is challenging because we need to trigger a write error
|
|
// after successful hijacking. We'll test the path by using a closed connection.
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Create a backend server for CONNECT
|
|
backend := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer backend.Close()
|
|
|
|
proxyURL := fmt.Sprintf("http://%s", ln.Addr().String())
|
|
os.Setenv("HTTPS_PROXY", proxyURL)
|
|
defer os.Unsetenv("HTTPS_PROXY")
|
|
|
|
client := backend.Client()
|
|
if transport, ok := client.Transport.(*http.Transport); ok {
|
|
transport.Proxy = http.ProxyFromEnvironment
|
|
}
|
|
|
|
// Make a request through CONNECT
|
|
_, err = client.Get(backend.URL)
|
|
// The request may succeed or fail, but we're testing the code path
|
|
if err != nil {
|
|
t.Logf("Request error (expected in some cases): %v", err)
|
|
}
|
|
}
|
|
|
|
// TestProxyHTTP2WithAbsoluteURL tests HTTP/2 handling with absolute URL.
|
|
func TestProxyHTTP2WithAbsoluteURL(t *testing.T) {
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
if _, err := w.Write([]byte("http2 absolute url response")); err != nil {
|
|
t.Logf("Failed to write response: %v", err)
|
|
}
|
|
}))
|
|
defer backend.Close()
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
ln, err := net.Listen("tcp", ":0")
|
|
require.NoError(t, err)
|
|
|
|
proxy := NewProxy(logger, ln.Addr().String())
|
|
proxy.server.Addr = ln.Addr().String()
|
|
|
|
go func() {
|
|
if err := proxy.server.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
t.Logf("Proxy server error: %v", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err := proxy.Stop(context.Background()); err != nil {
|
|
t.Logf("Failed to stop proxy: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Create request with absolute URL
|
|
req, err := http.NewRequest("GET", backend.URL+"/test", nil)
|
|
require.NoError(t, err)
|
|
req.ProtoMajor = 2
|
|
req.ProtoMinor = 0
|
|
|
|
rr := httptest.NewRecorder()
|
|
proxy.server.Handler.ServeHTTP(rr, req)
|
|
|
|
assert.Equal(t, http.StatusOK, rr.Code)
|
|
}
|