import http.server import json import urllib.request import urllib.error from urllib.parse import parse_qs, unquote, urlparse from datetime import datetime, timezone from .config import ( WEB_DIR, CONFIG_FILE, PROXY_PORT, DATA_DIR, APP_NAME, RELOAD_PORT, PROXY_BIND_IP ) from .utils import get_hwid, get_system_info from .storage import ( load_subscription, save_subscription, load_fallback_config, save_fallback_config, load_proxy_enabled, save_proxy_enabled, load_start_time, save_start_time ) from .network import measure_proxy_performance, measure_tcp_latency from .vless import ( parse_vless_url, generate_vless_config, generate_direct_config ) class ProxyControlHandler(http.server.BaseHTTPRequestHandler): """HTTP Request Handler for Proxy Control""" def log_message(self, format, *args): """Override to add timestamp prefix""" print(f"[WebUI] {args[0]}") def send_json(self, data: dict, status: int = 200): """Send JSON response""" self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8")) def send_html(self, content: bytes): """Send HTML response""" self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.end_headers() self.wfile.write(content) def do_GET(self): """Handle GET requests""" if self.path == "/" or self.path == "/index.html": self.serve_index() elif self.path == "/status": self.get_status() elif self.path == "/subscription": self.get_subscription() elif self.path.startswith("/test-connection"): self.test_connection() elif self.path.startswith("/static/"): self.serve_static() elif self.path == "/fallback-config": self.get_fallback_config() elif self.path == "/active-proxy": self.get_active_proxy() elif self.path == "/proxy-enabled": self.get_proxy_enabled() else: self.send_error(404) def do_POST(self): """Handle POST requests""" if self.path == "/apply": self.apply_config() elif self.path == "/fetch-subscription": self.fetch_subscription() elif self.path == "/apply-subscription": self.apply_subscription() elif self.path == "/ping-target": self.ping_target() elif self.path == "/fallback-config": self.save_fallback_config_endpoint() elif self.path == "/proxy-enabled": self.set_proxy_enabled() else: self.send_error(404) def serve_index(self): """Serve main HTML page with SSI-like includes""" index_path = WEB_DIR / "index.html" if not index_path.exists(): self.send_error(404, "index.html not found") return try: content = index_path.read_text(encoding="utf-8") # Process includes: import re def replace_include(match): path = match.group(1) full_path = WEB_DIR / path if full_path.exists() and full_path.is_file(): return full_path.read_text(encoding="utf-8") return f"" # Simple one-pass replacement processed_content = re.sub(r'', replace_include, content) self.send_html(processed_content.encode("utf-8")) except Exception as e: self.send_error(500, f"Error serving index: {str(e)}") def serve_static(self): """Serve static files""" # Map /static/... to WEB_DIR/static/... path_clean = self.path.split('?')[0] # Remove query params file_path = WEB_DIR / path_clean.lstrip('/') if file_path.exists() and file_path.is_file(): content_type = "text/css" if str(file_path).endswith(".css") else "application/javascript" self.send_response(200) self.send_header("Content-Type", content_type) self.end_headers() self.wfile.write(file_path.read_bytes()) else: self.send_error(404) def get_fallback_config(self): """Get fallback proxy configuration""" fallback = load_fallback_config() self.send_json({ "enabled": fallback.get("enabled", False), "host": fallback.get("host", "192.168.50.111"), "port": fallback.get("port", 8080) }) def get_active_proxy(self): """Get information about current active proxy chain""" result = { "configured": False, "fallbackEnabled": False, "fallbackHost": None, "vpnTag": None, "vpnServer": None, "activeOutbound": None } if not CONFIG_FILE.exists(): self.send_json(result) return try: config = json.loads(CONFIG_FILE.read_text()) outbounds = config.get("outbounds", []) route_final = config.get("route", {}).get("final") result["configured"] = True for outbound in outbounds: out_type = outbound.get("type") if out_type == "urltest": result["fallbackEnabled"] = True elif out_type == "http" and outbound.get("tag") == "fallback-proxy": result["fallbackHost"] = f"{outbound.get('server')}:{outbound.get('server_port')}" elif out_type in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]: result["vpnTag"] = outbound.get("tag") result["vpnServer"] = outbound.get("server") # Determine which is actually active # For now, we show the configured route result["activeOutbound"] = route_final # Check fallback proxy reachability (quick TCP check) if result["fallbackEnabled"] and result["fallbackHost"]: try: host, port = result["fallbackHost"].split(":") latency = measure_tcp_latency(host, int(port), timeout=1.0) result["fallbackReachable"] = latency > 0 result["fallbackLatency"] = latency if latency > 0 else None except Exception: result["fallbackReachable"] = False result["fallbackLatency"] = None except Exception as e: result["error"] = str(e) self.send_json(result) def save_fallback_config_endpoint(self): """Save fallback proxy configuration and regenerate config""" try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length).decode("utf-8") data = json.loads(body) enabled = data.get("enabled", False) host = data.get("host", "").strip() port = int(data.get("port", 8080)) if enabled and not host: self.send_json({"success": False, "error": "Host is required"}, 400) return save_fallback_config(enabled, host, port) # Regenerate current config if it exists regenerated = self.regenerate_current_config() self.send_json({ "success": True, "message": "Fallback config saved", "regenerated": regenerated }) except json.JSONDecodeError: self.send_json({"success": False, "error": "Invalid JSON"}, 400) except Exception as e: self.send_json({"success": False, "error": str(e)}, 500) def regenerate_current_config(self) -> bool: """Regenerate current config with updated fallback settings""" if not CONFIG_FILE.exists(): return False try: config = json.loads(CONFIG_FILE.read_text()) outbounds = config.get("outbounds", []) # Find the VPN outbound (vless, vmess, etc.) vpn_outbound = None utility_outbounds = [] for outbound in outbounds: if outbound.get("type") in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]: vpn_outbound = outbound elif outbound.get("type") in ["direct", "block", "dns"]: utility_outbounds.append(outbound) if not vpn_outbound: return False selected_tag = vpn_outbound.get("tag") # Load fallback config fallback = load_fallback_config() fallback_enabled = fallback.get("enabled", False) fallback_host = fallback.get("host", "") fallback_port = fallback.get("port", 8080) # Build new outbounds final_outbounds = [] final_tag = selected_tag if fallback_enabled and fallback_host: urltest_outbound = { "type": "urltest", "tag": "auto-select", "outbounds": ["fallback-proxy", selected_tag], "url": "http://www.gstatic.com/generate_204", "interval": "30s", "tolerance": 9999 } fallback_outbound = { "type": "http", "tag": "fallback-proxy", "server": fallback_host, "server_port": fallback_port } final_outbounds.append(urltest_outbound) final_outbounds.append(fallback_outbound) final_tag = "auto-select" final_outbounds.append(vpn_outbound) final_outbounds.extend(utility_outbounds) config["outbounds"] = final_outbounds config["route"]["final"] = final_tag # Write config CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False)) # Reload sing-box try: urllib.request.urlopen(f"http://127.0.0.1:{RELOAD_PORT}/reload", timeout=3) except Exception: pass return True except Exception as e: print(f"[WebUI] Failed to regenerate config: {e}") return False def get_status(self): """Get current proxy status""" config_exists = CONFIG_FILE.exists() current_tag = None current_server = None proxy_enabled = load_proxy_enabled() if config_exists: try: config = json.loads(CONFIG_FILE.read_text()) for outbound in config.get("outbounds", []): if outbound.get("type") == "vless": current_tag = outbound.get("tag", "unknown") current_server = outbound.get("server", "unknown") break except Exception: pass self.send_json({ "active": config_exists and proxy_enabled, "tag": current_tag, "server": current_server, "proxyPort": PROXY_PORT, "proxyEnabled": proxy_enabled, "startTime": load_start_time() if config_exists and proxy_enabled else 0 }) def get_proxy_enabled(self): """Get proxy enabled state""" enabled = load_proxy_enabled() self.send_json({"enabled": enabled}) def set_proxy_enabled(self): """Set proxy enabled state and regenerate config""" try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length).decode("utf-8") data = json.loads(body) enabled = data.get("enabled", True) save_proxy_enabled(enabled) # Regenerate config based on state if enabled: # Restore normal VPN config regenerated = self.regenerate_current_config() if regenerated: # Only update start time if actually enabling VPN save_start_time(datetime.now(timezone.utc).timestamp()) else: # Generate direct config (bypass proxy) regenerated = generate_direct_config() save_start_time(0) self.send_json({ "success": True, "enabled": enabled, "regenerated": regenerated }) except json.JSONDecodeError: self.send_json({"success": False, "error": "Invalid JSON"}, 400) except Exception as e: self.send_json({"success": False, "error": str(e)}, 500) def get_subscription(self): """Get saved subscription info""" sub = load_subscription() if sub: self.send_json({ "saved": True, "url": sub.get("url"), "selectedServer": sub.get("selectedServer"), "userInfo": sub.get("userInfo") }) else: self.send_json({"saved": False}) def test_connection(self): """Test active proxy connection""" query_components = {} if '?' in self.path: _, query = self.path.split('?', 1) query_components = parse_qs(query) enable_speed = query_components.get('speed', ['false'])[0].lower() == 'true' result = measure_proxy_performance(enable_speed_test=enable_speed) self.send_json(result) def ping_target(self): """Ping a specific target""" try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length).decode("utf-8") data = json.loads(body) server = data.get("server") port = int(data.get("port", 443)) if not server: self.send_json({"error": "No server specified"}, 400) return latency = measure_tcp_latency(server, port) self.send_json({"latency": latency}) except Exception as e: self.send_json({"error": str(e)}, 500) def apply_config(self): """Apply new config from VLESS URL""" try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length).decode("utf-8") data = json.loads(body) url = data.get("url", "").strip() if not url: self.send_json({"success": False, "error": "URL не указан"}, 400) return if not url.startswith("vless://"): self.send_json({"success": False, "error": "Неверный формат. Поддерживаются только vless:// ссылки"}, 400) return # Parse VLESS URL try: vless_params = parse_vless_url(url) except ValueError as e: self.send_json({"success": False, "error": f"Ошибка парсинга URL: {str(e)}"}, 400) return # Generate config config = generate_vless_config(vless_params) # Ensure data directory exists DATA_DIR.mkdir(parents=True, exist_ok=True) # Write config file CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False)) # Trigger reload via internal control port try: urllib.request.urlopen(f"http://localhost:{RELOAD_PORT}/reload", timeout=5) except Exception as e: print(f"[WebUI] Warning: reload request failed: {e}") self.send_json({ "success": True, "message": f"Конфигурация '{vless_params['tag']}' успешно применена!" }) # Save new start time as connection is reset save_start_time(datetime.now(timezone.utc).timestamp()) except json.JSONDecodeError: self.send_json({"success": False, "error": "Неверный JSON"}, 400) except Exception as e: self.send_json({"success": False, "error": str(e)}, 500) def fetch_subscription(self): """Fetch servers list from subscription URL""" try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length).decode("utf-8") data = json.loads(body) url = data.get("url", "").strip() if not url: self.send_json({"success": False, "error": "URL подписки не указан"}, 400) return # Validate URL scheme to prevent SSRF try: parsed_url = urlparse(url) if parsed_url.scheme not in ('http', 'https'): self.send_json({"success": False, "error": "Недопустимый протокол (только http/https)"}, 400) return except Exception: self.send_json({"success": False, "error": "Некорректный URL"}, 400) return # Fetch subscription config sys_info = get_system_info() req = urllib.request.Request( url, headers={ "User-Agent": "singbox", "x-hwid": get_hwid(), "x-device-os": sys_info["os"], "x-ver-os": sys_info["version"], "x-device-model": APP_NAME } ) config = None config_text = "" user_info = {} try: with urllib.request.urlopen(req, timeout=15) as response: config_text = response.read().decode("utf-8") # Parse User Info header user_info_header = response.headers.get("subscription-userinfo", "") if user_info_header: parts = user_info_header.split(';') for part in parts: if '=' in part: key, value = part.strip().split('=', 1) try: user_info[key] = int(value) except ValueError: pass except urllib.error.HTTPError as e: self.send_json({"success": False, "error": f"Ошибка HTTP: {e.code}"}, 400) return except urllib.error.URLError as e: self.send_json({"success": False, "error": f"Ошибка подключения: {e.reason}"}, 400) return # Try to parse as JSON first try: config = json.loads(config_text) except json.JSONDecodeError: # Not JSON - try Base64 decode or plain text VLESS links content = config_text.strip() # Try Base64 decode import base64 import re try: # Check if it looks like Base64 if re.match(r'^[A-Za-z0-9+/=\s]+$', content): decoded = base64.b64decode(content).decode('utf-8') content = decoded except Exception: pass # Not Base64, continue with original content # Parse VLESS links lines = content.strip().split('\n') vless_links = [line.strip() for line in lines if line.strip().startswith('vless://')] if not vless_links: self.send_json({"success": False, "error": "Не найдены VLESS ссылки в ответе"}, 400) return # Parse each VLESS link and create outbounds outbounds = [] for link in vless_links: try: params = parse_vless_url(link) outbound = { "type": "vless", "tag": params['tag'], "server": params['server'], "server_port": params['server_port'], "uuid": params['uuid'], "flow": params['flow'], "tls": { "enabled": True, "server_name": params['server_name'], "utls": {"enabled": True, "fingerprint": params['fingerprint']}, "reality": { "enabled": True, "public_key": params['public_key'], "short_id": params['short_id'] } }, "packet_encoding": "xudp" } outbounds.append(outbound) except Exception as e: print(f"[WebUI] Failed to parse VLESS link: {e}") continue if not outbounds: self.send_json({"success": False, "error": "Не удалось распарсить VLESS ссылки"}, 400) return # Create a mock config with parsed outbounds config = {"outbounds": outbounds} # Extract outbound servers outbounds = config.get("outbounds", []) servers = [] for outbound in outbounds: if outbound.get("type") in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]: servers.append({ "tag": outbound.get("tag", "unknown"), "type": outbound.get("type"), "server": outbound.get("server", "unknown"), "server_port": outbound.get("server_port", 443) }) if not servers: self.send_json({"success": False, "error": "Серверы не найдены в подписке"}, 400) return self.send_json({ "success": True, "servers": servers, "config": config, "userInfo": user_info }) except json.JSONDecodeError: self.send_json({"success": False, "error": "Неверный JSON в ответе"}, 400) except Exception as e: self.send_json({"success": False, "error": str(e)}, 500) def apply_subscription(self): """Apply config from subscription with selected server""" try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length).decode("utf-8") data = json.loads(body) config = data.get("config") selected_tag = data.get("selectedServer") sub_url = data.get("subUrl") # URL подписки для сохранения user_info = data.get("userInfo") if not config: self.send_json({"success": False, "error": "Конфигурация не указана"}, 400) return if not selected_tag: self.send_json({"success": False, "error": "Сервер не выбран"}, 400) return # Modify config to use only selected server outbounds = config.get("outbounds", []) new_outbounds = [] selected_outbound = None for outbound in outbounds: if outbound.get("tag") == selected_tag: selected_outbound = outbound elif outbound.get("type") in ["direct", "block", "dns"]: new_outbounds.append(outbound) elif outbound.get("type") == "selector": # Skip selector, we'll add selected server directly pass if not selected_outbound: self.send_json({"success": False, "error": f"Сервер '{selected_tag}' не найден"}, 400) return # Load fallback configuration fallback = load_fallback_config() fallback_enabled = fallback.get("enabled", False) fallback_host = fallback.get("host", "") fallback_port = fallback.get("port", 8080) # Build outbounds list final_outbounds = [] final_tag = selected_tag if fallback_enabled and fallback_host: # Add URLTest for automatic fallback selection # High tolerance (9999ms) ensures first working proxy is preferred urltest_outbound = { "type": "urltest", "tag": "auto-select", "outbounds": ["fallback-proxy", selected_tag], "url": "http://www.gstatic.com/generate_204", "interval": "30s", "tolerance": 9999 # Use first working proxy, not fastest } # Add HTTP fallback proxy fallback_outbound = { "type": "http", "tag": "fallback-proxy", "server": fallback_host, "server_port": fallback_port } final_outbounds.append(urltest_outbound) final_outbounds.append(fallback_outbound) final_tag = "auto-select" # Add selected VPN server final_outbounds.append(selected_outbound) # Add utility outbounds (direct, block, dns) final_outbounds.extend(new_outbounds) # Update route routes = { "final": final_tag, "auto_detect_interface": True } # Simplify DNS configuration to match client.json format config["dns"] = { "independent_cache": True } # Remove platform-specific and experimental fields from root config config.pop("platform", None) config.pop("experimental", None) # Replace TUN inbounds with mixed proxy config["inbounds"] = [ { "tag": "mixed-in", "type": "mixed", "sniff": True, "users": [], "listen": PROXY_BIND_IP, "listen_port": PROXY_PORT, "set_system_proxy": False } ] config["outbounds"] = final_outbounds config["route"] = routes # Ensure data directory exists DATA_DIR.mkdir(parents=True, exist_ok=True) # Write config file CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False)) # Save subscription URL for persistence if sub_url: save_subscription(sub_url, selected_tag, user_info) # Trigger reload via internal control port try: urllib.request.urlopen(f"http://localhost:{RELOAD_PORT}/reload", timeout=5) except Exception as e: print(f"[WebUI] Warning: reload request failed: {e}") self.send_json({ "success": True, "message": f"Сервер '{selected_tag}' успешно применён!" }) except json.JSONDecodeError: self.send_json({"success": False, "error": "Неверный JSON"}, 400) except Exception as e: self.send_json({"success": False, "error": str(e)}, 500)