Files
sub-provider/app/services/bundle_cache.py
2026-04-01 16:19:08 +08:00

55 lines
1.6 KiB
Python

from __future__ import annotations

import hashlib
import json
import time
import uuid
from pathlib import Path

from pydantic import BaseModel, Field
class BundleCacheEntry(BaseModel):
    """A cached bundle: its text content plus the headers stored alongside it."""

    # Bundle body as text.
    content: str
    # Header name -> header value; each instance gets its own empty dict by default.
    headers: dict[str, str] = Field(default_factory=dict)
def build_bundle_cache_key(*, client_type: str, source_names: list[str]) -> str:
    """Return the SHA-256 hex digest identifying a client type and source list.

    The digest is computed over ``"<client_type>|<name1>,<name2>,..."`` encoded
    as UTF-8, so the order of ``source_names`` affects the resulting key.

    Args:
        client_type: Client identifier placed before the ``|`` separator.
        source_names: Source names, joined with ``,`` in the order given.

    Returns:
        A 64-character lowercase hexadecimal string.
    """
    joined_sources = ",".join(source_names)
    digest = hashlib.sha256()
    digest.update(f"{client_type}|{joined_sources}".encode("utf-8"))
    return digest.hexdigest()
def load_bundle_cache(*, cache_dir: Path, cache_key: str, ttl_seconds: int) -> BundleCacheEntry | None:
    """Load a cached bundle, or return ``None`` when no usable entry exists.

    An entry is the pair ``<cache_key>.yaml`` (content) and ``<cache_key>.json``
    (metadata) inside ``cache_dir``. Its age is measured from the modification
    time of the metadata file.

    Args:
        cache_dir: Directory holding the cache files.
        cache_key: File stem shared by the two files of the entry.
        ttl_seconds: Number of seconds after the metadata file's mtime during
            which the entry is considered valid.

    Returns:
        The cached entry, or ``None`` if either file is missing, the entry has
        expired, or the files cannot be read, decoded, or parsed as a JSON
        object. A ``headers`` value that is not a dict is replaced by an empty
        dict; header keys and values are coerced to ``str``.
    """
    yaml_path = cache_dir / f"{cache_key}.yaml"
    meta_path = cache_dir / f"{cache_key}.json"
    if not yaml_path.is_file() or not meta_path.is_file():
        return None

    try:
        expires_at = meta_path.stat().st_mtime + ttl_seconds
        if expires_at < time.time():
            return None
        metadata = json.loads(meta_path.read_text(encoding="utf-8"))
        content = yaml_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # A file that vanished after the is_file() check, is unreadable, or
        # holds corrupt data is a cache miss, not an error for the caller.
        return None

    # Valid JSON that is not an object (e.g. ``[]``) has no ``.get``; treat it
    # like any other corrupt metadata file.
    if not isinstance(metadata, dict):
        return None

    headers = metadata.get("headers")
    if not isinstance(headers, dict):
        headers = {}
    return BundleCacheEntry(content=content, headers={str(k): str(v) for k, v in headers.items()})
def _write_text_atomically(target: Path, text: str) -> None:
    """Write ``text`` (UTF-8) to ``target`` via a temporary sibling file and a rename."""
    # The staging file lives in the same directory so the rename stays on one
    # filesystem; the random suffix keeps concurrent writers from sharing it.
    staging = target.with_name(f".{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        staging.write_text(text, encoding="utf-8")
        staging.replace(target)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        staging.unlink(missing_ok=True)


def save_bundle_cache(
    *,
    cache_dir: Path,
    cache_key: str,
    content: str,
    headers: dict[str, str],
) -> Path:
    """Store a bundle and its headers in ``cache_dir`` and return the content path.

    Writes ``<cache_key>.yaml`` with ``content`` and ``<cache_key>.json`` with
    ``{"headers": headers}``, creating ``cache_dir`` (and parents) if needed.
    Each file is written to a temporary sibling and renamed into place, so an
    interrupted write cannot leave a truncated file under the final name.

    Args:
        cache_dir: Directory that receives the cache files.
        cache_key: File stem shared by the two files.
        content: Bundle text to store.
        headers: Header mapping serialized into the metadata file.

    Returns:
        Path of the written ``.yaml`` file.
    """
    cache_dir.mkdir(parents=True, exist_ok=True)
    yaml_path = cache_dir / f"{cache_key}.yaml"
    meta_path = cache_dir / f"{cache_key}.json"
    metadata_text = json.dumps({"headers": headers}, ensure_ascii=False, indent=2)
    # Content first, metadata last: the metadata file is replaced only once
    # the content it describes is already in place.
    _write_text_atomically(yaml_path, content)
    _write_text_atomically(meta_path, metadata_text)
    return yaml_path