# 背景
随着互联网的爆炸式增长,在不同领域内都实现了上云,如电商、票务服务、社交等,据第三方厂商统计 2023 年爬虫流量请求占比达到 57 %,已经超过人工的访问流量。
爬虫是一种自动化程序,用于在互联网上按照预定规则抓取网页数据。为了不当利益,黑灰产会利用爬虫这样自动化的方式来最大化获取利益,进行欺诈行为影响用户体验甚至破坏平台生态。
# 需求
对于中小型公司而言,解决爬虫等恶意流量的最佳实践是接入一家 Bot management 服务商,专业的人做专业的事,公司聚焦在业务发展上。
对于大型公司而言,接入三方服务,可能会有以下缺点
- 成本高昂不减,云服务的边际成本不会随着业务发展而递减
- 不能满足定制化的需求,比如识别竞争对手的爬虫攻击、甚至运营人员的手动访问
- 攻击绕过防护时依赖三方,较被动,来回沟通成本高
基于以上,当有强烈需求时可以考虑推动自研的反爬系统。
# 相关服务介绍
在搭建反爬系统的解决方案中,我们利用了以下服务
- [Amazon CloudFront](https://aws.amazon.com/cn/cloudfront/?trk=cndc-detail) 是一项加快将静态和动态 Web 内容(例如 .html、.css、.js 和图像文件)分发给用户的速度的 Web 服务。
- [Elastic Load Balancing](https://aws.amazon.com/cn/elasticloadbalancing/?trk=cndc-detail) (ELB) 是一种自动调整应用程序负载到多个服务器上的负载均衡服务。它位于应用程序客户端和后台服务器之间,可以均衡来自客户端的流量,以确保所有服务器都处于适当的工作负载状态,以提升系统的稳定性和安全性。
- Amazon WAF 是一种网络安全服务,旨在保护Web应用程序免受常见的Web攻击。
- [Amazon API Gateway](https://aws.amazon.com/cn/api-gateway/?trk=cndc-detail) 是一项亚马逊云科技服务,用于创建、发布、维护、监控和保护任意规模的 REST、HTTP 和 WebSocket API,API Gateway 充当应用程序从后端服务访问数据、业务逻辑或功能的“前门”。
- Apache Kafka 是一个高吞吐量的分布式发布-订阅消息系统,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
- Apache Flink 是为分布式、高性能的流处理应用程序打造的开源流处理框架。Flink 不仅能提供同时支持高吞吐和 exactly-once 语义的实时计算,还能提供批量数据处理。
- Redis 是一个基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库,数百万开发人员将其用作缓存、矢量数据库、文档数据库、流引擎和消息代理。
- [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 是一项完全托管的服务,它汇集了大量工具,可为任何使用案例提供高性能、低成本的[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)(ML)。
- Decision engine 决策引擎是一种软件系统或工具,能够自动化地执行业务规则、策略或算法,帮助企业在不同情况下做出一致、可重复的决策。它主要用于处理复杂的业务逻辑,通过输入一组预定义的规则或模型,基于数据自动生成决策,减少人工干预,提高效率和准确性。
- [Amazon QuickSight](https://aws.amazon.com/cn/quicksight/?trk=cndc-detail) 是一种云原生的[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)商业智能(BI)服务。
- Google reCAPTCHA google 提供的一种用于识别人机的验证码服务。
# **整体架构概览**
![image.png](https://dev-media.amazoncloud.cn/4df5bca35eeb49288615f939f7f45fe9_image.png "image.png")
1. 客户端发起请求,通过 Javascript 或 SDK 采集 HTTP 请求 、设备信息、鼠标轨迹等日志,并将其上传至决策引擎服务,利用 Kafka 对日志进行分类处理,将不同类型的日志发送到对应的 Topic,供 Flink 进行后续处理和分析。
2. Flink 从各个 Kafka Topic 消费日志数据,计算出不同的特征信息,并将这些特征存储到 Redis 中,供 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 调用进行模型推理。计算结果将被回传至决策引擎,进一步用于决策制定。
3. 决策引擎将最终的决策结果通过加密的 token 输出,并存储在客户端的 cookie 中。
4. 客户端携带该加密 token 发起请求,流经 [Amazon API Gateway](https://aws.amazon.com/cn/api-gateway/?trk=cndc-detail),API Gateway 解密 token 并提取其中包含的决策结果。
5. 如果 token 不合法或过期,[Amazon API Gateway](https://aws.amazon.com/cn/api-gateway/?trk=cndc-detail) 会重新请求决策引擎以获取该请求的最新决策结果。
6. 当请求被识别为爬虫时,[Amazon API Gateway](https://aws.amazon.com/cn/api-gateway/?trk=cndc-detail) 将返回相应的处罚措施,例如 403 禁止访问或要求输入验证码。如果是验证码验证流程,客户端将请求相应的服务完成验证,并携带验证码 token 再次发起请求,API Gateway 会对该 token 进行解析和验证。
7. 若风险评估结果为安全,[Amazon API Gateway](https://aws.amazon.com/cn/api-gateway/?trk=cndc-detail) 将允许请求继续流转至后端服务,确保正常访问。
# 关键实现
http请求日志、滑动轨迹日志、设备日志数据中以滑动轨迹数据为例,讲述整个流程
## 1. 客户端加载采集轨迹脚本
demo 如下
```jsx
// 初始化变量来存储滑动数据
let touchStart = { x: 0, y: 0, t: 0};
let touchEnd = { x: 0, y: 0, t: 0};
let touchData = []; // 存储整个轨迹序列
let requestId; // 存储请求ID
// 从服务端获取请求ID
const requestId = req.headers['X-Request-ID'];
// 引入加密和压缩库
const crypto = require('crypto'); // Node.js环境
const zlib = require('zlib');
// 假设你有一个加密函数
function encryptAndCompress(data, key) {
// 先将数据转换为JSON字符串
const jsonString = JSON.stringify(data);
// 使用压缩算法压缩数据
return zlib.gzipSync(jsonString).toString('base64').replace(/\\+/g, '-').replace(/\\//g, '_').replace(/\\=/g, '');
}
// 发送滑动数据到服务器的函数
function sendDataToServer(data, requestId) {
// 对数据进行加密和压缩
const encryptedCompressedData = encryptAndCompress(data, 'your-encryption-key');
// 准备请求头部
const headers = {
'Content-Type': 'application/json',
'Anti-In': encryptedCompressedData,
'X-Request-ID': requestId
};
// 发送数据
fetch('/api/collect-trajectory', {
method: 'POST',
headers: headers,
body: JSON.stringify({}) // 因为数据已经放在头部,这里可以传递一个空对象或者根据后端需求调整
});
}
// 监听触摸移动事件
document.addEventListener('touchmove', function(e) {
e.preventDefault();
// 添加当前轨迹点到数组
touchData.push({
x: e.changedTouches[0].screenX,
y: e.changedTouches[0].screenY,
t: new Date().getTime()
});
}, false);
// 监听触摸结束事件
document.addEventListener('touchend', function(e) {
const data = {
start: touchStart,
end: touchEnd,
trajectory: touchData
};
// 发送收集的数据到服务器
sendDataToServer(data);
// 清空数据,准备下一次滑动
touchData = [];
}, false);
```
在这个示例中,我们使用了`touchstart`、`touchmove`和`touchend`事件来追踪用户的滑动行为。每次滑动都会产生一系列的坐标数据,这些数据包括触摸的起点、终点以及滑动过程中的所有坐标点,每个坐标点还包括时间戳。当用户停止滑动时,我们通过`fetch` API将这些数据发送到服务器。
滑动轨迹参数通常经过加密及压缩处理,随后被嵌入至HTTP请求的头部(Header)字段中。
示例
![](https://file.notion.so/f/f/c8085a67-71bd-4700-83f4-0d6d1fac6f9d/95a90dfd-bbbd-4939-9d6d-776fae23dda8/image.png?table=block\\&id=1101f8c5-75ed-807f-b657-f68e524bf823\\&spaceId=c8085a67-71bd-4700-83f4-0d6d1fac6f9d\\&expirationTimestamp=1727776800000\\&signature=184CIh_FfyUmEjAH4Pxn8opNCgD4VvYqX9L-EBizo44\\&downloadName=image.png)
![image.png](https://dev-media.amazoncloud.cn/d5a462625ccd4153978186311d6735b5_image.png "image.png")
## 2. 服务端接受请求并转发至 Kafka
demo 如下
```jsx
const express = require('express');
const kafka = require('kafka-node');
const app = express();
// Kafka客户端配置
const client = new kafka.KafkaClient({ /* Kafka连接配置 */ });
const producer = new kafka.Producer(client);
// 主题名称
const kafkaTopic = 'your-kafka-topic';
// 确保生产者准备好
producer.on('ready', () => {
console.log('Kafka producer is ready');
});
// 错误处理
producer.on('error', (err) => {
console.error('Error from Kafka producer:', err);
});
// 发送数据到Kafka的函数
function sendDataToKafka(data) {
const payload = [
{
topic: kafkaTopic,
messages: JSON.stringify(data)
}
];
producer.send(payload, (err, data) => {
if (err) console.error('Error sending message to Kafka:', err);
});
}
// HTTP接口,接收数据并将其发送到Kafka
app.post('/send-to-kafka', (req, res) => {
const data = req.headers['Anti-In']; // 假设请求体包含要发送的数据
sendDataToKafka(data);
res.status(200).end(); // 响应200状态码,无需返回内容
});
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server is running on port \${PORT}`);
});
```
## 3. Flink 消费 Kafka 进行特征计算工作
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import MapFunction
class FeatureCalculator(MapFunction):
def map(self, value):
# 在这里实现特征计算的逻辑
# 假设value是一个JSON字符串,解析并提取特征
data = json.loads(value)
# 计算特征...
features = calculate_features(data)
# 返回计算结果
return features
def calculate_features(data):
# 特征计算逻辑
# ...
return features
# Flink环境配置
env = StreamExecutionEnvironment.get_execution_environment()
# Kafka消费者配置
kafka_consumer_properties = {
'bootstrap.servers': 'kafka-broker:9092',
'group.id': 'flink-group'
}
kafka_consumer = FlinkKafkaConsumer('your-kafka-topic', SimpleStringSchema(), kafka_consumer_properties)
# 创建数据流
data_stream = env.add_source(kafka_consumer)
# 应用特征计算函数
feature_stream = data_stream.map(FeatureCalculator(), Types.ROW([Types.FLOAT()]))
# Kafka生产者配置
kafka_producer_properties = {
'bootstrap.servers': 'kafka-broker:9092'
}
kafka_producer = FlinkKafkaProducer('features_topic', SimpleStringSchema(), kafka_producer_properties)
# 将特征结果发送到新的Kafka topic
feature_stream.add_sink(kafka_producer)
# 执行Flink作业
env.execute('Feature Calculation and Sink Job')
```
轨迹数据与部分特征如下
```python
# 轨迹数据
#[x, y, 距 js 脚本初始化的时间差 t]
[[35,37,0],[11,0,91],[10,0,12],[9,0,14],[9,0,12],[9,0,14],[8,0,12],[8,0,16],[8,0,11],[7,0,16],[7,0,11],[6,0,10],[6,0,13],[6,0,18],[6,0,15],[5,0,17],[5,0,19],[5,0,12],[4,0,17],[4,0,18],[5,0,12],[3,0,16],[4,0,12],[4,0,13],[3,0,20],[3,0,12],[3,0,12],[3,0,12],[3,0,13],[2,0,20],[3,0,14],[2,0,11],[3,0,16],[2,0,15],[2,0,12],[2,0,17],[1,0,13],[2,0,13],[2,0,11],[1,0,17],[2,0,10],[1,0,16],[2,0,18],[1,0,11],[1,0,15],[1,0,12],[1,0,18],[2,0,11],[1,0,19],[1,0,35],[1,0,17],[1,0,16],[1,0,15],[1,0,11],[1,0,35],[1,0,10],[1,0,23],[1,0,27],[1,0,31],[1,0,36],[1,0,31],[1,0,45],[1,0,55],[1,0,68],[1,0,77],[1,0,94],[1,0,165],[1,0,315]]
```
| id | feature | comment |
| --- | --- | --- |
| 1 | t_sum_head_step_10 | 序列前10段滑动总用时 |
| 2 | t_mean_head_step_30 | 序列前30段滑动平均用时 |
| 3 | t_mean_head_step_10 | 序列前10段滑动平均用时 |
| 4 | t_median | 整体滑动每段用时中位数 |
| 5 | t_median_head_step_30 | 序列前30段滑动用时中位数 |
| 6 | t_median_tail_step_30 | 序列后30段滑动用时中位数 |
| 7 | t_mean | 整体滑动平均用时 |
| 8 | t_median_head_step_10 | 序列前30段滑动用时中位数 |
| 9 | t_mean_tail_step_30 | 序列后30段滑动平均用时 |
| 10 | x_a_min | x位移加速度最小值 |
## 4. Amazon Sagemaker 模型训练与实时推理
常见作弊模式
- 两次点击之间滑动点数过少
![image.png](https://dev-media.amazoncloud.cn/702d4f5192d147ccb7200a0cd066167f_image.png "image.png")
- 两次点击之间加入不符合人类习惯的轨迹
![image.png](https://dev-media.amazoncloud.cn/ebc79ab8a30f458ea8e2b4efc9199b00_image.png "image.png")
- 利用正常轨迹加随机噪音来生成新的轨迹
![image.png](https://dev-media.amazoncloud.cn/318040cda4444a369d858e99931a4d42_image.png "image.png")
### 对于已发现的攻击轨迹,可以使用有监督模型对历史轨迹数据建模
模型选择:TransformerEncoder,选择原因
- 能够学到轨迹序列的特征,无需做其他特征工程,同样适用的还有lstm,gru网络,但是不如transformer的视角能够看到全局的信息
- 计算速度快,rt耗时短,实验过gru,在300长度的序列上rt 100ms以上
模型结构:
![](https://file.notion.so/f/f/c8085a67-71bd-4700-83f4-0d6d1fac6f9d/e5e61557-f69c-4c1c-84bc-1cea6532331e/image.png?table=block\\&id=1111f8c5-75ed-808c-8ad7-ddfb8dd713a9\\&spaceId=c8085a67-71bd-4700-83f4-0d6d1fac6f9d\\&expirationTimestamp=1727776800000\\&signature=SC6ZnttUMHiBIKZyaSxbPvPRnE64LjKI42OJ-HM6cBE\\&downloadName=image.png)
![image.png](https://dev-media.amazoncloud.cn/e4bf28fa1bd7491fbe03451ef6534144_image.png "image.png")
轨迹序列长度取300,特征维度为7,进入模型后先embedding成300x32维特征,后接2层TranformerEncoder层进行特征提取,最后进入分类器,输出模型分。
通过设置合适的阈值来区分异常的轨迹数据。
### 对于未知的攻击轨迹,可以使用无监督模型对正常人的轨迹数据建模
模型选择:AutoEncoder
模型结构:
![](https://file.notion.so/f/f/c8085a67-71bd-4700-83f4-0d6d1fac6f9d/0697a112-a71d-4107-a2f5-d0118f7aae3e/image.png?table=block\\&id=1111f8c5-75ed-805d-97a7-f81e7d2c3e42\\&spaceId=c8085a67-71bd-4700-83f4-0d6d1fac6f9d\\&expirationTimestamp=1727776800000\\&signature=F2zZxFY3D\\_1Da1KTyKo58nSnxpP3pWfd-M2P0ILVA60\\&downloadName=image.png)
![image.png](https://dev-media.amazoncloud.cn/7b26946993dc49d0b2a85d8667f80e51_image.png "image.png")
```python
# 模型训练
model.compile(loss='mse',optimizer='adam')
```
模型 decode 后的矩阵结果和原输入特征矩阵的重构误差,值越大代表和正常数据偏差越大越异常,通过设置合适的阈值来区分异常的轨迹数据。
# 总结与展望
- 本文档详细介绍了如何利用亚马逊云科技服务搭建一个高效的反爬虫系统。通过整合Kafka、Redis、Flink、[Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail)、Decision Engine、[Amazon QuickSight](https://aws.amazon.com/cn/quicksight/?trk=cndc-detail) 等组件,我们构建了一个强大的数据处理和分析流水线。系统不仅能够实时监控和识别恶意爬虫行为,还能通过[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)模型对已知和未知威胁进行预测和防御。
- 除了基于 HTTP、设备信息、鼠标轨迹等日志,对于更高阶的爬虫,需要在参数加密、代码混淆、APP 加固等安全工程的角度投入成本,防止黑产反编译逆向分析、防止请求被篡改或重放,这样才能做到纵深防御,提高黑产攻击成本,保护业务安全。
- 反爬是一场没有终点的比赛,只要有利益在,就会一直对抗,未来可以引入更先进的[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)算法,提高爬虫识别的准确率。
# 关于作者
**Jason Ding**(Siwei Ding)现任某大型在线快时尚电商公司资深安全工程师,擅长反爬、风控等业务场景的架构设计开发,在人工智能领域有一定的实践经验。曾对接过多家知名反爬服务提供商的经验,如 Google reCAPTCHA、Cloudflare、PerimeterX等,拥有丰富的黑产对抗实战经验。