土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】日志架构:从 printf 到结构化日志管道

文章导航

分类入口
architecture
标签入口
#logging#structured-logging#ELK#Loki#log-pipeline

目录

一个 Go 服务在生产环境崩溃了,日志文件里有这么一行:

2024-03-15 14:23:07 ERROR: failed to process request

你看着这行日志,想回答几个问题:哪个用户的请求?请求的参数是什么?失败的原因是什么?这个错误和五分钟前另一个服务的超时有没有关系?——这行日志一个都答不了。

这不是某个工程师写日志的水平问题,这是非结构化日志(Unstructured Logging)的系统性缺陷。当系统只有一个进程、跑在一台机器上、日志量每天几十 MB 时,grep 加肉眼扫描勉强能用。但当系统变成 200 个微服务、跑在 500 个 Pod 上、每天产生 2 TB 日志时,非结构化日志就变成了噪音——你有海量的数据,但几乎没有信息。

分布式系统的日志架构要解决三个层次的问题:

  1. 格式问题:日志应该以什么结构产生?哪些字段是必须的?日志级别的语义是什么?
  2. 管道问题:日志从产生到可查询,中间经过哪些组件?怎么扛住流量洪峰?
  3. 存储与查询问题:日志存在哪里?怎么查?存多久?花多少钱?

上一篇 中我们讨论了密钥与证书管理的架构设计,本文聚焦可观测性(Observability)三大支柱之一的日志——另外两个支柱是 下一篇 要讨论的指标(Metrics)和链路追踪(Tracing)。


一、非结构化日志为什么在分布式系统中失效

1.1 一个真实的排障场景

某电商平台的订单服务报警:下单成功率从 99.5% 降到 95%。值班工程师打开日志系统,搜索 level:ERROR,得到上万条结果。日志长这样:

2024-03-15 14:23:07 ERROR failed to call payment service
2024-03-15 14:23:07 ERROR timeout
2024-03-15 14:23:08 ERROR payment failed for user
2024-03-15 14:23:08 ERROR connection reset by peer
2024-03-15 14:23:09 ERROR order creation failed

工程师想把这些错误关联起来:哪些 timeoutpayment failed 的原因?哪些 connection reset 导致了 order creation failed?——做不到。因为每条日志都是孤立的文本,没有 request_id 把同一个请求的日志串起来,没有 trace_id 把跨服务的调用链串起来,没有 error_code 来区分不同类型的失败。

工程师只能靠时间戳的毫秒级对齐去猜测因果关系——这在单机上偶尔能行,在分布式系统中基本不可能,因为不同机器的时钟有漂移,不同服务的日志到达日志系统的延迟也不同。

1.2 非结构化日志的五个系统性缺陷

缺陷 说明 后果
无法机器解析 每个开发者用自己的格式写日志,字段位置和分隔符不固定 日志采集器无法可靠地提取字段,Logstash 的 Grok 模式要为每种格式写正则
缺少关联标识 没有 trace_idspan_idrequest_id 等标识 无法将同一请求的日志串联,跨服务排障变成猜谜游戏
日志级别滥用 ERRORWARN 的语义没有团队级别的定义 告警系统基于 ERROR 计数触发,误报率高到没人看
敏感信息泄露 开发者习惯性地把请求参数原样打印 用户手机号、身份证号、密码明文出现在日志中
多行日志 异常堆栈、SQL 语句等跨多行 日志采集器按行切分,一条逻辑日志被拆成多条,搜索和聚合失效

1.3 量化对比:结构化 vs 非结构化

为了直观说明差异,看同一个事件的两种日志格式:

非结构化格式:

2024-03-15 14:23:07 ERROR [OrderService] Failed to create order for user 12345, payment timeout after 3000ms, order_id=ORD-98765, items=[SKU001, SKU002]

结构化 JSON 格式:

{
  "timestamp": "2024-03-15T14:23:07.342Z",
  "level": "ERROR",
  "logger": "OrderService",
  "message": "Failed to create order",
  "trace_id": "abc123def456",
  "span_id": "span789",
  "request_id": "req-001",
  "user_id": "12345",
  "order_id": "ORD-98765",
  "error_code": "PAYMENT_TIMEOUT",
  "timeout_ms": 3000,
  "items": ["SKU001", "SKU002"],
  "service": "order-service",
  "instance": "order-service-pod-7b8c9",
  "environment": "production"
}

第一种格式,人类读起来更舒服。第二种格式,机器可以解析每一个字段,支持按 user_id 过滤、按 error_code 聚合、按 trace_id 关联、按 timeout_ms 排序。在日均 2 TB 日志量的系统中,这个差异决定了排障时间是 5 分钟还是 5 小时。


二、结构化日志的设计规范

2.1 日志格式标准

结构化日志(Structured Logging)的核心思想很简单:日志是数据,不是文本。每条日志是一个包含固定字段的数据记录,而不是一个随意拼接的字符串。

一个推荐的结构化日志 JSON Schema:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["timestamp", "level", "message", "service"],
  "properties": {
    "timestamp": {
      "type": "string",
      "format": "date-time",
      "description": "ISO 8601 格式,UTC 时区,精确到毫秒"
    },
    "level": {
      "type": "string",
      "enum": ["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"]
    },
    "message": {
      "type": "string",
      "description": "简短的人类可读描述,不包含变量值"
    },
    "service": {
      "type": "string",
      "description": "服务名称,与 Kubernetes 的 Deployment 名称一致"
    },
    "instance": {
      "type": "string",
      "description": "实例标识,Kubernetes 环境下为 Pod 名称"
    },
    "trace_id": {
      "type": "string",
      "description": "分布式追踪的 trace ID,遵循 W3C Trace Context 规范"
    },
    "span_id": {
      "type": "string",
      "description": "当前 span 的 ID"
    },
    "request_id": {
      "type": "string",
      "description": "业务请求 ID,由网关层生成"
    },
    "environment": {
      "type": "string",
      "enum": ["development", "staging", "production"]
    }
  }
}

2.2 字段命名规范

字段命名(Field Naming)看似琐碎,但在日志量达到每天数十亿条之后,不一致的命名会导致查询困难和存储浪费。

推荐的命名规则:

  1. 使用 snake_case,不用 camelCasekebab-case——这与 Elasticsearch 的字段映射(Mapping)习惯一致,也是 OpenTelemetry 语义约定(Semantic Conventions)采用的风格
  2. 用有意义的前缀区分不同领域的字段:http_ 前缀用于 HTTP 相关字段(http_methodhttp_status_codehttp_path),db_ 前缀用于数据库相关字段(db_systemdb_statementdb_duration_ms),user_ 前缀用于用户相关字段
  3. 时间相关字段统一使用 _ms 后缀表示毫秒(duration_mstimeout_ms),避免混用秒和毫秒
  4. 布尔字段使用 is_ 前缀(is_retryis_cached

反面案例(同一个系统中的真实字段名混乱):

{
  "responseTime": 234,
  "response_time_ms": 234,
  "resp_time": 234,
  "duration": 0.234,
  "elapsed": "234ms"
}

五个字段表达同一个意思,格式各不相同。Elasticsearch 会为每个字段名创建独立的倒排索引(Inverted Index),导致存储膨胀和查询混乱。

2.3 日志级别的语义定义

日志级别(Log Level)是结构化日志中最容易被滥用的字段。大多数团队对 ERRORWARN 的边界没有明确定义,导致 ERROR 级别的日志中混入了大量不需要处理的信息,告警变成了噪音。

以下是一套经过工程实践检验的日志级别语义定义:

级别 语义 触发条件 是否告警 示例
FATAL 进程即将退出 不可恢复的错误 立即告警(页面通知) 端口被占用无法启动、OOM
ERROR 当前操作失败,需要人工介入 重试耗尽仍然失败、依赖服务不可用 告警(按频率) 支付回调处理失败、数据库连接池耗尽
WARN 当前操作成功但有异常情况 降级处理、重试成功、接近阈值 不告警,但纳入仪表板 缓存击穿走了数据库、重试 2 次后成功
INFO 业务流程的关键节点 请求开始/结束、状态变更、定时任务执行 不告警 订单创建成功、用户登录、配置重载
DEBUG 用于开发和排障的详细信息 中间计算结果、分支判断依据 不告警,生产环境默认关闭 SQL 语句、HTTP 请求/响应体
TRACE 最细粒度的跟踪信息 函数进入/退出、循环迭代 不告警,仅在专项排障时开启 每条消息的处理过程

核心原则:ERROR 意味着需要人工处理。如果一个错误会自动恢复,它最多是 WARN。

这个原则的直接推论是:生产环境中 ERROR 日志的数量应该足够少,少到值班工程师可以逐条查看。如果 ERROR 日志每小时产生上千条,要么是日志级别定义有问题,要么是系统本身有严重的可靠性问题。

2.4 上下文字段的注入方式

结构化日志的价值很大程度上来自上下文字段(Contextual Fields)——trace_idspan_idrequest_id 等。这些字段不应该由开发者在每个日志点手动传递,而应该通过框架层自动注入。

Go 语言的实现方式(使用 slog):

package main

import (
    "context"
    "log/slog"
    "os"
)

type contextKey string

const (
    traceIDKey   contextKey = "trace_id"
    spanIDKey    contextKey = "span_id"
    requestIDKey contextKey = "request_id"
)

// ContextHandler 自动从 context 中提取追踪字段并注入日志
type ContextHandler struct {
    inner slog.Handler
}

func NewContextHandler(inner slog.Handler) *ContextHandler {
    return &ContextHandler{inner: inner}
}

func (h *ContextHandler) Enabled(ctx context.Context, level slog.Level) bool {
    return h.inner.Enabled(ctx, level)
}

func (h *ContextHandler) Handle(ctx context.Context, r slog.Record) error {
    if traceID, ok := ctx.Value(traceIDKey).(string); ok {
        r.AddAttrs(slog.String("trace_id", traceID))
    }
    if spanID, ok := ctx.Value(spanIDKey).(string); ok {
        r.AddAttrs(slog.String("span_id", spanID))
    }
    if requestID, ok := ctx.Value(requestIDKey).(string); ok {
        r.AddAttrs(slog.String("request_id", requestID))
    }
    return h.inner.Handle(ctx, r)
}

func (h *ContextHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
    return NewContextHandler(h.inner.WithAttrs(attrs))
}

func (h *ContextHandler) WithGroup(name string) slog.Handler {
    return NewContextHandler(h.inner.WithGroup(name))
}

func main() {
    jsonHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelInfo,
    })
    logger := slog.New(NewContextHandler(jsonHandler))
    slog.SetDefault(logger)

    // 模拟中间件注入追踪字段
    ctx := context.Background()
    ctx = context.WithValue(ctx, traceIDKey, "abc123def456")
    ctx = context.WithValue(ctx, spanIDKey, "span789")
    ctx = context.WithValue(ctx, requestIDKey, "req-001")

    // 开发者只需关注业务字段,追踪字段自动注入
    slog.InfoContext(ctx, "order created",
        slog.String("order_id", "ORD-98765"),
        slog.Int("item_count", 3),
    )
}

输出:

{
  "time": "2024-03-15T14:23:07.342Z",
  "level": "INFO",
  "msg": "order created",
  "order_id": "ORD-98765",
  "item_count": 3,
  "trace_id": "abc123def456",
  "span_id": "span789",
  "request_id": "req-001"
}

Java 语言的实现方式(使用 SLF4J + MDC):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.UUID;

// Servlet Filter 自动注入追踪字段到 MDC
public class TraceContextFilter implements Filter {
    private static final String TRACE_ID = "trace_id";
    private static final String REQUEST_ID = "request_id";

    @Override
    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpReq = (HttpServletRequest) request;
        try {
            // 从上游传递的 Header 中获取 trace_id,没有则生成
            String traceId = httpReq.getHeader("X-Trace-Id");
            if (traceId == null || traceId.isEmpty()) {
                traceId = UUID.randomUUID().toString().replace("-", "");
            }
            String requestId = UUID.randomUUID().toString().replace("-", "");

            MDC.put(TRACE_ID, traceId);
            MDC.put(REQUEST_ID, requestId);

            chain.doFilter(request, response);
        } finally {
            MDC.clear();
        }
    }
}

对应的 Logback 配置(JSON 格式输出):

<configuration>
    <appender name="JSON_STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <includeMdcKeyName>trace_id</includeMdcKeyName>
            <includeMdcKeyName>request_id</includeMdcKeyName>
            <customFields>
                {"service":"order-service","environment":"production"}
            </customFields>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="JSON_STDOUT" />
    </root>
</configuration>

2.5 message 字段的设计原则

message 字段是日志中唯一面向人类阅读的字段,但很多工程师把它当作了格式化字符串的容器。

错误做法:

slog.Info(fmt.Sprintf("User %s created order %s with %d items totaling $%.2f",
    userID, orderID, itemCount, total))

这样写的问题是:message 的内容每条都不同,无法按 message 字段进行聚合统计。你无法回答”过去一小时有多少条订单创建日志”这个问题,因为每条日志的 message 都是独一无二的字符串。

正确做法:

slog.Info("order created",
    slog.String("user_id", userID),
    slog.String("order_id", orderID),
    slog.Int("item_count", itemCount),
    slog.Float64("total_usd", total),
)

message 是一个固定的、可枚举的事件名称,变量值放在独立的字段中。这样你可以按 message = "order created" 过滤,然后按 total_usd 做聚合分析。


三、日志管道的架构设计

3.1 整体架构

日志从应用程序产生到工程师可以查询,需要经过一条完整的管道。一个典型的日志管道包含四个阶段:采集(Collection)、聚合(Aggregation)、存储(Storage)、查询(Query)。

graph LR
    A["应用程序<br/>JSON 日志"] --> B["日志代理<br/>Filebeat / Fluentd<br/>/ Vector"]
    B --> C["消息队列<br/>Kafka / Kinesis"]
    C --> D["日志聚合器<br/>Logstash / Vector<br/>/ Fluentd"]
    D --> E["存储引擎<br/>Elasticsearch<br/>/ Loki / S3"]
    E --> F["查询界面<br/>Kibana / Grafana"]

    style A fill:#e1f5fe
    style B fill:#fff3e0
    style C fill:#fce4ec
    style D fill:#fff3e0
    style E fill:#e8f5e9
    style F fill:#f3e5f5

每个阶段的职责:

阶段 职责 关键能力
采集 从日志源收集日志,发送到下一级 文件尾读(tail)、多行合并、资源占用低
缓冲 削峰填谷,解耦上下游 高吞吐、持久化、背压(Backpressure)
聚合 解析、过滤、转换、路由 字段提取、敏感信息脱敏、日志路由
存储 持久化存储,支持高效查询 全文索引或标签索引、压缩、分层存储
查询 提供搜索、过滤、可视化 全文搜索、字段过滤、时间范围查询、仪表板

3.2 日志采集代理的选型

日志采集代理(Log Agent)运行在每一台机器或每一个 Pod 上,是管道的第一个环节。选型要考虑的核心因素是资源消耗和可靠性。

三大主流代理对比:

维度 Filebeat Fluentd Vector
语言 Go Ruby + C Rust
内存占用(空闲) 约 30 MB 约 60 MB 约 15 MB
内存占用(高负载) 约 100 MB 约 200 MB 约 50 MB
CPU 占用
插件生态 中(Elastic 生态) 丰富(700+ 插件) 中(内置 Transform)
多行合并 支持 支持 支持
背压处理 磁盘队列 内存/文件缓冲 磁盘缓冲
配置复杂度 低(YAML) 中(自定义语法) 低(TOML/YAML)
数据处理能力 弱(主要做采集) 强(过滤、路由) 强(VRL 语言)
社区维护 Elastic 公司 CNCF 项目 Datadog 开源

Filebeat 配置示例(采集 Kubernetes Pod 日志):

filebeat.autodiscover:
  providers:
    - type: kubernetes
      node: ${NODE_NAME}
      hints.enabled: true
      hints.default_config:
        type: container
        paths:
          - /var/log/containers/*-${data.kubernetes.container.id}.log
        # 多行合并:Java 异常堆栈
        multiline.pattern: '^[[:space:]]+(at|\.{3}|Caused by)'
        multiline.negate: false
        multiline.match: after

processors:
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
        - logs_path:
            logs_path: "/var/log/containers/"
  - decode_json_fields:
      fields: ["message"]
      target: ""
      overwrite_keys: true
  - drop_fields:
      fields: ["agent", "ecs", "host.name"]

output.kafka:
  hosts: ["kafka-0:9092", "kafka-1:9092", "kafka-2:9092"]
  topic: "logs-%{[kubernetes.namespace]}"
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: lz4

Vector 配置示例(采集 + 转换 + 路由):

# 数据源:从文件采集
[sources.app_logs]
type = "file"
include = ["/var/log/app/*.log"]
read_from = "beginning"

# 转换:解析 JSON 并添加字段
[transforms.parse_json]
type = "remap"
inputs = ["app_logs"]
source = '''
. = parse_json!(.message)
.hostname = get_hostname!()
.pipeline_ts = now()

# 敏感信息脱敏:手机号中间四位替换为 ****
if exists(.phone) {
    .phone = redact(.phone, filters: [r'\d{3}\d{4}\d{4}'], redactor: {"type": "text", "replacement": "***"})
}
'''

# 转换:按日志级别路由
[transforms.route_by_level]
type = "route"
inputs = ["parse_json"]

[transforms.route_by_level.route]
error = '.level == "ERROR" || .level == "FATAL"'
normal = '.level == "INFO" || .level == "WARN"'
debug = '.level == "DEBUG" || .level == "TRACE"'

# 输出:ERROR 日志发到专用 Topic
[sinks.error_logs]
type = "kafka"
inputs = ["route_by_level.error"]
bootstrap_servers = "kafka:9092"
topic = "logs-error"
encoding.codec = "json"
compression = "lz4"

# 输出:普通日志发到通用 Topic
[sinks.normal_logs]
type = "kafka"
inputs = ["route_by_level.normal"]
bootstrap_servers = "kafka:9092"
topic = "logs-normal"
encoding.codec = "json"
compression = "lz4"

# 输出:DEBUG 日志采样后发送(只保留 10%)
[transforms.sample_debug]
type = "sample"
inputs = ["route_by_level.debug"]
rate = 10

[sinks.debug_logs]
type = "kafka"
inputs = ["sample_debug"]
bootstrap_servers = "kafka:9092"
topic = "logs-debug"
encoding.codec = "json"

3.3 Kafka 作为日志缓冲层

在日志管道中引入消息队列(Message Queue)的核心目的是削峰填谷(Peak Shaving)和解耦上下游。

为什么选 Kafka 而不是其他消息队列?

  1. 吞吐量:单个 Kafka 分区的写入吞吐可以达到 10 MB/s 以上,一个有 30 个分区的 Topic 可以轻松承受 300 MB/s 的日志流量
  2. 持久化:日志写入 Kafka 后会持久化到磁盘,即使下游的 Elasticsearch 宕机,日志也不会丢失
  3. 多消费者:同一份日志可以被多个消费者组消费——一个写入 Elasticsearch 用于搜索,另一个写入 S3 用于长期归档,第三个用于实时异常检测
  4. 背压传导:当下游处理速度跟不上时,日志在 Kafka 中堆积,不会压垮日志代理或应用程序

Kafka Topic 设计建议:

# 按命名空间分 Topic,而不是按服务分
# 按服务分会导致 Topic 数量爆炸(200 个微服务 = 200 个 Topic)
topics:
  - name: logs-production
    partitions: 30
    replication_factor: 3
    config:
      retention.ms: 259200000        # 3 天
      retention.bytes: 107374182400  # 100 GB per partition
      compression.type: lz4
      segment.bytes: 1073741824      # 1 GB per segment
      message.max.bytes: 1048576     # 1 MB 单条上限

  - name: logs-error
    partitions: 10
    replication_factor: 3
    config:
      retention.ms: 604800000        # 7 天,ERROR 日志保留更久
      compression.type: lz4

3.4 背压与流量控制

日志管道的流量控制(Flow Control)是一个容易被忽视但后果严重的问题。当下游处理能力不足时,日志会在管道的某个环节堆积,如果没有适当的背压机制(Backpressure),最终会导致以下问题之一:

  1. OOM:日志在内存中堆积,耗尽进程的内存
  2. 日志丢弃:缓冲区满后新日志被丢弃,且通常是静默丢弃
  3. 应用程序阻塞:日志库的写入操作阻塞了业务线程

背压策略的设计原则:

应用程序 --[异步写入]--> 本地缓冲 --[批量发送]--> Kafka --[消费]--> 聚合器 --[写入]--> 存储
     |                      |                        |                   |
     v                      v                        v                   v
  永不阻塞            磁盘溢出缓冲           分区堆积告警         降级处理
  (最多丢弃)        (内存不够写磁盘)     (消费者组 lag)    (降低写入频率)

核心原则是:日志系统永远不能影响业务系统的可用性。如果必须在丢日志和阻塞业务之间选择,选丢日志。

Go 语言中实现非阻塞日志写入:

package asynclog

import (
    "context"
    "log/slog"
    "sync"
)

type AsyncHandler struct {
    inner  slog.Handler
    ch     chan slog.Record
    wg     sync.WaitGroup
    done   chan struct{}
}

func NewAsyncHandler(inner slog.Handler, bufferSize int) *AsyncHandler {
    h := &AsyncHandler{
        inner: inner,
        ch:    make(chan slog.Record, bufferSize),
        done:  make(chan struct{}),
    }
    h.wg.Add(1)
    go h.drain()
    return h
}

func (h *AsyncHandler) drain() {
    defer h.wg.Done()
    for {
        select {
        case r := <-h.ch:
            _ = h.inner.Handle(context.Background(), r)
        case <-h.done:
            // 排空缓冲区
            for {
                select {
                case r := <-h.ch:
                    _ = h.inner.Handle(context.Background(), r)
                default:
                    return
                }
            }
        }
    }
}

func (h *AsyncHandler) Handle(ctx context.Context, r slog.Record) error {
    select {
    case h.ch <- r:
        // 成功放入缓冲区
    default:
        // 缓冲区满,丢弃日志(不阻塞业务)
        // 这里可以增加一个 metrics 计数器来监控丢弃量
    }
    return nil
}

func (h *AsyncHandler) Enabled(ctx context.Context, level slog.Level) bool {
    return h.inner.Enabled(ctx, level)
}

func (h *AsyncHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
    return &AsyncHandler{inner: h.inner.WithAttrs(attrs), ch: h.ch, done: h.done}
}

func (h *AsyncHandler) WithGroup(name string) slog.Handler {
    return &AsyncHandler{inner: h.inner.WithGroup(name), ch: h.ch, done: h.done}
}

func (h *AsyncHandler) Close() {
    close(h.done)
    h.wg.Wait()
}

四、ELK 与 Loki 的架构对比

日志存储与查询系统有两大主流方案:ELK Stack(Elasticsearch + Logstash + Kibana)和 Grafana Loki。它们的设计哲学截然不同,适用的场景也不同。

4.1 ELK Stack 的架构

ELK Stack 的核心是 Elasticsearch——一个基于 Apache Lucene 的分布式搜索引擎。它的设计思路是全文索引(Full-text Indexing):对日志中的每一个字段建立倒排索引,查询时通过索引快速定位匹配的文档。

graph TB
    subgraph "ELK Stack 架构"
        A["日志代理<br/>Filebeat"] --> B["Logstash<br/>解析 + 转换"]
        B --> C["Elasticsearch Cluster"]
        C --> D["Kibana<br/>查询 + 可视化"]

        subgraph "Elasticsearch Cluster"
            E["Master Node<br/>集群管理"]
            F["Data Node 1<br/>Hot 层"]
            G["Data Node 2<br/>Hot 层"]
            H["Data Node 3<br/>Warm 层"]
            I["Data Node 4<br/>Cold 层"]
        end
    end

    style A fill:#e1f5fe
    style B fill:#fff3e0
    style C fill:#e8f5e9
    style D fill:#f3e5f5

Elasticsearch 的存储模型:

Elasticsearch 将日志存储在索引(Index)中,每个索引被分成多个分片(Shard),每个分片是一个独立的 Lucene 实例。当你写入一条日志时,Elasticsearch 会:

  1. 对每个字段的值进行分词(Tokenization)
  2. 为每个词项(Term)创建倒排索引条目
  3. 存储原始文档(_source 字段)
  4. 为数值字段创建 BKD 树索引

这意味着一条 1 KB 的日志在 Elasticsearch 中可能占用 3-5 KB 的存储空间(包括索引开销)。

Index 生命周期管理(ILM)配置示例:

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "1d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "2d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "logs-s3-repo"
          },
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

4.2 Grafana Loki 的架构

Loki 的设计哲学与 Elasticsearch 完全不同。Loki 的口号是”Like Prometheus, but for logs”——它只对标签(Label)建索引,不对日志内容建索引。查询时,Loki 先通过标签定位到一组日志流(Log Stream),然后在这组流中进行暴力搜索(Brute-force Search)。

这听起来像是在走回头路,但这个设计决策背后有清晰的工程考量:

  1. 存储成本低:不建倒排索引,存储膨胀系数接近 1(日志压缩后甚至小于 1)
  2. 写入吞吐高:只需要写入日志块(Chunk)和少量标签索引,不需要实时更新倒排索引
  3. 运维简单:不需要管理 JVM 堆内存、分片均衡、索引合并等复杂的 Elasticsearch 运维任务

Loki 的存储模型:

Loki 将日志组织为流(Stream),每个流由一组标签唯一标识。例如 {namespace="production", service="order-service", level="ERROR"} 就是一个流。同一个流中的日志按时间排序,压缩后存储在块(Chunk)中。

# Loki 配置示例
auth_enabled: false

server:
  http_listen_port: 3100

common:
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory

schema_config:
  configs:
    - from: 2024-01-01
      store: tsdb
      object_store: s3
      schema: v13
      index:
        prefix: loki_index_
        period: 24h

storage_config:
  tsdb_shipper:
    active_index_directory: /loki/index
    cache_location: /loki/cache
  aws:
    s3: s3://us-east-1/loki-logs-bucket
    bucketnames: loki-logs-bucket
    region: us-east-1

limits_config:
  retention_period: 720h   # 30 天
  ingestion_rate_mb: 10
  ingestion_burst_size_mb: 20
  max_streams_per_user: 10000
  max_label_name_length: 1024
  max_label_value_length: 2048
  max_label_names_per_series: 30

chunk_store_config:
  chunk_cache_config:
    embedded_cache:
      enabled: true
      max_size_mb: 500

Loki 的标签设计原则:

标签的选择对 Loki 的性能影响巨大。标签的基数(Cardinality)越高,流的数量越多,索引的开销越大。

好的标签(低基数):
  namespace: production / staging / development     (3 个值)
  service:   order-service / user-service / ...     (几十个值)
  level:     ERROR / WARN / INFO / DEBUG            (4 个值)
  env:       prod / staging / dev                   (3 个值)

坏的标签(高基数):
  user_id:     数百万个不同的值
  trace_id:    每个请求一个
  request_id:  每个请求一个
  ip_address:  数万个不同的值
  pod_name:    数百个,且随部署变化

高基数标签应该放在日志内容中,用 LogQL 的过滤表达式查询,而不是作为标签索引。

4.3 ELK vs Loki 对比

维度 ELK Stack Grafana Loki
索引策略 全文倒排索引(每个字段) 仅标签索引,内容不索引
查询模式 任意字段精确/模糊搜索 先按标签过滤,再在流内搜索
查询语言 KQL / Lucene Query LogQL
存储膨胀 3-5 倍(原始日志大小) 约 1 倍(压缩后更小)
写入延迟 秒级(需要构建索引) 亚秒级
查询速度(已索引字段) 毫秒级 不适用
查询速度(全文搜索) 毫秒级到秒级 秒级到分钟级(暴力搜索)
存储后端 本地 SSD / Searchable Snapshot 对象存储(S3、GCS、MinIO)
每 TB/月存储成本 约 200-500 美元(SSD) 约 20-30 美元(S3)
运维复杂度 高(JVM 调优、分片管理) 低(无状态组件 + 对象存储)
成熟度 高(10 年以上) 中(2018 年发布)
适用场景 需要高频、复杂的日志搜索 日志量大但搜索频率不高

成本对比示例(日均 1 TB 日志,保留 30 天):

ELK Stack(自建):
  存储: 1 TB/天 × 4 倍膨胀 × 30 天 = 120 TB SSD
  SSD 成本: 120 TB × $100/TB/月 ≈ $12,000/月
  计算: 6 台 Data Node (32 核 128 GB) ≈ $6,000/月
  总计: ≈ $18,000/月

Grafana Loki + S3:
  存储: 1 TB/天 × 0.5 倍压缩 × 30 天 = 15 TB S3
  S3 成本: 15 TB × $23/TB/月 ≈ $345/月
  计算: 3 台 Ingester (8 核 32 GB) ≈ $1,500/月
  查询: 2 台 Querier (16 核 64 GB) ≈ $1,200/月
  总计: ≈ $3,045/月

Loki 的成本优势来自两个因素:不建倒排索引省了存储膨胀,使用对象存储(Object Storage)代替 SSD 省了单位存储成本。代价是查询速度慢——当你需要在 30 天的日志中搜索一个特定的 user_id 时,ELK 可以在秒级返回结果,Loki 可能需要几十秒甚至几分钟。

4.4 查询语言对比

Kibana KQL 查询示例:

# 查找特定用户的 ERROR 日志
level: "ERROR" AND user_id: "12345" AND service: "order-service"

# 查找支付超时的日志(过去 1 小时)
message: "payment" AND message: "timeout" AND @timestamp >= now-1h

# 聚合:按 error_code 统计过去 24 小时的错误分布
# 使用 Elasticsearch DSL

对应的 Elasticsearch DSL:

{
  "query": {
    "bool": {
      "must": [
        {"term": {"level": "ERROR"}},
        {"range": {"@timestamp": {"gte": "now-24h"}}}
      ]
    }
  },
  "aggs": {
    "error_distribution": {
      "terms": {
        "field": "error_code",
        "size": 20
      }
    }
  }
}

LogQL 查询示例:

# 查找特定用户的 ERROR 日志
{service="order-service", level="ERROR"} |= `user_id` | json | user_id = "12345"

# 查找支付超时的日志(过去 1 小时)
{service="payment-service"} |= `timeout` | json | duration_ms > 3000

# 统计过去 24 小时每个 error_code 的出现次数
sum by (error_code) (
  count_over_time(
    {service="order-service", level="ERROR"} | json [24h]
  )
)

# 计算 P99 延迟(对 duration_ms 字段)
quantile_over_time(0.99,
  {service="order-service"} | json | unwrap duration_ms [5m]
) by (service)

4.5 选型决策框架

选择 ELK 还是 Loki,不应该基于”哪个更好”,而应该基于你的具体场景:

选 ELK 的条件:

选 Loki 的条件:


五、日志采样与动态日志级别

5.1 为什么需要日志采样

一个日均处理 1 亿请求的服务,如果每个请求产生 10 条 INFO 日志,每天就有 10 亿条日志。按每条日志 500 字节计算,日均日志量约 500 GB。这些日志中,99% 是正常请求的常规记录,排障时完全用不到。

日志采样(Log Sampling)的思路是:不是所有日志都值得存储。对于正常请求的 INFO 日志,只保留 1% 或 10% 的样本就足够做统计分析;对于 ERROR 日志,则保留 100%。

5.2 采样策略

固定比率采样(Fixed-rate Sampling):

最简单的策略。每 N 条日志只保留 1 条。适合日志量大且每条日志价值差不多的场景。

package sampling

import (
    "context"
    "log/slog"
    "sync/atomic"
)

// RateSamplingHandler 按固定比率采样日志
type RateSamplingHandler struct {
    inner    slog.Handler
    rate     uint64
    counter  atomic.Uint64
    minLevel slog.Level  // 高于此级别的日志不采样,全部保留
}

func NewRateSamplingHandler(inner slog.Handler, rate int, minLevel slog.Level) *RateSamplingHandler {
    return &RateSamplingHandler{
        inner:    inner,
        rate:     uint64(rate),
        minLevel: minLevel,
    }
}

func (h *RateSamplingHandler) Enabled(ctx context.Context, level slog.Level) bool {
    return h.inner.Enabled(ctx, level)
}

func (h *RateSamplingHandler) Handle(ctx context.Context, r slog.Record) error {
    // WARN 及以上级别不采样
    if r.Level >= h.minLevel {
        return h.inner.Handle(ctx, r)
    }

    // 对 INFO 和 DEBUG 进行采样
    n := h.counter.Add(1)
    if n%h.rate == 0 {
        r.AddAttrs(slog.Uint64("sample_rate", h.rate))
        return h.inner.Handle(ctx, r)
    }
    return nil
}

func (h *RateSamplingHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
    return &RateSamplingHandler{
        inner:    h.inner.WithAttrs(attrs),
        rate:     h.rate,
        minLevel: h.minLevel,
    }
}

func (h *RateSamplingHandler) WithGroup(name string) slog.Handler {
    return &RateSamplingHandler{
        inner:    h.inner.WithGroup(name),
        rate:     h.rate,
        minLevel: h.minLevel,
    }
}

基于请求的采样(Per-request Sampling):

比逐条采样更好的策略。在请求入口(网关或中间件)决定这个请求是否被采样,如果是,整个请求链路的所有日志都保留;如果否,整个链路的日志都丢弃。这样保证了被采样请求的日志完整性。

package sampling

import (
    "context"
    "math/rand"
)

type sampledKey struct{}

// ShouldSample 在请求入口调用,决定是否采样
func ShouldSample(ctx context.Context, rate float64) context.Context {
    sampled := rand.Float64() < rate
    return context.WithValue(ctx, sampledKey{}, sampled)
}

// IsSampled 在日志处理器中调用,判断当前请求是否被采样
func IsSampled(ctx context.Context) bool {
    v, ok := ctx.Value(sampledKey{}).(bool)
    if !ok {
        return true // 没有采样标记的日志默认保留
    }
    return v
}

自适应采样(Adaptive Sampling):

采样率根据当前日志的速率动态调整。当日志速率低于阈值时不采样(全保留),速率超过阈值后逐步提高采样率。这种策略在流量突增时自动降低日志量,在低流量时保留所有日志。

package sampling

import (
    "sync"
    "time"
)

// AdaptiveSampler 根据日志速率动态调整采样率
type AdaptiveSampler struct {
    mu          sync.Mutex
    threshold   int     // 每秒日志条数阈值
    count       int     // 当前窗口的计数
    windowStart time.Time
    currentRate float64 // 当前采样率,1.0 表示全保留
}

func NewAdaptiveSampler(threshold int) *AdaptiveSampler {
    return &AdaptiveSampler{
        threshold:   threshold,
        windowStart: time.Now(),
        currentRate: 1.0,
    }
}

func (s *AdaptiveSampler) ShouldLog() bool {
    s.mu.Lock()
    defer s.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(s.windowStart)

    // 每秒重置窗口
    if elapsed >= time.Second {
        ratePerSec := float64(s.count) / elapsed.Seconds()
        if ratePerSec > float64(s.threshold) {
            s.currentRate = float64(s.threshold) / ratePerSec
        } else {
            s.currentRate = 1.0
        }
        s.count = 0
        s.windowStart = now
    }

    s.count++

    if s.currentRate >= 1.0 {
        return true
    }
    // 使用计数器取模来决定是否采样,避免 rand 的锁竞争
    return s.count%int(1.0/s.currentRate) == 0
}

5.3 动态日志级别

生产环境的默认日志级别通常是 INFO——DEBUG 日志量太大,不适合常态化开启。但排障时经常需要临时开启 DEBUG 级别来获取更详细的信息。

动态日志级别(Dynamic Log Level) 是指在不重启服务的情况下,通过 API 调用或配置变更来实时修改日志级别。

Go 语言实现(基于 slog.LevelVar):

package main

import (
    "encoding/json"
    "log/slog"
    "net/http"
    "os"
)

var logLevel = new(slog.LevelVar) // 默认 INFO

func init() {
    handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: logLevel,
    })
    slog.SetDefault(slog.New(handler))
}

type levelRequest struct {
    Level string `json:"level"`
}

func handleSetLevel(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPut {
        http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
        return
    }

    var req levelRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request body", http.StatusBadRequest)
        return
    }

    var level slog.Level
    if err := level.UnmarshalText([]byte(req.Level)); err != nil {
        http.Error(w, "invalid log level", http.StatusBadRequest)
        return
    }

    logLevel.Set(level)
    slog.Info("log level changed", slog.String("new_level", req.Level))

    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{
        "level": logLevel.Level().String(),
    })
}

func main() {
    http.HandleFunc("/admin/log-level", handleSetLevel)

    slog.Info("server starting", slog.String("addr", ":8080"))
    http.ListenAndServe(":8080", nil)
}

调用方式:

# 临时开启 DEBUG 级别
curl -X PUT http://order-service:8080/admin/log-level \
  -H "Content-Type: application/json" \
  -d '{"level": "DEBUG"}'

# 恢复为 INFO 级别
curl -X PUT http://order-service:8080/admin/log-level \
  -H "Content-Type: application/json" \
  -d '{"level": "INFO"}'

Java 实现(基于 Spring Boot Actuator):

Spring Boot 内置了日志级别动态调整的 Actuator 端点:

# application.yml
management:
  endpoints:
    web:
      exposure:
        include: loggers
  endpoint:
    loggers:
      enabled: true
# 查看当前日志级别
curl http://order-service:8080/actuator/loggers/com.example.order

# 修改日志级别
curl -X POST http://order-service:8080/actuator/loggers/com.example.order \
  -H "Content-Type: application/json" \
  -d '{"configuredLevel": "DEBUG"}'

5.4 条件日志

条件日志(Conditional Logging)是比动态日志级别更精细的控制手段。它允许你只对特定条件(某个用户、某个请求、某个特征标记)开启详细日志,而不影响其他请求。

package conditional

import (
    "context"
    "log/slog"
)

type debugUserKey struct{}

// WithDebugUser 在中间件中为特定用户开启详细日志
func WithDebugUser(ctx context.Context, userID string, debugUsers map[string]bool) context.Context {
    if debugUsers[userID] {
        return context.WithValue(ctx, debugUserKey{}, true)
    }
    return ctx
}

// ConditionalHandler 根据上下文条件决定是否记录 DEBUG 日志
type ConditionalHandler struct {
    inner slog.Handler
}

func (h *ConditionalHandler) Enabled(ctx context.Context, level slog.Level) bool {
    if level >= slog.LevelInfo {
        return h.inner.Enabled(ctx, level)
    }
    // DEBUG 级别只对标记的请求开启
    if v, ok := ctx.Value(debugUserKey{}).(bool); ok && v {
        return true
    }
    return false
}

func (h *ConditionalHandler) Handle(ctx context.Context, r slog.Record) error {
    return h.inner.Handle(ctx, r)
}

func (h *ConditionalHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
    return &ConditionalHandler{inner: h.inner.WithAttrs(attrs)}
}

func (h *ConditionalHandler) WithGroup(name string) slog.Handler {
    return &ConditionalHandler{inner: h.inner.WithGroup(name)}
}

这种方式在排查特定用户问题时非常有用:你可以在不影响系统整体日志量的情况下,获取这个用户所有请求的完整 DEBUG 日志。

5.5 采样策略的组合应用

实际的生产系统通常会组合使用多种采样策略:

日志产生
    |
    v
[级别判断] ── FATAL/ERROR ──> 100% 保留
    |
    v
[级别判断] ── WARN ──> 100% 保留
    |
    v
[级别判断] ── INFO ──> [请求采样?]
    |                       |
    |                  已采样请求 ──> 100% 保留
    |                       |
    |                  未采样请求 ──> [自适应采样] ──> 1-10% 保留
    |
    v
[级别判断] ── DEBUG ──> [条件日志?]
                            |
                       条件匹配 ──> 100% 保留
                            |
                       条件不匹配 ──> 丢弃

六、工程案例:Uber 的结构化日志迁移

6.1 背景

Uber 的微服务架构在 2015-2016 年间经历了爆炸式增长,服务数量从几十个增长到上千个。早期每个服务使用自己的日志格式和日志库——有的用 Python 的 logging 模块,有的用 Go 的 log 包,有的用 Node.js 的 winston。日志格式五花八门,排障时工程师需要先搞清楚每个服务的日志格式,才能开始分析问题。

Uber 的工程团队在一篇技术博客中描述了他们面临的核心挑战:

  1. 日志量爆炸:每天产生超过数十 TB 的日志,存储和检索成本高昂
  2. 格式不统一:上千个服务使用不同的日志格式,无法统一解析
  3. 排障效率低:跨服务的问题定位平均耗时超过 1 小时
  4. 日志级别混乱:大量服务把正常的业务流程记为 ERROR,导致告警疲劳(Alert Fatigue)

6.2 解决方案:Zap 日志库

Uber 开发了 Zap 日志库来解决这些问题。Zap 的核心设计目标是高性能和结构化输出。

Zap 的技术特点:

  1. 零分配(Zero Allocation):在热路径(Hot Path)上避免内存分配,减少 GC 压力。Zap 的基准测试显示,它比 Go 标准库的 log 包快 4-10 倍
  2. 强类型字段:使用 zap.String()zap.Int() 等强类型构造函数,而不是 interface{} 参数,避免反射开销
  3. 两级 APIzap.Logger(高性能、强类型)和 zap.SugaredLogger(类似 printf 的便利 API,有少量性能损失)
package main

import (
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
    "time"
)

func main() {
    config := zap.Config{
        Level:       zap.NewAtomicLevelAt(zap.InfoLevel),
        Development: false,
        Encoding:    "json",
        EncoderConfig: zapcore.EncoderConfig{
            TimeKey:        "timestamp",
            LevelKey:       "level",
            NameKey:        "logger",
            CallerKey:      "caller",
            MessageKey:     "message",
            StacktraceKey:  "stacktrace",
            LineEnding:     zapcore.DefaultLineEnding,
            EncodeLevel:    zapcore.CapitalLevelEncoder,
            EncodeTime:     zapcore.ISO8601TimeEncoder,
            EncodeDuration: zapcore.MillisDurationEncoder,
            EncodeCaller:   zapcore.ShortCallerEncoder,
        },
        OutputPaths:      []string{"stdout"},
        ErrorOutputPaths: []string{"stderr"},
    }

    logger, _ := config.Build()
    defer logger.Sync()

    // 强类型结构化日志
    logger.Info("order created",
        zap.String("order_id", "ORD-98765"),
        zap.String("user_id", "12345"),
        zap.Int("item_count", 3),
        zap.Float64("total_usd", 99.99),
        zap.Duration("processing_time", 23*time.Millisecond),
        zap.String("trace_id", "abc123def456"),
    )
}

6.3 迁移策略

Uber 的迁移不是一次性完成的,而是分阶段推进:

第一阶段:统一日志库

所有新服务必须使用 Zap,存量服务在下一次重大更新时迁移。通过内部的代码检查工具(Linter)强制检查:新代码中不允许直接调用 fmt.Printlnlog.Printf

第二阶段:统一日志格式

定义了内部的日志格式规范,包括必选字段(timestamplevelmessageservice)和推荐字段(trace_idspan_id)。所有服务的日志输出必须是 JSON 格式。

第三阶段:统一日志管道

将各服务自建的日志收集方案统一到公司级别的日志平台。每个 Kubernetes 节点上运行一个日志代理,负责采集所有 Pod 的 stdout 日志,经过 Kafka 缓冲后写入集中存储。

第四阶段:日志治理

6.4 效果

迁移完成后,Uber 报告了以下改进:


七、日志安全与合规

7.1 PII 脱敏

日志中经常无意间包含个人可识别信息(PII, Personally Identifiable Information),例如手机号、身份证号、邮箱地址、信用卡号等。在 GDPR(通用数据保护条例)和中国的《个人信息保护法》等法规下,日志中的 PII 如果没有适当保护,可能构成合规风险。

脱敏策略应该在日志写入时执行,而不是在存储层执行。 因为一旦 PII 进入日志管道,就很难保证中间的每一个环节(Kafka、临时文件、备份)都不会泄露。

package sanitize

import (
    "regexp"
    "strings"
)

var (
    phonePattern    = regexp.MustCompile(`1[3-9]\d{9}`)
    idCardPattern   = regexp.MustCompile(`\d{17}[\dXx]`)
    emailPattern    = regexp.MustCompile(`[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}`)
    creditCardPattern = regexp.MustCompile(`\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}`)
)

// SanitizeField 对字段值进行脱敏
func SanitizeField(key, value string) string {
    switch {
    case strings.Contains(key, "phone") || strings.Contains(key, "mobile"):
        return maskPhone(value)
    case strings.Contains(key, "id_card") || strings.Contains(key, "identity"):
        return maskIDCard(value)
    case strings.Contains(key, "email"):
        return maskEmail(value)
    case strings.Contains(key, "card_number") || strings.Contains(key, "credit_card"):
        return maskCreditCard(value)
    case strings.Contains(key, "password") || strings.Contains(key, "secret") || strings.Contains(key, "token"):
        return "***REDACTED***"
    }
    return value
}

func maskPhone(phone string) string {
    if len(phone) != 11 {
        return "***"
    }
    return phone[:3] + "****" + phone[7:]
}

func maskIDCard(id string) string {
    if len(id) < 8 {
        return "***"
    }
    return id[:4] + "**********" + id[len(id)-4:]
}

func maskEmail(email string) string {
    parts := strings.Split(email, "@")
    if len(parts) != 2 {
        return "***"
    }
    name := parts[0]
    if len(name) <= 2 {
        return "**@" + parts[1]
    }
    return name[:2] + "***@" + parts[1]
}

func maskCreditCard(card string) string {
    clean := strings.ReplaceAll(strings.ReplaceAll(card, " ", ""), "-", "")
    if len(clean) < 8 {
        return "***"
    }
    return clean[:4] + " **** **** " + clean[len(clean)-4:]
}

Vector 中的脱敏配置:

[transforms.sanitize_pii]
type = "remap"
inputs = ["parse_json"]
source = '''
# 手机号脱敏
if exists(.phone) {
    .phone = redact(.phone, filters: [r'(1[3-9]\d)\d{4}(\d{4})'], redactor: {"type": "text", "replacement": "$1****$2"})
}

# password 字段直接删除
if exists(.password) {
    del(.password)
}

# 请求体中的敏感信息
if exists(.request_body) {
    .request_body = redact(.request_body, filters: [
        r'"password"\s*:\s*"[^"]*"',
        r'"token"\s*:\s*"[^"]*"'
    ], redactor: {"type": "text", "replacement": "\"***REDACTED***\""})
}
'''

7.2 日志防篡改

在安全审计(Security Audit)场景中,日志的完整性至关重要。攻击者在入侵系统后,通常会尝试删除或修改日志来掩盖痕迹。

防篡改的技术手段:

  1. 追加写入(Append-only)存储:使用 S3 的对象锁(Object Lock)或 WORM(Write Once Read Many)存储,确保日志写入后不可修改或删除
{
  "Rules": [
    {
      "ObjectLockEnabled": "Enabled",
      "DefaultRetention": {
        "Mode": "COMPLIANCE",
        "Days": 365
      }
    }
  ]
}
  1. 日志签名链:每条日志包含前一条日志的哈希值,形成一条哈希链(Hash Chain)。任何日志被篡改都会导致链条断裂
package auditlog

import (
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "sync"
    "time"
)

type AuditEntry struct {
    Sequence  uint64 `json:"seq"`
    Timestamp string `json:"ts"`
    Event     string `json:"event"`
    Actor     string `json:"actor"`
    Detail    string `json:"detail"`
    PrevHash  string `json:"prev_hash"`
    Hash      string `json:"hash"`
}

type AuditLogger struct {
    mu       sync.Mutex
    seq      uint64
    prevHash string
}

func NewAuditLogger() *AuditLogger {
    return &AuditLogger{
        prevHash: "0000000000000000000000000000000000000000000000000000000000000000",
    }
}

func (l *AuditLogger) Log(event, actor, detail string) AuditEntry {
    l.mu.Lock()
    defer l.mu.Unlock()

    l.seq++
    entry := AuditEntry{
        Sequence:  l.seq,
        Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
        Event:     event,
        Actor:     actor,
        Detail:    detail,
        PrevHash:  l.prevHash,
    }

    payload, _ := json.Marshal(map[string]interface{}{
        "seq":       entry.Sequence,
        "ts":        entry.Timestamp,
        "event":     entry.Event,
        "actor":     entry.Actor,
        "detail":    entry.Detail,
        "prev_hash": entry.PrevHash,
    })
    hash := sha256.Sum256(payload)
    entry.Hash = hex.EncodeToString(hash[:])
    l.prevHash = entry.Hash

    return entry
}
  1. 独立的日志收集通道:安全审计日志通过独立于业务日志的通道发送到隔离的存储系统,应用程序的运维人员无权访问

7.3 日志访问控制

不是所有工程师都应该能看到所有日志。支付服务的日志可能包含交易细节,用户服务的日志可能包含登录信息——这些日志的访问应该受到限制。

Elasticsearch 的字段级别安全配置:

# elasticsearch-roles.yml
payment_team:
  cluster: []
  indices:
    - names: ["logs-payment-*"]
      privileges: ["read"]
      field_security:
        grant: ["*"]
        except: ["credit_card_number", "cvv"]

general_engineer:
  cluster: []
  indices:
    - names: ["logs-*"]
      privileges: ["read"]
      field_security:
        grant: ["timestamp", "level", "message", "service",
                "trace_id", "span_id", "request_id",
                "http_method", "http_path", "http_status_code",
                "duration_ms", "error_code"]

八、日志运维:轮转、保留与成本管理

8.1 日志轮转

当日志直接写入文件时(容器化之前的传统部署模式),日志轮转(Log Rotation)是必须的——否则日志文件会无限增长直到磁盘写满。

logrotate 配置示例:

/var/log/app/*.log {
    daily
    rotate 7
    compress
    delaycompress
    missingok
    notifempty
    maxsize 500M
    copytruncate
    dateext
    dateformat -%Y%m%d
    postrotate
        # 通知应用程序重新打开日志文件
        /bin/kill -USR1 $(cat /var/run/app.pid 2>/dev/null) 2>/dev/null || true
    endscript
}

在 Kubernetes 环境中,应用程序应该将日志输出到 stdout/stderr,由容器运行时(Container Runtime)负责日志轮转。kubelet 的日志管理配置:

# kubelet 配置
containerLogMaxSize: "100Mi"
containerLogMaxFiles: 5

8.2 保留策略

日志的保留期限(Retention Period)需要在合规要求、排障需求和成本之间取平衡。

日志类型 建议保留期限 存储层级 依据
安全审计日志 1-7 年 对象存储(冷存储) 法规要求(等保、GDPR)
ERROR/FATAL 日志 90 天 热存储 + 温存储 用于事后分析和复盘
INFO 日志 14-30 天 热存储(前 3 天)+ 温存储 排障窗口期
DEBUG 日志 3-7 天 热存储 仅在主动排障时产生
访问日志(Access Log) 30-90 天 温存储 安全分析、流量分析

8.3 成本优化策略

日志是分布式系统中成本增长最快的领域之一。以下是经过实践验证的成本优化策略:

策略一:分层存储(Tiered Storage)

热层(Hot Tier):  最近 3 天的日志,存在 SSD,支持快速查询
   |
   v  (3 天后自动迁移)
温层(Warm Tier): 3-30 天的日志,存在 HDD 或低频 S3,查询速度略慢
   |
   v  (30 天后自动迁移)
冷层(Cold Tier): 30 天-1 年的日志,存在 S3 Glacier,需要解冻才能查询
   |
   v  (1 年后自动删除)
归档层(Archive): 审计日志例外,永久保留在 S3 Glacier Deep Archive

策略二:日志压缩

不同压缩算法的对比:

算法 压缩比 压缩速度 解压速度 适用场景
LZ4 2-3 倍 极快 极快 实时管道(Kafka、Loki)
Snappy 2-3 倍 Elasticsearch Shard
ZSTD 4-6 倍 温/冷存储归档
GZIP 4-6 倍 传统归档

策略三:日志裁剪

在日志管道的聚合阶段,移除不需要的字段来减少存储量:

# Vector 配置:移除高体积低价值的字段
[transforms.trim_fields]
type = "remap"
inputs = ["parse_json"]
source = '''
# 移除大体积的请求/响应体(只保留前 500 字符的摘要)
if exists(.request_body) {
    body = string!(.request_body)
    if length(body) > 500 {
        .request_body_preview = slice!(body, 0, 500)
        del(.request_body)
    }
}

# 移除冗余的 Kubernetes 元数据
del(.kubernetes.labels)
del(.kubernetes.annotations)
del(.kubernetes.node_labels)

# 移除重复的时间戳字段(保留 timestamp,移除 @timestamp 和 time)
del(."@timestamp")
del(.time)
'''

策略四:日志配额

为每个服务设定日志量配额,超过配额的日志自动降级处理:

# 日志配额配置示例
quotas:
  default:
    max_bytes_per_day: 10GB
    action_on_exceed: sample  # sample | drop | alert
    sample_rate: 0.1          # 超额后只保留 10%

  overrides:
    - service: payment-service
      max_bytes_per_day: 50GB  # 支付服务配额更高
      action_on_exceed: alert  # 超额只告警不丢弃

    - service: health-check
      max_bytes_per_day: 1GB
      action_on_exceed: drop   # 健康检查日志超额直接丢弃

九、日志与可观测性的集成

9.1 日志、指标、追踪的关联

日志是可观测性(Observability)三大支柱之一,与指标(Metrics)和追踪(Tracing)共同构成完整的观测体系。三者之间不应该是孤立的,而应该通过共享标识符相互关联。

指标(Metrics)                 追踪(Tracing)
  |                                |
  |  按 service + error_code       |  按 trace_id
  |  关联                          |  关联
  |                                |
  +-------- 日志(Logs) ----------+

关键关联字段:

Grafana 中的关联配置(Loki 数据源):

# Grafana 数据源配置
apiVersion: 1
datasources:
  - name: Loki
    type: loki
    url: http://loki:3100
    jsonData:
      derivedFields:
        # 从日志中提取 trace_id,生成到 Tempo 的跳转链接
        - datasourceUid: tempo
          matcherRegex: '"trace_id":"(\w+)"'
          name: TraceID
          url: '$${__value.raw}'
          urlDisplayLabel: "View Trace"

  - name: Tempo
    type: tempo
    uid: tempo
    url: http://tempo:3200
    jsonData:
      tracesToLogs:
        datasourceUid: loki
        tags: ["service"]
        mappedTags:
          - key: service.name
            value: service
        filterByTraceID: true
        filterBySpanID: true

9.2 从日志中提取指标

某些情况下,从日志中提取指标(Metrics from Logs)比直接在代码中埋点更方便——特别是对于无法修改代码的第三方服务。

Vector 从日志中提取指标的配置:

[transforms.extract_metrics]
type = "log_to_metric"
inputs = ["parse_json"]

[[transforms.extract_metrics.metrics]]
type = "counter"
field = "level"
name = "log_lines_total"
tags.service = "{{service}}"
tags.level = "{{level}}"

[[transforms.extract_metrics.metrics]]
type = "histogram"
field = "duration_ms"
name = "request_duration_ms"
tags.service = "{{service}}"
tags.http_method = "{{http_method}}"

[sinks.prometheus_metrics]
type = "prometheus_exporter"
inputs = ["extract_metrics"]
address = "0.0.0.0:9598"

十、Fluentd 与 Fluent Bit 在 Kubernetes 中的部署

10.1 DaemonSet 模式

在 Kubernetes 环境中,日志采集代理通常以 DaemonSet 模式部署——每个节点运行一个 Pod,负责采集该节点上所有容器的日志。

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: logging
  labels:
    app: fluent-bit
spec:
  selector:
    matchLabels:
      app: fluent-bit
  template:
    metadata:
      labels:
        app: fluent-bit
    spec:
      serviceAccountName: fluent-bit
      tolerations:
        - operator: Exists
      containers:
        - name: fluent-bit
          image: fluent/fluent-bit:3.0
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
              readOnly: true
            - name: containers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: config
              mountPath: /fluent-bit/etc/
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: containers
          hostPath:
            path: /var/lib/docker/containers
        - name: config
          configMap:
            name: fluent-bit-config

Fluent Bit 配置(ConfigMap):

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush         5
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*.log
        Parser            cri
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     10MB
        Skip_Long_Lines   On
        Refresh_Interval  10

    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Merge_Log           On
        Merge_Log_Key       log_processed
        K8S-Logging.Parser  On
        K8S-Logging.Exclude On

    [FILTER]
        Name    modify
        Match   kube.*
        Remove  stream
        Remove  logtag

    [OUTPUT]
        Name            kafka
        Match           kube.*
        Brokers         kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092
        Topics          logs-kubernetes
        Format          json
        Timestamp_Key   @timestamp
        rdkafka.compression.type lz4
        rdkafka.queue.buffering.max.messages 100000
        rdkafka.queue.buffering.max.ms 1000

  parsers.conf: |
    [PARSER]
        Name        cri
        Format      regex
        Regex       ^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<log>.*)$
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L%z

10.2 Sidecar 模式

对于需要特殊日志处理的服务(例如需要对日志做业务级别的转换),可以使用 Sidecar 模式——在业务 Pod 中额外运行一个日志代理容器。

Sidecar 模式的优点是每个服务可以有独立的日志处理逻辑,缺点是资源开销大——每个 Pod 多一个容器。通常建议优先使用 DaemonSet 模式,只对少数特殊服务使用 Sidecar。


十一、日志架构的反模式

在实际的日志系统建设中,以下反模式(Anti-pattern)值得警惕:

11.1 日志作为消息总线

反模式描述: 用日志来传递服务间的业务数据。例如服务 A 把处理结果写入日志,服务 B 通过解析日志来获取这个结果。

问题: 日志系统没有消息队列的投递保证(至少一次、精确一次)。日志可能被采样、延迟、丢失。日志格式一旦变化(例如增加一个字段),下游的解析逻辑就会失败。

正确做法: 使用消息队列(Kafka、RabbitMQ)或事件驱动架构来传递业务数据,日志只用于记录和排障。

11.2 日志过度记录

反模式描述: 把 HTTP 请求的完整请求体和响应体都记录到日志中。

问题: 一个包含图片上传的请求,请求体可能有几十 MB。即使是普通的 API 请求,完整的 JSON 请求/响应体也会让日志量膨胀数倍。更严重的是,请求体中可能包含敏感信息。

正确做法: 只记录请求的关键元数据(方法、路径、状态码、延迟),对请求体只记录前 N 个字符的摘要,或者完全不记录。

11.3 同步日志写入

反模式描述: 日志写入操作在业务请求的关键路径上同步执行——写日志失败会导致业务请求失败。

问题: 当日志系统(文件系统、网络)出现问题时,业务请求被阻塞甚至失败。日志系统的故障不应该影响业务系统的可用性。

正确做法: 日志写入应该是异步的、非阻塞的。使用有界缓冲区,缓冲区满时丢弃日志而不是阻塞。

11.4 标签爆炸(Loki 特有)

反模式描述: 在 Loki 中使用高基数(High Cardinality)的值作为标签——例如 user_idrequest_idip_address

问题: Loki 为每个唯一的标签组合创建一个日志流。如果 user_id 有 100 万个不同的值,就会创建 100 万个流,索引体积膨胀,写入和查询性能急剧下降。

正确做法: 只用低基数的字段作为标签(namespace、service、level、env),高基数的值放在日志内容中,用 LogQL 的 | json | user_id = "12345" 语法查询。

11.5 忽略多行日志

反模式描述: 没有配置多行合并,导致 Java 异常堆栈被拆成几十条独立的日志。

问题: 搜索一个异常时,只能搜到第一行(异常类名和消息),后续的堆栈信息散落在其他日志条目中,无法关联。按 ERROR 计数统计时,一个异常被计为几十个错误。

正确做法: 在日志采集代理中配置多行合并规则。更好的做法是在应用层使用结构化日志——把整个堆栈作为一个字段输出:

logger.Error("request failed",
    zap.Error(err),                    // 自动捕获 error message
    zap.String("stacktrace", string(debug.Stack())), // 堆栈作为字段
)

十二、生产环境日志架构清单

在实际落地日志架构时,可以用以下清单来检查是否覆盖了关键的设计点:

日志产生层:

日志管道层:

日志存储与查询层:

成本管理层:


参考资料

  1. Uber Engineering Blog, “Introducing Zap: Uber’s High-Performance Logging Library for Go”,详细介绍了 Uber 开发高性能结构化日志库的动机和设计决策
  2. Grafana Labs, “Loki: Prometheus-inspired, open source logging for cloud natives”,Loki 的官方文档和设计原理说明
  3. Elastic, “Elasticsearch: The Definitive Guide”,Elasticsearch 的权威指南,包括索引管理和性能调优
  4. OpenTelemetry Documentation, “Logs Specification”,OpenTelemetry 日志规范,定义了日志数据模型和语义约定
  5. Google Cloud Architecture Center, “Logging best practices”,Google Cloud 关于日志最佳实践的架构指南
  6. Charity Majors, Liz Fong-Jones, George Miranda, “Observability Engineering”, O’Reilly, 2022,可观测性工程的系统性著作,涵盖日志、指标、追踪的统一理论
  7. CNCF, “Fluentd Documentation”,Fluentd 官方文档,包含插件生态和性能调优指南
  8. Vector Documentation, “Vector Remap Language (VRL)”,Vector 的数据转换语言参考
  9. Martin Kleppmann, “Designing Data-Intensive Applications”, O’Reilly, 2017,第 11 章关于流处理和日志的讨论
  10. Cloudflare Blog, “How we scaled our log processing pipeline”,Cloudflare 关于大规模日志管道的工程实践分享

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。

2026-04-13 · architecture

【系统架构设计百科】复杂性管理:架构的核心战场

系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略


By .