利用 DynamoDB 和 S3 结合 gzip 压缩,最大化存储玩家数据

0
0
{"value":"### **前言**\n\n一些传统游戏架构中,采用 MySQL 存储玩家存档数据,利用分库分表分散单库单表的存储和性能压力,从而达到支持更多玩家的目的。随着数据量增长,数据表中 varchar 类型已经无法满足游戏中单字段的存储需求,而 blob 字段的应用对于这种架构下改造成本是最低的,因此一些游戏开始在最初设计的时候,数据库表结构就采用了 Blob 字段作为其玩家的游戏任务、道具等数据的存储。\n\nBlob 字段在 MySQL 5.6 / 5.7 中存在 bug([MySQL Bugs: #96466](https://bugs.mysql.com/bug.php?id=96466)),这个 bug 有概率导致数据库集群崩溃,造成数据丢失。即使在 MySQL 8.0 中,由于引擎本身设计的限制,在单表 20GB 以上,高频的更新就会导致数据库出现性能受限。并且随着表增大,性能问题会越来越明显。\n\n随着当游戏业务爆发时增长的时候,传统关系型数据库在分库分表的时候,需要进行应用改造,同时存在一定的停机维护时间。而且这些扩展完成后,在游戏的夕阳期进行收缩也需要进行应用改造,这无疑对业务开发和基础运维的部门造成了很多额外的工作量。\n\nDynamoDB 在应用到这个场景上是非常适用的。在业务发展任意阶段,都可以实现 0 停机的扩展,自动伸缩的特性。而且这一切对于应用层是完全透明的。同时在日常运维中也可以贴合业务负载进行动态扩缩容,从而进一步降低成本。\n\n### **概述**\n\n本文主要讲述在游戏场景下,根据 DynamoDB 的限制(每个项目都必须小于 400KB),在限制下尽可能存储更多的数据和当存储量超出限制时,扩展存储的最大化利用空间。重点描述如何利用 DynamoDB+S3 保存玩家存档中的大数据量属性,避免数据存在 S3 上后,在数据写入 S3 时,发生读取到 S3 旧存档的情况。同时利用 gzip 压缩减少数据大小,减少 IO 的开销提升性能。\n\n### **架构图**\n\n![image.png](https://dev-media.amazoncloud.cn/fd7fde90b68e4d58976928271cae1c0e_image.png)\n\n### **实战编码**\n\n#### **目标**\n\n1. 所有数据保存前都进行 gzip 压缩,读取后都用 gzip 解压。\n2. S3 存储和 DynamoDB 的 binary 字段存储可以自适应。如果用户数据压缩后如果大于指定的值则写入 S3,否则直接保存到当前数据库项目中的字段。\n3. DynamoDB 项目读取的时候,解析解压后的字段,如果字符串以 s3:// 开头,则继续从 S3 中获取数据\n4. 设置 S3 读锁字段,判断当前状态是否正在写入 S3,以阻塞读进程。在每个项目需要写入 S3 前都会设置 read_lock为Ture,S3 写成功后则设置为 False。读取记录后,read_lock 是否为 True,如果是判断被阻塞,进程会等待一段时间后进行重试,直到重试次数超出指定的值。重试超时后,读进程会认为写进程可能由于某种原因导致写永远无法成功,于是会将 read_lock 设置成 False。\n\n#### **第一步:初始化环境参数**\n\n```\\nfrom time import sleep\\nimport boto3\\nimport gzip\\nimport random\\nimport json\\nimport hashlib\\nimport logging\\n\\n# 写入 S3 的门槛,超过这个值数据会写入 S3,否则保存在数据库内,默认值 350KB\\nUPLOAD_TO_S3_THRESHOLD_BYTES = 358400\\n# 用户数据库保存的目标S3存储桶\\nUSER_DATA_BUCKET = 'linyesh-user-data'\\n# 遇到 S3 有读锁,重新请求最大次数,超出次数限制锁会被自动清除\\nS3_READ_LOCK_RETRY_TIMES = 10\\n# 遇到 S3 有读锁,读请求重试间隔时间\\nS3_READ_RETRY_INTERVAL = 0.2\\n\\ndynamodb = boto3.resource('dynamodb')\\ns3 = boto3.client('s3')\\nlogging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')\\nlogger = logging.getLogger(__name__)\\n```\n\n**参数说明**\n\n- UPLOAD_TO_S3_THRESHOLD_BYTES:为字段最大的数据存储长度限制。单位为:字节数。由于 DynamoDB 一个项目(Item)数据大小限制为 400KB。我们除了数据存档中最大字段还必须预留一部分空间给其他字段,避免整个 Item 超出 400KB。\n- USER_DATA_BUCKET:S3 用于存储超出 400KB 后的玩家大字段数据。需要提前建好,具体步骤参考:[创建存储桶](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/userguide/create-bucket-overview.html) \n- S3_READ_LOCK_RETRY_TIMES:限制当玩家在 S3 上的存档处在写入状态时候,读请求重试的次数。在项目处于读锁状态的时候,读进程会等待一段时间后重试。\n- S3_READ_RETRY_INTERVAL:读锁状态下,重试读的间隔时间,单位:秒。\n\n注意:```S3_READ_LOCK_RETRY_TIMES乘以S3_READ_RETRY_INTERVAL```\n 的时间理论上必须小于S3存档上传时间的最大值,因此实际使用本文中的代码应该根据存档可能的大小来调整这 2 个参数。否则可能存档会有大概率会发生脏读的情况。\n\n#### **第二步:创建 DynamoDB 表**\n\n```\\ndef create_tables():\\n \\"\\"\\"\\n 创建表\\n :return:\\n \\"\\"\\"\\n response = dynamodb.create_table(\\n TableName='players',\\n KeySchema=[\\n {\\n 'AttributeName': 'username',\\n 'KeyType': 'HASH'\\n }\\n ],\\n AttributeDefinitions=[\\n {\\n 'AttributeName': 'username',\\n 'AttributeType': 'S'\\n }\\n ],\\n ProvisionedThroughput={\\n 'ReadCapacityUnits': 5,\\n 'WriteCapacityUnits': 5\\n }\\n )\\n\\n # Wait until the table exists.\\n response.wait_until_exists()\\n\\n # Print out some data about the table.\\n logger.debug(response.item_count)\\n```\n\n#### **第三步:编写辅助逻辑**\n\n**指数级回退函数**\n\n```\\ndef run_with_backoff(function, retries=5, **function_parameters):\\n base_backoff = 0.1 # base 100ms backoff\\n max_backoff = 10 # sleep for maximum 10 seconds\\n tries = 0\\n while True:\\n try:\\n return function(function_parameters)\\n except (ConnectionError, TimeoutError):\\n if tries >= retries:\\n raise\\n backoff = min(max_backoff, base_backoff * (pow(2, tries) + random.random()))\\n logger.debug(f\\"sleeping for {backoff:.2f}s\\")\\n sleep(backoff)\\n tries += 1\\n```\n\n**S3 路径判断函数**\n\n```\\ndef is_s3_path(content):\\n return content.startswith('s3://')\\n```\n\n**S3 文件获取**\n\n```\\ndef get_s3_object(key):\\n response = s3.get_object(Bucket=USER_DATA_BUCKET, Key=s3_key_generator(key))\\n return response['Body']\\n```\n\n**检查大小超限**\n\n```\\ndef check_threshold(current_size):\\n return current_size > UPLOAD_TO_S3_THRESHOLD_BYTES\\n```\n\n**S3 Key 生成函数**\n\n这个函数可以将玩家的存档随机分配到 S3 桶下不同的 Prefix 中,这有利于提高 S3 中 IO 的性能。\n\n```\\ndef s3_key_generator(key): \\n s3_prefix = hashlib.md5((key).encode('utf-8')).hexdigest()[:8] \\n return s3_prefix + '/' + key \\n```\n\n**文件上传到 S3**\n\n```\\ndef upload_content_to_s3(obj_param): \\n s3_key = s3_key_generator(obj_param['key']) \\n try: \\n response = s3.put_object( \\n Body=obj_param['content_bytes'], \\n Bucket=USER_DATA_BUCKET, \\n Key=s3_key) \\n return \\"s3://%s/%s\\" % (USER_DATA_BUCKET, s3_key) \\n except Exception as e: \\n logger.error(e) \\n raise e \\n```\n\n#### **第四步:编写主体逻辑**\n\n**写入单个项目到 DynamoDB 数据库**\n\n```\\ndef put_item(load_data): \\n gzip_data = gzip.compress(load_data) # 压缩数据 \\n logger.debug('压缩后大小%.2fKB,原始大小 %.2fKB,压缩率 %.2f%%' % ( \\n len(gzip_data) / 1024.0, \\n len(load_data) / 1024.0, \\n 100.0 * len(gzip_data) / len(load_data))) \\n \\n table = dynamodb.Table('players') \\n player_username = 'player' + str(random.randint(1, 1000)) \\n if check_threshold(len(gzip_data)): \\n try: \\n # 读锁保护 \\n table.update_item( \\n Key={ \\n 'username': player_username, \\n }, \\n UpdateExpression=\\"set read_lock = :read_lock\\", \\n ExpressionAttributeValues={ \\n ':read_lock': True, \\n }, \\n ) \\n \\n # 写入数据到 S3 \\n s3_path = run_with_backoff(upload_content_to_s3, key=player_username, content_bytes=gzip_data) \\n # 解除读锁保护,同时存储数据在 S3 上到路径 \\n response = table.put_item( \\n Item={ \\n 'username': player_username, \\n 'read_lock': False, \\n 'inventory': gzip.compress(s3_path.encode(encoding='utf-8', errors='strict')), \\n } \\n ) \\n logger.debug('成功上传大纪录到S3,路径:%s' % s3_path) \\n except Exception as e: \\n logger.debug('存档失败') \\n logger.error(e) \\n else: \\n response = table.put_item( \\n Item={ \\n 'username': player_username, \\n 'inventory': gzip_data, \\n } \\n ) \\n logger.debug('成功上传纪录, username=%s' % player_username) \\n```\n\n**读取数据库中一条玩家记录**\n\n```\\ndef get_player_profile(uid): \\n \\"\\"\\" \\n 读取记录 \\n :param uid: 玩家 id \\n :return: \\n \\"\\"\\" \\n table = dynamodb.Table('players') \\n player_name = 'player' + str(uid) \\n \\n retry_count = 0 \\n while True: \\n response = table.get_item( \\n Key={ \\n 'username': player_name, \\n } \\n ) \\n \\n if 'Item' not in response: \\n logger.error('Not Found') \\n return {} \\n \\n item = response['Item'] \\n # 检查读锁信息, 如果存在锁根据参数设置,间隔一段时间重新读取记录 \\n if 'read_lock' in item and item['read_lock']: \\n retry_count += 1 \\n logger.info('当前第%d次重试' % retry_count) \\n # 如果超时无法读取记录,则消除读锁,并重新读取记录 \\n if retry_count < S3_READ_LOCK_RETRY_TIMES: \\n sleep(S3_READ_RETRY_INTERVAL) \\n continue \\n else: \\n table.update_item( \\n Key={ \\n 'username': player_name, \\n }, \\n UpdateExpression=\\"set read_lock = :read_lock\\", \\n ExpressionAttributeValues={ \\n ':read_lock': False, \\n }, \\n ) \\n \\n inventory_bin = gzip.decompress(item['inventory'].value) # 解压缩数据 \\n inventory_str = inventory_bin.decode(\\"utf-8\\") \\n if is_s3_path(inventory_str): \\n player_data = gzip.decompress(get_s3_object(player_name).read()) \\n inventory_json = json.loads(player_data) \\n else: \\n inventory_json = json.loads(inventory_str) \\n \\n user_profile = {**response['Item'], **{'inventory': inventory_json}} \\n return user_profile \\n```\n\n**最后,编写测试逻辑**\n\n准备几个不同大小的 json 文件,观察写入数据库中的变化。\n\n```\\nif __name__ == '__main__': \\n path_example = 'small.json' \\n # path_example = '500kb.json' \\n # path_example = '2MB.json' \\n with open(path_example, 'r') as load_f: \\n load_str = json.dumps(json.load(load_f)) \\n test_data = load_str.encode(encoding='utf-8', errors='strict') \\n put_item(test_data) \\n \\n # player_profile = get_player_profile(238) \\n # logger.info(player_profile) \\n```\n\n如果需要测试读锁,可以将数据库中单个项目的 read_lock 手动设置成 True,然后观察读取逻辑在这个过程中的变化。\n\n### **总结**\n\n在本次测试中发现,json 格式的数据使用 gzip 后,压缩率约为 25% 左右,理论上我们可以把单个项目(item) 中可以存储最大约为 1.6MB 的数据项。即便有少量压缩后超过 400KB 的数据,也可以存储到 S3 上,仅在 DynamoDB 中存储元数据和大字段数据在 S3 上的路径。\n\ngzip 会带来一些额外的计算和 IO 开销,但是这些开销主要会落在游戏服务器上,对于数据库来说反而减少了 IO 的开销。\n\n在大多数场景下,玩家数据即便不压缩也很少会超过 400KB。这种情况下,建议可以尝试对比压缩启用和不启用两种场景的性能数据。以决定哪种方式更适合自己的游戏。\n\n### **限制**\n\n对于存在单用户有高并发存档需求的游戏而言,以上设计中并未包含在数据存储在 S3 上后,出现并发写的场景考虑。如果有此场景的需求,需要一些应用逻辑或者架构调整。\n\n### **本篇作者**\n\n![image.png](https://dev-media.amazoncloud.cn/5785850c29b74f9687a23ae6c998cfb2_image.png)\n\n#### **林业**\n\nAmazon 解决方案架构师,负责基于 Amazon 的云计算方案的咨询与架构设计。拥有超过 14 年研发经验,曾打造千万级用户 APP,多项 Github 开源项目贡献者。在游戏、IOT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。","render":"<h3><a id=\\"_0\\"></a><strong>前言</strong></h3>\\n<p>一些传统游戏架构中,采用 MySQL 存储玩家存档数据,利用分库分表分散单库单表的存储和性能压力,从而达到支持更多玩家的目的。随着数据量增长,数据表中 varchar 类型已经无法满足游戏中单字段的存储需求,而 blob 字段的应用对于这种架构下改造成本是最低的,因此一些游戏开始在最初设计的时候,数据库表结构就采用了 Blob 字段作为其玩家的游戏任务、道具等数据的存储。</p>\n<p>Blob 字段在 MySQL 5.6 / 5.7 中存在 bug(<a href=\\"https://bugs.mysql.com/bug.php?id=96466\\" target=\\"_blank\\">MySQL Bugs: #96466</a>),这个 bug 有概率导致数据库集群崩溃,造成数据丢失。即使在 MySQL 8.0 中,由于引擎本身设计的限制,在单表 20GB 以上,高频的更新就会导致数据库出现性能受限。并且随着表增大,性能问题会越来越明显。</p>\\n<p>随着当游戏业务爆发时增长的时候,传统关系型数据库在分库分表的时候,需要进行应用改造,同时存在一定的停机维护时间。而且这些扩展完成后,在游戏的夕阳期进行收缩也需要进行应用改造,这无疑对业务开发和基础运维的部门造成了很多额外的工作量。</p>\n<p>DynamoDB 在应用到这个场景上是非常适用的。在业务发展任意阶段,都可以实现 0 停机的扩展,自动伸缩的特性。而且这一切对于应用层是完全透明的。同时在日常运维中也可以贴合业务负载进行动态扩缩容,从而进一步降低成本。</p>\n<h3><a id=\\"_10\\"></a><strong>概述</strong></h3>\\n<p>本文主要讲述在游戏场景下,根据 DynamoDB 的限制(每个项目都必须小于 400KB),在限制下尽可能存储更多的数据和当存储量超出限制时,扩展存储的最大化利用空间。重点描述如何利用 DynamoDB+S3 保存玩家存档中的大数据量属性,避免数据存在 S3 上后,在数据写入 S3 时,发生读取到 S3 旧存档的情况。同时利用 gzip 压缩减少数据大小,减少 IO 的开销提升性能。</p>\n<h3><a id=\\"_14\\"></a><strong>架构图</strong></h3>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/fd7fde90b68e4d58976928271cae1c0e_image.png\\" alt=\\"image.png\\" /></p>\n<h3><a id=\\"_18\\"></a><strong>实战编码</strong></h3>\\n<h4><a id=\\"_20\\"></a><strong>目标</strong></h4>\\n<ol>\\n<li>所有数据保存前都进行 gzip 压缩,读取后都用 gzip 解压。</li>\n<li>S3 存储和 DynamoDB 的 binary 字段存储可以自适应。如果用户数据压缩后如果大于指定的值则写入 S3,否则直接保存到当前数据库项目中的字段。</li>\n<li>DynamoDB 项目读取的时候,解析解压后的字段,如果字符串以 s3:// 开头,则继续从 S3 中获取数据</li>\n<li>设置 S3 读锁字段,判断当前状态是否正在写入 S3,以阻塞读进程。在每个项目需要写入 S3 前都会设置 read_lock为Ture,S3 写成功后则设置为 False。读取记录后,read_lock 是否为 True,如果是判断被阻塞,进程会等待一段时间后进行重试,直到重试次数超出指定的值。重试超时后,读进程会认为写进程可能由于某种原因导致写永远无法成功,于是会将 read_lock 设置成 False。</li>\n</ol>\\n<h4><a id=\\"_27\\"></a><strong>第一步:初始化环境参数</strong></h4>\\n<pre><code class=\\"lang-\\">from time import sleep\\nimport boto3\\nimport gzip\\nimport random\\nimport json\\nimport hashlib\\nimport logging\\n\\n# 写入 S3 的门槛,超过这个值数据会写入 S3,否则保存在数据库内,默认值 350KB\\nUPLOAD_TO_S3_THRESHOLD_BYTES = 358400\\n# 用户数据库保存的目标S3存储桶\\nUSER_DATA_BUCKET = 'linyesh-user-data'\\n# 遇到 S3 有读锁,重新请求最大次数,超出次数限制锁会被自动清除\\nS3_READ_LOCK_RETRY_TIMES = 10\\n# 遇到 S3 有读锁,读请求重试间隔时间\\nS3_READ_RETRY_INTERVAL = 0.2\\n\\ndynamodb = boto3.resource('dynamodb')\\ns3 = boto3.client('s3')\\nlogging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')\\nlogger = logging.getLogger(__name__)\\n</code></pre>\\n<p><strong>参数说明</strong></p>\\n<ul>\\n<li>UPLOAD_TO_S3_THRESHOLD_BYTES:为字段最大的数据存储长度限制。单位为:字节数。由于 DynamoDB 一个项目(Item)数据大小限制为 400KB。我们除了数据存档中最大字段还必须预留一部分空间给其他字段,避免整个 Item 超出 400KB。</li>\n<li>USER_DATA_BUCKET:S3 用于存储超出 400KB 后的玩家大字段数据。需要提前建好,具体步骤参考:<a href=\\"https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/userguide/create-bucket-overview.html\\" target=\\"_blank\\">创建存储桶</a></li>\\n<li>S3_READ_LOCK_RETRY_TIMES:限制当玩家在 S3 上的存档处在写入状态时候,读请求重试的次数。在项目处于读锁状态的时候,读进程会等待一段时间后重试。</li>\n<li>S3_READ_RETRY_INTERVAL:读锁状态下,重试读的间隔时间,单位:秒。</li>\n</ul>\\n<p>注意:<code>S3_READ_LOCK_RETRY_TIMES乘以S3_READ_RETRY_INTERVAL</code><br />\\n的时间理论上必须小于S3存档上传时间的最大值,因此实际使用本文中的代码应该根据存档可能的大小来调整这 2 个参数。否则可能存档会有大概率会发生脏读的情况。</p>\n<h4><a id=\\"_DynamoDB__63\\"></a><strong>第二步:创建 DynamoDB 表</strong></h4>\\n<pre><code class=\\"lang-\\">def create_tables():\\n &quot;&quot;&quot;\\n 创建表\\n :return:\\n &quot;&quot;&quot;\\n response = dynamodb.create_table(\\n TableName='players',\\n KeySchema=[\\n {\\n 'AttributeName': 'username',\\n 'KeyType': 'HASH'\\n }\\n ],\\n AttributeDefinitions=[\\n {\\n 'AttributeName': 'username',\\n 'AttributeType': 'S'\\n }\\n ],\\n ProvisionedThroughput={\\n 'ReadCapacityUnits': 5,\\n 'WriteCapacityUnits': 5\\n }\\n )\\n\\n # Wait until the table exists.\\n response.wait_until_exists()\\n\\n # Print out some data about the table.\\n logger.debug(response.item_count)\\n</code></pre>\\n<h4><a id=\\"_98\\"></a><strong>第三步:编写辅助逻辑</strong></h4>\\n<p><strong>指数级回退函数</strong></p>\\n<pre><code class=\\"lang-\\">def run_with_backoff(function, retries=5, **function_parameters):\\n base_backoff = 0.1 # base 100ms backoff\\n max_backoff = 10 # sleep for maximum 10 seconds\\n tries = 0\\n while True:\\n try:\\n return function(function_parameters)\\n except (ConnectionError, TimeoutError):\\n if tries &gt;= retries:\\n raise\\n backoff = min(max_backoff, base_backoff * (pow(2, tries) + random.random()))\\n logger.debug(f&quot;sleeping for {backoff:.2f}s&quot;)\\n sleep(backoff)\\n tries += 1\\n</code></pre>\\n<p><strong>S3 路径判断函数</strong></p>\\n<pre><code class=\\"lang-\\">def is_s3_path(content):\\n return content.startswith('s3://')\\n</code></pre>\\n<p><strong>S3 文件获取</strong></p>\\n<pre><code class=\\"lang-\\">def get_s3_object(key):\\n response = s3.get_object(Bucket=USER_DATA_BUCKET, Key=s3_key_generator(key))\\n return response['Body']\\n</code></pre>\\n<p><strong>检查大小超限</strong></p>\\n<pre><code class=\\"lang-\\">def check_threshold(current_size):\\n return current_size &gt; UPLOAD_TO_S3_THRESHOLD_BYTES\\n</code></pre>\\n<p><strong>S3 Key 生成函数</strong></p>\\n<p>这个函数可以将玩家的存档随机分配到 S3 桶下不同的 Prefix 中,这有利于提高 S3 中 IO 的性能。</p>\n<pre><code class=\\"lang-\\">def s3_key_generator(key): \\n s3_prefix = hashlib.md5((key).encode('utf-8')).hexdigest()[:8] \\n return s3_prefix + '/' + key \\n</code></pre>\\n<p><strong>文件上传到 S3</strong></p>\\n<pre><code class=\\"lang-\\">def upload_content_to_s3(obj_param): \\n s3_key = s3_key_generator(obj_param['key']) \\n try: \\n response = s3.put_object( \\n Body=obj_param['content_bytes'], \\n Bucket=USER_DATA_BUCKET, \\n Key=s3_key) \\n return &quot;s3://%s/%s&quot; % (USER_DATA_BUCKET, s3_key) \\n except Exception as e: \\n logger.error(e) \\n raise e \\n</code></pre>\\n<h4><a id=\\"_167\\"></a><strong>第四步:编写主体逻辑</strong></h4>\\n<p><strong>写入单个项目到 DynamoDB 数据库</strong></p>\\n<pre><code class=\\"lang-\\">def put_item(load_data): \\n gzip_data = gzip.compress(load_data) # 压缩数据 \\n logger.debug('压缩后大小%.2fKB,原始大小 %.2fKB,压缩率 %.2f%%' % ( \\n len(gzip_data) / 1024.0, \\n len(load_data) / 1024.0, \\n 100.0 * len(gzip_data) / len(load_data))) \\n \\n table = dynamodb.Table('players') \\n player_username = 'player' + str(random.randint(1, 1000)) \\n if check_threshold(len(gzip_data)): \\n try: \\n # 读锁保护 \\n table.update_item( \\n Key={ \\n 'username': player_username, \\n }, \\n UpdateExpression=&quot;set read_lock = :read_lock&quot;, \\n ExpressionAttributeValues={ \\n ':read_lock': True, \\n }, \\n ) \\n \\n # 写入数据到 S3 \\n s3_path = run_with_backoff(upload_content_to_s3, key=player_username, content_bytes=gzip_data) \\n # 解除读锁保护,同时存储数据在 S3 上到路径 \\n response = table.put_item( \\n Item={ \\n 'username': player_username, \\n 'read_lock': False, \\n 'inventory': gzip.compress(s3_path.encode(encoding='utf-8', errors='strict')), \\n } \\n ) \\n logger.debug('成功上传大纪录到S3,路径:%s' % s3_path) \\n except Exception as e: \\n logger.debug('存档失败') \\n logger.error(e) \\n else: \\n response = table.put_item( \\n Item={ \\n 'username': player_username, \\n 'inventory': gzip_data, \\n } \\n ) \\n logger.debug('成功上传纪录, username=%s' % player_username) \\n</code></pre>\\n<p><strong>读取数据库中一条玩家记录</strong></p>\\n<pre><code class=\\"lang-\\">def get_player_profile(uid): \\n &quot;&quot;&quot; \\n 读取记录 \\n :param uid: 玩家 id \\n :return: \\n &quot;&quot;&quot; \\n table = dynamodb.Table('players') \\n player_name = 'player' + str(uid) \\n \\n retry_count = 0 \\n while True: \\n response = table.get_item( \\n Key={ \\n 'username': player_name, \\n } \\n ) \\n \\n if 'Item' not in response: \\n logger.error('Not Found') \\n return {} \\n \\n item = response['Item'] \\n # 检查读锁信息, 如果存在锁根据参数设置,间隔一段时间重新读取记录 \\n if 'read_lock' in item and item['read_lock']: \\n retry_count += 1 \\n logger.info('当前第%d次重试' % retry_count) \\n # 如果超时无法读取记录,则消除读锁,并重新读取记录 \\n if retry_count &lt; S3_READ_LOCK_RETRY_TIMES: \\n sleep(S3_READ_RETRY_INTERVAL) \\n continue \\n else: \\n table.update_item( \\n Key={ \\n 'username': player_name, \\n }, \\n UpdateExpression=&quot;set read_lock = :read_lock&quot;, \\n ExpressionAttributeValues={ \\n ':read_lock': False, \\n }, \\n ) \\n \\n inventory_bin = gzip.decompress(item['inventory'].value) # 解压缩数据 \\n inventory_str = inventory_bin.decode(&quot;utf-8&quot;) \\n if is_s3_path(inventory_str): \\n player_data = gzip.decompress(get_s3_object(player_name).read()) \\n inventory_json = json.loads(player_data) \\n else: \\n inventory_json = json.loads(inventory_str) \\n \\n user_profile = {**response['Item'], **{'inventory': inventory_json}} \\n return user_profile \\n</code></pre>\\n<p><strong>最后,编写测试逻辑</strong></p>\\n<p>准备几个不同大小的 json 文件,观察写入数据库中的变化。</p>\n<pre><code class=\\"lang-\\">if __name__ == '__main__': \\n path_example = 'small.json' \\n # path_example = '500kb.json' \\n # path_example = '2MB.json' \\n with open(path_example, 'r') as load_f: \\n load_str = json.dumps(json.load(load_f)) \\n test_data = load_str.encode(encoding='utf-8', errors='strict') \\n put_item(test_data) \\n \\n # player_profile = get_player_profile(238) \\n # logger.info(player_profile) \\n</code></pre>\\n<p>如果需要测试读锁,可以将数据库中单个项目的 read_lock 手动设置成 True,然后观察读取逻辑在这个过程中的变化。</p>\n<h3><a id=\\"_294\\"></a><strong>总结</strong></h3>\\n<p>在本次测试中发现,json 格式的数据使用 gzip 后,压缩率约为 25% 左右,理论上我们可以把单个项目(item) 中可以存储最大约为 1.6MB 的数据项。即便有少量压缩后超过 400KB 的数据,也可以存储到 S3 上,仅在 DynamoDB 中存储元数据和大字段数据在 S3 上的路径。</p>\n<p>gzip 会带来一些额外的计算和 IO 开销,但是这些开销主要会落在游戏服务器上,对于数据库来说反而减少了 IO 的开销。</p>\n<p>在大多数场景下,玩家数据即便不压缩也很少会超过 400KB。这种情况下,建议可以尝试对比压缩启用和不启用两种场景的性能数据。以决定哪种方式更适合自己的游戏。</p>\n<h3><a id=\\"_302\\"></a><strong>限制</strong></h3>\\n<p>对于存在单用户有高并发存档需求的游戏而言,以上设计中并未包含在数据存储在 S3 上后,出现并发写的场景考虑。如果有此场景的需求,需要一些应用逻辑或者架构调整。</p>\n<h3><a id=\\"_306\\"></a><strong>本篇作者</strong></h3>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/5785850c29b74f9687a23ae6c998cfb2_image.png\\" alt=\\"image.png\\" /></p>\n<h4><a id=\\"_310\\"></a><strong>林业</strong></h4>\\n<p>Amazon 解决方案架构师,负责基于 Amazon 的云计算方案的咨询与架构设计。拥有超过 14 年研发经验,曾打造千万级用户 APP,多项 Github 开源项目贡献者。在游戏、IOT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭