使用 Amazon Bedrock 和 RAG 构建 Text2SQL 行业数据查询助手

SQL
Amazon SageMaker
Amazon Bedrock
0
0
### 背景 随着企业数据量的持续增长,如何让非技术人员也能轻松分析数据、获得商业洞察成为了当前的痛点。本文将介绍如何使用亚马逊云科技的大语言模型服务 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) 以及 RAG (Retrieval Augmented Generation),实现 Text2SQL 功能,以此为基础构建基于大语言模型(LLM)的行业数据查询助手,达到使用自然语言询问直接获取数据分析结果的目的。针对不同行业,我们都能见到类似的数据查询难题:通用的 BI 看板无法对即时、特殊的需求进行相应。具体地: 【**场景 A**】某家俱品牌出海电商运营辅助场景。1、数据分析 SQL 生成工具:利用 LLM 辅助数据分析代码工具,对收集到的数据进行分析。可以帮助数据工程团队在数据探索和开发阶段提高效率。2、电商服务平台能够收集和整理与销量相关的数据,对亚马逊电商和第三方独立站运营数据,包括订单量、访问量、产品评价、客诉数、站点 CPC 和 CPO 等进行电商运营辅助分析、选品策略辅助分析。 【**场景 B**】某冷链物流公司,对人(司机)、货(货物信息)、场(仓储)、动作(配送)等场景中进行提问,实现更精细化的监控和管理,从而提升运营效率、降低成本,并提供更优质的物流服务。例如:今日华北地区配送超时的订单比例有多少,华东地区最近一周哪些司机的配送过程中出现了频繁的延误问题等? 【**场景 C**】某头部广告公司,业务目标是优化广告投放决策,提高广告效果,降低试错成本。希望尝试创新应用 LLM 的语义理解能力,进行广告数据分析、广告词汇扩展等工作,可以快速进行广告拓词、广告归因分析等,辅助决策,替代传统需要大量手工构建模型的方式,提升工作效率。利用 LLM 的语义和学习能力进行知识推理和数据学习,实现更智能的广告投放决策,可以辅助广告公司优化媒介选择、时间安排、创意设计、预算分配等多方面决策。 技术层面,Text2SQL 是一个典型的[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)任务,在大语言模型(LLM)出现之前,曾有 seq2seq 等模型架构将其视作机器翻译任务,建立数据和自然语言之间的对齐和映射关系,但在 WikiSQL 等数据集上准确率非常低。在 LLM 出现后,通过其推理、理解以及指令遵循能力,Text2SQL 任务的成功率可以得到非常大的提升。本文旨在通过讲解不同的行业场景中落地的难点,来展现如何利用亚马逊云科技的服务加以设计理念来解决它们。 ### 用户体验 第一步:用户通过问答界面提出问题,如“本月销量前十的产品线是什么?” 第二步:系统直接返回 SQL 生成结果,并通过其查询到对应的数据结果 ```js SELECT i.item_id, COUNT(*) AS num_sales FROM interactions WHERE i.event_type = 'purchase' AND MONTH(i.timestamp) = 12 GROUP BY i.item_id ORDER BY num_sales DESC LIMIT 10; ``` ### 方案难点 构建自然语言数据查询助手是许多行业中企业的重点需求,但实现过程中困难重重。其中最核心的问题是 LLM 对于长尾知识、特定规则的不熟悉,以及对过分复杂提问的理解难度较高。利用 LLM 的推理能力和通用知识理解,辅以外部的知识体系(RAG)和执行规则来达到较好的生成效果。 ![image.png](https://dev-media.amazoncloud.cn/03ca8821dc3942dc95c7c649d46eb4d4_image.png "image.png") #### 1)LLM 无法识别长尾领域知识 在**电商领域**中,跳出率指的是一个衡量网站用户行为的指标,表示只访问了一个页面就离开网站的访客数量占所有访客的比例。同时,基于用户习惯的不同,跳出率也可能被称为“单页访问率”或“单次访问率”。若有相应问题是用了单页访问率作为问题,那么大模型有可能无法理解他们所说的内容。 因此可以使用增强检索(RAG)来获取正确的范例,作为生成 SQL 时 Prompt 的一部分来提高性能。这类做法通常被称为 Few-Shot (FS),指模型在推理时给予少量样本,但不进行权重更新。在 Text2SQL 上,我们有两种做法:1)使用 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) 及 Amazon OpenSearch 来对问题进行向量化,并在问题的向量库中匹配类似的问题。2)使用传统 NLP 模型进行命名实体识别(NER),并进而进行标签匹配,最终把对应正确的问题和 SQL 获取。在持续的客户使用和反馈频率上,我们可以长期提升生成准确率。 ![image.png](https://dev-media.amazoncloud.cn/1cc864b93b67457c818f9bb892ad3018_image.png "image.png") RAG in Text2SQL 的若干实现方式 #### 2)数据库的元数据过于庞大复杂,影响最终效果 在**呼叫中心案例**中,存在表格可能有一个表专门用于存储用户基本信息(如用户 ID、姓名、联系方式等),另一个表用于存储用户的账单信息(如账单 ID、用户 ID、账单日期、金额等),再有一个表用于存储用户的呼叫记录(如呼叫 ID、用户 ID、呼叫日期、通话时长等)。这样,每个表的宽度(即字段数量)可能都不会很大,但是由于需要处理的数据种类多,所以表的数量可能会非常多,常常出现上百张表。表的复杂不仅提高了 SQL 生成难度也受到输入字符数量的限制。 因此,在原始数据上构建指标体系是最有效的方案,通过实现新的少量宽表来减少 SQL 生成的复杂度。 ![image.png](https://dev-media.amazoncloud.cn/49e649bfc3044bc0a54bd4be566c1f00_image.png "image.png") 指标层的提取的可以大大降低生成难度 与此同时,我们可以构建选表流程(Schema Linking)来将表格的选取当作单独的步骤剥离,在选好表之后,再将对应表的结构加入 Prompt 中。这样可以有效地减少 Token 的输入量及提高生成的准确率。 ![image.png](https://dev-media.amazoncloud.cn/c8407b93a72a4565b84ed98d920e7732_image.png "image.png") Schema Linking 可以解决 Input Token 不足的和过高复杂度的问题 #### 3)语法错误 在 **Text2SQL 任务**中,经常会出现执行错误,例如:1)Group By,Join,Nested 的错误,由数据库在执行时返回;2)列、表的选择错误,甚至无法将表名或列名拼写对。通常我们采用两种方式提升准确率: 1. 使用思维链(Chain-of-Thoughts)拆解步骤:把一个多步推理问题分解出多个中间步骤,并且让大语言模型更加可解释,并且提升整体生成质量,例如在复杂的 Text2SQL 任务中。 2. 使用模型自我修复错误:这通常也是思维链的一部分。以下是一个示例: 错误的 SQL 输入: ```js SELECT * FROM Orders WHERE Customer_ID = 100; ``` 将错误重新喂入 LLM: ```js (Previous DDL, hints, and question pasted here) After we executed SQL (SQL pasted here), we observed the following error. ```ERROR: column "Customer_ID" does not exist``` Please fix the SQL and give explanation: ``` 输出的回答: ```js The issue here is that the column name is incorrect. In our Orders table, the column is named CustomerID, not Customer_ID. Here is the fixed SQL statement: ```sql SELECT * FROM Orders WHERE CustomerID = 100; ``` #### 4)提问无法回答 当提问者问出“为什么我的销量下降”,这个问题已经不再是 SQL 语句能够解决的。因此我们需要进行意图识别来进行问题分流,其目的是: 1. 将最常见通用的问题导向模版,提升整体的准确率 2. 将不太可能回答的问题识别出来,并告知用户超出的提问范围 3. 将可以回答,但暂时缺少关键信息的问题识别出来,并告知用户需要补充哪些信息 4. 将模版之外的问题直接导向 Text2SQL 模块 ![image.png](https://dev-media.amazoncloud.cn/c194412611114e06abb169c8082ee5a8_image.png "image.png") 利用 LLM 实现意图识别和意图分类,再区别化处理不同形态的分类 ### 解决方案指引 我们提供的标准方案构建于亚马逊云科技的服务体系之上,问答页面使用 ECS 容器上部署。用户问题由 API Gateway 路由到 Lambda 函数,在 Amazon Lambda 中查询历史问答库、转换 SQL、获取结果。历史问答库使用 OpenSearch 服务。SQL 转换使用 RAG 大语言模型服务。数据库使用 Amazon Aurora 储存必要的查询、用户信息。该系统最大的价值是提供标准部署方案,以较高的准确度使用 Amazon Bedrock 自动完成自然语言到 SQL 的转换,无需人工编码规则。 ![image.png](https://dev-media.amazoncloud.cn/9598e123090d4694aec37a8bbe7e7d67_image.png "image.png") 架构图 ![image.png](https://dev-media.amazoncloud.cn/b07683a6c02442788a077843850ff37d_image.png "image.png") ![image.png](https://dev-media.amazoncloud.cn/66f9db687fce488c8b0b958f1b9d36db_image.png "image.png") ![image.png](https://dev-media.amazoncloud.cn/be86f59e72ed4cc7b7847a355e21827a_image.png "image.png") ### 示例代码 生成 SQL 的模型方面,我们可以根据客户需求,选择 Bedrock 提供的业界翘楚的商业化模型(如:Claude),也可以选择在亚马逊云科技机器学习基础设施平台 Amazon SageMaker 上部署 code generation 的开源模型(如:SQLCoder/CodeLlama)。 如果客户没有太多 LLM 技术栈的积累,及足够的算法工程师团队,那么 Amazon Bedrock 是一个较好的选择,在 Amazon Bedrock 上提供了广泛的商业化成熟的 Functional Model,其中 Anthropic 的 Claude 模型经测试在各项 sql 生成中稳定性和功能均表现突出。 Amazon Bedrock 通过 SDK 调用 Claude 模型进行 sql 生成及自然语言交互的核心代码示例如下: ```js def get_bedrock_client(assumed_role: Optional[str] = None, region: Optional[str] = None, runtime: Optional[bool] = True, ): if region is None: target_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION")) else: target_region = region print(f"Create new client\n Using region: {target_region}") session_kwargs = {"region_name": target_region} client_kwargs = {**session_kwargs} profile_name = os.environ.get("AWS_PROFILE") if profile_name: print(f" Using profile: {profile_name}") session_kwargs["profile_name"] = profile_name retry_config = Config( region_name=target_region, retries={ "max_attempts": 10, "mode": "standard", }, ) session = boto3.Session(**session_kwargs) if assumed_role: print(f" Using role: {assumed_role}", end='') sts = session.client("sts") response = sts.assume_role( RoleArn=str(assumed_role), RoleSessionName="langchain-llm-1" ) print(" ... successful!") client_kwargs["aws_access_key_id"] = response["Credentials"]["AccessKeyId"] client_kwargs["aws_secret_access_key"] = response["Credentials"]["SecretAccessKey"] client_kwargs["aws_session_token"] = response["Credentials"]["SessionToken"] if runtime: service_name='bedrock-runtime' else: service_name='bedrock' client_kwargs["aws_access_key_id"] = os.environ.get("AWS_ACCESS_KEY_ID","") client_kwargs["aws_secret_access_key"] = os.environ.get("AWS_SECRET_ACCESS_KEY","") bedrock_client = session.client( service_name=service_name, config=retry_config, **client_kwargs ) print("boto3 Bedrock client successfully created!") print(bedrock_client._endpoint) return bedrock_client #role based initial client####### os.environ["AWS_DEFAULT_REGION"] = "us-west-2" # E.g. "us-east-1" os.environ["AWS_PROFILE"] = "default" os.environ["BEDROCK_ASSUME_ROLE"] = "arn:aws:iam::687912291502:role/service-role/AmazonSageMaker-ExecutionRole-20211013T113123" # E.g. "arn:aws:..." #新boto3 sdk session方式初始化bedrock boto3_bedrock = get_bedrock_client( assumed_role=os.environ.get("BEDROCK_ASSUME_ROLE", None), region=os.environ.get("AWS_DEFAULT_REGION", None) ) parameters_bedrock = { "max_tokens_to_sample": 2048, #"temperature": 0.5, "temperature": 0, #"top_k": 250, #"top_p": 1, "stop_sequences": ["\n\nHuman"], } ####langchain bedrock集成示例 bedrock_llm = Bedrock(model_id="anthropic.claude-v2", client=boto3_bedrock, model_kwargs=parameters_ db = SQLDatabase.from_uri( "mysql+pymysql://admin:admin12345678@database-us-west-2-demo.cluster-c1qvx9wzmmcz.us-west-2.rds.amazonaws.com/llm", include_tables=['dws_truck_portrait_index_sum_da','dws_ots_waybill_info_da','ads_bi_quality_monitor_shipping_detail','dim_pub_truck_info'], # we include only one table to save tokens in the prompt :) sample_rows_in_table_info=0) db_chain = CustomerizedSQLDatabaseChain.from_llm(llm=bedrock_llm, db=db, verbose=False, return_sql=True) db_chain.run("2022年的运输总量是多少吨?") ``` 如上代码示例,其中 - get_bedrock_client:该函数为 amazon boto3 sdk 获取 bedrock 服务客户端的方法,可以支持 AKSK 或者 IAM Role 方式 - db =SQLDatabase.from_uri 及 CustomerizedSQLDatabaseChain:该部分为通过 bedrock 客户端调用 Claude 模型,并通过 langchain SqlDatabaseChain 进行数据库连接及最终 sql 生成 如果客户希望有更多的模型选择性,且自身算法工程师的研发团队有一定的技术实力,希望对模型做更多的定制,以便更加适应自身业务系统的个性化需求(如 fine tune 模型微调),从而更加嵌入到自身的端到端业务流程,那可以选择业界的开源 sql 生成模型,其中 SQLCoder 模型在基本 sql 生成中稳定性和功能较其他开源模型更加优质。 SQLCoder 模型是一个 SOTA 大型语言模型, 在开发者的开源评估框架 SQLEval 中,SQLCoder 的性能优于其他多款主要的开源模型。其中 SQLCoder 15b 参数的模型,在大量 SQL 复杂查询上进行微调后,在针对单个数据库模式进行推理生成时,它的性能和功能在某些场景甚至优于 OpenAI 的 GPT-3.5,因此比较适合对自身业务数据库定制化需求比较高的 SQL 生成场景。 在 Amazon SageMaker 平台上,可以通过 inference endpoint 终端节点,简单的几行代码即可部署 SQLCoder 的模型,一旦部署完成,即可通过 SageMaker 的 inference sdk 接口进行推理生成,简化工程化落地工作量,提高生产化上线的效率。 使用 Amazon SageMaker inference endpoint 部署 SQLcoder 15b 模型及推理调用的代码示例如下: ```js inference_image_uri = ( f"763104351884.dkr.ecr.{region}.amazonaws.com/djl-inference:0.23.0-deepspeed0.9.5-cu118" ) engine=Python option.entryPoint=model.py option.load_in_4bit=TRUE option.tensor_parallel_degree=4 option.s3url=s3://sagemaker-us-west-2-687912291502/LLM-RAG/workshop/LLM_sqlcoder_model/ ####SageMaker inference endpoint部署sdk from sagemaker.utils import name_from_base import boto3 model_name = name_from_base(f"sqlcoder") #Note: Need to specify model_name print(f"Image going to be used is ---- > {inference_image_uri}") create_model_response = sm_client.create_model( ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer={ "Image": inference_image_uri, "ModelDataUrl": s3_code_artifact }, ) model_arn = create_model_response["ModelArn"] print(f"Created Model: {model_arn}") endpoint_config_name = f"{model_name}-config" endpoint_name = f"{model_name}-endpoint" # Note: ml.g4dn.2xlarge 也可以选择 endpoint_config_response = sm_client.create_endpoint_config( EndpointConfigName=endpoint_config_name, ProductionVariants=[ { "VariantName": "variant1", "ModelName": model_name, "InstanceType": "ml.g4dn.12xlarge", "InitialInstanceCount": 1 }, ], ) create_endpoint_response = sm_client.create_endpoint( EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name ) print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}") #### 模型推理测试 import json import boto3 smr_client = boto3.client("sagemaker-runtime") parameters = { "max_new_tokens": 400, "do_sample": False } question="我想知道top客户的住址" sql_prompt= """ ### Instructions: Your task is to convert a question into a SQL query, given a Mysql database schema. Adhere to these rules: - **Deliberately go through the question and database schema word by word** to appropriately answer the question - **Use Table Aliases** to prevent ambiguity. For example, `SELECT table1.col1, table2.col1 FROM table1 JOIN table2 ON table1.id = table2.id`. - When creating a ratio, always cast the numerator as float ### Input: Generate a SQL query that answers the question `{question}`. This query will run on a database whose schema is represented in this string: ...省略 ### Response: Based on your instructions, here is the SQL query I have generated to answer the question `{question}`: ```sql """.format(question=question) response_model = smr_client.invoke_endpoint( EndpointName=endpoint_name, Body=json.dumps( { "inputs": sql_prompt, "parameters": parameters } ), ContentType="application/json", ) result_sql=response_model['Body'].read().decode('utf8') print(result_sql.split("```sql")[-1].split("```")[0].split(";")[0].strip().replace("\\\\n"," ") + ";") ``` 如上示例代码所示,其中: - inference_image_uri:该部分为 SageMaker inference 推理镜像,SageMaker 提供了各种开箱即用的推理容器镜像,如 deepspeed,vllm,tensorRT,此处我们选择 deepspeed 的推理加速镜像 - engine=Python:该部分及其它配置项为 inference 推理容器的配置文件,可以指定推理的各种优化方式及模型加载位置等 - entryPoint=model.py:指定模型推理脚本 - load_in_4bit=TRUE:是否 4bit 量化加载 - tensor_parallel_degree=4:多卡并行推理 - s3url=s3://sagemaker-us-west-2-687912291502/LLM-RAG/workshop/LLM_sqlcoder_model/: 模型文件 S3 路径 - create_endpoint_config:配置部署终端节点,如机型,实例数量 - sql_prompt:生成 sql 提示词 - response_model = smr_client.invoke_endpoint:触发推理生成 sql 在元数据召回/意图识别精准调整 sql 生成时,向量化模型也是其中重要的一环,通过将自然语言文本(包括一个单词、短语甚至大型文档)转换为数字表示形式,然后使用这些向量从向量数据库中准确搜索相关段落,这样您就能充分利用自己的私域专业元数据(如数据库名,表名及明细字段等)与 sql 生成的基础模型(FM)组合,进行基于语义相似度的搜索,提高检索 sql 生成的库表元数据的准确性,及终端用户提问的语义识别的准确性。 关于向量化模型的选择,与 sql 生成模型一样,也可以选择商业化或者开源模型两种部署方式,其中商业化模型可以选择 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) Titan Embedding 模型,它针对文本检索进行了优化,超过 25 种语言,包括英语、中文和西班牙语。最多可以输入 8192 个 token,因此非常适合处理 sql 生成场景中较大的元数据检索的需求。并且 Bedrock Titan 的输出向量可以支持 1,536 个维度的长度,使其具有更高的准确性。可通过 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) 的[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)体验获得,因此您可以使用单个 API 轻松访问它,无需管理任何基础设施。 同样,如果针对向量嵌入模型需要更多的定制,以在语义相似检索时获得更高的召回准确率,那么开源的bge模型是一个不错的选择。BGE 模型是国内智源发布的开源可商用的中英文语义向量模型 BGE(BAAI General Embedding),BGE 保持了同等参数量级模型中的最小向量维度,使用成本更低。在中文语义向量综合表征能力评测 C-MTEB 的实验结果显示(Table 1),BGE 中文模型(BGE-zh)在对接大语言模型最常用到的检索能力上领先优势尤为显著。 在 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 的 deploy 部署 bge large zh 版本模型及 embedding 向量化核心代码示例如下: ```js serving.properties engine=Python option.tensor_parallel_degree=1 option.s3url = s3://sagemaker-us-east-1-106839800180/LLM-RAG/workshop/bge-zh-model/ #### SageMaker endpoint创建及部署 model_name = name_from_base("bge-zh-15") print(model_name) print(f"Image going to be used is ---- > {inference_image_uri}") create_model_response = sm_client.create_model( ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer={ "Image": inference_image_uri, "ModelDataUrl": s3_code_artifact }, ) model_arn = create_model_response["ModelArn"] print(f"Created Model: {model_arn}") endpoint_config_name = f"{model_name}-config" endpoint_name = f"{model_name}-endpoint" endpoint_config_response = sm_client.create_endpoint_config( EndpointConfigName=endpoint_config_name, ProductionVariants=[ { "VariantName": "variant1", "ModelName": model_name, "InstanceType": "ml.g4dn.xlarge", "InitialInstanceCount": 1, "ContainerStartupHealthCheckTimeoutInSeconds": 15*60, }, ], ) create_endpoint_response = sm_client.create_endpoint( EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name ) print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}") def load_model(properties): tensor_parallel = properties["tensor_parallel_degree"] model_location = properties['model_dir'] if "model_id" in properties: model_location = properties['model_id'] logging.info(f"Loading model in {model_location}") model = FlagModel(model_location) return model model = None def handle(inputs: Input): global model if not model: model = load_model(inputs.get_properties()) if inputs.is_empty(): return None data = inputs.get_as_json() input_sentences = None inputs = data["inputs"] if isinstance(inputs, list): input_sentences = inputs else: input_sentences = [inputs] is_query = data["is_query"] instruction = data["instruction"] logging.info(f"inputs: {input_sentences}") logging.info(f"is_query: {is_query}") logging.info(f"instruction: {instruction}") if is_query and instruction: input_sentences = [ instruction + sent for sent in input_sentences ] sentence_embeddings = model.encode(input_sentences) result = {"sentence_embeddings": sentence_embeddings} return Output().add_as_json(result) def get_vector_by_sm_endpoint(questions, sm_client, endpoint_name): parameters = { } response_model = sm_client.invoke_endpoint( EndpointName=endpoint_name, Body=json.dumps( { "inputs": questions, "is_query": True, "instruction" : "Represent this sentence for searching relevant passages:" } ), ContentType="application/json", ) json_str = response_model['Body'].read().decode('utf8') json_obj = json.loads(json_str) embeddings = json_obj['sentence_embeddings'] return embeddings ``` 如上示例代码所示,其中: - load_model:模型加载的方法,BGE 需要使用 FlagModel 类进行加载 - handle:使用 bge 模型对输入文本进行向量化的方法,其中 instruction 指令为“为以下中文文本生成向量嵌入” - properties:与 SQLCoder 模型部署一样,BGE 向量模型在 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 上的部署配置文件 - get_vector_by_sm_endpoint:调用上面的 SageMaker 的 endpoint 进行文本向量化 ### 总结 不同行业对数据的需求千变万化,将自然语言问题自动转换为 SQL 查询一直是自然语言处理领域的难点。有了 LLM 的进步使这变得可能。我们希望看到在更多企业场景中应用该解决方案,赋能业务人员与数据自由对话。 ![开发者尾巴.gif](https://dev-media.amazoncloud.cn/0afa2e9c23ff46318dd04e8f6f10ed53_%E5%BC%80%E5%8F%91%E8%80%85%E5%B0%BE%E5%B7%B4.gif "开发者尾巴.gif")
0
目录
关闭