Files
yaping/test_yaping.py
yair aacf975446 Initial commit: yaping - SmokePing-like network latency monitoring tool
Features:
- Multiple probe methods: ICMP (subprocess), TCP connect, HTTP/HTTPS
- No root required
- SQLite storage for measurements
- Beautiful terminal graphs with plotext
- Single-file script with PEP 723 inline dependencies
- CLI interface with rich output

Commands: add, remove, list, enable, disable, probe, run, stats, graph, history, import-config

Run with: uv run yaping.py
2026-01-13 18:20:07 +02:00

626 lines
20 KiB
Python

#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "pytest>=8.0",
# "click>=8.1",
# "httpx>=0.27",
# "plotext>=5.2",
# "rich>=13.7",
# ]
# ///
"""
Tests for yaping - Yet Another PING
Run with: uv run pytest test_yaping.py -v
"""
from __future__ import annotations
import sqlite3
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from click.testing import CliRunner
# Import from yaping module
from yaping import (
Database,
Measurement,
Stats,
Target,
calculate_stats,
cli,
http_probe,
icmp_probe,
parse_period,
probe_target,
tcp_probe,
)
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def temp_db():
"""Create a temporary database for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test.db"
db = Database(db_path)
db.init()
yield db
db.close()
@pytest.fixture
def runner():
"""Create a CLI runner."""
return CliRunner()
@pytest.fixture
def isolated_runner(runner):
"""Create an isolated CLI runner with temp database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test.db"
def invoke_with_db(*args, **kwargs):
return runner.invoke(cli, ["--db", str(db_path)] + list(args[0]), **kwargs)
yield invoke_with_db
# =============================================================================
# Unit Tests - Database Layer
# =============================================================================
class TestDatabase:
"""Tests for the Database class."""
def test_init_creates_tables(self, temp_db):
"""Test that init creates required tables."""
# Check tables exist
tables = temp_db.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
table_names = [t["name"] for t in tables]
assert "targets" in table_names
assert "measurements" in table_names
def test_add_target(self, temp_db):
"""Test adding a target."""
target_id = temp_db.add_target("test", "example.com", "icmp")
assert target_id is not None
assert target_id > 0
def test_add_target_with_port(self, temp_db):
"""Test adding a TCP target with port."""
target_id = temp_db.add_target("test", "example.com", "tcp", port=443)
target = temp_db.get_target("test")
assert target is not None
assert target.port == 443
assert target.probe_type == "tcp"
def test_add_duplicate_target_raises(self, temp_db):
"""Test adding duplicate target raises IntegrityError."""
temp_db.add_target("test", "example.com", "icmp")
with pytest.raises(sqlite3.IntegrityError):
temp_db.add_target("test", "other.com", "tcp")
def test_get_target(self, temp_db):
"""Test retrieving a target by name."""
temp_db.add_target("mytest", "example.com", "icmp")
target = temp_db.get_target("mytest")
assert target is not None
assert target.name == "mytest"
assert target.host == "example.com"
assert target.probe_type == "icmp"
assert target.enabled is True
def test_get_target_not_found(self, temp_db):
"""Test retrieving non-existent target returns None."""
target = temp_db.get_target("nonexistent")
assert target is None
def test_get_targets(self, temp_db):
"""Test retrieving all targets."""
temp_db.add_target("test1", "example1.com", "icmp")
temp_db.add_target("test2", "example2.com", "tcp", port=80)
temp_db.add_target("test3", "example3.com", "http")
targets = temp_db.get_targets()
assert len(targets) == 3
def test_get_targets_enabled_only(self, temp_db):
"""Test retrieving only enabled targets."""
temp_db.add_target("test1", "example1.com", "icmp")
temp_db.add_target("test2", "example2.com", "icmp")
temp_db.set_target_enabled("test2", False)
targets = temp_db.get_targets(enabled_only=True)
assert len(targets) == 1
assert targets[0].name == "test1"
def test_remove_target(self, temp_db):
"""Test removing a target."""
temp_db.add_target("test", "example.com", "icmp")
result = temp_db.remove_target("test")
assert result is True
assert temp_db.get_target("test") is None
def test_remove_target_not_found(self, temp_db):
"""Test removing non-existent target returns False."""
result = temp_db.remove_target("nonexistent")
assert result is False
def test_set_target_enabled(self, temp_db):
"""Test enabling/disabling a target."""
temp_db.add_target("test", "example.com", "icmp")
temp_db.set_target_enabled("test", False)
target = temp_db.get_target("test")
assert target.enabled is False
temp_db.set_target_enabled("test", True)
target = temp_db.get_target("test")
assert target.enabled is True
def test_record_measurement_success(self, temp_db):
"""Test recording a successful measurement."""
target_id = temp_db.add_target("test", "example.com", "icmp")
temp_db.record_measurement(target_id, 25.5, success=True)
measurements = temp_db.get_measurements(target_id)
assert len(measurements) == 1
assert measurements[0].latency_ms == 25.5
assert measurements[0].success is True
assert measurements[0].error_message is None
def test_record_measurement_failure(self, temp_db):
"""Test recording a failed measurement."""
target_id = temp_db.add_target("test", "example.com", "icmp")
temp_db.record_measurement(target_id, None, success=False, error="Timeout")
measurements = temp_db.get_measurements(target_id)
assert len(measurements) == 1
assert measurements[0].latency_ms is None
assert measurements[0].success is False
assert measurements[0].error_message == "Timeout"
def test_get_measurements_limit(self, temp_db):
"""Test limiting measurements returned."""
target_id = temp_db.add_target("test", "example.com", "icmp")
for i in range(10):
temp_db.record_measurement(target_id, float(i), success=True)
measurements = temp_db.get_measurements(target_id, limit=5)
assert len(measurements) == 5
def test_get_stats(self, temp_db):
"""Test calculating statistics."""
target_id = temp_db.add_target("test", "example.com", "icmp")
temp_db.record_measurement(target_id, 10.0, success=True)
temp_db.record_measurement(target_id, 20.0, success=True)
temp_db.record_measurement(target_id, 30.0, success=True)
stats = temp_db.get_stats(target_id)
assert stats.count == 3
assert stats.avg == 20.0
assert stats.min == 10.0
assert stats.max == 30.0
assert stats.loss_percent == 0.0
# =============================================================================
# Unit Tests - Statistics
# =============================================================================
class TestStatistics:
"""Tests for statistics calculation."""
def test_calculate_stats_empty(self):
"""Test stats with empty list."""
stats = calculate_stats([])
assert stats.count == 0
assert stats.avg is None
assert stats.min is None
assert stats.max is None
assert stats.loss_percent == 0.0
def test_calculate_stats_single(self):
"""Test stats with single measurement."""
measurements = [
Measurement(
id=1,
target_id=1,
timestamp=datetime.now(),
latency_ms=25.0,
success=True,
error_message=None,
)
]
stats = calculate_stats(measurements)
assert stats.count == 1
assert stats.avg == 25.0
assert stats.min == 25.0
assert stats.max == 25.0
assert stats.stddev == 0.0
assert stats.loss_percent == 0.0
def test_calculate_stats_multiple(self):
"""Test stats with multiple measurements."""
measurements = [
Measurement(1, 1, datetime.now(), 10.0, True, None),
Measurement(2, 1, datetime.now(), 20.0, True, None),
Measurement(3, 1, datetime.now(), 30.0, True, None),
]
stats = calculate_stats(measurements)
assert stats.count == 3
assert stats.avg == 20.0
assert stats.min == 10.0
assert stats.max == 30.0
def test_calculate_stats_with_failures(self):
"""Test stats with some failed measurements."""
measurements = [
Measurement(1, 1, datetime.now(), 10.0, True, None),
Measurement(2, 1, datetime.now(), None, False, "Timeout"),
Measurement(3, 1, datetime.now(), 30.0, True, None),
]
stats = calculate_stats(measurements)
assert stats.count == 3
assert stats.avg == 20.0 # Average of 10 and 30
assert abs(stats.loss_percent - 33.33) < 0.1
def test_calculate_stats_all_failures(self):
"""Test stats when all measurements failed."""
measurements = [
Measurement(1, 1, datetime.now(), None, False, "Timeout"),
Measurement(2, 1, datetime.now(), None, False, "Timeout"),
]
stats = calculate_stats(measurements)
assert stats.count == 2
assert stats.avg is None
assert stats.loss_percent == 100.0
class TestParsePeriod:
"""Tests for period parsing."""
def test_parse_seconds(self):
"""Test parsing seconds."""
delta = parse_period("30s")
assert delta == timedelta(seconds=30)
def test_parse_minutes(self):
"""Test parsing minutes."""
delta = parse_period("5m")
assert delta == timedelta(minutes=5)
def test_parse_hours(self):
"""Test parsing hours."""
delta = parse_period("24h")
assert delta == timedelta(hours=24)
def test_parse_days(self):
"""Test parsing days."""
delta = parse_period("7d")
assert delta == timedelta(days=7)
def test_parse_invalid(self):
"""Test invalid period returns None."""
assert parse_period("invalid") is None
assert parse_period("") is None
assert parse_period("10x") is None
# =============================================================================
# Unit Tests - Probe Methods
# =============================================================================
class TestProbes:
"""Tests for probe methods."""
def test_icmp_probe_localhost(self):
"""Test ICMP probe against localhost."""
latency, error = icmp_probe("127.0.0.1", timeout=5.0)
# Localhost should always respond
assert latency is not None
assert latency > 0
assert error is None
def test_icmp_probe_invalid_host(self):
"""Test ICMP probe against invalid host."""
latency, error = icmp_probe("invalid.host.that.does.not.exist.local", timeout=2.0)
assert latency is None
assert error is not None
@pytest.mark.network
def test_tcp_probe_dns(self):
"""Test TCP probe against known service (Cloudflare DNS)."""
latency, error = tcp_probe("1.1.1.1", 53, timeout=5.0)
assert latency is not None
assert latency > 0
assert error is None
def test_tcp_probe_refused(self):
"""Test TCP probe against closed port."""
# Use a high port that's unlikely to be open
latency, error = tcp_probe("127.0.0.1", 59999, timeout=2.0)
assert latency is None
assert error is not None
def test_tcp_probe_invalid_host(self):
"""Test TCP probe against invalid host."""
latency, error = tcp_probe("invalid.host.local", 80, timeout=2.0)
assert latency is None
assert error is not None
@pytest.mark.network
def test_http_probe_success(self):
"""Test HTTP probe against known endpoint."""
# Try multiple endpoints in case one is down
endpoints = [
"https://www.google.com",
"https://cloudflare.com",
"https://httpbin.org/get",
]
for url in endpoints:
latency, error = http_probe(url, timeout=10.0)
if latency is not None:
assert latency > 0
assert error is None
return
# If all endpoints fail, skip the test (network might be unavailable)
pytest.skip("All HTTP endpoints unreachable - network may be unavailable")
def test_http_probe_invalid_url(self):
"""Test HTTP probe against invalid URL."""
latency, error = http_probe("https://invalid.domain.that.does.not.exist.local", timeout=2.0)
assert latency is None
assert error is not None
def test_probe_target_icmp(self, temp_db):
"""Test probe_target with ICMP method."""
target_id = temp_db.add_target("test", "127.0.0.1", "icmp")
target = temp_db.get_target("test")
latency, error = probe_target(target, timeout=5.0)
assert latency is not None or error is not None # Either should be set
def test_probe_target_tcp_missing_port(self, temp_db):
"""Test probe_target with TCP method but missing port."""
target_id = temp_db.add_target("test", "example.com", "tcp")
target = temp_db.get_target("test")
latency, error = probe_target(target, timeout=2.0)
assert latency is None
assert "port" in error.lower()
def test_probe_target_unknown_method(self, temp_db):
"""Test probe_target with unknown method."""
# Manually insert a target with invalid probe type
temp_db.conn.execute(
"INSERT INTO targets (name, host, probe_type) VALUES (?, ?, ?)",
("test", "example.com", "invalid"),
)
temp_db.conn.commit()
target = temp_db.get_target("test")
latency, error = probe_target(target, timeout=2.0)
assert latency is None
assert "unknown" in error.lower()
# =============================================================================
# Integration Tests - CLI Commands
# =============================================================================
class TestCLI:
"""Integration tests for CLI commands."""
def test_add_command(self, isolated_runner):
"""Test add command."""
result = isolated_runner(["add", "test", "--host", "example.com"])
assert result.exit_code == 0
assert "Added target" in result.output
assert "test" in result.output
def test_add_tcp_requires_port(self, isolated_runner):
"""Test add TCP target requires port."""
result = isolated_runner(["add", "test", "--host", "example.com", "--method", "tcp"])
assert result.exit_code != 0
assert "port" in result.output.lower()
def test_add_tcp_with_port(self, isolated_runner):
"""Test add TCP target with port."""
result = isolated_runner(["add", "test", "--host", "example.com", "--method", "tcp", "--port", "443"])
assert result.exit_code == 0
assert "Added target" in result.output
def test_add_duplicate(self, isolated_runner):
"""Test adding duplicate target fails."""
isolated_runner(["add", "test", "--host", "example.com"])
result = isolated_runner(["add", "test", "--host", "other.com"])
assert result.exit_code != 0
assert "already exists" in result.output
def test_list_empty(self, isolated_runner):
"""Test list command with no targets."""
result = isolated_runner(["list"])
assert result.exit_code == 0
assert "No targets" in result.output
def test_list_with_targets(self, isolated_runner):
"""Test list command with targets."""
isolated_runner(["add", "test1", "--host", "example1.com"])
isolated_runner(["add", "test2", "--host", "example2.com", "--method", "http"])
result = isolated_runner(["list"])
assert result.exit_code == 0
assert "test1" in result.output
assert "test2" in result.output
assert "example1.com" in result.output
assert "example2.com" in result.output
def test_remove_command(self, isolated_runner):
"""Test remove command."""
isolated_runner(["add", "test", "--host", "example.com"])
result = isolated_runner(["remove", "test"])
assert result.exit_code == 0
assert "Removed" in result.output
def test_remove_not_found(self, isolated_runner):
"""Test remove non-existent target."""
result = isolated_runner(["remove", "nonexistent"])
assert result.exit_code != 0
assert "not found" in result.output
def test_enable_disable(self, isolated_runner):
"""Test enable/disable commands."""
isolated_runner(["add", "test", "--host", "example.com"])
result = isolated_runner(["disable", "test"])
assert result.exit_code == 0
assert "Disabled" in result.output
result = isolated_runner(["enable", "test"])
assert result.exit_code == 0
assert "Enabled" in result.output
def test_probe_command(self, isolated_runner):
"""Test probe command."""
isolated_runner(["add", "localhost", "--host", "127.0.0.1"])
result = isolated_runner(["probe"])
assert result.exit_code == 0
# Should show either success or failure with the target name
assert "localhost" in result.output
def test_probe_no_targets(self, isolated_runner):
"""Test probe command with no targets."""
result = isolated_runner(["probe"])
assert result.exit_code == 0
assert "No enabled targets" in result.output
def test_stats_no_data(self, isolated_runner):
"""Test stats command with no measurements."""
isolated_runner(["add", "test", "--host", "example.com"])
result = isolated_runner(["stats"])
assert result.exit_code == 0
# Should show stats table even with no data
def test_stats_specific_target(self, isolated_runner):
"""Test stats for specific target."""
isolated_runner(["add", "test", "--host", "127.0.0.1"])
isolated_runner(["probe"]) # Generate some data
result = isolated_runner(["stats", "test"])
assert result.exit_code == 0
assert "test" in result.output
def test_stats_target_not_found(self, isolated_runner):
"""Test stats for non-existent target."""
result = isolated_runner(["stats", "nonexistent"])
assert result.exit_code != 0
assert "not found" in result.output
def test_graph_no_data(self, isolated_runner):
"""Test graph command with no data."""
isolated_runner(["add", "test", "--host", "example.com"])
result = isolated_runner(["graph", "test"])
assert result.exit_code == 0
assert "No measurements" in result.output
def test_graph_target_not_found(self, isolated_runner):
"""Test graph for non-existent target."""
result = isolated_runner(["graph", "nonexistent"])
assert result.exit_code != 0
assert "not found" in result.output
def test_history_command(self, isolated_runner):
"""Test history command."""
isolated_runner(["add", "localhost", "--host", "127.0.0.1"])
isolated_runner(["probe"]) # Generate some data
result = isolated_runner(["history", "localhost"])
assert result.exit_code == 0
# Should show history table
def test_history_no_data(self, isolated_runner):
"""Test history command with no measurements."""
isolated_runner(["add", "test", "--host", "example.com"])
result = isolated_runner(["history", "test"])
assert result.exit_code == 0
assert "No measurements" in result.output
# =============================================================================
# Main
# =============================================================================
if __name__ == "__main__":
pytest.main([__file__, "-v"])