土法炼钢兴趣小组的算法知识备份

DP 在工业界:资源调度、广告投放与路径规划

目录

动态规划(Dynamic Programming)在算法教材里往往以”最长公共子序列”或”背包问题”的面貌出现。然而,当你真正走进工业系统的腹地,会发现 DP 的思想无处不在——只是它被包装成了不同的名字:pacing 算法、Viterbi 解码、策略迭代、序列比对。本文将带你穿越从广告投放到生物信息学的多个工业场景,看看 DP 是如何从教科书走进生产环境的。

一、为什么 DP 在工业界如此重要

1.1 从竞赛到生产

算法竞赛中的 DP 问题通常有一个干净的状态定义和明确的转移方程。但工业场景往往更加复杂:

1.2 DP 的核心优势

尽管有上述挑战,DP 在工业界依然不可替代,原因在于:

  1. 最优子结构:很多工业问题天然具有”大问题可以拆解为小问题”的结构。
  2. 可解释性:相比黑盒模型,DP 的决策过程是透明的,便于调试和审计。
  3. 理论保证:在满足条件的情况下,DP 能给出精确最优解或有界近似解。
  4. 工程可控性:DP 的时间和空间复杂度可以精确分析,便于做系统设计。

1.3 本文覆盖的工业场景

场景 DP 变体 核心挑战
广告 pacing 在线背包 + 拉格朗日松弛 预算约束下的实时竞价
动态定价 有限 horizon MDP 需求不确定性下的价格决策
CDN 调度 背包变体 多级缓存的内容放置
Viterbi 解码 trellis 上的最短路径 隐状态序列的最优推断
序列对齐 编辑距离变体 生物序列的相似性度量
强化学习 近似 DP 连续状态空间的值函数逼近

二、广告 Pacing:预算约束下的在线背包

2.1 问题建模

广告平台每天要为数以万计的广告主分配展示机会。每个广告主有一个日预算 \(B\),平台需要在一天的时间跨度内(\(T\) 个时段)合理分配花费,使得总价值(点击、转化等)最大化。

这本质上是一个在线背包问题

形式化地:

\[ \max \sum_{t=1}^{T} \sum_{i \in \mathcal{I}_t} x_{t,i} \cdot v_{t,i} \]

\[ \text{s.t.} \quad \sum_{t=1}^{T} \sum_{i \in \mathcal{I}_t} x_{t,i} \cdot c_{t,i} \leq B, \quad x_{t,i} \in \{0, 1\} \]

其中 \(\mathcal{I}_t\) 是时段 \(t\) 到来的展示机会集合,\(v_{t,i}\) 是价值,\(c_{t,i}\) 是花费。

2.2 拉格朗日松弛:将约束问题转为无约束

直接求解在线整数规划是 NP-hard 的。拉格朗日松弛的核心思想是引入一个乘子 \(\lambda \geq 0\),将预算约束”吸收”到目标函数中:

\[ L(\lambda) = \max_{x} \sum_{t,i} x_{t,i} (v_{t,i} - \lambda \cdot c_{t,i}) + \lambda B \]

直观理解:\(\lambda\) 是”钱的价格”——每花一块钱,你要付出 \(\lambda\) 的代价。当 \(\lambda\) 很大时,系统会非常节省;当 \(\lambda\) 很小时,系统会大手大脚。

对偶分解的优势:给定 \(\lambda\),每个展示机会的决策变成独立的——只需判断 \(v_{t,i} - \lambda \cdot c_{t,i} > 0\) 即可,完全不需要考虑其他展示机会。这就把一个全局优化问题转化为了大量的局部贪心决策。

2.3 PID 控制器调节 Lambda

如何选择最优的 \(\lambda\)?理论上,我们需要求解对偶问题:

\[ \min_{\lambda \geq 0} L(\lambda) \]

在实际系统中,常用 PID 控制器来动态调节 \(\lambda\)

class PacingPIDController:
    """PID 控制器实现广告预算 pacing。

    通过动态调整 lambda(花费惩罚系数),
    使得广告主的花费曲线尽量贴合理想的 pacing 曲线。
    """

    def __init__(self, budget: float, total_periods: int,
                 kp: float = 0.5, ki: float = 0.1, kd: float = 0.05):
        self.budget = budget
        self.total_periods = total_periods
        self.kp = kp  # 比例系数
        self.ki = ki  # 积分系数
        self.kd = kd  # 微分系数

        self.lambda_val = 1.0       # 初始 lambda
        self.integral_error = 0.0   # 累积误差
        self.prev_error = 0.0       # 上一次误差
        self.spent = 0.0            # 已花费

    def ideal_spend(self, period: int) -> float:
        """理想花费曲线:均匀分配。"""
        return self.budget * (period / self.total_periods)

    def update(self, period: int, period_spend: float) -> float:
        """根据当前花费情况更新 lambda。

        返回值:
            更新后的 lambda 值。
        """
        self.spent += period_spend

        # 误差 = 实际花费 - 理想花费(正值意味着花多了)
        error = self.spent - self.ideal_spend(period)

        # PID 更新
        self.integral_error += error
        derivative = error - self.prev_error
        self.prev_error = error

        delta = (self.kp * error +
                 self.ki * self.integral_error +
                 self.kd * derivative)

        # lambda 越大,越倾向于不花钱
        self.lambda_val = max(0.01, self.lambda_val + delta / self.budget)

        return self.lambda_val

    def should_bid(self, value: float, cost: float) -> bool:
        """根据当前 lambda 决定是否参与竞价。"""
        return value - self.lambda_val * cost > 0

2.4 在线学习 + DP 的混合策略

纯 PID 方法在流量模式稳定时效果不错,但面对流量突变(如突发新闻、促销活动)时反应滞后。工业界常用的改进方案是在线学习与 DP 的混合

  1. 离线阶段:基于历史数据,用 DP 预计算不同流量模式下的最优 \(\lambda\) 表。
def offline_dp_lambda_table(
    budget: float,
    num_periods: int,
    num_budget_levels: int,
    historical_value_distributions: list
) -> list:
    """离线预计算 lambda 查找表。

    dp[t][b] = 从时段 t 开始、剩余预算为 b 时的最优期望价值。
    lambda_table[t][b] = 对应的最优 lambda。
    """
    budget_step = budget / num_budget_levels

    # dp[t][b]: 从时段 t 开始,剩余预算 b 的最优期望价值
    dp = [[0.0] * (num_budget_levels + 1) for _ in range(num_periods + 1)]
    lambda_table = [[0.0] * (num_budget_levels + 1)
                    for _ in range(num_periods)]

    for t in range(num_periods - 1, -1, -1):
        dist = historical_value_distributions[t]
        for b in range(num_budget_levels + 1):
            remaining_budget = b * budget_step
            best_val = dp[t + 1][b]
            best_lambda = 0.0

            # 尝试不同的 lambda 值
            for lam in [i * 0.1 for i in range(1, 51)]:
                expected_val = 0.0
                expected_cost = 0.0

                for v, c, prob in dist:
                    if v - lam * c > 0:
                        expected_val += v * prob
                        expected_cost += c * prob

                # 预计花费后的剩余预算
                new_b = max(0, int((remaining_budget - expected_cost)
                                   / budget_step))
                total = expected_val + dp[t + 1][new_b]

                if total > best_val:
                    best_val = total
                    best_lambda = lam

            dp[t][b] = best_val
            lambda_table[t][b] = best_lambda

    return lambda_table
  1. 在线阶段:用在线学习算法(如 multiplicative weights 或 online gradient descent)实时修正 \(\lambda\)
class OnlineLambdaLearner:
    """在线梯度下降更新 lambda。"""

    def __init__(self, lambda_table: list, learning_rate: float = 0.01):
        self.lambda_table = lambda_table
        self.lr = learning_rate
        self.lambda_correction = 0.0

    def get_lambda(self, period: int, remaining_budget_level: int) -> float:
        """获取修正后的 lambda。"""
        base_lambda = self.lambda_table[period][remaining_budget_level]
        return max(0.01, base_lambda + self.lambda_correction)

    def update(self, gradient: float):
        """根据梯度更新修正量。

        gradient > 0 表示花费偏多,需要增大 lambda。
        """
        self.lambda_correction += self.lr * gradient

2.5 工程细节

广告 pacing 的工程陷阱:流量预估误差会使离线表失效,需多套流量模式表并根据实时信号切换;多广告主并发竞价导致相互影响,需考虑纳什均衡或迭代求解;预算将尽时 \(\lambda\) 应急剧增大,但过于激进会放弃尾部优质流量,实践中常设保底出价策略。

三、动态定价:Bellman 方程的近似求解

3.1 有限 Horizon MDP 建模

电商平台需要为商品动态定价:价格太高则无人购买,价格太低则利润流失。当库存有限时,这变成了一个有限时间范围的马尔可夫决策过程(MDP)。

状态\((t, s)\),其中 \(t\) 是当前时段,\(s\) 是剩余库存。

动作:设定价格 \(p \in \mathcal{P}\)(离散价格集合)。

转移:给定价格 \(p\),需求量 \(d\) 服从某个分布 \(D(p)\)(通常价格越高需求越小)。

\[ P(s' | s, p) = P(d = s - s') = D(p)(s - s') \]

收益\(r(s, p, d) = p \cdot \min(d, s)\)

Bellman 方程

\[ V_t(s) = \max_{p \in \mathcal{P}} \left\{ \mathbb{E}_{d \sim D(p)} \left[ p \cdot \min(d, s) + V_{t+1}(s - \min(d, s)) \right] \right\} \]

3.2 价值迭代

import numpy as np
from typing import Tuple


def dynamic_pricing_value_iteration(
    num_periods: int,
    max_inventory: int,
    prices: list,
    demand_pmf: callable,
    discount: float = 1.0
) -> Tuple[np.ndarray, np.ndarray]:
    """有限 horizon 动态定价的价值迭代。

    参数:
        num_periods: 总时段数。
        max_inventory: 最大库存量。
        prices: 可选价格列表。
        demand_pmf: demand_pmf(price, d) 返回价格为 price 时需求为 d 的概率。
        discount: 折扣因子。

    返回:
        (V, policy): V[t][s] 是最优值函数,policy[t][s] 是最优价格。
    """
    V = np.zeros((num_periods + 1, max_inventory + 1))
    policy = np.zeros((num_periods, max_inventory + 1))

    # 从最后一个时段往前迭代
    for t in range(num_periods - 1, -1, -1):
        for s in range(max_inventory + 1):
            if s == 0:
                V[t][s] = 0.0
                continue

            best_val = -np.inf
            best_price = prices[0]

            for p in prices:
                expected = 0.0
                for d in range(s + 1):
                    prob = demand_pmf(p, d)
                    sold = min(d, s)
                    revenue = p * sold
                    future = V[t + 1][s - sold]
                    expected += prob * (revenue + discount * future)

                # 需求可能超过库存
                for d in range(s + 1, s + 50):
                    prob = demand_pmf(p, d)
                    if prob < 1e-10:
                        break
                    revenue = p * s  # 全部卖完
                    future = V[t + 1][0]
                    expected += prob * (revenue + discount * future)

                if expected > best_val:
                    best_val = expected
                    best_price = p

            V[t][s] = best_val
            policy[t][s] = best_price

    return V, policy

3.3 策略迭代

策略迭代是价值迭代的替代方案,通过交替执行”策略评估”和”策略改进”两步,在某些情况下收敛更快。其核心区别在于:价值迭代每次只做一步 Bellman 更新,而策略迭代在当前策略下做完整的评估(多次迭代直到收敛),再做策略改进。

在实际的动态定价系统中,两种方法常常混合使用——用价值迭代快速得到初始策略,再用策略迭代精细调优。

3.4 近似方法

当状态空间太大时(高维库存、连续价格),精确 DP 不可行。工业界常用:特征化值函数 \(V(s) \approx \theta^T \phi(s)\)仿真优化(MCTS、模拟退火)搜索策略空间;分解方法将多商品问题分解为单商品子问题再用拉格朗日乘子协调。

四、CDN 调度:多级缓存的内容放置问题

4.1 问题定义

CDN(Content Delivery Network)需要决定将哪些内容缓存在哪些节点上。这是一个经典的背包问题变体:

4.2 离线策略:经典 0-1 背包

如果我们知道未来一段时间的访问模式,可以用经典 0-1 背包的 DP 求解:

def cdn_cache_placement_dp(
    items: list,
    capacity: int
) -> Tuple[float, list]:
    """CDN 缓存放置的 0-1 背包 DP。

    参数:
        items: [(content_id, size, value), ...] 内容列表。
        capacity: 缓存容量(归一化为整数单位)。

    返回:
        (max_value, selected_ids): 最大价值和选中的内容 ID。
    """
    n = len(items)
    dp = [0.0] * (capacity + 1)
    choice = [[False] * (capacity + 1) for _ in range(n)]

    for i in range(n):
        cid, size, value = items[i]
        for c in range(capacity, size - 1, -1):
            if dp[c - size] + value > dp[c]:
                dp[c] = dp[c - size] + value
                choice[i][c] = True

    # 回溯
    selected = []
    c = capacity
    for i in range(n - 1, -1, -1):
        if choice[i][c]:
            selected.append(items[i][0])
            c -= items[i][1]

    return dp[capacity], selected

4.3 在线策略

现实中的 CDN 不可能预知未来的访问模式,需要在线策略:

LRU / LFU:最常见的启发式方法。LRU 淘汰最久未使用的内容,LFU 淘汰访问频率最低的内容。这些方法简单高效,但没有理论最优性保证。

在线背包算法:对每个到来的请求,基于当前缓存状态和内容价值做决策。关键是如何估计内容的”未来价值”——这又回到了 Bellman 方程。

class OnlineCDNCache:
    """基于阈值的在线缓存策略。

    使用类似广告 pacing 的 lambda 阈值机制。
    """

    def __init__(self, capacity: int, threshold: float = 1.0):
        self.capacity = capacity
        self.threshold = threshold  # 类似 lambda 的阈值
        self.cache = {}  # content_id -> (size, estimated_value)
        self.used = 0

    def _value_density(self, value: float, size: int) -> float:
        """单位存储的价值密度。"""
        return value / max(size, 1)

    def request(self, content_id: str, size: int,
                estimated_value: float) -> bool:
        """处理一个内容请求。

        如果内容不在缓存中,决定是否缓存。
        返回是否命中缓存。
        """
        if content_id in self.cache:
            # 命中,更新价值估计
            self.cache[content_id] = (size, estimated_value)
            return True

        density = self._value_density(estimated_value, size)
        if density < self.threshold:
            return False  # 价值密度不够,不缓存

        # 如果空间不够,淘汰低密度内容
        while self.used + size > self.capacity and self.cache:
            min_id = min(self.cache,
                         key=lambda k: self._value_density(
                             self.cache[k][1], self.cache[k][0]))
            min_density = self._value_density(
                self.cache[min_id][1], self.cache[min_id][0])
            if min_density >= density:
                break  # 淘汰代价太高
            self.used -= self.cache[min_id][0]
            del self.cache[min_id]

        if self.used + size <= self.capacity:
            self.cache[content_id] = (size, estimated_value)
            self.used += size
            return False  # 未命中但已缓存

        return False  # 未命中且未缓存

4.4 多级缓存

现实的 CDN 通常是多级的:边缘节点 → 区域节点 → 源站。边缘节点容量小但延迟低,区域节点容量大但延迟高。这可以建模为一个树形背包问题,其中每个节点的决策依赖于其父节点的缓存状态。精确求解仍可用 DP,但规模往往需要近似算法。

五、Viterbi 算法:HMM 最优路径

5.1 隐马尔可夫模型回顾

隐马尔可夫模型(HMM)由以下要素组成:

给定观测序列 \(O = (O_1, O_2, \ldots, O_T)\)解码问题是找到最可能的隐状态序列:

\[ Q^* = \arg\max_{Q} P(Q | O, \lambda) \]

5.2 Trellis 图上的最短路径 DP

Viterbi 算法的核心思想是在 trellis 图上做 DP。定义 Viterbi 变量:

\[ \delta_t(j) = \max_{q_1, \ldots, q_{t-1}} P(q_1, \ldots, q_{t-1}, q_t = s_j, O_1, \ldots, O_t | \lambda) \]

递推关系:

\[ \delta_{t+1}(j) = \max_{1 \leq i \leq N} [\delta_t(i) \cdot a_{ij}] \cdot b_j(O_{t+1}) \]

回溯指针:

\[ \psi_{t+1}(j) = \arg\max_{1 \leq i \leq N} [\delta_t(i) \cdot a_{ij}] \]

下图展示了 Viterbi 算法在 trellis 图上的运行过程:

Viterbi Trellis 图示意

图中紫色粗线标出了最优路径——即概率最大的隐状态序列。每个节点上的 \(\delta\) 值是到达该节点的最大累积概率。算法从左到右计算所有 \(\delta\) 值,然后从右到左回溯得到最优路径。

5.3 完整实现

import numpy as np
from typing import List, Tuple


class HiddenMarkovModel:
    """隐马尔可夫模型的完整实现。"""

    def __init__(self, num_states: int, num_observations: int):
        self.N = num_states
        self.M = num_observations
        self.pi = np.ones(num_states) / num_states
        self.A = np.ones((num_states, num_states)) / num_states
        self.B = np.ones((num_states, num_observations)) / num_observations

    def set_params(self, pi: np.ndarray, A: np.ndarray, B: np.ndarray):
        """设置模型参数。"""
        self.pi = pi.copy()
        self.A = A.copy()
        self.B = B.copy()

    def viterbi(self, observations: List[int]) -> Tuple[List[int], float]:
        """Viterbi 算法:求最优隐状态序列。

        参数:
            observations: 观测序列(整数索引)。

        返回:
            (best_path, log_prob): 最优路径和对应的对数概率。
        """
        T = len(observations)
        N = self.N

        # 使用对数概率避免下溢
        log_pi = np.log(self.pi + 1e-300)
        log_A = np.log(self.A + 1e-300)
        log_B = np.log(self.B + 1e-300)

        # delta[t][j]: 在时刻 t 处于状态 j 的最大对数概率
        delta = np.full((T, N), -np.inf)
        # psi[t][j]: 回溯指针
        psi = np.zeros((T, N), dtype=int)

        # 初始化
        delta[0] = log_pi + log_B[:, observations[0]]

        # 递推
        for t in range(1, T):
            for j in range(N):
                # delta[t][j] = max_i(delta[t-1][i] + log_A[i][j]) + log_B[j][o_t]
                scores = delta[t - 1] + log_A[:, j]
                psi[t][j] = np.argmax(scores)
                delta[t][j] = scores[psi[t][j]] + log_B[j, observations[t]]

        # 终止
        best_last = np.argmax(delta[T - 1])
        log_prob = delta[T - 1][best_last]

        # 回溯
        path = [0] * T
        path[T - 1] = best_last
        for t in range(T - 2, -1, -1):
            path[t] = psi[t + 1][path[t + 1]]

        return path, log_prob

    def forward(self, observations: List[int]) -> float:
        """前向算法:计算观测序列的对数似然。"""
        T = len(observations)
        log_alpha = np.log(self.pi + 1e-300) + np.log(
            self.B[:, observations[0]] + 1e-300)

        for t in range(1, T):
            new_log_alpha = np.full(self.N, -np.inf)
            for j in range(self.N):
                log_terms = log_alpha + np.log(self.A[:, j] + 1e-300)
                max_term = np.max(log_terms)
                new_log_alpha[j] = (max_term
                                    + np.log(np.sum(np.exp(log_terms - max_term)))
                                    + np.log(self.B[j, observations[t]] + 1e-300))
            log_alpha = new_log_alpha

        max_alpha = np.max(log_alpha)
        return max_alpha + np.log(np.sum(np.exp(log_alpha - max_alpha)))

5.4 在语音识别中的应用

语音识别是 Viterbi 算法最经典的应用场景之一:

在实际的语音识别系统中,HMM 与深度学习结合:DNN 替代 GMM 计算发射概率 \(b_j(o)\);Viterbi 仍用于音素级最优路径;语言模型通过 WFST 与声学模型合并后,在合并图上做 Viterbi 搜索。

5.5 GPS Map Matching

GPS map matching 是 Viterbi 算法在地理信息领域的一个精彩应用。给定一条嘈杂的 GPS 轨迹,需要将其映射到道路网络上的一条路径。

建模为 HMM

import math
from typing import List, Tuple, Dict


class GPSMapMatcher:
    """基于 HMM + Viterbi 的 GPS 轨迹匹配。"""

    def __init__(self, sigma_z: float = 4.07, beta: float = 3.0):
        """
        参数:
            sigma_z: GPS 测量噪声的标准差(米)。
            beta: 转移概率的参数。
        """
        self.sigma_z = sigma_z
        self.beta = beta

    def emission_prob(self, gps_point: Tuple[float, float],
                      candidate_point: Tuple[float, float]) -> float:
        """发射概率:GPS 点与候选路段投影点之间的距离。"""
        dist = self._haversine(gps_point, candidate_point)
        return (1.0 / (math.sqrt(2 * math.pi) * self.sigma_z)
                * math.exp(-0.5 * (dist / self.sigma_z) ** 2))

    def transition_prob(self, route_dist: float,
                        great_circle_dist: float) -> float:
        """转移概率:路网距离与直线距离的差异。"""
        dt = abs(route_dist - great_circle_dist)
        return (1.0 / self.beta) * math.exp(-dt / self.beta)

    def match(
        self,
        gps_points: List[Tuple[float, float]],
        candidates: List[List[Dict]],
        route_distances: callable
    ) -> List[int]:
        """Viterbi 解码匹配 GPS 轨迹到道路网络。

        参数:
            gps_points: GPS 采样点序列。
            candidates: candidates[t] 是第 t 个 GPS 点的候选路段列表。
                每个候选是 {'segment_id': int, 'proj_point': (lat, lon)}。
            route_distances: 函数,输入两个路段 ID,返回路网最短距离。

        返回:
            匹配到的路段 ID 序列。
        """
        T = len(gps_points)

        # 初始化
        prev_delta = {}
        prev_psi = {}

        for i, cand in enumerate(candidates[0]):
            prob = self.emission_prob(gps_points[0], cand['proj_point'])
            prev_delta[i] = math.log(max(prob, 1e-300))
            prev_psi[i] = -1

        # 记录回溯路径
        all_psi = [{}]

        # 递推
        for t in range(1, T):
            curr_delta = {}
            curr_psi = {}
            gc_dist = self._haversine(gps_points[t - 1], gps_points[t])

            for j, cand_j in enumerate(candidates[t]):
                emit = self.emission_prob(gps_points[t], cand_j['proj_point'])
                log_emit = math.log(max(emit, 1e-300))

                best_score = -math.inf
                best_prev = 0

                for i, cand_i in enumerate(candidates[t - 1]):
                    if i not in prev_delta:
                        continue
                    rd = route_distances(cand_i['segment_id'],
                                         cand_j['segment_id'])
                    trans = self.transition_prob(rd, gc_dist)
                    score = prev_delta[i] + math.log(max(trans, 1e-300))

                    if score > best_score:
                        best_score = score
                        best_prev = i

                curr_delta[j] = best_score + log_emit
                curr_psi[j] = best_prev

            prev_delta = curr_delta
            all_psi.append(curr_psi)

        # 终止
        best_j = max(prev_delta, key=prev_delta.get)

        # 回溯
        path_indices = [0] * T
        path_indices[T - 1] = best_j
        for t in range(T - 2, -1, -1):
            path_indices[t] = all_psi[t + 1][path_indices[t + 1]]

        return [candidates[t][path_indices[t]]['segment_id']
                for t in range(T)]

    @staticmethod
    def _haversine(p1: Tuple[float, float],
                   p2: Tuple[float, float]) -> float:
        """Haversine 公式计算两点之间的距离(米)。"""
        R = 6371000.0
        lat1, lon1 = math.radians(p1[0]), math.radians(p1[1])
        lat2, lon2 = math.radians(p2[0]), math.radians(p2[1])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = (math.sin(dlat / 2) ** 2
             + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
        return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


## 六、序列对齐:生物信息学的 Needleman-Wunsch / Smith-Waterman

### 6.1 全局对齐:Needleman-Wunsch

生物信息学中的序列对齐是 DP 最早的工业应用之一。给定两条蛋白质或 DNA 序列,我们需要找到它们之间的最佳对齐方式。

Needleman-Wunsch 算法求解**全局对齐**——将两条序列从头到尾完整对齐:

**状态**:$F(i, j)$ 表示将序列 $A$ 的前 $i$ 个字符与序列 $B$ 的前 $j$ 个字符对齐的最优得分。

**转移方程**

$$
F(i, j) = \max \begin{cases}
F(i-1, j-1) + S(A_i, B_j) & \text{(匹配/替换)} \\
F(i-1, j) + d & \text{(A 中插入空位)} \\
F(i, j-1) + d & \text{(B 中插入空位)}
\end{cases}
$$

其中 $S(A_i, B_j)$ 是替换矩阵(如 BLOSUM62),$d$ 是空位罚分。

```python
def needleman_wunsch(
    seq_a: str,
    seq_b: str,
    match_score: int = 1,
    mismatch_score: int = -1,
    gap_penalty: int = -2
) -> Tuple[str, str, int]:
    """Needleman-Wunsch 全局序列对齐。

    返回:
        (aligned_a, aligned_b, score): 对齐后的序列和得分。
    """
    m, n = len(seq_a), len(seq_b)

    # 初始化得分矩阵
    F = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m + 1):
        F[i][0] = i * gap_penalty
    for j in range(n + 1):
        F[0][j] = j * gap_penalty

    # 填表
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if seq_a[i - 1] == seq_b[j - 1]:
                diag = F[i - 1][j - 1] + match_score
            else:
                diag = F[i - 1][j - 1] + mismatch_score
            up = F[i - 1][j] + gap_penalty
            left = F[i][j - 1] + gap_penalty
            F[i][j] = max(diag, up, left)

    # 回溯
    aligned_a, aligned_b = [], []
    i, j = m, n

    while i > 0 or j > 0:
        if i > 0 and j > 0:
            if seq_a[i - 1] == seq_b[j - 1]:
                score_diag = F[i - 1][j - 1] + match_score
            else:
                score_diag = F[i - 1][j - 1] + mismatch_score

        if i > 0 and j > 0 and F[i][j] == score_diag:
            aligned_a.append(seq_a[i - 1])
            aligned_b.append(seq_b[j - 1])
            i -= 1
            j -= 1
        elif i > 0 and F[i][j] == F[i - 1][j] + gap_penalty:
            aligned_a.append(seq_a[i - 1])
            aligned_b.append('-')
            i -= 1
        else:
            aligned_a.append('-')
            aligned_b.append(seq_b[j - 1])
            j -= 1

    return ''.join(reversed(aligned_a)), ''.join(reversed(aligned_b)), F[m][n]

6.2 局部对齐:Smith-Waterman

Smith-Waterman 算法求解局部对齐——找到两条序列中得分最高的子序列对齐。与 Needleman-Wunsch 的区别在于:

  1. \(F(i, j)\) 的最小值为 0(允许”重新开始”)。
  2. 最优对齐不一定从右下角开始,而是从得分最高的单元格开始。
def smith_waterman(
    seq_a: str,
    seq_b: str,
    match_score: int = 2,
    mismatch_score: int = -1,
    gap_penalty: int = -1
) -> Tuple[str, str, int]:
    """Smith-Waterman 局部序列对齐。"""
    m, n = len(seq_a), len(seq_b)

    F = [[0] * (n + 1) for _ in range(m + 1)]
    max_score = 0
    max_pos = (0, 0)

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if seq_a[i - 1] == seq_b[j - 1]:
                diag = F[i - 1][j - 1] + match_score
            else:
                diag = F[i - 1][j - 1] + mismatch_score
            up = F[i - 1][j] + gap_penalty
            left = F[i][j - 1] + gap_penalty
            F[i][j] = max(0, diag, up, left)

            if F[i][j] > max_score:
                max_score = F[i][j]
                max_pos = (i, j)

    # 回溯(直到得分为 0)
    aligned_a, aligned_b = [], []
    i, j = max_pos

    while i > 0 and j > 0 and F[i][j] > 0:
        if seq_a[i - 1] == seq_b[j - 1]:
            score_diag = F[i - 1][j - 1] + match_score
        else:
            score_diag = F[i - 1][j - 1] + mismatch_score

        if F[i][j] == score_diag:
            aligned_a.append(seq_a[i - 1])
            aligned_b.append(seq_b[j - 1])
            i -= 1
            j -= 1
        elif F[i][j] == F[i - 1][j] + gap_penalty:
            aligned_a.append(seq_a[i - 1])
            aligned_b.append('-')
            i -= 1
        else:
            aligned_a.append('-')
            aligned_b.append(seq_b[j - 1])
            j -= 1

    return ''.join(reversed(aligned_a)), ''.join(reversed(aligned_b)), max_score

6.3 工程优化

生物信息学中的序列比对面临规模挑战——基因组有数十亿碱基对。常见优化:Banded alignment 只计算对角线附近带状区域,将 \(O(mn)\) 降为 \(O(mk)\)SIMD 加速利用 CPU 向量指令并行计算;Hirschberg 算法用分治法将空间降为 \(O(\min(m,n))\)种子-延伸先用 k-mer 索引找候选区域,再局部精确对齐(BLAST 的核心思想)。

七、近似 DP 与强化学习的联系

7.1 从 Bellman 方程到 Q-learning

DP 和强化学习(RL)之间的联系比很多人想象的要深刻。事实上,RL 的理论基础就是近似动态规划。

Bellman 最优方程(DP 的核心):

\[ V^*(s) = \max_{a} \left[ R(s, a) + \gamma \sum_{s'} P(s' | s, a) V^*(s') \right] \]

Q-learning 更新(RL 的核心算法之一):

\[ Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right] \]

对比可以发现,Q-learning 本质上是 Bellman 最优方程的随机近似(stochastic approximation)——不需要知道转移概率 \(P(s'|s,a)\),而是通过采样来逼近期望。

7.2 DP、蒙特卡洛与 TD 学习的统一视角

方法 需要模型? 更新方式 bootstrapping?
动态规划 全状态扫描
蒙特卡洛 完整轨迹采样
TD(0) 单步采样
TD(lambda) 多步采样

这三者构成了一个连续谱:

7.3 函数近似与深度 RL

当状态空间太大时,无法为每个状态存储一个值。这时需要用函数近似:

\[ V(s) \approx V(s; \theta) = \theta^T \phi(s) \]

或者用深度神经网络:

\[ V(s) \approx V_\theta(s) = \text{DNN}_\theta(s) \]

这就是近似动态规划(Approximate Dynamic Programming, ADP),也是现代深度强化学习的理论基础。

class SimpleValueNetwork:
    """简单的值函数近似网络(线性函数近似)。

    实际系统中通常用深度神经网络替代。
    """

    def __init__(self, feature_dim: int, learning_rate: float = 0.01):
        self.weights = np.zeros(feature_dim)
        self.lr = learning_rate

    def predict(self, features: np.ndarray) -> float:
        """预测状态值。"""
        return np.dot(self.weights, features)

    def update(self, features: np.ndarray, target: float):
        """TD 更新。"""
        prediction = self.predict(features)
        td_error = target - prediction
        self.weights += self.lr * td_error * features


class QLearningAgent:
    """基于表格的 Q-learning 实现。"""

    def __init__(self, num_states: int, num_actions: int,
                 alpha: float = 0.1, gamma: float = 0.99,
                 epsilon: float = 0.1):
        self.Q = np.zeros((num_states, num_actions))
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon

    def choose_action(self, state: int) -> int:
        """epsilon-greedy 策略选择动作。"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.Q.shape[1])
        return int(np.argmax(self.Q[state]))

    def update(self, state: int, action: int, reward: float,
               next_state: int, done: bool):
        """Q-learning 更新。"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state])

        self.Q[state, action] += self.alpha * (target - self.Q[state, action])

7.4 工业界的 RL 应用

近似 DP / RL 在工业界的实际应用:推荐系统将用户会话建模为 MDP 来优化长期留存;Google 使用 RL 优化数据中心冷却系统节省约 40% 能源;自动驾驶路径规划大量使用近似 DP;金融交易的最优执行策略可建模为有限 horizon MDP。

八、完整演示:Pacing 模拟 + Viterbi 解码

将前文的各个组件串联起来,构成可运行的完整演示。

8.1 广告 Pacing 模拟

import random
from dataclasses import dataclass
from typing import List


@dataclass
class Impression:
    """一次展示机会。"""
    value: float   # 预估价值(CTR * bid)
    cost: float    # 花费(CPC)


def generate_traffic(num_periods: int, imps_per_period: int,
                     seed: int = 42) -> List[List[Impression]]:
    """生成日内流量,上午/晚间高峰,下午低谷。"""
    rng = random.Random(seed)
    traffic = []
    for t in range(num_periods):
        hour = (t * 24) // num_periods
        if 8 <= hour <= 11 or 19 <= hour <= 22:
            mult = 1.5
        elif 12 <= hour <= 17:
            mult = 0.8
        else:
            mult = 0.5
        n = int(imps_per_period * mult)
        period = []
        for _ in range(n):
            v = rng.expovariate(1.0) * 0.5
            c = v * rng.uniform(0.3, 0.8)
            period.append(Impression(value=v, cost=c))
        traffic.append(period)
    return traffic


def simulate_pacing(budget: float, traffic: List[List[Impression]],
                    ctrl: PacingPIDController) -> dict:
    """模拟完整 pacing 过程并返回统计信息。"""
    total_val, total_cost, total_imp, won = 0.0, 0.0, 0, 0
    for t, imps in enumerate(traffic):
        p_cost = 0.0
        for imp in imps:
            total_imp += 1
            if total_cost >= budget:
                break
            if ctrl.should_bid(imp.value, imp.cost):
                spend = min(imp.cost, budget - total_cost)
                total_val += imp.value
                total_cost += spend
                p_cost += spend
                won += 1
        ctrl.update(t + 1, p_cost)

    return {'value': total_val, 'cost': total_cost,
            'impressions': total_imp, 'won': won,
            'utilization': total_cost / budget}


# 运行演示
budget, periods = 100.0, 24
traffic = generate_traffic(periods, 200)
ctrl = PacingPIDController(budget, periods, kp=0.3, ki=0.05, kd=0.02)
result = simulate_pacing(budget, traffic, ctrl)
print(f"价值={result['value']:.2f}  花费={result['cost']:.2f}/{budget}"
      f"  利用率={result['utilization']:.1%}  胜率={result['won']/result['impressions']:.1%}")

8.2 Viterbi 解码演示

def run_viterbi_demo():
    """天气-活动 HMM 解码演示。"""
    # 隐状态:0=晴天, 1=阴天, 2=雨天;观测:0=散步, 1=购物, 2=清洁
    hmm = HiddenMarkovModel(num_states=3, num_observations=3)
    hmm.set_params(
        pi=np.array([0.6, 0.3, 0.1]),
        A=np.array([[0.7, 0.2, 0.1],
                     [0.3, 0.4, 0.3],
                     [0.2, 0.3, 0.5]]),
        B=np.array([[0.6, 0.3, 0.1],   # 晴天
                     [0.3, 0.4, 0.3],   # 阴天
                     [0.1, 0.4, 0.5]])  # 雨天
    )
    observations = [0, 1, 2, 2, 1, 0, 0, 1, 2]
    names = ['晴天', '阴天', '雨天']
    path, lp = hmm.viterbi(observations)
    print("最优路径:", ' -> '.join(names[s] for s in path))
    print(f"对数概率: {lp:.4f}  概率: {np.exp(lp):.2e}")

run_viterbi_demo()

九、工程实践中的陷阱与技巧

9.1 常见问题清单

问题 场景 根因 对策
数值下溢 Viterbi、前向算法 连乘概率趋近于零 使用对数概率空间
状态空间爆炸 动态定价、CDN 调度 状态维度过高 状态聚合、函数近似
维度灾难 多约束背包 约束数量多 拉格朗日松弛分解约束
模型失配 广告 pacing 流量模式漂移 在线学习实时校正
过拟合历史 离线 DP 表 历史不代表未来 多场景表 + 在线切换
延迟反馈 广告转化、定价 花费即时但价值延迟 乐观估计 + 后修正
并发冲突 多广告主 pacing 共享流量池竞争 分布式 dual decomposition
序列长度限制 Viterbi 解码 长序列内存爆炸 分段解码 + 波束搜索
空位罚分选择 序列对齐 不同罚分策略结果差异大 仿射空位罚分(开启+延伸)
折扣因子敏感 MDP 定价 gamma 的微小变化影响策略 敏感性分析 + 稳健优化

9.2 性能优化

计算优化:稀疏转移矩阵只遍历非零转移,从 \(O(N^2 T)\) 降为 \(O(|E| T)\);对数空间计算配合 log-sum-exp 保持数值稳定;多条序列的 Viterbi 解码用矩阵运算批量处理以利用 GPU。

内存优化:滚动数组将空间从 \(O(T \times N)\) 降为 \(O(N)\);Hirschberg 技巧在 \(O(\min(m,n))\) 空间完成回溯;位运算或哈希表表示可达状态集合。

9.3 调试建议

  1. 小规模验证:先在手动可验证的小实例上确认正确性。
  2. 对比暴力解:小规模输入用暴力枚举验证 DP 结果。
  3. 不变量检查:递推过程中加入断言,检查概率归一化、单调性等性质。
  4. 可视化:将 DP 表或 trellis 图可视化,直观检查合理性。

十、不同 DP 变体的对比

10.1 精确 DP vs 近似 DP

维度 精确 DP 近似 DP
适用范围 小状态空间 大/连续状态空间
最优性 保证最优 近似最优
计算成本 \(O(\|S\| \times \|A\|)\) 每次迭代 取决于近似方法
典型应用 背包、Viterbi、序列对齐 推荐系统、自动驾驶
工程复杂度 较低 较高(需要训练)

10.2 在线 vs 离线策略

维度 离线策略 在线策略
信息假设 知道所有输入 输入逐个到来
最优性 全局最优 竞争比保证
计算时机 提前计算 实时计算
典型应用 CDN 周期性缓存规划 广告实时竞价
适应性 差(需要重新计算) 好(动态调整)

十一、个人思考

11.1 DP 的本质是什么

经过这么多工业场景的讨论,我想谈谈对 DP 本质的理解。

DP 的核心不是”记忆化”,也不是”填表”——这些只是实现手段。DP 的本质是一种结构化的决策分解:将一个复杂的序贯决策问题,分解为一系列相互关联的子问题,并利用子问题之间的递推关系高效求解。

Bellman 方程 \(V(s) = \max_a [R(s,a) + \gamma V(s')]\) 简洁地表达了这种分解:当前的最优决策只取决于当前状态和未来的最优值函数。这就是”最优子结构”的精髓——你不需要记住是怎么到达当前状态的,只需要知道从这里出发如何走最好。

11.2 教科书与工业界的鸿沟

教科书上的 DP 往往给人一种”只要写对转移方程就行”的错觉。但在工业实践中,转移方程只是冰山一角。真正的挑战在于:

  1. 状态设计:如何将复杂的现实问题抽象为合适的状态表示。状态太粗则丢失关键信息,太细则计算不可行。
  2. 在线化:将离线 DP 方案改造为能够实时决策的在线算法。
  3. 鲁棒性:模型参数不准时,DP 的策略如何优雅地退化而不是灾难性地崩溃。
  4. 系统集成:如何将 DP 模块嵌入到大规模分布式系统中,处理延迟、并发、故障等工程问题。

11.3 DP 与机器学习的融合

近年来两者融合加速:可微分 DP 将递推嵌入神经网络做端到端训练;学习搜索用学习的启发式函数指导 DP 搜索(如 AlphaFold);模型基础 RL 先学世界模型再做 DP 规划(如 MuZero)。我认为,纯 DP 需要精确模型假设,纯学习需要大量数据,未来方向是用学习估计模型参数,用 DP 利用问题结构做高效规划

11.4 给工程师的建议

  1. 先确认问题结构:检查是否存在最优子结构和重叠子问题。
  2. 从简单模型开始:用最简化的状态定义验证思路,再逐步复杂化。
  3. 重视近似方案:快速、稳定、接近最优的方案往往比缓慢的精确方案更有价值。
  4. 关注系统全局:DP 模块的输出影响下游,下游反馈又改变 DP 输入。避免局部最优陷阱。

十二、总结与参考

12.1 核心要点

本文覆盖了 DP 在工业界的六大应用场景:广告 pacing(在线背包 + 拉格朗日松弛 + PID)、动态定价(有限 horizon MDP + 价值迭代)、CDN 调度(背包变体 + 在线/离线策略)、Viterbi 解码(trellis 最短路径 + GPS 轨迹匹配)、序列对齐(NW / SW 算法)、近似 DP 与 RL(Bellman 方程 → Q-learning)。

贯穿所有场景的核心思想:利用递推结构,将全局优化分解为局部决策的链式组合

12.2 一句话总结每个技术

技术 一句话总结
拉格朗日松弛 为约束标价,把约束优化变成无约束优化
PID 控制 用误差信号的比例-积分-微分来动态调节参数
价值迭代 从终态开始,逐步往前算出每个状态的最优值
Viterbi 在 trellis 图上做 DP,找概率最大的隐状态路径
Needleman-Wunsch 两条序列从头到尾的最优全局对齐
Smith-Waterman 两条序列中得分最高的局部子序列对齐
Q-learning Bellman 方程的无模型随机近似

12.3 参考文献

  1. Bellman, R. (1957). Dynamic Programming. Princeton University Press.
  2. Viterbi, A. (1967). “Error bounds for convolutional codes and an asymptotically optimum decoding algorithm.” IEEE Transactions on Information Theory.
  3. Needleman, S. B., & Wunsch, C. D. (1970). “A general method applicable to the search for similarities in the amino acid sequence of two proteins.” Journal of Molecular Biology.
  4. Smith, T. F., & Waterman, M. S. (1981). “Identification of common molecular subsequences.” Journal of Molecular Biology.
  5. Newstadt, P., et al. (2009). “Hidden Markov map matching through noise and sparseness.” ACM SIGSPATIAL.
  6. Agarwal, D., et al. (2014). “Budget pacing for targeted online advertisements at LinkedIn.” KDD.
  7. Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction (2nd ed.). MIT Press.
  8. Bertsekas, D. P. (2019). Reinforcement Learning and Optimal Control. Athena Scientific.
  9. Powell, W. B. (2011). Approximate Dynamic Programming: Solving the Curses of Dimensionality (2nd ed.). Wiley.
  10. Gallager, R. G. (2008). Principles of Digital Communication. Cambridge University Press.

算法系列导航上一篇:区间 DP 与矩阵链乘 | 下一篇:快速傅里叶变换

相关阅读PageRank 与随机游走 | 负载均衡算法


By .