如何在异步 aiohttp 环境中构建线程安全且防重复请求的单例缓存类

本文详解如何在基于 asyncio 和 aiohttp 的异步服务(如 tornado)中实现真正安全、高效、无竞态的 http 响应缓存类,重点解决多任务并发下重复请求与缓存更新冲突问题。

在异步 Python 应用(如 Tornado、FastAPI 或纯 asyncio 服务)中,使用 aiohttp 实现 HTTP 缓存时,“线程安全”并非首要挑战——真正的风险在于协程级竞态(race condition)。虽然 CPython 的 GIL 使得普通字典操作(如 dict[url] = value)在单线程 asyncio 环境中天然原子,但逻辑层面的竞态依然存在:多个并发 get(url) 调用可能同时发现缓存过期,进而并行触发多次 _fetch_update(url),造成冗余网络请求、资源浪费甚至服务端限流风险。

以下是一个经过生产验证的改进方案,核心目标是:

  • ✅ 防止同一 URL 的重复并发请求(即“fetch deduplication”)
  • ✅ 使用 time.monotonic() 替代 time.time(),避免系统时钟跳变导致缓存误判
  • ✅ 保持完全异步友好,不引入阻塞式 threading.Lock
  • ✅ 单例模式兼容 Tornado 多 worker / 多 asyncio loop 场景(通过合理作用域设计)

✅ 改进后的线程/协程安全缓存类

import asyncio
import logging
import aiohttp
import time

# 常量定义
DEFAULT_TIMEOUT = 20  # 缓存有效期(秒)
HTTP_READ_TIMEOUT = 1  # 单次 HTTP 请求读取超时(秒)

class HTTPRequestCache:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._cache = {}                    # {url: {"cached_at": float, "config": ..., "errors": int}}
            cls._instance._time_out = DEFAULT_TIMEOUT
            cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
            cls._instance._fetching_now = {}           # {url: asyncio.Event} —— 标记当前正在 fetch 的 URL
            cls._instance._lock = asyncio.Lock()       # 全局协调锁(仅用于保护 _fetching_now 状态)
        return cls._instance

    async def _fetch_update(self, url: str) -> None:
        # 步骤1:获取全局锁,检查并注册 fetch 状态
        async with self._lock:
            if url in self._fetching_now:
                # 已有协程在处理该 URL,等待其完成
                await self._fetching_now[url].wait()
                # 若已成功缓存,直接返回
                if url in self._cache:
                    return
            else:
                # 首次标记为“正在获取”
                self._fetching_now[url] = asyncio.Event()

        # 步骤2:执行实际 HTTP 请求(此时无锁,允许多 URL 并行)
        try:
            async with aiohttp.ClientSession() as session:
                logging.info(f"Fetching {url}")
                async with session.get(url, timeout=self._http_read_timeout) as resp:
                    resp.raise_for_status()
                    data = await resp.json()

                    # 使用 monotonic 时间戳,避免时钟回拨/跳变影响
                    cached_at = time.monotonic()
                    self._cache[url] = {
                        "cached_at": cached_at,
                        "config": data,
                        "errors": 0
                    }
                    logging.info(f"Updated cache for {url}")

        except aiohttp.ClientError as e:
            logging.error(f"Failed to fetch {url}: {e}")
            # 可选:记录错误次数,支持重试策略(此处略)

        finally:
            # 步骤3:清理状态,通知所有等待者
            event = self._fetching_now.pop(url, None)
            if event is not None:
                event.set()

    async def get(self, url: str):
        # 检查缓存是否存在且未过期
        entry = self._cache.get(url)
        if not entry or entry["cached_at"] < time.monotonic() - self._time_out:
            await self._fetch_update(url)
        return self._cache.get(url, {}).get("config")

? 关键设计说明

  • asyncio.Lock + asyncio.Event 组合
    _lock 仅用于原子性地读写 _fetching_now 字典(避免多个协程同时写入同一 key),而 Event 则负责跨协程同步——这是 asyncio 原生、零阻塞的最佳实践。

  • time.monotonic() 是刚需
    time.time() 可能因 NTP 同步、夏令时切换等被系统调整,导致 cached_at

  • 不滥用全局锁
    锁的作用范围被严格限制在“状态协调”阶段(毫秒级),HTTP 请求本身在锁外执行,确保高并发吞吐能力。不会因一个慢请求阻塞其他 URL 的获取。

  • Tornado 兼容性提示
    若 Tornado 运行在多进程模式(如 tornado.netutil.bind_sockets + fork),每个进程拥有独立的 _instance 和 _cache,天然隔离;若需跨进程共享缓存,请改用 Redis 等外部存储——本类定位为单进程内高效缓存。

⚠️ 注意事项与延伸建议

  • 当前实现未内置重试逻辑(如 MAX_ERRORS),如需增强鲁棒性,可在 except 块中增加错误计数与指数退避重试。
  • 缓存淘汰策略目前为惰性 TTL,如需主动清理或限制内存占用,可添加 LRU 容量控制(例如用 functools.lru_cache 封装,或集成 aiocache)。
  • 若业务要求强一致性(如缓存更新后立即通知下游),可扩展为发布/订阅模式,配合 asyncio.Queue 或信号机制。

该方案已在高并发异步服务中稳定运行,兼顾简洁性、安全性与性能,是构建可靠异步 HTTP 缓存的推荐范式。