init
This commit is contained in:
80
app/services/rules.py
Normal file
80
app/services/rules.py
Normal file
@@ -0,0 +1,80 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
from app.config import get_settings
|
||||
from app.models import AppConfig, ClientConfig, RuleConfig
|
||||
|
||||
|
||||
def resolve_policy(policy: str, client: ClientConfig) -> str:
|
||||
return (
|
||||
policy.replace("{{ main_policy }}", client.main_policy)
|
||||
.replace("{{main_policy}}", client.main_policy)
|
||||
.replace("{{ direct_policy }}", client.direct_policy)
|
||||
.replace("{{direct_policy}}", client.direct_policy)
|
||||
)
|
||||
|
||||
|
||||
def load_rule_text(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def load_rule_payload(path: Path) -> list[str]:
|
||||
if path.suffix.lower() in {".yaml", ".yml"}:
|
||||
data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
|
||||
payload = data.get("payload", [])
|
||||
if not isinstance(payload, list):
|
||||
raise ValueError(f"Rule file {path.name} must contain a list field named 'payload'")
|
||||
return [str(item).strip() for item in payload if str(item).strip()]
|
||||
|
||||
lines: list[str] = []
|
||||
for line in path.read_text(encoding="utf-8").splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped or stripped.startswith("#"):
|
||||
continue
|
||||
lines.append(stripped)
|
||||
return lines
|
||||
|
||||
|
||||
def build_rule_provider_entries(app_config: AppConfig, client: ClientConfig, base_url: str, public_path: str):
|
||||
providers: dict[str, dict] = {}
|
||||
for name, rule in app_config.rules.items():
|
||||
entry = {
|
||||
"behavior": rule.behavior,
|
||||
"format": rule.format,
|
||||
"url": f"{base_url}/{public_path}/rules/{name}.yaml",
|
||||
"interval": rule.interval,
|
||||
}
|
||||
providers[name] = entry
|
||||
return providers
|
||||
|
||||
|
||||
def build_rule_set_references(app_config: AppConfig, client: ClientConfig) -> list[str]:
|
||||
refs: list[str] = []
|
||||
for name, rule in app_config.rules.items():
|
||||
target = resolve_policy(rule.policy, client)
|
||||
line = f"RULE-SET,{name},{target}"
|
||||
if rule.no_resolve:
|
||||
line += ",no-resolve"
|
||||
refs.append(line)
|
||||
refs.append(f"MATCH,{client.main_policy}")
|
||||
return refs
|
||||
|
||||
|
||||
def build_inline_rules(app_config: AppConfig, client: ClientConfig) -> list[str]:
|
||||
settings = get_settings()
|
||||
lines: list[str] = []
|
||||
for rule in app_config.rules.values():
|
||||
path = (settings.rules_dir / rule.file).resolve()
|
||||
if not path.is_file() or settings.rules_dir.resolve() not in path.parents:
|
||||
raise FileNotFoundError(f"Rule file missing: {rule.file}")
|
||||
target = resolve_policy(rule.policy, client)
|
||||
for payload_line in load_rule_payload(path):
|
||||
line = f"{payload_line},{target}"
|
||||
if rule.no_resolve:
|
||||
line += ",no-resolve"
|
||||
lines.append(line)
|
||||
lines.append(f"MATCH,{client.main_policy}")
|
||||
return lines
|
||||
Reference in New Issue
Block a user