土法炼钢兴趣小组的算法知识备份

【存储工程】S3 API 深度解析

文章导航

分类入口
storage
标签入口
#s3#aws#multipart-upload#s3-select#lifecycle#cross-region-replication#boto3

目录

Amazon S3 自 2006 年发布以来,已经成为事实上的对象存储接口标准。无论是公有云(AWS、阿里云 OSS、腾讯云 COS)还是私有部署(MinIO、Ceph RGW),几乎所有对象存储系统都兼容 S3 API。理解 S3 API 的设计细节,不仅是使用 AWS 的前提,更是理解整个对象存储生态的基础。

本文从工程视角深入剖析 S3 API 的核心机制:从请求签名到分片上传,从服务端查询到生命周期管理,从跨区域复制到性能优化。每个主题都附带可运行的代码示例和生产环境中的最佳实践。

一、S3 API 设计哲学

1.1 RESTful 接口设计

S3 是最早大规模采用 RESTful 架构风格(RESTful Architectural Style)的云服务之一。其核心设计原则:

# S3 资源的 URI 结构

# 路径风格(Path-Style)—— 已逐步弃用
https://s3.amazonaws.com/{bucket-name}/{key}

# 虚拟托管风格(Virtual-Hosted Style)—— 推荐
https://{bucket-name}.s3.amazonaws.com/{key}

# 区域特定端点(Region-Specific Endpoint)
https://{bucket-name}.s3.{region}.amazonaws.com/{key}

HTTP 方法与 S3 操作的映射关系:

PUT    -> CreateBucket / PutObject / PutBucketPolicy ...
GET    -> GetObject / ListObjects / GetBucketLocation ...
DELETE -> DeleteObject / DeleteBucket ...
HEAD   -> HeadObject / HeadBucket(检查存在性与元数据)
POST   -> Multipart Upload 相关操作 / DeleteObjects(批量删除)

值得注意的是,S3 并非严格遵循 REST 语义。例如批量删除(Delete Multiple Objects)使用 POST 而非 DELETE,因为 HTTP DELETE 规范不鼓励携带请求体(Request Body)。这是工程实用性对理论纯粹性的妥协。

1.2 从最终一致性到强一致性

S3 的一致性模型(Consistency Model)经历了重要演变:

# 2020 年 12 月之前的一致性模型:
#
# 新对象 PUT          -> 写后读一致性(Read-after-Write Consistency)
# 覆盖 PUT / DELETE   -> 最终一致性(Eventual Consistency)
#
# 这意味着:
# 1. PUT 一个新对象后,立即 GET 可以读到
# 2. 覆盖一个已有对象后,立即 GET 可能读到旧版本
# 3. DELETE 一个对象后,立即 GET 可能仍然读到该对象
# 4. LIST 操作可能不会立即反映最近的 PUT/DELETE

# 2020 年 12 月之后:
#
# 所有操作           -> 强读后写一致性(Strong Read-after-Write Consistency)
#
# 这意味着:
# 1. 任何成功的写操作(PUT/DELETE)之后,
#    后续的读操作(GET/HEAD/LIST)保证返回最新状态
# 2. 无额外成本,无性能损失
# 3. 适用于所有区域、所有存储类别

这个变更对应用开发的影响是深远的。在最终一致性时代,开发者必须设计补偿机制:

# 最终一致性时代的典型补偿模式(已不再需要)
import time

def put_and_verify(s3_client, bucket, key, body, max_retries=5):
    """上传后轮询验证——在强一致性时代已无必要"""
    s3_client.put_object(Bucket=bucket, Key=key, Body=body)

    for attempt in range(max_retries):
        try:
            response = s3_client.head_object(Bucket=bucket, Key=key)
            return response
        except s3_client.exceptions.ClientError:
            time.sleep(2 ** attempt)  # 指数退避

    raise RuntimeError(f"对象 {key}{max_retries} 次重试后仍不可见")

强一致性的实现依赖于 S3 内部的分布式共识机制。AWS 在不改变外部接口的前提下,通过内部架构升级实现了这一保证——这本身就是优秀的接口设计:内部实现自由演进,外部契约只增强不破坏。

1.3 幂等性与错误处理

S3 API 对幂等性(Idempotency)的处理值得关注:

# 天然幂等操作:
PUT Object     -> 重复执行结果相同(覆盖语义)
DELETE Object  -> 删除不存在的对象返回 204(不报错)
GET Object     -> 只读操作,天然幂等

# 非幂等操作需要特殊处理:
Multipart Upload -> 使用 Upload ID 跟踪状态
POST(表单上传)  -> 无天然幂等性,需要客户端去重

S3 的错误响应(Error Response)采用 XML 格式:

<?xml version="1.0" encoding="UTF-8"?>
<Error>
  <Code>NoSuchKey</Code>
  <Message>The specified key does not exist.</Message>
  <Key>my-missing-object.txt</Key>
  <RequestId>tx00000000000000000001-006...</RequestId>
  <HostId>...</HostId>
</Error>

常见的 HTTP 状态码映射:

400 Bad Request         -> MalformedXML / InvalidArgument
403 Forbidden           -> AccessDenied / SignatureDoesNotMatch
404 Not Found           -> NoSuchBucket / NoSuchKey
409 Conflict            -> BucketAlreadyExists / OperationAborted
412 Precondition Failed -> PreconditionFailed(条件请求失败)
416 Range Not Satisfiable -> InvalidRange
500 Internal Server Error -> InternalError(应重试)
503 Service Unavailable   -> SlowDown / ServiceUnavailable(应退避重试)

二、Bucket 与 Object 模型

2.1 Bucket 命名规范

桶(Bucket)是 S3 中的顶层命名空间容器。Bucket 名称在全球范围内唯一(跨所有 AWS 账户):

# Bucket 命名规则:
# 1. 长度:3-63 个字符
# 2. 只能包含小写字母、数字、连字符(-)和点(.)
# 3. 必须以字母或数字开头和结尾
# 4. 不能是 IP 地址格式(如 192.168.1.1)
# 5. 不能以 xn-- 开头(保留前缀)
# 6. 不能以 -s3alias 结尾(保留后缀)
# 7. 同一分区(Partition)内全局唯一

# 好的命名:
my-app-prod-data-us-east-1
company-logs-2025
analytics-raw-events

# 避免的命名:
My-Bucket              # 不允许大写
my..bucket             # 不允许连续的点
-my-bucket             # 不能以连字符开头
192.168.1.1            # 不能是 IP 格式

2.2 区域选择策略

区域(Region)选择直接影响延迟、成本和合规性:

# 区域选择的关键因素:

# 1. 延迟
#    - 选择离用户/计算资源最近的区域
#    - 同区域 EC2 到 S3 的延迟通常 < 5ms
#    - 跨区域延迟可能 50-200ms

# 2. 成本差异(以标准存储每 GB/月为例)
#    us-east-1 (弗吉尼亚)    : $0.023
#    us-west-2 (俄勒冈)      : $0.023
#    eu-west-1 (爱尔兰)      : $0.023
#    ap-northeast-1 (东京)   : $0.025
#    ap-southeast-1 (新加坡) : $0.025
#    cn-north-1 (北京)       : ¥0.173(约 $0.024)

# 3. 合规要求
#    - GDPR:欧洲用户数据可能需要存储在欧洲区域
#    - 中国:中国区域需要单独的 AWS 账户(由光环新网/西云数据运营)
#    - 数据主权:某些行业要求数据不能出境

2.3 扁平命名空间与虚拟目录

S3 采用扁平命名空间(Flat Namespace)——不存在真正的目录层级:

# S3 的存储模型是一个简单的键值映射:
#
# Bucket: my-data
# ┌─────────────────────────────────────────────┐
# │  Key (字符串)              ->  Value (字节流) │
# │─────────────────────────────────────────────│
# │  "photos/2025/01/cat.jpg"  ->  [binary...]  │
# │  "photos/2025/01/dog.jpg"  ->  [binary...]  │
# │  "photos/2025/02/bird.jpg" ->  [binary...]  │
# │  "logs/app.log"            ->  [binary...]  │
# │  "README.md"               ->  [binary...]  │
# └─────────────────────────────────────────────┘
#
# "photos/2025/01/" 不是一个目录——它只是键名的一部分
# 斜杠 "/" 没有任何特殊语义,只是一个普通字符

虚拟目录的模拟通过 LIST 操作的 PrefixDelimiter 参数实现:

import boto3

s3 = boto3.client('s3')

# 模拟 "列出 photos/2025/ 下的子目录和文件"
response = s3.list_objects_v2(
    Bucket='my-data',
    Prefix='photos/2025/',
    Delimiter='/'
)

# CommonPrefixes 包含"子目录"
for prefix in response.get('CommonPrefixes', []):
    print(f"目录: {prefix['Prefix']}")
    # 输出:
    # 目录: photos/2025/01/
    # 目录: photos/2025/02/

# Contents 包含当前"目录"下的直接对象
for obj in response.get('Contents', []):
    print(f"文件: {obj['Key']}, 大小: {obj['Size']}")

2.4 对象元数据

每个对象(Object)由键(Key)、值(Value)和元数据(Metadata)三部分组成:

# 系统定义元数据(System-Defined Metadata):
Content-Type          : MIME 类型,如 application/json
Content-Length        : 对象大小(字节)
Content-Encoding      : 编码方式,如 gzip
Last-Modified         : 最后修改时间
ETag                  : 实体标签,通常是 MD5 哈希
x-amz-storage-class   : 存储类别
x-amz-server-side-encryption : 服务端加密方式

# 用户自定义元数据(User-Defined Metadata):
# 以 x-amz-meta- 为前缀
x-amz-meta-author     : "ltl"
x-amz-meta-project    : "storage-blog"
x-amz-meta-version    : "2.1"

# 限制:
# - 用户自定义元数据总大小不超过 2KB
# - 键名不区分大小写,存储时转为小写
# - 元数据在对象创建后不可单独修改(需要复制对象)

三、核心操作:PUT/GET/DELETE/LIST

3.1 请求签名 V4

AWS 签名版本 4(Signature Version 4,简称 SigV4)是当前标准的请求认证机制。理解签名过程对于调试和实现兼容客户端至关重要:

# SigV4 签名流程:
#
# 步骤 1:创建规范请求(Canonical Request)
# ┌──────────────────────────────────────┐
# │ HTTP Method                          │  GET
# │ Canonical URI                        │  /my-bucket/my-key
# │ Canonical Query String               │  (空或排序后的查询参数)
# │ Canonical Headers                    │  host:...  x-amz-date:...
# │ Signed Headers                       │  host;x-amz-content-sha256;x-amz-date
# │ Hashed Payload                       │  SHA256(请求体)
# └──────────────────────────────────────┘
#
# 步骤 2:创建待签名字符串(String to Sign)
# ┌──────────────────────────────────────┐
# │ Algorithm                            │  AWS4-HMAC-SHA256
# │ RequestDateTime                      │  20250925T120000Z
# │ CredentialScope                      │  20250925/us-east-1/s3/aws4_request
# │ HashedCanonicalRequest               │  SHA256(步骤1的结果)
# └──────────────────────────────────────┘
#
# 步骤 3:计算签名密钥(Signing Key)
# kDate    = HMAC-SHA256("AWS4" + SecretKey, Date)
# kRegion  = HMAC-SHA256(kDate, Region)
# kService = HMAC-SHA256(kRegion, "s3")
# kSigning = HMAC-SHA256(kService, "aws4_request")
#
# 步骤 4:计算最终签名
# Signature = HEX(HMAC-SHA256(kSigning, StringToSign))

用 Python 手动实现签名过程(仅用于理解原理,生产环境请使用 SDK):

import hashlib
import hmac
import datetime


def sign(key, msg):
    """HMAC-SHA256 签名"""
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()


def get_signature_key(secret_key, date_stamp, region, service):
    """派生签名密钥"""
    k_date = sign(('AWS4' + secret_key).encode('utf-8'), date_stamp)
    k_region = sign(k_date, region)
    k_service = sign(k_region, service)
    k_signing = sign(k_service, 'aws4_request')
    return k_signing


def create_authorization_header(
    method, uri, query_string, headers,
    payload, access_key, secret_key, region, service
):
    """构建完整的 Authorization 头部"""
    t = datetime.datetime.utcnow()
    amz_date = t.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = t.strftime('%Y%m%d')

    # 步骤 1:规范请求
    canonical_headers = ''
    signed_headers_list = sorted(headers.keys())
    for h in signed_headers_list:
        canonical_headers += f'{h}:{headers[h]}\n'
    signed_headers = ';'.join(signed_headers_list)

    payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()

    canonical_request = '\n'.join([
        method, uri, query_string,
        canonical_headers, signed_headers, payload_hash
    ])

    # 步骤 2:待签名字符串
    credential_scope = f'{date_stamp}/{region}/{service}/aws4_request'
    string_to_sign = '\n'.join([
        'AWS4-HMAC-SHA256', amz_date,
        credential_scope,
        hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    ])

    # 步骤 3-4:计算签名
    signing_key = get_signature_key(secret_key, date_stamp, region, service)
    signature = hmac.new(
        signing_key, string_to_sign.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    # 构建 Authorization 头部
    authorization = (
        f'AWS4-HMAC-SHA256 '
        f'Credential={access_key}/{credential_scope}, '
        f'SignedHeaders={signed_headers}, '
        f'Signature={signature}'
    )
    return authorization

3.2 PutObject 详解

import boto3
import hashlib
import base64

s3 = boto3.client('s3', region_name='us-east-1')

# 基础上传
s3.put_object(
    Bucket='my-bucket',
    Key='data/report.json',
    Body=b'{"status": "ok"}',
    ContentType='application/json',
    # 服务端加密
    ServerSideEncryption='AES256',
    # 用户自定义元数据
    Metadata={
        'author': 'ltl',
        'version': '1.0'
    },
    # 存储类别
    StorageClass='STANDARD',
    # 设置缓存控制(通过 CloudFront 分发时有用)
    CacheControl='max-age=86400',
)

# 带条件写入(防止覆盖已有对象)
try:
    s3.put_object(
        Bucket='my-bucket',
        Key='config/settings.json',
        Body=b'{"debug": false}',
        # 仅当对象不存在时才写入
        IfNoneMatch='*',
    )
except s3.exceptions.ClientError as e:
    if e.response['Error']['Code'] == 'PreconditionFailed':
        print("对象已存在,跳过写入")

# 带内容校验的上传
data = b'important data that must not be corrupted'
md5_hash = base64.b64encode(hashlib.md5(data).digest()).decode()

s3.put_object(
    Bucket='my-bucket',
    Key='critical/data.bin',
    Body=data,
    ContentMD5=md5_hash,  # S3 会验证完整性
)

3.3 GetObject 与条件请求

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# 基础下载
response = s3.get_object(Bucket='my-bucket', Key='data/report.json')
body = response['Body'].read()
content_type = response['ContentType']
last_modified = response['LastModified']

# 范围请求(Range Request)—— 下载部分内容
response = s3.get_object(
    Bucket='my-bucket',
    Key='large-file.bin',
    Range='bytes=0-1023'  # 只下载前 1KB
)
partial_data = response['Body'].read()

# 条件请求 —— 避免不必要的传输
response = s3.get_object(
    Bucket='my-bucket',
    Key='data/report.json',
    IfModifiedSince=last_modified,  # 仅当修改后才返回
)

# 流式读取大文件(避免一次性加载到内存)
response = s3.get_object(Bucket='my-bucket', Key='huge-file.csv')
stream = response['Body']

chunk_size = 8 * 1024 * 1024  # 8MB
while True:
    chunk = stream.read(chunk_size)
    if not chunk:
        break
    process_chunk(chunk)

stream.close()

3.4 分页列表(ListObjectsV2)

S3 的 LIST 操作每次最多返回 1000 个对象,处理大量对象需要分页:

import boto3

s3 = boto3.client('s3', region_name='us-east-1')


def list_all_objects(bucket, prefix=''):
    """使用分页器列出所有对象"""
    paginator = s3.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(
        Bucket=bucket,
        Prefix=prefix,
        PaginationConfig={
            'PageSize': 1000,  # 每页最多 1000
        }
    )

    total_count = 0
    total_size = 0

    for page in page_iterator:
        for obj in page.get('Contents', []):
            total_count += 1
            total_size += obj['Size']
            yield obj

    print(f"共 {total_count} 个对象,总大小 {total_size / (1024**3):.2f} GB")


# 使用生成器逐个处理
for obj in list_all_objects('my-bucket', prefix='logs/2025/'):
    if obj['Key'].endswith('.gz'):
        print(f"{obj['Key']} -> {obj['Size']} bytes")

3.5 批量删除

import boto3

s3 = boto3.client('s3', region_name='us-east-1')


def batch_delete(bucket, prefix, dry_run=True):
    """批量删除指定前缀下的所有对象"""
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

    delete_count = 0

    for page in pages:
        objects = page.get('Contents', [])
        if not objects:
            continue

        if dry_run:
            for obj in objects:
                print(f"[DRY RUN] 将删除: {obj['Key']}")
            delete_count += len(objects)
            continue

        # 每次最多删除 1000 个对象
        delete_list = [{'Key': obj['Key']} for obj in objects]
        response = s3.delete_objects(
            Bucket=bucket,
            Delete={
                'Objects': delete_list,
                'Quiet': True,  # 静默模式,只返回错误
            }
        )

        errors = response.get('Errors', [])
        if errors:
            for err in errors:
                print(f"删除失败: {err['Key']} -> {err['Code']}")

        delete_count += len(objects) - len(errors)

    print(f"{'[DRY RUN] ' if dry_run else ''}共删除 {delete_count} 个对象")

四、Multipart Upload 工程实践

4.1 为什么需要分片上传

分片上传(Multipart Upload)是 S3 处理大文件上传的核心机制:

# 单次 PUT 上传的限制:
# - 最大对象大小:5 GB
# - 无法暂停/恢复
# - 网络中断需要完全重传
# - 无法并发上传

# Multipart Upload 的能力:
# - 最大对象大小:5 TB
# - 分片大小:5 MB ~ 5 GB
# - 最多 10,000 个分片
# - 支持断点续传(已上传的分片不需要重传)
# - 支持并发上传(多线程/多进程/多机器)
# - 上传过程中可以开始处理已上传的分片

4.2 分片大小选择

分片大小的选择是一个工程权衡:

# 分片大小选择的权衡:
#
# ┌──────────────┬──────────────┬──────────────┬───────────────────┐
# │   文件大小    │  推荐分片大小 │   分片数量    │      说明          │
# ├──────────────┼──────────────┼──────────────┼───────────────────┤
# │  100 MB      │  16 MB       │  ~7          │  单连接即可        │
# │  1 GB        │  64 MB       │  ~16         │  适度并发          │
# │  10 GB       │  128 MB      │  ~80         │  充分并发          │
# │  100 GB      │  512 MB      │  ~200        │  减少分片管理开销   │
# │  1 TB        │  1 GB        │  ~1024       │  接近分片上限       │
# │  5 TB        │  512 MB      │  ~10000      │  最大文件,最多分片  │
# └──────────────┴──────────────┴──────────────┴───────────────────┘
#
# 选择原则:
# 1. 分片不能太小:每个分片都有 HTTP 开销,太小导致请求数过多
# 2. 分片不能太大:失败重传的代价大,内存占用高
# 3. 总分片数不超过 10,000
# 4. 考虑网络带宽:分片大小应能在合理时间内上传完成
#    - 100 Mbps 网络:64 MB 分片约需 5 秒
#    - 1 Gbps 网络:256 MB 分片约需 2 秒

4.3 完整的分片上传实现

import os
import hashlib
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
from botocore.config import Config


class MultipartUploader:
    """生产级分片上传器,支持并发上传与断点续传"""

    def __init__(self, bucket, region='us-east-1', max_concurrency=10):
        self.bucket = bucket
        self.s3 = boto3.client(
            's3',
            region_name=region,
            config=Config(
                max_pool_connections=max_concurrency + 5,
                retries={'max_attempts': 3, 'mode': 'adaptive'},
            )
        )
        self.max_concurrency = max_concurrency

    def upload_file(self, file_path, key, part_size=64 * 1024 * 1024):
        """上传文件,自动选择单次上传或分片上传"""
        file_size = os.path.getsize(file_path)

        # 小于 100MB 使用单次上传
        if file_size < 100 * 1024 * 1024:
            self._simple_upload(file_path, key)
            return

        self._multipart_upload(file_path, key, part_size)

    def _simple_upload(self, file_path, key):
        """单次上传"""
        with open(file_path, 'rb') as f:
            self.s3.put_object(Bucket=self.bucket, Key=key, Body=f)
        print(f"单次上传完成: {key}")

    def _multipart_upload(self, file_path, key, part_size):
        """分片上传"""
        file_size = os.path.getsize(file_path)

        # 步骤 1:初始化分片上传
        response = self.s3.create_multipart_upload(
            Bucket=self.bucket,
            Key=key,
            ContentType=self._guess_content_type(key),
        )
        upload_id = response['UploadId']
        print(f"分片上传已初始化: UploadId={upload_id}")

        try:
            # 步骤 2:并发上传分片
            parts = self._upload_parts(
                file_path, key, upload_id, file_size, part_size
            )

            # 步骤 3:完成分片上传
            self.s3.complete_multipart_upload(
                Bucket=self.bucket,
                Key=key,
                UploadId=upload_id,
                MultipartUpload={
                    'Parts': sorted(parts, key=lambda x: x['PartNumber'])
                },
            )
            print(f"分片上传完成: {key} ({file_size / (1024**3):.2f} GB)")

        except Exception as e:
            # 上传失败时中止,释放已上传的分片(避免产生费用)
            print(f"上传失败,中止分片上传: {e}")
            self.s3.abort_multipart_upload(
                Bucket=self.bucket, Key=key, UploadId=upload_id
            )
            raise

    def _upload_parts(self, file_path, key, upload_id, file_size, part_size):
        """并发上传所有分片"""
        parts = []
        lock = threading.Lock()
        uploaded_bytes = 0

        def upload_single_part(part_number, offset, size):
            nonlocal uploaded_bytes
            with open(file_path, 'rb') as f:
                f.seek(offset)
                data = f.read(size)

            response = self.s3.upload_part(
                Bucket=self.bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=data,
            )

            with lock:
                uploaded_bytes += size
                progress = (uploaded_bytes / file_size) * 100
                print(f"  分片 {part_number} 完成 ({progress:.1f}%)")

            return {
                'PartNumber': part_number,
                'ETag': response['ETag'],
            }

        # 构建分片任务列表
        tasks = []
        offset = 0
        part_number = 1
        while offset < file_size:
            size = min(part_size, file_size - offset)
            tasks.append((part_number, offset, size))
            offset += size
            part_number += 1

        # 并发执行
        with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
            futures = {
                executor.submit(upload_single_part, pn, off, sz): pn
                for pn, off, sz in tasks
            }
            for future in as_completed(futures):
                result = future.result()
                parts.append(result)

        return parts

    @staticmethod
    def _guess_content_type(key):
        """根据文件扩展名猜测 MIME 类型"""
        import mimetypes
        content_type, _ = mimetypes.guess_type(key)
        return content_type or 'application/octet-stream'


# 使用示例
if __name__ == '__main__':
    uploader = MultipartUploader(
        bucket='my-data-bucket',
        region='us-east-1',
        max_concurrency=8,
    )
    uploader.upload_file(
        file_path='/data/large-dataset.parquet',
        key='datasets/2025/large-dataset.parquet',
        part_size=128 * 1024 * 1024,  # 128MB 分片
    )

4.4 断点续传

断点续传的核心是利用 ListParts 查询已上传的分片:

import boto3


def resume_upload(s3_client, bucket, key, upload_id, file_path, part_size):
    """恢复中断的分片上传"""
    # 查询已上传的分片
    uploaded_parts = {}
    paginator = s3_client.get_paginator('list_parts')
    for page in paginator.paginate(
        Bucket=bucket, Key=key, UploadId=upload_id
    ):
        for part in page.get('Parts', []):
            uploaded_parts[part['PartNumber']] = part['ETag']

    print(f"已上传 {len(uploaded_parts)} 个分片,从断点继续...")

    file_size = os.path.getsize(file_path)
    all_parts = []
    offset = 0
    part_number = 1

    while offset < file_size:
        size = min(part_size, file_size - offset)

        if part_number in uploaded_parts:
            # 该分片已上传,跳过
            all_parts.append({
                'PartNumber': part_number,
                'ETag': uploaded_parts[part_number],
            })
            print(f"  分片 {part_number} 已存在,跳过")
        else:
            # 上传缺失的分片
            with open(file_path, 'rb') as f:
                f.seek(offset)
                data = f.read(size)

            response = s3_client.upload_part(
                Bucket=bucket, Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=data,
            )
            all_parts.append({
                'PartNumber': part_number,
                'ETag': response['ETag'],
            })
            print(f"  分片 {part_number} 上传完成")

        offset += size
        part_number += 1

    # 完成上传
    s3_client.complete_multipart_upload(
        Bucket=bucket, Key=key, UploadId=upload_id,
        MultipartUpload={
            'Parts': sorted(all_parts, key=lambda x: x['PartNumber'])
        },
    )
    print("断点续传完成")

4.5 清理未完成的分片上传

未完成的分片上传会持续占用存储空间并产生费用:

import boto3
from datetime import datetime, timezone, timedelta


def cleanup_incomplete_uploads(s3_client, bucket, max_age_days=7):
    """清理超过指定天数的未完成分片上传"""
    cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)

    paginator = s3_client.get_paginator('list_multipart_uploads')
    cleaned = 0

    for page in paginator.paginate(Bucket=bucket):
        for upload in page.get('Uploads', []):
            if upload['Initiated'] < cutoff:
                s3_client.abort_multipart_upload(
                    Bucket=bucket,
                    Key=upload['Key'],
                    UploadId=upload['UploadId'],
                )
                print(f"已中止: {upload['Key']} (UploadId: {upload['UploadId']})")
                cleaned += 1

    print(f"共清理 {cleaned} 个未完成的分片上传")


# 更好的做法:配置 Bucket 生命周期策略自动清理
lifecycle_rule = {
    'Rules': [{
        'ID': 'abort-incomplete-multipart',
        'Status': 'Enabled',
        'Filter': {'Prefix': ''},
        'AbortIncompleteMultipartUpload': {
            'DaysAfterInitiation': 7
        }
    }]
}

五、S3 Select 与 Glacier Select

5.1 S3 Select 原理

S3 Select 允许在服务端使用简单的结构化查询语言(SQL)过滤对象内容,避免将整个对象下载到客户端:

# 传统方式 vs S3 Select:
#
# ┌─────────────────────────────────────────────────────────────┐
# │                    传统方式                                  │
# │                                                             │
# │  S3 存储          网络传输             客户端               │
# │  ┌─────────┐     ──────────────>     ┌──────────────┐      │
# │  │ 10 GB   │     传输 10 GB          │ 下载 10 GB    │      │
# │  │ CSV 文件 │                        │ 解析全部数据   │      │
# │  └─────────┘                         │ 过滤出 100 MB │      │
# │                                      └──────────────┘      │
# └─────────────────────────────────────────────────────────────┘
#
# ┌─────────────────────────────────────────────────────────────┐
# │                    S3 Select                                │
# │                                                             │
# │  S3 存储                网络传输        客户端              │
# │  ┌─────────────┐       ──────────>    ┌──────────┐         │
# │  │ 10 GB CSV   │       仅传输         │ 直接获得  │         │
# │  │ 服务端执行SQL │       100 MB        │ 100 MB   │         │
# │  │ 过滤出结果   │                     │ 结果数据  │         │
# │  └─────────────┘                      └──────────┘         │
# └─────────────────────────────────────────────────────────────┘
#
# 优势:减少 99% 的数据传输,降低延迟和成本

5.2 支持的数据格式与 SQL 语法

# S3 Select 支持的输入格式:
# - CSV(逗号分隔值)
# - JSON(JavaScript 对象表示法)
# - Apache Parquet(列式存储格式)

# 支持的压缩格式(CSV/JSON):
# - GZIP
# - BZIP2
# - 无压缩

# SQL 语法支持:
# - SELECT / FROM / WHERE / LIMIT
# - 聚合函数:COUNT, SUM, AVG, MIN, MAX
# - 字符串函数:TRIM, SUBSTRING, CHAR_LENGTH, LOWER, UPPER
# - 日期函数:EXTRACT, DATE_ADD, DATE_DIFF, UTCNOW
# - 类型转换:CAST
# - 逻辑运算:AND, OR, NOT, IN, BETWEEN, LIKE
# - 不支持:JOIN, GROUP BY, ORDER BY, 子查询

5.3 S3 Select 实战示例

import boto3
import json

s3 = boto3.client('s3', region_name='us-east-1')


def query_csv_with_s3_select(bucket, key, sql_expression):
    """使用 S3 Select 查询 CSV 文件"""
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType='SQL',
        Expression=sql_expression,
        InputSerialization={
            'CSV': {
                'FileHeaderInfo': 'USE',  # 使用第一行作为列名
                'RecordDelimiter': '\n',
                'FieldDelimiter': ',',
                'QuoteCharacter': '"',
            },
            'CompressionType': 'GZIP',
        },
        OutputSerialization={
            'JSON': {
                'RecordDelimiter': '\n',
            },
        },
    )

    records = []
    for event in response['Payload']:
        if 'Records' in event:
            payload = event['Records']['Payload'].decode('utf-8')
            for line in payload.strip().split('\n'):
                if line:
                    records.append(json.loads(line))
        elif 'Stats' in event:
            stats = event['Stats']['Details']
            print(f"扫描字节数: {stats['BytesScanned']:,}")
            print(f"处理字节数: {stats['BytesProcessed']:,}")
            print(f"返回字节数: {stats['BytesReturned']:,}")
            ratio = stats['BytesReturned'] / stats['BytesScanned'] * 100
            print(f"数据缩减比: {ratio:.2f}%")

    return records


# 查询示例:从访问日志中筛选错误请求
results = query_csv_with_s3_select(
    bucket='my-logs',
    key='access-logs/2025/09/25.csv.gz',
    sql_expression="""
        SELECT s.timestamp, s.method, s.path, s.status_code, s.response_time
        FROM s3object s
        WHERE CAST(s.status_code AS INT) >= 500
        AND CAST(s.response_time AS FLOAT) > 1.0
        LIMIT 100
    """,
)

for r in results:
    print(f"{r['timestamp']} {r['method']} {r['path']} -> {r['status_code']}")


def query_parquet_with_s3_select(bucket, key):
    """查询 Parquet 文件——Parquet 的列式特性使 S3 Select 更高效"""
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType='SQL',
        Expression="""
            SELECT user_id, event_type, COUNT(*) as cnt
            FROM s3object s
            WHERE event_type = 'purchase'
        """,
        InputSerialization={
            'Parquet': {},  # Parquet 不需要额外配置
        },
        OutputSerialization={
            'JSON': {'RecordDelimiter': '\n'},
        },
    )

    for event in response['Payload']:
        if 'Records' in event:
            print(event['Records']['Payload'].decode('utf-8'))

5.4 S3 Select 与 Athena 的对比

特性 S3 Select Amazon Athena
查询范围 单个对象 跨多个对象/整个数据集
SQL 能力 基础(无 JOIN/GROUP BY) 完整的 ANSI SQL(Presto/Trino)
支持格式 CSV、JSON、Parquet CSV、JSON、Parquet、ORC、Avro 等
元数据管理 AWS Glue Data Catalog
延迟 毫秒级(流式返回) 秒到分钟级(需要规划查询)
计费方式 扫描数据量 + 返回数据量 扫描数据量($5/TB)
适用场景 单文件快速过滤 大规模数据分析
并发能力 高(每个请求独立) 有并发查询数限制

选择建议:如果只需要从单个对象中提取少量数据,S3 Select 更快更便宜;如果需要跨文件分析或复杂查询,使用 Athena。

5.5 Glacier Select

Glacier Select 将类似的查询能力扩展到归档存储(Glacier)类别。它允许直接查询存储在 Glacier 中的数据,而无需先恢复(Restore)整个对象:

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# Glacier Select 通过 RestoreObject 发起
s3.restore_object(
    Bucket='my-archive',
    Key='historical-data/2020-transactions.csv.gz',
    RestoreRequest={
        'Type': 'SELECT',
        'Tier': 'Standard',  # Standard: 3-5 小时; Expedited: 1-5 分钟
        'SelectParameters': {
            'InputSerialization': {
                'CSV': {'FileHeaderInfo': 'USE'},
                'CompressionType': 'GZIP',
            },
            'ExpressionType': 'SQL',
            'Expression': """
                SELECT * FROM s3object s
                WHERE s.amount > '10000'
                AND s.currency = 'USD'
            """,
            'OutputSerialization': {
                'CSV': {},
            },
        },
        'OutputLocation': {
            'S3': {
                'BucketName': 'my-results',
                'Prefix': 'glacier-select-output/',
            },
        },
    },
)

六、版本控制与对象锁定

6.1 版本控制

版本控制(Versioning)为对象的每次修改保留独立的版本:

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# 启用版本控制
s3.put_bucket_versioning(
    Bucket='my-bucket',
    VersioningConfiguration={'Status': 'Enabled'},
)

# 上传同一个 Key 的多个版本
s3.put_object(Bucket='my-bucket', Key='config.json', Body=b'{"v": 1}')
s3.put_object(Bucket='my-bucket', Key='config.json', Body=b'{"v": 2}')
s3.put_object(Bucket='my-bucket', Key='config.json', Body=b'{"v": 3}')

# 列出所有版本
response = s3.list_object_versions(Bucket='my-bucket', Prefix='config.json')
for version in response.get('Versions', []):
    print(
        f"VersionId: {version['VersionId']}, "
        f"IsLatest: {version['IsLatest']}, "
        f"LastModified: {version['LastModified']}, "
        f"Size: {version['Size']}"
    )

# 获取特定版本
response = s3.get_object(
    Bucket='my-bucket',
    Key='config.json',
    VersionId='specific-version-id-here',
)
old_content = response['Body'].read()

# 删除操作在启用版本控制后的行为:
# 不指定 VersionId 的 DELETE -> 插入删除标记(Delete Marker)
# 指定 VersionId 的 DELETE -> 永久删除该版本
s3.delete_object(Bucket='my-bucket', Key='config.json')
# 此时 GET config.json 返回 404,但所有历史版本仍然存在

# 恢复被删除的对象:删除"删除标记"
response = s3.list_object_versions(Bucket='my-bucket', Prefix='config.json')
for marker in response.get('DeleteMarkers', []):
    if marker['IsLatest']:
        s3.delete_object(
            Bucket='my-bucket',
            Key='config.json',
            VersionId=marker['VersionId'],
        )
        print("已恢复对象(删除了删除标记)")
        break

6.2 MFA Delete

多因素认证删除(MFA Delete)要求在永久删除对象版本或更改版本控制状态时提供 MFA 验证码:

# MFA Delete 的作用:
# 1. 防止意外或恶意的永久删除
# 2. 防止未授权更改版本控制配置
# 3. 只能由 Bucket 所有者(根账户)启用
# 4. 不能通过 IAM 策略授权

# 启用 MFA Delete(必须使用根账户凭证):
# aws s3api put-bucket-versioning \
#   --bucket my-bucket \
#   --versioning-configuration Status=Enabled,MFADelete=Enabled \
#   --mfa "arn:aws:iam::123456789012:mfa/root-account-mfa-device 123456"

# 执行需要 MFA 的操作:
# aws s3api delete-object \
#   --bucket my-bucket \
#   --key config.json \
#   --version-id "abc123..." \
#   --mfa "arn:aws:iam::123456789012:mfa/root-account-mfa-device 654321"

6.3 对象锁定(Object Lock)与 WORM

对象锁定(Object Lock)实现了一次写入多次读取(Write Once Read Many,简称 WORM)语义:

import boto3
from datetime import datetime, timezone, timedelta

s3 = boto3.client('s3', region_name='us-east-1')

# 创建启用对象锁定的 Bucket(只能在创建时启用)
s3.create_bucket(
    Bucket='compliance-bucket',
    ObjectLockEnabledForBucket=True,
)

# 配置默认保留策略
s3.put_object_lock_configuration(
    Bucket='compliance-bucket',
    ObjectLockConfiguration={
        'ObjectLockEnabled': 'Enabled',
        'Rule': {
            'DefaultRetention': {
                'Mode': 'COMPLIANCE',  # 或 'GOVERNANCE'
                'Days': 365,           # 保留 365 天
            }
        }
    },
)

# 上传对象并设置保留期
retain_until = datetime.now(timezone.utc) + timedelta(days=2555)  # 7 年
s3.put_object(
    Bucket='compliance-bucket',
    Key='audit/2025/financial-report.pdf',
    Body=open('/data/report.pdf', 'rb'),
    ObjectLockMode='COMPLIANCE',
    ObjectLockRetainUntilDate=retain_until,
)

# COMPLIANCE 模式 vs GOVERNANCE 模式:
#
# COMPLIANCE(合规模式):
# - 任何人(包括根账户)都不能在保留期内删除或覆盖对象
# - 保留期不能缩短,只能延长
# - 适用于法规合规要求(如 SEC 17a-4、HIPAA)
#
# GOVERNANCE(治理模式):
# - 具有特定 IAM 权限的用户可以绕过保留策略
# - 保留期可以缩短或移除
# - 适用于内部数据保护策略

# 设置法律保留(Legal Hold)
s3.put_object_legal_hold(
    Bucket='compliance-bucket',
    Key='audit/2025/financial-report.pdf',
    LegalHold={'Status': 'ON'},
)
# 法律保留独立于保留期,可以随时添加/移除
# 在法律保留生效期间,对象不能被删除(即使保留期已过)

七、生命周期策略

7.1 存储类别与成本对比

S3 提供多种存储类别(Storage Class),适用于不同的访问模式:

存储类别 可用性 最短存储 检索费用 每 GB/月(美元) 典型场景
Standard 99.99% $0.023 频繁访问的活跃数据
Intelligent-Tiering 99.9% $0.023(自动分层) 访问模式不可预测
Standard-IA 99.9% 30 天 每 GB $0.01 $0.0125 不频繁但需快速访问
One Zone-IA 99.5% 30 天 每 GB $0.01 $0.01 可重建的非关键数据
Glacier Instant 99.9% 90 天 每 GB $0.03 $0.004 季度访问的归档
Glacier Flexible 99.99% 90 天 按请求计费 $0.0036 年度访问的归档
Glacier Deep Archive 99.99% 180 天 按请求计费 $0.00099 极少访问的合规归档

7.2 生命周期规则配置

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

lifecycle_config = {
    'Rules': [
        {
            # 规则 1:日志文件的自动分层与过期
            'ID': 'log-lifecycle',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'logs/'},
            'Transitions': [
                {
                    # 30 天后转为低频访问
                    'Days': 30,
                    'StorageClass': 'STANDARD_IA',
                },
                {
                    # 90 天后转为 Glacier Instant Retrieval
                    'Days': 90,
                    'StorageClass': 'GLACIER_IR',
                },
                {
                    # 365 天后转为 Glacier Deep Archive
                    'Days': 365,
                    'StorageClass': 'DEEP_ARCHIVE',
                },
            ],
            'Expiration': {
                # 2555 天(约 7 年)后自动删除
                'Days': 2555,
            },
        },
        {
            # 规则 2:自动清理未完成的分片上传
            'ID': 'abort-incomplete-uploads',
            'Status': 'Enabled',
            'Filter': {'Prefix': ''},
            'AbortIncompleteMultipartUpload': {
                'DaysAfterInitiation': 7,
            },
        },
        {
            # 规则 3:自动清理过期的删除标记
            'ID': 'cleanup-delete-markers',
            'Status': 'Enabled',
            'Filter': {'Prefix': ''},
            'Expiration': {
                'ExpiredObjectDeleteMarker': True,
            },
        },
        {
            # 规则 4:旧版本的生命周期管理
            'ID': 'version-lifecycle',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'data/'},
            'NoncurrentVersionTransitions': [
                {
                    'NoncurrentDays': 30,
                    'StorageClass': 'STANDARD_IA',
                },
                {
                    'NoncurrentDays': 90,
                    'StorageClass': 'GLACIER',
                },
            ],
            'NoncurrentVersionExpiration': {
                'NoncurrentDays': 365,
                # 保留最近 3 个非当前版本
                'NewerNoncurrentVersions': 3,
            },
        },
        {
            # 规则 5:基于标签过滤
            'ID': 'temp-data-cleanup',
            'Status': 'Enabled',
            'Filter': {
                'Tag': {
                    'Key': 'lifecycle',
                    'Value': 'temporary',
                },
            },
            'Expiration': {
                'Days': 1,
            },
        },
    ],
}

s3.put_bucket_lifecycle_configuration(
    Bucket='my-bucket',
    LifecycleConfiguration=lifecycle_config,
)

7.3 Intelligent-Tiering 自动分层

智能分层(Intelligent-Tiering)根据访问模式自动在不同存储层之间移动对象:

# Intelligent-Tiering 的分层结构:
#
# ┌──────────────────────────────────────────────────────────┐
# │              Intelligent-Tiering 分层                    │
# │                                                          │
# │  ┌─────────────────────────────────────────────┐        │
# │  │  频繁访问层(Frequent Access Tier)            │        │
# │  │  - 与 Standard 相同的性能                     │        │
# │  │  - 对象创建时默认进入此层                      │        │
# │  └───────────────┬─────────────────────────────┘        │
# │                  │ 30 天无访问                           │
# │                  v                                       │
# │  ┌─────────────────────────────────────────────┐        │
# │  │  低频访问层(Infrequent Access Tier)         │        │
# │  │  - 成本降低 40%                              │        │
# │  │  - 自动提升:再次访问时回到频繁访问层           │        │
# │  └───────────────┬─────────────────────────────┘        │
# │                  │ 90 天无访问(可选,需启用)            │
# │                  v                                       │
# │  ┌─────────────────────────────────────────────┐        │
# │  │  归档即时访问层(Archive Instant Access)      │        │
# │  │  - 成本降低 68%                              │        │
# │  │  - 毫秒级检索                                │        │
# │  └───────────────┬─────────────────────────────┘        │
# │                  │ 180 天无访问(可选,需启用)           │
# │                  v                                       │
# │  ┌─────────────────────────────────────────────┐        │
# │  │  归档访问层(Archive Access Tier)             │        │
# │  │  - 3-5 小时检索                              │        │
# │  └───────────────┬─────────────────────────────┘        │
# │                  │ 730 天无访问(可选,需启用)           │
# │                  v                                       │
# │  ┌─────────────────────────────────────────────┐        │
# │  │  深度归档访问层(Deep Archive Access Tier)    │        │
# │  │  - 12 小时检索                               │        │
# │  └─────────────────────────────────────────────┘        │
# └──────────────────────────────────────────────────────────┘
#
# 注意:每个对象收取少量的监控和自动分层费用(约 $0.0025/千对象/月)
# 适合大于 128KB 的对象(小对象始终保持在频繁访问层)

八、跨区域复制

8.1 CRR 与 SRR

S3 提供两种复制机制:

import boto3
import json

s3 = boto3.client('s3', region_name='us-east-1')

# 前提条件:
# 1. 源和目标 Bucket 都必须启用版本控制
# 2. 需要创建 IAM 角色授权 S3 执行复制

# 启用源 Bucket 的版本控制
s3.put_bucket_versioning(
    Bucket='source-bucket',
    VersioningConfiguration={'Status': 'Enabled'},
)

# 配置复制规则
replication_config = {
    'Role': 'arn:aws:iam::123456789012:role/s3-replication-role',
    'Rules': [
        {
            'ID': 'replicate-all',
            'Status': 'Enabled',
            'Priority': 1,
            'Filter': {
                'Prefix': '',  # 复制所有对象
            },
            'Destination': {
                'Bucket': 'arn:aws:s3:::destination-bucket',
                'StorageClass': 'STANDARD',
                # 可选:指定不同的存储类别
                # 'StorageClass': 'STANDARD_IA',

                # 可选:跨账户复制需要指定账户 ID
                # 'Account': '987654321098',
                # 'AccessControlTranslation': {
                #     'Owner': 'Destination',
                # },

                # 可选:启用复制时间控制(Replication Time Control)
                'ReplicationTime': {
                    'Status': 'Enabled',
                    'Time': {
                        'Minutes': 15,  # 99.99% 的对象在 15 分钟内复制完成
                    },
                },
                # 可选:启用复制指标
                'Metrics': {
                    'Status': 'Enabled',
                    'EventThreshold': {
                        'Minutes': 15,
                    },
                },
            },
            'DeleteMarkerReplication': {
                'Status': 'Enabled',  # 复制删除标记
            },
        },
        {
            'ID': 'replicate-logs-to-archive',
            'Status': 'Enabled',
            'Priority': 2,
            'Filter': {
                'Prefix': 'logs/',
            },
            'Destination': {
                'Bucket': 'arn:aws:s3:::archive-bucket',
                'StorageClass': 'GLACIER',
            },
            'DeleteMarkerReplication': {
                'Status': 'Disabled',
            },
        },
    ],
}

s3.put_bucket_replication(
    Bucket='source-bucket',
    ReplicationConfiguration=replication_config,
)

8.2 复制延迟与监控

import boto3
from datetime import datetime, timezone

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')


def get_replication_metrics(bucket_name, rule_id, hours=24):
    """查询复制延迟指标"""
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=hours)

    metrics = {
        # 等待复制的操作数
        'OperationsPendingReplication': 'Count',
        # 等待复制的数据量(字节)
        'BytesPendingReplication': 'Bytes',
        # 复制延迟(秒)
        'ReplicationLatency': 'Seconds',
        # 失败的复制操作数
        'OperationsFailedReplication': 'Count',
    }

    for metric_name, unit in metrics.items():
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/S3',
            MetricName=metric_name,
            Dimensions=[
                {'Name': 'SourceBucket', 'Value': bucket_name},
                {'Name': 'DestinationBucket', 'Value': 'destination-bucket'},
                {'Name': 'RuleId', 'Value': rule_id},
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,
            Statistics=['Average', 'Maximum'],
        )

        if response['Datapoints']:
            latest = max(response['Datapoints'], key=lambda x: x['Timestamp'])
            print(f"{metric_name}: 平均={latest['Average']:.2f}, 最大={latest['Maximum']:.2f} {unit}")

8.3 双向复制

双向复制(Bi-directional Replication)允许两个 Bucket 互为源和目标,实现多主(Multi-Primary)写入:

# 双向复制架构:
#
#  区域 A (us-east-1)              区域 B (eu-west-1)
#  ┌──────────────────┐           ┌──────────────────┐
#  │  bucket-a        │  -------> │  bucket-b        │
#  │                  │           │                  │
#  │  应用写入 ------+│  <------- │+------ 应用写入   │
#  └──────────────────┘           └──────────────────┘
#
# 关键配置:
# 1. 两个 Bucket 各自配置复制规则指向对方
# 2. 必须启用 "Replica modification sync" 防止复制循环
# 3. S3 通过内部标记识别复制的对象,避免无限循环

# 注意事项:
# - 写入冲突:如果同一个 Key 在两端同时被修改,
#   最终状态取决于哪个写入先到达对方——这是"最后写入胜出"语义
# - 删除传播:需要明确配置是否复制删除标记
# - 延迟:跨大陆复制延迟通常在几秒到几分钟

九、S3 Event Notifications

9.1 事件类型

S3 可以在对象级别操作发生时发送事件通知(Event Notification):

# 支持的事件类型:

# 对象创建事件:
s3:ObjectCreated:*              # 所有创建事件
s3:ObjectCreated:Put            # PUT 上传
s3:ObjectCreated:Post           # POST 上传
s3:ObjectCreated:Copy           # 复制操作
s3:ObjectCreated:CompleteMultipartUpload  # 分片上传完成

# 对象删除事件:
s3:ObjectRemoved:*              # 所有删除事件
s3:ObjectRemoved:Delete         # DELETE 操作
s3:ObjectRemoved:DeleteMarkerCreated  # 删除标记创建

# 对象恢复事件(从 Glacier):
s3:ObjectRestore:Post           # 恢复请求已发起
s3:ObjectRestore:Completed      # 恢复完成
s3:ObjectRestore:Delete         # 恢复的副本已过期

# 其他事件:
s3:ReducedRedundancyLostObject  # RRS 对象丢失
s3:Replication:*                # 复制相关事件
s3:LifecycleTransition          # 生命周期转换
s3:IntelligentTiering           # 智能分层变更
s3:ObjectTagging:*              # 标签变更
s3:ObjectAcl:Put                # ACL 变更

9.2 事件目标配置

import boto3
import json

s3 = boto3.client('s3', region_name='us-east-1')

notification_config = {
    # Lambda 函数触发
    'LambdaFunctionConfigurations': [
        {
            'Id': 'process-new-images',
            'LambdaFunctionArn': (
                'arn:aws:lambda:us-east-1:123456789012'
                ':function:image-processor'
            ),
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {
                'Key': {
                    'FilterRules': [
                        {'Name': 'prefix', 'Value': 'uploads/images/'},
                        {'Name': 'suffix', 'Value': '.jpg'},
                    ]
                }
            },
        },
    ],
    # SQS 队列
    'QueueConfigurations': [
        {
            'Id': 'log-processing-queue',
            'QueueArn': (
                'arn:aws:sqs:us-east-1:123456789012:log-processing'
            ),
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {
                'Key': {
                    'FilterRules': [
                        {'Name': 'prefix', 'Value': 'logs/'},
                        {'Name': 'suffix', 'Value': '.gz'},
                    ]
                }
            },
        },
    ],
    # SNS 主题
    'TopicConfigurations': [
        {
            'Id': 'deletion-alerts',
            'TopicArn': (
                'arn:aws:sns:us-east-1:123456789012:s3-deletion-alerts'
            ),
            'Events': ['s3:ObjectRemoved:*'],
        },
    ],
}

s3.put_bucket_notification_configuration(
    Bucket='my-bucket',
    NotificationConfiguration=notification_config,
)

9.3 Lambda 事件处理示例

import json
import urllib.parse
import boto3

s3_client = boto3.client('s3')


def lambda_handler(event, context):
    """处理 S3 事件的 Lambda 函数"""
    for record in event['Records']:
        # 解析事件信息
        event_name = record['eventName']
        bucket = record['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(
            record['s3']['object']['key'], encoding='utf-8'
        )
        size = record['s3']['object'].get('size', 0)
        etag = record['s3']['object'].get('eTag', '')

        print(f"事件: {event_name}, 桶: {bucket}, 键: {key}, 大小: {size}")

        # 根据事件类型分发处理
        if event_name.startswith('ObjectCreated'):
            handle_object_created(bucket, key, size)
        elif event_name.startswith('ObjectRemoved'):
            handle_object_removed(bucket, key)


def handle_object_created(bucket, key, size):
    """处理新对象创建"""
    if key.endswith(('.jpg', '.jpeg', '.png')):
        # 生成缩略图
        generate_thumbnail(bucket, key)
    elif key.endswith('.csv'):
        # 触发 ETL 流水线
        trigger_etl_pipeline(bucket, key)
    elif key.endswith('.log.gz'):
        # 索引日志数据
        index_log_data(bucket, key)


def handle_object_removed(bucket, key):
    """处理对象删除"""
    # 清理关联资源(如缩略图、索引等)
    cleanup_related_resources(bucket, key)

9.4 EventBridge 集成

Amazon EventBridge 提供了比原生通知更强大的事件路由能力:

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# 启用 EventBridge 通知
s3.put_bucket_notification_configuration(
    Bucket='my-bucket',
    NotificationConfiguration={
        'EventBridgeConfiguration': {},  # 启用即可,无需额外配置
    },
)

# EventBridge 的优势:
# 1. 支持更丰富的事件过滤(按对象大小、元数据等)
# 2. 支持更多目标(Step Functions、API Gateway、Kinesis 等)
# 3. 支持事件归档和重放
# 4. 支持跨账户事件路由
# 5. 与 CloudTrail 集成,可以基于 API 调用触发

十、S3 性能优化

10.1 性能基准

S3 的设计性能指标:

# S3 性能基准(截至 2024 年):
#
# 每个前缀的请求速率:
# - GET/HEAD:  5,500 次/秒
# - PUT/COPY/POST/DELETE:  3,500 次/秒
#
# 单个对象的最大吞吐量:
# - 单连接 GET:~100 MB/s(取决于实例网络带宽)
# - 多连接并发 Range GET:可达 ~100 Gbps(使用 S3 Express One Zone)
#
# 延迟:
# - 首字节延迟(Time to First Byte):通常 100-200ms
# - S3 Express One Zone:个位数毫秒
#
# 注意:这些是"每前缀"的限制
# 通过合理设计 Key 前缀,可以线性扩展总吞吐量

10.2 前缀分散策略

# 反模式:所有对象使用相同前缀
# 这会导致所有请求落在同一个分区,受单前缀速率限制
#
# 不推荐:
#   data/2025-09-25/file-001.csv
#   data/2025-09-25/file-002.csv
#   data/2025-09-25/file-003.csv
#   ... (所有请求都命中 "data/2025-09-25/" 前缀)
#
# 推荐:在前缀中引入分散因子
#
# 方法 1:使用哈希前缀
#   a1b2/data/2025-09-25/file-001.csv
#   c3d4/data/2025-09-25/file-002.csv
#   e5f6/data/2025-09-25/file-003.csv
#
# 方法 2:反转日期
#   52-90-5202/data/file-001.csv  (2025-09-25 反转)
#   52-90-5202/data/file-002.csv
#
# 方法 3:使用对象 ID 的一部分作为前缀
#   001/data/2025-09-25/file-001.csv
#   002/data/2025-09-25/file-002.csv
#
# 注意:自 2018 年起,S3 已经能够自动处理高请求速率的分区拆分,
# 但对于突发流量场景,预先分散前缀仍然是最佳实践。
import hashlib


def generate_distributed_key(logical_path, num_prefixes=16):
    """生成分散的对象键"""
    hash_value = hashlib.md5(logical_path.encode()).hexdigest()
    prefix = hash_value[:4]  # 取前 4 个字符作为分散前缀
    return f"{prefix}/{logical_path}"


# 示例
keys = [
    generate_distributed_key(f"events/2025/09/25/event-{i:06d}.json")
    for i in range(100)
]

# 结果示例:
# "a3f2/events/2025/09/25/event-000000.json"
# "7b1c/events/2025/09/25/event-000001.json"
# "d4e8/events/2025/09/25/event-000002.json"
# 请求均匀分布在 65536 个前缀中

10.3 并发上传与下载

import os
import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client('s3', region_name='us-east-1')

# boto3 的 TransferConfig 控制并发行为
transfer_config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # 8MB 以上使用分片上传
    max_concurrency=10,                    # 最大并发数
    multipart_chunksize=8 * 1024 * 1024,  # 每个分片 8MB
    num_download_threads=10,               # 下载线程数
    max_io_queue=100,                      # IO 队列深度
    io_chunksize=256 * 1024,              # IO 块大小
)

# 上传
s3.upload_file(
    '/data/large-file.parquet',
    'my-bucket',
    'data/large-file.parquet',
    Config=transfer_config,
)

# 下载
s3.download_file(
    'my-bucket',
    'data/large-file.parquet',
    '/data/downloaded.parquet',
    Config=transfer_config,
)

# 使用 Range GET 并发下载大文件
from concurrent.futures import ThreadPoolExecutor


def parallel_download(bucket, key, output_path, part_size=64*1024*1024, workers=16):
    """并发 Range GET 下载"""
    head = s3.head_object(Bucket=bucket, Key=key)
    file_size = head['ContentLength']

    # 预分配文件
    with open(output_path, 'wb') as f:
        f.seek(file_size - 1)
        f.write(b'\0')

    def download_range(start, end, part_num):
        response = s3.get_object(
            Bucket=bucket, Key=key,
            Range=f'bytes={start}-{end}'
        )
        data = response['Body'].read()
        with open(output_path, 'r+b') as f:
            f.seek(start)
            f.write(data)
        return part_num, len(data)

    tasks = []
    offset = 0
    part_num = 0
    while offset < file_size:
        end = min(offset + part_size - 1, file_size - 1)
        tasks.append((offset, end, part_num))
        offset = end + 1
        part_num += 1

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(download_range, start, end, pn)
            for start, end, pn in tasks
        ]
        for f in futures:
            pn, sz = f.result()
            print(f"分片 {pn} 下载完成: {sz} 字节")

    print(f"下载完成: {output_path} ({file_size / (1024**3):.2f} GB)")

10.4 Transfer Acceleration

传输加速(Transfer Acceleration)利用 Amazon CloudFront 的全球边缘节点加速跨区域上传:

# Transfer Acceleration 工作原理:
#
# 不使用 Transfer Acceleration:
#
#   客户端 (东京) ----[公共互联网, ~200ms]----> S3 (us-east-1)
#
# 使用 Transfer Acceleration:
#
#   客户端 (东京) --[~10ms]--> CloudFront 边缘 (东京)
#                               |
#                        [AWS 骨干网, 优化路由]
#                               |
#                               v
#                          S3 (us-east-1)
#
# 效果:
# - 距离越远,加速效果越明显
# - 大文件(>1GB)受益最大
# - 额外成本:$0.04/GB(上传)、$0.08/GB(下载,部分区域)
# - 使用特殊端点:{bucket}.s3-accelerate.amazonaws.com
import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# 启用 Transfer Acceleration
s3.put_bucket_accelerate_configuration(
    Bucket='my-bucket',
    AccelerateConfiguration={'Status': 'Enabled'},
)

# 使用加速端点上传
s3_accel = boto3.client(
    's3',
    region_name='us-east-1',
    config=boto3.session.Config(
        s3={'use_accelerate_endpoint': True}
    ),
)

# 上传——SDK 自动使用加速端点
s3_accel.upload_file(
    '/data/large-file.tar.gz',
    'my-bucket',
    'uploads/large-file.tar.gz',
)

# 测试加速效果(AWS 提供的速度比较工具):
# https://s3-accelerate-speedtest.s3-accelerate.amazonaws.com/en/accelerate-speed-comparsion.html

10.5 性能优化清单

# S3 性能优化清单:
#
# 上传优化:
# [ ] 大文件使用 Multipart Upload(>100MB)
# [ ] 合理设置分片大小(考虑网络带宽和重试成本)
# [ ] 使用并发上传(ThreadPoolExecutor / multiprocessing)
# [ ] 长距离传输考虑 Transfer Acceleration
# [ ] 配置 SDK 的重试策略为 adaptive 模式
# [ ] 设置合理的连接池大小(max_pool_connections)
#
# 下载优化:
# [ ] 大文件使用 Range GET 并发下载
# [ ] 仅下载需要的字段(S3 Select)
# [ ] 使用条件请求避免不必要的传输(If-None-Match / If-Modified-Since)
# [ ] 对于热数据,在前端使用 CloudFront 缓存
#
# 列表操作优化:
# [ ] 使用 ListObjectsV2(而非 V1)
# [ ] 使用 Delimiter 和 Prefix 缩小扫描范围
# [ ] 如果只需要判断对象是否存在,使用 HeadObject
# [ ] 对于超大规模列表,考虑 S3 Inventory
#
# 架构优化:
# [ ] 分散 Key 前缀,避免热点
# [ ] 选择合适的存储类别
# [ ] 配置生命周期策略自动转储
# [ ] 与计算资源部署在同一区域
# [ ] 使用 VPC Endpoint 避免公网流量费用

十一、参考文献

  1. Amazon S3 官方文档: https://docs.aws.amazon.com/s3/
  2. Amazon S3 API Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/
  3. AWS Signature Version 4 文档: https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
  4. S3 Strong Consistency 公告: https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
  5. S3 性能优化指南: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
  6. S3 Multipart Upload 文档: https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
  7. S3 Select 用户指南: https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html
  8. S3 Object Lock 文档: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html
  9. S3 Replication 文档: https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication.html
  10. boto3 S3 Client 文档: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
  11. S3 存储类别对比: https://aws.amazon.com/s3/storage-classes/
  12. S3 Transfer Acceleration: https://docs.aws.amazon.com/AmazonS3/latest/userguide/transfer-acceleration.html

上一篇: 对象存储模型:从文件到对象的范式转变 下一篇: MinIO 架构与实现

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2025-10-19 · storage

【存储工程】云对象存储内部架构

深入剖析云对象存储——S3的11个9持久性实现、元数据-索引-存储三层架构、跨AZ复制策略、存储类别实现差异与成本模型分析

2026-04-22 · db / storage

数据库内核实验索引

汇总本站数据库内核与存储引擎实验文章,重点覆盖从零实现 LSM-Tree 及其工程权衡。

2026-04-22 · storage

存储工程索引

汇总本站存储工程系列文章,覆盖 HDD、SSD、NVMe、持久内存、索引结构、压缩、分布式存储与对象存储。


By .