Files
sub-provider/app/services/rules.py
riglen 09a9faa1be opt
2026-03-31 16:39:23 +08:00

106 lines
3.5 KiB
Python

from __future__ import annotations
from pathlib import Path
import yaml
from app.config import get_settings
from app.models import AppConfig, ClientConfig
def resolve_policy(policy: str, client: ClientConfig) -> str:
return (
policy.replace("{{ main_policy }}", client.main_policy)
.replace("{{main_policy}}", client.main_policy)
.replace("{{ direct_policy }}", client.direct_policy)
.replace("{{direct_policy}}", client.direct_policy)
)
def load_rule_text(path: Path) -> str:
return path.read_text(encoding="utf-8")
def load_rule_payload(path: Path) -> list[str]:
if path.suffix.lower() in {".yaml", ".yml"}:
data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
payload = data.get("payload", [])
if not isinstance(payload, list):
raise ValueError(f"Rule file {path.name} must contain a list field named 'payload'")
return [str(item).strip() for item in payload if str(item).strip()]
lines: list[str] = []
for line in path.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
lines.append(stripped)
return lines
def _resolve_rule_lines(rule_name: str, app_config: AppConfig, client: ClientConfig) -> list[str]:
rule = app_config.rules[rule_name]
target = resolve_policy(rule.policy, client)
lines: list[str] = []
for payload_line in rule.payload:
line = f"{payload_line},{target}"
if rule.no_resolve:
line += ",no-resolve"
lines.append(line)
if rule.file:
line = f"RULE-SET,{rule_name},{target}"
if rule.no_resolve:
line += ",no-resolve"
lines.append(line)
return lines
def build_rule_provider_entries(app_config: AppConfig, client: ClientConfig, base_url: str, public_path: str):
providers: dict[str, dict] = {}
for name, rule in app_config.rules.items():
if not rule.file:
continue
entry = {
"behavior": rule.behavior,
"format": rule.format,
"url": f"{base_url}/{public_path}/rules/{name}.yaml",
"interval": rule.interval,
}
providers[name] = entry
return providers
def build_rule_set_references(app_config: AppConfig, client: ClientConfig) -> list[str]:
refs: list[str] = []
for name in app_config.rules:
refs.extend(_resolve_rule_lines(name, app_config, client))
refs.append(f"MATCH,{client.main_policy}")
return refs
def build_inline_rules(app_config: AppConfig, client: ClientConfig) -> list[str]:
settings = get_settings()
lines: list[str] = []
for name, rule in app_config.rules.items():
for payload_line in rule.payload:
line = f"{payload_line},{resolve_policy(rule.policy, client)}"
if rule.no_resolve:
line += ",no-resolve"
lines.append(line)
if not rule.file:
continue
path = (settings.rules_dir / rule.file).resolve()
if not path.is_file() or settings.rules_dir.resolve() not in path.parents:
raise FileNotFoundError(f"Rule file missing: {rule.file}")
target = resolve_policy(rule.policy, client)
for payload_line in load_rule_payload(path):
line = f"{payload_line},{target}"
if rule.no_resolve:
line += ",no-resolve"
lines.append(line)
lines.append(f"MATCH,{client.main_policy}")
return lines