#### **前言**
一些传统游戏架构中,采用 MySQL 存储玩家存档数据,利用分库分表分散单库单表的存储和性能压力,从而达到支持更多玩家的目的。随着数据量增长,数据表中 varchar 类型已经无法满足游戏中单字段的存储需求,而 blob 字段的应用对于这种架构下改造成本是最低的,因此一些游戏开始在最初设计的时候,数据库表结构就采用了 Blob 字段作为其玩家的游戏任务、道具等数据的存储。
Blob 字段在 MySQL 5.6 / 5.7中存在 bug(**MySQL Bugs: #96466**),这个 bug 有概率导致数据库集群崩溃,造成数据丢失。即使在 MySQL 8.0中,由于引擎本身设计的限制,在单表20GB以上,高频的更新就会导致数据库出现性能受限。并且随着表增大,性能问题会越来越明显。
随着当游戏业务爆发时增长的时候,传统关系型数据库在分库分表的时候,需要进行应用改造,同时存在一定的停机维护时间。而且这些扩展完成后,在游戏的夕阳期进行收缩也需要进行应用改造,这无疑对业务开发和基础运维的部门造成了很多额外的工作量。
[Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail) 在应用到这个场景上是非常适用的。在业务发展任意阶段,都可以实现0停机的扩展,自动伸缩的特性。而且这一切对于应用层是完全透明的。同时在日常运维中也可以贴合业务负载进行动态扩缩容,从而进一步降低成本。
MySQL Bugs: #96466:
[https://bugs.mysql.com/bug.php?id=96466](https://bugs.mysql.com/bug.php?id=96466)
#### **概述**
本文主要讲述在游戏场景下,根据 **[Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail)** 的限制(每个项目都必须小于400KB),在限制下尽可能存储更多的数据和当存储量超出限制时,扩展存储的最大化利用空间。重点描述如何利用 [Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail)+[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 保存玩家存档中的大数据量属性,避免数据存在 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 上后,在数据写入 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 时,发生读取到[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)旧存档的情况。同时利用 gzip 压缩减少数据大小,减少 IO 的开销提升性能。
[Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail):
[https://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/ServiceQuotas.html#limits-items](https://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/ServiceQuotas.html#limits-items)
#### **架构图**
![image.png](https://dev-media.amazoncloud.cn/fadd423bfdda463f94e2dcb33deca8b5_image.png)
#### **实战编码**
##### **目标**
1.所有数据保存前都进行 gzip 压缩,读取后都用gzip解压。
2.[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 存储和 [Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail) 的 binary 字段存储可以自适应。如果用户数据压缩后如果大于指定的值则写入[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail),否则直接保存到当前数据库项目中的字段。
3.[Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail) 项目读取的时候,解析解压后的字段,如果字符串以s3://开头,则继续从 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 中获取数据
4.设置 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 读锁字段,判断当前状态是否正在写入 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail),以阻塞读进程。在每个项目需要写入 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 前都会设置read_lock 为 Ture,[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 写成功后则设置为 False。读取记录后,read_lock 是否为 True,如果是判断被阻塞,进程会等待一段时间后进行重试,直到重试次数超出指定的值。重试超时后,读进程会认为写进程可能由于某种原因导致写永远无法成功,于是会将 read_lock 设置成 False。
第一步:初始化环境参数
```
from time import sleep
import boto3
import gzip
import random
import json
import hashlib
import logging
# 写入 S3 的门槛,超过这个值数据会写入 S3,否则保存在数据库内,默认值350KB
UPLOAD_TO_S3_THRESHOLD_BYTES = 358400
# 用户数据库保存的目标 S3 存储桶
USER_DATA_BUCKET = 'linyesh-user-data'
# 遇到S3有读锁,重新请求最大次数,超出次数限制锁会被自动清除
S3_READ_LOCK_RETRY_TIMES = 10
# 遇到 S3 有读锁,读请求重试间隔时间
S3_READ_RETRY_INTERVAL = 0.2
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
```
**参数说明**
UPLOAD_TO_S3_THRESHOLD_BYTES:为字段最大的数据存储长度限制。单位为:字节数。由于 [Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail) 一个项目(Item)数据大小限制为400KB。我们除了数据存档中最大字段还必须预留一部分空间给其他字段,避免整个 Item 超出400KB。
USER_DATA_BUCKET:[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 用于存储超出400KB后的玩家大字段数据。需要提前建好,具体步骤参考:**创建存储桶**([https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/userguide/create-bucket-overview.html](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/userguide/create-bucket-overview.html))
S3_READ_LOCK_RETRY_TIMES:限制当玩家在 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 上的存档处在写入状态时候,读请求重试的次数。在项目处于读锁状态的时候,读进程会等待一段时间后重试。
S3_READ_RETRY_INTERVAL:读锁状态下,重试读的间隔时间,单位:秒。
注意:S3_READ_LOCK_RETRY_TIMES 乘以 S3_READ_RETRY_INTERVAL 的时间理论上必须小于 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 存档上传时间的最大值,因此实际使用本文中的代码应该根据存档可能的大小来调整这2个参数。否则可能存档会有大概率会发生脏读的情况。
**第二步:创建 [Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail) 表**
```
def create_tables():
"""
创建表
:return:
"""
response = dynamodb.create_table(
TableName='players',
KeySchema=[
{
'AttributeName': 'username',
'KeyType': 'HASH'
}
],
AttributeDefinitions=[
{
'AttributeName': 'username',
'AttributeType': 'S'
}
],
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
# Wait until the table exists.
response.wait_until_exists()
# Print out some data about the table.
logger.debug(response.item_count)
```
**第三步:编写辅助逻辑**
**指数级回退函数**
```
def run_with_backoff(function, retries=5, **function_parameters):
base_backoff = 0.1 # base 100ms backoff
max_backoff = 10 # sleep for maximum 10 seconds
tries = 0
while True:
try:
return function(function_parameters)
except (ConnectionError, TimeoutError):
if tries >= retries:
raise
backoff = min(max_backoff, base_backoff * (pow(2, tries) + random.random()))
logger.debug(f"sleeping for {backoff:.2f}s")
sleep(backoff)
tries += 1
```
**[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 路径判断函数**
```
def is_s3_path(content):
return content.startswith('s3://')
```
** [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 文件获取**
```
def get_s3_object(key):
response = s3.get_object(Bucket=USER_DATA_BUCKET, Key=s3_key_generator(key))
return response['Body']
```
**检查大小超限**
```
def check_threshold(current_size):
return current_size > UPLOAD_TO_S3_THRESHOLD_BYTES
```
**[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) Key 生成函数**
这个函数可以将玩家的存档随机分配到S3桶下不同的 Prefix 中,这有利于提高 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 中 IO 的性能。
```
def s3_key_generator(key):
s3_prefix = hashlib.md5((key).encode('utf-8')).hexdigest()[:8]
return s3_prefix + '/' + key
```
**文件上传到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)**
```
def upload_content_to_s3(obj_param):
s3_key = s3_key_generator(obj_param['key'])
try:
response = s3.put_object(
Body=obj_param['content_bytes'],
Bucket=USER_DATA_BUCKET,
Key=s3_key)
return "s3://%s/%s" % (USER_DATA_BUCKET, s3_key)
except Exception as e:
logger.error(e)
raise e
```
**第四步:编写主体逻辑**
**写入单个项目到 [Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail) 数据库**
```
def put_item(load_data):
gzip_data
= gzip.compress(load_data) # 压缩数据
logger.debug('压缩后大小%.2fKB,原始大小%.2fKB,压缩率 %.2f%%' % (
len(gzip_data) / 1024.0,
len(load_data) / 1024.0,
100.0 * len(gzip_data) / len(load_data)))
table = dynamodb.Table('players')
player_username = 'player' + str(random.randint(1, 1000))
if check_threshold(len(gzip_data)):
try:
# 读锁保护
table.update_item(
Key={
'username': player_username,
},
UpdateExpression="set read_lock = :read_lock",
ExpressionAttributeValues={
':read_lock': True,
},
)
# 写入数据到S3
s3_path = run_with_backoff(upload_content_to_s3, key=player_username, content_bytes=gzip_data)
# 解除读锁保护,同时存储数据在S3上到路径
response = table.put_item(
Item={
'username': player_username,
'read_lock': False,
'inventory': gzip.compress(s3_path.encode(encoding='utf-8', errors='strict')),
}
)
logger.debug('成功上传大纪录到S3,路径:%s' % s3_path)
except Exception as e:
logger.debug('存档失败')
logger.error(e)
else:
response = table.put_item(
Item={
'username': player_username,
'inventory': gzip_data,
}
)
logger.debug('成功上传纪录, username=%s' % player_username)
```
**读取数据库中一条玩家记录**
```
def get_player_profile(uid):
"""
读取记录
:param uid: 玩家id
:return:
"""
table = dynamodb.Table('players')
player_name = 'player' + str(uid)
retry_count = 0
while True:
response = table.get_item(
Key={
'username': player_name,
}
)
if 'Item' not in response:
logger.error('Not Found')
return {}
item = response['Item']
# 检查读锁信息, 如果存在锁根据参数设置,间隔一段时间重新读取记录
if 'read_lock' in item and item['read_lock']:
retry_count += 1
logger.info('当前第%d次重试' % retry_count)
# 如果超时无法读取记录,则消除读锁,并重新读取记录
if retry_count < S3_READ_LOCK_RETRY_TIMES:
sleep(S3_READ_RETRY_INTERVAL)
continue
else:
table.update_item(
Key={
'username': player_name,
},
UpdateExpression="set read_lock = :read_lock",
ExpressionAttributeValues={
':read_lock': False,
},
)
inventory_bin = gzip.decompress(item['inventory'].value) # 解压缩数据
inventory_str = inventory_bin.decode("utf-8")
if is_s3_path(inventory_str):
player_data = gzip.decompress(get_s3_object(player_name).read())
inventory_json = json.loads(player_data)
else:
inventory_json = json.loads(inventory_str)
user_profile = {**response['Item'], **{'inventory': inventory_json}}
```
**最后,编写测试逻辑**
准备几个不同大小的 json 文件,观察写入数据库中的变化。
```
if __name__ == '__main__':
path_example = 'small.json'
# path_example = '500kb.json'
# path_example = '2MB.json'
with open(path_example, 'r') as load_f:
load_str = json.dumps(json.load(load_f))
test_data = load_str.encode(encoding='utf-8', errors='strict')
put_item(test_data)
# player_profile = get_player_profile(238)
# logger.info(player_profile)
```
如果需要测试读锁,可以将数据库中单个项目的 read_lock 手动设置成 True,然后观察读取逻辑在这个过程中的变化。
#### **限制**
对于存在单用户有高并发存档需求的游戏而言,以上设计中并未包含在数据存储在 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 上后,出现并发写的场景考虑。如果有此场景的需求,需要一些应用逻辑或者架构调整。
#### **结论**
在本次测试中发现,json 格式的数据使用 gzip 后,压缩率约为25%左右,理论上我们可以把单个项目(item)中可以存储最大约为1.6MB的数据项。即便有少量压缩后超过400KB的数据,也可以存储到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 上,仅在 [Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/?trk=cndc-detail) 中存储元数据和大字段数据在 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)上的路径。gzip 会带来一些额外的计算和 IO 开销,但是这些开销主要会落在游戏服务器上,对于数据库来说反而减少了 IO 的开销。
在大多数场景下,玩家数据即便不压缩也很少会超过400KB。这种情况下,建议可以尝试对比压缩启用和不启用两种场景的性能数据。以决定哪种方式更适合自己的游戏。
**本篇作者**
**林业**
亚马逊云科技解决方案架构师,负责基于亚马逊云科技的云计算方案的咨询与架构设计。拥有超过14年研发经验,曾打造千万级用户 APP,多项Github 开源项目贡献者。在游戏、IOT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。