Files
cloudflared/component-tests/test_prechecks.py
T
Miguel da Costa Martins Marcelino 4177dd6936 TUN-10391: Avoid using fmt.Println
Avoid using fmt.Println and instead switch to logging pre-checks with the provided logger.
2026-05-26 22:04:54 +00:00

542 lines
23 KiB
Python

#!/usr/bin/env python
"""
Integration tests for cloudflared connectivity pre-checks (TUN-10391).
Scope
-----
These tests verify the end-to-end behavior of cloudflared pre-checks:
- that the human-readable table written to the log output has the correct
structure and content,
- that structured JSON log lines are emitted with the expected fields, and
- that running the `diag` subcommand against a live tunnel instance produces a
zip archive that contains prechecks.json.
They do NOT cover every failure mode of the precheck logic — those are owned
by the unit tests in prechecks/checker_test.go which use mock dialers.
At the integration level the only reliable way to induce specific failure modes
without real firewall intervention is:
- --edge <unreachable>: StaticEdgeDNSResolver resolves the literal IP
directly (DNS row = PASS), then both QUIC and HTTP/2 probes time out
-> hard fail (both transports blocked).
This does NOT exercise the DNS-failure -> transport-skip path.
DNS failure and Management API failure cannot be triggered via CLI flags alone;
they require network-level intervention outside the component-test harness.
stdout/stderr design
--------------------
The pre-checks table is emitted via cliutil.LogTable, which wraps the content
in an ASCII box and logs each line at Info level through zerolog. zerolog
writes to stderr, which the test harness merges into stdout (stderr=STDOUT in
Popen). We poll a --logfile for the "precheck complete" sentinel before
leaving the `with` block, ensuring the goroutine has finished. We then call
cfd.terminate(). After the `with` block exits, the process is dead and all
output has been captured by CloudflaredProcess's background reader thread. We
read the accumulated lines from cfd.stdout_lines.
Box format (cliutil.asciiBox with padding=2, title="CONNECTIVITY PRE-CHECKS"):

    +----...----+
    |  CONNECTIVITY PRE-CHECKS  |    (centered title)
    +----...----+
    |  COMPONENT  TARGET  ...   |    (content rows)
    ...
    +----...----+
"""
import json
import os
import re
import subprocess
import time
import zipfile as zipfilemod
from constants import METRICS_PORT
from util import LOGGER, start_cloudflared, wait_tunnel_ready
# ASCII box constants (cliutil.asciiBox, padding=2, title="CONNECTIVITY PRE-CHECKS")
BOX_TITLE = "CONNECTIVITY PRE-CHECKS"
# Matches a border line of the form +----...----+. re.MULTILINE lets ^/$
# anchor at every line, so .search() can find a border inside multi-line text.
BOX_BORDER_RE = re.compile(r"^\+(-+)\+$", re.MULTILINE)
COL_HEADER = "COMPONENT"  # first word of the column-header row
# zerolog console format: "2006-01-02T15:04:05Z LVL <message>"
# NOTE(review): the pattern is not timestamp-specific. It strips any leading
# "<non-space run> <word> " pair, so an unprefixed line is only left intact
# when it does not happen to start that way.
_LOG_PREFIX_RE = re.compile(r"^\S+ \w+ ")
# Component names (probes.go: componentXxx)
COMP_DNS = "DNS Resolution"
COMP_QUIC = "UDP Connectivity"
COMP_H2 = "TCP Connectivity"
COMP_API = "Cloudflare API"
# Target labels used in the rendered table.
#
# probeRegion() (checker.go:216) always overwrites the Target field of
# whatever CheckResult the inner probe function returns with the regionTarget
# hostname, so QUIC and HTTP/2 rows carry the same region hostname as the
# corresponding DNS row — not the "Port 7844 (QUIC/HTTP2)" strings that
# targetPortQUIC/targetPortHTTP2 define. Those port-label constants are only
# used in the empty-addrs SKIP branch and inside action message strings.
TARGET_API = "api.cloudflare.com:443"
TARGET_REGION1 = "region1.v2.argotunnel.com"
TARGET_REGION2 = "region2.v2.argotunnel.com"
# Details strings (probes.go: detailsXxx)
DETAILS_DNS_RESOLVED = "DNS Resolved successfully"
DETAILS_QUIC_OK = "QUIC connection successful"
DETAILS_HTTP2_OK = "HTTP/2 connection successful"
DETAILS_API_OK = "API is reachable"
DETAILS_QUIC_FAIL = "QUIC connection failed"
DETAILS_HTTP2_FAIL = "HTTP/2 connection is blocked or unreachable"
# Status labels as rendered in the table (result.go: xyzStatus)
PASS = "PASS"
FAIL = "FAIL"
SKIP = "SKIP"
# Action prefixes (result.go: renderActions)
PREFIX_ERROR = "ERROR: "
PREFIX_WARNING = "WARNING: "
# Action messages (probes.go: actionXxx)
ACTION_QUIC_BLOCKED = "Allow outbound QUIC traffic on port 7844 or use HTTP2."
ACTION_HTTP2_BLOCKED = "Allow outbound TCP on port 7844."
# Exact summary lines (result.go: summaryLine)
SUMMARY_HEALTHY = "SUMMARY: Environment is healthy. cloudflared will use 'quic' as primary protocol."
SUMMARY_CRITICAL = "SUMMARY: Environment has critical failures. cloudflared may not be able to establish a tunnel."
# Structured (JSON) log constants (result.go): the message of a per-probe
# line, the message of the final summary line, and the lower-case status value
# compared against the "status" field of per-probe lines.
LOG_MSG_PRECHECK = "precheck"
LOG_MSG_PRECHECK_COMPLETE = "precheck complete"
STATUS_PASS_LOG = "pass"
# Address passed to --edge by the hard-fail test. 192.0.2.0/24 is TEST-NET-1
# (RFC 5737), reserved for documentation, so nothing should answer on it.
UNREACHABLE_EDGE = "192.0.2.1:7844"
# cloudflared dial timeout per probe: 5 s, up to 2 retries -> ~15 s total.
# NOTE(review): the poll timeout equals that estimate, leaving no headroom for
# slow hosts in the hard-fail case — confirm this is intentional.
PRECHECK_POLL_TIMEOUT_SECS = 15
PRECHECK_POLL_INTERVAL_SECS = 1
# ---------- helpers ----------
def _poll_log_file_for_precheck_complete(log_file: str, timeout: float) -> list[dict]:
    """
    Poll a JSON log file until a 'precheck complete' line appears or the
    timeout expires.

    cloudflared's --logfile writes one JSON object per line. Polling keeps
    the test fast on healthy networks and still tolerates slow CI hosts.
    The file is re-read from the beginning on every poll because it is
    append-only and small; tracking a byte offset would add complexity with
    no meaningful benefit for a ~15 s total window.

    Args:
        log_file: Path given to cloudflared via --logfile. It may not exist
            yet; a missing file simply yields no lines.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        Every precheck-related log line found by the last read. On timeout
        the sentinel may be absent from the returned list; callers assert on
        its presence themselves.
    """
    deadline = time.monotonic() + timeout
    while True:
        entries = _read_precheck_log_lines_from_file(log_file)
        # Accept the same message keys as the reader ("message", falling back
        # to "msg"); otherwise a sentinel the reader collected could never be
        # recognised here and the poll would always run to the timeout.
        if any(
            (entry.get("message") or entry.get("msg", "")) == LOG_MSG_PRECHECK_COMPLETE
            for entry in entries
        ):
            return entries
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Timed out: hand back whatever the final read produced.
            return entries
        # Never sleep past the deadline.
        time.sleep(min(PRECHECK_POLL_INTERVAL_SECS, remaining))
def _read_precheck_log_lines_from_file(log_file: str) -> list[dict]:
    """
    Parse all precheck-related JSON log lines from a --logfile path.

    Args:
        log_file: Path to a file holding one JSON object per line.

    Returns:
        The decoded objects whose message ("message" key, falling back to
        "msg") is either the per-probe or the summary precheck message, in
        file order. Returns an empty list when the file does not exist yet.
    """
    wanted = (LOG_MSG_PRECHECK, LOG_MSG_PRECHECK_COMPLETE)
    result = []
    try:
        # Decode explicitly as UTF-8 rather than the locale default, and
        # tolerate a multi-byte character that is only partially written
        # while the process is still appending to the file.
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            for raw_line in f:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    obj = json.loads(raw_line)
                except json.JSONDecodeError:
                    # Not JSON, or a line that is still being written.
                    continue
                if not isinstance(obj, dict):
                    # Valid JSON that is not an object (number, list, null,
                    # string) has no .get(); it cannot be a log record.
                    continue
                msg = obj.get("message") or obj.get("msg", "")
                if msg in wanted:
                    result.append(obj)
    except FileNotFoundError:
        # The log file is created by cloudflared; it may not exist yet.
        pass
    return result
# stdout table parse
class TableRow:
    """A single data row of the rendered precheck table, held as four strings."""

    def __init__(self, component: str, target: str, status: str, details: str):
        # Stored verbatim; no normalisation or validation is applied.
        self.component, self.target = component, target
        self.status, self.details = status, details

    def __repr__(self):
        # Render the fields positionally, each through repr(), in column order.
        cells = (self.component, self.target, self.status, self.details)
        return "TableRow(" + ", ".join(repr(cell) for cell in cells) + ")"
def _strip_log_prefix(line: str) -> str:
    """Return *line* without its leading zerolog console prefix
    ('2006-01-02T15:04:05Z LVL '); lines the prefix pattern does not match are
    returned unchanged."""
    # The pattern is anchored at the start of the string, so a match can only
    # ever cover a leading span; slicing it off equals a single substitution.
    prefix = _LOG_PREFIX_RE.match(line)
    if prefix is None:
        return line
    return line[prefix.end():]
def _unbox_line(line: str) -> str:
    """Return the text inside a boxed content line: '| text |' -> 'text'.

    The zerolog console prefix, if any, is removed first. When the remainder
    both starts and ends with '|', those two delimiter characters are dropped.
    In every case the result is stripped of surrounding whitespace.
    """
    text = _strip_log_prefix(line)
    is_boxed = text.startswith("|") and text.endswith("|")
    inner = text[1:-1] if is_boxed else text
    return inner.strip()
def _parse_table(stdout: str) -> list[TableRow]:
    """
    Parse the data rows of a precheck table from captured process output.

    The table is wrapped in an ASCII box by cliutil.LogTable: each content
    line has the form '| <content> |', optionally preceded by a zerolog
    console prefix. Both the prefix and the box delimiters are removed, then
    the content is split into cells on runs of two or more spaces
    (text/tabwriter padding=2). Single spaces are part of a cell, so
    multi-word values such as "DNS Resolution" stay in one column.

    Parsing starts after the column-header row and is suspended again at a
    blank line, a box border line, or a SUMMARY / ERROR / WARNING line.

    Args:
        stdout: The full captured output, possibly multi-line.

    Returns:
        One TableRow per data line that has at least three cells (component,
        target, status). A missing fourth cell yields empty details.
    """
    rows = []
    in_data = False
    for raw_line in stdout.splitlines():
        # Border detection runs on the prefix-free text; strip it so stray
        # surrounding whitespace cannot hide a border from the anchored regex.
        msg = _strip_log_prefix(raw_line).strip()
        line = _unbox_line(raw_line)
        if line.startswith(COL_HEADER):
            in_data = True
            continue
        if not in_data:
            continue
        if (
            not line
            or BOX_BORDER_RE.match(msg)
            or line.startswith(("SUMMARY", "ERROR", "WARNING"))
        ):
            in_data = False
            continue
        # Two-or-more spaces separate columns; an explicit {2,} quantifier
        # keeps that intent unambiguous and leaves single spaces inside cells.
        parts = re.split(r" {2,}", line)
        if len(parts) >= 3:
            rows.append(TableRow(
                component=parts[0],
                target=parts[1],
                status=parts[2],
                details=parts[3] if len(parts) >= 4 else "",
            ))
    return rows
def _rows_for(rows: list[TableRow], component: str) -> list[TableRow]:
return [r for r in rows if r.component == component]
# log assertions
def _assert_precheck_summary_log(
    log_lines: list[dict],
    *,
    hard_fail: bool,
    suggested_protocol: str | None = None,
):
    """
    Assert the 'precheck complete' summary log line has the expected fields.

    Args:
        log_lines: Decoded structured log lines, as returned by the log-file
            reader.
        hard_fail: Expected value of the summary's "hard_fail" field; compared
            by identity, so the field must be a real boolean.
        suggested_protocol: Expected "suggested_protocol" value. When None the
            field is not checked at all.

    Raises:
        AssertionError: If there is not exactly one summary line, or a checked
            field differs from the expectation.
    """
    # Select the summary by the same message keys the log-file reader accepts
    # ("message", falling back to "msg"), so a line the reader collected is
    # not silently overlooked here.
    summary_lines = [
        entry for entry in log_lines
        if (entry.get("message") or entry.get("msg", "")) == LOG_MSG_PRECHECK_COMPLETE
    ]
    assert len(summary_lines) == 1, \
        f"Expected exactly one '{LOG_MSG_PRECHECK_COMPLETE}' log line; got {summary_lines}"
    summary = summary_lines[0]
    assert summary.get("hard_fail") is hard_fail, \
        f"Expected hard_fail={hard_fail} in summary log: {summary}"
    if suggested_protocol is not None:
        assert summary.get("suggested_protocol") == suggested_protocol, \
            (f"Expected suggested_protocol={suggested_protocol!r}; "
             f"got {summary.get('suggested_protocol')!r}")
# ---------- Tests ----------
class TestPrechecksHappyPath:
    """
    On a healthy connection all probes pass. We assert:
    - the full table structure (header, column header, separator)
    - every row's component, target, status, and details
    - no ERROR/WARNING action lines
    - the exact summary line
    - the structured log summary (hard_fail=false, suggested_protocol=quic)
    """

    def test_prechecks_pass_on_healthy_connection(self, tmp_path, component_tests_config):
        """Run a tunnel with one HA connection and check the rendered table
        and the structured log lines for an all-PASS result."""
        # The structured (JSON) assertions read from this file; the table
        # assertions read from the captured process output instead.
        log_file = str(tmp_path / "cloudflared.log")
        config = component_tests_config({"logfile": log_file})
        with start_cloudflared(
            tmp_path,
            config,
            cfd_pre_args=["tunnel", "--ha-connections", "1"],
            cfd_args=["run"],
            new_process=True,
            capture_output=True,
        ) as cfd:
            wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
            # Poll the log file for the sentinel before signalling the process.
            log_lines = _poll_log_file_for_precheck_complete(
                log_file, timeout=PRECHECK_POLL_TIMEOUT_SECS
            )
            # Signal shutdown.
            cfd.terminate()
        # The process is now dead. All output was captured by the background
        # reader thread into cfd.stdout_lines (stderr is merged into stdout).
        stdout = b"".join(cfd.stdout_lines).decode(errors="replace")
        LOGGER.debug(f"[happy-path] stdout:\n{stdout}")
        LOGGER.debug(f"[happy-path] log_lines:\n{log_lines}")
        # Strip zerolog console prefixes so pattern matching works on raw messages.
        messages = "\n".join(_strip_log_prefix(l) for l in stdout.splitlines())
        # ── table structure ──────────────────────────────────────────────────
        # zerolog writes to stderr which is merged into stdout by the harness.
        # The table is wrapped in an ASCII box by cliutil.LogTable.
        assert BOX_TITLE in messages, \
            f"Expected box title '{BOX_TITLE}' in output;\ngot:\n{stdout}"
        assert COL_HEADER in messages, \
            f"Expected column header row in output;\ngot:\n{stdout}"
        assert BOX_BORDER_RE.search(messages), \
            f"Expected box border line (+---+) in output;\ngot:\n{stdout}"
        # ── row content ──────────────────────────────────────────────────────
        # Rows are parsed from the unstripped output; the parser removes the
        # log prefix and box delimiters itself.
        rows = _parse_table(stdout)
        assert len(rows) == 7, \
            f"Expected 7 rows (2 DNS + 2 QUIC + 2 HTTP/2 + 1 API); got {len(rows)}: {rows}"
        # The index-based target checks below also pin row order:
        # region1 must be rendered before region2.
        dns_rows = _rows_for(rows, COMP_DNS)
        assert len(dns_rows) == 2, f"Expected 2 DNS rows; got {dns_rows}"
        assert dns_rows[0].target == TARGET_REGION1
        assert dns_rows[1].target == TARGET_REGION2
        for r in dns_rows:
            assert r.status == PASS, f"DNS row not PASS: {r}"
            assert r.details == DETAILS_DNS_RESOLVED, f"DNS row details wrong: {r}"
        quic_rows = _rows_for(rows, COMP_QUIC)
        assert len(quic_rows) == 2, f"Expected 2 QUIC rows; got {quic_rows}"
        assert quic_rows[0].target == TARGET_REGION1, f"QUIC row[0] target wrong: {quic_rows[0]}"
        assert quic_rows[1].target == TARGET_REGION2, f"QUIC row[1] target wrong: {quic_rows[1]}"
        for r in quic_rows:
            assert r.status == PASS, f"QUIC row not PASS: {r}"
            assert r.details == DETAILS_QUIC_OK, f"QUIC row details wrong: {r}"
        h2_rows = _rows_for(rows, COMP_H2)
        assert len(h2_rows) == 2, f"Expected 2 HTTP/2 rows; got {h2_rows}"
        assert h2_rows[0].target == TARGET_REGION1, f"HTTP/2 row[0] target wrong: {h2_rows[0]}"
        assert h2_rows[1].target == TARGET_REGION2, f"HTTP/2 row[1] target wrong: {h2_rows[1]}"
        for r in h2_rows:
            assert r.status == PASS, f"HTTP/2 row not PASS: {r}"
            assert r.details == DETAILS_HTTP2_OK, f"HTTP/2 row details wrong: {r}"
        api_rows = _rows_for(rows, COMP_API)
        assert len(api_rows) == 1, f"Expected 1 API row; got {api_rows}"
        assert api_rows[0].target == TARGET_API, f"API row target wrong: {api_rows[0]}"
        assert api_rows[0].status == PASS, f"API row not PASS: {api_rows[0]}"
        assert api_rows[0].details == DETAILS_API_OK, f"API row details wrong: {api_rows[0]}"
        # ── no action lines ──────────────────────────────────────────────────
        # These are substring checks over the whole captured output, not just
        # the table, so any line containing "ERROR: " or "WARNING: " fails them.
        assert PREFIX_ERROR not in messages, f"Unexpected ERROR action:\n{stdout}"
        assert PREFIX_WARNING not in messages, f"Unexpected WARNING action:\n{stdout}"
        # ── summary line ─────────────────────────────────────────────────────
        assert SUMMARY_HEALTHY in messages, \
            f"Expected healthy summary;\ngot:\n{stdout}"
        # ── structured log ───────────────────────────────────────────────────
        assert len(log_lines) > 0, \
            "Expected at least one structured precheck log line in log file"
        # NOTE(review): only lines keyed "message" == LOG_MSG_PRECHECK are
        # checked; if there are none, this loop passes vacuously — confirm
        # that is acceptable.
        for line in log_lines:
            if line.get("message") == LOG_MSG_PRECHECK:
                assert line.get("status") == STATUS_PASS_LOG, \
                    f"Expected status=pass in precheck log line: {line}"
        _assert_precheck_summary_log(log_lines, hard_fail=False, suggested_protocol="quic")
class TestPrechecksHardFail:
    """
    When --edge points at an unreachable IP, StaticEdgeDNSResolver resolves
    the literal address directly (DNS row = PASS), but both transport probes
    time out -> hard fail. We assert:
    - the full table structure
    - DNS row: PASS (the literal IP was resolved)
    - QUIC row: FAIL with correct details + ERROR action
    - HTTP/2 row: FAIL with correct details + ERROR action
    - API row: PASS (api.cloudflare.com:443 is independently reachable)
    - the exact critical summary line
    - the structured log summary (hard_fail=true)
    This test does NOT call wait_tunnel_ready because the tunnel will not
    connect to the unreachable address.
    """

    def test_prechecks_hard_fail_when_edge_unreachable(self, tmp_path, component_tests_config):
        """Run a tunnel against UNREACHABLE_EDGE and check the rendered table
        and the structured summary for a hard failure."""
        log_file = str(tmp_path / "cloudflared.log")
        config = component_tests_config({"logfile": log_file})
        with start_cloudflared(
            tmp_path,
            config,
            cfd_pre_args=[
                "tunnel",
                "--ha-connections", "1",
                "--edge", UNREACHABLE_EDGE,
            ],
            cfd_args=["run"],
            new_process=True,
            capture_output=True,
        ) as cfd:
            # The sentinel is the only synchronisation point here. On timeout
            # the poll still returns, just without the summary line, and the
            # summary assertion at the end of the test fails.
            # NOTE(review): the timeout equals the ~15 s probe estimate noted
            # next to PRECHECK_POLL_TIMEOUT_SECS — confirm there is enough
            # headroom for both transport probes to time out.
            log_lines = _poll_log_file_for_precheck_complete(
                log_file, timeout=PRECHECK_POLL_TIMEOUT_SECS
            )
            cfd.terminate()
        # Output is read only after the with block, from the lines captured
        # while the process ran (stderr merged into stdout).
        stdout = b"".join(cfd.stdout_lines).decode(errors="replace")
        LOGGER.debug(f"[hard-fail] stdout:\n{stdout}")
        LOGGER.debug(f"[hard-fail] log_lines:\n{log_lines}")
        # Strip zerolog console prefixes so pattern matching works on raw messages.
        messages = "\n".join(_strip_log_prefix(l) for l in stdout.splitlines())
        # ── table structure ──────────────────────────────────────────────────
        # zerolog writes to stderr which is merged into stdout by the harness.
        # The table is wrapped in an ASCII box by cliutil.LogTable.
        assert BOX_TITLE in messages, \
            f"Expected box title '{BOX_TITLE}' in output;\ngot:\n{stdout}"
        assert COL_HEADER in messages, \
            f"Expected column header row in output;\ngot:\n{stdout}"
        assert BOX_BORDER_RE.search(messages), \
            f"Expected box border line (+---+) in output;\ngot:\n{stdout}"
        # ── row content ──────────────────────────────────────────────────────
        # With a single static edge address there is one row per component,
        # and the DNS/QUIC/HTTP/2 rows carry that address as their target.
        rows = _parse_table(stdout)
        assert len(rows) == 4, \
            f"Expected 4 rows (1 DNS + 1 QUIC + 1 HTTP/2 + 1 API); got {len(rows)}: {rows}"
        dns_rows = _rows_for(rows, COMP_DNS)
        assert len(dns_rows) == 1, f"Expected 1 DNS row; got {dns_rows}"
        assert dns_rows[0].target == UNREACHABLE_EDGE
        assert dns_rows[0].status == PASS, f"DNS row not PASS: {dns_rows[0]}"
        assert dns_rows[0].details == DETAILS_DNS_RESOLVED, f"DNS row details wrong: {dns_rows[0]}"
        quic_rows = _rows_for(rows, COMP_QUIC)
        assert len(quic_rows) == 1, f"Expected 1 QUIC row; got {quic_rows}"
        assert quic_rows[0].target == UNREACHABLE_EDGE, f"QUIC row target wrong: {quic_rows[0]}"
        assert quic_rows[0].status == FAIL, f"QUIC row not FAIL: {quic_rows[0]}"
        assert quic_rows[0].details == DETAILS_QUIC_FAIL, f"QUIC row details wrong: {quic_rows[0]}"
        h2_rows = _rows_for(rows, COMP_H2)
        assert len(h2_rows) == 1, f"Expected 1 HTTP/2 row; got {h2_rows}"
        assert h2_rows[0].target == UNREACHABLE_EDGE, f"HTTP/2 row target wrong: {h2_rows[0]}"
        assert h2_rows[0].status == FAIL, f"HTTP/2 row not FAIL: {h2_rows[0]}"
        assert h2_rows[0].details == DETAILS_HTTP2_FAIL, f"HTTP/2 row details wrong: {h2_rows[0]}"
        # The API probe targets a different host, so it is expected to pass
        # even though the edge is unreachable.
        api_rows = _rows_for(rows, COMP_API)
        assert len(api_rows) == 1, f"Expected 1 API row; got {api_rows}"
        assert api_rows[0].target == TARGET_API, f"API row target wrong: {api_rows[0]}"
        assert api_rows[0].status == PASS, f"API row not PASS: {api_rows[0]}"
        assert api_rows[0].details == DETAILS_API_OK, f"API row details wrong: {api_rows[0]}"
        # Both failed transports must surface an ERROR-prefixed action line.
        assert f"{PREFIX_ERROR}{ACTION_QUIC_BLOCKED}" in messages, \
            f"Expected QUIC ERROR action;\ngot:\n{stdout}"
        assert f"{PREFIX_ERROR}{ACTION_HTTP2_BLOCKED}" in messages, \
            f"Expected HTTP/2 ERROR action;\ngot:\n{stdout}"
        assert SUMMARY_CRITICAL in messages, \
            f"Expected critical summary;\ngot:\n{stdout}"
        # suggested_protocol=None means the field is deliberately not checked.
        _assert_precheck_summary_log(log_lines, hard_fail=True, suggested_protocol=None)
class TestPreChecksDiag:
    """
    Verify that `cloudflared tunnel diag` includes prechecks.json in the
    diagnostic zip archive produced against a live tunnel instance.
    The precheck job in diagnostic.go is gated on noDiagNetwork; we do NOT
    pass --no-diag-network so prechecks.json must be present. We skip the
    heavier collectors (logs, metrics, system, runtime) to keep the test fast.
    The diag subcommand writes the zip to its current working directory. We
    run it with cwd=tmp_path so the archive lands there and is cleaned up
    automatically by pytest. We resolve config.cloudflared_binary to an
    absolute path before changing cwd, because the binary path may be relative
    to the original working directory.
    """

    def test_diag_contains_prechecks_json(self, tmp_path, component_tests_config):
        """Run `tunnel diag` against a running tunnel and check the produced
        zip contains a prechecks.json with a RunID key."""
        config = component_tests_config()
        # Resolved up front: the diag subprocess below runs with a different cwd.
        binary = os.path.abspath(config.cloudflared_binary)
        with start_cloudflared(
            tmp_path,
            config,
            cfd_pre_args=["tunnel", "--ha-connections", "1"],
            cfd_args=["run"],
            new_process=True,
            capture_output=True,
        ) as cfd:
            wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
            # Run the diag subcommand as a one-shot process against the
            # already-running instance. We skip log/metrics/system/runtime
            # collectors; the network collector (which runs prechecks) is left
            # enabled.
            diag_result = subprocess.run(
                [
                    binary,
                    "tunnel",
                    "diag",
                    "--metrics", f"localhost:{METRICS_PORT}",
                    "--no-diag-logs",
                    "--no-diag-metrics",
                    "--no-diag-system",
                    "--no-diag-runtime",
                ],
                cwd=str(tmp_path),
                capture_output=True,
                # subprocess.run raises TimeoutExpired if diag has not
                # finished within this many seconds.
                timeout=60,
            )
            # The tunnel is only needed while diag runs; stop it afterwards.
            cfd.terminate()
        diag_stdout = diag_result.stdout.decode(errors="replace")
        diag_stderr = diag_result.stderr.decode(errors="replace")
        LOGGER.debug(f"[diag] stdout:\n{diag_stdout}")
        LOGGER.debug(f"[diag] stderr:\n{diag_stderr}")
        assert diag_result.returncode == 0, (
            f"cloudflared tunnel diag exited with code {diag_result.returncode}\n"
            f"stdout:\n{diag_stdout}\nstderr:\n{diag_stderr}"
        )
        # Locate the zip file written to tmp_path by the diag command.
        zip_files = list(tmp_path.glob("cloudflared-diag-*.zip"))
        assert len(zip_files) == 1, \
            f"Expected exactly one cloudflared-diag-*.zip in {tmp_path}; found {zip_files}"
        zip_path = zip_files[0]
        with zipfilemod.ZipFile(zip_path) as zf:
            names = zf.namelist()
            LOGGER.debug(f"[diag] zip contents: {names}")
            # The membership test is an exact match on the archive member name.
            assert "prechecks.json" in names, \
                f"Expected prechecks.json in diag zip; got: {names}"
            # Must be valid JSON containing at least the RunID field that
            # prechecks.Run() always sets.
            with zf.open("prechecks.json") as fh:
                data = json.load(fh)
            # NOTE(review): the failure message calls data.keys(), so this
            # presumes the top-level JSON value is an object — confirm.
            assert "RunID" in data, \
                f"Expected RunID key in prechecks.json; got keys: {list(data.keys())}"