Amazon S3 自 2006 年发布以来,已经成为事实上的对象存储接口标准。无论是公有云(AWS、阿里云 OSS、腾讯云 COS)还是私有部署(MinIO、Ceph RGW),几乎所有对象存储系统都兼容 S3 API。理解 S3 API 的设计细节,不仅是使用 AWS 的前提,更是理解整个对象存储生态的基础。
本文从工程视角深入剖析 S3 API 的核心机制:从请求签名到分片上传,从服务端查询到生命周期管理,从跨区域复制到性能优化。每个主题都附带可运行的代码示例和生产环境中的最佳实践。
一、S3 API 设计哲学
1.1 RESTful 接口设计
S3 是最早大规模采用 RESTful 架构风格(RESTful Architectural Style)的云服务之一。其核心设计原则:
- 资源(Resource)由统一资源标识符(URI)唯一标识
- 使用标准超文本传输协议(HTTP)方法表达操作语义
- 无状态(Stateless)——每个请求包含所有必要信息
- 通过 HTTP 头部(Header)传递元数据(Metadata)
# S3 资源的 URI 结构
# 路径风格(Path-Style)—— 已逐步弃用
https://s3.amazonaws.com/{bucket-name}/{key}
# 虚拟托管风格(Virtual-Hosted Style)—— 推荐
https://{bucket-name}.s3.amazonaws.com/{key}
# 区域特定端点(Region-Specific Endpoint)
https://{bucket-name}.s3.{region}.amazonaws.com/{key}
HTTP 方法与 S3 操作的映射关系:
PUT -> CreateBucket / PutObject / PutBucketPolicy ...
GET -> GetObject / ListObjects / GetBucketLocation ...
DELETE -> DeleteObject / DeleteBucket ...
HEAD -> HeadObject / HeadBucket(检查存在性与元数据)
POST -> Multipart Upload 相关操作 / DeleteObjects(批量删除)
值得注意的是,S3 并非严格遵循 REST 语义。例如批量删除(Delete Multiple Objects)使用 POST 而非 DELETE,因为 HTTP DELETE 规范不鼓励携带请求体(Request Body)。这是工程实用性对理论纯粹性的妥协。
1.2 从最终一致性到强一致性
S3 的一致性模型(Consistency Model)经历了重要演变:
# 2020 年 12 月之前的一致性模型:
#
# 新对象 PUT -> 写后读一致性(Read-after-Write Consistency)
# 覆盖 PUT / DELETE -> 最终一致性(Eventual Consistency)
#
# 这意味着:
# 1. PUT 一个新对象后,立即 GET 可以读到
# 2. 覆盖一个已有对象后,立即 GET 可能读到旧版本
# 3. DELETE 一个对象后,立即 GET 可能仍然读到该对象
# 4. LIST 操作可能不会立即反映最近的 PUT/DELETE
# 2020 年 12 月之后:
#
# 所有操作 -> 强读后写一致性(Strong Read-after-Write Consistency)
#
# 这意味着:
# 1. 任何成功的写操作(PUT/DELETE)之后,
# 后续的读操作(GET/HEAD/LIST)保证返回最新状态
# 2. 无额外成本,无性能损失
# 3. 适用于所有区域、所有存储类别
这个变更对应用开发的影响是深远的。在最终一致性时代,开发者必须设计补偿机制:
# 最终一致性时代的典型补偿模式(已不再需要)
import time
def put_and_verify(s3_client, bucket, key, body, max_retries=5):
"""上传后轮询验证——在强一致性时代已无必要"""
s3_client.put_object(Bucket=bucket, Key=key, Body=body)
for attempt in range(max_retries):
try:
response = s3_client.head_object(Bucket=bucket, Key=key)
return response
except s3_client.exceptions.ClientError:
time.sleep(2 ** attempt) # 指数退避
raise RuntimeError(f"对象 {key} 在 {max_retries} 次重试后仍不可见")强一致性的实现依赖于 S3 内部的分布式共识机制。AWS 在不改变外部接口的前提下,通过内部架构升级实现了这一保证——这本身就是优秀的接口设计:内部实现自由演进,外部契约只增强不破坏。
1.3 幂等性与错误处理
S3 API 对幂等性(Idempotency)的处理值得关注:
# 天然幂等操作:
PUT Object -> 重复执行结果相同(覆盖语义)
DELETE Object -> 删除不存在的对象返回 204(不报错)
GET Object -> 只读操作,天然幂等
# 非幂等操作需要特殊处理:
Multipart Upload -> 使用 Upload ID 跟踪状态
POST(表单上传) -> 无天然幂等性,需要客户端去重
S3 的错误响应(Error Response)采用 XML 格式:
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchKey</Code>
<Message>The specified key does not exist.</Message>
<Key>my-missing-object.txt</Key>
<RequestId>tx00000000000000000001-006...</RequestId>
<HostId>...</HostId>
</Error>常见的 HTTP 状态码映射:
400 Bad Request -> MalformedXML / InvalidArgument
403 Forbidden -> AccessDenied / SignatureDoesNotMatch
404 Not Found -> NoSuchBucket / NoSuchKey
409 Conflict -> BucketAlreadyExists / OperationAborted
412 Precondition Failed -> PreconditionFailed(条件请求失败)
416 Range Not Satisfiable -> InvalidRange
500 Internal Server Error -> InternalError(应重试)
503 Service Unavailable -> SlowDown / ServiceUnavailable(应退避重试)
二、Bucket 与 Object 模型
2.1 Bucket 命名规范
桶(Bucket)是 S3 中的顶层命名空间容器。Bucket 名称在全球范围内唯一(跨所有 AWS 账户):
# Bucket 命名规则:
# 1. 长度:3-63 个字符
# 2. 只能包含小写字母、数字、连字符(-)和点(.)
# 3. 必须以字母或数字开头和结尾
# 4. 不能是 IP 地址格式(如 192.168.1.1)
# 5. 不能以 xn-- 开头(保留前缀)
# 6. 不能以 -s3alias 结尾(保留后缀)
# 7. 同一分区(Partition)内全局唯一
# 好的命名:
my-app-prod-data-us-east-1
company-logs-2025
analytics-raw-events
# 避免的命名:
My-Bucket # 不允许大写
my..bucket # 不允许连续的点
-my-bucket # 不能以连字符开头
192.168.1.1 # 不能是 IP 格式
2.2 区域选择策略
区域(Region)选择直接影响延迟、成本和合规性:
# 区域选择的关键因素:
# 1. 延迟
# - 选择离用户/计算资源最近的区域
# - 同区域 EC2 到 S3 的延迟通常 < 5ms
# - 跨区域延迟可能 50-200ms
# 2. 成本差异(以标准存储每 GB/月为例)
# us-east-1 (弗吉尼亚) : $0.023
# us-west-2 (俄勒冈) : $0.023
# eu-west-1 (爱尔兰) : $0.023
# ap-northeast-1 (东京) : $0.025
# ap-southeast-1 (新加坡) : $0.025
# cn-north-1 (北京) : ¥0.173(约 $0.024)
# 3. 合规要求
# - GDPR:欧洲用户数据可能需要存储在欧洲区域
# - 中国:中国区域需要单独的 AWS 账户(由光环新网/西云数据运营)
# - 数据主权:某些行业要求数据不能出境
2.3 扁平命名空间与虚拟目录
S3 采用扁平命名空间(Flat Namespace)——不存在真正的目录层级:
# S3 的存储模型是一个简单的键值映射:
#
# Bucket: my-data
# ┌─────────────────────────────────────────────┐
# │ Key (字符串) -> Value (字节流) │
# │─────────────────────────────────────────────│
# │ "photos/2025/01/cat.jpg" -> [binary...] │
# │ "photos/2025/01/dog.jpg" -> [binary...] │
# │ "photos/2025/02/bird.jpg" -> [binary...] │
# │ "logs/app.log" -> [binary...] │
# │ "README.md" -> [binary...] │
# └─────────────────────────────────────────────┘
#
# "photos/2025/01/" 不是一个目录——它只是键名的一部分
# 斜杠 "/" 没有任何特殊语义,只是一个普通字符
虚拟目录的模拟通过 LIST 操作的 Prefix 和
Delimiter 参数实现:
import boto3
s3 = boto3.client('s3')
# 模拟 "列出 photos/2025/ 下的子目录和文件"
response = s3.list_objects_v2(
Bucket='my-data',
Prefix='photos/2025/',
Delimiter='/'
)
# CommonPrefixes 包含"子目录"
for prefix in response.get('CommonPrefixes', []):
print(f"目录: {prefix['Prefix']}")
# 输出:
# 目录: photos/2025/01/
# 目录: photos/2025/02/
# Contents 包含当前"目录"下的直接对象
for obj in response.get('Contents', []):
print(f"文件: {obj['Key']}, 大小: {obj['Size']}")2.4 对象元数据
每个对象(Object)由键(Key)、值(Value)和元数据(Metadata)三部分组成:
# 系统定义元数据(System-Defined Metadata):
Content-Type : MIME 类型,如 application/json
Content-Length : 对象大小(字节)
Content-Encoding : 编码方式,如 gzip
Last-Modified : 最后修改时间
ETag : 实体标签,通常是 MD5 哈希
x-amz-storage-class : 存储类别
x-amz-server-side-encryption : 服务端加密方式
# 用户自定义元数据(User-Defined Metadata):
# 以 x-amz-meta- 为前缀
x-amz-meta-author : "ltl"
x-amz-meta-project : "storage-blog"
x-amz-meta-version : "2.1"
# 限制:
# - 用户自定义元数据总大小不超过 2KB
# - 键名不区分大小写,存储时转为小写
# - 元数据在对象创建后不可单独修改(需要复制对象)
三、核心操作:PUT/GET/DELETE/LIST
3.1 请求签名 V4
AWS 签名版本 4(Signature Version 4,简称 SigV4)是当前标准的请求认证机制。理解签名过程对于调试和实现兼容客户端至关重要:
# SigV4 签名流程:
#
# 步骤 1:创建规范请求(Canonical Request)
# ┌──────────────────────────────────────┐
# │ HTTP Method │ GET
# │ Canonical URI │ /my-bucket/my-key
# │ Canonical Query String │ (空或排序后的查询参数)
# │ Canonical Headers │ host:... x-amz-date:...
# │ Signed Headers │ host;x-amz-content-sha256;x-amz-date
# │ Hashed Payload │ SHA256(请求体)
# └──────────────────────────────────────┘
#
# 步骤 2:创建待签名字符串(String to Sign)
# ┌──────────────────────────────────────┐
# │ Algorithm │ AWS4-HMAC-SHA256
# │ RequestDateTime │ 20250925T120000Z
# │ CredentialScope │ 20250925/us-east-1/s3/aws4_request
# │ HashedCanonicalRequest │ SHA256(步骤1的结果)
# └──────────────────────────────────────┘
#
# 步骤 3:计算签名密钥(Signing Key)
# kDate = HMAC-SHA256("AWS4" + SecretKey, Date)
# kRegion = HMAC-SHA256(kDate, Region)
# kService = HMAC-SHA256(kRegion, "s3")
# kSigning = HMAC-SHA256(kService, "aws4_request")
#
# 步骤 4:计算最终签名
# Signature = HEX(HMAC-SHA256(kSigning, StringToSign))
用 Python 手动实现签名过程(仅用于理解原理,生产环境请使用 SDK):
import hashlib
import hmac
import datetime
def sign(key, msg):
"""HMAC-SHA256 签名"""
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def get_signature_key(secret_key, date_stamp, region, service):
"""派生签名密钥"""
k_date = sign(('AWS4' + secret_key).encode('utf-8'), date_stamp)
k_region = sign(k_date, region)
k_service = sign(k_region, service)
k_signing = sign(k_service, 'aws4_request')
return k_signing
def create_authorization_header(
method, uri, query_string, headers,
payload, access_key, secret_key, region, service
):
"""构建完整的 Authorization 头部"""
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d')
# 步骤 1:规范请求
canonical_headers = ''
signed_headers_list = sorted(headers.keys())
for h in signed_headers_list:
canonical_headers += f'{h}:{headers[h]}\n'
signed_headers = ';'.join(signed_headers_list)
payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()
canonical_request = '\n'.join([
method, uri, query_string,
canonical_headers, signed_headers, payload_hash
])
# 步骤 2:待签名字符串
credential_scope = f'{date_stamp}/{region}/{service}/aws4_request'
string_to_sign = '\n'.join([
'AWS4-HMAC-SHA256', amz_date,
credential_scope,
hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
])
# 步骤 3-4:计算签名
signing_key = get_signature_key(secret_key, date_stamp, region, service)
signature = hmac.new(
signing_key, string_to_sign.encode('utf-8'), hashlib.sha256
).hexdigest()
# 构建 Authorization 头部
authorization = (
f'AWS4-HMAC-SHA256 '
f'Credential={access_key}/{credential_scope}, '
f'SignedHeaders={signed_headers}, '
f'Signature={signature}'
)
return authorization3.2 PutObject 详解
import boto3
import hashlib
import base64
s3 = boto3.client('s3', region_name='us-east-1')
# 基础上传
s3.put_object(
Bucket='my-bucket',
Key='data/report.json',
Body=b'{"status": "ok"}',
ContentType='application/json',
# 服务端加密
ServerSideEncryption='AES256',
# 用户自定义元数据
Metadata={
'author': 'ltl',
'version': '1.0'
},
# 存储类别
StorageClass='STANDARD',
# 设置缓存控制(通过 CloudFront 分发时有用)
CacheControl='max-age=86400',
)
# 带条件写入(防止覆盖已有对象)
try:
s3.put_object(
Bucket='my-bucket',
Key='config/settings.json',
Body=b'{"debug": false}',
# 仅当对象不存在时才写入
IfNoneMatch='*',
)
except s3.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'PreconditionFailed':
print("对象已存在,跳过写入")
# 带内容校验的上传
data = b'important data that must not be corrupted'
md5_hash = base64.b64encode(hashlib.md5(data).digest()).decode()
s3.put_object(
Bucket='my-bucket',
Key='critical/data.bin',
Body=data,
ContentMD5=md5_hash, # S3 会验证完整性
)3.3 GetObject 与条件请求
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
# 基础下载
response = s3.get_object(Bucket='my-bucket', Key='data/report.json')
body = response['Body'].read()
content_type = response['ContentType']
last_modified = response['LastModified']
# 范围请求(Range Request)—— 下载部分内容
response = s3.get_object(
Bucket='my-bucket',
Key='large-file.bin',
Range='bytes=0-1023' # 只下载前 1KB
)
partial_data = response['Body'].read()
# 条件请求 —— 避免不必要的传输
response = s3.get_object(
Bucket='my-bucket',
Key='data/report.json',
IfModifiedSince=last_modified, # 仅当修改后才返回
)
# 流式读取大文件(避免一次性加载到内存)
response = s3.get_object(Bucket='my-bucket', Key='huge-file.csv')
stream = response['Body']
chunk_size = 8 * 1024 * 1024 # 8MB
while True:
chunk = stream.read(chunk_size)
if not chunk:
break
process_chunk(chunk)
stream.close()3.4 分页列表(ListObjectsV2)
S3 的 LIST 操作每次最多返回 1000 个对象,处理大量对象需要分页:
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
def list_all_objects(bucket, prefix=''):
"""使用分页器列出所有对象"""
paginator = s3.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(
Bucket=bucket,
Prefix=prefix,
PaginationConfig={
'PageSize': 1000, # 每页最多 1000
}
)
total_count = 0
total_size = 0
for page in page_iterator:
for obj in page.get('Contents', []):
total_count += 1
total_size += obj['Size']
yield obj
print(f"共 {total_count} 个对象,总大小 {total_size / (1024**3):.2f} GB")
# 使用生成器逐个处理
for obj in list_all_objects('my-bucket', prefix='logs/2025/'):
if obj['Key'].endswith('.gz'):
print(f"{obj['Key']} -> {obj['Size']} bytes")3.5 批量删除
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
def batch_delete(bucket, prefix, dry_run=True):
"""批量删除指定前缀下的所有对象"""
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
delete_count = 0
for page in pages:
objects = page.get('Contents', [])
if not objects:
continue
if dry_run:
for obj in objects:
print(f"[DRY RUN] 将删除: {obj['Key']}")
delete_count += len(objects)
continue
# 每次最多删除 1000 个对象
delete_list = [{'Key': obj['Key']} for obj in objects]
response = s3.delete_objects(
Bucket=bucket,
Delete={
'Objects': delete_list,
'Quiet': True, # 静默模式,只返回错误
}
)
errors = response.get('Errors', [])
if errors:
for err in errors:
print(f"删除失败: {err['Key']} -> {err['Code']}")
delete_count += len(objects) - len(errors)
print(f"{'[DRY RUN] ' if dry_run else ''}共删除 {delete_count} 个对象")四、Multipart Upload 工程实践
4.1 为什么需要分片上传
分片上传(Multipart Upload)是 S3 处理大文件上传的核心机制:
# 单次 PUT 上传的限制:
# - 最大对象大小:5 GB
# - 无法暂停/恢复
# - 网络中断需要完全重传
# - 无法并发上传
# Multipart Upload 的能力:
# - 最大对象大小:5 TB
# - 分片大小:5 MB ~ 5 GB
# - 最多 10,000 个分片
# - 支持断点续传(已上传的分片不需要重传)
# - 支持并发上传(多线程/多进程/多机器)
# - 上传过程中可以开始处理已上传的分片
4.2 分片大小选择
分片大小的选择是一个工程权衡:
# 分片大小选择的权衡:
#
# ┌──────────────┬──────────────┬──────────────┬───────────────────┐
# │ 文件大小 │ 推荐分片大小 │ 分片数量 │ 说明 │
# ├──────────────┼──────────────┼──────────────┼───────────────────┤
# │ 100 MB │ 16 MB │ ~7 │ 单连接即可 │
# │ 1 GB │ 64 MB │ ~16 │ 适度并发 │
# │ 10 GB │ 128 MB │ ~80 │ 充分并发 │
# │ 100 GB │ 512 MB │ ~200 │ 减少分片管理开销 │
# │ 1 TB │ 1 GB │ ~1024 │ 接近分片上限 │
# │ 5 TB │ 512 MB │ ~10000 │ 最大文件,最多分片 │
# └──────────────┴──────────────┴──────────────┴───────────────────┘
#
# 选择原则:
# 1. 分片不能太小:每个分片都有 HTTP 开销,太小导致请求数过多
# 2. 分片不能太大:失败重传的代价大,内存占用高
# 3. 总分片数不超过 10,000
# 4. 考虑网络带宽:分片大小应能在合理时间内上传完成
# - 100 Mbps 网络:64 MB 分片约需 5 秒
# - 1 Gbps 网络:256 MB 分片约需 2 秒
4.3 完整的分片上传实现
import os
import hashlib
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
from botocore.config import Config
class MultipartUploader:
"""生产级分片上传器,支持并发上传与断点续传"""
def __init__(self, bucket, region='us-east-1', max_concurrency=10):
self.bucket = bucket
self.s3 = boto3.client(
's3',
region_name=region,
config=Config(
max_pool_connections=max_concurrency + 5,
retries={'max_attempts': 3, 'mode': 'adaptive'},
)
)
self.max_concurrency = max_concurrency
def upload_file(self, file_path, key, part_size=64 * 1024 * 1024):
"""上传文件,自动选择单次上传或分片上传"""
file_size = os.path.getsize(file_path)
# 小于 100MB 使用单次上传
if file_size < 100 * 1024 * 1024:
self._simple_upload(file_path, key)
return
self._multipart_upload(file_path, key, part_size)
def _simple_upload(self, file_path, key):
"""单次上传"""
with open(file_path, 'rb') as f:
self.s3.put_object(Bucket=self.bucket, Key=key, Body=f)
print(f"单次上传完成: {key}")
def _multipart_upload(self, file_path, key, part_size):
"""分片上传"""
file_size = os.path.getsize(file_path)
# 步骤 1:初始化分片上传
response = self.s3.create_multipart_upload(
Bucket=self.bucket,
Key=key,
ContentType=self._guess_content_type(key),
)
upload_id = response['UploadId']
print(f"分片上传已初始化: UploadId={upload_id}")
try:
# 步骤 2:并发上传分片
parts = self._upload_parts(
file_path, key, upload_id, file_size, part_size
)
# 步骤 3:完成分片上传
self.s3.complete_multipart_upload(
Bucket=self.bucket,
Key=key,
UploadId=upload_id,
MultipartUpload={
'Parts': sorted(parts, key=lambda x: x['PartNumber'])
},
)
print(f"分片上传完成: {key} ({file_size / (1024**3):.2f} GB)")
except Exception as e:
# 上传失败时中止,释放已上传的分片(避免产生费用)
print(f"上传失败,中止分片上传: {e}")
self.s3.abort_multipart_upload(
Bucket=self.bucket, Key=key, UploadId=upload_id
)
raise
def _upload_parts(self, file_path, key, upload_id, file_size, part_size):
"""并发上传所有分片"""
parts = []
lock = threading.Lock()
uploaded_bytes = 0
def upload_single_part(part_number, offset, size):
nonlocal uploaded_bytes
with open(file_path, 'rb') as f:
f.seek(offset)
data = f.read(size)
response = self.s3.upload_part(
Bucket=self.bucket,
Key=key,
UploadId=upload_id,
PartNumber=part_number,
Body=data,
)
with lock:
uploaded_bytes += size
progress = (uploaded_bytes / file_size) * 100
print(f" 分片 {part_number} 完成 ({progress:.1f}%)")
return {
'PartNumber': part_number,
'ETag': response['ETag'],
}
# 构建分片任务列表
tasks = []
offset = 0
part_number = 1
while offset < file_size:
size = min(part_size, file_size - offset)
tasks.append((part_number, offset, size))
offset += size
part_number += 1
# 并发执行
with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
futures = {
executor.submit(upload_single_part, pn, off, sz): pn
for pn, off, sz in tasks
}
for future in as_completed(futures):
result = future.result()
parts.append(result)
return parts
@staticmethod
def _guess_content_type(key):
"""根据文件扩展名猜测 MIME 类型"""
import mimetypes
content_type, _ = mimetypes.guess_type(key)
return content_type or 'application/octet-stream'
# 使用示例
if __name__ == '__main__':
uploader = MultipartUploader(
bucket='my-data-bucket',
region='us-east-1',
max_concurrency=8,
)
uploader.upload_file(
file_path='/data/large-dataset.parquet',
key='datasets/2025/large-dataset.parquet',
part_size=128 * 1024 * 1024, # 128MB 分片
)4.4 断点续传
断点续传的核心是利用 ListParts
查询已上传的分片:
import boto3
def resume_upload(s3_client, bucket, key, upload_id, file_path, part_size):
"""恢复中断的分片上传"""
# 查询已上传的分片
uploaded_parts = {}
paginator = s3_client.get_paginator('list_parts')
for page in paginator.paginate(
Bucket=bucket, Key=key, UploadId=upload_id
):
for part in page.get('Parts', []):
uploaded_parts[part['PartNumber']] = part['ETag']
print(f"已上传 {len(uploaded_parts)} 个分片,从断点继续...")
file_size = os.path.getsize(file_path)
all_parts = []
offset = 0
part_number = 1
while offset < file_size:
size = min(part_size, file_size - offset)
if part_number in uploaded_parts:
# 该分片已上传,跳过
all_parts.append({
'PartNumber': part_number,
'ETag': uploaded_parts[part_number],
})
print(f" 分片 {part_number} 已存在,跳过")
else:
# 上传缺失的分片
with open(file_path, 'rb') as f:
f.seek(offset)
data = f.read(size)
response = s3_client.upload_part(
Bucket=bucket, Key=key,
UploadId=upload_id,
PartNumber=part_number,
Body=data,
)
all_parts.append({
'PartNumber': part_number,
'ETag': response['ETag'],
})
print(f" 分片 {part_number} 上传完成")
offset += size
part_number += 1
# 完成上传
s3_client.complete_multipart_upload(
Bucket=bucket, Key=key, UploadId=upload_id,
MultipartUpload={
'Parts': sorted(all_parts, key=lambda x: x['PartNumber'])
},
)
print("断点续传完成")4.5 清理未完成的分片上传
未完成的分片上传会持续占用存储空间并产生费用:
import boto3
from datetime import datetime, timezone, timedelta
def cleanup_incomplete_uploads(s3_client, bucket, max_age_days=7):
"""清理超过指定天数的未完成分片上传"""
cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)
paginator = s3_client.get_paginator('list_multipart_uploads')
cleaned = 0
for page in paginator.paginate(Bucket=bucket):
for upload in page.get('Uploads', []):
if upload['Initiated'] < cutoff:
s3_client.abort_multipart_upload(
Bucket=bucket,
Key=upload['Key'],
UploadId=upload['UploadId'],
)
print(f"已中止: {upload['Key']} (UploadId: {upload['UploadId']})")
cleaned += 1
print(f"共清理 {cleaned} 个未完成的分片上传")
# 更好的做法:配置 Bucket 生命周期策略自动清理
lifecycle_rule = {
'Rules': [{
'ID': 'abort-incomplete-multipart',
'Status': 'Enabled',
'Filter': {'Prefix': ''},
'AbortIncompleteMultipartUpload': {
'DaysAfterInitiation': 7
}
}]
}五、S3 Select 与 Glacier Select
5.1 S3 Select 原理
S3 Select 允许在服务端使用简单的结构化查询语言(SQL)过滤对象内容,避免将整个对象下载到客户端:
# 传统方式 vs S3 Select:
#
# ┌─────────────────────────────────────────────────────────────┐
# │ 传统方式 │
# │ │
# │ S3 存储 网络传输 客户端 │
# │ ┌─────────┐ ──────────────> ┌──────────────┐ │
# │ │ 10 GB │ 传输 10 GB │ 下载 10 GB │ │
# │ │ CSV 文件 │ │ 解析全部数据 │ │
# │ └─────────┘ │ 过滤出 100 MB │ │
# │ └──────────────┘ │
# └─────────────────────────────────────────────────────────────┘
#
# ┌─────────────────────────────────────────────────────────────┐
# │ S3 Select │
# │ │
# │ S3 存储 网络传输 客户端 │
# │ ┌─────────────┐ ──────────> ┌──────────┐ │
# │ │ 10 GB CSV │ 仅传输 │ 直接获得 │ │
# │ │ 服务端执行SQL │ 100 MB │ 100 MB │ │
# │ │ 过滤出结果 │ │ 结果数据 │ │
# │ └─────────────┘ └──────────┘ │
# └─────────────────────────────────────────────────────────────┘
#
# 优势:减少 99% 的数据传输,降低延迟和成本
5.2 支持的数据格式与 SQL 语法
# S3 Select 支持的输入格式:
# - CSV(逗号分隔值)
# - JSON(JavaScript 对象表示法)
# - Apache Parquet(列式存储格式)
# 支持的压缩格式(CSV/JSON):
# - GZIP
# - BZIP2
# - 无压缩
# SQL 语法支持:
# - SELECT / FROM / WHERE / LIMIT
# - 聚合函数:COUNT, SUM, AVG, MIN, MAX
# - 字符串函数:TRIM, SUBSTRING, CHAR_LENGTH, LOWER, UPPER
# - 日期函数:EXTRACT, DATE_ADD, DATE_DIFF, UTCNOW
# - 类型转换:CAST
# - 逻辑运算:AND, OR, NOT, IN, BETWEEN, LIKE
# - 不支持:JOIN, GROUP BY, ORDER BY, 子查询
5.3 S3 Select 实战示例
import boto3
import json
s3 = boto3.client('s3', region_name='us-east-1')
def query_csv_with_s3_select(bucket, key, sql_expression):
"""使用 S3 Select 查询 CSV 文件"""
response = s3.select_object_content(
Bucket=bucket,
Key=key,
ExpressionType='SQL',
Expression=sql_expression,
InputSerialization={
'CSV': {
'FileHeaderInfo': 'USE', # 使用第一行作为列名
'RecordDelimiter': '\n',
'FieldDelimiter': ',',
'QuoteCharacter': '"',
},
'CompressionType': 'GZIP',
},
OutputSerialization={
'JSON': {
'RecordDelimiter': '\n',
},
},
)
records = []
for event in response['Payload']:
if 'Records' in event:
payload = event['Records']['Payload'].decode('utf-8')
for line in payload.strip().split('\n'):
if line:
records.append(json.loads(line))
elif 'Stats' in event:
stats = event['Stats']['Details']
print(f"扫描字节数: {stats['BytesScanned']:,}")
print(f"处理字节数: {stats['BytesProcessed']:,}")
print(f"返回字节数: {stats['BytesReturned']:,}")
ratio = stats['BytesReturned'] / stats['BytesScanned'] * 100
print(f"数据缩减比: {ratio:.2f}%")
return records
# 查询示例:从访问日志中筛选错误请求
results = query_csv_with_s3_select(
bucket='my-logs',
key='access-logs/2025/09/25.csv.gz',
sql_expression="""
SELECT s.timestamp, s.method, s.path, s.status_code, s.response_time
FROM s3object s
WHERE CAST(s.status_code AS INT) >= 500
AND CAST(s.response_time AS FLOAT) > 1.0
LIMIT 100
""",
)
for r in results:
print(f"{r['timestamp']} {r['method']} {r['path']} -> {r['status_code']}")
def query_parquet_with_s3_select(bucket, key):
"""查询 Parquet 文件——Parquet 的列式特性使 S3 Select 更高效"""
response = s3.select_object_content(
Bucket=bucket,
Key=key,
ExpressionType='SQL',
Expression="""
SELECT user_id, event_type, COUNT(*) as cnt
FROM s3object s
WHERE event_type = 'purchase'
""",
InputSerialization={
'Parquet': {}, # Parquet 不需要额外配置
},
OutputSerialization={
'JSON': {'RecordDelimiter': '\n'},
},
)
for event in response['Payload']:
if 'Records' in event:
print(event['Records']['Payload'].decode('utf-8'))5.4 S3 Select 与 Athena 的对比
| 特性 | S3 Select | Amazon Athena |
|---|---|---|
| 查询范围 | 单个对象 | 跨多个对象/整个数据集 |
| SQL 能力 | 基础(无 JOIN/GROUP BY) | 完整的 ANSI SQL(Presto/Trino) |
| 支持格式 | CSV、JSON、Parquet | CSV、JSON、Parquet、ORC、Avro 等 |
| 元数据管理 | 无 | AWS Glue Data Catalog |
| 延迟 | 毫秒级(流式返回) | 秒到分钟级(需要规划查询) |
| 计费方式 | 扫描数据量 + 返回数据量 | 扫描数据量($5/TB) |
| 适用场景 | 单文件快速过滤 | 大规模数据分析 |
| 并发能力 | 高(每个请求独立) | 有并发查询数限制 |
选择建议:如果只需要从单个对象中提取少量数据,S3 Select 更快更便宜;如果需要跨文件分析或复杂查询,使用 Athena。
5.5 Glacier Select
Glacier Select 将类似的查询能力扩展到归档存储(Glacier)类别。它允许直接查询存储在 Glacier 中的数据,而无需先恢复(Restore)整个对象:
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
# Glacier Select 通过 RestoreObject 发起
s3.restore_object(
Bucket='my-archive',
Key='historical-data/2020-transactions.csv.gz',
RestoreRequest={
'Type': 'SELECT',
'Tier': 'Standard', # Standard: 3-5 小时; Expedited: 1-5 分钟
'SelectParameters': {
'InputSerialization': {
'CSV': {'FileHeaderInfo': 'USE'},
'CompressionType': 'GZIP',
},
'ExpressionType': 'SQL',
'Expression': """
SELECT * FROM s3object s
WHERE s.amount > '10000'
AND s.currency = 'USD'
""",
'OutputSerialization': {
'CSV': {},
},
},
'OutputLocation': {
'S3': {
'BucketName': 'my-results',
'Prefix': 'glacier-select-output/',
},
},
},
)六、版本控制与对象锁定
6.1 版本控制
版本控制(Versioning)为对象的每次修改保留独立的版本:
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
# 启用版本控制
s3.put_bucket_versioning(
Bucket='my-bucket',
VersioningConfiguration={'Status': 'Enabled'},
)
# 上传同一个 Key 的多个版本
s3.put_object(Bucket='my-bucket', Key='config.json', Body=b'{"v": 1}')
s3.put_object(Bucket='my-bucket', Key='config.json', Body=b'{"v": 2}')
s3.put_object(Bucket='my-bucket', Key='config.json', Body=b'{"v": 3}')
# 列出所有版本
response = s3.list_object_versions(Bucket='my-bucket', Prefix='config.json')
for version in response.get('Versions', []):
print(
f"VersionId: {version['VersionId']}, "
f"IsLatest: {version['IsLatest']}, "
f"LastModified: {version['LastModified']}, "
f"Size: {version['Size']}"
)
# 获取特定版本
response = s3.get_object(
Bucket='my-bucket',
Key='config.json',
VersionId='specific-version-id-here',
)
old_content = response['Body'].read()
# 删除操作在启用版本控制后的行为:
# 不指定 VersionId 的 DELETE -> 插入删除标记(Delete Marker)
# 指定 VersionId 的 DELETE -> 永久删除该版本
s3.delete_object(Bucket='my-bucket', Key='config.json')
# 此时 GET config.json 返回 404,但所有历史版本仍然存在
# 恢复被删除的对象:删除"删除标记"
response = s3.list_object_versions(Bucket='my-bucket', Prefix='config.json')
for marker in response.get('DeleteMarkers', []):
if marker['IsLatest']:
s3.delete_object(
Bucket='my-bucket',
Key='config.json',
VersionId=marker['VersionId'],
)
print("已恢复对象(删除了删除标记)")
break6.2 MFA Delete
多因素认证删除(MFA Delete)要求在永久删除对象版本或更改版本控制状态时提供 MFA 验证码:
# MFA Delete 的作用:
# 1. 防止意外或恶意的永久删除
# 2. 防止未授权更改版本控制配置
# 3. 只能由 Bucket 所有者(根账户)启用
# 4. 不能通过 IAM 策略授权
# 启用 MFA Delete(必须使用根账户凭证):
# aws s3api put-bucket-versioning \
# --bucket my-bucket \
# --versioning-configuration Status=Enabled,MFADelete=Enabled \
# --mfa "arn:aws:iam::123456789012:mfa/root-account-mfa-device 123456"
# 执行需要 MFA 的操作:
# aws s3api delete-object \
# --bucket my-bucket \
# --key config.json \
# --version-id "abc123..." \
# --mfa "arn:aws:iam::123456789012:mfa/root-account-mfa-device 654321"
6.3 对象锁定(Object Lock)与 WORM
对象锁定(Object Lock)实现了一次写入多次读取(Write Once Read Many,简称 WORM)语义:
import boto3
from datetime import datetime, timezone, timedelta
s3 = boto3.client('s3', region_name='us-east-1')
# 创建启用对象锁定的 Bucket(只能在创建时启用)
s3.create_bucket(
Bucket='compliance-bucket',
ObjectLockEnabledForBucket=True,
)
# 配置默认保留策略
s3.put_object_lock_configuration(
Bucket='compliance-bucket',
ObjectLockConfiguration={
'ObjectLockEnabled': 'Enabled',
'Rule': {
'DefaultRetention': {
'Mode': 'COMPLIANCE', # 或 'GOVERNANCE'
'Days': 365, # 保留 365 天
}
}
},
)
# 上传对象并设置保留期
retain_until = datetime.now(timezone.utc) + timedelta(days=2555) # 7 年
s3.put_object(
Bucket='compliance-bucket',
Key='audit/2025/financial-report.pdf',
Body=open('/data/report.pdf', 'rb'),
ObjectLockMode='COMPLIANCE',
ObjectLockRetainUntilDate=retain_until,
)
# COMPLIANCE 模式 vs GOVERNANCE 模式:
#
# COMPLIANCE(合规模式):
# - 任何人(包括根账户)都不能在保留期内删除或覆盖对象
# - 保留期不能缩短,只能延长
# - 适用于法规合规要求(如 SEC 17a-4、HIPAA)
#
# GOVERNANCE(治理模式):
# - 具有特定 IAM 权限的用户可以绕过保留策略
# - 保留期可以缩短或移除
# - 适用于内部数据保护策略
# 设置法律保留(Legal Hold)
s3.put_object_legal_hold(
Bucket='compliance-bucket',
Key='audit/2025/financial-report.pdf',
LegalHold={'Status': 'ON'},
)
# 法律保留独立于保留期,可以随时添加/移除
# 在法律保留生效期间,对象不能被删除(即使保留期已过)七、生命周期策略
7.1 存储类别与成本对比
S3 提供多种存储类别(Storage Class),适用于不同的访问模式:
| 存储类别 | 可用性 | 最短存储 | 检索费用 | 每 GB/月(美元) | 典型场景 |
|---|---|---|---|---|---|
| Standard | 99.99% | 无 | 无 | $0.023 | 频繁访问的活跃数据 |
| Intelligent-Tiering | 99.9% | 无 | 无 | $0.023(自动分层) | 访问模式不可预测 |
| Standard-IA | 99.9% | 30 天 | 每 GB $0.01 | $0.0125 | 不频繁但需快速访问 |
| One Zone-IA | 99.5% | 30 天 | 每 GB $0.01 | $0.01 | 可重建的非关键数据 |
| Glacier Instant | 99.9% | 90 天 | 每 GB $0.03 | $0.004 | 季度访问的归档 |
| Glacier Flexible | 99.99% | 90 天 | 按请求计费 | $0.0036 | 年度访问的归档 |
| Glacier Deep Archive | 99.99% | 180 天 | 按请求计费 | $0.00099 | 极少访问的合规归档 |
7.2 生命周期规则配置
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
lifecycle_config = {
'Rules': [
{
# 规则 1:日志文件的自动分层与过期
'ID': 'log-lifecycle',
'Status': 'Enabled',
'Filter': {'Prefix': 'logs/'},
'Transitions': [
{
# 30 天后转为低频访问
'Days': 30,
'StorageClass': 'STANDARD_IA',
},
{
# 90 天后转为 Glacier Instant Retrieval
'Days': 90,
'StorageClass': 'GLACIER_IR',
},
{
# 365 天后转为 Glacier Deep Archive
'Days': 365,
'StorageClass': 'DEEP_ARCHIVE',
},
],
'Expiration': {
# 2555 天(约 7 年)后自动删除
'Days': 2555,
},
},
{
# 规则 2:自动清理未完成的分片上传
'ID': 'abort-incomplete-uploads',
'Status': 'Enabled',
'Filter': {'Prefix': ''},
'AbortIncompleteMultipartUpload': {
'DaysAfterInitiation': 7,
},
},
{
# 规则 3:自动清理过期的删除标记
'ID': 'cleanup-delete-markers',
'Status': 'Enabled',
'Filter': {'Prefix': ''},
'Expiration': {
'ExpiredObjectDeleteMarker': True,
},
},
{
# 规则 4:旧版本的生命周期管理
'ID': 'version-lifecycle',
'Status': 'Enabled',
'Filter': {'Prefix': 'data/'},
'NoncurrentVersionTransitions': [
{
'NoncurrentDays': 30,
'StorageClass': 'STANDARD_IA',
},
{
'NoncurrentDays': 90,
'StorageClass': 'GLACIER',
},
],
'NoncurrentVersionExpiration': {
'NoncurrentDays': 365,
# 保留最近 3 个非当前版本
'NewerNoncurrentVersions': 3,
},
},
{
# 规则 5:基于标签过滤
'ID': 'temp-data-cleanup',
'Status': 'Enabled',
'Filter': {
'Tag': {
'Key': 'lifecycle',
'Value': 'temporary',
},
},
'Expiration': {
'Days': 1,
},
},
],
}
s3.put_bucket_lifecycle_configuration(
Bucket='my-bucket',
LifecycleConfiguration=lifecycle_config,
)7.3 Intelligent-Tiering 自动分层
智能分层(Intelligent-Tiering)根据访问模式自动在不同存储层之间移动对象:
# Intelligent-Tiering 的分层结构:
#
# ┌──────────────────────────────────────────────────────────┐
# │ Intelligent-Tiering 分层 │
# │ │
# │ ┌─────────────────────────────────────────────┐ │
# │ │ 频繁访问层(Frequent Access Tier) │ │
# │ │ - 与 Standard 相同的性能 │ │
# │ │ - 对象创建时默认进入此层 │ │
# │ └───────────────┬─────────────────────────────┘ │
# │ │ 30 天无访问 │
# │ v │
# │ ┌─────────────────────────────────────────────┐ │
# │ │ 低频访问层(Infrequent Access Tier) │ │
# │ │ - 成本降低 40% │ │
# │ │ - 自动提升:再次访问时回到频繁访问层 │ │
# │ └───────────────┬─────────────────────────────┘ │
# │ │ 90 天无访问(可选,需启用) │
# │ v │
# │ ┌─────────────────────────────────────────────┐ │
# │ │ 归档即时访问层(Archive Instant Access) │ │
# │ │ - 成本降低 68% │ │
# │ │ - 毫秒级检索 │ │
# │ └───────────────┬─────────────────────────────┘ │
# │ │ 180 天无访问(可选,需启用) │
# │ v │
# │ ┌─────────────────────────────────────────────┐ │
# │ │ 归档访问层(Archive Access Tier) │ │
# │ │ - 3-5 小时检索 │ │
# │ └───────────────┬─────────────────────────────┘ │
# │ │ 730 天无访问(可选,需启用) │
# │ v │
# │ ┌─────────────────────────────────────────────┐ │
# │ │ 深度归档访问层(Deep Archive Access Tier) │ │
# │ │ - 12 小时检索 │ │
# │ └─────────────────────────────────────────────┘ │
# └──────────────────────────────────────────────────────────┘
#
# 注意:每个对象收取少量的监控和自动分层费用(约 $0.0025/千对象/月)
# 适合大于 128KB 的对象(小对象始终保持在频繁访问层)
八、跨区域复制
8.1 CRR 与 SRR
S3 提供两种复制机制:
- 跨区域复制(Cross-Region Replication,简称 CRR):在不同 AWS 区域之间复制对象
- 同区域复制(Same-Region Replication,简称 SRR):在同一区域的不同 Bucket 之间复制对象
import boto3
import json
s3 = boto3.client('s3', region_name='us-east-1')
# 前提条件:
# 1. 源和目标 Bucket 都必须启用版本控制
# 2. 需要创建 IAM 角色授权 S3 执行复制
# 启用源 Bucket 的版本控制
s3.put_bucket_versioning(
Bucket='source-bucket',
VersioningConfiguration={'Status': 'Enabled'},
)
# 配置复制规则
replication_config = {
'Role': 'arn:aws:iam::123456789012:role/s3-replication-role',
'Rules': [
{
'ID': 'replicate-all',
'Status': 'Enabled',
'Priority': 1,
'Filter': {
'Prefix': '', # 复制所有对象
},
'Destination': {
'Bucket': 'arn:aws:s3:::destination-bucket',
'StorageClass': 'STANDARD',
# 可选:指定不同的存储类别
# 'StorageClass': 'STANDARD_IA',
# 可选:跨账户复制需要指定账户 ID
# 'Account': '987654321098',
# 'AccessControlTranslation': {
# 'Owner': 'Destination',
# },
# 可选:启用复制时间控制(Replication Time Control)
'ReplicationTime': {
'Status': 'Enabled',
'Time': {
'Minutes': 15, # 99.99% 的对象在 15 分钟内复制完成
},
},
# 可选:启用复制指标
'Metrics': {
'Status': 'Enabled',
'EventThreshold': {
'Minutes': 15,
},
},
},
'DeleteMarkerReplication': {
'Status': 'Enabled', # 复制删除标记
},
},
{
'ID': 'replicate-logs-to-archive',
'Status': 'Enabled',
'Priority': 2,
'Filter': {
'Prefix': 'logs/',
},
'Destination': {
'Bucket': 'arn:aws:s3:::archive-bucket',
'StorageClass': 'GLACIER',
},
'DeleteMarkerReplication': {
'Status': 'Disabled',
},
},
],
}
s3.put_bucket_replication(
Bucket='source-bucket',
ReplicationConfiguration=replication_config,
)8.2 复制延迟与监控
import boto3
from datetime import datetime, timezone
cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
def get_replication_metrics(bucket_name, rule_id, hours=24):
"""查询复制延迟指标"""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=hours)
metrics = {
# 等待复制的操作数
'OperationsPendingReplication': 'Count',
# 等待复制的数据量(字节)
'BytesPendingReplication': 'Bytes',
# 复制延迟(秒)
'ReplicationLatency': 'Seconds',
# 失败的复制操作数
'OperationsFailedReplication': 'Count',
}
for metric_name, unit in metrics.items():
response = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName=metric_name,
Dimensions=[
{'Name': 'SourceBucket', 'Value': bucket_name},
{'Name': 'DestinationBucket', 'Value': 'destination-bucket'},
{'Name': 'RuleId', 'Value': rule_id},
],
StartTime=start_time,
EndTime=end_time,
Period=3600,
Statistics=['Average', 'Maximum'],
)
if response['Datapoints']:
latest = max(response['Datapoints'], key=lambda x: x['Timestamp'])
print(f"{metric_name}: 平均={latest['Average']:.2f}, 最大={latest['Maximum']:.2f} {unit}")8.3 双向复制
双向复制(Bi-directional Replication)允许两个 Bucket 互为源和目标,实现多主(Multi-Primary)写入:
# 双向复制架构:
#
# 区域 A (us-east-1) 区域 B (eu-west-1)
# ┌──────────────────┐ ┌──────────────────┐
# │ bucket-a │ -------> │ bucket-b │
# │ │ │ │
# │ 应用写入 ------+│ <------- │+------ 应用写入 │
# └──────────────────┘ └──────────────────┘
#
# 关键配置:
# 1. 两个 Bucket 各自配置复制规则指向对方
# 2. 必须启用 "Replica modification sync" 防止复制循环
# 3. S3 通过内部标记识别复制的对象,避免无限循环
# 注意事项:
# - 写入冲突:如果同一个 Key 在两端同时被修改,
# 最终状态取决于哪个写入先到达对方——这是"最后写入胜出"语义
# - 删除传播:需要明确配置是否复制删除标记
# - 延迟:跨大陆复制延迟通常在几秒到几分钟
九、S3 Event Notifications
9.1 事件类型
S3 可以在对象级别操作发生时发送事件通知(Event Notification):
# 支持的事件类型:
# 对象创建事件:
s3:ObjectCreated:* # 所有创建事件
s3:ObjectCreated:Put # PUT 上传
s3:ObjectCreated:Post # POST 上传
s3:ObjectCreated:Copy # 复制操作
s3:ObjectCreated:CompleteMultipartUpload # 分片上传完成
# 对象删除事件:
s3:ObjectRemoved:* # 所有删除事件
s3:ObjectRemoved:Delete # DELETE 操作
s3:ObjectRemoved:DeleteMarkerCreated # 删除标记创建
# 对象恢复事件(从 Glacier):
s3:ObjectRestore:Post # 恢复请求已发起
s3:ObjectRestore:Completed # 恢复完成
s3:ObjectRestore:Delete # 恢复的副本已过期
# 其他事件:
s3:ReducedRedundancyLostObject # RRS 对象丢失
s3:Replication:* # 复制相关事件
s3:LifecycleTransition # 生命周期转换
s3:IntelligentTiering # 智能分层变更
s3:ObjectTagging:* # 标签变更
s3:ObjectAcl:Put # ACL 变更
9.2 事件目标配置
import boto3
import json
s3 = boto3.client('s3', region_name='us-east-1')
notification_config = {
# Lambda 函数触发
'LambdaFunctionConfigurations': [
{
'Id': 'process-new-images',
'LambdaFunctionArn': (
'arn:aws:lambda:us-east-1:123456789012'
':function:image-processor'
),
'Events': ['s3:ObjectCreated:*'],
'Filter': {
'Key': {
'FilterRules': [
{'Name': 'prefix', 'Value': 'uploads/images/'},
{'Name': 'suffix', 'Value': '.jpg'},
]
}
},
},
],
# SQS 队列
'QueueConfigurations': [
{
'Id': 'log-processing-queue',
'QueueArn': (
'arn:aws:sqs:us-east-1:123456789012:log-processing'
),
'Events': ['s3:ObjectCreated:*'],
'Filter': {
'Key': {
'FilterRules': [
{'Name': 'prefix', 'Value': 'logs/'},
{'Name': 'suffix', 'Value': '.gz'},
]
}
},
},
],
# SNS 主题
'TopicConfigurations': [
{
'Id': 'deletion-alerts',
'TopicArn': (
'arn:aws:sns:us-east-1:123456789012:s3-deletion-alerts'
),
'Events': ['s3:ObjectRemoved:*'],
},
],
}
s3.put_bucket_notification_configuration(
Bucket='my-bucket',
NotificationConfiguration=notification_config,
)9.3 Lambda 事件处理示例
import json
import urllib.parse
import boto3
s3_client = boto3.client('s3')
def lambda_handler(event, context):
"""处理 S3 事件的 Lambda 函数"""
for record in event['Records']:
# 解析事件信息
event_name = record['eventName']
bucket = record['s3']['bucket']['name']
key = urllib.parse.unquote_plus(
record['s3']['object']['key'], encoding='utf-8'
)
size = record['s3']['object'].get('size', 0)
etag = record['s3']['object'].get('eTag', '')
print(f"事件: {event_name}, 桶: {bucket}, 键: {key}, 大小: {size}")
# 根据事件类型分发处理
if event_name.startswith('ObjectCreated'):
handle_object_created(bucket, key, size)
elif event_name.startswith('ObjectRemoved'):
handle_object_removed(bucket, key)
def handle_object_created(bucket, key, size):
"""处理新对象创建"""
if key.endswith(('.jpg', '.jpeg', '.png')):
# 生成缩略图
generate_thumbnail(bucket, key)
elif key.endswith('.csv'):
# 触发 ETL 流水线
trigger_etl_pipeline(bucket, key)
elif key.endswith('.log.gz'):
# 索引日志数据
index_log_data(bucket, key)
def handle_object_removed(bucket, key):
"""处理对象删除"""
# 清理关联资源(如缩略图、索引等)
cleanup_related_resources(bucket, key)9.4 EventBridge 集成
Amazon EventBridge 提供了比原生通知更强大的事件路由能力:
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
# 启用 EventBridge 通知
s3.put_bucket_notification_configuration(
Bucket='my-bucket',
NotificationConfiguration={
'EventBridgeConfiguration': {}, # 启用即可,无需额外配置
},
)
# EventBridge 的优势:
# 1. 支持更丰富的事件过滤(按对象大小、元数据等)
# 2. 支持更多目标(Step Functions、API Gateway、Kinesis 等)
# 3. 支持事件归档和重放
# 4. 支持跨账户事件路由
# 5. 与 CloudTrail 集成,可以基于 API 调用触发十、S3 性能优化
10.1 性能基准
S3 的设计性能指标:
# S3 性能基准(截至 2024 年):
#
# 每个前缀的请求速率:
# - GET/HEAD: 5,500 次/秒
# - PUT/COPY/POST/DELETE: 3,500 次/秒
#
# 单个对象的最大吞吐量:
# - 单连接 GET:~100 MB/s(取决于实例网络带宽)
# - 多连接并发 Range GET:可达 ~100 Gbps(使用 S3 Express One Zone)
#
# 延迟:
# - 首字节延迟(Time to First Byte):通常 100-200ms
# - S3 Express One Zone:个位数毫秒
#
# 注意:这些是"每前缀"的限制
# 通过合理设计 Key 前缀,可以线性扩展总吞吐量
10.2 前缀分散策略
# 反模式:所有对象使用相同前缀
# 这会导致所有请求落在同一个分区,受单前缀速率限制
#
# 不推荐:
# data/2025-09-25/file-001.csv
# data/2025-09-25/file-002.csv
# data/2025-09-25/file-003.csv
# ... (所有请求都命中 "data/2025-09-25/" 前缀)
#
# 推荐:在前缀中引入分散因子
#
# 方法 1:使用哈希前缀
# a1b2/data/2025-09-25/file-001.csv
# c3d4/data/2025-09-25/file-002.csv
# e5f6/data/2025-09-25/file-003.csv
#
# 方法 2:反转日期
# 52-90-5202/data/file-001.csv (2025-09-25 反转)
# 52-90-5202/data/file-002.csv
#
# 方法 3:使用对象 ID 的一部分作为前缀
# 001/data/2025-09-25/file-001.csv
# 002/data/2025-09-25/file-002.csv
#
# 注意:自 2018 年起,S3 已经能够自动处理高请求速率的分区拆分,
# 但对于突发流量场景,预先分散前缀仍然是最佳实践。
import hashlib
def generate_distributed_key(logical_path, num_prefixes=16):
"""生成分散的对象键"""
hash_value = hashlib.md5(logical_path.encode()).hexdigest()
prefix = hash_value[:4] # 取前 4 个字符作为分散前缀
return f"{prefix}/{logical_path}"
# 示例
keys = [
generate_distributed_key(f"events/2025/09/25/event-{i:06d}.json")
for i in range(100)
]
# 结果示例:
# "a3f2/events/2025/09/25/event-000000.json"
# "7b1c/events/2025/09/25/event-000001.json"
# "d4e8/events/2025/09/25/event-000002.json"
# 请求均匀分布在 65536 个前缀中10.3 并发上传与下载
import os
import boto3
from boto3.s3.transfer import TransferConfig
s3 = boto3.client('s3', region_name='us-east-1')
# boto3 的 TransferConfig 控制并发行为
transfer_config = TransferConfig(
multipart_threshold=8 * 1024 * 1024, # 8MB 以上使用分片上传
max_concurrency=10, # 最大并发数
multipart_chunksize=8 * 1024 * 1024, # 每个分片 8MB
num_download_threads=10, # 下载线程数
max_io_queue=100, # IO 队列深度
io_chunksize=256 * 1024, # IO 块大小
)
# 上传
s3.upload_file(
'/data/large-file.parquet',
'my-bucket',
'data/large-file.parquet',
Config=transfer_config,
)
# 下载
s3.download_file(
'my-bucket',
'data/large-file.parquet',
'/data/downloaded.parquet',
Config=transfer_config,
)
# 使用 Range GET 并发下载大文件
from concurrent.futures import ThreadPoolExecutor
def parallel_download(bucket, key, output_path, part_size=64*1024*1024, workers=16):
"""并发 Range GET 下载"""
head = s3.head_object(Bucket=bucket, Key=key)
file_size = head['ContentLength']
# 预分配文件
with open(output_path, 'wb') as f:
f.seek(file_size - 1)
f.write(b'\0')
def download_range(start, end, part_num):
response = s3.get_object(
Bucket=bucket, Key=key,
Range=f'bytes={start}-{end}'
)
data = response['Body'].read()
with open(output_path, 'r+b') as f:
f.seek(start)
f.write(data)
return part_num, len(data)
tasks = []
offset = 0
part_num = 0
while offset < file_size:
end = min(offset + part_size - 1, file_size - 1)
tasks.append((offset, end, part_num))
offset = end + 1
part_num += 1
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(download_range, start, end, pn)
for start, end, pn in tasks
]
for f in futures:
pn, sz = f.result()
print(f"分片 {pn} 下载完成: {sz} 字节")
print(f"下载完成: {output_path} ({file_size / (1024**3):.2f} GB)")10.4 Transfer Acceleration
传输加速(Transfer Acceleration)利用 Amazon CloudFront 的全球边缘节点加速跨区域上传:
# Transfer Acceleration 工作原理:
#
# 不使用 Transfer Acceleration:
#
# 客户端 (东京) ----[公共互联网, ~200ms]----> S3 (us-east-1)
#
# 使用 Transfer Acceleration:
#
# 客户端 (东京) --[~10ms]--> CloudFront 边缘 (东京)
# |
# [AWS 骨干网, 优化路由]
# |
# v
# S3 (us-east-1)
#
# 效果:
# - 距离越远,加速效果越明显
# - 大文件(>1GB)受益最大
# - 额外成本:$0.04/GB(上传)、$0.08/GB(下载,部分区域)
# - 使用特殊端点:{bucket}.s3-accelerate.amazonaws.com
import boto3
s3 = boto3.client('s3', region_name='us-east-1')
# 启用 Transfer Acceleration
s3.put_bucket_accelerate_configuration(
Bucket='my-bucket',
AccelerateConfiguration={'Status': 'Enabled'},
)
# 使用加速端点上传
s3_accel = boto3.client(
's3',
region_name='us-east-1',
config=boto3.session.Config(
s3={'use_accelerate_endpoint': True}
),
)
# 上传——SDK 自动使用加速端点
s3_accel.upload_file(
'/data/large-file.tar.gz',
'my-bucket',
'uploads/large-file.tar.gz',
)
# 测试加速效果(AWS 提供的速度比较工具):
# https://s3-accelerate-speedtest.s3-accelerate.amazonaws.com/en/accelerate-speed-comparsion.html10.5 性能优化清单
# S3 性能优化清单:
#
# 上传优化:
# [ ] 大文件使用 Multipart Upload(>100MB)
# [ ] 合理设置分片大小(考虑网络带宽和重试成本)
# [ ] 使用并发上传(ThreadPoolExecutor / multiprocessing)
# [ ] 长距离传输考虑 Transfer Acceleration
# [ ] 配置 SDK 的重试策略为 adaptive 模式
# [ ] 设置合理的连接池大小(max_pool_connections)
#
# 下载优化:
# [ ] 大文件使用 Range GET 并发下载
# [ ] 仅下载需要的字段(S3 Select)
# [ ] 使用条件请求避免不必要的传输(If-None-Match / If-Modified-Since)
# [ ] 对于热数据,在前端使用 CloudFront 缓存
#
# 列表操作优化:
# [ ] 使用 ListObjectsV2(而非 V1)
# [ ] 使用 Delimiter 和 Prefix 缩小扫描范围
# [ ] 如果只需要判断对象是否存在,使用 HeadObject
# [ ] 对于超大规模列表,考虑 S3 Inventory
#
# 架构优化:
# [ ] 分散 Key 前缀,避免热点
# [ ] 选择合适的存储类别
# [ ] 配置生命周期策略自动转储
# [ ] 与计算资源部署在同一区域
# [ ] 使用 VPC Endpoint 避免公网流量费用
十一、参考文献
- Amazon S3 官方文档: https://docs.aws.amazon.com/s3/
- Amazon S3 API Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/
- AWS Signature Version 4 文档: https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
- S3 Strong Consistency 公告: https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
- S3 性能优化指南: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
- S3 Multipart Upload 文档: https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
- S3 Select 用户指南: https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html
- S3 Object Lock 文档: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html
- S3 Replication 文档: https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication.html
- boto3 S3 Client 文档: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
- S3 存储类别对比: https://aws.amazon.com/s3/storage-classes/
- S3 Transfer Acceleration: https://docs.aws.amazon.com/AmazonS3/latest/userguide/transfer-acceleration.html
上一篇: 对象存储模型:从文件到对象的范式转变 下一篇: MinIO 架构与实现
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【存储工程】云对象存储内部架构
深入剖析云对象存储——S3的11个9持久性实现、元数据-索引-存储三层架构、跨AZ复制策略、存储类别实现差异与成本模型分析
【存储工程】对象存储模型:从文件到对象的范式转变
深入分析对象存储的设计哲学——文件系统与对象存储的本质差异、CAP 权衡、最终一致性到强一致性的演进,以及 S3 API 核心操作实战
数据库内核实验索引
汇总本站数据库内核与存储引擎实验文章,重点覆盖从零实现 LSM-Tree 及其工程权衡。
存储工程索引
汇总本站存储工程系列文章,覆盖 HDD、SSD、NVMe、持久内存、索引结构、压缩、分布式存储与对象存储。