commit aacf975446bcefcb5671c8f3afa3c460f8c015a4 Author: yair Date: Tue Jan 13 18:20:07 2026 +0200 Initial commit: yaping - SmokePing-like network latency monitoring tool Features: - Multiple probe methods: ICMP (subprocess), TCP connect, HTTP/HTTPS - No root required - SQLite storage for measurements - Beautiful terminal graphs with plotext - Single-file script with PEP 723 inline dependencies - CLI interface with rich output Commands: add, remove, list, enable, disable, probe, run, stats, graph, history, import-config Run with: uv run yaping.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4b1741b --- /dev/null +++ b/.gitignore @@ -0,0 +1,47 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.nox/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Database (don't commit local test data) +*.db + +# OS +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..1379ebf --- /dev/null +++ b/README.md @@ -0,0 +1,167 @@ +# yaping - Yet Another PING + +A SmokePing-like network latency monitoring tool written in Python with CLI graphs. + +## Features + +- **Multiple Probe Methods**: ICMP (via subprocess), TCP connect, HTTP/HTTPS timing +- **No Root Required**: Uses system ping command or pure-Python TCP/HTTP probes +- **SQLite Storage**: Persistent storage of all measurements +- **Terminal Graphs**: Rich CLI visualization using plotext +- **Single-file Script**: Uses PEP 723 inline dependencies with `uv run` + +## Requirements + +- Python 3.11+ +- [uv](https://github.com/astral-sh/uv) package manager + +## Installation + +No installation required! Just run with `uv`: + +```bash +# Clone or download yaping.py +uv run yaping.py --help +``` + +uv automatically installs dependencies on first run. + +## Quick Start + +```bash +# Add targets +uv run yaping.py add google --host google.com +uv run yaping.py add cloudflare --host 1.1.1.1 --method tcp --port 443 +uv run yaping.py add github --host https://api.github.com --method http + +# List targets +uv run yaping.py list + +# Run single probe +uv run yaping.py probe + +# Start continuous monitoring +uv run yaping.py run + +# View statistics +uv run yaping.py stats + +# Display graph +uv run yaping.py graph google +uv run yaping.py graph # All targets +``` + +## Commands + +| Command | Description | +|---------|-------------| +| `add NAME --host HOST` | Add a monitoring target | +| `remove NAME` | Remove a target | +| `list` | List all targets | +| `enable NAME` | Enable a target | +| `disable NAME` | Disable a target | +| `probe` | Run single probe for all targets | +| `run` | Run continuous monitoring | +| `stats [NAME]` | Show latency statistics | +| `graph [NAME]` | Display terminal graph | +| `history NAME` | Show measurement history | +| `import-config FILE` | Import targets from TOML config | + +## Probe Methods + +### ICMP (default) +```bash +uv run yaping.py add myserver --host example.com +``` + +### TCP Connect +```bash +uv run yaping.py add dns --host 1.1.1.1 --method tcp --port 53 +uv run yaping.py add https --host example.com --method tcp --port 443 +``` + +### HTTP/HTTPS +```bash +uv run yaping.py add api --host https://api.example.com --method http +``` + +## Example Output + +### Statistics +``` + Statistics (last 1h) +┏━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━┳━━━━━━┳━━━━━━━━━┓ +┃ Target ┃ Avg ┃ Min ┃ Max ┃ StdDev ┃ Loss ┃ Samples ┃ +┡━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━╇━━━━━━╇━━━━━━━━━┩ +│ cloudflare │ 3.4ms │ 3.3ms │ 3.5ms │ 0.2ms │ 0.0% │ 2 │ +│ google │ 4.1ms │ 4.1ms │ 4.2ms │ 0.1ms │ 0.0% │ 2 │ +└────────────┴───────┴───────┴───────┴────────┴──────┴─────────┘ +``` + +### Graph +``` + Latency - google (last 1h) + ┌───────────────────────────────────────────────────────────────────────────┐ +4.20┤ ⢀⠤⠊⠒⠒⠢⠤⠤⣀⣀⡀ │ +4.10┤⣀⠤⠊⠁ ⠈⠉⠉⠒⠒⠢⠤⠤⣀⣀⡀ │ + │ ⠈⠉⠉⠒⠒⠢⠤⠤⣀⣀⡀ │ +4.00┤ ⠈⠉⠉⠒⠒⠒⠤⠤⢄⣀⣀ │ +3.90├────────────────────────────────────────────⠉⠉⠑⠒⠒⠤⠤⢄⣀⣀─────────────────────┤ +3.80┤ ⠉⠉⠑⠒⠒⠤⠤⢄⣀⣀ ⢸│ + └┬──────────────────┬─────────────────┬──────────────────┬─────────────────┬┘ + +Statistics: avg=3.9ms, min=3.6ms, max=4.2ms, loss=0.0% +``` + +## Configuration File + +Import targets from a TOML configuration file: + +```toml +# yaping.toml +[[targets]] +name = "google-dns" +host = "8.8.8.8" +method = "icmp" +interval = 30 + +[[targets]] +name = "cloudflare" +host = "1.1.1.1" +method = "tcp" +port = 443 +interval = 60 +``` + +```bash +uv run yaping.py import-config yaping.toml +``` + +## Options + +### Global Options +- `--db PATH`: Custom database path (default: `~/.local/share/yaping/yaping.db`) + +### Time Periods +Use with `stats`, `graph`, and `history`: +- `-p 1h`: Last hour (default) +- `-p 24h`: Last 24 hours +- `-p 7d`: Last 7 days + +## Running Tests + +```bash +uv run --script test_yaping.py +``` + +## Dependencies + +All handled automatically by uv: +- click - CLI framework +- httpx - HTTP/HTTPS probing +- plotext - Terminal graphs +- rich - Beautiful terminal output + +## License + +MIT diff --git a/plans/yaping-architecture.md b/plans/yaping-architecture.md new file mode 100644 index 0000000..adface8 --- /dev/null +++ b/plans/yaping-architecture.md @@ -0,0 +1,625 @@ +# Yaping - Yet Another PING + +A SmokePing-like network latency monitoring tool written in Python. + +## Overview + +**yaping** is a lightweight, non-root network monitoring tool that measures latency and packet loss to multiple targets using various probe methods, stores results in SQLite, and displays beautiful terminal graphs. + +## Key Features + +- **Multiple Probe Methods**: ICMP (via subprocess), TCP connect, HTTP/HTTPS timing +- **No Root Required**: Uses system ping command or pure-Python TCP/HTTP probes +- **SQLite Storage**: Persistent storage of all measurements +- **Terminal Graphs**: Rich CLI visualization using plotext or similar +- **Flexible Configuration**: TOML config file + CLI arguments +- **Single-file Script**: Uses PEP 723 inline dependencies with `uv run` + +## Architecture + +```mermaid +flowchart TB + subgraph CLI[CLI Interface] + ADD[yaping add target] + RUN[yaping run] + STATS[yaping stats] + GRAPH[yaping graph] + LIST[yaping list] + end + + subgraph Core[Core Modules] + PROBES[Probe Methods] + SCHEDULER[Scheduler] + DB[SQLite Storage] + end + + subgraph Probes[Probe Types] + ICMP[ICMP - subprocess ping] + TCP[TCP Connect] + HTTP[HTTP/HTTPS Request] + end + + CLI --> Core + PROBES --> Probes + SCHEDULER --> PROBES + SCHEDULER --> DB + GRAPH --> DB + STATS --> DB +``` + +## File Structure + +``` +yaping/ +├── yaping.py # Main single-file script with PEP 723 deps +├── yaping.toml # Optional configuration file +├── yaping.db # SQLite database (auto-created) +└── plans/ + └── yaping-architecture.md +``` + +## Database Schema + +```sql +-- Targets to monitor +CREATE TABLE targets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE NOT NULL, + host TEXT NOT NULL, + probe_type TEXT NOT NULL DEFAULT 'icmp', -- icmp, tcp, http + port INTEGER, -- For TCP/HTTP probes + interval INTEGER DEFAULT 60, -- Probe interval in seconds + enabled INTEGER DEFAULT 1, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +-- Measurement results +CREATE TABLE measurements ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + target_id INTEGER NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + latency_ms REAL, -- NULL if packet lost + success INTEGER NOT NULL, -- 1=success, 0=failure + error_message TEXT, -- Error details if failed + FOREIGN KEY (target_id) REFERENCES targets(id) +); + +-- Index for efficient time-range queries +CREATE INDEX idx_measurements_target_time + ON measurements(target_id, timestamp); +``` + +## CLI Commands + +### Target Management + +```bash +# Add targets with different probe methods +uv run yaping.py add google --host google.com --method icmp +uv run yaping.py add cloudflare-dns --host 1.1.1.1 --method tcp --port 53 +uv run yaping.py add github-api --host https://api.github.com --method http + +# List all targets +uv run yaping.py list + +# Remove a target +uv run yaping.py remove google + +# Enable/disable a target +uv run yaping.py disable google +uv run yaping.py enable google +``` + +### Running Probes + +```bash +# Run continuous monitoring (foreground) +uv run yaping.py run + +# Run single probe for all targets +uv run yaping.py probe + +# Run with custom interval +uv run yaping.py run --interval 30 +``` + +### Viewing Results + +```bash +# Show statistics for all targets +uv run yaping.py stats + +# Show stats for specific target +uv run yaping.py stats google + +# Show stats for last hour/day/week +uv run yaping.py stats --period 1h +uv run yaping.py stats --period 24h +uv run yaping.py stats --period 7d + +# Display terminal graph +uv run yaping.py graph + +# Graph for specific target +uv run yaping.py graph google + +# Graph with custom time range +uv run yaping.py graph --period 24h +``` + +## Probe Methods + +### 1. ICMP Probe (subprocess) + +Uses the system `ping` command: +- Linux: `ping -c 1 -W 5 ` +- macOS: `ping -c 1 -W 5000 ` + +Parses output to extract latency. + +### 2. TCP Connect Probe + +Pure Python socket connection timing: +```python +import socket +import time + +def tcp_probe(host: str, port: int, timeout: float = 5.0) -> float | None: + start = time.perf_counter() + try: + sock = socket.create_connection((host, port), timeout=timeout) + sock.close() + return (time.perf_counter() - start) * 1000 # ms + except (socket.timeout, OSError): + return None +``` + +### 3. HTTP/HTTPS Probe + +Uses httpx for accurate timing: +```python +import httpx +import time + +def http_probe(url: str, timeout: float = 5.0) -> float | None: + start = time.perf_counter() + try: + response = httpx.get(url, timeout=timeout, follow_redirects=True) + return (time.perf_counter() - start) * 1000 # ms + except httpx.HTTPError: + return None +``` + +## Terminal Graph Visualization + +Using `plotext` for terminal graphs: + +``` +╭─────────────────────────────────────────────────────────────────────╮ +│ Latency - google (last 1h) │ +╰─────────────────────────────────────────────────────────────────────╯ + 50 ┤ + 45 ┤ ╭─╮ + 40 ┤ ╭╯ ╰╮ ╭╮ + 35 ┤ ╭─╮ ╭╯ ╰╮ ╭╯╰╮ + 30 ┼─────╯ ╰──╯ ╰──────────────────╯ ╰───────────────── + 25 ┤ + 20 ┤ + └──────────────────────────────────────────────────────────── + 15:00 15:15 15:30 15:45 16:00 + +Statistics: avg=32.4ms, min=25.1ms, max=48.2ms, loss=0.0% +``` + +Multi-target comparison: +``` +╭─────────────────────────────────────────────────────────────────────╮ +│ Latency Comparison (last 1h) │ +╰─────────────────────────────────────────────────────────────────────╯ + 100 ┤ + 80 ┤ ■ ■ + 60 ┤ ■ ■ ■ ■ ■ ■ + 40 ┼──●─●─●──●──●──●──●──●──●──●──●──●──●──●──●──●──● + 20 ┤ + └──────────────────────────────────────────────────────────── + 15:00 15:15 15:30 15:45 16:00 + +● google (avg: 32ms) ■ cloudflare-dns (avg: 65ms) +``` + +## PEP 723 Inline Dependencies + +```python +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "click>=8.1", +# "httpx>=0.27", +# "plotext>=5.2", +# "rich>=13.7", +# ] +# /// +``` + +## Configuration File (yaping.toml) + +```toml +[defaults] +interval = 60 # Default probe interval in seconds +timeout = 5 # Default timeout in seconds +database = "~/.local/share/yaping/yaping.db" + +[[targets]] +name = "google" +host = "google.com" +method = "icmp" +interval = 30 + +[[targets]] +name = "cloudflare-dns" +host = "1.1.1.1" +method = "tcp" +port = 53 + +[[targets]] +name = "github-api" +host = "https://api.github.com/zen" +method = "http" +interval = 120 +``` + +## Implementation Phases + +### Phase 1: Core Infrastructure +- SQLite database layer with schema +- Probe method implementations (ICMP, TCP, HTTP) +- Basic CLI skeleton with Click + +### Phase 2: Target Management +- Add/remove/list/enable/disable commands +- Configuration file parsing + +### Phase 3: Monitoring +- Scheduler for continuous probing +- Single probe command for testing + +### Phase 4: Visualization +- Statistics calculation and display +- Terminal graphs with plotext +- Rich formatting for tables and output + +## Dependencies + +| Package | Purpose | +|---------|---------| +| click | CLI framework | +| httpx | HTTP/HTTPS probing | +| plotext | Terminal graphs | +| rich | Beautiful terminal output, tables, progress | + +## Usage Example Session + +```bash +# First run - add some targets +$ uv run yaping.py add google --host google.com +✓ Added target 'google' (icmp -> google.com) + +$ uv run yaping.py add cloudflare --host 1.1.1.1 --method tcp --port 443 +✓ Added target 'cloudflare' (tcp -> 1.1.1.1:443) + +$ uv run yaping.py add github --host https://api.github.com --method http +✓ Added target 'github' (http -> https://api.github.com) + +# List targets +$ uv run yaping.py list +┏━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━┓ +┃ Name ┃ Host ┃ Method ┃ Interval ┃ +┡━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━┩ +│ google │ google.com │ icmp │ 60s │ +│ cloudflare │ 1.1.1.1:443 │ tcp │ 60s │ +│ github │ https://api.github.com │ http │ 60s │ +└────────────┴──────────────────────────┴────────┴──────────┘ + +# Run monitoring (Ctrl+C to stop) +$ uv run yaping.py run +[15:45:01] google: 28.3ms ✓ +[15:45:01] cloudflare: 12.1ms ✓ +[15:45:02] github: 145.2ms ✓ +... + +# View statistics +$ uv run yaping.py stats +┏━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┓ +┃ Target ┃ Avg ┃ Min ┃ Max ┃ StdDev ┃ Loss ┃ +┡━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━┩ +│ google │ 29.1ms │ 25.3ms │ 45.2ms │ 4.2ms │ 0.0% │ +│ cloudflare │ 11.8ms │ 10.1ms │ 15.3ms │ 1.1ms │ 0.0% │ +│ github │ 142.3ms │ 135.1ms │ 189.2ms │ 12.4ms │ 1.2% │ +└────────────┴─────────┴─────────┴─────────┴─────────┴────────┘ + +# Display graph +$ uv run yaping.py graph google --period 1h +``` + +## Testing Strategy + +Tests will be in a separate file `test_yaping.py` using pytest with inline PEP 723 dependencies. + +### Test File Structure + +```python +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "pytest>=8.0", +# "pytest-asyncio>=0.23", +# "click>=8.1", +# "httpx>=0.27", +# "plotext>=5.2", +# "rich>=13.7", +# ] +# /// +``` + +Run tests with: `uv run test_yaping.py` or `uv run pytest test_yaping.py -v` + +### Test Categories + +#### 1. Unit Tests - Probe Methods + +```python +class TestProbes: + def test_icmp_probe_success(self): + """Test ICMP probe against localhost""" + result = icmp_probe("127.0.0.1") + assert result is not None + assert result > 0 + + def test_icmp_probe_failure(self): + """Test ICMP probe against invalid host""" + result = icmp_probe("invalid.host.that.does.not.exist.local") + assert result is None + + def test_tcp_probe_success(self): + """Test TCP probe against known open port""" + result = tcp_probe("1.1.1.1", 53) # Cloudflare DNS + assert result is not None + assert result > 0 + + def test_tcp_probe_failure(self): + """Test TCP probe against closed port""" + result = tcp_probe("127.0.0.1", 59999) # Unlikely to be open + assert result is None + + def test_http_probe_success(self): + """Test HTTP probe against known endpoint""" + result = http_probe("https://httpbin.org/get") + assert result is not None + assert result > 0 + + def test_http_probe_failure(self): + """Test HTTP probe against invalid URL""" + result = http_probe("https://invalid.domain.local") + assert result is None +``` + +#### 2. Unit Tests - Database Layer + +```python +class TestDatabase: + @pytest.fixture + def temp_db(self, tmp_path): + """Create a temporary database for testing""" + db_path = tmp_path / "test.db" + db = Database(str(db_path)) + db.init() + yield db + db.close() + + def test_add_target(self, temp_db): + """Test adding a target""" + target_id = temp_db.add_target("test", "example.com", "icmp") + assert target_id is not None + assert target_id > 0 + + def test_add_duplicate_target(self, temp_db): + """Test adding duplicate target raises error""" + temp_db.add_target("test", "example.com", "icmp") + with pytest.raises(Exception): + temp_db.add_target("test", "other.com", "tcp") + + def test_get_targets(self, temp_db): + """Test retrieving targets""" + temp_db.add_target("test1", "example1.com", "icmp") + temp_db.add_target("test2", "example2.com", "tcp", port=80) + targets = temp_db.get_targets() + assert len(targets) == 2 + + def test_record_measurement(self, temp_db): + """Test recording a measurement""" + target_id = temp_db.add_target("test", "example.com", "icmp") + temp_db.record_measurement(target_id, 25.5, success=True) + stats = temp_db.get_stats(target_id) + assert stats["count"] == 1 + assert stats["avg"] == 25.5 + + def test_record_failed_measurement(self, temp_db): + """Test recording a failed measurement""" + target_id = temp_db.add_target("test", "example.com", "icmp") + temp_db.record_measurement(target_id, None, success=False, error="Timeout") + stats = temp_db.get_stats(target_id) + assert stats["loss_percent"] == 100.0 + + def test_get_measurements_time_range(self, temp_db): + """Test retrieving measurements with time filter""" + target_id = temp_db.add_target("test", "example.com", "icmp") + temp_db.record_measurement(target_id, 25.5, success=True) + measurements = temp_db.get_measurements(target_id, period="1h") + assert len(measurements) == 1 +``` + +#### 3. Integration Tests - CLI Commands + +```python +from click.testing import CliRunner + +class TestCLI: + @pytest.fixture + def runner(self): + return CliRunner() + + @pytest.fixture + def isolated_env(self, runner, tmp_path): + """Run CLI in isolated filesystem with temp database""" + with runner.isolated_filesystem(temp_dir=tmp_path): + yield runner + + def test_add_command(self, isolated_env): + """Test add command""" + result = isolated_env.invoke(cli, ["add", "test", "--host", "example.com"]) + assert result.exit_code == 0 + assert "Added target" in result.output + + def test_list_command(self, isolated_env): + """Test list command with no targets""" + result = isolated_env.invoke(cli, ["list"]) + assert result.exit_code == 0 + + def test_list_command_with_targets(self, isolated_env): + """Test list command with targets""" + isolated_env.invoke(cli, ["add", "test", "--host", "example.com"]) + result = isolated_env.invoke(cli, ["list"]) + assert result.exit_code == 0 + assert "test" in result.output + + def test_remove_command(self, isolated_env): + """Test remove command""" + isolated_env.invoke(cli, ["add", "test", "--host", "example.com"]) + result = isolated_env.invoke(cli, ["remove", "test"]) + assert result.exit_code == 0 + assert "Removed" in result.output + + def test_stats_no_data(self, isolated_env): + """Test stats command with no measurements""" + isolated_env.invoke(cli, ["add", "test", "--host", "example.com"]) + result = isolated_env.invoke(cli, ["stats"]) + assert result.exit_code == 0 + + def test_probe_command(self, isolated_env): + """Test single probe command""" + isolated_env.invoke(cli, ["add", "localhost", "--host", "127.0.0.1"]) + result = isolated_env.invoke(cli, ["probe"]) + assert result.exit_code == 0 +``` + +#### 4. Unit Tests - Statistics Calculation + +```python +class TestStatistics: + def test_calculate_stats_empty(self): + """Test stats with no data""" + stats = calculate_stats([]) + assert stats["count"] == 0 + assert stats["avg"] is None + + def test_calculate_stats_single(self): + """Test stats with single measurement""" + stats = calculate_stats([25.0]) + assert stats["count"] == 1 + assert stats["avg"] == 25.0 + assert stats["min"] == 25.0 + assert stats["max"] == 25.0 + + def test_calculate_stats_multiple(self): + """Test stats with multiple measurements""" + stats = calculate_stats([10.0, 20.0, 30.0]) + assert stats["count"] == 3 + assert stats["avg"] == 20.0 + assert stats["min"] == 10.0 + assert stats["max"] == 30.0 + + def test_calculate_loss_percent(self): + """Test packet loss calculation""" + # 2 successes, 1 failure + loss = calculate_loss_percent(successes=2, total=3) + assert abs(loss - 33.33) < 0.1 +``` + +#### 5. Unit Tests - Configuration Parsing + +```python +class TestConfig: + def test_parse_config(self, tmp_path): + """Test parsing TOML config""" + config_content = ''' +[defaults] +interval = 30 + +[[targets]] +name = "test" +host = "example.com" +method = "icmp" +''' + config_path = tmp_path / "yaping.toml" + config_path.write_text(config_content) + + config = parse_config(str(config_path)) + assert config["defaults"]["interval"] == 30 + assert len(config["targets"]) == 1 + + def test_parse_config_missing(self): + """Test handling missing config file""" + config = parse_config("/nonexistent/path/config.toml") + assert config is None +``` + +### Test Coverage Goals + +| Module | Target Coverage | +|--------|----------------| +| Probe methods | 90% | +| Database layer | 95% | +| CLI commands | 85% | +| Statistics | 100% | +| Config parsing | 90% | + +### Running Tests + +```bash +# Run all tests +uv run pytest test_yaping.py -v + +# Run with coverage +uv run pytest test_yaping.py --cov=yaping --cov-report=term-missing + +# Run specific test category +uv run pytest test_yaping.py -v -k "TestProbes" + +# Run only fast tests (skip network) +uv run pytest test_yaping.py -v -m "not network" +``` + +### Test Markers + +```python +# Mark slow/network tests +@pytest.mark.network +def test_real_ping(): + """Tests that require network access""" + pass + +@pytest.mark.slow +def test_long_running(): + """Tests that take a while""" + pass +``` + +## Notes + +- The tool is designed as a single Python file for easy distribution +- All dependencies are specified inline using PEP 723 +- Run with `uv run yaping.py` - uv handles dependency installation automatically +- Database defaults to `~/.local/share/yaping/yaping.db` but can be overridden +- Graceful handling of Ctrl+C during monitoring diff --git a/test_yaping.py b/test_yaping.py new file mode 100644 index 0000000..7718ecb --- /dev/null +++ b/test_yaping.py @@ -0,0 +1,625 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "pytest>=8.0", +# "click>=8.1", +# "httpx>=0.27", +# "plotext>=5.2", +# "rich>=13.7", +# ] +# /// +""" +Tests for yaping - Yet Another PING + +Run with: uv run pytest test_yaping.py -v +""" + +from __future__ import annotations + +import sqlite3 +import tempfile +from datetime import datetime, timedelta +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from click.testing import CliRunner + +# Import from yaping module +from yaping import ( + Database, + Measurement, + Stats, + Target, + calculate_stats, + cli, + http_probe, + icmp_probe, + parse_period, + probe_target, + tcp_probe, +) + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def temp_db(): + """Create a temporary database for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + db = Database(db_path) + db.init() + yield db + db.close() + + +@pytest.fixture +def runner(): + """Create a CLI runner.""" + return CliRunner() + + +@pytest.fixture +def isolated_runner(runner): + """Create an isolated CLI runner with temp database.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + + def invoke_with_db(*args, **kwargs): + return runner.invoke(cli, ["--db", str(db_path)] + list(args[0]), **kwargs) + + yield invoke_with_db + + +# ============================================================================= +# Unit Tests - Database Layer +# ============================================================================= + + +class TestDatabase: + """Tests for the Database class.""" + + def test_init_creates_tables(self, temp_db): + """Test that init creates required tables.""" + # Check tables exist + tables = temp_db.conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall() + table_names = [t["name"] for t in tables] + + assert "targets" in table_names + assert "measurements" in table_names + + def test_add_target(self, temp_db): + """Test adding a target.""" + target_id = temp_db.add_target("test", "example.com", "icmp") + + assert target_id is not None + assert target_id > 0 + + def test_add_target_with_port(self, temp_db): + """Test adding a TCP target with port.""" + target_id = temp_db.add_target("test", "example.com", "tcp", port=443) + + target = temp_db.get_target("test") + assert target is not None + assert target.port == 443 + assert target.probe_type == "tcp" + + def test_add_duplicate_target_raises(self, temp_db): + """Test adding duplicate target raises IntegrityError.""" + temp_db.add_target("test", "example.com", "icmp") + + with pytest.raises(sqlite3.IntegrityError): + temp_db.add_target("test", "other.com", "tcp") + + def test_get_target(self, temp_db): + """Test retrieving a target by name.""" + temp_db.add_target("mytest", "example.com", "icmp") + + target = temp_db.get_target("mytest") + + assert target is not None + assert target.name == "mytest" + assert target.host == "example.com" + assert target.probe_type == "icmp" + assert target.enabled is True + + def test_get_target_not_found(self, temp_db): + """Test retrieving non-existent target returns None.""" + target = temp_db.get_target("nonexistent") + assert target is None + + def test_get_targets(self, temp_db): + """Test retrieving all targets.""" + temp_db.add_target("test1", "example1.com", "icmp") + temp_db.add_target("test2", "example2.com", "tcp", port=80) + temp_db.add_target("test3", "example3.com", "http") + + targets = temp_db.get_targets() + + assert len(targets) == 3 + + def test_get_targets_enabled_only(self, temp_db): + """Test retrieving only enabled targets.""" + temp_db.add_target("test1", "example1.com", "icmp") + temp_db.add_target("test2", "example2.com", "icmp") + temp_db.set_target_enabled("test2", False) + + targets = temp_db.get_targets(enabled_only=True) + + assert len(targets) == 1 + assert targets[0].name == "test1" + + def test_remove_target(self, temp_db): + """Test removing a target.""" + temp_db.add_target("test", "example.com", "icmp") + + result = temp_db.remove_target("test") + + assert result is True + assert temp_db.get_target("test") is None + + def test_remove_target_not_found(self, temp_db): + """Test removing non-existent target returns False.""" + result = temp_db.remove_target("nonexistent") + assert result is False + + def test_set_target_enabled(self, temp_db): + """Test enabling/disabling a target.""" + temp_db.add_target("test", "example.com", "icmp") + + temp_db.set_target_enabled("test", False) + target = temp_db.get_target("test") + assert target.enabled is False + + temp_db.set_target_enabled("test", True) + target = temp_db.get_target("test") + assert target.enabled is True + + def test_record_measurement_success(self, temp_db): + """Test recording a successful measurement.""" + target_id = temp_db.add_target("test", "example.com", "icmp") + + temp_db.record_measurement(target_id, 25.5, success=True) + + measurements = temp_db.get_measurements(target_id) + assert len(measurements) == 1 + assert measurements[0].latency_ms == 25.5 + assert measurements[0].success is True + assert measurements[0].error_message is None + + def test_record_measurement_failure(self, temp_db): + """Test recording a failed measurement.""" + target_id = temp_db.add_target("test", "example.com", "icmp") + + temp_db.record_measurement(target_id, None, success=False, error="Timeout") + + measurements = temp_db.get_measurements(target_id) + assert len(measurements) == 1 + assert measurements[0].latency_ms is None + assert measurements[0].success is False + assert measurements[0].error_message == "Timeout" + + def test_get_measurements_limit(self, temp_db): + """Test limiting measurements returned.""" + target_id = temp_db.add_target("test", "example.com", "icmp") + + for i in range(10): + temp_db.record_measurement(target_id, float(i), success=True) + + measurements = temp_db.get_measurements(target_id, limit=5) + assert len(measurements) == 5 + + def test_get_stats(self, temp_db): + """Test calculating statistics.""" + target_id = temp_db.add_target("test", "example.com", "icmp") + + temp_db.record_measurement(target_id, 10.0, success=True) + temp_db.record_measurement(target_id, 20.0, success=True) + temp_db.record_measurement(target_id, 30.0, success=True) + + stats = temp_db.get_stats(target_id) + + assert stats.count == 3 + assert stats.avg == 20.0 + assert stats.min == 10.0 + assert stats.max == 30.0 + assert stats.loss_percent == 0.0 + + +# ============================================================================= +# Unit Tests - Statistics +# ============================================================================= + + +class TestStatistics: + """Tests for statistics calculation.""" + + def test_calculate_stats_empty(self): + """Test stats with empty list.""" + stats = calculate_stats([]) + + assert stats.count == 0 + assert stats.avg is None + assert stats.min is None + assert stats.max is None + assert stats.loss_percent == 0.0 + + def test_calculate_stats_single(self): + """Test stats with single measurement.""" + measurements = [ + Measurement( + id=1, + target_id=1, + timestamp=datetime.now(), + latency_ms=25.0, + success=True, + error_message=None, + ) + ] + + stats = calculate_stats(measurements) + + assert stats.count == 1 + assert stats.avg == 25.0 + assert stats.min == 25.0 + assert stats.max == 25.0 + assert stats.stddev == 0.0 + assert stats.loss_percent == 0.0 + + def test_calculate_stats_multiple(self): + """Test stats with multiple measurements.""" + measurements = [ + Measurement(1, 1, datetime.now(), 10.0, True, None), + Measurement(2, 1, datetime.now(), 20.0, True, None), + Measurement(3, 1, datetime.now(), 30.0, True, None), + ] + + stats = calculate_stats(measurements) + + assert stats.count == 3 + assert stats.avg == 20.0 + assert stats.min == 10.0 + assert stats.max == 30.0 + + def test_calculate_stats_with_failures(self): + """Test stats with some failed measurements.""" + measurements = [ + Measurement(1, 1, datetime.now(), 10.0, True, None), + Measurement(2, 1, datetime.now(), None, False, "Timeout"), + Measurement(3, 1, datetime.now(), 30.0, True, None), + ] + + stats = calculate_stats(measurements) + + assert stats.count == 3 + assert stats.avg == 20.0 # Average of 10 and 30 + assert abs(stats.loss_percent - 33.33) < 0.1 + + def test_calculate_stats_all_failures(self): + """Test stats when all measurements failed.""" + measurements = [ + Measurement(1, 1, datetime.now(), None, False, "Timeout"), + Measurement(2, 1, datetime.now(), None, False, "Timeout"), + ] + + stats = calculate_stats(measurements) + + assert stats.count == 2 + assert stats.avg is None + assert stats.loss_percent == 100.0 + + +class TestParsePeriod: + """Tests for period parsing.""" + + def test_parse_seconds(self): + """Test parsing seconds.""" + delta = parse_period("30s") + assert delta == timedelta(seconds=30) + + def test_parse_minutes(self): + """Test parsing minutes.""" + delta = parse_period("5m") + assert delta == timedelta(minutes=5) + + def test_parse_hours(self): + """Test parsing hours.""" + delta = parse_period("24h") + assert delta == timedelta(hours=24) + + def test_parse_days(self): + """Test parsing days.""" + delta = parse_period("7d") + assert delta == timedelta(days=7) + + def test_parse_invalid(self): + """Test invalid period returns None.""" + assert parse_period("invalid") is None + assert parse_period("") is None + assert parse_period("10x") is None + + +# ============================================================================= +# Unit Tests - Probe Methods +# ============================================================================= + + +class TestProbes: + """Tests for probe methods.""" + + def test_icmp_probe_localhost(self): + """Test ICMP probe against localhost.""" + latency, error = icmp_probe("127.0.0.1", timeout=5.0) + + # Localhost should always respond + assert latency is not None + assert latency > 0 + assert error is None + + def test_icmp_probe_invalid_host(self): + """Test ICMP probe against invalid host.""" + latency, error = icmp_probe("invalid.host.that.does.not.exist.local", timeout=2.0) + + assert latency is None + assert error is not None + + @pytest.mark.network + def test_tcp_probe_dns(self): + """Test TCP probe against known service (Cloudflare DNS).""" + latency, error = tcp_probe("1.1.1.1", 53, timeout=5.0) + + assert latency is not None + assert latency > 0 + assert error is None + + def test_tcp_probe_refused(self): + """Test TCP probe against closed port.""" + # Use a high port that's unlikely to be open + latency, error = tcp_probe("127.0.0.1", 59999, timeout=2.0) + + assert latency is None + assert error is not None + + def test_tcp_probe_invalid_host(self): + """Test TCP probe against invalid host.""" + latency, error = tcp_probe("invalid.host.local", 80, timeout=2.0) + + assert latency is None + assert error is not None + + @pytest.mark.network + def test_http_probe_success(self): + """Test HTTP probe against known endpoint.""" + # Try multiple endpoints in case one is down + endpoints = [ + "https://www.google.com", + "https://cloudflare.com", + "https://httpbin.org/get", + ] + + for url in endpoints: + latency, error = http_probe(url, timeout=10.0) + if latency is not None: + assert latency > 0 + assert error is None + return + + # If all endpoints fail, skip the test (network might be unavailable) + pytest.skip("All HTTP endpoints unreachable - network may be unavailable") + + def test_http_probe_invalid_url(self): + """Test HTTP probe against invalid URL.""" + latency, error = http_probe("https://invalid.domain.that.does.not.exist.local", timeout=2.0) + + assert latency is None + assert error is not None + + def test_probe_target_icmp(self, temp_db): + """Test probe_target with ICMP method.""" + target_id = temp_db.add_target("test", "127.0.0.1", "icmp") + target = temp_db.get_target("test") + + latency, error = probe_target(target, timeout=5.0) + + assert latency is not None or error is not None # Either should be set + + def test_probe_target_tcp_missing_port(self, temp_db): + """Test probe_target with TCP method but missing port.""" + target_id = temp_db.add_target("test", "example.com", "tcp") + target = temp_db.get_target("test") + + latency, error = probe_target(target, timeout=2.0) + + assert latency is None + assert "port" in error.lower() + + def test_probe_target_unknown_method(self, temp_db): + """Test probe_target with unknown method.""" + # Manually insert a target with invalid probe type + temp_db.conn.execute( + "INSERT INTO targets (name, host, probe_type) VALUES (?, ?, ?)", + ("test", "example.com", "invalid"), + ) + temp_db.conn.commit() + target = temp_db.get_target("test") + + latency, error = probe_target(target, timeout=2.0) + + assert latency is None + assert "unknown" in error.lower() + + +# ============================================================================= +# Integration Tests - CLI Commands +# ============================================================================= + + +class TestCLI: + """Integration tests for CLI commands.""" + + def test_add_command(self, isolated_runner): + """Test add command.""" + result = isolated_runner(["add", "test", "--host", "example.com"]) + + assert result.exit_code == 0 + assert "Added target" in result.output + assert "test" in result.output + + def test_add_tcp_requires_port(self, isolated_runner): + """Test add TCP target requires port.""" + result = isolated_runner(["add", "test", "--host", "example.com", "--method", "tcp"]) + + assert result.exit_code != 0 + assert "port" in result.output.lower() + + def test_add_tcp_with_port(self, isolated_runner): + """Test add TCP target with port.""" + result = isolated_runner(["add", "test", "--host", "example.com", "--method", "tcp", "--port", "443"]) + + assert result.exit_code == 0 + assert "Added target" in result.output + + def test_add_duplicate(self, isolated_runner): + """Test adding duplicate target fails.""" + isolated_runner(["add", "test", "--host", "example.com"]) + result = isolated_runner(["add", "test", "--host", "other.com"]) + + assert result.exit_code != 0 + assert "already exists" in result.output + + def test_list_empty(self, isolated_runner): + """Test list command with no targets.""" + result = isolated_runner(["list"]) + + assert result.exit_code == 0 + assert "No targets" in result.output + + def test_list_with_targets(self, isolated_runner): + """Test list command with targets.""" + isolated_runner(["add", "test1", "--host", "example1.com"]) + isolated_runner(["add", "test2", "--host", "example2.com", "--method", "http"]) + + result = isolated_runner(["list"]) + + assert result.exit_code == 0 + assert "test1" in result.output + assert "test2" in result.output + assert "example1.com" in result.output + assert "example2.com" in result.output + + def test_remove_command(self, isolated_runner): + """Test remove command.""" + isolated_runner(["add", "test", "--host", "example.com"]) + result = isolated_runner(["remove", "test"]) + + assert result.exit_code == 0 + assert "Removed" in result.output + + def test_remove_not_found(self, isolated_runner): + """Test remove non-existent target.""" + result = isolated_runner(["remove", "nonexistent"]) + + assert result.exit_code != 0 + assert "not found" in result.output + + def test_enable_disable(self, isolated_runner): + """Test enable/disable commands.""" + isolated_runner(["add", "test", "--host", "example.com"]) + + result = isolated_runner(["disable", "test"]) + assert result.exit_code == 0 + assert "Disabled" in result.output + + result = isolated_runner(["enable", "test"]) + assert result.exit_code == 0 + assert "Enabled" in result.output + + def test_probe_command(self, isolated_runner): + """Test probe command.""" + isolated_runner(["add", "localhost", "--host", "127.0.0.1"]) + result = isolated_runner(["probe"]) + + assert result.exit_code == 0 + # Should show either success or failure with the target name + assert "localhost" in result.output + + def test_probe_no_targets(self, isolated_runner): + """Test probe command with no targets.""" + result = isolated_runner(["probe"]) + + assert result.exit_code == 0 + assert "No enabled targets" in result.output + + def test_stats_no_data(self, isolated_runner): + """Test stats command with no measurements.""" + isolated_runner(["add", "test", "--host", "example.com"]) + result = isolated_runner(["stats"]) + + assert result.exit_code == 0 + # Should show stats table even with no data + + def test_stats_specific_target(self, isolated_runner): + """Test stats for specific target.""" + isolated_runner(["add", "test", "--host", "127.0.0.1"]) + isolated_runner(["probe"]) # Generate some data + + result = isolated_runner(["stats", "test"]) + + assert result.exit_code == 0 + assert "test" in result.output + + def test_stats_target_not_found(self, isolated_runner): + """Test stats for non-existent target.""" + result = isolated_runner(["stats", "nonexistent"]) + + assert result.exit_code != 0 + assert "not found" in result.output + + def test_graph_no_data(self, isolated_runner): + """Test graph command with no data.""" + isolated_runner(["add", "test", "--host", "example.com"]) + result = isolated_runner(["graph", "test"]) + + assert result.exit_code == 0 + assert "No measurements" in result.output + + def test_graph_target_not_found(self, isolated_runner): + """Test graph for non-existent target.""" + result = isolated_runner(["graph", "nonexistent"]) + + assert result.exit_code != 0 + assert "not found" in result.output + + def test_history_command(self, isolated_runner): + """Test history command.""" + isolated_runner(["add", "localhost", "--host", "127.0.0.1"]) + isolated_runner(["probe"]) # Generate some data + + result = isolated_runner(["history", "localhost"]) + + assert result.exit_code == 0 + # Should show history table + + def test_history_no_data(self, isolated_runner): + """Test history command with no measurements.""" + isolated_runner(["add", "test", "--host", "example.com"]) + result = isolated_runner(["history", "test"]) + + assert result.exit_code == 0 + assert "No measurements" in result.output + + +# ============================================================================= +# Main +# ============================================================================= + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/yaping.py b/yaping.py new file mode 100644 index 0000000..90db91f --- /dev/null +++ b/yaping.py @@ -0,0 +1,979 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "click>=8.1", +# "httpx>=0.27", +# "plotext>=5.2", +# "rich>=13.7", +# ] +# /// +""" +yaping - Yet Another PING + +A SmokePing-like network latency monitoring tool with CLI graphs. +Supports ICMP (via subprocess), TCP, and HTTP probes. +Stores measurements in SQLite and displays terminal graphs. + +Usage: + uv run yaping.py add google --host google.com + uv run yaping.py add cloudflare --host 1.1.1.1 --method tcp --port 443 + uv run yaping.py run + uv run yaping.py graph +""" + +from __future__ import annotations + +import os +import platform +import re +import signal +import socket +import sqlite3 +import subprocess +import sys +import time +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from statistics import mean, stdev +from typing import TYPE_CHECKING + +import click +import httpx +import plotext as plt +from rich.console import Console +from rich.table import Table + +if TYPE_CHECKING: + from collections.abc import Generator + +# ============================================================================= +# Configuration +# ============================================================================= + +DEFAULT_DB_PATH = Path.home() / ".local" / "share" / "yaping" / "yaping.db" +DEFAULT_INTERVAL = 60 # seconds +DEFAULT_TIMEOUT = 5.0 # seconds + +console = Console() + + +# ============================================================================= +# Data Classes +# ============================================================================= + + +@dataclass +class Target: + """Represents a monitoring target.""" + + id: int + name: str + host: str + probe_type: str + port: int | None + interval: int + enabled: bool + created_at: datetime + + +@dataclass +class Measurement: + """Represents a single measurement result.""" + + id: int + target_id: int + timestamp: datetime + latency_ms: float | None + success: bool + error_message: str | None + + +@dataclass +class Stats: + """Statistics for a target.""" + + count: int + avg: float | None + min: float | None + max: float | None + stddev: float | None + loss_percent: float + + +# ============================================================================= +# Database Layer +# ============================================================================= + + +class Database: + """SQLite database for storing targets and measurements.""" + + def __init__(self, db_path: str | Path) -> None: + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._conn: sqlite3.Connection | None = None + + @property + def conn(self) -> sqlite3.Connection: + """Get database connection, creating if necessary.""" + if self._conn is None: + self._conn = sqlite3.connect(self.db_path) + self._conn.row_factory = sqlite3.Row + return self._conn + + def init(self) -> None: + """Initialize database schema.""" + self.conn.executescript(""" + CREATE TABLE IF NOT EXISTS targets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE NOT NULL, + host TEXT NOT NULL, + probe_type TEXT NOT NULL DEFAULT 'icmp', + port INTEGER, + interval INTEGER DEFAULT 60, + enabled INTEGER DEFAULT 1, + created_at TIMESTAMP DEFAULT (DATETIME('now', 'localtime')) + ); + + CREATE TABLE IF NOT EXISTS measurements ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + target_id INTEGER NOT NULL, + timestamp TIMESTAMP DEFAULT (DATETIME('now', 'localtime')), + latency_ms REAL, + success INTEGER NOT NULL, + error_message TEXT, + FOREIGN KEY (target_id) REFERENCES targets(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_measurements_target_time + ON measurements(target_id, timestamp); + """) + self.conn.commit() + + def close(self) -> None: + """Close database connection.""" + if self._conn: + self._conn.close() + self._conn = None + + def add_target( + self, + name: str, + host: str, + probe_type: str, + port: int | None = None, + interval: int = DEFAULT_INTERVAL, + ) -> int: + """Add a new target. Returns the target ID.""" + cursor = self.conn.execute( + """ + INSERT INTO targets (name, host, probe_type, port, interval) + VALUES (?, ?, ?, ?, ?) + """, + (name, host, probe_type, port, interval), + ) + self.conn.commit() + return cursor.lastrowid # type: ignore[return-value] + + def remove_target(self, name: str) -> bool: + """Remove a target by name. Returns True if target was found.""" + cursor = self.conn.execute("DELETE FROM targets WHERE name = ?", (name,)) + self.conn.commit() + return cursor.rowcount > 0 + + def get_target(self, name: str) -> Target | None: + """Get a target by name.""" + row = self.conn.execute( + "SELECT * FROM targets WHERE name = ?", (name,) + ).fetchone() + if row: + return self._row_to_target(row) + return None + + def get_targets(self, enabled_only: bool = False) -> list[Target]: + """Get all targets.""" + query = "SELECT * FROM targets" + if enabled_only: + query += " WHERE enabled = 1" + query += " ORDER BY name" + rows = self.conn.execute(query).fetchall() + return [self._row_to_target(row) for row in rows] + + def set_target_enabled(self, name: str, enabled: bool) -> bool: + """Enable or disable a target. Returns True if target was found.""" + cursor = self.conn.execute( + "UPDATE targets SET enabled = ? WHERE name = ?", (int(enabled), name) + ) + self.conn.commit() + return cursor.rowcount > 0 + + def record_measurement( + self, + target_id: int, + latency_ms: float | None, + success: bool, + error: str | None = None, + ) -> None: + """Record a measurement result.""" + self.conn.execute( + """ + INSERT INTO measurements (target_id, latency_ms, success, error_message) + VALUES (?, ?, ?, ?) + """, + (target_id, latency_ms, int(success), error), + ) + self.conn.commit() + + def get_measurements( + self, + target_id: int, + period: str | None = None, + limit: int | None = None, + ) -> list[Measurement]: + """Get measurements for a target, optionally filtered by time period.""" + query = "SELECT * FROM measurements WHERE target_id = ?" + params: list = [target_id] + + if period: + delta = parse_period(period) + if delta: + cutoff = datetime.now() - delta + query += " AND timestamp >= ?" + # SQLite CURRENT_TIMESTAMP uses space separator, not 'T' + params.append(cutoff.strftime("%Y-%m-%d %H:%M:%S")) + + query += " ORDER BY timestamp DESC" + + if limit: + query += " LIMIT ?" + params.append(limit) + + rows = self.conn.execute(query, params).fetchall() + return [self._row_to_measurement(row) for row in rows] + + def get_stats(self, target_id: int, period: str | None = None) -> Stats: + """Calculate statistics for a target.""" + measurements = self.get_measurements(target_id, period) + return calculate_stats(measurements) + + def _row_to_target(self, row: sqlite3.Row) -> Target: + """Convert a database row to a Target object.""" + created = row["created_at"] + if isinstance(created, str): + created = datetime.fromisoformat(created) + return Target( + id=row["id"], + name=row["name"], + host=row["host"], + probe_type=row["probe_type"], + port=row["port"], + interval=row["interval"], + enabled=bool(row["enabled"]), + created_at=created, + ) + + def _row_to_measurement(self, row: sqlite3.Row) -> Measurement: + """Convert a database row to a Measurement object.""" + ts = row["timestamp"] + if isinstance(ts, str): + ts = datetime.fromisoformat(ts) + return Measurement( + id=row["id"], + target_id=row["target_id"], + timestamp=ts, + latency_ms=row["latency_ms"], + success=bool(row["success"]), + error_message=row["error_message"], + ) + + +# ============================================================================= +# Probe Methods +# ============================================================================= + + +def icmp_probe(host: str, timeout: float = DEFAULT_TIMEOUT) -> tuple[float | None, str | None]: + """ + Perform ICMP ping using system ping command. + Returns (latency_ms, error_message). + """ + system = platform.system().lower() + + if system == "windows": + cmd = ["ping", "-n", "1", "-w", str(int(timeout * 1000)), host] + pattern = r"time[=<](\d+(?:\.\d+)?)\s*ms" + elif system == "darwin": # macOS + cmd = ["ping", "-c", "1", "-W", str(int(timeout * 1000)), host] + pattern = r"time=(\d+(?:\.\d+)?)\s*ms" + else: # Linux and others + cmd = ["ping", "-c", "1", "-W", str(int(timeout)), host] + pattern = r"time=(\d+(?:\.\d+)?)\s*ms" + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + 2, + ) + + if result.returncode == 0: + match = re.search(pattern, result.stdout, re.IGNORECASE) + if match: + return float(match.group(1)), None + return None, "Could not parse ping output" + return None, f"Ping failed: {result.stderr.strip() or 'No response'}" + + except subprocess.TimeoutExpired: + return None, "Ping timed out" + except FileNotFoundError: + return None, "ping command not found" + except Exception as e: + return None, str(e) + + +def tcp_probe( + host: str, port: int, timeout: float = DEFAULT_TIMEOUT +) -> tuple[float | None, str | None]: + """ + Perform TCP connect probe. + Returns (latency_ms, error_message). + """ + start = time.perf_counter() + try: + sock = socket.create_connection((host, port), timeout=timeout) + sock.close() + latency = (time.perf_counter() - start) * 1000 + return latency, None + except socket.timeout: + return None, "Connection timed out" + except ConnectionRefusedError: + return None, "Connection refused" + except socket.gaierror as e: + return None, f"DNS resolution failed: {e}" + except OSError as e: + return None, str(e) + + +def http_probe( + url: str, timeout: float = DEFAULT_TIMEOUT +) -> tuple[float | None, str | None]: + """ + Perform HTTP/HTTPS probe. + Returns (latency_ms, error_message). + """ + # Ensure URL has scheme + if not url.startswith(("http://", "https://")): + url = f"https://{url}" + + start = time.perf_counter() + try: + with httpx.Client(timeout=timeout, follow_redirects=True) as client: + response = client.get(url) + response.raise_for_status() + latency = (time.perf_counter() - start) * 1000 + return latency, None + except httpx.TimeoutException: + return None, "Request timed out" + except httpx.HTTPStatusError as e: + return None, f"HTTP {e.response.status_code}" + except httpx.RequestError as e: + return None, str(e) + + +def probe_target(target: Target, timeout: float = DEFAULT_TIMEOUT) -> tuple[float | None, str | None]: + """Probe a target using its configured method.""" + if target.probe_type == "icmp": + return icmp_probe(target.host, timeout) + elif target.probe_type == "tcp": + if target.port is None: + return None, "TCP probe requires a port" + return tcp_probe(target.host, target.port, timeout) + elif target.probe_type == "http": + return http_probe(target.host, timeout) + else: + return None, f"Unknown probe type: {target.probe_type}" + + +# ============================================================================= +# Statistics +# ============================================================================= + + +def parse_period(period: str) -> timedelta | None: + """Parse a period string like '1h', '24h', '7d' into a timedelta.""" + match = re.match(r"^(\d+)([smhd])$", period.lower()) + if not match: + return None + + value = int(match.group(1)) + unit = match.group(2) + + if unit == "s": + return timedelta(seconds=value) + elif unit == "m": + return timedelta(minutes=value) + elif unit == "h": + return timedelta(hours=value) + elif unit == "d": + return timedelta(days=value) + return None + + +def calculate_stats(measurements: list[Measurement]) -> Stats: + """Calculate statistics from a list of measurements.""" + if not measurements: + return Stats( + count=0, + avg=None, + min=None, + max=None, + stddev=None, + loss_percent=0.0, + ) + + total = len(measurements) + successful = [m for m in measurements if m.success and m.latency_ms is not None] + latencies = [m.latency_ms for m in successful if m.latency_ms is not None] + + loss_percent = ((total - len(successful)) / total) * 100 if total > 0 else 0.0 + + if not latencies: + return Stats( + count=total, + avg=None, + min=None, + max=None, + stddev=None, + loss_percent=loss_percent, + ) + + return Stats( + count=total, + avg=mean(latencies), + min=min(latencies), + max=max(latencies), + stddev=stdev(latencies) if len(latencies) > 1 else 0.0, + loss_percent=loss_percent, + ) + + +# ============================================================================= +# Graphing +# ============================================================================= + + +def draw_latency_graph( + measurements: list[Measurement], + title: str = "Latency", + width: int | None = None, + height: int | None = None, +) -> None: + """Draw a terminal graph of latency over time.""" + # Filter successful measurements with latency + valid = [(m.timestamp, m.latency_ms) for m in measurements if m.success and m.latency_ms is not None] + + if not valid: + console.print("[yellow]No data to graph[/yellow]") + return + + # Sort by timestamp (oldest first for proper graph) + valid.sort(key=lambda x: x[0]) + + timestamps = [m[0] for m in valid] + latencies = [m[1] for m in valid] + + # Convert timestamps to relative minutes from start + start_time = timestamps[0] + x_values = [(t - start_time).total_seconds() / 60 for t in timestamps] + + plt.clear_figure() + + if width: + plt.plot_size(width, height or 15) + + plt.plot(x_values, latencies, marker="braille") + plt.title(title) + plt.xlabel("Minutes ago") + plt.ylabel("Latency (ms)") + + # Add statistics in the plot + stats = calculate_stats(measurements) + if stats.avg is not None: + plt.hline(stats.avg, "blue") + + plt.show() + + +def draw_multi_target_graph( + db: Database, + targets: list[Target], + period: str = "1h", + width: int | None = None, + height: int | None = None, +) -> None: + """Draw a comparison graph for multiple targets.""" + plt.clear_figure() + + if width: + plt.plot_size(width, height or 20) + + colors = ["red", "green", "blue", "yellow", "cyan", "magenta"] + labels = [] + + for i, target in enumerate(targets): + measurements = db.get_measurements(target.id, period) + valid = [(m.timestamp, m.latency_ms) for m in measurements if m.success and m.latency_ms is not None] + + if not valid: + continue + + valid.sort(key=lambda x: x[0]) + timestamps = [m[0] for m in valid] + latencies = [m[1] for m in valid] + + # Convert to relative minutes + if timestamps: + now = datetime.now() + x_values = [(now - t).total_seconds() / 60 for t in timestamps] + color = colors[i % len(colors)] + plt.plot(x_values, latencies, marker="braille", color=color, label=target.name) + stats = calculate_stats(measurements) + labels.append(f"{target.name} (avg: {stats.avg:.1f}ms)" if stats.avg else target.name) + + if labels: + plt.title(f"Latency Comparison (last {period})") + plt.xlabel("Minutes ago") + plt.ylabel("Latency (ms)") + plt.show() + else: + console.print("[yellow]No data to graph[/yellow]") + + +# ============================================================================= +# CLI Commands +# ============================================================================= + + +@contextmanager +def get_db(db_path: str | None = None) -> Generator[Database, None, None]: + """Context manager for database access.""" + path = Path(db_path) if db_path else DEFAULT_DB_PATH + db = Database(path) + db.init() + try: + yield db + finally: + db.close() + + +@click.group() +@click.option( + "--db", + "db_path", + envvar="YAPING_DB", + help="Path to SQLite database file", +) +@click.pass_context +def cli(ctx: click.Context, db_path: str | None) -> None: + """yaping - Yet Another PING - Network latency monitoring with CLI graphs.""" + ctx.ensure_object(dict) + ctx.obj["db_path"] = db_path + + +@cli.command() +@click.argument("name") +@click.option("--host", "-h", required=True, help="Target host or URL") +@click.option( + "--method", + "-m", + type=click.Choice(["icmp", "tcp", "http"]), + default="icmp", + help="Probe method", +) +@click.option("--port", "-p", type=int, help="Port for TCP probe") +@click.option("--interval", "-i", type=int, default=DEFAULT_INTERVAL, help="Probe interval in seconds") +@click.pass_context +def add(ctx: click.Context, name: str, host: str, method: str, port: int | None, interval: int) -> None: + """Add a new target to monitor.""" + if method == "tcp" and port is None: + raise click.BadParameter("--port is required for TCP probe method") + + with get_db(ctx.obj["db_path"]) as db: + try: + db.add_target(name, host, method, port, interval) + port_str = f":{port}" if port else "" + console.print(f"[green]✓[/green] Added target '[bold]{name}[/bold]' ({method} → {host}{port_str})") + except sqlite3.IntegrityError: + console.print(f"[red]✗[/red] Target '[bold]{name}[/bold]' already exists") + sys.exit(1) + + +@cli.command() +@click.argument("name") +@click.pass_context +def remove(ctx: click.Context, name: str) -> None: + """Remove a target.""" + with get_db(ctx.obj["db_path"]) as db: + if db.remove_target(name): + console.print(f"[green]✓[/green] Removed target '[bold]{name}[/bold]'") + else: + console.print(f"[red]✗[/red] Target '[bold]{name}[/bold]' not found") + sys.exit(1) + + +@cli.command("list") +@click.pass_context +def list_targets(ctx: click.Context) -> None: + """List all targets.""" + with get_db(ctx.obj["db_path"]) as db: + targets = db.get_targets() + + if not targets: + console.print("[dim]No targets configured. Use 'add' to add one.[/dim]") + return + + table = Table(title="Monitoring Targets") + table.add_column("Name", style="cyan") + table.add_column("Host") + table.add_column("Method") + table.add_column("Interval") + table.add_column("Status") + + for target in targets: + host = target.host + if target.port: + host += f":{target.port}" + status = "[green]enabled[/green]" if target.enabled else "[dim]disabled[/dim]" + table.add_row( + target.name, + host, + target.probe_type, + f"{target.interval}s", + status, + ) + + console.print(table) + + +@cli.command() +@click.argument("name") +@click.pass_context +def enable(ctx: click.Context, name: str) -> None: + """Enable a target.""" + with get_db(ctx.obj["db_path"]) as db: + if db.set_target_enabled(name, True): + console.print(f"[green]✓[/green] Enabled target '[bold]{name}[/bold]'") + else: + console.print(f"[red]✗[/red] Target '[bold]{name}[/bold]' not found") + sys.exit(1) + + +@cli.command() +@click.argument("name") +@click.pass_context +def disable(ctx: click.Context, name: str) -> None: + """Disable a target.""" + with get_db(ctx.obj["db_path"]) as db: + if db.set_target_enabled(name, False): + console.print(f"[green]✓[/green] Disabled target '[bold]{name}[/bold]'") + else: + console.print(f"[red]✗[/red] Target '[bold]{name}[/bold]' not found") + sys.exit(1) + + +@cli.command() +@click.option("--timeout", "-t", type=float, default=DEFAULT_TIMEOUT, help="Probe timeout in seconds") +@click.pass_context +def probe(ctx: click.Context, timeout: float) -> None: + """Run a single probe for all enabled targets.""" + with get_db(ctx.obj["db_path"]) as db: + targets = db.get_targets(enabled_only=True) + + if not targets: + console.print("[dim]No enabled targets. Use 'add' to add one.[/dim]") + return + + for target in targets: + latency, error = probe_target(target, timeout) + success = latency is not None + + db.record_measurement(target.id, latency, success, error) + + timestamp = datetime.now().strftime("%H:%M:%S") + if success: + console.print(f"[dim]{timestamp}[/dim] [cyan]{target.name}[/cyan]: [green]{latency:.1f}ms[/green] ✓") + else: + console.print(f"[dim]{timestamp}[/dim] [cyan]{target.name}[/cyan]: [red]{error}[/red] ✗") + + +@cli.command() +@click.option("--interval", "-i", type=int, help="Override probe interval (seconds)") +@click.option("--timeout", "-t", type=float, default=DEFAULT_TIMEOUT, help="Probe timeout in seconds") +@click.pass_context +def run(ctx: click.Context, interval: int | None, timeout: float) -> None: + """Run continuous monitoring (Ctrl+C to stop).""" + running = True + + def handle_signal(signum: int, frame: object) -> None: + nonlocal running + running = False + console.print("\n[yellow]Stopping...[/yellow]") + + signal.signal(signal.SIGINT, handle_signal) + signal.signal(signal.SIGTERM, handle_signal) + + with get_db(ctx.obj["db_path"]) as db: + targets = db.get_targets(enabled_only=True) + + if not targets: + console.print("[dim]No enabled targets. Use 'add' to add one.[/dim]") + return + + console.print(f"[bold]Starting monitoring of {len(targets)} target(s)...[/bold]") + console.print("[dim]Press Ctrl+C to stop[/dim]\n") + + # Track next probe time for each target + next_probe = {t.id: time.time() for t in targets} + + while running: + now = time.time() + + for target in targets: + target_interval = interval if interval else target.interval + + if now >= next_probe[target.id]: + latency, error = probe_target(target, timeout) + success = latency is not None + + db.record_measurement(target.id, latency, success, error) + + timestamp = datetime.now().strftime("%H:%M:%S") + if success: + console.print(f"[dim]{timestamp}[/dim] [cyan]{target.name}[/cyan]: [green]{latency:.1f}ms[/green] ✓") + else: + console.print(f"[dim]{timestamp}[/dim] [cyan]{target.name}[/cyan]: [red]{error}[/red] ✗") + + next_probe[target.id] = now + target_interval + + # Sleep briefly to avoid busy-waiting + time.sleep(0.1) + + +@cli.command() +@click.argument("name", required=False) +@click.option("--period", "-p", default="1h", help="Time period (e.g., 1h, 24h, 7d)") +@click.pass_context +def stats(ctx: click.Context, name: str | None, period: str) -> None: + """Show statistics for targets.""" + with get_db(ctx.obj["db_path"]) as db: + if name: + target = db.get_target(name) + if not target: + console.print(f"[red]✗[/red] Target '[bold]{name}[/bold]' not found") + sys.exit(1) + targets = [target] + else: + targets = db.get_targets() + + if not targets: + console.print("[dim]No targets configured.[/dim]") + return + + table = Table(title=f"Statistics (last {period})") + table.add_column("Target", style="cyan") + table.add_column("Avg", justify="right") + table.add_column("Min", justify="right") + table.add_column("Max", justify="right") + table.add_column("StdDev", justify="right") + table.add_column("Loss", justify="right") + table.add_column("Samples", justify="right") + + for target in targets: + target_stats = db.get_stats(target.id, period) + + def fmt_ms(val: float | None) -> str: + return f"{val:.1f}ms" if val is not None else "-" + + loss_style = "green" if target_stats.loss_percent < 1 else "yellow" if target_stats.loss_percent < 5 else "red" + + table.add_row( + target.name, + fmt_ms(target_stats.avg), + fmt_ms(target_stats.min), + fmt_ms(target_stats.max), + fmt_ms(target_stats.stddev), + f"[{loss_style}]{target_stats.loss_percent:.1f}%[/{loss_style}]", + str(target_stats.count), + ) + + console.print(table) + + +@cli.command() +@click.argument("name", required=False) +@click.option("--period", "-p", default="1h", help="Time period (e.g., 1h, 24h, 7d)") +@click.option("--width", "-w", type=int, help="Graph width") +@click.option("--height", "-H", type=int, help="Graph height") +@click.pass_context +def graph(ctx: click.Context, name: str | None, period: str, width: int | None, height: int | None) -> None: + """Display latency graph.""" + with get_db(ctx.obj["db_path"]) as db: + if name: + target = db.get_target(name) + if not target: + console.print(f"[red]✗[/red] Target '[bold]{name}[/bold]' not found") + sys.exit(1) + + measurements = db.get_measurements(target.id, period) + if not measurements: + console.print(f"[yellow]No measurements for '{name}' in the last {period}[/yellow]") + return + + stats_data = calculate_stats(measurements) + draw_latency_graph( + measurements, + title=f"Latency - {name} (last {period})", + width=width, + height=height, + ) + + # Print summary below graph + if stats_data.avg is not None: + console.print( + f"\n[dim]Statistics:[/dim] avg=[green]{stats_data.avg:.1f}ms[/green], " + f"min=[cyan]{stats_data.min:.1f}ms[/cyan], " + f"max=[yellow]{stats_data.max:.1f}ms[/yellow], " + f"loss=[{'red' if stats_data.loss_percent > 0 else 'green'}]{stats_data.loss_percent:.1f}%[/]" + ) + else: + targets = db.get_targets() + if not targets: + console.print("[dim]No targets configured.[/dim]") + return + + draw_multi_target_graph(db, targets, period, width, height) + + +@cli.command() +@click.argument("name") +@click.option("--period", "-p", default="1h", help="Time period (e.g., 1h, 24h, 7d)") +@click.option("--limit", "-l", type=int, help="Limit number of results") +@click.pass_context +def history(ctx: click.Context, name: str, period: str, limit: int | None) -> None: + """Show measurement history for a target.""" + with get_db(ctx.obj["db_path"]) as db: + target = db.get_target(name) + if not target: + console.print(f"[red]✗[/red] Target '[bold]{name}[/bold]' not found") + sys.exit(1) + + measurements = db.get_measurements(target.id, period, limit) + if not measurements: + console.print(f"[yellow]No measurements for '{name}' in the last {period}[/yellow]") + return + + table = Table(title=f"History - {name} (last {period})") + table.add_column("Timestamp") + table.add_column("Latency", justify="right") + table.add_column("Status") + + for m in measurements[:50]: # Limit displayed rows + if m.success and m.latency_ms is not None: + status = "[green]✓[/green]" + latency = f"{m.latency_ms:.1f}ms" + else: + status = "[red]✗[/red]" + latency = m.error_message or "Failed" + + table.add_row( + m.timestamp.strftime("%Y-%m-%d %H:%M:%S"), + latency, + status, + ) + + console.print(table) + + if len(measurements) > 50: + console.print(f"[dim]Showing 50 of {len(measurements)} measurements[/dim]") + + +# ============================================================================= +# Config File Support +# ============================================================================= + + +def load_config(config_path: Path) -> dict | None: + """Load configuration from TOML file.""" + if not config_path.exists(): + return None + + try: + # Use tomllib in Python 3.11+ + import tomllib + + with open(config_path, "rb") as f: + return tomllib.load(f) + except ImportError: + # Fallback for older Python (shouldn't happen with requires-python >= 3.11) + console.print("[yellow]Warning: tomllib not available, config file ignored[/yellow]") + return None + except Exception as e: + console.print(f"[yellow]Warning: Failed to load config: {e}[/yellow]") + return None + + +@cli.command() +@click.argument("config_file", type=click.Path(exists=True), required=False) +@click.pass_context +def import_config(ctx: click.Context, config_file: str | None) -> None: + """Import targets from a TOML configuration file.""" + config_path = Path(config_file) if config_file else Path("yaping.toml") + + if not config_path.exists(): + console.print(f"[red]✗[/red] Config file '{config_path}' not found") + sys.exit(1) + + config = load_config(config_path) + if not config: + console.print("[red]✗[/red] Failed to parse config file") + sys.exit(1) + + targets = config.get("targets", []) + if not targets: + console.print("[yellow]No targets found in config file[/yellow]") + return + + with get_db(ctx.obj["db_path"]) as db: + added = 0 + for target_config in targets: + name = target_config.get("name") + host = target_config.get("host") + method = target_config.get("method", "icmp") + port = target_config.get("port") + interval = target_config.get("interval", DEFAULT_INTERVAL) + + if not name or not host: + console.print(f"[yellow]Skipping invalid target: {target_config}[/yellow]") + continue + + try: + db.add_target(name, host, method, port, interval) + console.print(f"[green]✓[/green] Added '{name}'") + added += 1 + except sqlite3.IntegrityError: + console.print(f"[yellow]⚠[/yellow] Target '{name}' already exists, skipping") + + console.print(f"\n[bold]Imported {added} target(s)[/bold]") + + +# ============================================================================= +# Entry Point +# ============================================================================= + + +if __name__ == "__main__": + cli() diff --git a/yaping.toml b/yaping.toml new file mode 100644 index 0000000..ccb242f --- /dev/null +++ b/yaping.toml @@ -0,0 +1,40 @@ +# yaping - Sample Configuration File +# Import with: uv run yaping.py import-config yaping.toml + +[defaults] +interval = 60 # Default probe interval in seconds +timeout = 5 # Default timeout in seconds + +# Example targets - uncomment and modify as needed + +[[targets]] +name = "google-dns" +host = "8.8.8.8" +method = "icmp" +interval = 30 + +[[targets]] +name = "cloudflare-dns" +host = "1.1.1.1" +method = "tcp" +port = 53 +interval = 30 + +[[targets]] +name = "cloudflare-https" +host = "1.1.1.1" +method = "tcp" +port = 443 +interval = 60 + +# [[targets]] +# name = "github-api" +# host = "https://api.github.com" +# method = "http" +# interval = 120 + +# [[targets]] +# name = "httpbin" +# host = "https://httpbin.org/get" +# method = "http" +# interval = 60