"""Build proxy-client profile dictionaries and serialize them to YAML.

Two profile flavours are assembled here:

* ``build_thin_profile`` emits ``proxy-providers`` / ``rule-providers``
  entries that point at URLs under ``{base_url}/{public_path}``.
* ``build_bundle_profile`` embeds the proxies of every source snapshot
  directly in the profile under ``proxies``.

Key insertion order is significant everywhere in this module because
``dump_yaml`` serializes with ``sort_keys=False``.
"""

from __future__ import annotations

import re
from typing import Any

import yaml

from app.models import AppConfig, ClientConfig, ProxyGroupConfig, SourceSnapshot
from app.services.rules import (
    build_inline_rules,
    build_rule_provider_entries,
    build_rule_set_references,
)


def dump_yaml(data: dict[str, Any]) -> str:
    """Return *data* as block-style YAML, keeping key order and raw Unicode."""
    return yaml.safe_dump(
        data,
        allow_unicode=True,
        sort_keys=False,
        default_flow_style=False,
    )


def _expand_proxy_tokens(
    proxies: list[str],
    *,
    client: ClientConfig,
    source_auto_names: list[str],
    selector_names: list[str],
) -> list[str]:
    """Replace ``{{ token }}`` placeholders in *proxies* with real group names.

    Both the spaced (``{{ main_policy }}``) and the compact
    (``{{main_policy}}``) spelling of each token are recognised. Entries that
    are not a known token are passed through unchanged. A token may expand to
    several names (``source_auto_groups``, ``selector_groups``) or to exactly
    one (the policy tokens).

    Returns a new list; the input lists are never stored in the result.
    """
    replacements: dict[str, list[str]] = {
        "main_policy": [client.main_policy],
        "source_policy": [client.source_policy],
        "mixed_auto_policy": [client.mixed_auto_policy],
        "manual_policy": [client.manual_policy],
        "direct_policy": [client.direct_policy],
        "source_auto_groups": source_auto_names,
        "selector_groups": selector_names,
    }
    tokens: dict[str, list[str]] = {}
    for key, names in replacements.items():
        tokens["{{ " + key + " }}"] = names
        tokens["{{" + key + "}}"] = names

    expanded: list[str] = []
    for item in proxies:
        expanded.extend(tokens.get(item, [item]))
    return expanded


def _apply_url_test_settings(
    built: dict[str, Any],
    *,
    group: ProxyGroupConfig,
    client: ClientConfig,
) -> None:
    """Add ``url``/``interval``/``tolerance`` to *built* for url-test groups.

    Does nothing for any other group type. The group's own ``url`` and
    ``interval`` win; the client-level test settings are the fallback.
    ``tolerance`` is only written when the group sets one.
    """
    if group.type != "url-test":
        return
    built["url"] = str(group.url or client.test_url)
    built["interval"] = group.interval or client.test_interval
    if group.tolerance is not None:
        built["tolerance"] = group.tolerance


def _build_thin_filter_group(
    *,
    client_type: str,
    client: ClientConfig,
    group: ProxyGroupConfig,
    selected_source_names: list[str],
) -> dict[str, Any]:
    """Build a filter-based group for a thin (provider-backed) profile.

    ``mihomo`` clients get an explicit ``use`` list of the selected sources;
    every other client type gets ``include-all: True`` instead.
    """
    built: dict[str, Any] = {
        "name": group.name,
        "type": group.type,
        "filter": group.filter,
    }
    _apply_url_test_settings(built, group=group, client=client)
    if client_type == "mihomo":
        # Copy the list: storing the caller's list object in several groups
        # makes yaml.safe_dump emit anchors/aliases (&id001 / *id001) and
        # leaves the groups sharing one mutable list.
        built["use"] = list(selected_source_names)
    else:
        built["include-all"] = True
    return built


def _build_bundle_filter_group(
    *,
    client: ClientConfig,
    group: ProxyGroupConfig,
    all_proxy_names: list[str],
) -> dict[str, Any]:
    """Build a filter-based group for a bundle profile.

    The group's ``filter`` is applied as a regular expression
    (``re.search``) to every proxy name. When nothing matches, or the group
    has no filter, the group falls back to the client's direct policy so it
    is never empty.
    """
    matched = [
        name
        for name in all_proxy_names
        if group.filter and re.search(group.filter, name)
    ]
    built: dict[str, Any] = {
        "name": group.name,
        "type": group.type,
        "proxies": matched or [client.direct_policy],
    }
    _apply_url_test_settings(built, group=group, client=client)
    return built


def _build_custom_policy_groups(
    *,
    app_config: AppConfig,
    client: ClientConfig,
    source_auto_names: list[str],
    selector_names: list[str],
) -> list[dict[str, Any]]:
    """Build one group per entry of ``app_config.policy_groups``.

    Placeholder tokens in each group's ``proxies`` are expanded with
    ``_expand_proxy_tokens``.
    """
    groups: list[dict[str, Any]] = []
    for group in app_config.policy_groups:
        built: dict[str, Any] = {
            "name": group.name,
            "type": group.type,
            "proxies": _expand_proxy_tokens(
                group.proxies,
                client=client,
                source_auto_names=source_auto_names,
                selector_names=selector_names,
            ),
        }
        _apply_url_test_settings(built, group=group, client=client)
        groups.append(built)
    return groups


def _build_static_selector_group(
    selector: ProxyGroupConfig,
    *,
    client: ClientConfig,
    source_auto_names: list[str],
    selector_names: list[str],
) -> dict[str, Any]:
    """Build a selector group that lists its proxies explicitly (no filter)."""
    return {
        "name": selector.name,
        "type": selector.type,
        "proxies": _expand_proxy_tokens(
            selector.proxies,
            client=client,
            source_auto_names=source_auto_names,
            selector_names=selector_names,
        ),
    }


def _build_source_policy_group(
    client: ClientConfig,
    source_auto_names: list[str],
) -> dict[str, Any]:
    """Build the ``select`` group that chooses between per-source auto groups."""
    return {
        "name": client.source_policy,
        "type": "select",
        "proxies": [
            client.mixed_auto_policy,
            *source_auto_names,
            client.direct_policy,
        ],
    }


def _build_main_policy_group(
    client: ClientConfig,
    selector_names: list[str],
) -> dict[str, Any]:
    """Build the top-level ``select`` group named after the main policy."""
    return {
        "name": client.main_policy,
        "type": "select",
        "proxies": [
            client.source_policy,
            client.mixed_auto_policy,
            *selector_names,
            client.manual_policy,
            client.direct_policy,
        ],
    }


def _build_base_profile(client_type: str, client: ClientConfig) -> dict[str, Any]:
    """Build the general settings shared by thin and bundle profiles.

    ``mode`` and ``ipv6`` are always present; ``log-level`` only when set.
    Port and LAN settings are written for ``mihomo`` clients only.
    """
    profile: dict[str, Any] = {
        "mode": client.mode,
        "ipv6": client.ipv6,
    }
    if client.log_level:
        profile["log-level"] = client.log_level
    if client_type == "mihomo":
        if client.mixed_port is not None:
            profile["mixed-port"] = client.mixed_port
        if client.socks_port is not None:
            profile["socks-port"] = client.socks_port
        profile["allow-lan"] = client.allow_lan
    return profile


def build_thin_profile(
    *,
    client_type: str,
    app_config: AppConfig,
    client: ClientConfig,
    selected_source_names: list[str],
    base_url: str,
    public_path: str,
) -> dict[str, Any]:
    """Build a profile that references proxies and rules through providers.

    Each selected source becomes a ``proxy-providers`` entry whose URL is
    ``{base_url}/{public_path}/providers/{name}.yaml``. ``mihomo`` clients
    additionally get ``type``, a local cache ``path`` and a ``health-check``
    section; other client types only get ``url`` and ``interval``.

    Returns the profile as a plain dictionary ready for ``dump_yaml``.
    """
    profile = _build_base_profile(client_type, client)

    proxy_providers: dict[str, dict[str, Any]] = {}
    for name in selected_source_names:
        provider_url = f"{base_url}/{public_path}/providers/{name}.yaml"
        if client_type == "mihomo":
            proxy_providers[name] = {
                "type": "http",
                "url": provider_url,
                "path": f"./providers/{name}.yaml",
                "interval": client.provider_interval,
                "health-check": {
                    "enable": True,
                    "url": str(client.test_url),
                    "interval": client.test_interval,
                },
            }
        else:
            proxy_providers[name] = {
                "url": provider_url,
                "interval": client.provider_interval,
            }

    profile["proxy-providers"] = proxy_providers
    profile["proxy-groups"] = _build_thin_groups(
        client_type, app_config, client, selected_source_names
    )
    profile["rule-providers"] = build_rule_provider_entries(
        app_config, client, base_url, public_path
    )
    profile["rules"] = build_rule_set_references(app_config, client)
    return profile


def build_bundle_profile(
    *,
    client_type: str,
    app_config: AppConfig,
    client: ClientConfig,
    snapshots: list[SourceSnapshot],
) -> dict[str, Any]:
    """Build a self-contained profile with every snapshot's proxies inlined.

    Proxies without a (non-blank) ``name`` are skipped. Names are made unique
    across all snapshots: a clash is resolved by appending `` #2``, `` #3``,
    ... to the original name. Each proxy mapping is shallow-copied before its
    name is rewritten, so the snapshot documents are left untouched.

    Returns the profile as a plain dictionary ready for ``dump_yaml``.
    """
    profile = _build_base_profile(client_type, client)

    proxies: list[dict[str, Any]] = []
    source_proxy_names: dict[str, list[str]] = {}
    seen: set[str] = set()
    for snapshot in snapshots:
        names: list[str] = []
        for proxy in snapshot.document.proxies:
            candidate = dict(proxy)
            name = str(candidate.get("name", "")).strip()
            if not name:
                continue
            original = name
            index = 2
            while name in seen:
                name = f"{original} #{index}"
                index += 1
            seen.add(name)
            candidate["name"] = name
            proxies.append(candidate)
            names.append(name)
        source_proxy_names[snapshot.name] = names

    profile["proxies"] = proxies
    profile["proxy-groups"] = _build_bundle_groups(
        app_config, client, snapshots, source_proxy_names
    )
    profile["rules"] = build_inline_rules(app_config, client)
    return profile


def _build_thin_groups(
    client_type: str,
    app_config: AppConfig,
    client: ClientConfig,
    selected_source_names: list[str],
) -> list[dict[str, Any]]:
    """Build the ordered ``proxy-groups`` list for a thin profile.

    Order of the result: one url-test group per selected source, the mixed
    auto group, one url-test group per region, the configured selector
    groups, the source policy group, the manual group, the custom policy
    groups and finally the main policy group.

    Raises ``KeyError`` if a selected source name is missing from
    ``app_config.sources``.
    """
    # The only difference between client types for "take everything" groups
    # is the name of the flag.
    include_all_key = (
        "include-all-providers" if client_type == "mihomo" else "include-all"
    )
    test_url = str(client.test_url)

    groups: list[dict[str, Any]] = []
    source_auto_names: list[str] = []
    for source_name in selected_source_names:
        display_name = app_config.sources[source_name].display_name or source_name
        group_name = f"{display_name} 自动"
        source_auto_names.append(group_name)
        groups.append(
            {
                "name": group_name,
                "type": "url-test",
                "url": test_url,
                "interval": client.test_interval,
                "use": [source_name],
            }
        )

    groups.append(
        {
            "name": client.mixed_auto_policy,
            "type": "url-test",
            "url": test_url,
            "interval": client.test_interval,
            include_all_key: True,
        }
    )
    manual: dict[str, Any] = {
        "name": client.manual_policy,
        "type": "select",
        "proxies": [client.direct_policy],
        include_all_key: True,
    }

    selector_names: list[str] = []
    for region in app_config.regions.values():
        groups.append(
            {
                "name": region.name,
                "type": "url-test",
                "url": test_url,
                "interval": client.test_interval,
                "filter": region.filter,
                "tolerance": region.tolerance,
                include_all_key: True,
            }
        )
        selector_names.append(region.name)

    for selector in app_config.selector_groups:
        if selector.filter:
            groups.append(
                _build_thin_filter_group(
                    client_type=client_type,
                    client=client,
                    group=selector,
                    selected_source_names=selected_source_names,
                )
            )
        else:
            groups.append(
                _build_static_selector_group(
                    selector,
                    client=client,
                    source_auto_names=source_auto_names,
                    selector_names=selector_names,
                )
            )
        # Appended after building, so a selector's own token expansion only
        # sees the regions and selectors defined before it.
        selector_names.append(selector.name)

    groups.append(_build_source_policy_group(client, source_auto_names))
    groups.append(manual)
    groups.extend(
        _build_custom_policy_groups(
            app_config=app_config,
            client=client,
            source_auto_names=source_auto_names,
            selector_names=selector_names,
        )
    )
    groups.append(_build_main_policy_group(client, selector_names))
    return groups


def _build_bundle_groups(
    app_config: AppConfig,
    client: ClientConfig,
    snapshots: list[SourceSnapshot],
    source_proxy_names: dict[str, list[str]],
) -> list[dict[str, Any]]:
    """Build the ordered ``proxy-groups`` list for a bundle profile.

    Mirrors the group order of the thin profile, but every group lists
    concrete proxy names. Any group that would otherwise be empty falls back
    to the client's direct policy.
    """
    test_url = str(client.test_url)
    all_proxy_names = [
        name for names in source_proxy_names.values() for name in names
    ]

    groups: list[dict[str, Any]] = []
    source_auto_names: list[str] = []
    for snapshot in snapshots:
        group_name = f"{snapshot.display_name} 自动"
        source_auto_names.append(group_name)
        own_names = source_proxy_names.get(snapshot.name)
        groups.append(
            {
                "name": group_name,
                "type": "url-test",
                "url": test_url,
                "interval": client.test_interval,
                # Copy so the group never shares a list object with the
                # caller's mapping (shared objects dump as YAML aliases).
                "proxies": list(own_names) if own_names else [client.direct_policy],
            }
        )

    groups.append(
        {
            "name": client.mixed_auto_policy,
            "type": "url-test",
            "url": test_url,
            "interval": client.test_interval,
            "proxies": all_proxy_names or [client.direct_policy],
        }
    )

    selector_names: list[str] = []
    for region in app_config.regions.values():
        matched = [
            name for name in all_proxy_names if re.search(region.filter, name)
        ]
        groups.append(
            {
                "name": region.name,
                "type": "url-test",
                "url": test_url,
                "interval": client.test_interval,
                "tolerance": region.tolerance,
                "proxies": matched or [client.direct_policy],
            }
        )
        selector_names.append(region.name)

    for selector in app_config.selector_groups:
        if selector.filter:
            groups.append(
                _build_bundle_filter_group(
                    client=client,
                    group=selector,
                    all_proxy_names=all_proxy_names,
                )
            )
        else:
            groups.append(
                _build_static_selector_group(
                    selector,
                    client=client,
                    source_auto_names=source_auto_names,
                    selector_names=selector_names,
                )
            )
        # Appended after building, so a selector's own token expansion only
        # sees the regions and selectors defined before it.
        selector_names.append(selector.name)

    groups.append(_build_source_policy_group(client, source_auto_names))
    groups.append(
        {
            "name": client.manual_policy,
            "type": "select",
            "proxies": (
                [*all_proxy_names, client.direct_policy]
                if all_proxy_names
                else [client.direct_policy]
            ),
        }
    )
    groups.extend(
        _build_custom_policy_groups(
            app_config=app_config,
            client=client,
            source_auto_names=source_auto_names,
            selector_names=selector_names,
        )
    )
    groups.append(_build_main_policy_group(client, selector_names))
    return groups