109 lines
4.5 KiB
Python
109 lines
4.5 KiB
Python
import ipaddress
import socket
import time
import urllib.request

from .config import PROXY_PORT
|
|
|
|
def measure_tcp_latency(host: str, port: int, timeout: float = 2.0) -> int:
    """Return the time needed to open a TCP connection, in milliseconds.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to wait for the connection.

    Returns:
        The connection time truncated to whole milliseconds, or ``-1`` when
        the connection could not be established for any reason (refused,
        timed out, name resolution failure, invalid port, ...).
    """
    # perf_counter() is monotonic; time.time() can jump when the system clock
    # is adjusted, which would yield negative or wildly inflated latencies.
    started = time.perf_counter()
    try:
        # The context manager closes the socket as soon as the handshake is
        # timed, so no descriptor is leaked.
        with socket.create_connection((host, port), timeout=timeout):
            elapsed_ms = (time.perf_counter() - started) * 1000
    except Exception:
        # Best-effort probe: callers rely on the -1 sentinel and never expect
        # this function to raise.
        return -1
    return int(elapsed_ms)
|
|
|
|
|
|
def _measure_proxy_latency(opener) -> str:
    """Time one small HTTP request through *opener* and format the result.

    Returns ``"<n>ms"`` on success and ``"Error"`` when the request fails for
    any reason (including timeouts).
    """
    request = urllib.request.Request(
        "http://www.gstatic.com/generate_204",
        headers={"User-Agent": "singbox-test"},
    )
    # Monotonic clock: immune to wall-clock adjustments during the request.
    started = time.perf_counter()
    try:
        with opener.open(request, timeout=5):
            elapsed_ms = int((time.perf_counter() - started) * 1000)
    except Exception:
        # Best-effort probe: any failure is reported, never raised.
        return "Error"
    return f"{elapsed_ms}ms"


def _fetch_public_ip(opener) -> str:
    """Return the public IP address seen through *opener*, or ``"Unknown"``.

    Each endpoint is tried in order; the next one is used when a request
    fails or when the response body is not a syntactically valid IP address
    (for example an HTML error page injected by a captive portal).
    """
    endpoints = (
        "http://v4.ident.me",  # IPv4-only hostname, per its name
        "http://api.ipify.org",  # fallback
    )
    for url in endpoints:
        request = urllib.request.Request(url, headers={"User-Agent": "curl/7.68.0"})
        try:
            with opener.open(request, timeout=5) as response:
                # A textual IP address is at most 45 characters; cap the read
                # so an unexpected large body is not pulled into memory.
                body = response.read(64)
            candidate = body.decode("utf-8", "replace").strip()
            # Raises ValueError when the body is not an IP address.
            ipaddress.ip_address(candidate)
        except Exception:
            # Best-effort lookup: fall through to the next endpoint.
            continue
        return candidate
    return "Unknown"


def _measure_download_speed(opener) -> float:
    """Download a sample through *opener* and return the throughput in Mbps.

    Returns ``0.0`` when every test URL fails or no usable sample could be
    collected.
    """
    # Both files are served by the Selectel speed-test host; the second acts
    # as a fallback if the first cannot be fetched.
    test_urls = (
        "https://speedtest.selectel.ru/100MB",
        "https://speedtest.selectel.ru/1GB",
    )
    chunk_size = 256 * 1024  # large chunks keep per-read overhead low
    min_test_duration = 2.0  # seconds
    min_sample_bytes = 2 * 1024 * 1024
    max_download_bytes = 25 * 1024 * 1024
    # Hard ceiling so a very slow link cannot keep the test running
    # indefinitely; the socket timeout below only bounds individual reads.
    max_test_duration = 30.0  # seconds

    for url in test_urls:
        try:
            print(f"[WebUI] Testing speed with: {url}")
            started = time.perf_counter()
            downloaded = 0
            with opener.open(url, timeout=30) as response:
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    downloaded += len(chunk)

                    elapsed = time.perf_counter() - started
                    # Stop once the byte cap is hit, or once the sample is
                    # both long enough and large enough to be meaningful.
                    if downloaded >= max_download_bytes:
                        break
                    if elapsed >= min_test_duration and downloaded >= min_sample_bytes:
                        break
                    if elapsed >= max_test_duration:
                        break
                duration = time.perf_counter() - started

            if duration > 0.1 and downloaded > 0:
                # bytes * 8 bits/byte / 1,000,000 / seconds = megabits/second
                speed_mbps = round((downloaded * 8) / (1000 * 1000) / duration, 1)
                print(f"[WebUI] Speed test: downloaded {downloaded / (1024*1024):.1f}MB in {duration:.1f}s = {speed_mbps} Mbps")
                return speed_mbps
        except Exception as exc:
            print(f"[WebUI] Speed test failed for {url}: {exc}")
            continue
    return 0.0


def measure_proxy_performance(enable_speed_test: bool = False) -> dict:
    """Measure latency, public IP and optionally speed via the local proxy.

    All requests are sent through the HTTP proxy at
    ``http://127.0.0.1:<PROXY_PORT>``. Every probe is best-effort: failures
    are reported in the result rather than raised.

    Args:
        enable_speed_test: When true, also run a download test and include a
            ``"speed"`` entry in the result.

    Returns:
        A dict with:
            ``"latency"``: ``"<n>ms"`` or ``"Error"``.
            ``"ip"``: the public IP address as text, or ``"Unknown"``.
            ``"speed"``: ``"<x> Mbps"`` (``"0.0 Mbps"`` if the test failed);
                present only when ``enable_speed_test`` is true.
    """
    proxy_url = f"http://127.0.0.1:{PROXY_PORT}"
    # Build the opener once so its construction cost is not counted in the
    # latency figure and it is shared by all probes.
    opener = urllib.request.build_opener(
        urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
    )

    result = {
        "latency": _measure_proxy_latency(opener),
        "ip": _fetch_public_ip(opener),
    }

    if enable_speed_test:
        result["speed"] = f"{_measure_download_speed(opener)} Mbps"

    return result
|