Files
vpn-proxy/web/server.py

747 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Simple HTTP Web Server for VPN Proxy Control
Provides a web UI to manage sing-box subscriptions
"""
import http.server
import json
import os
import platform
import socketserver
import urllib.request
import urllib.error
import uuid
import socket
import time
from urllib.parse import parse_qs, unquote
from pathlib import Path
# Network ports; both can be overridden through environment variables.
PORT = int(os.environ.get("PORT", 3456))  # port the web UI listens on
PROXY_PORT = int(os.environ.get("PROXY_PORT", 8080))  # port of the local proxy inbound

# Sent to subscription providers in the "x-device-model" request header.
APP_NAME = "VPN-Proxy-Control by Dokril"

# Make the script path absolute before walking up the tree: with a relative
# __file__ such as "server.py", Path("server.py").parent is "." and
# Path(".").parent is "." again, which would place DATA_DIR inside the web
# directory instead of next to it.
APP_DIR = Path(os.path.abspath(__file__)).parent
BASE_DIR = APP_DIR.parent
WEB_DIR = APP_DIR  # index.html and static assets are served from here
DATA_DIR = BASE_DIR / "data"
CONFIG_FILE = DATA_DIR / "client.json"  # generated proxy configuration
HWID_FILE = DATA_DIR / "hwid"  # persisted hardware ID
SUBSCRIPTION_FILE = DATA_DIR / "subscription.json"  # last applied subscription
def get_hwid() -> str:
    """Return the persistent hardware ID, creating it on first use.

    The ID is stored in ``HWID_FILE``. When that file is missing or contains
    only whitespace, a new random 16-character hex ID is generated, written
    to the file and returned.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    try:
        stored = HWID_FILE.read_text().strip()
    except FileNotFoundError:
        stored = ""
    if stored:
        return stored
    # Nothing usable on disk (first run or an empty file): generate a new
    # random HWID rather than returning an empty identifier.
    hwid = uuid.uuid4().hex[:16]
    HWID_FILE.write_text(hwid)
    return hwid
def get_system_info() -> dict:
    """Describe the host operating system.

    Returns:
        A dict with two string entries: ``"os"`` is ``platform.system()``
        lower-cased and ``"version"`` is ``platform.release()``.
    """
    os_name = platform.system()
    return dict(os=os_name.lower(), version=platform.release())
def save_subscription(url: str, selected_server: str = None, user_info: dict = None):
    """Persist the subscription URL, selected server tag and user info.

    The record is written to ``SUBSCRIPTION_FILE`` as indented JSON with the
    keys ``url``, ``selectedServer`` and ``userInfo``.

    Args:
        url: Subscription URL.
        selected_server: Tag of the chosen server, or None.
        user_info: Subscription user info, or None.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    payload = {
        "url": url,
        "selectedServer": selected_server,
        "userInfo": user_info,
    }
    # ensure_ascii=False emits non-ASCII characters verbatim, so the file
    # encoding must be explicit; the locale default cannot represent every
    # character on all systems and would raise UnicodeEncodeError.
    SUBSCRIPTION_FILE.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
def load_subscription() -> "dict | None":
    """Load the saved subscription record from ``SUBSCRIPTION_FILE``.

    Returns:
        The stored JSON object as a dict, or None when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        try:
            text = SUBSCRIPTION_FILE.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # The file may have been written with the locale's default
            # encoding (what write_text uses when none is given).
            text = SUBSCRIPTION_FILE.read_text()
        data = json.loads(text)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        print(f"[WebUI] Warning: could not load subscription file: {e}")
        return None
    # Callers use .get() on the result, so anything but an object is unusable.
    return data if isinstance(data, dict) else None
def measure_tcp_latency(host: str, port: int, timeout: float = 2.0) -> int:
    """Measure how long it takes to open a TCP connection to host:port.

    The timer starts before ``socket.create_connection`` is called, so the
    result covers everything that call does until the connection is open.

    Args:
        host: Host name or IP address to connect to.
        port: TCP port to connect to.
        timeout: Connection timeout in seconds.

    Returns:
        Elapsed time in whole milliseconds, or -1 if the connection could not
        be established for any reason.
    """
    # perf_counter is monotonic; time.time() can jump when the system clock
    # is adjusted and would then yield a wrong (even negative) latency.
    started = time.perf_counter()
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return int((time.perf_counter() - started) * 1000)
    except Exception:
        # Best-effort probe: every failure is reported as -1.
        return -1
def _probe_proxy_latency(opener) -> str:
    """Time one small HTTP request through *opener*; return "<n>ms" or "Error"."""
    request = urllib.request.Request(
        "http://www.gstatic.com/generate_204",
        headers={"User-Agent": "singbox-test"},
    )
    started = time.perf_counter()
    try:
        with opener.open(request, timeout=5):
            elapsed_ms = int((time.perf_counter() - started) * 1000)
    except Exception:
        return "Error"
    return f"{elapsed_ms}ms"


def _lookup_public_ip(opener) -> str:
    """Return the public IP address seen through *opener*, or "Unknown"."""
    import ipaddress

    # v4.ident.me is tried first; api.ipify.org is the fallback.
    endpoints = ("http://v4.ident.me", "http://api.ipify.org")
    for endpoint in endpoints:
        request = urllib.request.Request(endpoint, headers={"User-Agent": "curl/7.68.0"})
        try:
            with opener.open(request, timeout=5) as response:
                # An IP address is short; do not read an arbitrarily large body.
                candidate = response.read(256).decode("utf-8").strip()
            # Raises ValueError for anything that is not an IP address, so a
            # garbage answer falls through to the next endpoint.
            ipaddress.ip_address(candidate)
        except Exception:
            continue
        return candidate
    return "Unknown"


def _measure_download_speed(opener) -> float:
    """Download a test file through *opener* and return the speed in Mbps.

    Returns 0.0 when no test URL produced a usable measurement.
    """
    test_urls = (
        "https://speedtest.selectel.ru/100MB",
        "https://speedtest.selectel.ru/1GB",  # used only if the first fails
    )
    chunk_size = 256 * 1024
    # Stop once 25 MiB are downloaded, or once at least 2 seconds have passed
    # and at least 2 MiB were received.
    min_test_duration = 2.0
    min_sample_bytes = 2 * 1024 * 1024
    max_download_bytes = 25 * 1024 * 1024

    for url in test_urls:
        try:
            print(f"[WebUI] Testing speed with: {url}")
            # The timer starts before the request is opened.
            started = time.perf_counter()
            downloaded = 0
            with opener.open(url, timeout=30) as response:
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    downloaded += len(chunk)
                    elapsed = time.perf_counter() - started
                    if downloaded >= max_download_bytes or (
                        elapsed >= min_test_duration and downloaded >= min_sample_bytes
                    ):
                        break
                duration = time.perf_counter() - started
        except Exception as e:
            print(f"[WebUI] Speed test failed for {url}: {e}")
            continue
        if duration > 0.1 and downloaded > 0:
            # bytes * 8 bits / 1,000,000 / seconds = megabits per second
            speed_mbps = round((downloaded * 8) / (1000 * 1000) / duration, 1)
            print(
                f"[WebUI] Speed test: downloaded {downloaded / (1024*1024):.1f}MB "
                f"in {duration:.1f}s = {speed_mbps} Mbps"
            )
            return speed_mbps
    return 0.0


def measure_proxy_performance(enable_speed_test: bool = False) -> dict:
    """Measure latency, public IP and optionally speed through the local proxy.

    All requests go through the HTTP proxy at ``127.0.0.1:PROXY_PORT``.

    Args:
        enable_speed_test: When True, also run the download speed test.

    Returns:
        A dict with ``"latency"`` ("<n>ms" or "Error") and ``"ip"`` (an IP
        address or "Unknown"). When *enable_speed_test* is True it also has
        ``"speed"`` formatted as "<x> Mbps" ("0.0 Mbps" if the test failed).
    """
    proxy_url = f"http://127.0.0.1:{PROXY_PORT}"
    opener = urllib.request.build_opener(
        urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
    )
    result = {
        "latency": _probe_proxy_latency(opener),
        "ip": _lookup_public_ip(opener),
    }
    if enable_speed_test:
        result["speed"] = f"{_measure_download_speed(opener)} Mbps"
    return result
def parse_vless_url(url: str) -> dict:
    """Parse a ``vless://uuid@host:port?params#tag`` link.

    Args:
        url: The VLESS share link.

    Returns:
        A dict with the keys ``uuid``, ``server``, ``server_port``, ``tag``,
        ``public_key`` (query ``pbk``), ``short_id`` (``sid``),
        ``server_name`` (``sni``, defaults to the host), ``fingerprint``
        (``fp``, defaults to "chrome") and ``flow`` (defaults to "").
        The tag defaults to "reality" when the link has no usable fragment.

    Raises:
        ValueError: If the link is malformed or ``pbk``/``sid`` is missing.
    """
    scheme = "vless://"
    if not url.startswith(scheme):
        raise ValueError("URL must start with vless://")

    # Split the fragment off first so characters inside the tag cannot be
    # mistaken for the '?' or '@' separators.
    remainder, _, fragment = url[len(scheme):].partition('#')
    tag = unquote(fragment) or "reality"

    authority, has_query, query_string = remainder.partition('?')
    if not has_query:
        raise ValueError("Missing query parameters")

    if '@' not in authority:
        raise ValueError("Missing @ separator")
    uuid_str, host_port = authority.split('@', 1)

    # Some links are written as "host:port/?..."; the path carries no data.
    host_port = host_port.split('/', 1)[0]
    if ':' not in host_port:
        raise ValueError("Missing port")
    host, port_str = host_port.rsplit(':', 1)

    # IPv6 literals are bracketed in URLs; the brackets are not part of the address.
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    if not uuid_str or not host:
        raise ValueError("Missing UUID or host")

    try:
        port = int(port_str)
    except ValueError:
        raise ValueError(f"Invalid port: {port_str!r}") from None
    if not 0 < port < 65536:
        raise ValueError(f"Port out of range: {port}")

    # Pairs without '=' are ignored; a repeated key keeps its last value.
    params = {}
    for pair in query_string.split('&'):
        key, has_value, value = pair.partition('=')
        if has_value:
            params[key] = unquote(value)

    pbk = params.get('pbk', '')
    sid = params.get('sid', '')
    if not pbk or not sid:
        raise ValueError("Missing required parameters: pbk or sid")

    return {
        'uuid': uuid_str,
        'server': host,
        'server_port': port,
        'tag': tag,
        'public_key': pbk,
        'short_id': sid,
        'server_name': params.get('sni', host),
        'fingerprint': params.get('fp', 'chrome'),
        'flow': params.get('flow', ''),
    }
def generate_vless_config(vless_params: dict) -> dict:
    """Build a complete proxy configuration around one VLESS/Reality server.

    Args:
        vless_params: Connection parameters with the keys ``tag``, ``server``,
            ``server_port``, ``flow``, ``server_name``, ``public_key``,
            ``short_id``, ``fingerprint`` and ``uuid``.

    Returns:
        A config dict with ``dns``, ``log``, ``route``, one "mixed" inbound
        listening on ``PROXY_PORT`` and two outbounds: the VLESS server
        (route ``final``) followed by "direct".
    """
    (
        tag,
        server,
        server_port,
        flow,
        server_name,
        public_key,
        short_id,
        fingerprint,
        user_id,
    ) = (
        vless_params[key]
        for key in (
            'tag',
            'server',
            'server_port',
            'flow',
            'server_name',
            'public_key',
            'short_id',
            'fingerprint',
            'uuid',
        )
    )

    # Local proxy endpoint: no users configured, listens on all interfaces.
    mixed_inbound = {
        "tag": "mixed-in",
        "type": "mixed",
        "sniff": True,
        "users": [],
        "listen": "0.0.0.0",
        "listen_port": PROXY_PORT,
        "set_system_proxy": False,
    }

    tls_settings = {
        "enabled": True,
        "server_name": server_name,
        "reality": {
            "enabled": True,
            "public_key": public_key,
            "short_id": short_id,
        },
        "utls": {
            "enabled": True,
            "fingerprint": fingerprint,
        },
    }

    vless_outbound = {
        "type": "vless",
        "tag": tag,
        "server": server,
        "server_port": server_port,
        "flow": flow,
        "tls": tls_settings,
        "uuid": user_id,
    }

    direct_outbound = {"tag": "direct", "type": "direct"}

    return {
        "dns": {"independent_cache": True},
        "log": {"level": "debug", "disabled": True, "timestamp": True},
        "route": {"final": tag, "auto_detect_interface": True},
        "inbounds": [mixed_inbound],
        "outbounds": [vless_outbound, direct_outbound],
    }
class ThreadingHTTPServer(socketserver.ThreadingTCPServer):
    """TCP server that handles every request in its own thread."""

    # Allow rebinding the port right after a restart.
    allow_reuse_address = True
    # Request threads must not keep the process alive: without this, shutdown
    # waits for every in-flight request (for example a running speed test).
    daemon_threads = True
class ProxyControlHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for the proxy-control web UI and its JSON API.

    GET routes:  /, /index.html, /status, /subscription,
                 /test-connection[?speed=true], /static/<file>
    POST routes: /apply, /fetch-subscription, /apply-subscription,
                 /ping-target
    """

    # Outbound types that are presented to the user as selectable servers.
    _SERVER_TYPES = ("vless", "vmess", "trojan", "shadowsocks", "hysteria2")
    # Outbound types kept next to the selected server when applying a config.
    _KEPT_TYPES = ("direct", "block", "dns")

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def log_message(self, format, *args):
        """Print the first log argument with a "[WebUI]" prefix.

        Falls back to the format string itself when no arguments are given,
        so logging can never raise IndexError.
        """
        print(f"[WebUI] {args[0] if args else format}")

    def send_json(self, data: dict, status: int = 200):
        """Send *data* as a UTF-8 JSON response with the given status code."""
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        # NOTE(review): "*" lets any web page read these responses; confirm
        # that this is intended for a control endpoint.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8"))

    def send_html(self, content: bytes):
        """Send *content* as a 200 text/html response."""
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.end_headers()
        self.wfile.write(content)

    def _route_path(self) -> str:
        """Return the request path without its query string."""
        return self.path.split('?', 1)[0]

    def _read_json_body(self) -> dict:
        """Read the request body and decode it as a JSON object.

        Raises:
            json.JSONDecodeError: If the body is not valid JSON or is not a
                JSON object.
            ValueError: If Content-Length is not an integer or the body is
                not valid UTF-8.
        """
        length = int(self.headers.get("Content-Length", 0))
        # A negative length would make rfile.read() block until the client
        # closes the connection, so it is treated as an empty body.
        raw = self.rfile.read(length) if length > 0 else b""
        text = raw.decode("utf-8")
        data = json.loads(text)
        if not isinstance(data, dict):
            raise json.JSONDecodeError("Expected a JSON object", text, 0)
        return data

    def _write_config(self, config: dict):
        """Write *config* to CONFIG_FILE as UTF-8 JSON."""
        DATA_DIR.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False emits non-ASCII tags verbatim, so the encoding
        # is fixed instead of depending on the locale.
        CONFIG_FILE.write_text(
            json.dumps(config, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )

    def _trigger_reload(self):
        """Request a reload via the internal control port (best effort)."""
        try:
            # The context manager closes the response instead of leaking it.
            with urllib.request.urlopen("http://localhost:9090/reload", timeout=5):
                pass
        except Exception as e:
            print(f"[WebUI] Warning: reload request failed: {e}")

    @staticmethod
    def _resolve_static(root, relative: str):
        """Map *relative* onto a regular file inside *root*.

        Returns the resolved path, or None when the file does not exist or
        the resolved location is outside *root* ("../" segments, absolute
        paths, symlinks leading elsewhere).
        """
        try:
            base = root.resolve()
            target = (base / relative).resolve()
            target.relative_to(base)  # ValueError when outside base
        except (OSError, RuntimeError, ValueError):
            return None
        return target if target.is_file() else None

    @staticmethod
    def _parse_user_info(header: str) -> dict:
        """Parse a "key=value; key=value" header into a dict of integers.

        Parts without '=' and parts whose value is not an integer are skipped.
        """
        info = {}
        for part in header.split(';'):
            key, has_value, value = part.strip().partition('=')
            if not has_value:
                continue
            try:
                info[key.strip()] = int(value)
            except ValueError:
                pass
        return info

    @staticmethod
    def _decode_subscription_text(text: str) -> str:
        """Return *text* stripped, with a Base64 wrapper removed if present.

        Standard and URL-safe alphabets are accepted, with or without
        padding. Text that does not look like Base64, or does not decode to
        UTF-8, is returned as is (stripped).
        """
        import base64
        import re

        content = text.strip()
        if not re.fullmatch(r'[A-Za-z0-9+/=_\-\s]+', content):
            return content
        compact = "".join(content.split())
        compact = compact.replace('-', '+').replace('_', '/')
        compact += "=" * (-len(compact) % 4)  # restore omitted padding
        try:
            return base64.b64decode(compact).decode('utf-8')
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueErrors.
            return content

    @staticmethod
    def _outbound_from_link(link: str) -> dict:
        """Convert one vless:// link into an outbound dict.

        Raises ValueError (from parse_vless_url) for malformed links.
        """
        params = parse_vless_url(link)
        return {
            "type": "vless",
            "tag": params['tag'],
            "server": params['server'],
            "server_port": params['server_port'],
            "uuid": params['uuid'],
            "flow": params['flow'],
            "tls": {
                "enabled": True,
                "server_name": params['server_name'],
                "utls": {"enabled": True, "fingerprint": params['fingerprint']},
                "reality": {
                    "enabled": True,
                    "public_key": params['public_key'],
                    "short_id": params['short_id']
                }
            },
            "packet_encoding": "xudp"
        }

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def do_GET(self):
        """Dispatch GET requests; the query string is ignored for routing."""
        route = self._route_path()
        if route in ("/", "/index.html"):
            self.serve_index()
        elif route == "/status":
            self.get_status()
        elif route == "/subscription":
            self.get_subscription()
        elif route.startswith("/test-connection"):
            self.test_connection()
        elif route.startswith("/static/"):
            self.serve_static()
        else:
            self.send_error(404)

    def do_POST(self):
        """Dispatch POST requests; the query string is ignored for routing."""
        route = self._route_path()
        if route == "/apply":
            self.apply_config()
        elif route == "/fetch-subscription":
            self.fetch_subscription()
        elif route == "/apply-subscription":
            self.apply_subscription()
        elif route == "/ping-target":
            self.ping_target()
        else:
            self.send_error(404)

    # ------------------------------------------------------------------
    # GET handlers
    # ------------------------------------------------------------------

    def serve_index(self):
        """Serve index.html from WEB_DIR, or 404 when it is missing."""
        index_path = WEB_DIR / "index.html"
        if index_path.exists():
            self.send_html(index_path.read_bytes())
        else:
            self.send_error(404, "index.html not found")

    def serve_static(self):
        """Serve a file from WEB_DIR for /static/<relative path> requests.

        Files ending in ".css" are sent as text/css, everything else as
        application/javascript. Paths that resolve outside WEB_DIR get 404.
        """
        relative = self._route_path()[len("/static/"):]
        file_path = self._resolve_static(WEB_DIR, relative)
        if file_path is None:
            self.send_error(404)
            return
        content_type = "text/css" if file_path.name.endswith(".css") else "application/javascript"
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.end_headers()
        self.wfile.write(file_path.read_bytes())

    def get_status(self):
        """Report whether a config exists and which server it uses.

        Responds with ``active`` (CONFIG_FILE exists) plus ``tag`` and
        ``server`` of the first proxy outbound found in it (None when the
        file is missing, unreadable or has no such outbound).
        """
        config_exists = CONFIG_FILE.exists()
        current_tag = None
        current_server = None
        if config_exists:
            try:
                config = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
                for outbound in config.get("outbounds", []):
                    # Any selectable server type counts, not only vless:
                    # apply_subscription can activate the other types too.
                    if outbound.get("type") in self._SERVER_TYPES:
                        current_tag = outbound.get("tag", "unknown")
                        current_server = outbound.get("server", "unknown")
                        break
            except Exception as e:
                # Status stays best-effort, but the reason is no longer hidden.
                print(f"[WebUI] Warning: could not read active config: {e}")
        self.send_json({
            "active": config_exists,
            "tag": current_tag,
            "server": current_server
        })

    def get_subscription(self):
        """Return the saved subscription record, or ``{"saved": False}``."""
        sub = load_subscription()
        if sub:
            self.send_json({
                "saved": True,
                "url": sub.get("url"),
                "selectedServer": sub.get("selectedServer"),
                "userInfo": sub.get("userInfo")
            })
        else:
            self.send_json({"saved": False})

    def test_connection(self):
        """Measure the active proxy; ``?speed=true`` adds a speed test."""
        query = self.path.partition('?')[2]
        flags = parse_qs(query)
        enable_speed = flags.get('speed', ['false'])[0].lower() == 'true'
        self.send_json(measure_proxy_performance(enable_speed_test=enable_speed))

    # ------------------------------------------------------------------
    # POST handlers
    # ------------------------------------------------------------------

    def ping_target(self):
        """Measure TCP latency to the ``server``/``port`` given in the body.

        ``port`` defaults to 443. Responds with ``{"latency": <ms or -1>}``,
        400 when no server is given, 500 with the error text otherwise.
        """
        try:
            data = self._read_json_body()
            server = data.get("server")
            port = int(data.get("port", 443))
            if not server:
                self.send_json({"error": "No server specified"}, 400)
                return
            self.send_json({"latency": measure_tcp_latency(server, port)})
        except Exception as e:
            self.send_json({"error": str(e)}, 500)

    def apply_config(self):
        """Build and activate a config from the vless:// URL in the body."""
        try:
            data = self._read_json_body()
            url = data.get("url", "").strip()
            if not url:
                self.send_json({"success": False, "error": "URL не указан"}, 400)
                return
            if not url.startswith("vless://"):
                self.send_json({"success": False, "error": "Неверный формат. Поддерживаются только vless:// ссылки"}, 400)
                return
            try:
                vless_params = parse_vless_url(url)
            except ValueError as e:
                self.send_json({"success": False, "error": f"Ошибка парсинга URL: {str(e)}"}, 400)
                return
            self._write_config(generate_vless_config(vless_params))
            self._trigger_reload()
            self.send_json({
                "success": True,
                "message": f"Конфигурация '{vless_params['tag']}' успешно применена!"
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON"}, 400)
        except Exception as e:
            self.send_json({"success": False, "error": str(e)}, 500)

    def fetch_subscription(self):
        """Download a subscription and list the servers it contains.

        The body carries ``url``. The response is either a JSON config with
        ``outbounds`` or a (possibly Base64-encoded) list of vless:// links,
        one per line. On success responds with ``servers``, the ``config``
        and ``userInfo`` parsed from the "subscription-userinfo" header.
        """
        try:
            data = self._read_json_body()
            url = data.get("url", "").strip()
            if not url:
                self.send_json({"success": False, "error": "URL подписки не указан"}, 400)
                return
            # urlopen also handles file:// and ftp:// URLs; a client-supplied
            # URL must not be able to read local files through this endpoint.
            if not url.lower().startswith(("http://", "https://")):
                self.send_json({"success": False, "error": "Поддерживаются только http:// и https:// ссылки"}, 400)
                return

            sys_info = get_system_info()
            req = urllib.request.Request(
                url,
                headers={
                    "User-Agent": "singbox",
                    "x-hwid": get_hwid(),
                    "x-device-os": sys_info["os"],
                    "x-ver-os": sys_info["version"],
                    "x-device-model": APP_NAME
                }
            )
            try:
                with urllib.request.urlopen(req, timeout=15) as response:
                    config_text = response.read().decode("utf-8")
                    user_info = self._parse_user_info(
                        response.headers.get("subscription-userinfo", "")
                    )
            except urllib.error.HTTPError as e:
                self.send_json({"success": False, "error": f"Ошибка HTTP: {e.code}"}, 400)
                return
            except urllib.error.URLError as e:
                self.send_json({"success": False, "error": f"Ошибка подключения: {e.reason}"}, 400)
                return

            # Try JSON first; anything that is not a JSON object is treated
            # as a list of vless:// links.
            try:
                config = json.loads(config_text)
            except json.JSONDecodeError:
                config = None
            if not isinstance(config, dict):
                content = self._decode_subscription_text(config_text)
                vless_links = [
                    line.strip()
                    for line in content.strip().split('\n')
                    if line.strip().startswith('vless://')
                ]
                if not vless_links:
                    self.send_json({"success": False, "error": "Не найдены VLESS ссылки в ответе"}, 400)
                    return
                outbounds = []
                for link in vless_links:
                    try:
                        outbounds.append(self._outbound_from_link(link))
                    except Exception as e:
                        # One broken link must not discard the others.
                        print(f"[WebUI] Failed to parse VLESS link: {e}")
                if not outbounds:
                    self.send_json({"success": False, "error": "Не удалось распарсить VLESS ссылки"}, 400)
                    return
                config = {"outbounds": outbounds}

            servers = [
                {
                    "tag": outbound.get("tag", "unknown"),
                    "type": outbound.get("type"),
                    "server": outbound.get("server", "unknown"),
                    "server_port": outbound.get("server_port", 443)
                }
                for outbound in (config.get("outbounds") or [])
                if isinstance(outbound, dict) and outbound.get("type") in self._SERVER_TYPES
            ]
            if not servers:
                self.send_json({"success": False, "error": "Серверы не найдены в подписке"}, 400)
                return
            self.send_json({
                "success": True,
                "servers": servers,
                "config": config,
                "userInfo": user_info
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON в ответе"}, 400)
        except Exception as e:
            self.send_json({"success": False, "error": str(e)}, 500)

    def apply_subscription(self):
        """Activate one server out of a subscription config.

        The body carries ``config``, ``selectedServer`` (outbound tag) and
        optionally ``subUrl`` and ``userInfo``. The config is reduced to the
        selected outbound plus direct/block/dns outbounds, its dns, route and
        inbounds are replaced, and it is written to CONFIG_FILE. When
        ``subUrl`` is given the subscription is saved as well.
        """
        try:
            data = self._read_json_body()
            config = data.get("config")
            selected_tag = data.get("selectedServer")
            sub_url = data.get("subUrl")  # subscription URL to persist
            user_info = data.get("userInfo")
            if not config or not isinstance(config, dict):
                self.send_json({"success": False, "error": "Конфигурация не указана"}, 400)
                return
            if not selected_tag:
                self.send_json({"success": False, "error": "Сервер не выбран"}, 400)
                return

            # Keep the selected server and the auxiliary outbounds; all other
            # outbounds (other servers, selectors, ...) are dropped.
            new_outbounds = []
            selected_outbound = None
            for outbound in (config.get("outbounds") or []):
                if not isinstance(outbound, dict):
                    continue
                if outbound.get("tag") == selected_tag:
                    selected_outbound = outbound
                elif outbound.get("type") in self._KEPT_TYPES:
                    new_outbounds.append(outbound)
            if not selected_outbound:
                self.send_json({"success": False, "error": f"Сервер '{selected_tag}' не найден"}, 400)
                return
            # The selected server goes first.
            new_outbounds.insert(0, selected_outbound)

            config["dns"] = {
                "independent_cache": True
            }
            # Drop platform-specific and experimental sections.
            config.pop("platform", None)
            config.pop("experimental", None)
            # Replace whatever inbounds the subscription had with the local
            # mixed proxy.
            config["inbounds"] = [
                {
                    "tag": "mixed-in",
                    "type": "mixed",
                    "sniff": True,
                    "users": [],
                    "listen": "0.0.0.0",
                    "listen_port": PROXY_PORT,
                    "set_system_proxy": False
                }
            ]
            config["outbounds"] = new_outbounds
            config["route"] = {
                "final": selected_tag,
                "auto_detect_interface": True
            }

            self._write_config(config)
            if sub_url:
                save_subscription(sub_url, selected_tag, user_info)
            self._trigger_reload()
            self.send_json({
                "success": True,
                "message": f"Сервер '{selected_tag}' успешно применён!"
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON"}, 400)
        except Exception as e:
            self.send_json({"success": False, "error": str(e)}, 500)
def main():
    """Start the web server and serve requests until interrupted."""
    # An empty host binds the listening socket to all interfaces.
    with ThreadingHTTPServer(("", PORT), ProxyControlHandler) as httpd:
        print(f"[WebUI] Server started on port {PORT}")
        print(f"[WebUI] Open http://localhost:{PORT} in your browser")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the server; leave without a
            # traceback and let the context manager close the socket.
            print("[WebUI] Server stopped")


if __name__ == "__main__":
    main()