把回测里的特征拼接做对,是所有”看起来很赚钱”的策略落到实盘之前必须翻过的一道墙。前一篇 《数据陷阱:幸存者偏差、复权、前视、未来函数》 讨论了数据本身的偏差。这一篇要回答更工程化的一个问题:当你已经有了一份”看起来干净”的原始数据,怎么把它组织成能够被研究、回测、实盘三个场景都安全使用的”特征”?
直觉上这是一份”列存数据库选型 + ETL”的工作。但只要把”特征”这个词替换成”模型在时间 \(t\) 上能够依赖的所有信息”,问题就立刻退化成一个时间正确性(point-in-time correctness,下文简称 PIT)约束:模型在 \(t\) 上看到的每一行,必须只包含在物理时间 \(t\) 之前真实可获取的信息。一切血缘、版本、查询接口、存储后端的设计,都是在这个约束上叠加的工程实现。
通用 ML Feature Store(如 Feast、Tecton、Hopsworks)当然也讲 point-in-time correctness,但它们的关注点是”训练样本与服务样本不一致”。量化场景把这个约束加重到了另一个层级:错一秒就是泄漏,错一天就是夏普凭空多出 0.5;并且这种错误在回测里不会触发任何报错,只会让净值曲线变得更光滑。这就是为什么本文不打算把通用 Feature Store 的概念照搬过来,而是从量化研究的实际工作流出发,给出一套可以直接落地的最小工程实现。
本文环境:Python 3.11、polars 1.7、pandas 2.2、duckdb 1.1、pyarrow 17.0。代码段都在该环境下能直接运行。涉及的 Wind / CRSP / Compustat 字段语义来自厂商公开手册。
风险提示:本文不构成任何投资建议。文中的代码用于演示工程结构,未做交易级容错与权限管理;直接用于实盘需要补充行情对接、风控、清算、灾备等部分。回测越”对”越接近实盘,但不等于实盘一定盈利。
一、特征存储要解决什么问题
1.1 为什么不能直接用一张大宽表
最朴素的做法是把研究员需要的所有特征拼成一张以
(date, symbol) 为主键的大宽表,存成 Parquet
或者放到
ClickHouse。这个做法在策略只有一两个、研究员只有两三个、特征只有几十列的时候一点问题都没有。
问题在它增长以后:
- 同一个特征会被不同研究员用不同口径重复实现。
turnover_20d在研究员 A 那里是 20 个交易日的累积换手,在研究员 B 那里变成了 20 个自然日;两份回测都说自己用了”20 日换手”,结果跑出来的 IC 不一样。 - 上游一次修复会引发下游全部回测失效,但没人知道有哪些回测真的依赖了那一列。等到风控周会上发现”上周这只股票的因子值不对”,已经无法回到一个月前的那张表去复现当时的研究结论。
- 离线回测的特征定义和实盘推理的特征定义随着时间慢慢漂移。最常见的形式是:研究阶段用的是
pandas.rolling(20).mean(),实盘上线时被工程师重写成了 C++ 增量算法,两者对停牌日的处理方式不同,结果训练集和服务集之间出现了系统性偏差。
特征存储要解决的,就是这三类问题:复用、版本化、训练-服务一致性。它不是一个新数据库,而是一层契约:所有特征的定义、版本、依赖、可用时间都被显式管理,研究员、回测引擎、实盘系统通过这层契约共享同一份”事实”。
1.2 与一般 ML Feature Store 的区别
把 Feast 的官方文档(feast 0.40 版本)和
Tecton 的设计文档摆在一起,能看到通用 Feature Store
的核心抽象是
Entity + FeatureView + point_in_time_join。这套抽象对推荐、风控的批
+ 在线场景已经够用,但是放到量化里有几个地方会卡:
| 维度 | 通用 ML Feature Store | 量化特征仓库 |
|---|---|---|
| 时间正确性约束 | 服务时刻一致即可 | 任意历史时刻都必须可还原 |
| 时间维度 | 单一 event_timestamp |
bitemporal:valid_time + transaction_time |
| 数据修订(restatement) | 通常忽略 | 必须显式建模(财报修订、复权调整、tick 重传) |
| 拼接语义 | as-of join,可容许小误差 | as-of join,左闭右开严格不等式,按
transaction_time |
| 在线特征 | 高 QPS 单点 lookup | 低 QPS,但要严格匹配离线版本 |
| 特征粒度 | 用户、商品 | 标的 × 时间 × 多频段;面板数据为主 |
| 漂移监控 | 训练 vs 服务的统计分布 | 加上”今天的快照 vs 历史快照”的逐字段一致性 |
通用 Feature Store 默认的 event_timestamp
是单时间维度,这在推荐场景里是合理的——一个 user click
事件只发生过一次,不会被”修订”。量化里几乎所有”基本面”类特征都会被修订:年报会更正季报,复权因子会因为新的分红事件而回溯刷新历史,甚至
tick 数据本身在交易所端就有”撤单订正”。这意味着仅靠
event_timestamp
一个时间维不够,必须显式区分”数据指代的时刻”(valid
time)和”数据真正可被读到的时刻”(transaction
time),才能在任意一个历史时间点 \(t\) 还原出”在 \(t\)
我能看到什么”。这就是下一节要讲的 bitemporal 模型。
1.3 一句话定义
把这套思路压缩成一句话:
量化特征仓库 = bitemporal 数据模型 + 内容寻址的版本化 + as-of join 查询接口 + 训练-服务共享的转换函数。
后面每一节都是在拆这句话的某一部分。
二、点时正确性:valid_time 与 transaction_time
2.1 单时间维度模型为什么不够
考虑 A 股某公司 2024 年一季报:
- 报告期末 2024-03-31。
- 一季报披露 2024-04-25。
- 该公司在 2024-08-15 发布中报时,对一季度的”应收账款”做了追溯调整。
一个研究员在 2024-09-01 站在屏幕前,问:“2024-05-01 那天,这家公司的应收账款是多少?”
如果数据库里只有一个时间维度
report_date = 2024-03-31,并存了一行”调整后”的应收账款值,那么这位研究员看到的就是
2024-08-15 之后才存在的那个值。可在
2024-05-01,市场上能拿到的是 4 月 25
日发布的”调整前”的值。模型如果在回测里用了”调整后”的值去预测
5 月 1 日的股价,那是泄漏未来。
所以”事件时刻”和”数据落库时刻”必须分开存。这就是 bitemporal data model 的最小动机。
2.2 两个时间维度的精确定义
引用 Snodgrass 在 Developing Time-Oriented Database Applications in SQL(1999)里给出的术语:
- valid
time(有效时间):现实世界里这条事实成立的时段。比如”2024
Q1 应收账款 = 1.23 亿”在现实里指代的就是
[2024-03-31, …)这个区间。 - transaction time(事务时间):这条事实被记录到数据库里的时间戳。也就是”数据库知道这件事”的时刻。
两个时间维度组合起来,每条特征记录至少四个字段:
entity_id, feature_name, value, valid_from, valid_to, txn_from, txn_to
为了简化,通常退化成”开端点 + null 表示开区间”:
entity_id, feature_name, value, valid_time, transaction_time
后续行的 (valid_time, transaction_time)
隐含了前一行的失效。这就是 SQL:2011 里 system-versioned
table 的简化版本。
2.3 PIT 查询的标准形式
“在物理时间 \(T\) 站在屏幕前,问 entity \(e\) 的特征 \(f\) 在 valid_time \(V\) 上的值是多少”,标准 SQL 写法:
SELECT value
FROM features
WHERE entity_id = :e
AND feature_name = :f
AND valid_time <= :V
AND transaction_time <= :T
ORDER BY valid_time DESC, transaction_time DESC
LIMIT 1;注意两个不等式都是 <=,并且排序是先
valid_time 后
transaction_time。这两条规则结合起来,刚好对应”我在
\(T\) 能看到的、关于不晚于
\(V\) 的最新事实”。
这条查询是整个特征仓库的命脉。错一个等号就是泄漏:把
<= 写成 <
会少一行边界数据;把 transaction_time <=
漏掉就退化成单时间维度,等于偷看未来。
2.4 一个最小可运行的例子
import polars as pl
from datetime import date
# 同一个事实被报告两次:4-25 公告原始值,8-15 修订后值
df = pl.DataFrame({
"entity_id": ["600000", "600000", "600000"],
"feature": ["receivables_q1", "receivables_q1", "receivables_q2"],
"valid_time": [date(2024, 3, 31), date(2024, 3, 31), date(2024, 6, 30)],
"transaction_time": [date(2024, 4, 25), date(2024, 8, 15), date(2024, 8, 15)],
"value": [1.23, 1.18, 1.41],
})
def pit_lookup(df, entity, feature, valid_at, txn_at):
return (
df.filter(
(pl.col("entity_id") == entity)
& (pl.col("feature") == feature)
& (pl.col("valid_time") <= valid_at)
& (pl.col("transaction_time") <= txn_at)
)
.sort(["valid_time", "transaction_time"])
.tail(1)
)
# 站在 2024-05-01 看 Q1:拿到 1.23
print(pit_lookup(df, "600000", "receivables_q1", date(2024, 5, 1), date(2024, 5, 1)))
# 站在 2024-09-01 看 Q1:拿到修订后 1.18
print(pit_lookup(df, "600000", "receivables_q1", date(2024, 9, 1), date(2024, 9, 1)))两次查询返回不同的值,但是数据本身没有被覆盖——任何历史时刻的”当时视角”都可以重建。这是 bitemporal 模型相对单时间维度模型的根本优势。
上图里有三条特征事件:v1、v2、v3。每条事件有两个时间戳:橙色的 valid_time(特征指代的现实时刻)和紫色的 transaction_time(特征落库时刻)。两个时间戳之间的延迟来自两类原因:上游数据源的发布延迟(基本面、宏观)以及计算管道本身的处理时延(行情聚合、特征算子)。如果回测把”行情时刻 10:00 上看到的最新特征”用 valid_time 去匹配,会把 v1 当作可用,但实际上 v1 直到 10:08 才落库;用 transaction_time 去匹配,10:00 上还看不到 v1,必须等到 10:30 这一条行情才能拼上 v1。这就是图里红色与绿色两个区域的差别。
2.5 不是所有数据都是 bitemporal
不要为了”理论上完整”就把每张表都改成两个时间维度。下面这些数据其实是单时间维度:
- 历史成交记录
(trade_id, ts, price, size):trade 一旦成交不会被修订(除非交易所撤单,那是异常路径)。 - 市场静态信息
(symbol, listing_date):valid_time 等于 transaction_time。 - 模型自身的预测输出:模型预测一旦落地,认为 transaction_time = valid_time。
要 bitemporal 的,是那些”上游会修订、不修订的版本本身有研究价值”的数据:
- 公司财报、分析师一致预期、宏观指标。
- 任何被复权 / 复盘 / 重新估算的衍生指标。
- 任何来自第三方供应商、且供应商会发布”corrections”的数据。
- 自己计算出来的、依赖会被修订的上游字段的派生特征。
判断标准很简单:问一句”如果今天的我重新跑一遍这条数据,得到的结果和当年那条一样吗?“ 如果不一定,就上 bitemporal。
三、特征的时间维度
bitemporal 解决了”事实本身可能被修订”的问题,但没解决”特征本身就是时间窗口聚合”的问题。一个 20 日均线在 2024-04-01 上的值,意味着用了 [2024-03-04, 2024-04-01] 这 20 个交易日的数据。这里又有三个时间概念,混淆任意两个都会引发偏差。
3.1 三个时间概念
- 事件时间(event time):原始数据点本身发生的时间戳。tick 数据里就是逐笔成交时间;分钟 K 线里是 K 线右端的时间戳。
- 特征时间(feature time):特征值”代表”的时刻,通常等于事件时间,也可以是事件时间所在窗口的右端。
- 生效时间(effective time):模型可以使用这个特征值做决策的最早时刻。它等于特征时间加上”从原始事件到特征落库的总延迟”。
举例:用 09:30 到 10:00 的成交计算的 30 分钟波动率:
- event time 是 [09:30, 10:00) 区间内的每一笔。
- feature time 通常取 10:00(区间右端)。
- effective time = 10:00 + 计算延迟。如果计算延迟 50ms,那么 10:00:00.050 之后的行情才能用这个特征。
很多回测框架默认
feature_time == effective_time,这在低频日级回测里不出问题(一天的延迟几十毫秒可以忽略),但在分钟级、tick
级的策略里会引入隐性的前视偏差。
3.2 滚动、累计、衰减的边界写法
定义清楚边界:
| 形式 | 数学定义 | feature_time | 实现要点 |
|---|---|---|---|
| 滚动均值 | \(\mu_t = \frac{1}{w}\sum_{i=t-w+1}^{t} x_i\) | \(t\) | 区间右闭,左端 \(t-w+1\) 用交易日历计算,不用自然日 |
| 累计求和 | \(S_t = \sum_{i \le t} x_i\) | \(t\) | 跨年要明确是否清零 |
| 指数衰减 | \(\mu_t = \alpha x_t + (1-\alpha)\mu_{t-1}\) | \(t\) | 半衰期对应的 \(\alpha\) 要写在元数据里 |
| 前 \(k\) 期值 | \(x_{t-k}\) | \(t\) | \(k\) 是交易日数还是自然日数要约定 |
| 跨期变化 | \(\Delta_k x_t = x_t - x_{t-k}\) | \(t\) | 跟前 \(k\) 期值同样要约定日历 |
一个非常常见的错误是在面板数据上用 pandas 的
rolling("20D"):它走的是自然日,会把停牌日当作有数据来对待。换到交易日历必须先把面板按交易日对齐成稠密矩阵,再用
rolling(20)。
3.3 与 PIT 的关系
把 bitemporal 和窗口聚合放到一起:
- 窗口聚合的
feature_time用的是窗口内最晚一个事件的valid_time。 - 窗口聚合的
transaction_time用的是窗口内最晚一个事件的transaction_time加上计算延迟。
这条规则很关键。考虑用基本面”应收账款”算过去 4 个季度的均值:
- 4 个季度里,最晚一个的 valid_time 是 Q4 的报告期末。
- 最晚一个的 transaction_time 是 Q4 财报披露日(甚至更晚,如果 Q4 之后还有修订)。
所以这个均值特征的 transaction_time
不是”算这一行的当天”,而是”窗口内最晚一份数据真正落库的那一天”。如果你把
transaction_time 简单地等同于”特征生成 cron
跑的那一天”,回测里就会出现 Q4 财报还没披露但你已经在用 Q4
数据算均值——这是一个非常隐蔽的前视偏差。
四、存储后端选型
存储后端的选择不是孤立的技术品味,而是受三个因素约束:数据量、读写比、查询模式。下面这张表把量化场景常见的几种后端按这三个维度做了对比。
| 后端 | 容量极限(单机) | 写吞吐 | as-of join 友好度 | 列存 | 工程复杂度 |
|---|---|---|---|---|---|
| Parquet + Arrow + DuckDB | 10 TB+(依赖磁盘) | 中(批写) | 高(DuckDB 原生 ASOF JOIN) | 是 | 低 |
| ClickHouse | 100 TB+ | 高 | 中(需要 array join 技巧) | 是 | 中 |
| QuestDB | 数 TB | 极高(亿级 tick/秒) | 高(原生 ASOF JOIN) | 是 | 中 |
| TimescaleDB | 数 TB | 中 | 中(hypertable + LATERAL) | 否(行存为主) | 中 |
| Polars in-memory | 受内存限制 | 高 | 高(asof_join) | 是 | 极低 |
| Redis / KV | 受内存限制 | 极高 | 不适合(无 join) | 否 | 低 |
4.1 Parquet + Arrow + DuckDB:研究和回测的默认组合
研究阶段不需要 OLAP 集群。一台带 NVMe 的工作站 + 几张 Parquet 文件 + DuckDB 已经能覆盖几年的中国 A 股 minute bar、几年的美股 tick top-of-book,而且单机部署、无运维。
按照”特征 × 月份”做物理分区:
features/
factor=mom_20/
year_month=202401/part-0.parquet
year_month=202402/part-0.parquet
factor=turnover_20/
year_month=202401/part-0.parquet
DuckDB 直接
SELECT * FROM 'features/**/*.parquet',靠 Hive
partitioning 做分区裁剪。这种布局的好处:
- 每次新增/修订一个特征,只动它自己的目录,不影响别人。
- 老化策略可以按目录做:“去年的 mom_20 我不再修订了,归档到对象存储”。
- 单文件不大(每个分区几十 MB 到几百 MB),能吃满列存的压缩与谓词下推。
DuckDB 1.0 之后原生支持
ASOF JOIN,这一项让它从”OLAP
玩具”变成了量化研究的事实标准之一:
SELECT
p.ts AS price_ts, p.symbol, p.last_price, f.value AS pe_ttm
FROM prices p
ASOF LEFT JOIN features f
ON p.symbol = f.entity_id
AND p.ts >= f.transaction_time
WHERE f.feature = 'pe_ttm';这条 SQL 完整表达了”对每一行行情,找到 transaction_time 不晚于行情时间的最新一条 PE_TTM”。语义干净到不需要在 Python 层再写一遍。
4.2 ClickHouse:策略上线之后的中间存储
研究跑通进入实盘之后,特征量级从”几亿行”长到”几百亿行”,研究员需要的不是新查询能力而是”在可接受的时间内查任意一段历史”。这时候 DuckDB 单机吃不住,进 ClickHouse 是常见选择。
ClickHouse 的 ReplacingMergeTree 配合
(entity_id, valid_time, transaction_time)
主键,可以把 bitemporal 写得很自然:
CREATE TABLE features (
entity_id String,
feature String,
valid_time DateTime64(6),
transaction_time DateTime64(6),
value Float64,
version String
) ENGINE = ReplacingMergeTree(transaction_time)
ORDER BY (entity_id, feature, valid_time, transaction_time)
PARTITION BY (feature, toYYYYMM(valid_time));但 ClickHouse 没有原生 ASOF JOIN(截至 24.x 版本提供了
ASOF JOIN 关键字,但限制比 DuckDB
多:仅支持等值条件 +
一个不等值条件,且不能跨表多列)。在量化场景里,ASOF JOIN
通常需要 entity_id 等值 +
transaction_time 不等值,刚好踩在 ClickHouse
的支持边界上,能用,但写复杂时要小心。
4.3 QuestDB / TimescaleDB:tick 级时序
如果要存原始 tick(而不是衍生特征),QuestDB 在 tick
入库速度上是经过实测的领先者。它的 ASOF JOIN
语法和 DuckDB 几乎一致,可以无缝迁移研究代码。TimescaleDB 是
PostgreSQL 扩展,行存为主,对 tick 数据的压缩比不如 QuestDB
/ ClickHouse,但好处是研究员在公司只熟悉 PostgreSQL
的时候省学习成本。
工程上的判断:tick 入库走 QuestDB / ClickHouse,特征仓库走 Parquet + DuckDB(研究)+ ClickHouse(生产),是 2024 年中国量化机构里最常见的分层。
4.4 内存形式:Polars 与 Arrow
Polars 不是存储后端,但是把 Parquet 文件读进来后做 PIT
拼接最自然的形式。Polars 1.x 的 join_asof
接口与 DuckDB 的 ASOF JOIN
在语义上一致,但运行在单进程内,不用过 SQL parser。
import polars as pl
prices = pl.scan_parquet("prices/**/*.parquet")
features = pl.scan_parquet("features/factor=pe_ttm/**/*.parquet")
joined = (
prices.sort(["symbol", "ts"])
.join_asof(
features.sort(["entity_id", "transaction_time"]),
left_on="ts",
right_on="transaction_time",
by_left="symbol",
by_right="entity_id",
strategy="backward",
)
.collect(streaming=True)
)strategy="backward"
意味着对每个左表行,找右表里
transaction_time <= ts 的最大那一行。Polars
在内部把右表按 entity_id
分桶后做有序合并,复杂度近似 \(O(n+m)\),比 SQL 引擎的 hash
join 在这个语义下更高效。
五、版本化与血缘
5.1 为什么需要”特征版本”
研究员把 volatility_20d 的实现从”过去 20
个交易日 close-to-close 收益的标准差”改成了”过去 20 个交易日
high-low 范围的 Parkinson
估计”。同一个名字,不同口径。如果不做版本化,下面这些事情会发生:
- 旧策略的回测复现不出来。每一次代码 git checkout 回去,新数据已经覆盖了旧值。
- 新策略和旧策略并行回测,看到的是同一个表名,但实际是不同口径,对比毫无意义。
- 风控做 attribution 时无法回溯当年那个版本。
工程上的对策是把”特征定义”理解成内容寻址(content-addressable)的对象:每一份代码 + 参数 + 上游数据快照,hash 出一个唯一 ID,存到独立分区。
import hashlib, inspect, json
def feature_version(func, params, upstream_hashes):
src = inspect.getsource(func)
payload = json.dumps({
"src": src,
"params": params,
"upstream": sorted(upstream_hashes),
}, sort_keys=True).encode()
return hashlib.sha256(payload).hexdigest()[:12]落到目录结构上:
features/
factor=volatility_20d/
version=a1b2c3d4e5f6/ # close-to-close
year_month=202401/part-0.parquet
version=f6e5d4c3b2a1/ # Parkinson
year_month=202401/part-0.parquet
研究员通过 factor + version
显式声明依赖。任意时刻”最新版”由一张 mapping 表维护,但
mapping 不参与物理覆盖。
5.2 上游变更如何传播
特征 A 依赖特征 B 依赖原始数据 C。当 C 在 transaction_time T 上发生修订(例如分红除权造成复权因子刷新),A 和 B 必须在某个 transaction_time T’ >= T 上产生新的版本,并把新版本的物理记录写入新行——而不是覆盖旧行。
正确的做法:
- 数据治理层在 C 的修订事件上发布一条变更通知。
- 调度系统按依赖图找出所有受影响的下游特征。
- 对每个下游特征,在其 transaction_time 维度上追加新行;旧行保持不动。
- 物化视图(如果有)按依赖关系级联失效并重算。
这套机制在 DVC、LakeFS、Pachyderm 这些工具里都有不同程度的实现。如果不想引入额外依赖,最小可工作的做法是:
- 把每条物理写入操作记录到一张审计表(
feature_writes),字段包括feature, version, valid_range, transaction_time, upstream_versions, code_hash。 - 调度器读这张表来决定是否触发回填。
- 任意一次回测在保存结果时把”用到的所有 (feature, version)“作为元数据一起保存。
5.3 特征哈希:不只是版本号
哈希除了用作版本,还能解决一个细微但麻烦的问题:研究员在两台不同机器上各自重跑同一份特征代码,对同一段输入是不是真的产生完全相同的输出?
如果哈希是基于代码 + 上游 + 参数计算的,那么两份输出应该字节一致。把每个分区文件的 SHA256 作为分区元数据存一份,跨机器对比就能立刻发现”代码看起来一样但浮点结果不一样”这类问题——通常是 BLAS 实现差异、并行 reduce 顺序差异、或者依赖库小版本差异引起的。
这个一致性检查在策略上线前必须做一次。否则线上推理用 MKL,研究员本地用 OpenBLAS,两份特征值在第 6 位小数上有偏差,模型在边界样本上的预测就会翻号。
六、查询接口:as_of_join 的几种写法
把 PIT 查询封装成可以直接调用的接口,研究员就不会自己写错。下面给出三种实现:纯 Polars、纯 DuckDB、以及把两者混在一起的 hybrid 版本。
6.1 Polars
join_asof:研究阶段首选
import polars as pl
def as_of_join(left: pl.LazyFrame, right: pl.LazyFrame,
left_time: str, right_time: str,
by_left: str, by_right: str) -> pl.LazyFrame:
"""对 left 表的每一行,按 by 列分组,从 right 表里取 right_time <= left_time 的最大那一行。"""
return (
left.sort([by_left, left_time])
.join_asof(
right.sort([by_right, right_time]),
left_on=left_time,
right_on=right_time,
by_left=by_left,
by_right=by_right,
strategy="backward",
)
)把它放到 utility 模块里,研究员的代码就只剩”声明左右表 +
调一次”。strategy="backward"
不要给研究员选项——量化场景下”forward”几乎一定是 bug。
6.2 DuckDB
ASOF JOIN:上规模数据
当面板数据超过几亿行、polars 单进程吃不下,DuckDB 的 ASOF JOIN 是最方便的选择。它能直接吃 Parquet 路径,不需要先加载到内存:
import duckdb
con = duckdb.connect()
con.execute("""
CREATE OR REPLACE VIEW prices AS SELECT * FROM 'data/prices/**/*.parquet';
CREATE OR REPLACE VIEW pe_ttm AS
SELECT * FROM 'data/features/factor=pe_ttm/version=a1b2c3d4e5f6/**/*.parquet';
""")
result = con.execute("""
SELECT p.ts, p.symbol, p.close, f.value AS pe_ttm
FROM prices p
ASOF LEFT JOIN pe_ttm f
ON p.symbol = f.entity_id
AND p.ts >= f.transaction_time
WHERE p.ts BETWEEN '2024-01-02' AND '2024-06-30';
""").pl() # 直接拿到 polars DataFrameDuckDB 的 ASOF JOIN 在语义上等价于 Polars 的
strategy="backward",但执行计划由 DuckDB
优化器决定,能用上分区裁剪与谓词下推。在 2024 年的工作站(32
核、128GB、NVMe)上,对 5 年中国 A 股 minute bar × 50
个特征做 ASOF JOIN,端到端在两位数秒级。
6.3 一致性测试
无论哪种实现,都必须有一份”金标准”测试集来验证:
def test_asof_join_correctness():
# 构造两条特征:在 t=10 落库,valid=t-5
features = pl.DataFrame({
"entity_id": ["A", "A"],
"transaction_time": [10, 20],
"valid_time": [5, 15],
"value": [1.0, 2.0],
})
prices = pl.DataFrame({
"symbol": ["A", "A", "A", "A"],
"ts": [9, 10, 19, 20],
})
out = as_of_join(prices.lazy(), features.lazy(),
"ts", "transaction_time", "symbol", "entity_id").collect()
# ts=9 时还看不到任何特征
# ts=10 时看到 value=1.0
# ts=19 时看到 value=1.0
# ts=20 时看到 value=2.0
assert out["value"].to_list() == [None, 1.0, 1.0, 2.0]这个测试很短,但它把 PIT 拼接的边界条件钉死了。每次升级 polars / duckdb / pyarrow 都要重跑一遍。
6.4 一个常见的踩坑:等号边界
ts >= transaction_time 还是
ts > transaction_time?两者差一行。
工程判断(不是规范规定):在 transaction_time
上用 >=,在 valid_time 上用
<=,并把”特征 transaction_time 上加一个
ε(计算延迟)“作为统一约定。 这样能保证:
- 哪怕特征在 ts 这一刻整点写完了,模型也不会”瞬时”用到它。
- 真实生产里特征落库到推理 lookup 的 RTT 至少是几个毫秒。
把这个约定写进 utility 函数,永远不要让研究员手写不等号。
七、在线/离线一致:消除 training-serving skew
7.1 skew 的常见来源
把训练集的特征和服务时(在线推理时)的特征放在一起做差分,会发现差异通常来自下面几类:
| 来源 | 典型表现 | 检测方式 |
|---|---|---|
| 时间窗 | 训练用 20 个交易日,服务用 20 个自然日 | 算同一日期的特征对比 |
| 边界条件 | 停牌日、新股、退市日处理不同 | 在边界标的上专门校验 |
| 数值精度 | float32 vs float64、累加顺序 | 用 SHA256 比对二进制 |
| 缺失填充 | 训练用 ffill,服务用 0 | 抽查 NaN 多的列 |
| 时区与日历 | UTC vs 本地、节假日列表不同 | 跨年那一周对比 |
| 上游版本 | 训练用 v1,服务漏升级到 v2 | 写入特征时记录依赖版本 |
7.2 用同一份代码
最干净的解法是离线和在线共享同一份 transform 函数。具体做法:
- 把每个特征定义成一个纯函数
def f(state, event) -> (new_state, value)。 - 离线侧用 batch 引擎(polars / duckdb)对历史事件序列扫一遍这个函数。
- 在线侧用流式引擎(自己写的 actor 或 Flink)对实时事件序列扫一遍同一个函数。
这样无论是训练还是服务,特征值的产生路径完全一样,边界条件、缺失处理、累加顺序都强制对齐。
from dataclasses import dataclass
@dataclass
class RollingMeanState:
buf: list
window: int
def rolling_mean(state: RollingMeanState, event):
state.buf.append(event["value"])
if len(state.buf) > state.window:
state.buf.pop(0)
return state, sum(state.buf) / len(state.buf)离线就是
reduce(rolling_mean, events, initial_state),在线就是把它包成
actor handler。两者跑出来的中间状态、最终输出,逐 tick
对得上。
7.3 Shadow trading
即便代码统一,部署环境差异也会带来 skew。最稳的做法是上线后做一段时间的 shadow trading:
- 在线系统对每一笔实时行情产生信号、不下单(或者下到模拟通道)。
- 同步把”在线计算的特征值”回写到离线 store 的一个独立分区。
- 每天 EOD 对比”在线特征”和”离线 batch 重跑的特征”,逐字段对差。
- 差值超过阈值(例如相对误差 1e-6)触发报警。
shadow trading 时间长度按策略性质决定:对低频日线策略 5 个交易日就够了,tick 级 HFT 通常要跑 2-4 周才能把节假日、涨跌停、临停等异常路径都覆盖到。
上图的关键不是”两条管道”而是”一份逻辑表达”。transform.py
这个盒子在离线和在线两条路径上各出现一次,但它们引用的是同一份代码、同一个
version hash。离线 store 是 bitemporal 的、可回放的;在线
store 只保留每个实体最新一条,是离线 store 的”投影”。shadow
trading 把在线的输出回灌到离线 store 的另一个分区,使得 EOD
对差成为一次普通的列比较查询,不需要专门的对账系统。
八、生产工程:监控、回填、与回测引擎解耦
8.1 特征仓库不是回测引擎的内嵌组件
很多团队第一版的实现里,回测引擎和特征仓库写在一起。这在团队还小的时候没问题,等到第二条策略上线、第二个研究员加入、要做不同频率的回测时,问题立刻出现:每个研究员都在改回测引擎的取数代码,特征定义到处复制。
解耦的边界很简单:
- 特征仓库只负责”在 transaction_time T
上能读到什么”。它对外暴露的接口就是
(entity_id, feature, valid_at, txn_at) -> value,再加上批量化的 ASOF JOIN。 - 回测引擎只负责”按时间推进,查 feature,过策略,下单,成交”。它对特征的查询统一过特征仓库的接口,不直接读 Parquet。
- 实盘系统同理。它和回测共享同一个查询接口,只是
txn_at等于now()。
这个边界一旦立起来,回测和实盘的差异只剩”时钟来源”。这是最小可行的解耦方式。
8.2 监控的几条主线
特征仓库的可观测性,至少覆盖这几条:
- 新鲜度(freshness):每个特征当前最新的 transaction_time 距离现在多久。低于约定阈值(例如基本面类 24h、行情类 1 分钟)报警。
- 行数/分布漂移:每天每分区的行数、关键统计量(mean/std/p95/null_ratio)写入监控 DB,与历史 7 天均值对比,超 3 倍标准差报警。
- 版本漂移:依赖版本 mapping 与实际查询使用的版本是否一致。
- 离线-在线一致:shadow trading 的逐字段差值。
- 审计完整性:
feature_writes表的写入是否单调,是否有 transaction_time 倒退。
8.3 回填(backfill)的正确做法
回填是这件事的关键风险点。研究员发现一个 bug,要”重跑过去三年的某个特征”。错误做法:直接覆盖物理记录。正确做法:
- 给重跑出的数据分配一个新的 version。
- 写入新版本的目录,旧版本保留。
- 修改 mapping 表把”最新版”指到新 version。
- 已经依赖旧 version 的策略不受影响(除非主动升级)。
这条做法最大的代价是磁盘——每次回填都要存一份完整副本。但相对于”覆盖后无法复现旧回测”的代价,磁盘是最便宜的那部分。可以用对象存储 + 生命周期策略把超过 N 个月的旧版本归档到冷存储。
8.4 重算(recompute)vs 回填
两者经常被混用,有必要区分:
- 回填:因为某条特征本来就缺,或者错误地缺了一段时间,把这段时间补上。
- 重算:因为代码或上游变了,对已有数据全部重算一遍。
回填只增加 transaction_time 上的新行,valid_time 维度上是”新覆盖了之前没数据的那段”。重算等于发布新版本,valid_time 维度全段覆盖,但落到的是新 version 分区。
绝大多数研究员把这两件事混为”重跑数据”,工程上必须明确区分。一条规则:任何对历史 valid_time 上”已有值”的修改,都必须走新 version;只有对历史 valid_time 上”原本就缺”的补齐,可以走原 version 的回填。
8.5 与回测引擎的接口稳定性
特征仓库对回测引擎的接口要尽量窄:
class FeatureStore:
def get_at(self, entity_id, feature, valid_at, txn_at, version=None) -> float | None: ...
def asof_join(self, left_df, feature, by_left, time_left, version=None) -> DataFrame: ...
def list_versions(self, feature) -> list[VersionMeta]: ...接口越窄,研究员越不容易绕过约束。任何看起来像”我先把整张表读出来再 self join”的需求,都应该在 review 时拒掉,让研究员说出他真正想要的”在某个时间点上的某个值”。
8.6 一份上线前的检查表
把前面讨论过的工程细节压缩成一份 Checklist:
九、工程判断与个人观点
9.1 何时不该上 Feature Store
不是每个团队都需要一个独立的特征仓库。下面的情况,自己手写两层 Parquet + 一份 utility 函数就够了:
- 全公司只有一两个研究员、一两条策略。
- 数据量不超过几十 GB。
- 没有”基本面会被修订”这类 bitemporal 需求,纯行情衍生因子。
引入 Feast 这类框架,前期工程成本在两到四周;如果没有上面三条触发因素,这段时间花在写策略上回报更高。我在实际工程里见到过几次”先上 Feast 再说”的决策,最后变成研究员跟框架的抽象作斗争,反而把研究节奏拖慢。
9.2 通用 Feature Store 框架在量化里的定位
Feast、Tecton、Hopsworks 这几个框架,对推荐风控类场景做得比较干净,但是放到量化里有几个不太顺手的地方:
- 默认的
event_timestamp单时间维度模型,把 transaction_time 当作隐式的 “ingestion time”,没有独立暴露给查询。bitemporal 必须自己在 schema 上扩展。 - 物化视图与缓存语义偏向”高 QPS 在线 lookup”,对量化的”批 ASOF JOIN”优化不到位。
- 离线 store 对 polars / duckdb 的支持不如对 Spark / BigQuery 完善——量化研究员的工具链通常正好在前者。
我认为更务实的路径是:自建一层薄抽象,把 PIT 语义、版本化、ASOF JOIN 三件事钉死,存储后端用 Parquet + DuckDB / ClickHouse,不要为了”用上 Feature Store”而引入 Feast。 当团队规模到了 10 个研究员、5 条以上策略并行、几百个特征互相依赖的时候,再评估是否要切到 Feast / Tecton。
9.3 bitemporal 的工程代价
bitemporal 不是免费的。每一行多两个时间字段,存储膨胀大约 10%-20%(这对 Parquet 列存影响有限)。但真正的代价在心智:
- 研究员要时刻意识到”两个时间不一样”。
- 任何聚合都要决定”按哪个时间聚合”。
- 写新特征时要主动声明 transaction_time 怎么来。
在我看来,这是必要的代价。任何想”先简单一点,单时间维度跑起来再说”的尝试,最终都会以”上线后发现数据漂移、回查不到当时状态”告终。bitemporal 是一次性投入的复杂度,不是反复出现的隐性 bug。这个判断和 Snodgrass 在他书里 1999 年给出的几乎完全一样——只不过 1999 年的关注点是审计和合规,2024 年的关注点是回测可复现。
9.4 一个被低估的细节:交易日历
绝大多数 PIT 错误最后查出来的根因是”日历对错了”。日历错有几种:
- 用了自然日而不是交易日。
- 跨市场(A 股 + 港股 + 美股)用了同一份日历。
- 节假日列表过期。
- 半日市(圣诞前夜、A 股春节前夜)没特殊处理。
- 特殊停牌(如 2023 年 9 月 18 日 A 股部分券商系统故障)没标记。
把交易日历本身做成一个 bitemporal 的特征:valid_time 是某个日期,value 是”这一天是不是交易日 / 是否半日市”,transaction_time 是日历版本发布时间。这样跨年回测时可以用”当时的日历版本”,避免今天补丁了一个历史日历又把过去的回测打乱的尴尬。
9.5 关于”实时特征”的边界
很多团队上来就想做”100ms 级实时特征”,但仔细问一下需求,发现真正用得上的实时特征不超过五个(盘口压力、最近 N 笔大单、累计成交方向)。剩下的所谓实时特征,本质是”低延迟的批处理”。
工程判断:先把 1 秒级 / 1 分钟级的特征做对、做齐、做版本化,再上 100ms 以下的真实时特征。 真实时特征对架构的要求显著提升(共享内存、无锁队列、状态持久化),用错时机会拖慢整体迭代速度。
十、把上面这些拼到一起:一个最小可运行的特征仓库
下面的代码不是产品级实现,但足够把前面所有约束钉死。它演示:
- bitemporal 模型;
- 内容寻址版本;
- ASOF JOIN 接口;
- 离线/在线共享 transform;
- 写入审计。
# file: mini_fs.py
from __future__ import annotations
import hashlib, inspect, json, os
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import polars as pl
ROOT = Path("./fs")
@dataclass(frozen=True)
class FeatureMeta:
name: str
version: str
upstream: tuple[str, ...]
code_hash: str
created_at: str
def _hash_obj(obj) -> str:
return hashlib.sha256(
json.dumps(obj, sort_keys=True, default=str).encode()
).hexdigest()[:12]
def define_feature(name: str, upstream: list[str]):
def deco(fn):
code = inspect.getsource(fn)
code_hash = _hash_obj(code)
version = _hash_obj({"code": code_hash, "upstream": sorted(upstream)})
meta = FeatureMeta(name, version, tuple(sorted(upstream)),
code_hash, datetime.utcnow().isoformat())
fn.__feature_meta__ = meta
return fn
return deco
def write_partition(name: str, version: str, df: pl.DataFrame):
"""对每条记录附 transaction_time,并写入分区目录;从不覆盖。"""
txn = datetime.utcnow()
df = df.with_columns(pl.lit(txn).alias("transaction_time"))
out_dir = ROOT / f"factor={name}" / f"version={version}" \
/ f"year_month={txn:%Y%m}"
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / f"part-{txn:%Y%m%d%H%M%S}.parquet"
df.write_parquet(out_path)
_audit_log(name, version, txn, len(df), str(out_path))
return out_path
def _audit_log(name, version, txn, n_rows, path):
log_path = ROOT / "_audit.jsonl"
log_path.parent.mkdir(parents=True, exist_ok=True)
with open(log_path, "a") as f:
f.write(json.dumps({
"ts": txn.isoformat(),
"feature": name, "version": version,
"rows": n_rows, "path": path,
}) + "\n")
def read_feature(name: str, version: str | None = None) -> pl.LazyFrame:
base = ROOT / f"factor={name}"
if version is None:
# 取目录里最新一个版本
version = sorted(p.name.split("=")[1] for p in base.iterdir())[-1]
pattern = str(base / f"version={version}" / "**" / "*.parquet")
return pl.scan_parquet(pattern)
def asof_lookup(prices: pl.LazyFrame, feature_name: str,
left_time: str = "ts", left_by: str = "symbol",
version: str | None = None) -> pl.LazyFrame:
feat = read_feature(feature_name, version)
return (
prices.sort([left_by, left_time])
.join_asof(
feat.sort(["entity_id", "transaction_time"]),
left_on=left_time, right_on="transaction_time",
by_left=left_by, by_right="entity_id",
strategy="backward",
)
)
# --- 一个真实例子:20 日 close-to-close 收益的滚动均值 ---
@define_feature(name="ret_mean_20", upstream=["prices"])
def compute_ret_mean_20(prices: pl.DataFrame) -> pl.DataFrame:
return (
prices.sort(["symbol", "ts"])
.with_columns(
pl.col("close").pct_change().over("symbol").alias("ret"),
)
.with_columns(
pl.col("ret").rolling_mean(window_size=20).over("symbol").alias("value"),
)
.select([
pl.col("symbol").alias("entity_id"),
pl.col("ts").alias("valid_time"),
"value",
])
.drop_nulls()
)
if __name__ == "__main__":
# 假设外部加载了 prices DataFrame
prices = pl.read_parquet("data/prices.parquet")
out = compute_ret_mean_20(prices)
meta = compute_ret_mean_20.__feature_meta__
write_partition(meta.name, meta.version, out)这份代码省略了错误处理、并发写入、对象存储后端、依赖版本声明等工业细节,但它已经把 PIT 的关键约束都嵌进去了:
transaction_time由写入侧自己注入,研究员代码里没有写权限。- 版本由代码 + 上游 hash 决定,不能手填。
- 写入永远是”新建文件”,不能覆盖。
- ASOF JOIN 走
transaction_time不是valid_time。 - 审计日志单调追加。
把它扩展到生产,主要是把 ROOT 从本地路径换成
S3 / HDFS、加上 ClickHouse
索引层、补充并发与重试。但底层的语义模型不需要改。
十、附录 A:bitemporal 之外的几个边界问题
把 bitemporal、版本化、ASOF JOIN 三件事钉死之后,系统里还有几类问题不会被它们自动消除。这些问题在前面的章节里都有零散提及,这里集中拆开讲。
A.1 复权因子的二次注入
复权因子(adjustment factor)本身就是一个被反复修订的衍生量。常见的踩坑顺序是:
- 行情存了”未复权”的成交价。
- 复权因子表存了
(symbol, valid_time, factor),每次新增分红/送股事件时追加一行。 - 研究员算因子时用
close * factor得到”前复权”价格。
问题在第 3
步。研究员要的”前复权”,是站在某个截止日往回看,把这一天的价格设为基准;但复权因子表里的
factor
通常是站在”今天”的视角算的,把今天设为基准。两个基准点不一样,结果就错。
正确的做法:对每条原始行情,在 ASOF JOIN 复权因子时,把复权因子 = 1 / (今天的因子 / 历史那一天的因子) 计算成”截止日为查询时刻 T”的相对因子。这要求复权因子也走 bitemporal 路径——不同 transaction_time 上”今天的因子”不一样。
在工程上更稳妥的做法是把”前复权价”做成一个特征,而不是查询时计算。这样 PIT 拼接逻辑只调一次 ASOF JOIN,就拿到当时视角下的前复权价。
A.2 多频段聚合的对齐口径
研究员经常需要”用日线特征预测分钟级方向”,或者”用分钟级特征预测日线方向”。两边的 transaction_time 节奏不一样,对齐时容易踩坑。
低频特征拼到高频上的规则相对干净:用 ASOF JOIN,每条高频样本拿到不晚于自己 transaction_time 的最新低频值。
高频特征聚合到低频的规则更绕。常见错法:把当天所有分钟特征的均值赋给”当日”,但当日 EOD 收盘前的最后一分钟也被纳入了均值。如果模型的预测目标是”次日开盘方向”,这个均值只能用收盘后的 transaction_time,不能用收盘前。否则模型会”在 14:55 用了 14:58 还没发生的均值”。
工程实现:聚合特征的 transaction_time 取
max(window 内所有原始事件的 transaction_time) + ε,ε
是计算延迟,通常给 1
秒就够。然后把它写进特征仓库,研究员通过常规 ASOF JOIN
拿。
A.3 缺失值的语义
bitemporal 里的”缺失”有两种:
- 真缺失:valid_time 上确实没数据(标的还没上市、停牌期)。
- 假缺失:valid_time 上有数据,但 transaction_time 上还没拿到。
ASOF JOIN 的默认行为对两者都返回 NULL,但研究员处理时通常要区分:真缺失要 ffill 或者跳过;假缺失要等下一次 transaction_time 到来。
把这两种情况显式编码成两列:value +
availability,availability 取值
available / pending / not_listed / suspended,比单纯一个
NULL 携带的信息多得多。模型层在处理时也能用上:对 pending
状态的特征用最近一次的可用值,对 not_listed 跳过样本。
A.4 时间戳精度
行情时间戳的精度从秒到纳秒都有人在用。要在特征仓库里统一一种:
- 全表全特征统一使用
DateTime64(6)(微秒精度)或DateTime64(9)(纳秒精度)。 - 不允许混用秒级和毫秒级——ASOF JOIN 比较时类型隐式转换会引发 off-by-one。
- 涉及时区的字段统一用 UTC 存,在显示层做时区转换。
A 股 minute bar 默认使用上海时区(UTC+8),美股使用 America/New_York(含夏令时)。两个市场的特征落到同一张表时,存 UTC 是唯一不出错的做法。研究员看的时候再转回本地时区。
A.5 字符串实体 ID 的稳定性
entity_id
看起来是字符串就完事了,但实际上有几个坑:
- A 股代码会因为”借壳上市”被复用。例如某个代码在 2015 年代表 A 公司,2020 年被另一家壳上市公司接盘。两段历史属于不同实体。
- ETF / 基金代码可能在合并、改名后保留。
- 美股 ticker 几乎一定会被复用(公司退市后过一段时间,ticker 释放给新公司)。
工业做法:内部用 permanent_id(PERMNO、Wind
的 s_info_windcode、CRSP 的 PERMNO)作为
entity_id,外部展示时再映射回 ticker。这样跨年研究就不会因为
ticker 复用而错配。CRSP 的 PERMNO
是这件事的事实标准之一。
十一、附录 B:一份可运行的端到端示例
把第十节的代码放到一起,再补上 prices 数据、回测查询,构成一个端到端能跑的演示。环境:Python 3.11、polars 1.7、numpy 1.26。
# file: demo_pipeline.py
import numpy as np, polars as pl
from datetime import datetime, timedelta
from pathlib import Path
import shutil
from mini_fs import (define_feature, write_partition,
asof_lookup, read_feature, ROOT)
# 清理上一次运行
if ROOT.exists(): shutil.rmtree(ROOT)
# 1. 造一份合成行情:5 只标的,60 天日线
np.random.seed(42)
symbols = [f"S{i:03d}" for i in range(5)]
dates = [datetime(2024, 1, 1) + timedelta(days=i) for i in range(60)]
rows = []
for s in symbols:
p = 100.0
for d in dates:
p *= (1 + np.random.normal(0, 0.01))
rows.append({"symbol": s, "ts": d, "close": p})
prices = pl.DataFrame(rows)
# 2. 计算特征:20 日均收益
@define_feature(name="ret_mean_20", upstream=["prices"])
def compute_ret_mean_20(prices: pl.DataFrame) -> pl.DataFrame:
return (
prices.sort(["symbol", "ts"])
.with_columns(pl.col("close").pct_change().over("symbol").alias("ret"))
.with_columns(pl.col("ret").rolling_mean(window_size=20).over("symbol").alias("value"))
.select([
pl.col("symbol").alias("entity_id"),
pl.col("ts").alias("valid_time"),
"value",
])
.drop_nulls()
)
out = compute_ret_mean_20(prices)
meta = compute_ret_mean_20.__feature_meta__
print("写入版本:", meta.version)
write_partition(meta.name, meta.version, out)
# 3. ASOF JOIN:把特征拼回到行情
joined = asof_lookup(prices.lazy(), "ret_mean_20").collect()
print(joined.tail(5))
# 4. 演示版本切换:换一个口径
@define_feature(name="ret_mean_20", upstream=["prices"])
def compute_ret_mean_20_v2(prices: pl.DataFrame) -> pl.DataFrame:
"""改成中位数而不是均值"""
return (
prices.sort(["symbol", "ts"])
.with_columns(pl.col("close").pct_change().over("symbol").alias("ret"))
.with_columns(pl.col("ret").rolling_median(window_size=20).over("symbol").alias("value"))
.select([
pl.col("symbol").alias("entity_id"),
pl.col("ts").alias("valid_time"),
"value",
])
.drop_nulls()
)
out2 = compute_ret_mean_20_v2(prices)
meta2 = compute_ret_mean_20_v2.__feature_meta__
print("新版本:", meta2.version, "(与旧版本不同)")
write_partition(meta2.name, meta2.version, out2)
# 5. 旧版本和新版本同时存在,互不覆盖
old = asof_lookup(prices.lazy(), "ret_mean_20", version=meta.version).collect()
new = asof_lookup(prices.lazy(), "ret_mean_20", version=meta2.version).collect()
print("旧 / 新版本均值:", old["value"].mean(), new["value"].mean())跑出来的结果会显示:
meta.version与meta2.version不同(因为代码 hash 变了)。- 两个版本的物理文件分别存在不同目录。
- 新旧版本可以独立查询,互不覆盖。
- 审计日志
_audit.jsonl里两次写入都有完整记录。
这份 demo 没有覆盖 bitemporal 的”修订”路径——它对每个
valid_time 只写一次。要演示修订,需要在第二次调用
write_partition 之前把 valid_time
不变、value 改一下,再写一次。读出来时按
(valid_time, transaction_time)
双排序,就会看到两个版本并存,PIT
查询能拿到符合时间约束的那一行。
十二、回到一开始的问题:什么时候算”做对了”
回头看本文一开始的那句压缩定义:
量化特征仓库 = bitemporal 数据模型 + 内容寻址的版本化 + as-of join 查询接口 + 训练-服务共享的转换函数。
四件事每一件都不复杂,但只要少做任何一件,回测和实盘的差距就会以隐蔽的方式出现。判断特征仓库是不是”做对了”,最直接的标准不是有没有引入 Feast,也不是有没有部署 ClickHouse,而是回答下面这五个问题:
- 任意一个研究员,在任意一个历史时间点 \(T\),能不能复现”在 \(T\) 时这个策略看到的全部特征”?
- 上游一份数据被修订时,能不能列出所有受影响的下游特征和回测?
- 同一份特征在离线和在线两条路径上,跑出来是不是逐字节一致?
- 任何一次回填或重算,能不能不破坏既有研究结果?
- 接到风控质疑”这个因子值是不是错的”,能不能在 5 分钟内说清楚这个值的来源、版本、上游、计算时间?
如果五个问题都能回答”能”,特征仓库这件事就算做到位了。落到实盘里能赚多少,那是策略本身的问题,不再是工程的问题。这一篇能够替你解决的,到此为止。
下一篇 《另类数据:卫星、新闻、社交、链上》 会把视角从”已有数据怎么管”切到”新数据怎么进”——卫星图、新闻 NLP、社交情绪、链上数据,每一类都对本文讨论的 bitemporal 模型提出了新问题。
十三、参考资料
论文与书籍
- Snodgrass, R. T. (1999). Developing Time-Oriented Database Applications in SQL. Morgan Kaufmann. bitemporal 模型的奠基性论述。
- Johnston, T. (2014). Bitemporal Data: Theory and Practice. Morgan Kaufmann. 工业实践视角的补充。
- López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley. 第 4 章关于样本不平衡与”信息时刻”,第 7 章 PIT 与 purged CV。
- Kakodkar, N. (2022). Practical MLOps Feature Store Patterns. O’Reilly Report. Feature Store 在通用 ML 场景的工程经验。
规范与文档
- ISO/IEC 9075:2011 (SQL:2011) Part 2: Foundation. system-versioned tables 的标准定义。
- DuckDB Documentation, ASOF Joins.
https://duckdb.org/docs/sql/query_syntax/from#asof-joins - Polars User Guide, join_asof.
https://docs.pola.rs/ - ClickHouse Documentation, ASOF JOIN Usage.
https://clickhouse.com/docs/en/sql-reference/statements/select/join - QuestDB Documentation, ASOF JOIN.
https://questdb.io/docs/reference/sql/asof-join/
框架与工具
- Feast:
https://docs.feast.dev/,关注其point_in_time_join实现细节。 - Tecton 设计文档:
https://docs.tecton.ai/,特别是 streaming feature 的语义。 - LakeFS:
https://docs.lakefs.io/,面向数据版本化的对象存储抽象。 - DVC:
https://dvc.org/,数据 + 模型版本化的 git 风格工具。
数据源参考
- CRSP US Stock Database Guide。
- Compustat Point-in-Time Database Guide。
- Wind / Choice / 聚源 PIT 财务字段说明。
- 上交所、深交所交易日历公开发布。
上下篇导航
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【量化交易】量化交易全景:从信号到订单的工程链路
量化交易不是策略写得好就能赚钱,更难的是把数据、特征、因子、信号、组合、执行、风控、复盘这八段链路在工程上连成一条不漏数据、不串时间、不丢订单的流水线。本文是【量化交易】系列的总目录与读图,给出八段链路的输入输出、失败模式、不变量清单,并用研究流程图把从一个想法到一笔实盘订单之间所有该过的卡点串起来。
【量化交易】市场结构:交易所、做市商、暗池、ECN
系统梳理全球市场结构(Market Structure)的工程图景:从证券交易所、衍生品交易所、加密交易所,到做市商、暗池、ECN/ATS,再到 Maker-Taker 收费、PFOF、Reg NMS 与 MiFID II 的监管影响;给出量化策略选择交易场所的判断框架与基于 ccxt 的多交易所行情聚合代码。
【量化交易】市场微结构:订单簿、价差、流动性、冲击
系统讲解市场微结构的核心概念与可计算工具:限价订单簿的数据模型、报价/有效/已实现价差、Roll 模型、四维流动性度量、Kyle's lambda、订单流不平衡(OFI)、Almgren-Chriss 框架下的临时与永久冲击、PIN 与 VPIN、Hawkes 过程,并给出基于 polars 的 L2 增量处理与系数估计代码。
【量化交易】订单类型与执行语义:限价、市价、IOC、FOK、冰山
把 Limit、Market、IOC、FOK、Iceberg、Stop、MOO/MOC 这些常被混为一谈的订单类型还原为价格、数量、时效、可见性、触发五个独立维度,并对照 A 股、港股、美股、CME、Binance 五个市场的实际语义差异,给出量化系统中的订单工厂、状态机与风控前置校验的工程实现。