土法炼钢兴趣小组的算法知识备份

【量化交易】特征存储与时间对齐:point-in-time 正确性

文章导航

分类入口
quant
标签入口
#feature-store#point-in-time#time-alignment#reproducibility

目录

把回测里的特征拼接做对,是所有”看起来很赚钱”的策略落到实盘之前必须翻过的一道墙。前一篇 《数据陷阱:幸存者偏差、复权、前视、未来函数》 讨论了数据本身的偏差。这一篇要回答更工程化的一个问题:当你已经有了一份”看起来干净”的原始数据,怎么把它组织成能够被研究、回测、实盘三个场景都安全使用的”特征”?

直觉上这是一份”列存数据库选型 + ETL”的工作。但只要把”特征”这个词替换成”模型在时间 \(t\) 上能够依赖的所有信息”,问题就立刻退化成一个时间正确性(point-in-time correctness,下文简称 PIT)约束:模型在 \(t\) 上看到的每一行,必须只包含在物理时间 \(t\) 之前真实可获取的信息。一切血缘、版本、查询接口、存储后端的设计,都是在这个约束上叠加的工程实现。

通用 ML Feature Store(如 Feast、Tecton、Hopsworks)当然也讲 point-in-time correctness,但它们的关注点是”训练样本与服务样本不一致”。量化场景把这个约束加重到了另一个层级:错一秒就是泄漏,错一天就是夏普凭空多出 0.5;并且这种错误在回测里不会触发任何报错,只会让净值曲线变得更光滑。这就是为什么本文不打算把通用 Feature Store 的概念照搬过来,而是从量化研究的实际工作流出发,给出一套可以直接落地的最小工程实现。

本文环境:Python 3.11、polars 1.7、pandas 2.2、duckdb 1.1、pyarrow 17.0。代码段都在该环境下能直接运行。涉及的 Wind / CRSP / Compustat 字段语义来自厂商公开手册。

风险提示:本文不构成任何投资建议。文中的代码用于演示工程结构,未做交易级容错与权限管理;直接用于实盘需要补充行情对接、风控、清算、灾备等部分。回测越”对”越接近实盘,但不等于实盘一定盈利。


一、特征存储要解决什么问题

1.1 为什么不能直接用一张大宽表

最朴素的做法是把研究员需要的所有特征拼成一张以 (date, symbol) 为主键的大宽表,存成 Parquet 或者放到 ClickHouse。这个做法在策略只有一两个、研究员只有两三个、特征只有几十列的时候一点问题都没有。

问题在它增长以后:

特征存储要解决的,就是这三类问题:复用、版本化、训练-服务一致性。它不是一个新数据库,而是一层契约:所有特征的定义、版本、依赖、可用时间都被显式管理,研究员、回测引擎、实盘系统通过这层契约共享同一份”事实”。

1.2 与一般 ML Feature Store 的区别

把 Feast 的官方文档(feast 0.40 版本)和 Tecton 的设计文档摆在一起,能看到通用 Feature Store 的核心抽象是 Entity + FeatureView + point_in_time_join。这套抽象对推荐、风控的批 + 在线场景已经够用,但是放到量化里有几个地方会卡:

维度 通用 ML Feature Store 量化特征仓库
时间正确性约束 服务时刻一致即可 任意历史时刻都必须可还原
时间维度 单一 event_timestamp bitemporal:valid_time + transaction_time
数据修订(restatement) 通常忽略 必须显式建模(财报修订、复权调整、tick 重传)
拼接语义 as-of join,可容许小误差 as-of join,左闭右开严格不等式,按 transaction_time
在线特征 高 QPS 单点 lookup 低 QPS,但要严格匹配离线版本
特征粒度 用户、商品 标的 × 时间 × 多频段;面板数据为主
漂移监控 训练 vs 服务的统计分布 加上”今天的快照 vs 历史快照”的逐字段一致性

通用 Feature Store 默认的 event_timestamp 是单时间维度,这在推荐场景里是合理的——一个 user click 事件只发生过一次,不会被”修订”。量化里几乎所有”基本面”类特征都会被修订:年报会更正季报,复权因子会因为新的分红事件而回溯刷新历史,甚至 tick 数据本身在交易所端就有”撤单订正”。这意味着仅靠 event_timestamp 一个时间维不够,必须显式区分”数据指代的时刻”(valid time)和”数据真正可被读到的时刻”(transaction time),才能在任意一个历史时间点 \(t\) 还原出”在 \(t\) 我能看到什么”。这就是下一节要讲的 bitemporal 模型。

1.3 一句话定义

把这套思路压缩成一句话:

量化特征仓库 = bitemporal 数据模型 + 内容寻址的版本化 + as-of join 查询接口 + 训练-服务共享的转换函数。

后面每一节都是在拆这句话的某一部分。


二、点时正确性:valid_time 与 transaction_time

2.1 单时间维度模型为什么不够

考虑 A 股某公司 2024 年一季报:

一个研究员在 2024-09-01 站在屏幕前,问:“2024-05-01 那天,这家公司的应收账款是多少?”

如果数据库里只有一个时间维度 report_date = 2024-03-31,并存了一行”调整后”的应收账款值,那么这位研究员看到的就是 2024-08-15 之后才存在的那个值。可在 2024-05-01,市场上能拿到的是 4 月 25 日发布的”调整前”的值。模型如果在回测里用了”调整后”的值去预测 5 月 1 日的股价,那是泄漏未来。

所以”事件时刻”和”数据落库时刻”必须分开存。这就是 bitemporal data model 的最小动机。

2.2 两个时间维度的精确定义

引用 Snodgrass 在 Developing Time-Oriented Database Applications in SQL(1999)里给出的术语:

两个时间维度组合起来,每条特征记录至少四个字段:

entity_id, feature_name, value, valid_from, valid_to, txn_from, txn_to

为了简化,通常退化成”开端点 + null 表示开区间”:

entity_id, feature_name, value, valid_time, transaction_time

后续行的 (valid_time, transaction_time) 隐含了前一行的失效。这就是 SQL:2011 里 system-versioned table 的简化版本。

2.3 PIT 查询的标准形式

“在物理时间 \(T\) 站在屏幕前,问 entity \(e\) 的特征 \(f\) 在 valid_time \(V\) 上的值是多少”,标准 SQL 写法:

SELECT value
FROM features
WHERE entity_id = :e
  AND feature_name = :f
  AND valid_time <= :V
  AND transaction_time <= :T
ORDER BY valid_time DESC, transaction_time DESC
LIMIT 1;

注意两个不等式都是 <=,并且排序是先 valid_timetransaction_time。这两条规则结合起来,刚好对应”我在 \(T\) 能看到的、关于不晚于 \(V\) 的最新事实”。

这条查询是整个特征仓库的命脉。错一个等号就是泄漏:把 <= 写成 < 会少一行边界数据;把 transaction_time <= 漏掉就退化成单时间维度,等于偷看未来。

2.4 一个最小可运行的例子

import polars as pl
from datetime import date

# 同一个事实被报告两次:4-25 公告原始值,8-15 修订后值
df = pl.DataFrame({
    "entity_id": ["600000", "600000", "600000"],
    "feature": ["receivables_q1", "receivables_q1", "receivables_q2"],
    "valid_time": [date(2024, 3, 31), date(2024, 3, 31), date(2024, 6, 30)],
    "transaction_time": [date(2024, 4, 25), date(2024, 8, 15), date(2024, 8, 15)],
    "value": [1.23, 1.18, 1.41],
})

def pit_lookup(df, entity, feature, valid_at, txn_at):
    return (
        df.filter(
            (pl.col("entity_id") == entity)
            & (pl.col("feature") == feature)
            & (pl.col("valid_time") <= valid_at)
            & (pl.col("transaction_time") <= txn_at)
        )
        .sort(["valid_time", "transaction_time"])
        .tail(1)
    )

# 站在 2024-05-01 看 Q1:拿到 1.23
print(pit_lookup(df, "600000", "receivables_q1", date(2024, 5, 1), date(2024, 5, 1)))

# 站在 2024-09-01 看 Q1:拿到修订后 1.18
print(pit_lookup(df, "600000", "receivables_q1", date(2024, 9, 1), date(2024, 9, 1)))

两次查询返回不同的值,但是数据本身没有被覆盖——任何历史时刻的”当时视角”都可以重建。这是 bitemporal 模型相对单时间维度模型的根本优势。

PIT as-of-join 时间线

上图里有三条特征事件:v1、v2、v3。每条事件有两个时间戳:橙色的 valid_time(特征指代的现实时刻)和紫色的 transaction_time(特征落库时刻)。两个时间戳之间的延迟来自两类原因:上游数据源的发布延迟(基本面、宏观)以及计算管道本身的处理时延(行情聚合、特征算子)。如果回测把”行情时刻 10:00 上看到的最新特征”用 valid_time 去匹配,会把 v1 当作可用,但实际上 v1 直到 10:08 才落库;用 transaction_time 去匹配,10:00 上还看不到 v1,必须等到 10:30 这一条行情才能拼上 v1。这就是图里红色与绿色两个区域的差别。

2.5 不是所有数据都是 bitemporal

不要为了”理论上完整”就把每张表都改成两个时间维度。下面这些数据其实是单时间维度:

要 bitemporal 的,是那些”上游会修订、不修订的版本本身有研究价值”的数据:

判断标准很简单:问一句”如果今天的我重新跑一遍这条数据,得到的结果和当年那条一样吗?“ 如果不一定,就上 bitemporal。


三、特征的时间维度

bitemporal 解决了”事实本身可能被修订”的问题,但没解决”特征本身就是时间窗口聚合”的问题。一个 20 日均线在 2024-04-01 上的值,意味着用了 [2024-03-04, 2024-04-01] 这 20 个交易日的数据。这里又有三个时间概念,混淆任意两个都会引发偏差。

3.1 三个时间概念

举例:用 09:30 到 10:00 的成交计算的 30 分钟波动率:

很多回测框架默认 feature_time == effective_time,这在低频日级回测里不出问题(一天的延迟几十毫秒可以忽略),但在分钟级、tick 级的策略里会引入隐性的前视偏差。

3.2 滚动、累计、衰减的边界写法

定义清楚边界:

形式 数学定义 feature_time 实现要点
滚动均值 \(\mu_t = \frac{1}{w}\sum_{i=t-w+1}^{t} x_i\) \(t\) 区间右闭,左端 \(t-w+1\) 用交易日历计算,不用自然日
累计求和 \(S_t = \sum_{i \le t} x_i\) \(t\) 跨年要明确是否清零
指数衰减 \(\mu_t = \alpha x_t + (1-\alpha)\mu_{t-1}\) \(t\) 半衰期对应的 \(\alpha\) 要写在元数据里
\(k\) 期值 \(x_{t-k}\) \(t\) \(k\) 是交易日数还是自然日数要约定
跨期变化 \(\Delta_k x_t = x_t - x_{t-k}\) \(t\) 跟前 \(k\) 期值同样要约定日历

一个非常常见的错误是在面板数据上用 pandas 的 rolling("20D"):它走的是自然日,会把停牌日当作有数据来对待。换到交易日历必须先把面板按交易日对齐成稠密矩阵,再用 rolling(20)

3.3 与 PIT 的关系

把 bitemporal 和窗口聚合放到一起:

这条规则很关键。考虑用基本面”应收账款”算过去 4 个季度的均值:

所以这个均值特征的 transaction_time 不是”算这一行的当天”,而是”窗口内最晚一份数据真正落库的那一天”。如果你把 transaction_time 简单地等同于”特征生成 cron 跑的那一天”,回测里就会出现 Q4 财报还没披露但你已经在用 Q4 数据算均值——这是一个非常隐蔽的前视偏差。


四、存储后端选型

存储后端的选择不是孤立的技术品味,而是受三个因素约束:数据量、读写比、查询模式。下面这张表把量化场景常见的几种后端按这三个维度做了对比。

后端 容量极限(单机) 写吞吐 as-of join 友好度 列存 工程复杂度
Parquet + Arrow + DuckDB 10 TB+(依赖磁盘) 中(批写) 高(DuckDB 原生 ASOF JOIN)
ClickHouse 100 TB+ 中(需要 array join 技巧)
QuestDB 数 TB 极高(亿级 tick/秒) 高(原生 ASOF JOIN)
TimescaleDB 数 TB 中(hypertable + LATERAL) 否(行存为主)
Polars in-memory 受内存限制 高(asof_join) 极低
Redis / KV 受内存限制 极高 不适合(无 join)

4.1 Parquet + Arrow + DuckDB:研究和回测的默认组合

研究阶段不需要 OLAP 集群。一台带 NVMe 的工作站 + 几张 Parquet 文件 + DuckDB 已经能覆盖几年的中国 A 股 minute bar、几年的美股 tick top-of-book,而且单机部署、无运维。

按照”特征 × 月份”做物理分区:

features/
  factor=mom_20/
    year_month=202401/part-0.parquet
    year_month=202402/part-0.parquet
  factor=turnover_20/
    year_month=202401/part-0.parquet

DuckDB 直接 SELECT * FROM 'features/**/*.parquet',靠 Hive partitioning 做分区裁剪。这种布局的好处:

DuckDB 1.0 之后原生支持 ASOF JOIN,这一项让它从”OLAP 玩具”变成了量化研究的事实标准之一:

SELECT
    p.ts AS price_ts, p.symbol, p.last_price, f.value AS pe_ttm
FROM prices p
ASOF LEFT JOIN features f
    ON p.symbol = f.entity_id
   AND p.ts >= f.transaction_time
WHERE f.feature = 'pe_ttm';

这条 SQL 完整表达了”对每一行行情,找到 transaction_time 不晚于行情时间的最新一条 PE_TTM”。语义干净到不需要在 Python 层再写一遍。

4.2 ClickHouse:策略上线之后的中间存储

研究跑通进入实盘之后,特征量级从”几亿行”长到”几百亿行”,研究员需要的不是新查询能力而是”在可接受的时间内查任意一段历史”。这时候 DuckDB 单机吃不住,进 ClickHouse 是常见选择。

ClickHouse 的 ReplacingMergeTree 配合 (entity_id, valid_time, transaction_time) 主键,可以把 bitemporal 写得很自然:

CREATE TABLE features (
    entity_id String,
    feature String,
    valid_time DateTime64(6),
    transaction_time DateTime64(6),
    value Float64,
    version String
) ENGINE = ReplacingMergeTree(transaction_time)
ORDER BY (entity_id, feature, valid_time, transaction_time)
PARTITION BY (feature, toYYYYMM(valid_time));

但 ClickHouse 没有原生 ASOF JOIN(截至 24.x 版本提供了 ASOF JOIN 关键字,但限制比 DuckDB 多:仅支持等值条件 + 一个不等值条件,且不能跨表多列)。在量化场景里,ASOF JOIN 通常需要 entity_id 等值 + transaction_time 不等值,刚好踩在 ClickHouse 的支持边界上,能用,但写复杂时要小心。

4.3 QuestDB / TimescaleDB:tick 级时序

如果要存原始 tick(而不是衍生特征),QuestDB 在 tick 入库速度上是经过实测的领先者。它的 ASOF JOIN 语法和 DuckDB 几乎一致,可以无缝迁移研究代码。TimescaleDB 是 PostgreSQL 扩展,行存为主,对 tick 数据的压缩比不如 QuestDB / ClickHouse,但好处是研究员在公司只熟悉 PostgreSQL 的时候省学习成本。

工程上的判断:tick 入库走 QuestDB / ClickHouse,特征仓库走 Parquet + DuckDB(研究)+ ClickHouse(生产),是 2024 年中国量化机构里最常见的分层。

4.4 内存形式:Polars 与 Arrow

Polars 不是存储后端,但是把 Parquet 文件读进来后做 PIT 拼接最自然的形式。Polars 1.x 的 join_asof 接口与 DuckDB 的 ASOF JOIN 在语义上一致,但运行在单进程内,不用过 SQL parser。

import polars as pl

prices = pl.scan_parquet("prices/**/*.parquet")
features = pl.scan_parquet("features/factor=pe_ttm/**/*.parquet")

joined = (
    prices.sort(["symbol", "ts"])
    .join_asof(
        features.sort(["entity_id", "transaction_time"]),
        left_on="ts",
        right_on="transaction_time",
        by_left="symbol",
        by_right="entity_id",
        strategy="backward",
    )
    .collect(streaming=True)
)

strategy="backward" 意味着对每个左表行,找右表里 transaction_time <= ts 的最大那一行。Polars 在内部把右表按 entity_id 分桶后做有序合并,复杂度近似 \(O(n+m)\),比 SQL 引擎的 hash join 在这个语义下更高效。


五、版本化与血缘

5.1 为什么需要”特征版本”

研究员把 volatility_20d 的实现从”过去 20 个交易日 close-to-close 收益的标准差”改成了”过去 20 个交易日 high-low 范围的 Parkinson 估计”。同一个名字,不同口径。如果不做版本化,下面这些事情会发生:

工程上的对策是把”特征定义”理解成内容寻址(content-addressable)的对象:每一份代码 + 参数 + 上游数据快照,hash 出一个唯一 ID,存到独立分区。

import hashlib, inspect, json

def feature_version(func, params, upstream_hashes):
    src = inspect.getsource(func)
    payload = json.dumps({
        "src": src,
        "params": params,
        "upstream": sorted(upstream_hashes),
    }, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()[:12]

落到目录结构上:

features/
  factor=volatility_20d/
    version=a1b2c3d4e5f6/  # close-to-close
      year_month=202401/part-0.parquet
    version=f6e5d4c3b2a1/  # Parkinson
      year_month=202401/part-0.parquet

研究员通过 factor + version 显式声明依赖。任意时刻”最新版”由一张 mapping 表维护,但 mapping 不参与物理覆盖。

5.2 上游变更如何传播

特征 A 依赖特征 B 依赖原始数据 C。当 C 在 transaction_time T 上发生修订(例如分红除权造成复权因子刷新),A 和 B 必须在某个 transaction_time T’ >= T 上产生新的版本,并把新版本的物理记录写入新行——而不是覆盖旧行。

正确的做法:

  1. 数据治理层在 C 的修订事件上发布一条变更通知。
  2. 调度系统按依赖图找出所有受影响的下游特征。
  3. 对每个下游特征,在其 transaction_time 维度上追加新行;旧行保持不动。
  4. 物化视图(如果有)按依赖关系级联失效并重算。

这套机制在 DVC、LakeFS、Pachyderm 这些工具里都有不同程度的实现。如果不想引入额外依赖,最小可工作的做法是:

5.3 特征哈希:不只是版本号

哈希除了用作版本,还能解决一个细微但麻烦的问题:研究员在两台不同机器上各自重跑同一份特征代码,对同一段输入是不是真的产生完全相同的输出?

如果哈希是基于代码 + 上游 + 参数计算的,那么两份输出应该字节一致。把每个分区文件的 SHA256 作为分区元数据存一份,跨机器对比就能立刻发现”代码看起来一样但浮点结果不一样”这类问题——通常是 BLAS 实现差异、并行 reduce 顺序差异、或者依赖库小版本差异引起的。

这个一致性检查在策略上线前必须做一次。否则线上推理用 MKL,研究员本地用 OpenBLAS,两份特征值在第 6 位小数上有偏差,模型在边界样本上的预测就会翻号。


六、查询接口:as_of_join 的几种写法

把 PIT 查询封装成可以直接调用的接口,研究员就不会自己写错。下面给出三种实现:纯 Polars、纯 DuckDB、以及把两者混在一起的 hybrid 版本。

6.1 Polars join_asof:研究阶段首选

import polars as pl

def as_of_join(left: pl.LazyFrame, right: pl.LazyFrame,
                left_time: str, right_time: str,
                by_left: str, by_right: str) -> pl.LazyFrame:
    """对 left 表的每一行,按 by 列分组,从 right 表里取 right_time <= left_time 的最大那一行。"""
    return (
        left.sort([by_left, left_time])
        .join_asof(
            right.sort([by_right, right_time]),
            left_on=left_time,
            right_on=right_time,
            by_left=by_left,
            by_right=by_right,
            strategy="backward",
        )
    )

把它放到 utility 模块里,研究员的代码就只剩”声明左右表 + 调一次”。strategy="backward" 不要给研究员选项——量化场景下”forward”几乎一定是 bug。

6.2 DuckDB ASOF JOIN:上规模数据

当面板数据超过几亿行、polars 单进程吃不下,DuckDB 的 ASOF JOIN 是最方便的选择。它能直接吃 Parquet 路径,不需要先加载到内存:

import duckdb

con = duckdb.connect()
con.execute("""
    CREATE OR REPLACE VIEW prices AS SELECT * FROM 'data/prices/**/*.parquet';
    CREATE OR REPLACE VIEW pe_ttm AS
        SELECT * FROM 'data/features/factor=pe_ttm/version=a1b2c3d4e5f6/**/*.parquet';
""")

result = con.execute("""
    SELECT p.ts, p.symbol, p.close, f.value AS pe_ttm
    FROM prices p
    ASOF LEFT JOIN pe_ttm f
        ON p.symbol = f.entity_id
       AND p.ts >= f.transaction_time
    WHERE p.ts BETWEEN '2024-01-02' AND '2024-06-30';
""").pl()  # 直接拿到 polars DataFrame

DuckDB 的 ASOF JOIN 在语义上等价于 Polars 的 strategy="backward",但执行计划由 DuckDB 优化器决定,能用上分区裁剪与谓词下推。在 2024 年的工作站(32 核、128GB、NVMe)上,对 5 年中国 A 股 minute bar × 50 个特征做 ASOF JOIN,端到端在两位数秒级。

6.3 一致性测试

无论哪种实现,都必须有一份”金标准”测试集来验证:

def test_asof_join_correctness():
    # 构造两条特征:在 t=10 落库,valid=t-5
    features = pl.DataFrame({
        "entity_id": ["A", "A"],
        "transaction_time": [10, 20],
        "valid_time": [5, 15],
        "value": [1.0, 2.0],
    })
    prices = pl.DataFrame({
        "symbol": ["A", "A", "A", "A"],
        "ts": [9, 10, 19, 20],
    })
    out = as_of_join(prices.lazy(), features.lazy(),
                     "ts", "transaction_time", "symbol", "entity_id").collect()
    # ts=9 时还看不到任何特征
    # ts=10 时看到 value=1.0
    # ts=19 时看到 value=1.0
    # ts=20 时看到 value=2.0
    assert out["value"].to_list() == [None, 1.0, 1.0, 2.0]

这个测试很短,但它把 PIT 拼接的边界条件钉死了。每次升级 polars / duckdb / pyarrow 都要重跑一遍。

6.4 一个常见的踩坑:等号边界

ts >= transaction_time 还是 ts > transaction_time?两者差一行。

工程判断(不是规范规定):在 transaction_time 上用 >=,在 valid_time 上用 <=,并把”特征 transaction_time 上加一个 ε(计算延迟)“作为统一约定。 这样能保证:

把这个约定写进 utility 函数,永远不要让研究员手写不等号。


七、在线/离线一致:消除 training-serving skew

7.1 skew 的常见来源

把训练集的特征和服务时(在线推理时)的特征放在一起做差分,会发现差异通常来自下面几类:

来源 典型表现 检测方式
时间窗 训练用 20 个交易日,服务用 20 个自然日 算同一日期的特征对比
边界条件 停牌日、新股、退市日处理不同 在边界标的上专门校验
数值精度 float32 vs float64、累加顺序 用 SHA256 比对二进制
缺失填充 训练用 ffill,服务用 0 抽查 NaN 多的列
时区与日历 UTC vs 本地、节假日列表不同 跨年那一周对比
上游版本 训练用 v1,服务漏升级到 v2 写入特征时记录依赖版本

7.2 用同一份代码

最干净的解法是离线和在线共享同一份 transform 函数。具体做法:

  1. 把每个特征定义成一个纯函数 def f(state, event) -> (new_state, value)
  2. 离线侧用 batch 引擎(polars / duckdb)对历史事件序列扫一遍这个函数。
  3. 在线侧用流式引擎(自己写的 actor 或 Flink)对实时事件序列扫一遍同一个函数。

这样无论是训练还是服务,特征值的产生路径完全一样,边界条件、缺失处理、累加顺序都强制对齐。

from dataclasses import dataclass

@dataclass
class RollingMeanState:
    buf: list
    window: int

def rolling_mean(state: RollingMeanState, event):
    state.buf.append(event["value"])
    if len(state.buf) > state.window:
        state.buf.pop(0)
    return state, sum(state.buf) / len(state.buf)

离线就是 reduce(rolling_mean, events, initial_state),在线就是把它包成 actor handler。两者跑出来的中间状态、最终输出,逐 tick 对得上。

7.3 Shadow trading

即便代码统一,部署环境差异也会带来 skew。最稳的做法是上线后做一段时间的 shadow trading:

shadow trading 时间长度按策略性质决定:对低频日线策略 5 个交易日就够了,tick 级 HFT 通常要跑 2-4 周才能把节假日、涨跌停、临停等异常路径都覆盖到。

特征生命周期:离线/在线统一

上图的关键不是”两条管道”而是”一份逻辑表达”。transform.py 这个盒子在离线和在线两条路径上各出现一次,但它们引用的是同一份代码、同一个 version hash。离线 store 是 bitemporal 的、可回放的;在线 store 只保留每个实体最新一条,是离线 store 的”投影”。shadow trading 把在线的输出回灌到离线 store 的另一个分区,使得 EOD 对差成为一次普通的列比较查询,不需要专门的对账系统。


八、生产工程:监控、回填、与回测引擎解耦

8.1 特征仓库不是回测引擎的内嵌组件

很多团队第一版的实现里,回测引擎和特征仓库写在一起。这在团队还小的时候没问题,等到第二条策略上线、第二个研究员加入、要做不同频率的回测时,问题立刻出现:每个研究员都在改回测引擎的取数代码,特征定义到处复制。

解耦的边界很简单:

这个边界一旦立起来,回测和实盘的差异只剩”时钟来源”。这是最小可行的解耦方式。

8.2 监控的几条主线

特征仓库的可观测性,至少覆盖这几条:

  1. 新鲜度(freshness):每个特征当前最新的 transaction_time 距离现在多久。低于约定阈值(例如基本面类 24h、行情类 1 分钟)报警。
  2. 行数/分布漂移:每天每分区的行数、关键统计量(mean/std/p95/null_ratio)写入监控 DB,与历史 7 天均值对比,超 3 倍标准差报警。
  3. 版本漂移:依赖版本 mapping 与实际查询使用的版本是否一致。
  4. 离线-在线一致:shadow trading 的逐字段差值。
  5. 审计完整性feature_writes 表的写入是否单调,是否有 transaction_time 倒退。

8.3 回填(backfill)的正确做法

回填是这件事的关键风险点。研究员发现一个 bug,要”重跑过去三年的某个特征”。错误做法:直接覆盖物理记录。正确做法:

  1. 给重跑出的数据分配一个新的 version
  2. 写入新版本的目录,旧版本保留。
  3. 修改 mapping 表把”最新版”指到新 version。
  4. 已经依赖旧 version 的策略不受影响(除非主动升级)。

这条做法最大的代价是磁盘——每次回填都要存一份完整副本。但相对于”覆盖后无法复现旧回测”的代价,磁盘是最便宜的那部分。可以用对象存储 + 生命周期策略把超过 N 个月的旧版本归档到冷存储。

8.4 重算(recompute)vs 回填

两者经常被混用,有必要区分:

回填只增加 transaction_time 上的新行,valid_time 维度上是”新覆盖了之前没数据的那段”。重算等于发布新版本,valid_time 维度全段覆盖,但落到的是新 version 分区。

绝大多数研究员把这两件事混为”重跑数据”,工程上必须明确区分。一条规则:任何对历史 valid_time 上”已有值”的修改,都必须走新 version;只有对历史 valid_time 上”原本就缺”的补齐,可以走原 version 的回填。

8.5 与回测引擎的接口稳定性

特征仓库对回测引擎的接口要尽量窄:

class FeatureStore:
    def get_at(self, entity_id, feature, valid_at, txn_at, version=None) -> float | None: ...
    def asof_join(self, left_df, feature, by_left, time_left, version=None) -> DataFrame: ...
    def list_versions(self, feature) -> list[VersionMeta]: ...

接口越窄,研究员越不容易绕过约束。任何看起来像”我先把整张表读出来再 self join”的需求,都应该在 review 时拒掉,让研究员说出他真正想要的”在某个时间点上的某个值”。

8.6 一份上线前的检查表

把前面讨论过的工程细节压缩成一份 Checklist:


九、工程判断与个人观点

9.1 何时不该上 Feature Store

不是每个团队都需要一个独立的特征仓库。下面的情况,自己手写两层 Parquet + 一份 utility 函数就够了:

引入 Feast 这类框架,前期工程成本在两到四周;如果没有上面三条触发因素,这段时间花在写策略上回报更高。我在实际工程里见到过几次”先上 Feast 再说”的决策,最后变成研究员跟框架的抽象作斗争,反而把研究节奏拖慢。

9.2 通用 Feature Store 框架在量化里的定位

Feast、Tecton、Hopsworks 这几个框架,对推荐风控类场景做得比较干净,但是放到量化里有几个不太顺手的地方:

我认为更务实的路径是:自建一层薄抽象,把 PIT 语义、版本化、ASOF JOIN 三件事钉死,存储后端用 Parquet + DuckDB / ClickHouse,不要为了”用上 Feature Store”而引入 Feast。 当团队规模到了 10 个研究员、5 条以上策略并行、几百个特征互相依赖的时候,再评估是否要切到 Feast / Tecton。

9.3 bitemporal 的工程代价

bitemporal 不是免费的。每一行多两个时间字段,存储膨胀大约 10%-20%(这对 Parquet 列存影响有限)。但真正的代价在心智:

在我看来,这是必要的代价。任何想”先简单一点,单时间维度跑起来再说”的尝试,最终都会以”上线后发现数据漂移、回查不到当时状态”告终。bitemporal 是一次性投入的复杂度,不是反复出现的隐性 bug。这个判断和 Snodgrass 在他书里 1999 年给出的几乎完全一样——只不过 1999 年的关注点是审计和合规,2024 年的关注点是回测可复现。

9.4 一个被低估的细节:交易日历

绝大多数 PIT 错误最后查出来的根因是”日历对错了”。日历错有几种:

把交易日历本身做成一个 bitemporal 的特征:valid_time 是某个日期,value 是”这一天是不是交易日 / 是否半日市”,transaction_time 是日历版本发布时间。这样跨年回测时可以用”当时的日历版本”,避免今天补丁了一个历史日历又把过去的回测打乱的尴尬。

9.5 关于”实时特征”的边界

很多团队上来就想做”100ms 级实时特征”,但仔细问一下需求,发现真正用得上的实时特征不超过五个(盘口压力、最近 N 笔大单、累计成交方向)。剩下的所谓实时特征,本质是”低延迟的批处理”。

工程判断:先把 1 秒级 / 1 分钟级的特征做对、做齐、做版本化,再上 100ms 以下的真实时特征。 真实时特征对架构的要求显著提升(共享内存、无锁队列、状态持久化),用错时机会拖慢整体迭代速度。


十、把上面这些拼到一起:一个最小可运行的特征仓库

下面的代码不是产品级实现,但足够把前面所有约束钉死。它演示:

# file: mini_fs.py
from __future__ import annotations
import hashlib, inspect, json, os
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import polars as pl

ROOT = Path("./fs")

@dataclass(frozen=True)
class FeatureMeta:
    name: str
    version: str
    upstream: tuple[str, ...]
    code_hash: str
    created_at: str

def _hash_obj(obj) -> str:
    return hashlib.sha256(
        json.dumps(obj, sort_keys=True, default=str).encode()
    ).hexdigest()[:12]

def define_feature(name: str, upstream: list[str]):
    def deco(fn):
        code = inspect.getsource(fn)
        code_hash = _hash_obj(code)
        version = _hash_obj({"code": code_hash, "upstream": sorted(upstream)})
        meta = FeatureMeta(name, version, tuple(sorted(upstream)),
                           code_hash, datetime.utcnow().isoformat())
        fn.__feature_meta__ = meta
        return fn
    return deco

def write_partition(name: str, version: str, df: pl.DataFrame):
    """对每条记录附 transaction_time,并写入分区目录;从不覆盖。"""
    txn = datetime.utcnow()
    df = df.with_columns(pl.lit(txn).alias("transaction_time"))
    out_dir = ROOT / f"factor={name}" / f"version={version}" \
              / f"year_month={txn:%Y%m}"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"part-{txn:%Y%m%d%H%M%S}.parquet"
    df.write_parquet(out_path)
    _audit_log(name, version, txn, len(df), str(out_path))
    return out_path

def _audit_log(name, version, txn, n_rows, path):
    log_path = ROOT / "_audit.jsonl"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with open(log_path, "a") as f:
        f.write(json.dumps({
            "ts": txn.isoformat(),
            "feature": name, "version": version,
            "rows": n_rows, "path": path,
        }) + "\n")

def read_feature(name: str, version: str | None = None) -> pl.LazyFrame:
    base = ROOT / f"factor={name}"
    if version is None:
        # 取目录里最新一个版本
        version = sorted(p.name.split("=")[1] for p in base.iterdir())[-1]
    pattern = str(base / f"version={version}" / "**" / "*.parquet")
    return pl.scan_parquet(pattern)

def asof_lookup(prices: pl.LazyFrame, feature_name: str,
                left_time: str = "ts", left_by: str = "symbol",
                version: str | None = None) -> pl.LazyFrame:
    feat = read_feature(feature_name, version)
    return (
        prices.sort([left_by, left_time])
        .join_asof(
            feat.sort(["entity_id", "transaction_time"]),
            left_on=left_time, right_on="transaction_time",
            by_left=left_by, by_right="entity_id",
            strategy="backward",
        )
    )

# --- 一个真实例子:20 日 close-to-close 收益的滚动均值 ---
@define_feature(name="ret_mean_20", upstream=["prices"])
def compute_ret_mean_20(prices: pl.DataFrame) -> pl.DataFrame:
    return (
        prices.sort(["symbol", "ts"])
        .with_columns(
            pl.col("close").pct_change().over("symbol").alias("ret"),
        )
        .with_columns(
            pl.col("ret").rolling_mean(window_size=20).over("symbol").alias("value"),
        )
        .select([
            pl.col("symbol").alias("entity_id"),
            pl.col("ts").alias("valid_time"),
            "value",
        ])
        .drop_nulls()
    )

if __name__ == "__main__":
    # 假设外部加载了 prices DataFrame
    prices = pl.read_parquet("data/prices.parquet")
    out = compute_ret_mean_20(prices)
    meta = compute_ret_mean_20.__feature_meta__
    write_partition(meta.name, meta.version, out)

这份代码省略了错误处理、并发写入、对象存储后端、依赖版本声明等工业细节,但它已经把 PIT 的关键约束都嵌进去了:

把它扩展到生产,主要是把 ROOT 从本地路径换成 S3 / HDFS、加上 ClickHouse 索引层、补充并发与重试。但底层的语义模型不需要改。


十、附录 A:bitemporal 之外的几个边界问题

把 bitemporal、版本化、ASOF JOIN 三件事钉死之后,系统里还有几类问题不会被它们自动消除。这些问题在前面的章节里都有零散提及,这里集中拆开讲。

A.1 复权因子的二次注入

复权因子(adjustment factor)本身就是一个被反复修订的衍生量。常见的踩坑顺序是:

  1. 行情存了”未复权”的成交价。
  2. 复权因子表存了 (symbol, valid_time, factor),每次新增分红/送股事件时追加一行。
  3. 研究员算因子时用 close * factor 得到”前复权”价格。

问题在第 3 步。研究员要的”前复权”,是站在某个截止日往回看,把这一天的价格设为基准;但复权因子表里的 factor 通常是站在”今天”的视角算的,把今天设为基准。两个基准点不一样,结果就错。

正确的做法:对每条原始行情,在 ASOF JOIN 复权因子时,把复权因子 = 1 / (今天的因子 / 历史那一天的因子) 计算成”截止日为查询时刻 T”的相对因子。这要求复权因子也走 bitemporal 路径——不同 transaction_time 上”今天的因子”不一样。

在工程上更稳妥的做法是把”前复权价”做成一个特征,而不是查询时计算。这样 PIT 拼接逻辑只调一次 ASOF JOIN,就拿到当时视角下的前复权价。

A.2 多频段聚合的对齐口径

研究员经常需要”用日线特征预测分钟级方向”,或者”用分钟级特征预测日线方向”。两边的 transaction_time 节奏不一样,对齐时容易踩坑。

低频特征拼到高频上的规则相对干净:用 ASOF JOIN,每条高频样本拿到不晚于自己 transaction_time 的最新低频值。

高频特征聚合到低频的规则更绕。常见错法:把当天所有分钟特征的均值赋给”当日”,但当日 EOD 收盘前的最后一分钟也被纳入了均值。如果模型的预测目标是”次日开盘方向”,这个均值只能用收盘后的 transaction_time,不能用收盘前。否则模型会”在 14:55 用了 14:58 还没发生的均值”。

工程实现:聚合特征的 transaction_time 取 max(window 内所有原始事件的 transaction_time) + εε 是计算延迟,通常给 1 秒就够。然后把它写进特征仓库,研究员通过常规 ASOF JOIN 拿。

A.3 缺失值的语义

bitemporal 里的”缺失”有两种:

ASOF JOIN 的默认行为对两者都返回 NULL,但研究员处理时通常要区分:真缺失要 ffill 或者跳过;假缺失要等下一次 transaction_time 到来。

把这两种情况显式编码成两列:value + availability,availability 取值 available / pending / not_listed / suspended,比单纯一个 NULL 携带的信息多得多。模型层在处理时也能用上:对 pending 状态的特征用最近一次的可用值,对 not_listed 跳过样本。

A.4 时间戳精度

行情时间戳的精度从秒到纳秒都有人在用。要在特征仓库里统一一种:

A 股 minute bar 默认使用上海时区(UTC+8),美股使用 America/New_York(含夏令时)。两个市场的特征落到同一张表时,存 UTC 是唯一不出错的做法。研究员看的时候再转回本地时区。

A.5 字符串实体 ID 的稳定性

entity_id 看起来是字符串就完事了,但实际上有几个坑:

工业做法:内部用 permanent_id(PERMNO、Wind 的 s_info_windcode、CRSP 的 PERMNO)作为 entity_id,外部展示时再映射回 ticker。这样跨年研究就不会因为 ticker 复用而错配。CRSP 的 PERMNO 是这件事的事实标准之一。


十一、附录 B:一份可运行的端到端示例

把第十节的代码放到一起,再补上 prices 数据、回测查询,构成一个端到端能跑的演示。环境:Python 3.11、polars 1.7、numpy 1.26。

# file: demo_pipeline.py
import numpy as np, polars as pl
from datetime import datetime, timedelta
from pathlib import Path
import shutil

from mini_fs import (define_feature, write_partition,
                     asof_lookup, read_feature, ROOT)

# 清理上一次运行
if ROOT.exists(): shutil.rmtree(ROOT)

# 1. 造一份合成行情:5 只标的,60 天日线
np.random.seed(42)
symbols = [f"S{i:03d}" for i in range(5)]
dates = [datetime(2024, 1, 1) + timedelta(days=i) for i in range(60)]
rows = []
for s in symbols:
    p = 100.0
    for d in dates:
        p *= (1 + np.random.normal(0, 0.01))
        rows.append({"symbol": s, "ts": d, "close": p})
prices = pl.DataFrame(rows)

# 2. 计算特征:20 日均收益
@define_feature(name="ret_mean_20", upstream=["prices"])
def compute_ret_mean_20(prices: pl.DataFrame) -> pl.DataFrame:
    return (
        prices.sort(["symbol", "ts"])
        .with_columns(pl.col("close").pct_change().over("symbol").alias("ret"))
        .with_columns(pl.col("ret").rolling_mean(window_size=20).over("symbol").alias("value"))
        .select([
            pl.col("symbol").alias("entity_id"),
            pl.col("ts").alias("valid_time"),
            "value",
        ])
        .drop_nulls()
    )

out = compute_ret_mean_20(prices)
meta = compute_ret_mean_20.__feature_meta__
print("写入版本:", meta.version)
write_partition(meta.name, meta.version, out)

# 3. ASOF JOIN:把特征拼回到行情
joined = asof_lookup(prices.lazy(), "ret_mean_20").collect()
print(joined.tail(5))

# 4. 演示版本切换:换一个口径
@define_feature(name="ret_mean_20", upstream=["prices"])
def compute_ret_mean_20_v2(prices: pl.DataFrame) -> pl.DataFrame:
    """改成中位数而不是均值"""
    return (
        prices.sort(["symbol", "ts"])
        .with_columns(pl.col("close").pct_change().over("symbol").alias("ret"))
        .with_columns(pl.col("ret").rolling_median(window_size=20).over("symbol").alias("value"))
        .select([
            pl.col("symbol").alias("entity_id"),
            pl.col("ts").alias("valid_time"),
            "value",
        ])
        .drop_nulls()
    )

out2 = compute_ret_mean_20_v2(prices)
meta2 = compute_ret_mean_20_v2.__feature_meta__
print("新版本:", meta2.version, "(与旧版本不同)")
write_partition(meta2.name, meta2.version, out2)

# 5. 旧版本和新版本同时存在,互不覆盖
old = asof_lookup(prices.lazy(), "ret_mean_20", version=meta.version).collect()
new = asof_lookup(prices.lazy(), "ret_mean_20", version=meta2.version).collect()
print("旧 / 新版本均值:", old["value"].mean(), new["value"].mean())

跑出来的结果会显示:

这份 demo 没有覆盖 bitemporal 的”修订”路径——它对每个 valid_time 只写一次。要演示修订,需要在第二次调用 write_partition 之前把 valid_time 不变、value 改一下,再写一次。读出来时按 (valid_time, transaction_time) 双排序,就会看到两个版本并存,PIT 查询能拿到符合时间约束的那一行。


十二、回到一开始的问题:什么时候算”做对了”

回头看本文一开始的那句压缩定义:

量化特征仓库 = bitemporal 数据模型 + 内容寻址的版本化 + as-of join 查询接口 + 训练-服务共享的转换函数。

四件事每一件都不复杂,但只要少做任何一件,回测和实盘的差距就会以隐蔽的方式出现。判断特征仓库是不是”做对了”,最直接的标准不是有没有引入 Feast,也不是有没有部署 ClickHouse,而是回答下面这五个问题:

  1. 任意一个研究员,在任意一个历史时间点 \(T\),能不能复现”在 \(T\) 时这个策略看到的全部特征”?
  2. 上游一份数据被修订时,能不能列出所有受影响的下游特征和回测?
  3. 同一份特征在离线和在线两条路径上,跑出来是不是逐字节一致?
  4. 任何一次回填或重算,能不能不破坏既有研究结果?
  5. 接到风控质疑”这个因子值是不是错的”,能不能在 5 分钟内说清楚这个值的来源、版本、上游、计算时间?

如果五个问题都能回答”能”,特征仓库这件事就算做到位了。落到实盘里能赚多少,那是策略本身的问题,不再是工程的问题。这一篇能够替你解决的,到此为止。

下一篇 《另类数据:卫星、新闻、社交、链上》 会把视角从”已有数据怎么管”切到”新数据怎么进”——卫星图、新闻 NLP、社交情绪、链上数据,每一类都对本文讨论的 bitemporal 模型提出了新问题。


十三、参考资料

论文与书籍

规范与文档

框架与工具

数据源参考


上下篇导航

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-05-01 · quant

【量化交易】量化交易全景:从信号到订单的工程链路

量化交易不是策略写得好就能赚钱,更难的是把数据、特征、因子、信号、组合、执行、风控、复盘这八段链路在工程上连成一条不漏数据、不串时间、不丢订单的流水线。本文是【量化交易】系列的总目录与读图,给出八段链路的输入输出、失败模式、不变量清单,并用研究流程图把从一个想法到一笔实盘订单之间所有该过的卡点串起来。

2026-05-01 · quant

【量化交易】市场结构:交易所、做市商、暗池、ECN

系统梳理全球市场结构(Market Structure)的工程图景:从证券交易所、衍生品交易所、加密交易所,到做市商、暗池、ECN/ATS,再到 Maker-Taker 收费、PFOF、Reg NMS 与 MiFID II 的监管影响;给出量化策略选择交易场所的判断框架与基于 ccxt 的多交易所行情聚合代码。

2026-05-01 · quant

【量化交易】市场微结构:订单簿、价差、流动性、冲击

系统讲解市场微结构的核心概念与可计算工具:限价订单簿的数据模型、报价/有效/已实现价差、Roll 模型、四维流动性度量、Kyle's lambda、订单流不平衡(OFI)、Almgren-Chriss 框架下的临时与永久冲击、PIN 与 VPIN、Hawkes 过程,并给出基于 polars 的 L2 增量处理与系数估计代码。

2026-05-01 · quant

【量化交易】订单类型与执行语义:限价、市价、IOC、FOK、冰山

把 Limit、Market、IOC、FOK、Iceberg、Stop、MOO/MOC 这些常被混为一谈的订单类型还原为价格、数量、时效、可见性、触发五个独立维度,并对照 A 股、港股、美股、CME、Binance 五个市场的实际语义差异,给出量化系统中的订单工厂、状态机与风控前置校验的工程实现。


By .