土法炼钢兴趣小组的算法知识备份

【量化交易】行情与基本面数据管线:tick、bar、因子库

文章导航

分类入口
quant
标签入口
#data-pipeline#tick#bar#factor-library#parquet

目录

策略代码可以重写,因子可以重算,回测引擎可以换;唯一不能重来的是数据本身。一笔 2019 年 3 月 4 日上午 10:32:18 的成交,如果当时没采下来、或者采下来时把毫秒戳记成了秒戳、或者把买卖方向标错了,事后无论花多少算力都补不回那一刻的市场状态。数据管线是量化系统里少数几个「错了就永远错」的地方,因此它的工程纪律比策略代码严苛得多。

这篇文章把一条完整的数据管线从最上游的行情接入写到最下游的因子查询接口。中间会反复回到几个被低估的问题:tick 数据为什么不能直接喂给策略;按时间切的 K 线为什么在加密市场上是有偏的;财报数据为什么必须用「公告时间」而非「报告期」做时间索引;为什么前复权价适合看图、后复权价适合写策略;为什么 Parquet 的列存压缩对量化数据天然友好但对实时写入又是一种负担。

范围与版本说明:本文涉及的代码示例使用 Python 3.11、polars 1.6、pyarrow 17.0、duckdb 1.1。数据格式标准遵循 Apache Arrow 17.0 与 Apache Parquet 2.10;公司行动定义参考上交所《上市公司分红实施细则(2024)》、深交所同名细则、Nasdaq Corporate Actions Guide 2024、CFA Institute《Equity Asset Valuation》第 3 版。Tick/bar 重采样定义参考 Marcos López de Prado《Advances in Financial Machine Learning》(Wiley,2018)第二章。

风险提示:本文涉及的数据处理示例不构成任何投资建议;任何在生产策略中使用的数据管线都必须经过独立审计和样本外验证。把脏数据当干净数据用,再优秀的策略也只是把噪声放大成亏损。


一、量化数据的层次

量化系统里所谓「数据」从来不是一个东西。同一个标的,从交易所撮合产生事件、到回测引擎取一个 5 分钟收盘价,中间至少跨越了五层抽象。把这五层抽象搞混,就会出现「我用 tick 写了个高频策略,回测年化 800%,实盘上线第一周亏 30%」这种典型故事。

数据层次总图

行情与基本面数据管线分层

上图把整条管线分成五层:接入层 → 规范化层 → 派生层 → 服务层 → 可观测层。这不是装饰性的分层,而是工程上必须强制执行的边界。每一层都要单独落盘,下游可以重算上游,但上游绝不允许丢失。这条规则的代价是磁盘——一份原始 tick 加规范化 tick 加多种 bar 加因子库,可能比裸 tick 大 4 到 6 倍——但它换来的是:当某天发现「派生层的 dollar bar 算错了」,你只需要重跑派生脚本,而不必重新去交易所拉历史 tick(很多时候已经拉不到了)。

各层语义

层级 时间粒度 体积级别 直接消费者 主要职责
接入层(raw) tick / 事件级 TB / 标的年 写盘服务自己 二进制落盘、序列号校验
规范化层(norm) tick / 事件级 0.5 ~ 1 × raw ETL、回放器 时区、字段、单位、标的代码统一
派生层(bars / factors) 1m / 1d / 事件 0.01 ~ 0.1 × raw 回测、研究 重采样、聚合、特征工程
服务层(views) 视情况 内存或缓存 策略、可视化 统一查询接口、PIT 强制
可观测层(ops) 旁路 元数据 监控、SRE 缺失、漂移、SLA

体积级别的数字不是定理,但量级是合理的:US 股票市场全市场 tick 一天就 1~3 TB;规范化基本不缩,可能因为抛弃了协议头略有缩减;时间 bar 大约是 tick 的 1/100;因子库一般每标的每日一行,每个因子一列,又比 bar 缩 1~2 个量级。

为什么要分这么多层

把这套分层和别人最常见的两层做法(「原始数据 + bar 表」)比一下,差异立刻就出来:

一个容易被忽略的细节

很多团队把「Level-2 行情」当一种数据来收,实际上 Level-2 行情至少有三种:Top-of-Book 快照(每个 tick 给出顶档买卖价)、多档快照(每个 tick 给出 N 档买卖价)、逐笔委托/成交(每个事件一条记录)。这三者在压缩比、可用因子、重建难度上差异巨大。沪深市场公开的 Level-2 是「多档快照 + 逐笔委托 + 逐笔成交」混合,下游若只取「多档快照」就丢失了排队信息(Queue Position);若用「逐笔委托」自己重建订单簿,又要正确处理撤单、改单的回放顺序,这是一个非平凡的工程任务。

把数据当作「采下来就行」是最容易翻车的态度。从一开始就要明确:我接入的是哪一种数据,下游能从这个数据派生出哪些信息,不能派生出哪些信息

数据层与策略频率的对应

不同频率的策略对应不同的数据层。把这层对应写清楚,可以避免后期反复推翻:

策略频率 主要消费层 典型粒度 历史深度需求 主要风险
高频做市 / 套利(μs ~ ms) 接入层 + 规范化层 tick / 逐单 1 ~ 3 个月 延迟、丢包、回放偏差
日内(分钟~小时) 派生层(dollar bar、imbalance bar) bar 2 ~ 5 年 bar 重采样口径、滑点
跨日(日 ~ 周) 派生层(time bar) + 因子库 日 bar、日因子 10 ~ 20 年 复权、PIT、幸存者偏差
跨月(月 ~ 季) 因子库 + 基本面 季频财报、月度因子 20 ~ 30 年 财报修正、业务口径变化

这张表的一个含义是:研究阶段不要让低频策略消费高频数据。低频策略用 tick 数据训练时序模型,是常见的「过参数化」陷阱——模型记住了高频噪声,但样本外的市场状态分布完全不同。


二、行情数据源与数据质量评估

主流数据源对照

数据源 市场 协议/接口 历史深度 质量典型问题 价格区间(USD/年)
交易所组播(CME MDP 3.0、Nasdaq ITCH) 期货、美股 UDP 组播 + SBE/ITCH 实时 + 历史 dump 序列号缺口、跨日切换 数十万起
Polygon.io 美股 WebSocket + REST 5 ~ 20 年 极小数复权差错、L2 重建延迟 0.2 ~ 5 万
IEX Cloud 美股 REST 5 年 仅 IEX 自己的盘口,不代表 NBBO 0.1 ~ 3 万
Tardis.dev 加密 历史 dump(Parquet/CSV) 2017 + 部分交易所早期 ws 不完整 0.5 ~ 2 万
CCXT 加密 REST + WebSocket(聚合) 实时 各交易所字段不齐、限速 开源免费 + 自建
CTP 国内期货 二进制 当日实时 历史不全,依赖中间商 通过期货公司
XTP 国内股票 二进制 当日实时 Level-1 顶档为主 通过证券公司
Wind A 股、港股、债券、宏观 API、SDK 20 年 + 复权数据偶有重算变化 数万人民币
同花顺 iFinD 同上 API 同上 行业分类口径变化 数万人民币

挑数据源的工程师常犯一个错:把「价格、覆盖范围、API 易用性」三件事当成同等重要。实际上,质量风险一旦发生,前两项的优势会被瞬间抹平。一个很有代表性的故事:2018 年某美国数据厂商把 SPY 的某次拆股复权因子写错了 0.001,导致使用其历史数据的几百个量化基金回测同时偏差 0.1%,发现的时候已经过了三个月。

数据质量的六个维度

下面这六个维度是工程上可以逐项检查的。空泛地说「数据质量好」毫无意义。

  1. 完整性(Completeness):在交易时段内,单位时间的 tick 数是否符合预期?空缺时段是否有交易日历支撑(停牌、半日市)?
  2. 及时性(Timeliness):从交易所产生事件到落到本地盘的延迟分布是什么?P50/P99/最大延迟分别是多少?
  3. 一致性(Consistency):同一笔成交在 tick 和 trade tape 里是否一致?多档盘口的顶档和 Level-1 BBO 是否一致?
  4. 唯一性(Uniqueness):同一笔成交是否被记录两次?同一报单 ID 是否出现两次(重传未去重)?
  5. 正确性(Accuracy):随机抽样 N 条记录,与上游交易所原始报文逐字段比对,错误率?
  6. 可追溯性(Traceability):每条规范化记录是否能追溯到原始报文的 offset?无法追溯就无法事后修正。

工程上落实这六项靠的是一组自动化质检脚本,每天交易结束后跑一遍,结果落到时序数据库做趋势监控。本文最后一节会展开。

一段评估代码

下面这段是规范化 tick 落盘后的最简质检脚本。它假设落盘文件是按交易日分区的 Parquet,schema 至少包含 (ts_ns, sym, px, qty, side)

import polars as pl
from datetime import date

def daily_quality_check(path: str, trading_date: date, sym: str) -> dict:
    """对单标的单交易日的 tick 文件做最小质检,返回指标字典。"""
    df = pl.scan_parquet(path).filter(
        (pl.col("trade_date") == trading_date) & (pl.col("sym") == sym)
    ).collect()

    if df.height == 0:
        return {"status": "empty", "ticks": 0}

    # 1. 时间单调
    ts = df["ts_ns"]
    monotonic = (ts.diff().fill_null(0) >= 0).all()

    # 2. 价格合理性:偏离当日中位数 30% 以上视为异常
    px_med = df["px"].median()
    outlier_ratio = ((df["px"] - px_med).abs() / px_med > 0.3).sum() / df.height

    # 3. 单位时间 tick 密度(按分钟)
    per_min = df.group_by_dynamic("ts_ns", every="1m").agg(pl.len().alias("n"))
    quiet_min = (per_min["n"] == 0).sum()

    # 4. 重复检测(同 ts_ns + 同 px + 同 qty + 同 side 视为重复)
    dup = df.group_by(["ts_ns", "px", "qty", "side"]).len()
    dup_ratio = (dup["len"] > 1).sum() / df.height

    return {
        "status": "ok",
        "ticks": df.height,
        "monotonic": bool(monotonic),
        "outlier_ratio": float(outlier_ratio),
        "quiet_minutes": int(quiet_min),
        "dup_ratio": float(dup_ratio),
        "px_min": float(df["px"].min()),
        "px_max": float(df["px"].max()),
        "px_median": float(px_med),
    }

这个脚本不是「演示」,它是真正在生产里要跑的形态:每天结算后跑,结果写到指标库,然后阈值告警。比如「quiet_minutes 在交易时段超过 5 分钟」、「dup_ratio 超过 1‰」、「monotonic == false」都应该触发人工排查。

个人观点:不要迷信「免费数据」

我在多个团队里见过同一个错误:因为 yfinance/akshare/baostock 这些开源数据接口免费、API 干净,就直接拿来当生产数据源。它们适合做研究探索、不适合做实盘信号。原因有三:

  1. 这些接口本身是爬虫,数据来源页面格式变化时会静默失败或返回错误数据。
  2. 复权因子的更新频率不可保证。某次拆股事件之后,前复权因子可能 24 小时之后才修正过来,期间下载到的数据是错的。
  3. 没有 SLA。今天它跑得好,明天可能整天宕机,你的策略要么延迟一天、要么挂掉。

正确做法是:研究阶段允许用免费数据快速迭代,但所有进入回测台账的数据必须从付费源(或交易所一手源)抽出,并通过质检流水线,两份要做差异比对。差异超过阈值的,研究结论一律不可采信。


三、Tick 到 Bar 的聚合

策略不会直接消费 tick——绝大多数因子、信号、模型的输入是某种「Bar」。把 tick 聚合成 bar 这一步,看似只是 groupby + agg,却是数据管线里最被低估的一处偏差源

五种 bar 的对照

de Prado 在《Advances in Financial Machine Learning》第二章里提出了 tick bar、volume bar、dollar bar、imbalance bar 等概念,核心论点是:按时间切的 bar 在统计性质上是劣势选择,因为它会把高活跃和低活跃的市场状态用同一根 K 线表达。

Bar 类型 触发条件 优点 缺点 适用场景
时间 bar(time bar) 每隔 ΔT 一根 简单、与日历对齐 信息含量分布不均,盘前/午盘 K 线噪声大 长周期、跨标的对齐研究
Tick bar 每 N 笔成交一根 信息密度均匀 撤单、改单密集时仍偏 中频、逐笔策略
Volume bar 每 N 手成交一根 信息密度均匀 + 抗成交粒度噪声 大单可能跨多根 流动性研究
Dollar bar 每 N 元成交额一根 价格与体量同时归一,跨标的最易对齐 计算稍复杂 因子工程主推
Imbalance bar(TIB/VIB/DIB) 累计买卖不平衡量超过 θ 一根 直接编码订单流方向 θ 估计偏差敏感 订单流策略、做市

为什么 dollar bar 在加密和美股上特别重要

加密市场上的一个直观现象:BTC 从 1 万美元涨到 10 万美元,「成交一手」从来不是固定金额。如果你按 tick bar 或 volume bar 切,过去和现在的 bar 在「单根 bar 代表多少经济价值」这件事上差了 10 倍。dollar bar 直接以成交金额为阈值,把「单根 bar 的经济意义」固定下来,这对模型训练特别重要——它降低了样本协方差结构的非平稳性。

美股上也有类似现象,但弱一些(拆股能部分缓解);A 股上个股拆股极少,但行业指数随时间膨胀,dollar bar 同样比 volume bar 稳。

Dollar bar 的实现

下面这段代码是一个可直接运行的 dollar bar 重采样器。它使用 polars 的流式扫描,对几百 GB 的 tick 文件可以在单机内常数内存跑完。

import polars as pl
from pathlib import Path

def build_dollar_bars(
    tick_path: str,
    out_path: str,
    sym: str,
    dollar_threshold: float,
) -> None:
    """把单标的的规范化 tick 重采样成 dollar bar 并写出 Parquet。

    输入 schema: ts_ns(i64), sym(str), px(f64), qty(f64), side(i8)
    输出 schema: open_ts, close_ts, open, high, low, close, vwap, volume, dollar, n_trades
    """
    lf = pl.scan_parquet(tick_path).filter(pl.col("sym") == sym).sort("ts_ns")

    # 流式收集;对超大文件应分块处理,本示例假设单标的可装入内存
    df = lf.collect(streaming=True)
    if df.height == 0:
        return

    px = df["px"].to_numpy()
    qty = df["qty"].to_numpy()
    ts = df["ts_ns"].to_numpy()

    bars = []
    cum_dollar = 0.0
    bar = _new_bar(ts[0], px[0])
    for i in range(df.height):
        notional = px[i] * qty[i]
        bar["high"] = max(bar["high"], px[i])
        bar["low"] = min(bar["low"], px[i])
        bar["close"] = px[i]
        bar["close_ts"] = ts[i]
        bar["volume"] += qty[i]
        bar["dollar"] += notional
        bar["px_qty_sum"] += notional
        bar["n_trades"] += 1
        cum_dollar += notional

        if cum_dollar >= dollar_threshold:
            bar["vwap"] = bar["px_qty_sum"] / bar["volume"] if bar["volume"] > 0 else bar["close"]
            bars.append(bar)
            cum_dollar = 0.0
            if i + 1 < df.height:
                bar = _new_bar(ts[i + 1], px[i + 1])

    if bar["n_trades"] > 0:
        bar["vwap"] = bar["px_qty_sum"] / bar["volume"] if bar["volume"] > 0 else bar["close"]
        bars.append(bar)

    out = pl.DataFrame(bars).drop("px_qty_sum")
    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    out.write_parquet(out_path, compression="zstd")


def _new_bar(ts: int, px: float) -> dict:
    return {
        "open_ts": int(ts),
        "close_ts": int(ts),
        "open": float(px),
        "high": float(px),
        "low": float(px),
        "close": float(px),
        "vwap": float(px),
        "volume": 0.0,
        "dollar": 0.0,
        "px_qty_sum": 0.0,
        "n_trades": 0,
    }

这段代码有几个工程上的细节值得拎出来:

Imbalance bar 简述

Imbalance bar 的触发条件是「累计买卖不平衡 ∑sign(Δp_t) × v_t 的绝对值超过期望值的两倍」。它的实现要点是 EWMA 估计期望值,并允许阈值随市场状态自适应。这一类 bar 在做市与订单流策略里很有用,因为它直接把 bar 的边界对齐到「方向被打破」的事件,而不是被动等成交额累计。

实现细节读者可以参考 mlfinlab 库的 data_structures 子模块(开源,作者团队即 de Prado 实验室)。我自己的经验是:imbalance bar 在低流动性标的上要谨慎,期望值的 EWMA 估计偏差可能让阈值长期偏低,进而 bar 数量爆炸。

时间 bar 与事件 bar 的混合

实务上一个折中是主用事件 bar、辅以时间 bar。例如策略主信号基于 dollar bar,但同时维护一份 5 分钟 time bar 用来:

  1. 跨标的对齐。多标的组合需要在同一时刻取每个标的的状态,dollar bar 在不同标的上时刻不一致,time bar 直接对齐。
  2. 可视化。人看 K 线图习惯固定时间间隔。
  3. 风控。固定时间窗口内的成交量、波动率统计用 time bar 计算更直观。

两套 bar 同时维护,存储成本按倍增加,但研究效率收益往往比成本高出一个数量级。

一个常被忽视的口径问题

聚合 tick 时,「成交方向」是用 Lee-Ready 算法推断还是直接用交易所标记,会显著影响 imbalance bar 的形态。Lee-Ready 假设买卖方向取决于成交价相对前一笔中间价的位置,但在国内市场上交易所通常直接给出主动方向(买驱动还是卖驱动)。两者结果差异不小,研究里要明确口径。


四、存储格式选型

bar 之后的数据怎么存,很大程度上决定了之后整个研究链路的速度。十年前这个问题的答案是 HDF5 或自研二进制;今天它收敛到了几个具体选项。

主要格式与系统对照

格式/系统 存储模型 写入吞吐 列读吞吐 压缩比 跨语言 实时支持
CSV 行存(文本) 极低 极低
HDF5 / pytables 行存为主 一般
Parquet 列存 高(批写) 极高 强(Arrow)
Apache Arrow IPC 列存(内存优化) 极高 极强
ClickHouse 列存(MergeTree) 极高 极高 极好 通过驱动 是(写流式)
QuestDB 列存(时序优化) 极高 极高 通过驱动
kdb+/q 列存(专有) 极高 极高 一般
DuckDB 列存 + 嵌入式 极高

列存 vs 行存

量化数据天然偏列存。一个直观例子:你在因子库里有 200 个因子(200 列),每次研究只读 5 个因子。行存要读完整行再丢弃 195 列;列存只读 5 列文件块。单次查询的 IO 量就差 40 倍。再加上列内同质数据压缩比远高于行内异质数据(一列 close 价的浮点序列,差分编码 + zstd 经常能压到原始的 10%),列存的优势在量化场景里几乎没有反例。

Parquet + DuckDB 的研究台栈

我推荐的研究阶段栈就一行:Parquet(落盘) + DuckDB(查询) + Polars(变换)。原因是:

DuckDB 的一个特别实用的能力:对分区 Parquet 自动 prune 文件。下面例子里,DuckDB 会根据 WHERE 子句只扫描 2024-03 月份的分区文件,省下绝大部分 IO:

import duckdb

con = duckdb.connect()
df = con.execute("""
    SELECT sym, AVG(close) AS avg_close, SUM(volume) AS total_vol
    FROM read_parquet('data/bars_1m/year=*/month=*/*.parquet', hive_partitioning=true)
    WHERE year = 2024 AND month = 3 AND sym IN ('AAPL', 'MSFT', 'NVDA')
    GROUP BY sym
""").pl()

这种用法在小到 100 GB 的研究数据上,单机笔记本就能交互式查询。

ClickHouse / QuestDB 的实盘场景

实盘 tick 落盘是另一个故事。Parquet 不擅长「每秒几万次小写入」,因为列存的批量编码和压缩需要积攒一定批次才能 flush。这时候选项变成:

我自己的工程判断(不是规范结论):如果团队没有既有 kdb+ 资产,ClickHouse 是 2024 年后的默认选择。ClickHouse 的 SQL 语法大家熟悉,列存压缩在 tick 数据上能压到 5~10 倍,物化视图机制可以直接在写入时合成 1m bar。QuestDB 在极致写入吞吐上更强,但 SQL 兼容性和生态稍弱。

一个常见反模式

每隔半年都会看到团队问:「我们 tick 数据 200 GB,是不是该上 Hadoop / Spark / 数据湖?」

绝大多数情况下答案是不需要。单机 Polars + DuckDB + 分区 Parquet 在 1 TB 以下的数据规模上比小规模分布式集群更快、更便宜、更容易运维。只有当总数据量稳定超过 10 TB、并且需要多人并发查询时,才有必要考虑 Spark / Trino / Iceberg 这类栈。早早上分布式,唯一的产出是 DevOps 工时和云账单。

分区策略

Parquet 落盘时分区设计直接决定了下游查询的扫描效率。常见三种分区方式:

分区维度 优点 缺点 适用场景
仅按日期 简单,时间范围扫描快 单标的查询要扫所有标的 多标的组合研究
标的 + 日期 单标的查询最优 标的数多时小文件爆炸 单标的深度研究
月份 + 标的 hash 分桶 均衡 实现稍复杂 大规模生产

我的工程默认:「年/月」 + 「标的」二级分区。年/月作为粗粒度时间剪枝,标的作为细粒度过滤,单文件控制在 100 MB ~ 500 MB(Parquet 的甜区)。

Compaction(小文件合并)

实时落盘容易产出大量小文件——每分钟一个 Parquet,单标的一年 50 万个文件。这种形态下 DuckDB / Polars 的查询速度会被打开文件元数据的开销吃掉。生产中需要一个夜间 compaction 任务,把当日的小文件合并成大文件:

def compact_daily(date: str, sym: str) -> None:
    pattern = f"data/raw/sym={sym}/date={date}/*.parquet"
    df = pl.scan_parquet(pattern).collect(streaming=True)
    out = f"data/compact/sym={sym}/date={date}.parquet"
    df.sort("ts_ns").write_parquet(out, compression="zstd")
    # 验证后删除原小文件

row_group_size 也要调:默认 1M 行偏大,对量化随机访问不友好;改成 100K ~ 200K 行更顺手。


五、增量与回填

数据管线最难写好的不是「全量第一次跑」,而是「每天增量更新 + 偶尔回填一段历史」这两件事。它们会同时撞到三个棘手问题:断点续传、幂等、数据修正

断点续传的状态机

每个抽取任务都应当是一个有状态的迁移过程,最少包含四个状态:

PENDING -> FETCHING -> FETCHED -> NORMALIZED -> COMMITTED

任何一步失败回到 PENDING 重试,重试时根据当前状态决定从哪里开始:FETCHED 之后的 ETL 可以从本地缓存重跑,不必重新拉数据源。这套状态可以放在 SQLite 或 PostgreSQL 的 ingest_jobs 表里:

CREATE TABLE ingest_jobs (
    job_id        TEXT PRIMARY KEY,
    source        TEXT NOT NULL,       -- e.g. 'polygon_tick'
    sym           TEXT NOT NULL,
    trade_date    DATE NOT NULL,
    status        TEXT NOT NULL,       -- PENDING/FETCHING/.../COMMITTED
    raw_path      TEXT,                -- 接入层落盘路径
    norm_path     TEXT,                -- 规范化落盘路径
    rows_raw      BIGINT,
    rows_norm     BIGINT,
    checksum_raw  TEXT,                -- 原始数据 sha256
    checksum_norm TEXT,
    created_at    TIMESTAMP NOT NULL,
    updated_at    TIMESTAMP NOT NULL,
    error_msg     TEXT
);

幂等

一个抽取任务被重复触发不应当造成重复数据。落实幂等最常用的方式有两种:

  1. 写入路径包含主键。规范化层的文件名/分区设计成 {source}/{sym}/{trade_date}.parquet。同一份输入产生同一份输出,重跑覆盖即可。
  2. 使用临时文件 + 原子重命名。先写到 xxx.parquet.tmp,落盘成功后再 os.rename 到目标路径。这样下游不会读到半截文件。

回填校验

「这个月的数据回填完了,怎么知道对不对?」这是工程上必答的问题。我推荐三层校验:

  1. 行数校验:与上游 API 提供的统计端点对账,例如 Polygon 提供 /v3/reference/dividends?date=YYYY-MM-DD 的总行数。
  2. 关键字段校验:抽样 100 条记录,逐字段比对原始报文。
  3. 派生层一致性:用回填后的 tick 重算昨日 bar,与昨日实盘 bar 比较。差异超过 ε 即告警。

一个真实事故的形态

我见过一个非常难查的事故:回填脚本对历史日的处理和对当日实时数据的处理走了两条代码路径。两条路径对「成交方向缺失」的 fallback 不一致——回填路径把缺失视为 0(不区分买卖),实时路径把缺失视为 1(默认买)。结果回填一年历史后,最近三个月的数据来自实时路径,更早的来自回填路径,因子值在三个月分界处出现一个无法解释的跳变,研究员找了两周才定位。

教训:实时路径和回填路径应当共享同一个 ETL 函数。差别仅在数据来源(实时 ws vs 历史 dump),下游所有处理必须走同一条代码。这个约束写进 code review checklist 里。

增量调度

每天交易结束后的增量任务通常按「市场 → 数据源 → 标的」组织。Airflow / Prefect / Dagster 是三种主流编排器;我个人在量化场景里偏好 Dagster,原因是它的「资产(asset)」抽象与数据管线的物化目标更对齐:每个 asset 是一份具体的 Parquet 文件,依赖关系自动推断。

最低限度的依赖结构示例:

raw_tick(sym, date)
   └─> norm_tick(sym, date)
         └─> dollar_bar(sym)
         └─> time_bar(sym, freq=1m)
                 └─> factor.mom_20d(sym, date)
                 └─> factor.vol_20d(sym, date)

当某天上游 raw_tick 重写后,编排器自动把所有下游标记为 stale 并重算。这套机制让回填变成一个一致的图重算问题,而不是手工运行多个脚本。

重传与去重

实时 ws 路径上一定会有重传(网络抖动、连接重连)。规范化层在落盘前必须做去重。去重键的选择很重要:

去重后,写一行 metric 到监控:当日去重比率。如果某天突然飙升到 5%,说明上游或自己的接入服务出了状态问题。


六、基本面与公司行动

行情数据已经够头疼,基本面数据更难——它的「时间」概念是双重的,公司行动事件还会反过来修改历史价格。

PIT:基本面数据的时间双键

每条基本面记录至少有两个时间:

同一个 period_dt 可能被多次公告:初次披露、修正、年度审计后再修正。所以每条记录还要有 revision_no。规范化层落盘 schema 类似:

sym        | period_dt  | announce_dt | revision_no | metric_name | value
600519.SH  | 2024-03-31 | 2024-04-28  |           1 | net_income  | 1.23e10
600519.SH  | 2024-03-31 | 2024-08-30  |           2 | net_income  | 1.21e10

回测时默认只能看到 announce_dt <= nowrevision_no 为该时点最大者的记录。任何不做这个过滤就直接 join 的代码都是 PIT 漏洞。

公司行动的三种类型

  1. 现金分红(Cash Dividend):除权日股价下跌,但价格序列本身不变;复权因子改变。
  2. 拆股 / 合股(Split / Reverse Split):除权日价格按比例变化;流通股数同步变化。
  3. 配股 / 增发(Rights / Secondary Offering):以低于市价的发行价向老股东配售,理论上摊薄股价;处理最复杂。

A 股还有「送股」「转增股」等特殊形式,但本质都可以归到分红或拆股的扩展。

复权计算

前复权后复权与不复权三条价格曲线

上图展示了同一只股票在 t1 发生「10 派 5 元 + 一拆二」之后,三种价格序列的形态。三条线背后是同一组成交价,差别只是用什么因子去归一化

复权因子定义(不计配股的简化版):

factor(t) = ∏_{event ≤ t}  (close_before − cash_div) / (close_before × (1 + split_ratio))

写成 Python:

import polars as pl

def compute_adjust_factor(
    prices: pl.DataFrame,        # columns: trade_date, close
    actions: pl.DataFrame,       # columns: ex_date, cash_div, split_ratio
) -> pl.DataFrame:
    """计算每个交易日的累计复权因子。actions 必须按 ex_date 升序。"""
    out = prices.sort("trade_date").with_columns(pl.lit(1.0).alias("factor"))
    rows = out.to_dicts()
    actions_by_date = {a["ex_date"]: a for a in actions.to_dicts()}

    cum = 1.0
    for r in rows:
        ev = actions_by_date.get(r["trade_date"])
        if ev is not None:
            # 用前一交易日 close(即除权前价格)计算因子
            close_before = r["prev_close"]
            div = ev.get("cash_div", 0.0)
            split = ev.get("split_ratio", 0.0)
            f = (close_before - div) / (close_before * (1.0 + split))
            cum *= f
        r["factor"] = cum
    return pl.DataFrame(rows)


def backward_adjust(prices: pl.DataFrame, factor: pl.DataFrame) -> pl.DataFrame:
    """后复权:早期不变,未来放大。"""
    return (
        prices.join(factor.select(["trade_date", "factor"]), on="trade_date")
        .with_columns((pl.col("close") / pl.col("factor")).alias("close_adj_back"))
    )


def forward_adjust(prices: pl.DataFrame, factor: pl.DataFrame) -> pl.DataFrame:
    """前复权:未来不变,历史缩小。"""
    f_now = factor["factor"][-1]
    return (
        prices.join(factor.select(["trade_date", "factor"]), on="trade_date")
        .with_columns((pl.col("close") * f_now / pl.col("factor")).alias("close_adj_fwd"))
    )

哪种复权用在哪种场景

配股的坑

配股的复权因子定义里多一项「配股价 × 配股比例」:

factor = (close_before − cash_div + rights_price × rights_ratio) /
         (close_before × (1 + split_ratio + rights_ratio))

这里有一个工程陷阱:配股价是要除权日之前公告的,但配股是否成功要等配股缴款日。如果某次配股失败(达不到最低认购比例),复权因子需要回滚。少数数据厂商在这种情况下处理不当,会出现「这次配股因子加进去后又拿掉,但缓存了一周才刷新」。这也是为什么生产策略要保留自己计算复权因子的能力,而不是完全信任厂商提供的复权后价格。

停复牌与退市

停牌期间没有交易,时间 bar 必须留下空白,但绝对不要把空白填成上一根。下游回测引擎要明确处理「这一根 bar 不存在」的情况。退市更敏感,相关讨论留到下一篇《幸存者偏差》。


七、因子库设计

经过前面五层处理,数据终于到了量化研究的工作面:因子库。因子库的设计有几个不可妥协的需求:PIT 严格、版本化、可复现、可查询

因子库的最小 schema

我推荐的最小 schema 长这样:

factor_value:
    factor_name   TEXT NOT NULL    -- 因子名,唯一标识
    factor_version TEXT NOT NULL   -- 因子版本,semver 或 git short sha
    sym           TEXT NOT NULL    -- 标的代码(统一格式)
    value_dt      DATE NOT NULL    -- 因子值对应的「业务日期」
    available_at  TIMESTAMP NOT NULL  -- 该值实际可用的时刻(PIT 关键字段)
    value         DOUBLE
    PRIMARY KEY (factor_name, factor_version, sym, value_dt)

value_dtavailable_at 必须严格区分:

回测和研究查询必须按 available_at <= as_of 过滤,否则一定会有未来函数。

版本化的两种粒度

factor_version 可以是:

  1. Semver 风格v1.0.0v1.1.0v2.0.0。优点是人类可读,方便沟通。
  2. 代码 hash 风格abcd1234,对应计算因子的代码 commit。优点是机器精确。

我倾向两者并存:表里存代码 hash,元数据表里维护「semver → 当前指向哪个 hash」。这样一边可以跟踪每次计算的精确代码,一边在论文/报告里用 semver 引用。

物理存储

因子库的物理存储模式有两种思路:

  1. 长表(long format):上面的 schema,一行一个值。优点是 schema 稳定,新增因子不动表结构;缺点是行数爆炸,N 标的 × M 因子 × T 日。
  2. 宽表(wide format):每个因子一列。优点是查询直接;缺点是新增因子要 ALTER TABLE。

我的工程选择是:底层物理存长表(Parquet 分区按 factor_name)上层视图按需展开为宽表。理由是底层长表对 ETL 友好(每个因子独立写出,不互相耦合),上层宽表对研究友好(直接喂给 numpy/pandas)。

一个 DuckDB 因子库实现

下面这段代码给出了一个最小可用的因子库:写入用 polars + Parquet 分区,查询用 DuckDB 视图,PIT 过滤强制内置。

import duckdb
import polars as pl
from pathlib import Path
from datetime import datetime

class FactorLib:
    def __init__(self, root: str):
        self.root = Path(root)
        self.con = duckdb.connect()
        # 设置内置视图
        self.con.execute(f"""
            CREATE OR REPLACE VIEW factor_value AS
            SELECT * FROM read_parquet(
                '{self.root}/factor_name=*/factor_version=*/*.parquet',
                hive_partitioning = true
            )
        """)

    def write(
        self,
        name: str,
        version: str,
        df: pl.DataFrame,  # columns: sym, value_dt, available_at, value
    ) -> str:
        """写入一批因子值。同名同版本同 (sym, value_dt) 的旧值被覆盖。"""
        out_dir = self.root / f"factor_name={name}" / f"factor_version={version}"
        out_dir.mkdir(parents=True, exist_ok=True)

        # 简化:按 value_dt 月份分文件
        for (year, month), part in df.with_columns(
            pl.col("value_dt").dt.year().alias("y"),
            pl.col("value_dt").dt.month().alias("m"),
        ).group_by(["y", "m"]):
            f = out_dir / f"y={year}-m={month:02d}.parquet"
            part.drop(["y", "m"]).write_parquet(str(f), compression="zstd")
        return str(out_dir)

    def fetch(
        self,
        name: str,
        version: str,
        symbols: list[str],
        as_of: datetime,
        start_dt: datetime | None = None,
    ) -> pl.DataFrame:
        """PIT 安全的因子值查询:只返回 available_at <= as_of 的值。"""
        sym_list = ",".join(f"'{s}'" for s in symbols)
        start_clause = f"AND value_dt >= '{start_dt.date()}'" if start_dt else ""
        return self.con.execute(f"""
            SELECT sym, value_dt, value, available_at
            FROM factor_value
            WHERE factor_name = '{name}'
              AND factor_version = '{version}'
              AND sym IN ({sym_list})
              AND available_at <= TIMESTAMP '{as_of.isoformat()}'
              {start_clause}
            ORDER BY sym, value_dt
        """).pl()

    def latest_per_day(
        self,
        name: str,
        version: str,
        symbols: list[str],
        as_of: datetime,
    ) -> pl.DataFrame:
        """每个 (sym, value_dt) 取 available_at <= as_of 的最新一条。处理修正版本。"""
        df = self.fetch(name, version, symbols, as_of)
        return df.sort("available_at").group_by(["sym", "value_dt"]).last()

fetch 是查询接口的关键:as_of 参数强制传入,没有默认值。这是工程上防止 PIT 漏洞最有效的手段——研究员忘了传,代码直接报错,比写错日后再发现要好得多。

用法

import polars as pl
from datetime import datetime

lib = FactorLib("/data/factor_lib")

# 写入:动量因子 v1.0.0
df = pl.DataFrame({
    "sym": ["AAPL", "AAPL", "AAPL"],
    "value_dt": [datetime(2024,1,1).date(), datetime(2024,1,2).date(), datetime(2024,1,3).date()],
    "available_at": [datetime(2024,1,1,21,0), datetime(2024,1,2,21,0), datetime(2024,1,3,21,0)],
    "value": [0.012, -0.003, 0.015],
})
lib.write("mom_20d", "v1.0.0", df)

# 查询:作为 2024-01-02 22:00 时点的研究者,能看到的所有动量因子
out = lib.fetch(
    name="mom_20d",
    version="v1.0.0",
    symbols=["AAPL"],
    as_of=datetime(2024, 1, 2, 22, 0),
)
print(out)
# 应该只返回 1/1 和 1/2 两条;1/3 的 available_at 还在未来,不可见

因子定义文件

每个因子还要有一份「定义文件」与它的版本号一一对应。我用 YAML:

name: mom_20d
version: v1.0.0
description: 20 个交易日的累计对数收益率
window: 20
input_factors:
  - log_return_1d@v1
update_schedule: daily_after_close
available_offset_minutes: 30   # 收盘后 30 分钟可用
owner: research-team
created_at: 2024-01-01
last_modified_commit: abcd1234

这份文件随因子代码一起进 git,和数据库里的版本号一一对应。「因子定义和因子值物理上分开,但通过版本号锁定」是因子库可复现性的核心

一个被低估的工程细节

因子计算的「业务日期」与「数据日期」要谨慎区分。例:

这条逻辑在「跨时区市场」上更微妙——US 股票的收盘是北京时间次日凌晨,因子的 available_at 必须按 UTC 或本地化对齐,否则跨市场组合会出现 PIT 漏洞。

因子值修正与回填

因子也会修正——上游基本面数据修正、计算代码 bug 修复后重算,都会产生「同一 (sym, value_dt) 出现新值」。处理方式有两种:

  1. 新版本号:把修正后的值写到 v1.0.1,老值 v1.0.0 保留。研究员显式选择版本。
  2. 同版本覆盖 + 修订时间戳:在表里加 revision_ts 字段,fetch 默认取最新。

我倾向方式一。版本永远显式,可复现性最强;老回测结果不受新数据影响——一篇研究报告里说「使用 mom_20d v1.2.0」时,三年后还能用 v1.2.0 重跑得到同样数字,这是非常有价值的。

因子之间的依赖

复杂因子往往依赖简单因子。例如「行业中性化的 20 日动量」依赖 mom_20d + industry_classification。这种依赖应当显式写在 YAML 定义里,不要在因子计算代码里硬编码。理由是:当依赖的底层因子升级版本,所有下游因子的版本号也要同步推进,YAML 让这个传播变得可审计。


八、可观测性

数据管线写完不是结束,它每天都在运行,每天都可能坏。可观测性是把「坏了多久才发现」从「下游研究员发现因子值不对」(典型 1~4 周)压到「自动告警」(典型 1~30 分钟)。

三类必须监控的指标

一、缺失监控(completeness)

每天交易结束后跑一组 SQL:

-- 哪些标的、哪些数据源缺当日规范化数据?
SELECT source, sym
FROM expected_universe e
LEFT JOIN normalized_data n
  ON e.source = n.source AND e.sym = n.sym AND n.trade_date = '2026-04-30'
WHERE n.sym IS NULL;

expected_universe 是当日预期应有数据的全集,按市场+交易日维护。任何缺失都告警。

二、漂移检测(drift)

每个核心字段(价格、成交量、tick 数)的当日统计与历史 30 天滚动分布对比:

def detect_drift(today: pl.DataFrame, history: pl.DataFrame, col: str) -> dict:
    """简单的 KS-style 漂移检测。"""
    today_med = today[col].median()
    today_std = today[col].std()
    hist_med = history[col].median()
    hist_std = history[col].std()
    z_med = (today_med - hist_med) / (hist_std + 1e-9)
    return {
        "today_med": today_med,
        "hist_med": hist_med,
        "z_score": float(z_med),
        "alert": abs(z_med) > 4,  # 4σ 触发告警
    }

漂移告警的阈值要按指标设定,不能一刀切。比如「单日成交量」相对于 30 天均值 4σ 是合理告警;但「价格」4σ 太松,因为大盘单日 ±5% 是常事。告警阈值需要经验校准 + 持续 review

三、SLA 监控(timeliness)

每个数据源都要定义 SLA:

数据源 SLA 类型 阈值 监控点
美股实时 tick 端到端延迟 P99 500 ms 接入服务
美股日 bar 落盘时刻 收盘后 30 分钟 ETL 服务
A 股财报 公告日 +1 工作日 24 小时 入库服务
因子库 mom_20d 收盘后 + 30 分钟 30 分钟 因子服务

SLA 违反就告警。一个工程上常见的反模式是把 SLA 写在 wiki 文档里但不做自动监控,结果「数据晚到了 3 小时」要等人发现。

元数据表

把上面这些监控落地,需要一张元数据表:

CREATE TABLE data_run_metadata (
    pipeline      TEXT NOT NULL,
    run_date      DATE NOT NULL,
    started_at    TIMESTAMP NOT NULL,
    finished_at   TIMESTAMP,
    status        TEXT NOT NULL,  -- RUNNING/SUCCESS/FAILED/STALE
    rows_in       BIGINT,
    rows_out      BIGINT,
    quality_json  JSON,           -- 包含上面 daily_quality_check 的全部指标
    sla_breached  BOOLEAN DEFAULT FALSE,
    alert_sent    BOOLEAN DEFAULT FALSE,
    PRIMARY KEY (pipeline, run_date)
);

可观测性的本质是「让数据管线本身也变成数据」。每个 ETL 任务运行都是一行记录,所有质量指标都进数据库,告警基于查询,dashboard 基于查询,回顾基于查询。这套元数据每天要按时入库,比上层因子库还要可靠

可观测的工程纪律

我自己的判断(不是规范结论):监控覆盖度比花哨的 dashboard 重要 10 倍。一个 Grafana 面板能看到 50 个指标的趋势,但只要漏了「某个标的某天没数据」,事故就会发生。先把覆盖度做满,再做漂亮的可视化

告警分级

监控不是「告警越多越好」。一个会有 200 个标的的管线,如果每个标的每天 10 条监控规则,告警数量足以淹没所有人。建议按严重程度分级:

级别 触发条件举例 响应方式 SLA
P0 因子库写入失败、SLA 严重违反 立即 PagerDuty / 电话 15 分钟内响应
P1 数据缺失超过 10 个标的、漂移超 6σ Slack 高优先级 1 小时内
P2 单标的缺失、漂移 4σ 警告 Slack 普通频道 当日处理
P3 dup_ratio 略升、quiet_minutes 微增 周报汇总 每周 review

P0/P1 必须是「某人正在 oncall」,P2/P3 可以批处理。这套分级和后端服务监控没有本质差别,但量化数据的特殊性是 P0 级别的事故影响是累积的——今天一个错误的因子值进了仓位决策,明天这个仓位就成事实。所以数据管线的 P0 标准要比通用后端更严。

历史回看

每月做一次「数据质量回看」很有价值:把过去 30 天的 data_run_metadata 全捞出来,看哪些指标在恶化趋势上。很多事故是缓慢恶化的——比如某个数据源每天比 SLA 多迟到 30 秒,单看每天没问题,但累计三个月后 SLA 已经名存实亡。趋势分析能在崩溃前发现。


九、把全流程串起来:一个最小例子

最后用一段代码把前面的所有概念串起来——从原始 tick 一路到 PIT 因子值。这个例子不是玩具,它是真实的最小骨架。

import polars as pl
import duckdb
from datetime import datetime, timedelta
from pathlib import Path

ROOT = Path("/data/quant")

def step1_normalize_tick(raw_path: str, sym: str, trade_date) -> str:
    """规范化原始 tick:统一时区、字段、单位。"""
    df = pl.read_parquet(raw_path)
    df = df.with_columns(
        pl.col("ts").dt.cast_time_unit("ns").alias("ts_ns"),
        pl.lit(sym).alias("sym"),
        pl.col("price").cast(pl.Float64).alias("px"),
        pl.col("size").cast(pl.Float64).alias("qty"),
        pl.col("aggressor").cast(pl.Int8).alias("side"),
    ).select(["ts_ns", "sym", "px", "qty", "side"])

    out = ROOT / "norm_tick" / f"sym={sym}" / f"date={trade_date}.parquet"
    out.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(out, compression="zstd")
    return str(out)


def step2_build_dollar_bars(norm_path: str, sym: str, threshold: float) -> str:
    """重采样为 dollar bar。"""
    out = ROOT / "dollar_bars" / f"sym={sym}.parquet"
    build_dollar_bars(norm_path, str(out), sym, threshold)  # 见第三节
    return str(out)


def step3_compute_factor(bar_path: str, sym: str, version: str) -> pl.DataFrame:
    """从 dollar bar 上计算 20-bar 动量因子。"""
    bars = pl.read_parquet(bar_path).sort("close_ts")
    bars = bars.with_columns(
        (pl.col("close").log() - pl.col("close").log().shift(20)).alias("mom_20")
    ).drop_nulls()
    return bars.select([
        pl.lit(sym).alias("sym"),
        pl.from_epoch(pl.col("close_ts"), time_unit="ns").dt.date().alias("value_dt"),
        pl.from_epoch(pl.col("close_ts"), time_unit="ns").alias("available_at"),
        pl.col("mom_20").alias("value"),
    ])


def step4_write_factor(df: pl.DataFrame, name: str, version: str) -> None:
    lib = FactorLib(str(ROOT / "factor_lib"))
    lib.write(name, version, df)


def step5_research_query(name: str, version: str, syms: list[str], as_of: datetime):
    lib = FactorLib(str(ROOT / "factor_lib"))
    return lib.fetch(name, version, syms, as_of)

这五步分别对应:接入 → 规范化 → 派生 bar → 派生因子 → 服务查询。在生产里,每一步都被单独的服务/脚本执行,状态记录在 ingest_jobs 表里,质量指标记录在 data_run_metadata 表里。研究员只看到 step5_research_query——这是数据管线对外的唯一入口。


十、结论与边界

回到开头的论点:数据管线是量化系统里少数几个「错了就永远错」的地方。本文给出的方案不是唯一解,但有几条原则任何方案都不能违反。

  1. 原始数据必须落盘且永不覆盖。下游可以重算,上游不能丢失。
  2. PIT 是默认行为,不是可选项。所有查询接口强制传 as_of,没有默认值。
  3. 复权计算自己掌握。不要把复权后价格当作权威输入,要求数据源同时提供原始价和复权因子。
  4. 派生层与原始层分离。bar、因子都是派生层,可以重算;接入与规范化是上游。
  5. 可观测性写进元数据。监控本身是数据管线的一部分,不是事后补救。

适用边界也要说清:

下一篇《幸存者偏差》会接着讨论:本文管线里那些「已经退市的标的」、「指数成分变化」、「数据源覆盖偏差」如何被悄悄塞进研究结论,以及怎么系统性地把它们拎出来。

给读者的最后一条工程建议

如果你正在从零搭一条量化数据管线,强烈建议先把「最简单的一条端到端链路」打通:单一标的、单一数据源、单一因子、单一回测。从原始 tick 到回测结果,全链路跑通后再扩张。常见错误是「先把所有标的、所有数据源都接进来」,结果半年之后还在 debug 接入层、连一根 bar 都还没正经研究过。

数据管线的复杂度在于细节的累积。早一天把端到端跑通,就早一天知道哪些细节是真问题。半年之内多次重构这条管线是正常的;唯一不正常的是「半年都在画架构图、还没跑过任何一条真实数据」。


参考资料

书 / 论文

规范 / 文档

工具 / 库


系列导航

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-05-01 · quant

【量化交易】量化交易全景:从信号到订单的工程链路

量化交易不是策略写得好就能赚钱,更难的是把数据、特征、因子、信号、组合、执行、风控、复盘这八段链路在工程上连成一条不漏数据、不串时间、不丢订单的流水线。本文是【量化交易】系列的总目录与读图,给出八段链路的输入输出、失败模式、不变量清单,并用研究流程图把从一个想法到一笔实盘订单之间所有该过的卡点串起来。

2026-05-01 · quant

【量化交易】市场结构:交易所、做市商、暗池、ECN

系统梳理全球市场结构(Market Structure)的工程图景:从证券交易所、衍生品交易所、加密交易所,到做市商、暗池、ECN/ATS,再到 Maker-Taker 收费、PFOF、Reg NMS 与 MiFID II 的监管影响;给出量化策略选择交易场所的判断框架与基于 ccxt 的多交易所行情聚合代码。

2026-05-01 · quant

【量化交易】市场微结构:订单簿、价差、流动性、冲击

系统讲解市场微结构的核心概念与可计算工具:限价订单簿的数据模型、报价/有效/已实现价差、Roll 模型、四维流动性度量、Kyle's lambda、订单流不平衡(OFI)、Almgren-Chriss 框架下的临时与永久冲击、PIN 与 VPIN、Hawkes 过程,并给出基于 polars 的 L2 增量处理与系数估计代码。

2026-05-01 · quant

【量化交易】订单类型与执行语义:限价、市价、IOC、FOK、冰山

把 Limit、Market、IOC、FOK、Iceberg、Stop、MOO/MOC 这些常被混为一谈的订单类型还原为价格、数量、时效、可见性、触发五个独立维度,并对照 A 股、港股、美股、CME、Binance 五个市场的实际语义差异,给出量化系统中的订单工厂、状态机与风控前置校验的工程实现。


By .