from __future__ import annotations from pathlib import Path import yaml from app.config import get_settings from app.models import AppConfig, ClientConfig def resolve_policy(policy: str, client: ClientConfig) -> str: return ( policy.replace("{{ main_policy }}", client.main_policy) .replace("{{main_policy}}", client.main_policy) .replace("{{ direct_policy }}", client.direct_policy) .replace("{{direct_policy}}", client.direct_policy) ) def load_rule_text(path: Path) -> str: return path.read_text(encoding="utf-8") def load_rule_payload(path: Path) -> list[str]: if path.suffix.lower() in {".yaml", ".yml"}: data = yaml.safe_load(path.read_text(encoding="utf-8")) or {} payload = data.get("payload", []) if not isinstance(payload, list): raise ValueError(f"Rule file {path.name} must contain a list field named 'payload'") return [str(item).strip() for item in payload if str(item).strip()] lines: list[str] = [] for line in path.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#"): continue lines.append(stripped) return lines def _render_payload_line(payload_line: str, behavior: str) -> str: if "," in payload_line or behavior == "classical": return payload_line if behavior == "ipcidr": return f"IP-CIDR,{payload_line}" if behavior == "domain": return f"DOMAIN-SUFFIX,{payload_line}" return payload_line def _attach_policy(rendered_line: str, target: str, append_no_resolve: bool) -> str: parts = [part.strip() for part in rendered_line.split(",")] if parts and parts[-1] == "no-resolve": parts.insert(len(parts) - 1, target) line = ",".join(parts) else: line = f"{rendered_line},{target}" if append_no_resolve: line += ",no-resolve" return line def _resolve_rule_lines(rule_name: str, app_config: AppConfig, client: ClientConfig) -> list[str]: rule = app_config.rules[rule_name] target = resolve_policy(rule.policy, client) lines: list[str] = [] for payload_line in rule.payload: rendered = _render_payload_line(payload_line, rule.behavior) lines.append(_attach_policy(rendered, target, rule.no_resolve)) if rule.file: line = f"RULE-SET,{rule_name},{target}" if rule.no_resolve: line += ",no-resolve" lines.append(line) return lines def build_rule_provider_entries(app_config: AppConfig, client: ClientConfig, base_url: str, public_path: str): providers: dict[str, dict] = {} for name, rule in app_config.rules.items(): if not rule.file: continue entry = { "behavior": rule.behavior, "format": rule.format, "url": f"{base_url}/{public_path}/rules/{name}.yaml", "interval": rule.interval, } providers[name] = entry return providers def build_rule_set_references(app_config: AppConfig, client: ClientConfig) -> list[str]: refs: list[str] = [] for name in app_config.rules: refs.extend(_resolve_rule_lines(name, app_config, client)) refs.append(f"MATCH,{client.main_policy}") return refs def build_inline_rules(app_config: AppConfig, client: ClientConfig) -> list[str]: settings = get_settings() lines: list[str] = [] for name, rule in app_config.rules.items(): for payload_line in rule.payload: line = f"{payload_line},{resolve_policy(rule.policy, client)}" if rule.no_resolve: line += ",no-resolve" lines.append(line) if not rule.file: continue path = (settings.rules_dir / rule.file).resolve() if not path.is_file() or settings.rules_dir.resolve() not in path.parents: raise FileNotFoundError(f"Rule file missing: {rule.file}") target = resolve_policy(rule.policy, client) for payload_line in load_rule_payload(path): rendered = _render_payload_line(payload_line, rule.behavior) lines.append(_attach_policy(rendered, target, rule.no_resolve)) lines.append(f"MATCH,{client.main_policy}") return lines