如何正确使用 pg-promise 实现事务内批量数据库操作并避免未捕获异常

本文详解 pg-promise 中事务(`db.tx`)与 promise 批量执行的正确模式,指出 `t.batch()` 已废弃,推荐使用 `await` 串行调用或 `promise.all()` 并行执行,并强调统一错误处理与上下文传递(如传入 `t`)是避免“uncaught exception”的关键。

在使用 pg-promise 构建健壮的数据库事务逻辑时,一个常见陷阱是:将已执行(即已触发)的 Promise 直接传入 t.batch() 或封装函数中,导致错误无法被事务上下文捕获。你遇到的“服务器崩溃 + Uncaught Exception”正是这一问题的典型表现——根本原因在于:Promise 一旦创建即开始执行,若其内部抛出异步错误而未被及时 .catch(),且又未被事务作用域包裹,则会逸出到全局,触发 Node.js 的 unhandledRejection 事件,最终导致进程崩溃(尤其在未监听该事件时)

✅ 正确做法:延迟执行 + 统一错误处理

核心原则是:所有数据库操作必须在事务回调函数内部、由 t(事务对象)发起,而非提前在外部构造 Promise。这意味着:

  • ❌ 错误示范(提前执行):

    // ⚠️ addToColumn 立即执行!返回的是一个正在运行的 Promise
    const queries = [
      addToColumn('envelopes', 'budget', id1, -amt, t), // 此处 t 未定义 → 实际调用 db.one → 脱离事务!
      addToColumn('envelopes', 'budget', id2, amt, t)
    ];
    return t.batch(queries); // 传入的是“已启动”的 Promise,错误无法被 t 捕获
  • ✅ 正确示范(延迟执行,由 t 驱动):

    async function transferEnvelopeBudgetByIds(req, res, next) {
      try {
        const result = await db.tx(async t => {
          // ✅ 所有操作均在 t 作用域内按需执行,自动受事务保护
          const from = await t.one(
            'UPDATE ${table:name} SET ${column:name} = ${column:name} + ${amount:csv} WHERE id = ${id:csv} RETURNING *',
            { table: 'envelopes', column: 'budget', amount: -req.transferBudget, id: req.envelopeFromId }
          );
          const to = await t.one(
            'UPDATE ${table:name} SET ${column:name} = ${column:name} + ${amount:csv} WHERE id = ${id:csv} RETURNING *',
            { table: 'envelopes', column: 'budget', amount: req.transferBudget, id: req.envelopeToId }
          );
          return { from, to };
        });
        req.updatedEnvelopes = result;
        next();
      } catch (err) {
        // ✅ 所有数据库错误(连接失败、SQL 错误、约束冲突等)均由 db.tx 自动捕获并回滚
        next(err);
      }
    }

?️ 进阶优化:复用查询逻辑(推荐)

为保持代码可维护性,可将参数化查询封装为纯函数,但务必接受 t 参数并默认回退到 db

// ✅ 安全的可复用查询函数:支持事务内(t)和独立(db)两种上下文
function addToColumn(tableName, columnName, entryId, amountToAdd, t = db) {
  return t.one(
    'UPDATE ${table:name} SET ${column:name} = ${column:name} + ${amount:csv} WHERE id = ${id:csv} RETURNING *',
    { table: tableName, column: columnName, amount: amountToAdd, id: entryId }
  );
}

// 在事务中调用(自动使用 t)
async function transferEnvelopeBudgetByIds(req, res, next) {
  try {
    const result = await db.tx(async t => {
      const [from, to] = await Promise.all([
        addToColumn('envelopes', 'budget', req.envelopeFromId, -req.transferBudget, t),
        addToColumn('envelopes', 'budget', req.envelopeToId, req.transferBudget, t)
      ]);
      return { from, to };
    });
    req.updatedEnvelopes = result;
    next();
  } catch (err) {
    next(err);
  }
}
? 提示:Promise.all() 在此场景下是安全的,因为所有 Promise 均由 t 创建,其 rejection 会被 db.tx() 统一捕获并触发自动回滚。若需严格顺序执行(如依赖前序结果),则改用 await 逐个调用。

⚠️ 关键注意事项

  • t.batch() 已废弃:pg-promise v10+ 中 Task.batch() 方法已被移除,官方文档明确推荐直接使用 Promise.all() 或 await 序列。
  • 绝不提前执行 Promise:任何数据库操作(如 db.one(), t.one())都应在 db.tx() 回调内部调用,确保其生命周期完全处于事务控制之下。
  • 错误处理集中化:事务内的所有错误应由 db.tx() 自动处理(自动回滚),外部只需 try/catch 捕获最终异常并交由 Express 错误中间件处理,避免在每个查询后 .catch()(这会中断事务链路)。
  • 环境健壮性:建议全局监听 process.on('unhandledRejection') 作为兜底,防止遗漏的 Promise 错误导致进程退出:
    process.on('unhandledRejection', (reason, promise) => {
      console.error('Unhandled Rejection at:', promise, 'reason:', reason);
      // 记录日志、告警,但不要在此处调用 process.exit()
    });

遵循以上模式,即可彻底规避“Uncaught Exception”,构建出高可靠性、易维护的 PostgreSQL 事务逻辑。