Files
vpn-proxy/web/server.py

1126 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Simple HTTP Web Server for VPN Proxy Control
Provides a web UI to manage sing-box subscriptions
"""
import http.server
import json
import os
import platform
import socketserver
import urllib.request
import urllib.error
import uuid
import socket
import time
import time
from datetime import datetime, timezone
from urllib.parse import parse_qs, unquote
from pathlib import Path
PORT = int(os.environ.get("PORT", 3456))
PROXY_PORT = int(os.environ.get("PROXY_PORT", 8080))
APP_NAME = "VPN-Proxy-Control by Dokril"
APP_DIR = Path(__file__).parent
BASE_DIR = APP_DIR.parent
WEB_DIR = APP_DIR
DATA_DIR = BASE_DIR / "data"
CONFIG_FILE = DATA_DIR / "client.json"
HWID_FILE = DATA_DIR / "hwid"
SUBSCRIPTION_FILE = DATA_DIR / "subscription.json"
SUBSCRIPTION_FILE = DATA_DIR / "subscription.json"
FALLBACK_FILE = DATA_DIR / "fallback.json"
PROXY_ENABLED_FILE = DATA_DIR / "proxy_enabled.json"
START_TIME_FILE = DATA_DIR / "start_time.json"
# Default fallback proxy settings
DEFAULT_FALLBACK = {
"enabled": False,
"host": "192.168.50.111",
"port": 8080
}
def get_hwid() -> str:
"""Get or generate hardware ID"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
if HWID_FILE.exists():
return HWID_FILE.read_text().strip()
# Generate new random HWID
hwid = uuid.uuid4().hex[:16]
HWID_FILE.write_text(hwid)
return hwid
def get_system_info() -> dict:
"""Get system information for headers"""
system = platform.system().lower() # windows, linux, darwin
version = platform.release() # 10, 5.15.0, 22.0.0
return {
"os": system,
"version": version
}
def save_subscription(url: str, selected_server: str = None, user_info: dict = None):
"""Save subscription URL, selected server and user info to file"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
data = {
"url": url,
"selectedServer": selected_server,
"userInfo": user_info
}
SUBSCRIPTION_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2))
def load_subscription() -> dict:
"""Load subscription from file"""
if SUBSCRIPTION_FILE.exists():
try:
return json.loads(SUBSCRIPTION_FILE.read_text())
except json.JSONDecodeError:
pass
return None
def save_fallback_config(enabled: bool, host: str, port: int):
"""Save fallback proxy configuration to file"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
data = {
"enabled": enabled,
"host": host,
"port": port
}
FALLBACK_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2))
def load_fallback_config() -> dict:
"""Load fallback proxy configuration from file"""
if FALLBACK_FILE.exists():
try:
return json.loads(FALLBACK_FILE.read_text())
except json.JSONDecodeError:
pass
return DEFAULT_FALLBACK.copy()
def save_proxy_enabled(enabled: bool):
"""Save proxy enabled state to file"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
PROXY_ENABLED_FILE.write_text(json.dumps({"enabled": enabled}))
def load_proxy_enabled() -> bool:
"""Load proxy enabled state from file"""
if PROXY_ENABLED_FILE.exists():
try:
data = json.loads(PROXY_ENABLED_FILE.read_text())
return data.get("enabled", True)
except json.JSONDecodeError:
pass
return True # Default: proxy enabled
def save_start_time(start_time: float):
"""Save VPN start time to file"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
START_TIME_FILE.write_text(json.dumps({"startTime": start_time}))
def load_start_time() -> float:
"""Load VPN start time from file"""
if START_TIME_FILE.exists():
try:
data = json.loads(START_TIME_FILE.read_text())
return data.get("startTime", 0.0)
except json.JSONDecodeError:
pass
return 0.0
def measure_tcp_latency(host: str, port: int, timeout: float = 2.0) -> int:
"""Measure TCP latency to a host:port in milliseconds"""
start_time = time.time()
try:
with socket.create_connection((host, port), timeout=timeout):
latency = (time.time() - start_time) * 1000
return int(latency)
except Exception:
return -1
def measure_proxy_performance(enable_speed_test: bool = False) -> dict:
"""Measure proxy latency, speed and public IP via local proxy"""
proxy_url = f"http://127.0.0.1:{PROXY_PORT}"
proxies = {"http": proxy_url, "https": proxy_url}
# 1. Measure Latency (Ping)
latency = "Timeout"
try:
start_time = time.time()
# Use a reliable endpoint for ping
opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxies))
req = urllib.request.Request("http://www.gstatic.com/generate_204", headers={"User-Agent": "singbox-test"})
with opener.open(req, timeout=5) as response:
lat_ms = int((time.time() - start_time) * 1000)
latency = f"{lat_ms}ms"
except Exception as e:
latency = "Error"
# 2. Get Public IP (IPv4)
ip = "Unknown"
try:
opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxies))
# Use v4.ident.me to force IPv4
req = urllib.request.Request("http://v4.ident.me", headers={"User-Agent": "curl/7.68.0"})
with opener.open(req, timeout=5) as response:
ip = response.read().decode('utf-8').strip()
except Exception:
# Fallback to ipify if ident.me fails or returns garbage
try:
req = urllib.request.Request("http://api.ipify.org", headers={"User-Agent": "curl/7.68.0"})
with opener.open(req, timeout=5) as response:
ip = response.read().decode('utf-8').strip()
except Exception:
pass
# 3. Measure Download Speed
speed_mbps = 0.0
if enable_speed_test:
test_files = [
# Tele2 Speedtest (Usually very reliable and fast)
("https://speedtest.selectel.ru/100MB", 100),
# ThinkBroadband (Reliable backup)
("https://speedtest.selectel.ru/1GB", 1000)
]
for url, size_mb in test_files:
try:
print(f"[WebUI] Testing speed with: {url}")
start_time = time.time()
opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxies))
# Set a longer timeout for speed tests
with opener.open(url, timeout=30) as response:
downloaded = 0
# Larger chunk size for better throughput measurement
chunk_size = 1024 * 256 # 256KB chunks
# Download for at least 2 seconds or up to 25MB for accurate measurement
min_test_duration = 2.0 # seconds
max_download_bytes = 25 * 1024 * 1024 # 25MB
while True:
chunk = response.read(chunk_size)
if not chunk:
break
downloaded += len(chunk)
elapsed = time.time() - start_time
# Stop if we've downloaded enough AND tested for minimum duration
if downloaded >= max_download_bytes or (elapsed >= min_test_duration and downloaded >= 2 * 1024 * 1024):
break
duration = time.time() - start_time
if duration > 0.1 and downloaded > 0:
# Calculate speed in Mbps (megabits per second)
# downloaded bytes * 8 bits/byte / 1,000,000 / seconds
speed_mbps = round((downloaded * 8) / (1000 * 1000) / duration, 1)
print(f"[WebUI] Speed test: downloaded {downloaded / (1024*1024):.1f}MB in {duration:.1f}s = {speed_mbps} Mbps")
break # Stop if successful
except Exception as e:
print(f"[WebUI] Speed test failed for {url}: {e}")
continue
result = {
"latency": latency,
"ip": ip
}
if enable_speed_test:
# If speed is still 0.0 but we tried, return Error or 0.0
result["speed"] = f"{speed_mbps} Mbps"
return result
def parse_vless_url(url: str) -> dict:
"""Parse VLESS URL and extract connection parameters"""
if not url.startswith("vless://"):
raise ValueError("URL must start with vless://")
# Remove scheme
url_no_scheme = url[8:]
# Split by fragment (#tag)
if '#' in url_no_scheme:
url_part, tag = url_no_scheme.split('#', 1)
tag = unquote(tag)
else:
url_part = url_no_scheme
tag = "reality"
# Split by query (?)
if '?' in url_part:
uuid_host_port, query_string = url_part.split('?', 1)
else:
raise ValueError("Missing query parameters")
# Parse UUID@host:port
if '@' not in uuid_host_port:
raise ValueError("Missing @ separator")
uuid_str, host_port = uuid_host_port.split('@', 1)
if ':' not in host_port:
raise ValueError("Missing port")
host, port_str = host_port.rsplit(':', 1)
port = int(port_str)
# Parse query parameters
params = {}
for param in query_string.split('&'):
if '=' in param:
key, value = param.split('=', 1)
params[key] = unquote(value)
# Extract required parameters
pbk = params.get('pbk', '')
sid = params.get('sid', '')
sni = params.get('sni', host)
fp = params.get('fp', 'chrome')
flow = params.get('flow', '')
if not pbk or not sid:
raise ValueError("Missing required parameters: pbk or sid")
return {
'uuid': uuid_str,
'server': host,
'server_port': port,
'tag': tag,
'public_key': pbk,
'short_id': sid,
'server_name': sni,
'fingerprint': fp,
'flow': flow
}
def generate_vless_config(vless_params: dict) -> dict:
"""Generate sing-box configuration from VLESS parameters"""
config = {
"dns": {
"independent_cache": True
},
"log": {
"level": "debug",
"disabled": True,
"timestamp": True
},
"route": {
"final": vless_params['tag'],
"auto_detect_interface": True
},
"inbounds": [
{
"tag": "mixed-in",
"type": "mixed",
"sniff": True,
"users": [],
"listen": "0.0.0.0",
"listen_port": PROXY_PORT,
"set_system_proxy": False
}
],
"outbounds": [
{
"type": "vless",
"tag": vless_params['tag'],
"server": vless_params['server'],
"server_port": vless_params['server_port'],
"flow": vless_params['flow'],
"tls": {
"enabled": True,
"server_name": vless_params['server_name'],
"reality": {
"enabled": True,
"public_key": vless_params['public_key'],
"short_id": vless_params['short_id']
},
"utls": {
"enabled": True,
"fingerprint": vless_params['fingerprint']
}
},
"uuid": vless_params['uuid']
},
{
"tag": "direct",
"type": "direct"
}
]
}
return config
class ThreadingHTTPServer(socketserver.ThreadingTCPServer):
allow_reuse_address = True
class ProxyControlHandler(http.server.BaseHTTPRequestHandler):
"""HTTP Request Handler for Proxy Control"""
def log_message(self, format, *args):
"""Override to add timestamp prefix"""
print(f"[WebUI] {args[0]}")
def send_json(self, data: dict, status: int = 200):
"""Send JSON response"""
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8"))
def send_html(self, content: bytes):
"""Send HTML response"""
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write(content)
def do_GET(self):
"""Handle GET requests"""
if self.path == "/" or self.path == "/index.html":
self.serve_index()
elif self.path == "/status":
self.get_status()
elif self.path == "/subscription":
self.get_subscription()
elif self.path.startswith("/test-connection"):
self.test_connection()
elif self.path.startswith("/static/"):
self.serve_static()
elif self.path == "/fallback-config":
self.get_fallback_config()
elif self.path == "/active-proxy":
self.get_active_proxy()
elif self.path == "/proxy-enabled":
self.get_proxy_enabled()
else:
self.send_error(404)
def do_POST(self):
"""Handle POST requests"""
if self.path == "/apply":
self.apply_config()
elif self.path == "/fetch-subscription":
self.fetch_subscription()
elif self.path == "/apply-subscription":
self.apply_subscription()
elif self.path == "/ping-target":
self.ping_target()
elif self.path == "/fallback-config":
self.save_fallback_config_endpoint()
elif self.path == "/proxy-enabled":
self.set_proxy_enabled()
else:
self.send_error(404)
def serve_index(self):
"""Serve main HTML page"""
index_path = WEB_DIR / "index.html"
if index_path.exists():
self.send_html(index_path.read_bytes())
else:
self.send_error(404, "index.html not found")
def serve_static(self):
"""Serve static files"""
file_path = WEB_DIR / self.path[8:] # Remove /static/
if file_path.exists() and file_path.is_file():
content_type = "text/css" if str(file_path).endswith(".css") else "application/javascript"
self.send_response(200)
self.send_header("Content-Type", content_type)
self.end_headers()
self.wfile.write(file_path.read_bytes())
else:
self.send_error(404)
def get_fallback_config(self):
"""Get fallback proxy configuration"""
fallback = load_fallback_config()
self.send_json({
"enabled": fallback.get("enabled", False),
"host": fallback.get("host", "192.168.50.111"),
"port": fallback.get("port", 8080)
})
def get_active_proxy(self):
"""Get information about current active proxy chain"""
result = {
"configured": False,
"fallbackEnabled": False,
"fallbackHost": None,
"vpnTag": None,
"vpnServer": None,
"activeOutbound": None
}
if not CONFIG_FILE.exists():
self.send_json(result)
return
try:
config = json.loads(CONFIG_FILE.read_text())
outbounds = config.get("outbounds", [])
route_final = config.get("route", {}).get("final")
result["configured"] = True
for outbound in outbounds:
out_type = outbound.get("type")
if out_type == "urltest":
result["fallbackEnabled"] = True
elif out_type == "http" and outbound.get("tag") == "fallback-proxy":
result["fallbackHost"] = f"{outbound.get('server')}:{outbound.get('server_port')}"
elif out_type in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]:
result["vpnTag"] = outbound.get("tag")
result["vpnServer"] = outbound.get("server")
# Determine which is actually active
# For now, we show the configured route
result["activeOutbound"] = route_final
# Check fallback proxy reachability (quick TCP check)
if result["fallbackEnabled"] and result["fallbackHost"]:
try:
host, port = result["fallbackHost"].split(":")
latency = measure_tcp_latency(host, int(port), timeout=1.0)
result["fallbackReachable"] = latency > 0
result["fallbackLatency"] = latency if latency > 0 else None
except Exception:
result["fallbackReachable"] = False
result["fallbackLatency"] = None
except Exception as e:
result["error"] = str(e)
self.send_json(result)
def save_fallback_config_endpoint(self):
"""Save fallback proxy configuration and regenerate config"""
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body)
enabled = data.get("enabled", False)
host = data.get("host", "").strip()
port = int(data.get("port", 8080))
if enabled and not host:
self.send_json({"success": False, "error": "Host is required"}, 400)
return
save_fallback_config(enabled, host, port)
# Regenerate current config if it exists
regenerated = self.regenerate_current_config()
self.send_json({
"success": True,
"message": "Fallback config saved",
"regenerated": regenerated
})
except json.JSONDecodeError:
self.send_json({"success": False, "error": "Invalid JSON"}, 400)
except Exception as e:
self.send_json({"success": False, "error": str(e)}, 500)
def regenerate_current_config(self) -> bool:
"""Regenerate current config with updated fallback settings"""
if not CONFIG_FILE.exists():
return False
try:
config = json.loads(CONFIG_FILE.read_text())
outbounds = config.get("outbounds", [])
# Find the VPN outbound (vless, vmess, etc.)
vpn_outbound = None
utility_outbounds = []
for outbound in outbounds:
if outbound.get("type") in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]:
vpn_outbound = outbound
elif outbound.get("type") in ["direct", "block", "dns"]:
utility_outbounds.append(outbound)
if not vpn_outbound:
return False
selected_tag = vpn_outbound.get("tag")
# Load fallback config
fallback = load_fallback_config()
fallback_enabled = fallback.get("enabled", False)
fallback_host = fallback.get("host", "")
fallback_port = fallback.get("port", 8080)
# Build new outbounds
final_outbounds = []
final_tag = selected_tag
if fallback_enabled and fallback_host:
urltest_outbound = {
"type": "urltest",
"tag": "auto-select",
"outbounds": ["fallback-proxy", selected_tag],
"url": "http://www.gstatic.com/generate_204",
"interval": "30s",
"tolerance": 9999
}
fallback_outbound = {
"type": "http",
"tag": "fallback-proxy",
"server": fallback_host,
"server_port": fallback_port
}
final_outbounds.append(urltest_outbound)
final_outbounds.append(fallback_outbound)
final_tag = "auto-select"
final_outbounds.append(vpn_outbound)
final_outbounds.extend(utility_outbounds)
config["outbounds"] = final_outbounds
config["route"]["final"] = final_tag
# Write config
CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False))
# Reload sing-box
try:
urllib.request.urlopen("http://127.0.0.1:9090/reload", timeout=3)
except Exception:
pass
return True
except Exception as e:
print(f"[WebUI] Failed to regenerate config: {e}")
return False
def get_status(self):
"""Get current proxy status"""
config_exists = CONFIG_FILE.exists()
current_tag = None
current_server = None
proxy_enabled = load_proxy_enabled()
if config_exists:
try:
config = json.loads(CONFIG_FILE.read_text())
for outbound in config.get("outbounds", []):
if outbound.get("type") == "vless":
current_tag = outbound.get("tag", "unknown")
current_server = outbound.get("server", "unknown")
break
except Exception:
pass
self.send_json({
"active": config_exists and proxy_enabled,
"tag": current_tag,
"server": current_server,
"proxyPort": PROXY_PORT,
"proxyEnabled": proxy_enabled,
"startTime": load_start_time() if config_exists and proxy_enabled else 0
})
def get_proxy_enabled(self):
"""Get proxy enabled state"""
enabled = load_proxy_enabled()
self.send_json({"enabled": enabled})
def set_proxy_enabled(self):
"""Set proxy enabled state and regenerate config"""
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body)
enabled = data.get("enabled", True)
save_proxy_enabled(enabled)
# Regenerate config based on state
if enabled:
# Restore normal VPN config
regenerated = self.regenerate_current_config()
if regenerated:
# Only update start time if actually enabling VPN
save_start_time(datetime.now(timezone.utc).timestamp())
else:
# Generate direct config (bypass proxy)
regenerated = self.generate_direct_config()
save_start_time(0)
self.send_json({
"success": True,
"enabled": enabled,
"regenerated": regenerated
})
except json.JSONDecodeError:
self.send_json({"success": False, "error": "Invalid JSON"}, 400)
except Exception as e:
self.send_json({"success": False, "error": str(e)}, 500)
def generate_direct_config(self) -> bool:
"""Generate a direct connection config (bypass all proxies)"""
try:
config = {
"dns": {
"independent_cache": True
},
"log": {
"level": "debug",
"disabled": True,
"timestamp": True
},
"route": {
"final": "direct",
"auto_detect_interface": True
},
"inbounds": [
{
"tag": "mixed-in",
"type": "mixed",
"sniff": True,
"users": [],
"listen": "0.0.0.0",
"listen_port": PROXY_PORT,
"set_system_proxy": False
}
],
"outbounds": [
{
"tag": "direct",
"type": "direct"
}
]
}
DATA_DIR.mkdir(parents=True, exist_ok=True)
CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False))
# Reload sing-box
try:
urllib.request.urlopen("http://127.0.0.1:9090/reload", timeout=3)
except Exception:
pass
return True
except Exception as e:
print(f"[WebUI] Failed to generate direct config: {e}")
return False
def get_subscription(self):
"""Get saved subscription info"""
sub = load_subscription()
if sub:
self.send_json({
"saved": True,
"url": sub.get("url"),
"selectedServer": sub.get("selectedServer"),
"userInfo": sub.get("userInfo")
})
else:
self.send_json({"saved": False})
def test_connection(self):
"""Test active proxy connection"""
query_components = {}
if '?' in self.path:
_, query = self.path.split('?', 1)
query_components = parse_qs(query)
enable_speed = query_components.get('speed', ['false'])[0].lower() == 'true'
result = measure_proxy_performance(enable_speed_test=enable_speed)
self.send_json(result)
def ping_target(self):
"""Ping a specific target"""
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body)
server = data.get("server")
port = int(data.get("port", 443))
if not server:
self.send_json({"error": "No server specified"}, 400)
return
latency = measure_tcp_latency(server, port)
self.send_json({"latency": latency})
except Exception as e:
self.send_json({"error": str(e)}, 500)
def apply_config(self):
"""Apply new config from VLESS URL"""
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body)
url = data.get("url", "").strip()
if not url:
self.send_json({"success": False, "error": "URL не указан"}, 400)
return
if not url.startswith("vless://"):
self.send_json({"success": False, "error": "Неверный формат. Поддерживаются только vless:// ссылки"}, 400)
return
# Parse VLESS URL
try:
vless_params = parse_vless_url(url)
except ValueError as e:
self.send_json({"success": False, "error": f"Ошибка парсинга URL: {str(e)}"}, 400)
return
# Generate config
config = generate_vless_config(vless_params)
# Ensure data directory exists
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Write config file
CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False))
# Trigger reload via internal control port
try:
urllib.request.urlopen("http://localhost:9090/reload", timeout=5)
except Exception as e:
print(f"[WebUI] Warning: reload request failed: {e}")
self.send_json({
"success": True,
"message": f"Конфигурация '{vless_params['tag']}' успешно применена!"
})
# Save new start time as connection is reset
save_start_time(datetime.now(timezone.utc).timestamp())
except json.JSONDecodeError:
self.send_json({"success": False, "error": "Неверный JSON"}, 400)
except Exception as e:
self.send_json({"success": False, "error": str(e)}, 500)
def fetch_subscription(self):
"""Fetch servers list from subscription URL"""
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body)
url = data.get("url", "").strip()
if not url:
self.send_json({"success": False, "error": "URL подписки не указан"}, 400)
return
# Fetch subscription config
sys_info = get_system_info()
req = urllib.request.Request(
url,
headers={
"User-Agent": "singbox",
"x-hwid": get_hwid(),
"x-device-os": sys_info["os"],
"x-ver-os": sys_info["version"],
"x-device-model": APP_NAME
}
)
config = None
config_text = ""
user_info = {}
try:
with urllib.request.urlopen(req, timeout=15) as response:
config_text = response.read().decode("utf-8")
# Parse User Info header
user_info_header = response.headers.get("subscription-userinfo", "")
if user_info_header:
parts = user_info_header.split(';')
for part in parts:
if '=' in part:
key, value = part.strip().split('=', 1)
try:
user_info[key] = int(value)
except ValueError:
pass
except urllib.error.HTTPError as e:
self.send_json({"success": False, "error": f"Ошибка HTTP: {e.code}"}, 400)
return
except urllib.error.URLError as e:
self.send_json({"success": False, "error": f"Ошибка подключения: {e.reason}"}, 400)
return
# Try to parse as JSON first
try:
config = json.loads(config_text)
except json.JSONDecodeError:
# Not JSON - try Base64 decode or plain text VLESS links
content = config_text.strip()
# Try Base64 decode
import base64
import re
try:
# Check if it looks like Base64
if re.match(r'^[A-Za-z0-9+/=\s]+$', content):
decoded = base64.b64decode(content).decode('utf-8')
content = decoded
except Exception:
pass # Not Base64, continue with original content
# Parse VLESS links
lines = content.strip().split('\n')
vless_links = [line.strip() for line in lines if line.strip().startswith('vless://')]
if not vless_links:
self.send_json({"success": False, "error": "Не найдены VLESS ссылки в ответе"}, 400)
return
# Parse each VLESS link and create outbounds
outbounds = []
for link in vless_links:
try:
params = parse_vless_url(link)
outbound = {
"type": "vless",
"tag": params['tag'],
"server": params['server'],
"server_port": params['server_port'],
"uuid": params['uuid'],
"flow": params['flow'],
"tls": {
"enabled": True,
"server_name": params['server_name'],
"utls": {"enabled": True, "fingerprint": params['fingerprint']},
"reality": {
"enabled": True,
"public_key": params['public_key'],
"short_id": params['short_id']
}
},
"packet_encoding": "xudp"
}
outbounds.append(outbound)
except Exception as e:
print(f"[WebUI] Failed to parse VLESS link: {e}")
continue
if not outbounds:
self.send_json({"success": False, "error": "Не удалось распарсить VLESS ссылки"}, 400)
return
# Create a mock config with parsed outbounds
config = {"outbounds": outbounds}
# Extract outbound servers
outbounds = config.get("outbounds", [])
servers = []
for outbound in outbounds:
if outbound.get("type") in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]:
servers.append({
"tag": outbound.get("tag", "unknown"),
"type": outbound.get("type"),
"server": outbound.get("server", "unknown"),
"server_port": outbound.get("server_port", 443)
})
if not servers:
self.send_json({"success": False, "error": "Серверы не найдены в подписке"}, 400)
return
self.send_json({
"success": True,
"servers": servers,
"config": config,
"userInfo": user_info
})
except json.JSONDecodeError:
self.send_json({"success": False, "error": "Неверный JSON в ответе"}, 400)
except Exception as e:
self.send_json({"success": False, "error": str(e)}, 500)
def apply_subscription(self):
"""Apply config from subscription with selected server"""
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body)
config = data.get("config")
selected_tag = data.get("selectedServer")
sub_url = data.get("subUrl") # URL подписки для сохранения
user_info = data.get("userInfo")
if not config:
self.send_json({"success": False, "error": "Конфигурация не указана"}, 400)
return
if not selected_tag:
self.send_json({"success": False, "error": "Сервер не выбран"}, 400)
return
# Modify config to use only selected server
outbounds = config.get("outbounds", [])
new_outbounds = []
selected_outbound = None
for outbound in outbounds:
if outbound.get("tag") == selected_tag:
selected_outbound = outbound
elif outbound.get("type") in ["direct", "block", "dns"]:
new_outbounds.append(outbound)
elif outbound.get("type") == "selector":
# Skip selector, we'll add selected server directly
pass
if not selected_outbound:
self.send_json({"success": False, "error": f"Сервер '{selected_tag}' не найден"}, 400)
return
# Load fallback configuration
fallback = load_fallback_config()
fallback_enabled = fallback.get("enabled", False)
fallback_host = fallback.get("host", "")
fallback_port = fallback.get("port", 8080)
# Build outbounds list
final_outbounds = []
final_tag = selected_tag
if fallback_enabled and fallback_host:
# Add URLTest for automatic fallback selection
# High tolerance (9999ms) ensures first working proxy is preferred
urltest_outbound = {
"type": "urltest",
"tag": "auto-select",
"outbounds": ["fallback-proxy", selected_tag],
"url": "http://www.gstatic.com/generate_204",
"interval": "30s",
"tolerance": 9999 # Use first working proxy, not fastest
}
# Add HTTP fallback proxy
fallback_outbound = {
"type": "http",
"tag": "fallback-proxy",
"server": fallback_host,
"server_port": fallback_port
}
final_outbounds.append(urltest_outbound)
final_outbounds.append(fallback_outbound)
final_tag = "auto-select"
# Add selected VPN server
final_outbounds.append(selected_outbound)
# Add utility outbounds (direct, block, dns)
final_outbounds.extend(new_outbounds)
# Update route
routes = {
"final": final_tag,
"auto_detect_interface": True
}
# Simplify DNS configuration to match client.json format
config["dns"] = {
"independent_cache": True
}
# Remove platform-specific and experimental fields from root config
config.pop("platform", None)
config.pop("experimental", None)
# Replace TUN inbounds with mixed proxy
config["inbounds"] = [
{
"tag": "mixed-in",
"type": "mixed",
"sniff": True,
"users": [],
"listen": "0.0.0.0",
"listen_port": PROXY_PORT,
"set_system_proxy": False
}
]
config["outbounds"] = final_outbounds
config["route"] = routes
# Ensure data directory exists
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Write config file
CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False))
# Save subscription URL for persistence
if sub_url:
save_subscription(sub_url, selected_tag, user_info)
# Trigger reload via internal control port
try:
urllib.request.urlopen("http://localhost:9090/reload", timeout=5)
except Exception as e:
print(f"[WebUI] Warning: reload request failed: {e}")
self.send_json({
"success": True,
"message": f"Сервер '{selected_tag}' успешно применён!"
})
except json.JSONDecodeError:
self.send_json({"success": False, "error": "Неверный JSON"}, 400)
except Exception as e:
self.send_json({"success": False, "error": str(e)}, 500)
def main():
"""Start the web server"""
# Use ThreadingTCPServer for concurrent requests
with ThreadingHTTPServer(("", PORT), ProxyControlHandler) as httpd:
print(f"[WebUI] Server started on port {PORT}")
print(f"[WebUI] Open http://localhost:{PORT} in your browser")
httpd.serve_forever()
if __name__ == "__main__":
main()