### 背景
随着企业数据量的持续增长,如何让非技术人员也能轻松分析数据、获得商业洞察成为了当前的痛点。本文将介绍如何使用亚马逊云科技的大语言模型服务 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) 以及 RAG (Retrieval Augmented Generation),实现 Text2SQL 功能,以此为基础构建基于大语言模型(LLM)的行业数据查询助手,达到使用自然语言询问直接获取数据分析结果的目的。针对不同行业,我们都能见到类似的数据查询难题:通用的 BI 看板无法对即时、特殊的需求进行相应。具体地:
【**场景 A**】某家俱品牌出海电商运营辅助场景。1、数据分析 SQL 生成工具:利用 LLM 辅助数据分析代码工具,对收集到的数据进行分析。可以帮助数据工程团队在数据探索和开发阶段提高效率。2、电商服务平台能够收集和整理与销量相关的数据,对亚马逊电商和第三方独立站运营数据,包括订单量、访问量、产品评价、客诉数、站点 CPC 和 CPO 等进行电商运营辅助分析、选品策略辅助分析。
【**场景 B**】某冷链物流公司,对人(司机)、货(货物信息)、场(仓储)、动作(配送)等场景中进行提问,实现更精细化的监控和管理,从而提升运营效率、降低成本,并提供更优质的物流服务。例如:今日华北地区配送超时的订单比例有多少,华东地区最近一周哪些司机的配送过程中出现了频繁的延误问题等?
【**场景 C**】某头部广告公司,业务目标是优化广告投放决策,提高广告效果,降低试错成本。希望尝试创新应用 LLM 的语义理解能力,进行广告数据分析、广告词汇扩展等工作,可以快速进行广告拓词、广告归因分析等,辅助决策,替代传统需要大量手工构建模型的方式,提升工作效率。利用 LLM 的语义和学习能力进行知识推理和数据学习,实现更智能的广告投放决策,可以辅助广告公司优化媒介选择、时间安排、创意设计、预算分配等多方面决策。
技术层面,Text2SQL 是一个典型的[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)任务,在大语言模型(LLM)出现之前,曾有 seq2seq 等模型架构将其视作机器翻译任务,建立数据和自然语言之间的对齐和映射关系,但在 WikiSQL 等数据集上准确率非常低。在 LLM 出现后,通过其推理、理解以及指令遵循能力,Text2SQL 任务的成功率可以得到非常大的提升。本文旨在通过讲解不同的行业场景中落地的难点,来展现如何利用亚马逊云科技的服务加以设计理念来解决它们。
### 用户体验
第一步:用户通过问答界面提出问题,如“本月销量前十的产品线是什么?”
第二步:系统直接返回 SQL 生成结果,并通过其查询到对应的数据结果
```js
SELECT i.item_id, COUNT(*) AS num_sales FROM interactions WHERE i.event_type = 'purchase'
AND MONTH(i.timestamp) = 12 GROUP BY i.item_id ORDER BY num_sales DESC
LIMIT 10;
```
### 方案难点
构建自然语言数据查询助手是许多行业中企业的重点需求,但实现过程中困难重重。其中最核心的问题是 LLM 对于长尾知识、特定规则的不熟悉,以及对过分复杂提问的理解难度较高。利用 LLM 的推理能力和通用知识理解,辅以外部的知识体系(RAG)和执行规则来达到较好的生成效果。
![image.png](https://dev-media.amazoncloud.cn/03ca8821dc3942dc95c7c649d46eb4d4_image.png "image.png")
#### 1)LLM 无法识别长尾领域知识
在**电商领域**中,跳出率指的是一个衡量网站用户行为的指标,表示只访问了一个页面就离开网站的访客数量占所有访客的比例。同时,基于用户习惯的不同,跳出率也可能被称为“单页访问率”或“单次访问率”。若有相应问题是用了单页访问率作为问题,那么大模型有可能无法理解他们所说的内容。
因此可以使用增强检索(RAG)来获取正确的范例,作为生成 SQL 时 Prompt 的一部分来提高性能。这类做法通常被称为 Few-Shot (FS),指模型在推理时给予少量样本,但不进行权重更新。在 Text2SQL 上,我们有两种做法:1)使用 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) 及 Amazon OpenSearch 来对问题进行向量化,并在问题的向量库中匹配类似的问题。2)使用传统 NLP 模型进行命名实体识别(NER),并进而进行标签匹配,最终把对应正确的问题和 SQL 获取。在持续的客户使用和反馈频率上,我们可以长期提升生成准确率。
![image.png](https://dev-media.amazoncloud.cn/1cc864b93b67457c818f9bb892ad3018_image.png "image.png")
RAG in Text2SQL 的若干实现方式
#### 2)数据库的元数据过于庞大复杂,影响最终效果
在**呼叫中心案例**中,存在表格可能有一个表专门用于存储用户基本信息(如用户 ID、姓名、联系方式等),另一个表用于存储用户的账单信息(如账单 ID、用户 ID、账单日期、金额等),再有一个表用于存储用户的呼叫记录(如呼叫 ID、用户 ID、呼叫日期、通话时长等)。这样,每个表的宽度(即字段数量)可能都不会很大,但是由于需要处理的数据种类多,所以表的数量可能会非常多,常常出现上百张表。表的复杂不仅提高了 SQL 生成难度也受到输入字符数量的限制。
因此,在原始数据上构建指标体系是最有效的方案,通过实现新的少量宽表来减少 SQL 生成的复杂度。
![image.png](https://dev-media.amazoncloud.cn/49e649bfc3044bc0a54bd4be566c1f00_image.png "image.png")
指标层的提取的可以大大降低生成难度
与此同时,我们可以构建选表流程(Schema Linking)来将表格的选取当作单独的步骤剥离,在选好表之后,再将对应表的结构加入 Prompt 中。这样可以有效地减少 Token 的输入量及提高生成的准确率。
![image.png](https://dev-media.amazoncloud.cn/c8407b93a72a4565b84ed98d920e7732_image.png "image.png")
Schema Linking 可以解决 Input Token 不足的和过高复杂度的问题
#### 3)语法错误
在 **Text2SQL 任务**中,经常会出现执行错误,例如:1)Group By,Join,Nested 的错误,由数据库在执行时返回;2)列、表的选择错误,甚至无法将表名或列名拼写对。通常我们采用两种方式提升准确率:
1. 使用思维链(Chain-of-Thoughts)拆解步骤:把一个多步推理问题分解出多个中间步骤,并且让大语言模型更加可解释,并且提升整体生成质量,例如在复杂的 Text2SQL 任务中。
2. 使用模型自我修复错误:这通常也是思维链的一部分。以下是一个示例:
错误的 SQL 输入:
```js
SELECT *
FROM Orders WHERE Customer_ID = 100;
```
将错误重新喂入 LLM:
```js
(Previous DDL, hints, and question pasted here)
After we executed SQL (SQL pasted here), we observed the following error.
```ERROR: column "Customer_ID" does not exist```
Please fix the SQL and give explanation:
```
输出的回答:
```js
The issue here is that the column name is incorrect. In our Orders table, the column is named CustomerID, not Customer_ID.
Here is the fixed SQL statement:
```sql
SELECT *
FROM Orders WHERE CustomerID = 100;
```
#### 4)提问无法回答
当提问者问出“为什么我的销量下降”,这个问题已经不再是 SQL 语句能够解决的。因此我们需要进行意图识别来进行问题分流,其目的是:
1. 将最常见通用的问题导向模版,提升整体的准确率
2. 将不太可能回答的问题识别出来,并告知用户超出的提问范围
3. 将可以回答,但暂时缺少关键信息的问题识别出来,并告知用户需要补充哪些信息
4. 将模版之外的问题直接导向 Text2SQL 模块
![image.png](https://dev-media.amazoncloud.cn/c194412611114e06abb169c8082ee5a8_image.png "image.png")
利用 LLM 实现意图识别和意图分类,再区别化处理不同形态的分类
### 解决方案指引
我们提供的标准方案构建于亚马逊云科技的服务体系之上,问答页面使用 ECS 容器上部署。用户问题由 API Gateway 路由到 Lambda 函数,在 Amazon Lambda 中查询历史问答库、转换 SQL、获取结果。历史问答库使用 OpenSearch 服务。SQL 转换使用 RAG 大语言模型服务。数据库使用 Amazon Aurora 储存必要的查询、用户信息。该系统最大的价值是提供标准部署方案,以较高的准确度使用 Amazon Bedrock 自动完成自然语言到 SQL 的转换,无需人工编码规则。
![image.png](https://dev-media.amazoncloud.cn/9598e123090d4694aec37a8bbe7e7d67_image.png "image.png")
架构图
![image.png](https://dev-media.amazoncloud.cn/b07683a6c02442788a077843850ff37d_image.png "image.png")
![image.png](https://dev-media.amazoncloud.cn/66f9db687fce488c8b0b958f1b9d36db_image.png "image.png")
![image.png](https://dev-media.amazoncloud.cn/be86f59e72ed4cc7b7847a355e21827a_image.png "image.png")
### 示例代码
生成 SQL 的模型方面,我们可以根据客户需求,选择 Bedrock 提供的业界翘楚的商业化模型(如:Claude),也可以选择在亚马逊云科技机器学习基础设施平台 Amazon SageMaker 上部署 code generation 的开源模型(如:SQLCoder/CodeLlama)。
如果客户没有太多 LLM 技术栈的积累,及足够的算法工程师团队,那么 Amazon Bedrock 是一个较好的选择,在 Amazon Bedrock 上提供了广泛的商业化成熟的 Functional Model,其中 Anthropic 的 Claude 模型经测试在各项 sql 生成中稳定性和功能均表现突出。
Amazon Bedrock 通过 SDK 调用 Claude 模型进行 sql 生成及自然语言交互的核心代码示例如下:
```js
def get_bedrock_client(assumed_role: Optional[str] = None,
region: Optional[str] = None,
runtime: Optional[bool] = True,
):
if region is None:
target_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION"))
else:
target_region = region
print(f"Create new client\n Using region: {target_region}")
session_kwargs = {"region_name": target_region}
client_kwargs = {**session_kwargs}
profile_name = os.environ.get("AWS_PROFILE")
if profile_name:
print(f" Using profile: {profile_name}")
session_kwargs["profile_name"] = profile_name
retry_config = Config(
region_name=target_region,
retries={
"max_attempts": 10,
"mode": "standard",
},
)
session = boto3.Session(**session_kwargs)
if assumed_role:
print(f" Using role: {assumed_role}", end='')
sts = session.client("sts")
response = sts.assume_role(
RoleArn=str(assumed_role),
RoleSessionName="langchain-llm-1"
)
print(" ... successful!")
client_kwargs["aws_access_key_id"] = response["Credentials"]["AccessKeyId"]
client_kwargs["aws_secret_access_key"] = response["Credentials"]["SecretAccessKey"]
client_kwargs["aws_session_token"] = response["Credentials"]["SessionToken"]
if runtime:
service_name='bedrock-runtime'
else:
service_name='bedrock'
client_kwargs["aws_access_key_id"] = os.environ.get("AWS_ACCESS_KEY_ID","")
client_kwargs["aws_secret_access_key"] = os.environ.get("AWS_SECRET_ACCESS_KEY","")
bedrock_client = session.client(
service_name=service_name,
config=retry_config,
**client_kwargs
)
print("boto3 Bedrock client successfully created!")
print(bedrock_client._endpoint)
return bedrock_client
#role based initial client#######
os.environ["AWS_DEFAULT_REGION"] = "us-west-2" # E.g. "us-east-1"
os.environ["AWS_PROFILE"] = "default"
os.environ["BEDROCK_ASSUME_ROLE"] = "arn:aws:iam::687912291502:role/service-role/AmazonSageMaker-ExecutionRole-20211013T113123" # E.g. "arn:aws:..."
#新boto3 sdk session方式初始化bedrock
boto3_bedrock = get_bedrock_client(
assumed_role=os.environ.get("BEDROCK_ASSUME_ROLE", None),
region=os.environ.get("AWS_DEFAULT_REGION", None)
)
parameters_bedrock = {
"max_tokens_to_sample": 2048,
#"temperature": 0.5,
"temperature": 0,
#"top_k": 250,
#"top_p": 1,
"stop_sequences": ["\n\nHuman"],
}
####langchain bedrock集成示例
bedrock_llm = Bedrock(model_id="anthropic.claude-v2",
client=boto3_bedrock,
model_kwargs=parameters_
db = SQLDatabase.from_uri(
"mysql+pymysql://admin:admin12345678@database-us-west-2-demo.cluster-c1qvx9wzmmcz.us-west-2.rds.amazonaws.com/llm",
include_tables=['dws_truck_portrait_index_sum_da','dws_ots_waybill_info_da','ads_bi_quality_monitor_shipping_detail','dim_pub_truck_info'], # we include only one table to save tokens in the prompt :)
sample_rows_in_table_info=0)
db_chain = CustomerizedSQLDatabaseChain.from_llm(llm=bedrock_llm, db=db, verbose=False, return_sql=True)
db_chain.run("2022年的运输总量是多少吨?")
```
如上代码示例,其中
- get_bedrock_client:该函数为 amazon boto3 sdk 获取 bedrock 服务客户端的方法,可以支持 AKSK 或者 IAM Role 方式
- db =SQLDatabase.from_uri 及 CustomerizedSQLDatabaseChain:该部分为通过 bedrock 客户端调用 Claude 模型,并通过 langchain SqlDatabaseChain 进行数据库连接及最终 sql 生成
如果客户希望有更多的模型选择性,且自身算法工程师的研发团队有一定的技术实力,希望对模型做更多的定制,以便更加适应自身业务系统的个性化需求(如 fine tune 模型微调),从而更加嵌入到自身的端到端业务流程,那可以选择业界的开源 sql 生成模型,其中 SQLCoder 模型在基本 sql 生成中稳定性和功能较其他开源模型更加优质。
SQLCoder 模型是一个 SOTA 大型语言模型, 在开发者的开源评估框架 SQLEval 中,SQLCoder 的性能优于其他多款主要的开源模型。其中 SQLCoder 15b 参数的模型,在大量 SQL 复杂查询上进行微调后,在针对单个数据库模式进行推理生成时,它的性能和功能在某些场景甚至优于 OpenAI 的 GPT-3.5,因此比较适合对自身业务数据库定制化需求比较高的 SQL 生成场景。
在 Amazon SageMaker 平台上,可以通过 inference endpoint 终端节点,简单的几行代码即可部署 SQLCoder 的模型,一旦部署完成,即可通过 SageMaker 的 inference sdk 接口进行推理生成,简化工程化落地工作量,提高生产化上线的效率。
使用 Amazon SageMaker inference endpoint 部署 SQLcoder 15b 模型及推理调用的代码示例如下:
```js
inference_image_uri = (
f"763104351884.dkr.ecr.{region}.amazonaws.com/djl-inference:0.23.0-deepspeed0.9.5-cu118"
)
engine=Python
option.entryPoint=model.py
option.load_in_4bit=TRUE
option.tensor_parallel_degree=4
option.s3url=s3://sagemaker-us-west-2-687912291502/LLM-RAG/workshop/LLM_sqlcoder_model/
####SageMaker inference endpoint部署sdk
from sagemaker.utils import name_from_base
import boto3
model_name = name_from_base(f"sqlcoder") #Note: Need to specify model_name
print(f"Image going to be used is ---- > {inference_image_uri}")
create_model_response = sm_client.create_model(
ModelName=model_name,
ExecutionRoleArn=role,
PrimaryContainer={
"Image": inference_image_uri,
"ModelDataUrl": s3_code_artifact
},
)
model_arn = create_model_response["ModelArn"]
print(f"Created Model: {model_arn}")
endpoint_config_name = f"{model_name}-config"
endpoint_name = f"{model_name}-endpoint"
# Note: ml.g4dn.2xlarge 也可以选择
endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
"VariantName": "variant1",
"ModelName": model_name,
"InstanceType": "ml.g4dn.12xlarge",
"InitialInstanceCount": 1
},
],
)
create_endpoint_response = sm_client.create_endpoint(
EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name
)
print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}")
#### 模型推理测试
import json
import boto3
smr_client = boto3.client("sagemaker-runtime")
parameters = {
"max_new_tokens": 400,
"do_sample": False
}
question="我想知道top客户的住址"
sql_prompt= """
### Instructions:
Your task is to convert a question into a SQL query, given a Mysql database schema.
Adhere to these rules:
- **Deliberately go through the question and database schema word by word** to appropriately answer the question
- **Use Table Aliases** to prevent ambiguity. For example, `SELECT table1.col1, table2.col1 FROM table1 JOIN table2 ON table1.id = table2.id`.
- When creating a ratio, always cast the numerator as float
### Input:
Generate a SQL query that answers the question `{question}`.
This query will run on a database whose schema is represented in this string:
...省略
### Response:
Based on your instructions, here is the SQL query I have generated to answer the question `{question}`:
```sql
""".format(question=question)
response_model = smr_client.invoke_endpoint(
EndpointName=endpoint_name,
Body=json.dumps(
{
"inputs": sql_prompt,
"parameters": parameters
}
),
ContentType="application/json",
)
result_sql=response_model['Body'].read().decode('utf8')
print(result_sql.split("```sql")[-1].split("```")[0].split(";")[0].strip().replace("\\\\n"," ") + ";")
```
如上示例代码所示,其中:
- inference_image_uri:该部分为 SageMaker inference 推理镜像,SageMaker 提供了各种开箱即用的推理容器镜像,如 deepspeed,vllm,tensorRT,此处我们选择 deepspeed 的推理加速镜像
- engine=Python:该部分及其它配置项为 inference 推理容器的配置文件,可以指定推理的各种优化方式及模型加载位置等
- entryPoint=model.py:指定模型推理脚本
- load_in_4bit=TRUE:是否 4bit 量化加载
- tensor_parallel_degree=4:多卡并行推理
- s3url=s3://sagemaker-us-west-2-687912291502/LLM-RAG/workshop/LLM_sqlcoder_model/: 模型文件 S3 路径
- create_endpoint_config:配置部署终端节点,如机型,实例数量
- sql_prompt:生成 sql 提示词
- response_model = smr_client.invoke_endpoint:触发推理生成 sql
在元数据召回/意图识别精准调整 sql 生成时,向量化模型也是其中重要的一环,通过将自然语言文本(包括一个单词、短语甚至大型文档)转换为数字表示形式,然后使用这些向量从向量数据库中准确搜索相关段落,这样您就能充分利用自己的私域专业元数据(如数据库名,表名及明细字段等)与 sql 生成的基础模型(FM)组合,进行基于语义相似度的搜索,提高检索 sql 生成的库表元数据的准确性,及终端用户提问的语义识别的准确性。
关于向量化模型的选择,与 sql 生成模型一样,也可以选择商业化或者开源模型两种部署方式,其中商业化模型可以选择 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) Titan Embedding 模型,它针对文本检索进行了优化,超过 25 种语言,包括英语、中文和西班牙语。最多可以输入 8192 个 token,因此非常适合处理 sql 生成场景中较大的元数据检索的需求。并且 Bedrock Titan 的输出向量可以支持 1,536 个维度的长度,使其具有更高的准确性。可通过 [Amazon Bedrock](https://aws.amazon.com/cn/bedrock/?trk=cndc-detail) 的[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)体验获得,因此您可以使用单个 API 轻松访问它,无需管理任何基础设施。
同样,如果针对向量嵌入模型需要更多的定制,以在语义相似检索时获得更高的召回准确率,那么开源的bge模型是一个不错的选择。BGE 模型是国内智源发布的开源可商用的中英文语义向量模型 BGE(BAAI General Embedding),BGE 保持了同等参数量级模型中的最小向量维度,使用成本更低。在中文语义向量综合表征能力评测 C-MTEB 的实验结果显示(Table 1),BGE 中文模型(BGE-zh)在对接大语言模型最常用到的检索能力上领先优势尤为显著。
在 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 的 deploy 部署 bge large zh 版本模型及 embedding 向量化核心代码示例如下:
```js
serving.properties
engine=Python
option.tensor_parallel_degree=1
option.s3url = s3://sagemaker-us-east-1-106839800180/LLM-RAG/workshop/bge-zh-model/
#### SageMaker endpoint创建及部署
model_name = name_from_base("bge-zh-15")
print(model_name)
print(f"Image going to be used is ---- > {inference_image_uri}")
create_model_response = sm_client.create_model(
ModelName=model_name,
ExecutionRoleArn=role,
PrimaryContainer={
"Image": inference_image_uri,
"ModelDataUrl": s3_code_artifact
},
)
model_arn = create_model_response["ModelArn"]
print(f"Created Model: {model_arn}")
endpoint_config_name = f"{model_name}-config"
endpoint_name = f"{model_name}-endpoint"
endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
"VariantName": "variant1",
"ModelName": model_name,
"InstanceType": "ml.g4dn.xlarge",
"InitialInstanceCount": 1,
"ContainerStartupHealthCheckTimeoutInSeconds": 15*60,
},
],
)
create_endpoint_response = sm_client.create_endpoint(
EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name
)
print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}")
def load_model(properties):
tensor_parallel = properties["tensor_parallel_degree"]
model_location = properties['model_dir']
if "model_id" in properties:
model_location = properties['model_id']
logging.info(f"Loading model in {model_location}")
model = FlagModel(model_location)
return model
model = None
def handle(inputs: Input):
global model
if not model:
model = load_model(inputs.get_properties())
if inputs.is_empty():
return None
data = inputs.get_as_json()
input_sentences = None
inputs = data["inputs"]
if isinstance(inputs, list):
input_sentences = inputs
else:
input_sentences = [inputs]
is_query = data["is_query"]
instruction = data["instruction"]
logging.info(f"inputs: {input_sentences}")
logging.info(f"is_query: {is_query}")
logging.info(f"instruction: {instruction}")
if is_query and instruction:
input_sentences = [ instruction + sent for sent in input_sentences ]
sentence_embeddings = model.encode(input_sentences)
result = {"sentence_embeddings": sentence_embeddings}
return Output().add_as_json(result)
def get_vector_by_sm_endpoint(questions, sm_client, endpoint_name):
parameters = {
}
response_model = sm_client.invoke_endpoint(
EndpointName=endpoint_name,
Body=json.dumps(
{
"inputs": questions,
"is_query": True,
"instruction" : "Represent this sentence for searching relevant passages:"
}
),
ContentType="application/json",
)
json_str = response_model['Body'].read().decode('utf8')
json_obj = json.loads(json_str)
embeddings = json_obj['sentence_embeddings']
return embeddings
```
如上示例代码所示,其中:
- load_model:模型加载的方法,BGE 需要使用 FlagModel 类进行加载
- handle:使用 bge 模型对输入文本进行向量化的方法,其中 instruction 指令为“为以下中文文本生成向量嵌入”
- properties:与 SQLCoder 模型部署一样,BGE 向量模型在 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 上的部署配置文件
- get_vector_by_sm_endpoint:调用上面的 SageMaker 的 endpoint 进行文本向量化
### 总结
不同行业对数据的需求千变万化,将自然语言问题自动转换为 SQL 查询一直是自然语言处理领域的难点。有了 LLM 的进步使这变得可能。我们希望看到在更多企业场景中应用该解决方案,赋能业务人员与数据自由对话。
![开发者尾巴.gif](https://dev-media.amazoncloud.cn/0afa2e9c23ff46318dd04e8f6f10ed53_%E5%BC%80%E5%8F%91%E8%80%85%E5%B0%BE%E5%B7%B4.gif "开发者尾巴.gif")