如何高效合并两个有序文本文件并自动去重(基于时间顺序的智能追加)

本文介绍一种针对大型有序日志/时间序列文本文件的高效合并方法:在保持严格时间顺序的前提下,自动识别并跳过两文件间的重叠行,避免全量去重开销。

当处理按时间戳严格排序的大规模文本日志(如每日导出的 CSV 格式记录)时,常见的“追加+全局去重”方案(如 set 或 OrderedDict.fromkeys)存在明显缺陷:它忽略数据的天然有序性,强制加载全部内容、破坏原始顺序逻辑,并在内存中进行 O(n) 重复扫描——这对 GB 级文件极不友好。

更优解是利用有序性做边界探测与流式合并。核心思路如下:

  1. 定位重叠边界:读取 file1 的最后一行和 file2 的第一行,解析时间戳(如 2025-01-29 09:00:00),判断是否重叠;
  2. 跳过前缀重复段:若 file1 末行时间 ≤ file2 首行时间,说明存在重叠;此时从 file2 中找到第一个严格大于 file1 末行时间的行,从此处开始追加;
  3. 流式写入,零冗余内存:全程仅缓存关键行(最多几行),不加载整个文件到内存。

以下是生产级推荐实现(支持超大文件、安全、可复用):

from datetime import datetime

def smart_append_ordered_files(
    target_path: str,
    source_path: str,
    timestamp_format: str = "%Y-%m-%d %H:%M:%S",
    time_col_index: int = 0,
    delimiter: str = ","
) -> None:
    """
    将 source_path 文件智能追加到 target_path,自动跳过时间重叠行。
    假设两文件均按 timestamp_format 格式严格升序排列。
    """
    # 步骤1:读取 target 文件末行(仅最后一行)
    last_line = ""
    with open(target_path, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                last_line = line.strip()

    if not last_line:
        # target 为空,直接复制 source
        with open(source_path, "r", encoding="utf-8") as src, \
             open(target_path, "a", encoding="utf-8") as tgt:
            tgt.write(src.read())
        return

    # 解析 target 末行时间戳
    try:
        last_ts = datetime.strptime(last_line.split(delimiter)[time_col_index].strip(), timestamp_format)
    except (ValueError, IndexError) as e:
        raise ValueError(f"无法解析 target 文件末行时间戳: {last_line}") from e

    # 步骤2:流式读取 source,跳过 <= last_ts 的所有行
    appended = False
    with open(source_path, "r", encoding="utf-8") as src, \
         open(target_path, "a", encoding="utf-8") as tgt:
        for line in src:
            line = line.strip()
            if not line:
                continue
            try:
                # 提取并解析该行时间戳
                ts_str = line.split(delimiter)[time_col_index].strip()
                curr_ts = datetime.strptime(ts_str, timestamp_format)
                if curr_ts > last_ts:  # 严格大于才追加
                    if not appended:
                        tgt.write("\n")  # 补一个换行确保格式整洁
                        appended = True
                    tgt.write(line + "\n")
            except (ValueError, IndexError):
                # 时间解析失败 → 默认追加(保守策略,避免丢数据)
                if not appended:
                    tgt.write("\n")
                    appended = True
                tgt.write(line + "\n")

# 使用示例:
smart_append_ordered_files("log_jan_mar.txt", "log_mar_jun.txt")

优势总结

  • 内存友好:仅逐行读取,峰值内存 ≈ 单行长度,支持 TB 级文件;
  • 时间最优:最坏情况仅遍历 source 一次,无需排序或哈希;
  • 强健可靠:内置异常处理,对格式异常行降级处理;
  • 灵活可配:支持自定义分隔符、时间列索引、时间格式。

⚠️ 注意事项

  • 确保输入文件确实按时间升序排列,否则结果不可预测;
  • 若时间戳含毫秒或微秒,请同步更新 timestamp_format(如 "%Y-%m-%d %H:%M:%S.%f");
  • 生产环境建议添加文件锁或原子写入(如先写临时文件再 os.replace),避免并发写冲突。

该方法本质是“有序归并”的轻量变体,兼顾正确性、性能与工程鲁棒性,是处理时序数据分块合并的推荐实践。