Files
vpn-proxy/web/server.py

683 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Simple HTTP Web Server for VPN Proxy Control
Provides a web UI to manage sing-box subscriptions
"""
import http.server
import json
import os
import platform
import socketserver
import urllib.request
import urllib.error
import uuid
import socket
import time
from urllib.parse import parse_qs, unquote
from pathlib import Path
# Port the web UI listens on; overridable through the PORT environment variable.
PORT = int(os.environ.get("PORT", 3456))
# Port of the local proxy inbound. It is written into generated configs as the
# listen port and used to reach the proxy on 127.0.0.1 for connection tests.
# Overridable through the PROXY_PORT environment variable.
PROXY_PORT = int(os.environ.get("PROXY_PORT", 8080))
# Sent as the "x-device-model" header when a subscription is fetched.
APP_NAME = "VPN-Proxy-Control by Dokril"
# Directory containing this script.
APP_DIR = Path(__file__).parent
BASE_DIR = APP_DIR.parent
# index.html and the /static/ files are looked up relative to this directory.
WEB_DIR = APP_DIR
# Directory holding all persisted state (created on demand by the writers).
DATA_DIR = BASE_DIR / "data"
# Proxy configuration written by the apply handlers and read by /status.
CONFIG_FILE = DATA_DIR / "client.json"
# Cached random hardware ID (see get_hwid).
HWID_FILE = DATA_DIR / "hwid"
# Last applied subscription: URL, selected server and user info.
SUBSCRIPTION_FILE = DATA_DIR / "subscription.json"
def get_hwid() -> str:
    """Return the persistent hardware ID, creating it on first use.

    The ID is the first 16 hex characters of a random UUID and is cached in
    ``HWID_FILE``. A file that exists but is empty (or whitespace only) is
    treated as missing, so an empty ID is never returned.

    Returns:
        The stored or newly generated hardware ID.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if HWID_FILE.exists():
        stored = HWID_FILE.read_text().strip()
        if stored:
            return stored
    # No usable ID on disk: generate a new random one and persist it.
    hwid = uuid.uuid4().hex[:16]
    HWID_FILE.write_text(hwid)
    return hwid
def get_system_info() -> dict:
    """Describe the host operating system for the subscription headers.

    Returns:
        A dict with ``"os"`` (lower-cased ``platform.system()``, e.g.
        ``windows``/``linux``/``darwin``) and ``"version"``
        (``platform.release()``).
    """
    info = {}
    info["os"] = platform.system().lower()
    info["version"] = platform.release()
    return info
def save_subscription(url: str, selected_server: str = None, user_info: dict = None):
    """Persist the subscription URL, selected server and user info.

    The data is stored as JSON in ``SUBSCRIPTION_FILE``. Non-ASCII characters
    are kept verbatim (``ensure_ascii=False``), so the file is written
    explicitly as UTF-8 instead of the platform's locale encoding.

    Args:
        url: Subscription URL.
        selected_server: Tag of the chosen server, or None.
        user_info: Parsed subscription user info, or None.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    data = {
        "url": url,
        "selectedServer": selected_server,
        "userInfo": user_info,
    }
    SUBSCRIPTION_FILE.write_text(
        json.dumps(data, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def load_subscription() -> dict:
    """Load the saved subscription.

    Returns:
        The stored dict, or None when the file is missing, is not valid
        UTF-8 JSON, or does not contain a JSON object.
    """
    if not SUBSCRIPTION_FILE.exists():
        return None
    try:
        data = json.loads(SUBSCRIPTION_FILE.read_text(encoding="utf-8"))
    except ValueError:
        # Covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    # Callers use .get() on the result, so anything but an object is unusable.
    return data if isinstance(data, dict) else None
def measure_tcp_latency(host: str, port: int, timeout: float = 2.0) -> int:
    """Measure how long a TCP connect to ``host:port`` takes.

    Args:
        host: Host name or IP address to connect to.
        port: TCP port.
        timeout: Connect timeout in seconds.

    Returns:
        The connect time in whole milliseconds, or -1 if the connection
        could not be established for any reason.
    """
    # perf_counter is monotonic; time.time() can jump when the system clock
    # is adjusted and would then report a bogus (even negative) latency.
    started = time.perf_counter()
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return int((time.perf_counter() - started) * 1000)
    except Exception:
        # Deliberately best-effort: every failure is reported as -1.
        return -1
def _proxy_opener(proxy_url: str):
    """Build a urllib opener that sends HTTP and HTTPS through ``proxy_url``."""
    handler = urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
    return urllib.request.build_opener(handler)


def _measure_proxy_latency(opener) -> str:
    """Time one small request through the proxy; return ``"<n>ms"`` or ``"Error"``."""
    req = urllib.request.Request(
        "http://www.gstatic.com/generate_204",
        headers={"User-Agent": "singbox-test"},
    )
    started = time.perf_counter()
    try:
        with opener.open(req, timeout=5):
            elapsed_ms = int((time.perf_counter() - started) * 1000)
    except Exception:
        return "Error"
    return f"{elapsed_ms}ms"


def _fetch_public_ip(opener) -> str:
    """Return the public IP seen through the proxy, or ``"Unknown"``.

    The second endpoint is tried when the first one fails or answers with
    something that is not an IP address (for example an HTML error page).
    """
    import ipaddress

    for endpoint in ("http://v4.ident.me", "http://api.ipify.org"):
        req = urllib.request.Request(endpoint, headers={"User-Agent": "curl/7.68.0"})
        try:
            with opener.open(req, timeout=5) as response:
                # An address is tiny; never buffer an arbitrarily large reply.
                candidate = response.read(256).decode("utf-8").strip()
            ipaddress.ip_address(candidate)  # raises ValueError on garbage
        except Exception:
            continue
        return candidate
    return "Unknown"


def _measure_download_speed(opener) -> float:
    """Download a test file through the proxy and return the speed in Mbps.

    Returns 0.0 when every test URL fails.
    """
    # Both files are served by the Selectel speed-test host; the second is
    # only tried when the first attempt does not produce a measurement.
    test_urls = (
        "https://speedtest.selectel.ru/100MB",
        "https://speedtest.selectel.ru/1GB",
    )
    chunk_size = 256 * 1024
    min_test_duration = 2.0  # seconds
    min_download_bytes = 2 * 1024 * 1024
    max_download_bytes = 25 * 1024 * 1024

    for url in test_urls:
        try:
            print(f"[WebUI] Testing speed with: {url}")
            started = time.perf_counter()
            with opener.open(url, timeout=30) as response:
                downloaded = 0
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    downloaded += len(chunk)
                    elapsed = time.perf_counter() - started
                    # Stop at the size cap, or once the test has run long
                    # enough AND moved enough data to be meaningful.
                    if downloaded >= max_download_bytes or (
                        elapsed >= min_test_duration
                        and downloaded >= min_download_bytes
                    ):
                        break
                duration = time.perf_counter() - started
            if duration > 0.1 and downloaded > 0:
                # bytes * 8 bits / 1,000,000 / seconds = megabits per second
                speed_mbps = round((downloaded * 8) / (1000 * 1000) / duration, 1)
                print(f"[WebUI] Speed test: downloaded {downloaded / (1024*1024):.1f}MB in {duration:.1f}s = {speed_mbps} Mbps")
                return speed_mbps
        except Exception as e:
            print(f"[WebUI] Speed test failed for {url}: {e}")
    return 0.0


def measure_proxy_performance(enable_speed_test: bool = False) -> dict:
    """Measure latency, public IP and optionally speed via the local proxy.

    All requests go through the HTTP proxy on ``127.0.0.1:PROXY_PORT``.

    Args:
        enable_speed_test: When true, also run the download speed test.

    Returns:
        A dict with ``"latency"`` (``"<n>ms"`` or ``"Error"``) and ``"ip"``
        (address string or ``"Unknown"``); with the speed test enabled it
        also has ``"speed"`` (``"<x> Mbps"``, ``"0.0 Mbps"`` on failure).
    """
    opener = _proxy_opener(f"http://127.0.0.1:{PROXY_PORT}")
    result = {
        "latency": _measure_proxy_latency(opener),
        "ip": _fetch_public_ip(opener),
    }
    if enable_speed_test:
        result["speed"] = f"{_measure_download_speed(opener)} Mbps"
    return result
def parse_vless_url(url: str) -> dict:
    """Parse a ``vless://`` share link into connection parameters.

    Expected shape: ``vless://UUID@HOST:PORT[/]?key=value&...[#tag]``.
    ``HOST`` may be a bracketed IPv6 literal; the brackets are removed.

    Args:
        url: The VLESS link.

    Returns:
        A dict with the keys ``uuid``, ``server``, ``server_port``, ``tag``,
        ``public_key``, ``short_id``, ``server_name``, ``fingerprint`` and
        ``flow``. ``tag`` defaults to ``"reality"``, ``server_name`` to the
        host, ``fingerprint`` to ``"chrome"`` and ``flow`` to ``""``.

    Raises:
        ValueError: If the link is malformed or lacks ``pbk``/``sid``.
    """
    scheme = "vless://"
    if not url.startswith(scheme):
        raise ValueError("URL must start with vless://")
    remainder = url[len(scheme):]

    # Fragment (#tag). An empty fragment falls back to the default tag so
    # the generated route never points at an empty outbound name.
    remainder, _, fragment = remainder.partition("#")
    tag = unquote(fragment) or "reality"

    authority, has_query, query_string = remainder.partition("?")
    if not has_query:
        raise ValueError("Missing query parameters")
    # Some clients emit "host:port/?..."; drop the path component.
    authority = authority.split("/", 1)[0]

    if "@" not in authority:
        raise ValueError("Missing @ separator")
    uuid_str, host_port = authority.split("@", 1)
    if ":" not in host_port:
        raise ValueError("Missing port")
    host, port_str = host_port.rsplit(":", 1)
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]  # IPv6 literal
    if not uuid_str or not host:
        raise ValueError("Missing UUID or host")

    try:
        port = int(port_str)
    except ValueError:
        raise ValueError(f"Invalid port: {port_str!r}") from None
    if not 0 < port < 65536:
        raise ValueError(f"Port out of range: {port}")

    params = {}
    for pair in query_string.split("&"):
        key, has_value, value = pair.partition("=")
        if has_value:
            params[key] = unquote(value)

    pbk = params.get("pbk", "")
    sid = params.get("sid", "")
    if not pbk or not sid:
        raise ValueError("Missing required parameters: pbk or sid")

    return {
        "uuid": uuid_str,
        "server": host,
        "server_port": port,
        "tag": tag,
        "public_key": pbk,
        "short_id": sid,
        "server_name": params.get("sni", host),
        "fingerprint": params.get("fp", "chrome"),
        "flow": params.get("flow", ""),
    }
def generate_vless_config(vless_params: dict) -> dict:
    """Build a sing-box configuration dict for one VLESS/Reality server.

    The config has a single "mixed" inbound listening on ``PROXY_PORT``,
    the VLESS outbound described by ``vless_params`` as the route's final
    outbound, and a "direct" outbound.

    Args:
        vless_params: Dict with the keys ``tag``, ``server``,
            ``server_port``, ``flow``, ``server_name``, ``public_key``,
            ``short_id``, ``fingerprint`` and ``uuid``.

    Returns:
        The configuration as a plain dict.
    """
    outbound_tag = vless_params['tag']
    server = vless_params['server']
    server_port = vless_params['server_port']
    flow = vless_params['flow']
    server_name = vless_params['server_name']
    public_key = vless_params['public_key']
    short_id = vless_params['short_id']
    fingerprint = vless_params['fingerprint']
    user_uuid = vless_params['uuid']

    mixed_inbound = {
        "tag": "mixed-in",
        "type": "mixed",
        "sniff": True,
        "users": [],
        "listen": "0.0.0.0",
        "listen_port": PROXY_PORT,
        "set_system_proxy": False,
    }

    tls_settings = {
        "enabled": True,
        "server_name": server_name,
        "reality": {
            "enabled": True,
            "public_key": public_key,
            "short_id": short_id,
        },
        "utls": {
            "enabled": True,
            "fingerprint": fingerprint,
        },
    }

    vless_outbound = {
        "type": "vless",
        "tag": outbound_tag,
        "server": server,
        "server_port": server_port,
        "flow": flow,
        "tls": tls_settings,
        "uuid": user_uuid,
    }

    direct_outbound = {"tag": "direct", "type": "direct"}

    return {
        "dns": {"independent_cache": True},
        "log": {"level": "debug", "disabled": True, "timestamp": True},
        "route": {"final": outbound_tag, "auto_detect_interface": True},
        "inbounds": [mixed_inbound],
        "outbounds": [vless_outbound, direct_outbound],
    }
class ThreadingHTTPServer(socketserver.ThreadingTCPServer):
    """Threaded TCP server that handles each request in its own thread."""

    # Allow an immediate restart on the same port.
    allow_reuse_address = True
    # Worker threads must not keep the process alive: without this, shutting
    # the server down waits for every in-flight request (a speed test can
    # run for tens of seconds) before the process can exit.
    daemon_threads = True
class ProxyControlHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for the proxy-control web UI and its JSON API.

    GET serves the UI page, static assets, ``/status``, ``/subscription``
    and ``/test-connection``; POST handles ``/apply``,
    ``/fetch-subscription``, ``/apply-subscription`` and ``/ping-target``.
    """

    # Request bodies larger than this are rejected as invalid instead of
    # being buffered in memory.
    MAX_BODY_BYTES = 16 * 1024 * 1024
    # Outbound types treated as selectable proxy servers.
    PROXY_OUTBOUND_TYPES = ("vless", "vmess", "trojan", "shadowsocks", "hysteria2")
    # Helper outbounds that are kept next to the selected server.
    PASSTHROUGH_OUTBOUND_TYPES = ("direct", "block", "dns")
    # Internal control endpoint that is asked to reload after a config change.
    RELOAD_URL = "http://localhost:9090/reload"
    # Content types by file suffix; any other suffix is served as JavaScript.
    STATIC_CONTENT_TYPES = {
        ".css": "text/css",
        ".html": "text/html; charset=utf-8",
        ".json": "application/json",
        ".svg": "image/svg+xml",
        ".png": "image/png",
        ".ico": "image/x-icon",
    }

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    def log_message(self, format, *args):
        """Print the fully formatted log line with a ``[WebUI]`` prefix."""
        message = format % args if args else format
        print(f"[WebUI] {message}")

    def _send_body(self, status: int, content_type: str, body: bytes, extra_headers=()):
        """Send a complete response with an explicit Content-Length."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        for name, value in extra_headers:
            self.send_header(name, value)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_json(self, data: dict, status: int = 200):
        """Send ``data`` as a UTF-8 JSON response with the given status."""
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        # NOTE(review): the wildcard CORS header lets any web page read this
        # API's responses; kept for compatibility, consider restricting it.
        self._send_body(
            status,
            "application/json; charset=utf-8",
            body,
            extra_headers=(("Access-Control-Allow-Origin", "*"),),
        )

    def send_html(self, content: bytes):
        """Send ``content`` as a 200 HTML response."""
        self._send_body(200, "text/html; charset=utf-8", content)

    def _read_json_body(self) -> dict:
        """Read the request body and decode it as a JSON object.

        Raises:
            json.JSONDecodeError: If Content-Length is invalid or too large,
                or the body is not UTF-8, not valid JSON, or not a JSON
                object. One exception type lets every handler answer with
                its existing "invalid JSON" response.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            length = -1
        # A negative length would make rfile.read() block until the client
        # closes the connection.
        if not 0 <= length <= self.MAX_BODY_BYTES:
            raise json.JSONDecodeError("Invalid Content-Length", "", 0)
        raw = self.rfile.read(length) if length else b""
        try:
            payload = json.loads(raw.decode("utf-8"))
        except UnicodeDecodeError as exc:
            raise json.JSONDecodeError("Body is not valid UTF-8", "", 0) from exc
        if not isinstance(payload, dict):
            raise json.JSONDecodeError("JSON object expected", "", 0)
        return payload

    @staticmethod
    def _resolve_static_path(root, request_path: str):
        """Map a ``/static/...`` request path to a file inside ``root``.

        Returns the resolved path, or None when the target is not an
        existing file or lies outside ``root`` (``..`` segments, absolute
        paths, symlinks pointing elsewhere).
        """
        without_query = request_path.split("?", 1)[0]
        relative = unquote(without_query[len("/static/"):])
        try:
            base = Path(root).resolve()
            candidate = (base / relative).resolve()
            candidate.relative_to(base)  # ValueError if outside base
        except (ValueError, OSError):
            return None
        return candidate if candidate.is_file() else None

    @staticmethod
    def _parse_userinfo(header: str) -> dict:
        """Parse a ``subscription-userinfo`` header into integer fields.

        The header is a ``;``-separated list of ``key=value`` pairs; pairs
        whose value is not an integer are skipped.
        """
        info = {}
        for part in header.split(";"):
            if "=" not in part:
                continue
            key, value = part.strip().split("=", 1)
            try:
                info[key] = int(value)
            except ValueError:
                continue
        return info

    def _write_config(self, config: dict):
        """Write ``config`` to ``CONFIG_FILE`` as UTF-8 JSON."""
        DATA_DIR.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False keeps non-ASCII tags verbatim, so the encoding
        # must be explicit rather than the platform's locale default.
        CONFIG_FILE.write_text(
            json.dumps(config, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )

    def _trigger_reload(self):
        """Ask the internal control port to reload; failures are only logged."""
        try:
            with urllib.request.urlopen(self.RELOAD_URL, timeout=5):
                pass
        except Exception as e:
            print(f"[WebUI] Warning: reload request failed: {e}")

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------
    def do_GET(self):
        """Dispatch GET requests by path (the query string is ignored)."""
        route = self.path.split("?", 1)[0]
        if route in ("/", "/index.html"):
            self.serve_index()
        elif route == "/status":
            self.get_status()
        elif route == "/subscription":
            self.get_subscription()
        elif route.startswith("/test-connection"):
            self.test_connection()
        elif route.startswith("/static/"):
            self.serve_static()
        else:
            self.send_error(404)

    def do_POST(self):
        """Dispatch POST requests by path (the query string is ignored)."""
        route = self.path.split("?", 1)[0]
        if route == "/apply":
            self.apply_config()
        elif route == "/fetch-subscription":
            self.fetch_subscription()
        elif route == "/apply-subscription":
            self.apply_subscription()
        elif route == "/ping-target":
            self.ping_target()
        else:
            self.send_error(404)

    # ------------------------------------------------------------------
    # GET handlers
    # ------------------------------------------------------------------
    def serve_index(self):
        """Serve the main HTML page."""
        index_path = WEB_DIR / "index.html"
        if index_path.exists():
            self.send_html(index_path.read_bytes())
        else:
            self.send_error(404, "index.html not found")

    def serve_static(self):
        """Serve a file below ``WEB_DIR`` for a ``/static/...`` request.

        Requests that resolve outside ``WEB_DIR`` get a 404, so the data
        directory and the rest of the file system cannot be read.
        """
        file_path = self._resolve_static_path(WEB_DIR, self.path)
        if file_path is None:
            self.send_error(404)
            return
        content_type = self.STATIC_CONTENT_TYPES.get(
            file_path.suffix.lower(), "application/javascript"
        )
        self._send_body(200, content_type, file_path.read_bytes())

    def get_status(self):
        """Report whether a config exists and which server it uses.

        The first outbound with a proxy type is reported; unreadable or
        malformed configs yield ``tag``/``server`` of None.
        """
        config_exists = CONFIG_FILE.exists()
        current_tag = None
        current_server = None
        if config_exists:
            try:
                config = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
                outbounds = config.get("outbounds", []) if isinstance(config, dict) else []
                for outbound in outbounds:
                    if (
                        isinstance(outbound, dict)
                        and outbound.get("type") in self.PROXY_OUTBOUND_TYPES
                    ):
                        current_tag = outbound.get("tag", "unknown")
                        current_server = outbound.get("server", "unknown")
                        break
            except (OSError, ValueError, TypeError):
                # Best effort: status must still be answered.
                pass
        self.send_json({
            "active": config_exists,
            "tag": current_tag,
            "server": current_server
        })

    def get_subscription(self):
        """Return the saved subscription, or ``{"saved": false}``."""
        sub = load_subscription()
        if sub:
            self.send_json({
                "saved": True,
                "url": sub.get("url"),
                "selectedServer": sub.get("selectedServer"),
                "userInfo": sub.get("userInfo")
            })
        else:
            self.send_json({"saved": False})

    def test_connection(self):
        """Test the active proxy; ``?speed=true`` also runs the speed test."""
        query = self.path.partition("?")[2]
        params = parse_qs(query)
        enable_speed = params.get("speed", ["false"])[0].lower() == "true"
        self.send_json(measure_proxy_performance(enable_speed_test=enable_speed))

    # ------------------------------------------------------------------
    # POST handlers
    # ------------------------------------------------------------------
    def ping_target(self):
        """Measure TCP latency to the ``server``/``port`` given in the body.

        Malformed input is answered with 400, unexpected failures with 500.
        """
        try:
            try:
                data = self._read_json_body()
                port = int(data.get("port", 443))
            except (ValueError, TypeError) as e:
                self.send_json({"error": str(e)}, 400)
                return
            server = data.get("server")
            if not server:
                self.send_json({"error": "No server specified"}, 400)
                return
            self.send_json({"latency": measure_tcp_latency(server, port)})
        except Exception as e:
            self.send_json({"error": str(e)}, 500)

    def apply_config(self):
        """Generate and apply a config from the VLESS URL in the body."""
        try:
            data = self._read_json_body()
            url = str(data.get("url") or "").strip()
            if not url:
                self.send_json({"success": False, "error": "URL не указан"}, 400)
                return
            if not url.startswith("vless://"):
                self.send_json({"success": False, "error": "Неверный формат. Поддерживаются только vless:// ссылки"}, 400)
                return
            try:
                vless_params = parse_vless_url(url)
            except ValueError as e:
                self.send_json({"success": False, "error": f"Ошибка парсинга URL: {str(e)}"}, 400)
                return
            self._write_config(generate_vless_config(vless_params))
            self._trigger_reload()
            self.send_json({
                "success": True,
                "message": f"Конфигурация '{vless_params['tag']}' успешно применена!"
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON"}, 400)
        except Exception as e:
            self.send_json({"success": False, "error": str(e)}, 500)

    def fetch_subscription(self):
        """Download a subscription and list the proxy servers it contains.

        Only ``http://`` and ``https://`` URLs are fetched; urllib would
        otherwise also open ``file://`` and ``ftp://`` URLs supplied by the
        client.
        """
        try:
            data = self._read_json_body()
            url = str(data.get("url") or "").strip()
            if not url:
                self.send_json({"success": False, "error": "URL подписки не указан"}, 400)
                return
            if not url.lower().startswith(("http://", "https://")):
                self.send_json({"success": False, "error": "Поддерживаются только http:// и https:// ссылки"}, 400)
                return

            sys_info = get_system_info()
            req = urllib.request.Request(
                url,
                headers={
                    "User-Agent": "singbox",
                    "x-hwid": get_hwid(),
                    "x-device-os": sys_info["os"],
                    "x-ver-os": sys_info["version"],
                    "x-device-model": APP_NAME
                }
            )
            try:
                with urllib.request.urlopen(req, timeout=15) as response:
                    config = json.loads(response.read().decode("utf-8"))
                    user_info = self._parse_userinfo(
                        response.headers.get("subscription-userinfo", "")
                    )
            except urllib.error.HTTPError as e:
                self.send_json({"success": False, "error": f"Ошибка HTTP: {e.code}"}, 400)
                return
            except urllib.error.URLError as e:
                self.send_json({"success": False, "error": f"Ошибка подключения: {e.reason}"}, 400)
                return

            if not isinstance(config, dict):
                self.send_json({"success": False, "error": "Неверный JSON в ответе"}, 400)
                return
            outbounds = config.get("outbounds", [])
            if not isinstance(outbounds, list):
                outbounds = []
            servers = [
                {
                    "tag": outbound.get("tag", "unknown"),
                    "type": outbound.get("type"),
                    "server": outbound.get("server", "unknown"),
                    "server_port": outbound.get("server_port", 443)
                }
                for outbound in outbounds
                if isinstance(outbound, dict)
                and outbound.get("type") in self.PROXY_OUTBOUND_TYPES
            ]
            if not servers:
                self.send_json({"success": False, "error": "Серверы не найдены в подписке"}, 400)
                return
            self.send_json({
                "success": True,
                "servers": servers,
                "config": config,
                "userInfo": user_info
            })
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.send_json({"success": False, "error": "Неверный JSON в ответе"}, 400)
        except Exception as e:
            self.send_json({"success": False, "error": str(e)}, 500)

    def apply_subscription(self):
        """Apply a subscription config reduced to the selected server.

        The body carries ``config``, ``selectedServer`` and optionally
        ``subUrl`` and ``userInfo``. The selected outbound becomes the first
        outbound and the route's final target; direct/block/dns outbounds
        are kept and every other outbound is dropped.
        """
        try:
            data = self._read_json_body()
            config = data.get("config")
            selected_tag = data.get("selectedServer")
            sub_url = data.get("subUrl")  # subscription URL to persist
            user_info = data.get("userInfo")
            if not config or not isinstance(config, dict):
                self.send_json({"success": False, "error": "Конфигурация не указана"}, 400)
                return
            if not selected_tag:
                self.send_json({"success": False, "error": "Сервер не выбран"}, 400)
                return

            outbounds = config.get("outbounds", [])
            if not isinstance(outbounds, list):
                outbounds = []
            selected_outbound = None
            new_outbounds = []
            for outbound in outbounds:
                if not isinstance(outbound, dict):
                    continue
                if outbound.get("tag") == selected_tag:
                    selected_outbound = outbound
                elif outbound.get("type") in self.PASSTHROUGH_OUTBOUND_TYPES:
                    new_outbounds.append(outbound)
                # Selectors and all other servers are intentionally dropped.
            if not selected_outbound:
                self.send_json({"success": False, "error": f"Сервер '{selected_tag}' не найден"}, 400)
                return
            new_outbounds.insert(0, selected_outbound)

            # Reduce DNS to the same minimal form used for generated configs.
            config["dns"] = {
                "independent_cache": True
            }
            # Drop platform-specific and experimental root sections.
            config.pop("platform", None)
            config.pop("experimental", None)
            # Replace whatever inbounds the subscription had (e.g. TUN) with
            # the local mixed proxy.
            config["inbounds"] = [
                {
                    "tag": "mixed-in",
                    "type": "mixed",
                    "sniff": True,
                    "users": [],
                    "listen": "0.0.0.0",
                    "listen_port": PROXY_PORT,
                    "set_system_proxy": False
                }
            ]
            config["outbounds"] = new_outbounds
            # Route everything to the selected server.
            config["route"] = {
                "final": selected_tag,
                "auto_detect_interface": True
            }

            self._write_config(config)
            if sub_url:
                save_subscription(sub_url, selected_tag, user_info)
            self._trigger_reload()
            self.send_json({
                "success": True,
                "message": f"Сервер '{selected_tag}' успешно применён!"
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON"}, 400)
        except Exception as e:
            self.send_json({"success": False, "error": str(e)}, 500)
def main():
    """Run the web UI server on ``PORT`` until the process is stopped.

    Requests are served concurrently, one thread per request.
    """
    bind_address = ("", PORT)
    server = ThreadingHTTPServer(bind_address, ProxyControlHandler)
    with server as httpd:
        print(f"[WebUI] Server started on port {PORT}")
        print(f"[WebUI] Open http://localhost:{PORT} in your browser")
        httpd.serve_forever()


if __name__ == "__main__":
    main()