#!/usr/bin/env python3
"""
Simple HTTP Web Server for VPN Proxy Control
Provides a web UI to manage sing-box subscriptions
"""

import base64
import http.server
import json
import os
import platform
import re
import socket
import socketserver
import time
import urllib.error
import urllib.request
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from urllib.parse import parse_qs, unquote, urlsplit

PORT = int(os.environ.get("PORT", 3456))
PROXY_PORT = int(os.environ.get("PROXY_PORT", 8080))
APP_NAME = "VPN-Proxy-Control by Dokril"

APP_DIR = Path(__file__).parent
BASE_DIR = APP_DIR.parent
WEB_DIR = APP_DIR
DATA_DIR = BASE_DIR / "data"
CONFIG_FILE = DATA_DIR / "client.json"
HWID_FILE = DATA_DIR / "hwid"
SUBSCRIPTION_FILE = DATA_DIR / "subscription.json"
FALLBACK_FILE = DATA_DIR / "fallback.json"
PROXY_ENABLED_FILE = DATA_DIR / "proxy_enabled.json"
START_TIME_FILE = DATA_DIR / "start_time.json"

# Default fallback proxy settings
DEFAULT_FALLBACK = {
    "enabled": False,
    "host": "192.168.50.111",
    "port": 8080,
}

# Outbound types treated as "the VPN server" vs. helper outbounds that are
# carried over unchanged when the config is rebuilt.
VPN_OUTBOUND_TYPES = ("vless", "vmess", "trojan", "shadowsocks", "hysteria2")
UTILITY_OUTBOUND_TYPES = ("direct", "block", "dns")

# Internal control endpoint that is asked to reload sing-box after a config write.
RELOAD_URL = "http://127.0.0.1:9090/reload"

# Speed test downloads, tried in order: (url, nominal size in MB).
SPEED_TEST_FILES = [
    ("https://speedtest.selectel.ru/100MB", 100),
    ("https://speedtest.selectel.ru/1GB", 1000),
]

# Content types served from /static/; anything else is sent as a binary blob.
STATIC_CONTENT_TYPES = {
    ".css": "text/css",
    ".js": "application/javascript",
    ".mjs": "application/javascript",
    ".map": "application/json",
    ".json": "application/json",
    ".html": "text/html; charset=utf-8",
    ".svg": "image/svg+xml",
    ".png": "image/png",
    ".ico": "image/x-icon",
}

# Text made only of (standard or URL-safe) Base64 characters and whitespace.
_BASE64_TEXT_RE = re.compile(r"^[A-Za-z0-9+/=_\-\s]+$")


def read_json_file(path: Path):
    """Return the parsed JSON content of *path*, or None if it cannot be read.

    Missing files, I/O errors, undecodable bytes and invalid JSON all yield
    None so that callers can fall back to their defaults.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return None


def write_json_file(path: Path, data, indent: Optional[int] = 2) -> None:
    """Serialize *data* as UTF-8 JSON into *path*, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        json.dumps(data, ensure_ascii=False, indent=indent), encoding="utf-8"
    )


def get_hwid() -> str:
    """Get or generate hardware ID"""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if HWID_FILE.exists():
        stored = HWID_FILE.read_text(encoding="utf-8").strip()
        if stored:
            return stored
    # No usable ID on disk (missing or empty file): generate a new random one.
    hwid = uuid.uuid4().hex[:16]
    HWID_FILE.write_text(hwid, encoding="utf-8")
    return hwid


def get_system_info() -> dict:
    """Get system information for headers"""
    return {
        "os": platform.system().lower(),  # windows, linux, darwin
        "version": platform.release(),  # 10, 5.15.0, 22.0.0
    }


def save_subscription(
    url: str,
    selected_server: Optional[str] = None,
    user_info: Optional[dict] = None,
):
    """Save subscription URL, selected server and user info to file"""
    write_json_file(
        SUBSCRIPTION_FILE,
        {"url": url, "selectedServer": selected_server, "userInfo": user_info},
    )


def load_subscription() -> Optional[dict]:
    """Load subscription from file; None when nothing valid is stored."""
    data = read_json_file(SUBSCRIPTION_FILE)
    return data if isinstance(data, dict) else None


def save_fallback_config(enabled: bool, host: str, port: int):
    """Save fallback proxy configuration to file"""
    write_json_file(FALLBACK_FILE, {"enabled": enabled, "host": host, "port": port})


def load_fallback_config() -> dict:
    """Load fallback proxy configuration from file (defaults when absent/invalid)."""
    data = read_json_file(FALLBACK_FILE)
    return data if isinstance(data, dict) else DEFAULT_FALLBACK.copy()


def save_proxy_enabled(enabled: bool):
    """Save proxy enabled state to file"""
    write_json_file(PROXY_ENABLED_FILE, {"enabled": enabled}, indent=None)


def load_proxy_enabled() -> bool:
    """Load proxy enabled state from file (defaults to enabled)."""
    data = read_json_file(PROXY_ENABLED_FILE)
    if isinstance(data, dict):
        return data.get("enabled", True)
    return True


def save_start_time(start_time: float):
    """Save VPN start time to file"""
    write_json_file(START_TIME_FILE, {"startTime": start_time}, indent=None)


def load_start_time() -> float:
    """Load VPN start time from file (0.0 when unknown)."""
    data = read_json_file(START_TIME_FILE)
    if isinstance(data, dict):
        return data.get("startTime", 0.0)
    return 0.0


def trigger_reload(timeout: float = 5.0) -> bool:
    """Ask the internal control port to reload sing-box.

    Best effort: a failure is logged and reported as False, never raised,
    because the config file has already been written at this point.
    """
    try:
        with urllib.request.urlopen(RELOAD_URL, timeout=timeout):
            pass
        return True
    except Exception as e:  # best-effort boundary: any failure is non-fatal
        print(f"[WebUI] Warning: reload request failed: {e}")
        return False


def measure_tcp_latency(host: str, port: int, timeout: float = 2.0) -> int:
    """Measure TCP connect latency to host:port in whole milliseconds.

    Returns -1 when the connection cannot be established. A successful
    connect may legitimately return 0 (sub-millisecond), so callers must
    test for ``>= 0`` rather than ``> 0``.
    """
    started = time.perf_counter()
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return int((time.perf_counter() - started) * 1000)
    except (OSError, OverflowError, ValueError):
        # OSError covers refused/timeout/DNS errors; the others cover
        # out-of-range ports and host names that cannot be encoded.
        return -1


def _probe_proxy_latency(opener) -> str:
    """Return the round-trip time of a tiny request through the proxy, e.g. '42ms'."""
    request = urllib.request.Request(
        "http://www.gstatic.com/generate_204",
        headers={"User-Agent": "singbox-test"},
    )
    try:
        started = time.perf_counter()
        with opener.open(request, timeout=5):
            elapsed_ms = int((time.perf_counter() - started) * 1000)
        return f"{elapsed_ms}ms"
    except Exception:  # best-effort probe: any failure is shown as "Error"
        return "Error"


def _fetch_public_ip(opener) -> str:
    """Return the public IPv4 address seen through the proxy, or 'Unknown'."""
    # v4.ident.me forces IPv4; ipify is the backup service.
    for url in ("http://v4.ident.me", "http://api.ipify.org"):
        request = urllib.request.Request(url, headers={"User-Agent": "curl/7.68.0"})
        try:
            with opener.open(request, timeout=5) as response:
                return response.read().decode("utf-8").strip()
        except Exception:  # best-effort probe: try the next service
            continue
    return "Unknown"


def _measure_download_speed(opener) -> float:
    """Return the download speed through the proxy in Mbps (0.0 on failure)."""
    chunk_size = 1024 * 256  # 256KB chunks for better throughput measurement
    min_test_duration = 2.0  # seconds
    min_download_bytes = 2 * 1024 * 1024  # 2MB
    max_download_bytes = 25 * 1024 * 1024  # 25MB

    for url, _size_mb in SPEED_TEST_FILES:
        try:
            print(f"[WebUI] Testing speed with: {url}")
            started = time.perf_counter()
            downloaded = 0
            # Longer timeout than the probes: this is a bulk transfer.
            with opener.open(url, timeout=30) as response:
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    downloaded += len(chunk)
                    elapsed = time.perf_counter() - started
                    # Stop at the size cap, or once both the minimum duration
                    # and the minimum amount of data have been reached.
                    if downloaded >= max_download_bytes or (
                        elapsed >= min_test_duration
                        and downloaded >= min_download_bytes
                    ):
                        break
            duration = time.perf_counter() - started
            if duration > 0.1 and downloaded > 0:
                # bytes * 8 bits/byte / 1,000,000 / seconds = megabits per second
                speed_mbps = round((downloaded * 8) / (1000 * 1000) / duration, 1)
                print(
                    f"[WebUI] Speed test: downloaded {downloaded / (1024*1024):.1f}MB "
                    f"in {duration:.1f}s = {speed_mbps} Mbps"
                )
                return speed_mbps
        except Exception as e:  # best-effort: fall through to the next URL
            print(f"[WebUI] Speed test failed for {url}: {e}")
    return 0.0


def measure_proxy_performance(enable_speed_test: bool = False) -> dict:
    """Measure proxy latency, speed and public IP via local proxy.

    Returns a dict with ``latency`` (e.g. ``"42ms"`` or ``"Error"``), ``ip``
    (address string or ``"Unknown"``) and, only when *enable_speed_test* is
    true, ``speed`` (e.g. ``"12.3 Mbps"``; ``"0.0 Mbps"`` if every test failed).
    """
    proxy_url = f"http://127.0.0.1:{PROXY_PORT}"
    opener = urllib.request.build_opener(
        urllib.request.ProxyHandler({"http": proxy_url, "https": proxy_url})
    )

    result = {
        "latency": _probe_proxy_latency(opener),
        "ip": _fetch_public_ip(opener),
    }
    if enable_speed_test:
        result["speed"] = f"{_measure_download_speed(opener)} Mbps"
    return result


def parse_vless_url(url: str) -> dict:
    """Parse VLESS URL and extract connection parameters.

    Expected shape: ``vless://<uuid>@<host>:<port>?pbk=..&sid=..[&sni=..][&fp=..][&flow=..][#tag]``.

    Raises:
        ValueError: if the URL is malformed or ``pbk``/``sid`` is missing.
    """
    if not url.startswith("vless://"):
        raise ValueError("URL must start with vless://")

    remainder = url[len("vless://"):]

    # Split by fragment (#tag)
    if '#' in remainder:
        url_part, tag = remainder.split('#', 1)
        tag = unquote(tag)
    else:
        url_part = remainder
        tag = "reality"

    # Split by query (?)
    if '?' not in url_part:
        raise ValueError("Missing query parameters")
    uuid_host_port, query_string = url_part.split('?', 1)

    # Parse UUID@host:port
    if '@' not in uuid_host_port:
        raise ValueError("Missing @ separator")
    uuid_str, host_port = uuid_host_port.split('@', 1)

    if ':' not in host_port:
        raise ValueError("Missing port")
    host, port_str = host_port.rsplit(':', 1)
    # IPv6 literals are written as [addr]:port; the brackets are URL syntax only.
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    if not uuid_str or not host:
        raise ValueError("Missing UUID or host")

    port = int(port_str)  # raises ValueError on non-numeric ports
    if not 0 < port < 65536:
        raise ValueError("Port out of range")

    # Parse query parameters
    params = {}
    for param in query_string.split('&'):
        if '=' in param:
            key, value = param.split('=', 1)
            params[key] = unquote(value)

    pbk = params.get('pbk', '')
    sid = params.get('sid', '')
    if not pbk or not sid:
        raise ValueError("Missing required parameters: pbk or sid")

    return {
        'uuid': uuid_str,
        'server': host,
        'server_port': port,
        'tag': tag,
        'public_key': pbk,
        'short_id': sid,
        'server_name': params.get('sni', host),
        'fingerprint': params.get('fp', 'chrome'),
        'flow': params.get('flow', ''),
    }


def build_mixed_inbound() -> dict:
    """Return the single mixed (HTTP/SOCKS) inbound used by every generated config."""
    return {
        "tag": "mixed-in",
        "type": "mixed",
        "sniff": True,
        "users": [],
        "listen": "0.0.0.0",
        "listen_port": PROXY_PORT,
        "set_system_proxy": False,
    }


def build_base_config(final_tag: str, outbounds: list) -> dict:
    """Return a complete sing-box config routing everything to *final_tag*."""
    return {
        "dns": {"independent_cache": True},
        "log": {"level": "debug", "disabled": True, "timestamp": True},
        "route": {"final": final_tag, "auto_detect_interface": True},
        "inbounds": [build_mixed_inbound()],
        "outbounds": outbounds,
    }


def generate_vless_config(vless_params: dict) -> dict:
    """Generate sing-box configuration from VLESS parameters.

    *vless_params* is the dict returned by :func:`parse_vless_url`.
    """
    vless_outbound = {
        "type": "vless",
        "tag": vless_params['tag'],
        "server": vless_params['server'],
        "server_port": vless_params['server_port'],
        "flow": vless_params['flow'],
        "tls": {
            "enabled": True,
            "server_name": vless_params['server_name'],
            "reality": {
                "enabled": True,
                "public_key": vless_params['public_key'],
                "short_id": vless_params['short_id'],
            },
            "utls": {
                "enabled": True,
                "fingerprint": vless_params['fingerprint'],
            },
        },
        "uuid": vless_params['uuid'],
    }
    return build_base_config(
        vless_params['tag'],
        [vless_outbound, {"tag": "direct", "type": "direct"}],
    )


def build_outbound_chain(
    vpn_outbound: dict, utility_outbounds: list, fallback: Optional[dict] = None
):
    """Build the outbound list and final route tag for one VPN server.

    When the fallback proxy is enabled and has a host, a ``urltest`` group
    ("auto-select") is placed in front of the fallback HTTP proxy and the
    VPN outbound and becomes the route target; otherwise traffic is routed
    straight to the VPN outbound.

    Args:
        vpn_outbound: the selected VPN outbound dict.
        utility_outbounds: direct/block/dns outbounds to append unchanged.
        fallback: fallback settings; loaded from disk when omitted.

    Returns:
        ``(outbounds, final_tag)``.
    """
    if fallback is None:
        fallback = load_fallback_config()

    selected_tag = vpn_outbound.get("tag")
    fallback_host = fallback.get("host", "")
    outbounds = []
    final_tag = selected_tag

    if fallback.get("enabled", False) and fallback_host:
        outbounds.append({
            "type": "urltest",
            "tag": "auto-select",
            "outbounds": ["fallback-proxy", selected_tag],
            "url": "http://www.gstatic.com/generate_204",
            "interval": "30s",
            # High tolerance: prefer the first working proxy, not the fastest.
            "tolerance": 9999,
        })
        outbounds.append({
            "type": "http",
            "tag": "fallback-proxy",
            "server": fallback_host,
            "server_port": fallback.get("port", 8080),
        })
        final_tag = "auto-select"

    outbounds.append(vpn_outbound)
    outbounds.extend(utility_outbounds)
    return outbounds, final_tag


def is_http_url(url: str) -> bool:
    """Return True only for http:// or https:// URLs that name a host.

    urllib's default opener also understands schemes such as ``file:``, so
    user-supplied URLs must be filtered before they are fetched.
    """
    try:
        parts = urlsplit(url)
    except ValueError:
        return False
    return parts.scheme.lower() in ("http", "https") and bool(parts.netloc)


def parse_userinfo_header(header: str) -> dict:
    """Parse a ``subscription-userinfo`` header into ``{key: int}``.

    The header is a ``;``-separated list of ``key=value`` pairs; pairs whose
    value is not an integer are skipped.
    """
    info = {}
    for part in header.split(';'):
        if '=' not in part:
            continue
        key, value = part.strip().split('=', 1)
        try:
            info[key.strip()] = int(value)
        except ValueError:
            pass
    return info


def decode_subscription_payload(text: str) -> str:
    """Return the subscription body as plain text, Base64-decoding it if needed.

    Accepts standard and URL-safe Base64, with or without ``=`` padding.
    Text that does not look like Base64, or does not decode to UTF-8, is
    returned stripped but otherwise unchanged.
    """
    content = text.strip()
    if not content or not _BASE64_TEXT_RE.match(content):
        return content
    compact = "".join(content.split())
    compact += "=" * (-len(compact) % 4)  # restore padding some providers omit
    try:
        return base64.b64decode(compact, altchars=b"-_").decode("utf-8")
    except ValueError:
        # binascii.Error / UnicodeDecodeError: not Base64 after all.
        return content


def vless_link_to_outbound(link: str) -> dict:
    """Convert one vless:// link into a sing-box outbound dict.

    Raises:
        ValueError: if the link cannot be parsed (see :func:`parse_vless_url`).
    """
    params = parse_vless_url(link)
    return {
        "type": "vless",
        "tag": params['tag'],
        "server": params['server'],
        "server_port": params['server_port'],
        "uuid": params['uuid'],
        "flow": params['flow'],
        "tls": {
            "enabled": True,
            "server_name": params['server_name'],
            "utls": {"enabled": True, "fingerprint": params['fingerprint']},
            "reality": {
                "enabled": True,
                "public_key": params['public_key'],
                "short_id": params['short_id'],
            },
        },
        "packet_encoding": "xudp",
    }


def resolve_static_path(request_path: str) -> Optional[Path]:
    """Map a ``/static/...`` request path to an existing file inside WEB_DIR.

    Returns None when the path is not under ``/static/``, points outside
    WEB_DIR after resolving ``..`` and symlinks, or is not a regular file.
    """
    path_only = unquote(request_path.split("?", 1)[0].split("#", 1)[0])
    prefix = "/static/"
    if not path_only.startswith(prefix):
        return None
    try:
        root = WEB_DIR.resolve()
        candidate = (root / path_only[len(prefix):]).resolve()
        candidate.relative_to(root)  # ValueError if it escaped WEB_DIR
        return candidate if candidate.is_file() else None
    except (OSError, ValueError):
        return None


class ThreadingHTTPServer(socketserver.ThreadingTCPServer):
    """TCP server handling each request in its own thread."""

    allow_reuse_address = True
    # Do not let an in-flight request (e.g. a speed test) block shutdown.
    daemon_threads = True


class ProxyControlHandler(http.server.BaseHTTPRequestHandler):
    """HTTP Request Handler for Proxy Control"""

    # Exact-match routes: request path -> handler method name.
    GET_ROUTES = {
        "/": "serve_index",
        "/index.html": "serve_index",
        "/status": "get_status",
        "/subscription": "get_subscription",
        "/fallback-config": "get_fallback_config",
        "/active-proxy": "get_active_proxy",
        "/proxy-enabled": "get_proxy_enabled",
    }
    POST_ROUTES = {
        "/apply": "apply_config",
        "/fetch-subscription": "fetch_subscription",
        "/apply-subscription": "apply_subscription",
        "/ping-target": "ping_target",
        "/fallback-config": "save_fallback_config_endpoint",
        "/proxy-enabled": "set_proxy_enabled",
    }

    def log_message(self, format, *args):
        """Print the formatted log line with a [WebUI] prefix."""
        message = format % args if args else format
        print(f"[WebUI] {message}")

    def read_json_body(self) -> dict:
        """Read the request body and return it as a JSON object.

        Raises:
            json.JSONDecodeError: if the body is not valid JSON or is not a
                JSON object (handlers map this to a 400 response).
        """
        length = max(0, int(self.headers.get("Content-Length", 0)))
        raw = self.rfile.read(length).decode("utf-8")
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise json.JSONDecodeError("Expected a JSON object", raw, 0)
        return data

    def send_json(self, data: dict, status: int = 200):
        """Send JSON response"""
        payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(payload)

    def send_html(self, content: bytes):
        """Send HTML response"""
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def do_GET(self):
        """Handle GET requests"""
        handler_name = self.GET_ROUTES.get(self.path)
        if handler_name is not None:
            getattr(self, handler_name)()
        elif self.path.startswith("/test-connection"):
            self.test_connection()
        elif self.path.startswith("/static/"):
            self.serve_static()
        else:
            self.send_error(404)

    def do_POST(self):
        """Handle POST requests"""
        handler_name = self.POST_ROUTES.get(self.path)
        if handler_name is not None:
            getattr(self, handler_name)()
        else:
            self.send_error(404)

    def serve_index(self):
        """Serve main HTML page"""
        index_path = WEB_DIR / "index.html"
        if index_path.exists():
            self.send_html(index_path.read_bytes())
        else:
            self.send_error(404, "index.html not found")

    def serve_static(self):
        """Serve static files from WEB_DIR (never from outside it)."""
        file_path = resolve_static_path(self.path)
        if file_path is None:
            self.send_error(404)
            return
        content = file_path.read_bytes()
        content_type = STATIC_CONTENT_TYPES.get(
            file_path.suffix.lower(), "application/octet-stream"
        )
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def get_fallback_config(self):
        """Get fallback proxy configuration"""
        fallback = load_fallback_config()
        self.send_json({
            "enabled": fallback.get("enabled", False),
            "host": fallback.get("host", "192.168.50.111"),
            "port": fallback.get("port", 8080),
        })

    def get_active_proxy(self):
        """Get information about current active proxy chain"""
        result = {
            "configured": False,
            "fallbackEnabled": False,
            "fallbackHost": None,
            "vpnTag": None,
            "vpnServer": None,
            "activeOutbound": None,
        }

        if not CONFIG_FILE.exists():
            self.send_json(result)
            return

        try:
            config = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
            result["configured"] = True

            # Kept as a pair so IPv6 hosts never have to be re-split on ":".
            fallback_endpoint = None
            for outbound in config.get("outbounds", []):
                out_type = outbound.get("type")
                if out_type == "urltest":
                    result["fallbackEnabled"] = True
                elif out_type == "http" and outbound.get("tag") == "fallback-proxy":
                    fallback_endpoint = (
                        outbound.get("server"),
                        outbound.get("server_port"),
                    )
                    result["fallbackHost"] = (
                        f"{fallback_endpoint[0]}:{fallback_endpoint[1]}"
                    )
                elif out_type in VPN_OUTBOUND_TYPES:
                    result["vpnTag"] = outbound.get("tag")
                    result["vpnServer"] = outbound.get("server")

            # The configured route is reported; which member of the urltest
            # group is live is not known here.
            result["activeOutbound"] = config.get("route", {}).get("final")

            # Check fallback proxy reachability (quick TCP check)
            if result["fallbackEnabled"] and fallback_endpoint:
                try:
                    latency = measure_tcp_latency(
                        fallback_endpoint[0], int(fallback_endpoint[1]), timeout=1.0
                    )
                except (TypeError, ValueError):
                    latency = -1
                # -1 means unreachable; 0 is a valid sub-millisecond connect.
                reachable = latency >= 0
                result["fallbackReachable"] = reachable
                result["fallbackLatency"] = latency if reachable else None
        except Exception as e:  # request boundary: report instead of crashing
            result["error"] = str(e)

        self.send_json(result)

    def save_fallback_config_endpoint(self):
        """Save fallback proxy configuration and regenerate config"""
        try:
            data = self.read_json_body()
            enabled = bool(data.get("enabled", False))
            host = str(data.get("host") or "").strip()

            try:
                port = int(data.get("port", 8080))
            except (TypeError, ValueError):
                port = 0
            if not 0 < port < 65536:
                self.send_json({"success": False, "error": "Invalid port"}, 400)
                return

            if enabled and not host:
                self.send_json({"success": False, "error": "Host is required"}, 400)
                return

            save_fallback_config(enabled, host, port)

            # Regenerate current config if it exists
            regenerated = self.regenerate_current_config()
            self.send_json({
                "success": True,
                "message": "Fallback config saved",
                "regenerated": regenerated,
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Invalid JSON"}, 400)
        except Exception as e:  # request boundary
            self.send_json({"success": False, "error": str(e)}, 500)

    def regenerate_current_config(self) -> bool:
        """Regenerate current config with updated fallback settings.

        Returns False (and leaves the file untouched) when there is no
        config, the proxy is disabled, or the config has no VPN outbound.
        """
        if not CONFIG_FILE.exists():
            return False
        # While disabled the config must keep routing to "direct"; the
        # fallback settings are picked up when the proxy is enabled again.
        if not load_proxy_enabled():
            return False

        try:
            config = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))

            # Find the VPN outbound (vless, vmess, etc.) and helper outbounds.
            vpn_outbound = None
            utility_outbounds = []
            for outbound in config.get("outbounds", []):
                out_type = outbound.get("type")
                if out_type in VPN_OUTBOUND_TYPES:
                    vpn_outbound = outbound
                elif out_type in UTILITY_OUTBOUND_TYPES:
                    utility_outbounds.append(outbound)

            if not vpn_outbound:
                return False

            final_outbounds, final_tag = build_outbound_chain(
                vpn_outbound, utility_outbounds
            )
            config["outbounds"] = final_outbounds
            config.setdefault("route", {})["final"] = final_tag

            write_json_file(CONFIG_FILE, config)
            trigger_reload()
            return True
        except Exception as e:  # best-effort: report failure to the caller
            print(f"[WebUI] Failed to regenerate config: {e}")
            return False

    def get_status(self):
        """Get current proxy status"""
        config_exists = CONFIG_FILE.exists()
        current_tag = None
        current_server = None
        proxy_enabled = load_proxy_enabled()

        if config_exists:
            try:
                config = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
                for outbound in config.get("outbounds", []):
                    if outbound.get("type") in VPN_OUTBOUND_TYPES:
                        current_tag = outbound.get("tag", "unknown")
                        current_server = outbound.get("server", "unknown")
                        break
            except Exception:  # unreadable config: report status without details
                pass

        active = config_exists and proxy_enabled
        self.send_json({
            "active": active,
            "tag": current_tag,
            "server": current_server,
            "proxyPort": PROXY_PORT,
            "proxyEnabled": proxy_enabled,
            "startTime": load_start_time() if active else 0,
        })

    def get_proxy_enabled(self):
        """Get proxy enabled state"""
        self.send_json({"enabled": load_proxy_enabled()})

    def set_proxy_enabled(self):
        """Set proxy enabled state and regenerate config"""
        try:
            data = self.read_json_body()
            enabled = bool(data.get("enabled", True))
            save_proxy_enabled(enabled)

            if enabled:
                # Restore normal VPN routing
                regenerated = self.regenerate_current_config()
                if regenerated:
                    # Only update start time if actually enabling VPN
                    save_start_time(datetime.now(timezone.utc).timestamp())
            else:
                # Route everything directly (bypass proxy)
                regenerated = self.generate_direct_config()
                save_start_time(0)

            self.send_json({
                "success": True,
                "enabled": enabled,
                "regenerated": regenerated,
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Invalid JSON"}, 400)
        except Exception as e:  # request boundary
            self.send_json({"success": False, "error": str(e)}, 500)

    def generate_direct_config(self) -> bool:
        """Generate a direct connection config (bypass all proxies).

        The VPN and helper outbounds of the current config are kept in the
        file (unused, since the route points at "direct") so that
        regenerate_current_config() can restore the VPN when the proxy is
        enabled again.
        """
        try:
            vpn_outbounds = []
            extra_utilities = []
            current = read_json_file(CONFIG_FILE)
            if isinstance(current, dict):
                for outbound in current.get("outbounds") or []:
                    if not isinstance(outbound, dict):
                        continue
                    out_type = outbound.get("type")
                    if out_type in VPN_OUTBOUND_TYPES:
                        vpn_outbounds.append(outbound)
                    elif (
                        out_type in UTILITY_OUTBOUND_TYPES
                        and outbound.get("tag") != "direct"
                    ):
                        extra_utilities.append(outbound)

            outbounds = (
                vpn_outbounds
                + [{"tag": "direct", "type": "direct"}]
                + extra_utilities
            )
            write_json_file(CONFIG_FILE, build_base_config("direct", outbounds))
            trigger_reload()
            return True
        except Exception as e:  # best-effort: report failure to the caller
            print(f"[WebUI] Failed to generate direct config: {e}")
            return False

    def get_subscription(self):
        """Get saved subscription info"""
        sub = load_subscription()
        if not sub:
            self.send_json({"saved": False})
            return
        self.send_json({
            "saved": True,
            "url": sub.get("url"),
            "selectedServer": sub.get("selectedServer"),
            "userInfo": sub.get("userInfo"),
        })

    def test_connection(self):
        """Test active proxy connection (add ?speed=true for a speed test)."""
        query_components = {}
        if '?' in self.path:
            _, query = self.path.split('?', 1)
            query_components = parse_qs(query)

        enable_speed = query_components.get('speed', ['false'])[0].lower() == 'true'
        self.send_json(measure_proxy_performance(enable_speed_test=enable_speed))

    def ping_target(self):
        """Measure TCP connect latency to the server/port given in the body."""
        try:
            data = self.read_json_body()
            server = data.get("server")
            port = int(data.get("port", 443))

            if not server:
                self.send_json({"error": "No server specified"}, 400)
                return

            self.send_json({"latency": measure_tcp_latency(server, port)})
        except (ValueError, TypeError) as e:
            # Malformed JSON or a non-numeric port is a client error.
            self.send_json({"error": str(e)}, 400)
        except Exception as e:  # request boundary
            self.send_json({"error": str(e)}, 500)

    def apply_config(self):
        """Apply new config from VLESS URL"""
        try:
            data = self.read_json_body()
            url = str(data.get("url") or "").strip()

            if not url:
                self.send_json({"success": False, "error": "URL не указан"}, 400)
                return

            if not url.startswith("vless://"):
                self.send_json({"success": False, "error": "Неверный формат. Поддерживаются только vless:// ссылки"}, 400)
                return

            try:
                vless_params = parse_vless_url(url)
            except ValueError as e:
                self.send_json({"success": False, "error": f"Ошибка парсинга URL: {str(e)}"}, 400)
                return

            write_json_file(CONFIG_FILE, generate_vless_config(vless_params))
            trigger_reload()

            # The connection is reset, so restart the uptime clock. Done
            # before responding so a failure here yields a single 500.
            save_start_time(datetime.now(timezone.utc).timestamp())

            self.send_json({
                "success": True,
                "message": f"Конфигурация '{vless_params['tag']}' успешно применена!"
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON"}, 400)
        except Exception as e:  # request boundary
            self.send_json({"success": False, "error": str(e)}, 500)

    def _download_subscription(self, url: str):
        """Fetch the subscription body; returns ``(text, user_info)``.

        Raises urllib.error.HTTPError / URLError on transport failures.
        """
        sys_info = get_system_info()
        request = urllib.request.Request(
            url,
            headers={
                "User-Agent": "singbox",
                "x-hwid": get_hwid(),
                "x-device-os": sys_info["os"],
                "x-ver-os": sys_info["version"],
                "x-device-model": APP_NAME,
            },
        )
        with urllib.request.urlopen(request, timeout=15) as response:
            text = response.read().decode("utf-8")
            header = response.headers.get("subscription-userinfo", "")
        return text, parse_userinfo_header(header) if header else {}

    def fetch_subscription(self):
        """Fetch servers list from subscription URL"""
        try:
            data = self.read_json_body()
            url = str(data.get("url") or "").strip()

            if not url:
                self.send_json({"success": False, "error": "URL подписки не указан"}, 400)
                return

            # The URL is fetched server-side: refuse file:// and other
            # non-web schemes so local files cannot be read through this endpoint.
            if not is_http_url(url):
                self.send_json({"success": False, "error": "Поддерживаются только http:// и https:// ссылки"}, 400)
                return

            try:
                config_text, user_info = self._download_subscription(url)
            except urllib.error.HTTPError as e:
                self.send_json({"success": False, "error": f"Ошибка HTTP: {e.code}"}, 400)
                return
            except urllib.error.URLError as e:
                self.send_json({"success": False, "error": f"Ошибка подключения: {e.reason}"}, 400)
                return

            # Try to parse as a sing-box JSON config first.
            try:
                config = json.loads(config_text)
            except json.JSONDecodeError:
                config = None

            if not isinstance(config, dict):
                # Not a JSON object: Base64 or plain-text list of VLESS links.
                content = decode_subscription_payload(config_text)
                vless_links = [
                    line.strip()
                    for line in content.splitlines()
                    if line.strip().startswith('vless://')
                ]

                if not vless_links:
                    self.send_json({"success": False, "error": "Не найдены VLESS ссылки в ответе"}, 400)
                    return

                parsed_outbounds = []
                for link in vless_links:
                    try:
                        parsed_outbounds.append(vless_link_to_outbound(link))
                    except ValueError as e:
                        print(f"[WebUI] Failed to parse VLESS link: {e}")

                if not parsed_outbounds:
                    self.send_json({"success": False, "error": "Не удалось распарсить VLESS ссылки"}, 400)
                    return

                # Minimal config wrapping the parsed outbounds
                config = {"outbounds": parsed_outbounds}

            servers = [
                {
                    "tag": outbound.get("tag", "unknown"),
                    "type": outbound.get("type"),
                    "server": outbound.get("server", "unknown"),
                    "server_port": outbound.get("server_port", 443),
                }
                for outbound in config.get("outbounds") or []
                if isinstance(outbound, dict)
                and outbound.get("type") in VPN_OUTBOUND_TYPES
            ]

            if not servers:
                self.send_json({"success": False, "error": "Серверы не найдены в подписке"}, 400)
                return

            self.send_json({
                "success": True,
                "servers": servers,
                "config": config,
                "userInfo": user_info
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON в ответе"}, 400)
        except Exception as e:  # request boundary
            self.send_json({"success": False, "error": str(e)}, 500)

    def apply_subscription(self):
        """Apply config from subscription with selected server"""
        try:
            data = self.read_json_body()
            config = data.get("config")
            selected_tag = data.get("selectedServer")
            sub_url = data.get("subUrl")  # subscription URL to persist
            user_info = data.get("userInfo")

            if not config or not isinstance(config, dict):
                self.send_json({"success": False, "error": "Конфигурация не указана"}, 400)
                return

            if not selected_tag:
                self.send_json({"success": False, "error": "Сервер не выбран"}, 400)
                return

            # Keep only the selected server plus helper outbounds; selectors
            # and every other server are dropped.
            selected_outbound = None
            utility_outbounds = []
            for outbound in config.get("outbounds") or []:
                if not isinstance(outbound, dict):
                    continue
                if outbound.get("tag") == selected_tag:
                    selected_outbound = outbound
                elif outbound.get("type") in UTILITY_OUTBOUND_TYPES:
                    utility_outbounds.append(outbound)

            if not selected_outbound:
                self.send_json({"success": False, "error": f"Сервер '{selected_tag}' не найден"}, 400)
                return

            final_outbounds, final_tag = build_outbound_chain(
                selected_outbound, utility_outbounds
            )

            # Simplify DNS configuration to match client.json format
            config["dns"] = {"independent_cache": True}

            # Remove platform-specific and experimental fields from root config
            config.pop("platform", None)
            config.pop("experimental", None)

            # Replace TUN inbounds with mixed proxy
            config["inbounds"] = [build_mixed_inbound()]
            config["outbounds"] = final_outbounds
            config["route"] = {"final": final_tag, "auto_detect_interface": True}

            write_json_file(CONFIG_FILE, config)

            # Save subscription URL for persistence
            if sub_url:
                save_subscription(sub_url, selected_tag, user_info)

            trigger_reload()

            # Same as apply_config: the connection is reset, restart uptime.
            save_start_time(datetime.now(timezone.utc).timestamp())

            self.send_json({
                "success": True,
                "message": f"Сервер '{selected_tag}' успешно применён!"
            })
        except json.JSONDecodeError:
            self.send_json({"success": False, "error": "Неверный JSON"}, 400)
        except Exception as e:  # request boundary
            self.send_json({"success": False, "error": str(e)}, 500)


def main():
    """Start the web server"""
    # Threaded server so a slow request does not block the UI.
    with ThreadingHTTPServer(("", PORT), ProxyControlHandler) as httpd:
        print(f"[WebUI] Server started on port {PORT}")
        print(f"[WebUI] Open http://localhost:{PORT} in your browser")
        httpd.serve_forever()


if __name__ == "__main__":
    main()