diff --git a/.gitignore b/.gitignore index 91b6e6c..169ec65 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,60 @@ -data -_legacy +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual Env +venv/ +.venv/ +env/ +.env + +# PyInstaller +*.manifest +*.spec + +# MacOS +.DS_Store +.AppleDouble +.LSOverride +._* +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# IDE & Editors +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Project Specific +data/ +_legacy/ +*.log +sing-box + +# Docker +docker-compose.override.yml diff --git a/web/app/__init__.py b/web/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/web/app/api.py b/web/app/api.py new file mode 100644 index 0000000..682870d --- /dev/null +++ b/web/app/api.py @@ -0,0 +1,742 @@ +import http.server +import json +import urllib.request +import urllib.error +from urllib.parse import parse_qs, unquote, urlparse +from datetime import datetime, timezone + +from .config import ( + WEB_DIR, CONFIG_FILE, PROXY_PORT, DATA_DIR, APP_NAME, + RELOAD_PORT, PROXY_BIND_IP +) +from .utils import get_hwid, get_system_info +from .storage import ( + load_subscription, save_subscription, + load_fallback_config, save_fallback_config, + load_proxy_enabled, save_proxy_enabled, + load_start_time, save_start_time +) +from .network import measure_proxy_performance, measure_tcp_latency +from .vless import ( + parse_vless_url, generate_vless_config, generate_direct_config +) + +class ProxyControlHandler(http.server.BaseHTTPRequestHandler): + """HTTP Request Handler for Proxy Control""" + + def log_message(self, format, *args): + """Override to add timestamp prefix""" + print(f"[WebUI] {args[0]}") + + def send_json(self, data: dict, status: int = 200): + """Send JSON response""" + self.send_response(status) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8")) + + def send_html(self, content: bytes): + """Send HTML response""" + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + self.wfile.write(content) + + def do_GET(self): + """Handle GET requests""" + if self.path == "/" or self.path == "/index.html": + self.serve_index() + elif self.path == "/status": + self.get_status() + elif self.path == "/subscription": + self.get_subscription() + elif self.path.startswith("/test-connection"): + self.test_connection() + elif self.path.startswith("/static/"): + self.serve_static() + elif self.path == "/fallback-config": + self.get_fallback_config() + elif self.path == "/active-proxy": + self.get_active_proxy() + elif self.path == "/proxy-enabled": + self.get_proxy_enabled() + else: + self.send_error(404) + + def do_POST(self): + """Handle POST requests""" + if self.path == "/apply": + self.apply_config() + elif self.path == "/fetch-subscription": + self.fetch_subscription() + elif self.path == "/apply-subscription": + self.apply_subscription() + elif self.path == "/ping-target": + self.ping_target() + elif self.path == "/fallback-config": + self.save_fallback_config_endpoint() + elif self.path == "/proxy-enabled": + self.set_proxy_enabled() + else: + self.send_error(404) + + def serve_index(self): + """Serve main HTML page with SSI-like includes""" + index_path = WEB_DIR / "index.html" + if not index_path.exists(): + self.send_error(404, "index.html not found") + return + + try: + content = index_path.read_text(encoding="utf-8") + + # Process includes: + import re + + def replace_include(match): + path = match.group(1) + full_path = WEB_DIR / path + if full_path.exists() and full_path.is_file(): + return full_path.read_text(encoding="utf-8") + return f"" + + # Simple one-pass replacement + processed_content = re.sub(r'', replace_include, content) + + self.send_html(processed_content.encode("utf-8")) + + except Exception as e: + self.send_error(500, f"Error serving index: {str(e)}") + + def serve_static(self): + """Serve static files""" + # Map /static/... to WEB_DIR/static/... + path_clean = self.path.split('?')[0] # Remove query params + file_path = WEB_DIR / path_clean.lstrip('/') + if file_path.exists() and file_path.is_file(): + content_type = "text/css" if str(file_path).endswith(".css") else "application/javascript" + self.send_response(200) + self.send_header("Content-Type", content_type) + self.end_headers() + self.wfile.write(file_path.read_bytes()) + else: + self.send_error(404) + + def get_fallback_config(self): + """Get fallback proxy configuration""" + fallback = load_fallback_config() + self.send_json({ + "enabled": fallback.get("enabled", False), + "host": fallback.get("host", "192.168.50.111"), + "port": fallback.get("port", 8080) + }) + + def get_active_proxy(self): + """Get information about current active proxy chain""" + result = { + "configured": False, + "fallbackEnabled": False, + "fallbackHost": None, + "vpnTag": None, + "vpnServer": None, + "activeOutbound": None + } + + if not CONFIG_FILE.exists(): + self.send_json(result) + return + + try: + config = json.loads(CONFIG_FILE.read_text()) + outbounds = config.get("outbounds", []) + route_final = config.get("route", {}).get("final") + + result["configured"] = True + + for outbound in outbounds: + out_type = outbound.get("type") + + if out_type == "urltest": + result["fallbackEnabled"] = True + elif out_type == "http" and outbound.get("tag") == "fallback-proxy": + result["fallbackHost"] = f"{outbound.get('server')}:{outbound.get('server_port')}" + elif out_type in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]: + result["vpnTag"] = outbound.get("tag") + result["vpnServer"] = outbound.get("server") + + # Determine which is actually active + # For now, we show the configured route + result["activeOutbound"] = route_final + + # Check fallback proxy reachability (quick TCP check) + if result["fallbackEnabled"] and result["fallbackHost"]: + try: + host, port = result["fallbackHost"].split(":") + latency = measure_tcp_latency(host, int(port), timeout=1.0) + result["fallbackReachable"] = latency > 0 + result["fallbackLatency"] = latency if latency > 0 else None + except Exception: + result["fallbackReachable"] = False + result["fallbackLatency"] = None + + except Exception as e: + result["error"] = str(e) + + self.send_json(result) + + def save_fallback_config_endpoint(self): + """Save fallback proxy configuration and regenerate config""" + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) + + enabled = data.get("enabled", False) + host = data.get("host", "").strip() + port = int(data.get("port", 8080)) + + if enabled and not host: + self.send_json({"success": False, "error": "Host is required"}, 400) + return + + save_fallback_config(enabled, host, port) + + # Regenerate current config if it exists + regenerated = self.regenerate_current_config() + + self.send_json({ + "success": True, + "message": "Fallback config saved", + "regenerated": regenerated + }) + + except json.JSONDecodeError: + self.send_json({"success": False, "error": "Invalid JSON"}, 400) + except Exception as e: + self.send_json({"success": False, "error": str(e)}, 500) + + def regenerate_current_config(self) -> bool: + """Regenerate current config with updated fallback settings""" + if not CONFIG_FILE.exists(): + return False + + try: + config = json.loads(CONFIG_FILE.read_text()) + outbounds = config.get("outbounds", []) + + # Find the VPN outbound (vless, vmess, etc.) + vpn_outbound = None + utility_outbounds = [] + + for outbound in outbounds: + if outbound.get("type") in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]: + vpn_outbound = outbound + elif outbound.get("type") in ["direct", "block", "dns"]: + utility_outbounds.append(outbound) + + if not vpn_outbound: + return False + + selected_tag = vpn_outbound.get("tag") + + # Load fallback config + fallback = load_fallback_config() + fallback_enabled = fallback.get("enabled", False) + fallback_host = fallback.get("host", "") + fallback_port = fallback.get("port", 8080) + + # Build new outbounds + final_outbounds = [] + final_tag = selected_tag + + if fallback_enabled and fallback_host: + urltest_outbound = { + "type": "urltest", + "tag": "auto-select", + "outbounds": ["fallback-proxy", selected_tag], + "url": "http://www.gstatic.com/generate_204", + "interval": "30s", + "tolerance": 9999 + } + + fallback_outbound = { + "type": "http", + "tag": "fallback-proxy", + "server": fallback_host, + "server_port": fallback_port + } + + final_outbounds.append(urltest_outbound) + final_outbounds.append(fallback_outbound) + final_tag = "auto-select" + + final_outbounds.append(vpn_outbound) + final_outbounds.extend(utility_outbounds) + + config["outbounds"] = final_outbounds + config["route"]["final"] = final_tag + + # Write config + CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False)) + + # Reload sing-box + try: + urllib.request.urlopen(f"http://127.0.0.1:{RELOAD_PORT}/reload", timeout=3) + except Exception: + pass + + return True + + except Exception as e: + print(f"[WebUI] Failed to regenerate config: {e}") + return False + + def get_status(self): + """Get current proxy status""" + config_exists = CONFIG_FILE.exists() + current_tag = None + current_server = None + proxy_enabled = load_proxy_enabled() + + if config_exists: + try: + config = json.loads(CONFIG_FILE.read_text()) + for outbound in config.get("outbounds", []): + if outbound.get("type") == "vless": + current_tag = outbound.get("tag", "unknown") + current_server = outbound.get("server", "unknown") + break + except Exception: + pass + + self.send_json({ + "active": config_exists and proxy_enabled, + "tag": current_tag, + "server": current_server, + "proxyPort": PROXY_PORT, + "proxyEnabled": proxy_enabled, + "startTime": load_start_time() if config_exists and proxy_enabled else 0 + }) + + def get_proxy_enabled(self): + """Get proxy enabled state""" + enabled = load_proxy_enabled() + self.send_json({"enabled": enabled}) + + def set_proxy_enabled(self): + """Set proxy enabled state and regenerate config""" + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) + + enabled = data.get("enabled", True) + save_proxy_enabled(enabled) + + # Regenerate config based on state + if enabled: + # Restore normal VPN config + regenerated = self.regenerate_current_config() + if regenerated: + # Only update start time if actually enabling VPN + save_start_time(datetime.now(timezone.utc).timestamp()) + else: + # Generate direct config (bypass proxy) + regenerated = generate_direct_config() + save_start_time(0) + + self.send_json({ + "success": True, + "enabled": enabled, + "regenerated": regenerated + }) + + except json.JSONDecodeError: + self.send_json({"success": False, "error": "Invalid JSON"}, 400) + except Exception as e: + self.send_json({"success": False, "error": str(e)}, 500) + + def get_subscription(self): + """Get saved subscription info""" + sub = load_subscription() + if sub: + self.send_json({ + "saved": True, + "url": sub.get("url"), + "selectedServer": sub.get("selectedServer"), + "userInfo": sub.get("userInfo") + }) + else: + self.send_json({"saved": False}) + + def test_connection(self): + """Test active proxy connection""" + query_components = {} + if '?' in self.path: + _, query = self.path.split('?', 1) + query_components = parse_qs(query) + + enable_speed = query_components.get('speed', ['false'])[0].lower() == 'true' + + result = measure_proxy_performance(enable_speed_test=enable_speed) + self.send_json(result) + + def ping_target(self): + """Ping a specific target""" + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) + + server = data.get("server") + port = int(data.get("port", 443)) + + if not server: + self.send_json({"error": "No server specified"}, 400) + return + + latency = measure_tcp_latency(server, port) + self.send_json({"latency": latency}) + + except Exception as e: + self.send_json({"error": str(e)}, 500) + + def apply_config(self): + """Apply new config from VLESS URL""" + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) + url = data.get("url", "").strip() + + if not url: + self.send_json({"success": False, "error": "URL не указан"}, 400) + return + + if not url.startswith("vless://"): + self.send_json({"success": False, "error": "Неверный формат. Поддерживаются только vless:// ссылки"}, 400) + return + + # Parse VLESS URL + try: + vless_params = parse_vless_url(url) + except ValueError as e: + self.send_json({"success": False, "error": f"Ошибка парсинга URL: {str(e)}"}, 400) + return + + # Generate config + config = generate_vless_config(vless_params) + + # Ensure data directory exists + DATA_DIR.mkdir(parents=True, exist_ok=True) + + # Write config file + CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False)) + + # Trigger reload via internal control port + try: + urllib.request.urlopen(f"http://localhost:{RELOAD_PORT}/reload", timeout=5) + except Exception as e: + print(f"[WebUI] Warning: reload request failed: {e}") + + self.send_json({ + "success": True, + "message": f"Конфигурация '{vless_params['tag']}' успешно применена!" + }) + + # Save new start time as connection is reset + save_start_time(datetime.now(timezone.utc).timestamp()) + + except json.JSONDecodeError: + self.send_json({"success": False, "error": "Неверный JSON"}, 400) + except Exception as e: + self.send_json({"success": False, "error": str(e)}, 500) + + def fetch_subscription(self): + """Fetch servers list from subscription URL""" + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) + url = data.get("url", "").strip() + + if not url: + self.send_json({"success": False, "error": "URL подписки не указан"}, 400) + return + + # Validate URL scheme to prevent SSRF + try: + parsed_url = urlparse(url) + if parsed_url.scheme not in ('http', 'https'): + self.send_json({"success": False, "error": "Недопустимый протокол (только http/https)"}, 400) + return + except Exception: + self.send_json({"success": False, "error": "Некорректный URL"}, 400) + return + + # Fetch subscription config + sys_info = get_system_info() + req = urllib.request.Request( + url, + headers={ + "User-Agent": "singbox", + "x-hwid": get_hwid(), + "x-device-os": sys_info["os"], + "x-ver-os": sys_info["version"], + "x-device-model": APP_NAME + } + ) + + config = None + config_text = "" + user_info = {} + + try: + with urllib.request.urlopen(req, timeout=15) as response: + config_text = response.read().decode("utf-8") + + # Parse User Info header + user_info_header = response.headers.get("subscription-userinfo", "") + if user_info_header: + parts = user_info_header.split(';') + for part in parts: + if '=' in part: + key, value = part.strip().split('=', 1) + try: + user_info[key] = int(value) + except ValueError: + pass + + except urllib.error.HTTPError as e: + self.send_json({"success": False, "error": f"Ошибка HTTP: {e.code}"}, 400) + return + except urllib.error.URLError as e: + self.send_json({"success": False, "error": f"Ошибка подключения: {e.reason}"}, 400) + return + + # Try to parse as JSON first + try: + config = json.loads(config_text) + except json.JSONDecodeError: + # Not JSON - try Base64 decode or plain text VLESS links + content = config_text.strip() + + # Try Base64 decode + import base64 + import re + try: + # Check if it looks like Base64 + if re.match(r'^[A-Za-z0-9+/=\s]+$', content): + decoded = base64.b64decode(content).decode('utf-8') + content = decoded + except Exception: + pass # Not Base64, continue with original content + + # Parse VLESS links + lines = content.strip().split('\n') + vless_links = [line.strip() for line in lines if line.strip().startswith('vless://')] + + if not vless_links: + self.send_json({"success": False, "error": "Не найдены VLESS ссылки в ответе"}, 400) + return + + # Parse each VLESS link and create outbounds + outbounds = [] + for link in vless_links: + try: + params = parse_vless_url(link) + outbound = { + "type": "vless", + "tag": params['tag'], + "server": params['server'], + "server_port": params['server_port'], + "uuid": params['uuid'], + "flow": params['flow'], + "tls": { + "enabled": True, + "server_name": params['server_name'], + "utls": {"enabled": True, "fingerprint": params['fingerprint']}, + "reality": { + "enabled": True, + "public_key": params['public_key'], + "short_id": params['short_id'] + } + }, + "packet_encoding": "xudp" + } + outbounds.append(outbound) + except Exception as e: + print(f"[WebUI] Failed to parse VLESS link: {e}") + continue + + if not outbounds: + self.send_json({"success": False, "error": "Не удалось распарсить VLESS ссылки"}, 400) + return + + # Create a mock config with parsed outbounds + config = {"outbounds": outbounds} + + # Extract outbound servers + outbounds = config.get("outbounds", []) + servers = [] + + for outbound in outbounds: + if outbound.get("type") in ["vless", "vmess", "trojan", "shadowsocks", "hysteria2"]: + servers.append({ + "tag": outbound.get("tag", "unknown"), + "type": outbound.get("type"), + "server": outbound.get("server", "unknown"), + "server_port": outbound.get("server_port", 443) + }) + + if not servers: + self.send_json({"success": False, "error": "Серверы не найдены в подписке"}, 400) + return + + self.send_json({ + "success": True, + "servers": servers, + "config": config, + "userInfo": user_info + }) + + except json.JSONDecodeError: + self.send_json({"success": False, "error": "Неверный JSON в ответе"}, 400) + except Exception as e: + self.send_json({"success": False, "error": str(e)}, 500) + + def apply_subscription(self): + """Apply config from subscription with selected server""" + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) + + config = data.get("config") + selected_tag = data.get("selectedServer") + sub_url = data.get("subUrl") # URL подписки для сохранения + user_info = data.get("userInfo") + + if not config: + self.send_json({"success": False, "error": "Конфигурация не указана"}, 400) + return + + if not selected_tag: + self.send_json({"success": False, "error": "Сервер не выбран"}, 400) + return + + # Modify config to use only selected server + outbounds = config.get("outbounds", []) + new_outbounds = [] + selected_outbound = None + + for outbound in outbounds: + if outbound.get("tag") == selected_tag: + selected_outbound = outbound + elif outbound.get("type") in ["direct", "block", "dns"]: + new_outbounds.append(outbound) + elif outbound.get("type") == "selector": + # Skip selector, we'll add selected server directly + pass + + if not selected_outbound: + self.send_json({"success": False, "error": f"Сервер '{selected_tag}' не найден"}, 400) + return + + # Load fallback configuration + fallback = load_fallback_config() + fallback_enabled = fallback.get("enabled", False) + fallback_host = fallback.get("host", "") + fallback_port = fallback.get("port", 8080) + + # Build outbounds list + final_outbounds = [] + final_tag = selected_tag + + if fallback_enabled and fallback_host: + # Add URLTest for automatic fallback selection + # High tolerance (9999ms) ensures first working proxy is preferred + urltest_outbound = { + "type": "urltest", + "tag": "auto-select", + "outbounds": ["fallback-proxy", selected_tag], + "url": "http://www.gstatic.com/generate_204", + "interval": "30s", + "tolerance": 9999 # Use first working proxy, not fastest + } + + # Add HTTP fallback proxy + fallback_outbound = { + "type": "http", + "tag": "fallback-proxy", + "server": fallback_host, + "server_port": fallback_port + } + + final_outbounds.append(urltest_outbound) + final_outbounds.append(fallback_outbound) + final_tag = "auto-select" + + # Add selected VPN server + final_outbounds.append(selected_outbound) + + # Add utility outbounds (direct, block, dns) + final_outbounds.extend(new_outbounds) + + # Update route + routes = { + "final": final_tag, + "auto_detect_interface": True + } + + # Simplify DNS configuration to match client.json format + config["dns"] = { + "independent_cache": True + } + + # Remove platform-specific and experimental fields from root config + config.pop("platform", None) + config.pop("experimental", None) + + # Replace TUN inbounds with mixed proxy + config["inbounds"] = [ + { + "tag": "mixed-in", + "type": "mixed", + "sniff": True, + "users": [], + "listen": PROXY_BIND_IP, + "listen_port": PROXY_PORT, + "set_system_proxy": False + } + ] + + config["outbounds"] = final_outbounds + config["route"] = routes + + # Ensure data directory exists + DATA_DIR.mkdir(parents=True, exist_ok=True) + + # Write config file + CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False)) + + # Save subscription URL for persistence + if sub_url: + save_subscription(sub_url, selected_tag, user_info) + + # Trigger reload via internal control port + try: + urllib.request.urlopen(f"http://localhost:{RELOAD_PORT}/reload", timeout=5) + except Exception as e: + print(f"[WebUI] Warning: reload request failed: {e}") + + self.send_json({ + "success": True, + "message": f"Сервер '{selected_tag}' успешно применён!" + }) + + except json.JSONDecodeError: + self.send_json({"success": False, "error": "Неверный JSON"}, 400) + except Exception as e: + self.send_json({"success": False, "error": str(e)}, 500) diff --git a/web/app/config.py b/web/app/config.py new file mode 100644 index 0000000..485a6e4 --- /dev/null +++ b/web/app/config.py @@ -0,0 +1,31 @@ +import os +from pathlib import Path + +# Environment Configuration +PORT = int(os.environ.get("PORT", 3456)) +PROXY_PORT = int(os.environ.get("PROXY_PORT", 8080)) +RELOAD_PORT = int(os.environ.get("RELOAD_PORT", 9090)) +PROXY_BIND_IP = os.environ.get("PROXY_BIND_IP", "0.0.0.0") +APP_NAME = "VPN-Proxy-Control by Dokril" + +# Path Configuration +# web/app/config.py -> web/app -> web -> base +APP_DIR = Path(__file__).parent.parent +BASE_DIR = APP_DIR.parent +WEB_DIR = APP_DIR +DATA_DIR = BASE_DIR / "data" + +# File Paths +CONFIG_FILE = DATA_DIR / "client.json" +HWID_FILE = DATA_DIR / "hwid" +SUBSCRIPTION_FILE = DATA_DIR / "subscription.json" +FALLBACK_FILE = DATA_DIR / "fallback.json" +PROXY_ENABLED_FILE = DATA_DIR / "proxy_enabled.json" +START_TIME_FILE = DATA_DIR / "start_time.json" + +# Default fallback proxy settings +DEFAULT_FALLBACK = { + "enabled": False, + "host": "192.168.50.111", + "port": 8080 +} diff --git a/web/app/network.py b/web/app/network.py new file mode 100644 index 0000000..e9f1468 --- /dev/null +++ b/web/app/network.py @@ -0,0 +1,108 @@ +import socket +import time +import urllib.request +from .config import PROXY_PORT + +def measure_tcp_latency(host: str, port: int, timeout: float = 2.0) -> int: + """Measure TCP latency to a host:port in milliseconds""" + start_time = time.time() + try: + with socket.create_connection((host, port), timeout=timeout): + latency = (time.time() - start_time) * 1000 + return int(latency) + except Exception: + return -1 + + +def measure_proxy_performance(enable_speed_test: bool = False) -> dict: + """Measure proxy latency, speed and public IP via local proxy""" + proxy_url = f"http://127.0.0.1:{PROXY_PORT}" + proxies = {"http": proxy_url, "https": proxy_url} + + # 1. Measure Latency (Ping) + latency = "Timeout" + try: + start_time = time.time() + # Use a reliable endpoint for ping + opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxies)) + req = urllib.request.Request("http://www.gstatic.com/generate_204", headers={"User-Agent": "singbox-test"}) + with opener.open(req, timeout=5) as response: + lat_ms = int((time.time() - start_time) * 1000) + latency = f"{lat_ms}ms" + except Exception as e: + latency = "Error" + + # 2. Get Public IP (IPv4) + ip = "Unknown" + try: + opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxies)) + # Use v4.ident.me to force IPv4 + req = urllib.request.Request("http://v4.ident.me", headers={"User-Agent": "curl/7.68.0"}) + with opener.open(req, timeout=5) as response: + ip = response.read().decode('utf-8').strip() + except Exception: + # Fallback to ipify if ident.me fails or returns garbage + try: + req = urllib.request.Request("http://api.ipify.org", headers={"User-Agent": "curl/7.68.0"}) + with opener.open(req, timeout=5) as response: + ip = response.read().decode('utf-8').strip() + except Exception: + pass + + # 3. Measure Download Speed + speed_mbps = 0.0 + if enable_speed_test: + test_files = [ + # Tele2 Speedtest (Usually very reliable and fast) + ("https://speedtest.selectel.ru/100MB", 100), + # ThinkBroadband (Reliable backup) + ("https://speedtest.selectel.ru/1GB", 1000) + ] + + for url, size_mb in test_files: + try: + print(f"[WebUI] Testing speed with: {url}") + start_time = time.time() + opener = urllib.request.build_opener(urllib.request.ProxyHandler(proxies)) + # Set a longer timeout for speed tests + with opener.open(url, timeout=30) as response: + downloaded = 0 + # Larger chunk size for better throughput measurement + chunk_size = 1024 * 256 # 256KB chunks + + # Download for at least 2 seconds or up to 25MB for accurate measurement + min_test_duration = 2.0 # seconds + max_download_bytes = 25 * 1024 * 1024 # 25MB + + while True: + chunk = response.read(chunk_size) + if not chunk: + break + downloaded += len(chunk) + + elapsed = time.time() - start_time + # Stop if we've downloaded enough AND tested for minimum duration + if downloaded >= max_download_bytes or (elapsed >= min_test_duration and downloaded >= 2 * 1024 * 1024): + break + + duration = time.time() - start_time + if duration > 0.1 and downloaded > 0: + # Calculate speed in Mbps (megabits per second) + # downloaded bytes * 8 bits/byte / 1,000,000 / seconds + speed_mbps = round((downloaded * 8) / (1000 * 1000) / duration, 1) + print(f"[WebUI] Speed test: downloaded {downloaded / (1024*1024):.1f}MB in {duration:.1f}s = {speed_mbps} Mbps") + break # Stop if successful + except Exception as e: + print(f"[WebUI] Speed test failed for {url}: {e}") + continue + + result = { + "latency": latency, + "ip": ip + } + + if enable_speed_test: + # If speed is still 0.0 but we tried, return Error or 0.0 + result["speed"] = f"{speed_mbps} Mbps" + + return result diff --git a/web/app/storage.py b/web/app/storage.py new file mode 100644 index 0000000..6e0132d --- /dev/null +++ b/web/app/storage.py @@ -0,0 +1,80 @@ +import json +from .config import ( + DATA_DIR, SUBSCRIPTION_FILE, FALLBACK_FILE, PROXY_ENABLED_FILE, + START_TIME_FILE, DEFAULT_FALLBACK +) + +def save_subscription(url: str, selected_server: str = None, user_info: dict = None): + """Save subscription URL, selected server and user info to file""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + data = { + "url": url, + "selectedServer": selected_server, + "userInfo": user_info + } + SUBSCRIPTION_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2)) + + +def load_subscription() -> dict: + """Load subscription from file""" + if SUBSCRIPTION_FILE.exists(): + try: + return json.loads(SUBSCRIPTION_FILE.read_text()) + except json.JSONDecodeError: + pass + return None + + +def save_fallback_config(enabled: bool, host: str, port: int): + """Save fallback proxy configuration to file""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + data = { + "enabled": enabled, + "host": host, + "port": port + } + FALLBACK_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2)) + + +def load_fallback_config() -> dict: + """Load fallback proxy configuration from file""" + if FALLBACK_FILE.exists(): + try: + return json.loads(FALLBACK_FILE.read_text()) + except json.JSONDecodeError: + pass + return DEFAULT_FALLBACK.copy() + + +def save_proxy_enabled(enabled: bool): + """Save proxy enabled state to file""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + PROXY_ENABLED_FILE.write_text(json.dumps({"enabled": enabled})) + + +def load_proxy_enabled() -> bool: + """Load proxy enabled state from file""" + if PROXY_ENABLED_FILE.exists(): + try: + data = json.loads(PROXY_ENABLED_FILE.read_text()) + return data.get("enabled", True) + except json.JSONDecodeError: + pass + return True # Default: proxy enabled + + +def save_start_time(start_time: float): + """Save VPN start time to file""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + START_TIME_FILE.write_text(json.dumps({"startTime": start_time})) + + +def load_start_time() -> float: + """Load VPN start time from file""" + if START_TIME_FILE.exists(): + try: + data = json.loads(START_TIME_FILE.read_text()) + return data.get("startTime", 0.0) + except json.JSONDecodeError: + pass + return 0.0 diff --git a/web/app/utils.py b/web/app/utils.py new file mode 100644 index 0000000..ad369f0 --- /dev/null +++ b/web/app/utils.py @@ -0,0 +1,26 @@ +import platform +import uuid +from .config import DATA_DIR, HWID_FILE + +def get_hwid() -> str: + """Get or generate hardware ID""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + + if HWID_FILE.exists(): + return HWID_FILE.read_text().strip() + + # Generate new random HWID + hwid = uuid.uuid4().hex[:16] + HWID_FILE.write_text(hwid) + return hwid + + +def get_system_info() -> dict: + """Get system information for headers""" + system = platform.system().lower() # windows, linux, darwin + version = platform.release() # 10, 5.15.0, 22.0.0 + + return { + "os": system, + "version": version + } diff --git a/web/app/vless.py b/web/app/vless.py new file mode 100644 index 0000000..7521b95 --- /dev/null +++ b/web/app/vless.py @@ -0,0 +1,176 @@ +from urllib.parse import unquote +import json +import urllib.request +from .config import PROXY_PORT, DATA_DIR, CONFIG_FILE + +def parse_vless_url(url: str) -> dict: + """Parse VLESS URL and extract connection parameters""" + if not url.startswith("vless://"): + raise ValueError("URL must start with vless://") + + # Remove scheme + url_no_scheme = url[8:] + + # Split by fragment (#tag) + if '#' in url_no_scheme: + url_part, tag = url_no_scheme.split('#', 1) + tag = unquote(tag) + else: + url_part = url_no_scheme + tag = "reality" + + # Split by query (?) + if '?' in url_part: + uuid_host_port, query_string = url_part.split('?', 1) + else: + raise ValueError("Missing query parameters") + + # Parse UUID@host:port + if '@' not in uuid_host_port: + raise ValueError("Missing @ separator") + + uuid_str, host_port = uuid_host_port.split('@', 1) + + if ':' not in host_port: + raise ValueError("Missing port") + + host, port_str = host_port.rsplit(':', 1) + port = int(port_str) + + # Parse query parameters + params = {} + for param in query_string.split('&'): + if '=' in param: + key, value = param.split('=', 1) + params[key] = unquote(value) + + # Extract required parameters + pbk = params.get('pbk', '') + sid = params.get('sid', '') + sni = params.get('sni', host) + fp = params.get('fp', 'chrome') + flow = params.get('flow', '') + + if not pbk or not sid: + raise ValueError("Missing required parameters: pbk or sid") + + return { + 'uuid': uuid_str, + 'server': host, + 'server_port': port, + 'tag': tag, + 'public_key': pbk, + 'short_id': sid, + 'server_name': sni, + 'fingerprint': fp, + 'flow': flow + } + + +def generate_vless_config(vless_params: dict) -> dict: + """Generate sing-box configuration from VLESS parameters""" + config = { + "dns": { + "independent_cache": True + }, + "log": { + "level": "debug", + "disabled": True, + "timestamp": True + }, + "route": { + "final": vless_params['tag'], + "auto_detect_interface": True + }, + "inbounds": [ + { + "tag": "mixed-in", + "type": "mixed", + "sniff": True, + "users": [], + "listen": "0.0.0.0", + "listen_port": PROXY_PORT, + "set_system_proxy": False + } + ], + "outbounds": [ + { + "type": "vless", + "tag": vless_params['tag'], + "server": vless_params['server'], + "server_port": vless_params['server_port'], + "flow": vless_params['flow'], + "tls": { + "enabled": True, + "server_name": vless_params['server_name'], + "reality": { + "enabled": True, + "public_key": vless_params['public_key'], + "short_id": vless_params['short_id'] + }, + "utls": { + "enabled": True, + "fingerprint": vless_params['fingerprint'] + } + }, + "uuid": vless_params['uuid'] + }, + { + "tag": "direct", + "type": "direct" + } + ] + } + + return config + + +def generate_direct_config() -> bool: + """Generate a direct connection config (bypass all proxies)""" + try: + config = { + "dns": { + "independent_cache": True + }, + "log": { + "level": "debug", + "disabled": True, + "timestamp": True + }, + "route": { + "final": "direct", + "auto_detect_interface": True + }, + "inbounds": [ + { + "tag": "mixed-in", + "type": "mixed", + "sniff": True, + "users": [], + "listen": "0.0.0.0", + "listen_port": PROXY_PORT, + "set_system_proxy": False + } + ], + "outbounds": [ + { + "tag": "direct", + "type": "direct" + } + ] + } + + DATA_DIR.mkdir(parents=True, exist_ok=True) + CONFIG_FILE.write_text(json.dumps(config, indent=2, ensure_ascii=False)) + + # Reload sing-box + try: + urllib.request.urlopen("http://127.0.0.1:9090/reload", timeout=3) + except Exception: + pass + + return True + + except Exception as e: + print(f"[WebUI] Failed to generate direct config: {e}") + return False diff --git a/web/components/connection_info.html b/web/components/connection_info.html new file mode 100644 index 0000000..a123375 --- /dev/null +++ b/web/components/connection_info.html @@ -0,0 +1,37 @@ + +
Secure Shell v4.2
+Secure Shell v4.2
-