### **1. Introduction**
Recent progress in many fields has increased the demand for classification, regression, and detection on small sets of two-dimensional images/projections. At the core of these modern techniques is typically a neural network trained with deep-learning algorithms. In our network architecture, we embed a dynamically programmable quantum circuit as a hidden layer and learn its parameters so that the model correctly classifies handwritten digits from the MNIST database.
### **2. Environment Setup**
1. Create an access key in the Amazon IAM console and make sure its status is Active.
2. In the [Amazon Braket](https://aws.amazon.com/cn/braket/?trk=cndc-detail) console, create a role with the AmazonBraketJobsExecutionPolicy policy attached, and enable access to third-party devices.
3. Install the AWS Command Line Interface (AWS CLI) and run the aws configure command to set up the credentials file.
```
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_KEY
region = us-east-1
```
4. Prepare a Python environment and install PyTorch, Torchvision, QuTrunk, the Amazon Braket SDK, and related dependencies (a quick sanity check is sketched below).
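As a quick check of the environment, the sketch below (assuming the PyPI packages torch, torchvision, qutrunk, and amazon-braket-sdk are installed) imports the key libraries and prints their versions:
```
import torch
import torchvision
import qutrunk  # QuTrunk installs under the package name "qutrunk"
from braket._sdk import __version__ as braket_sdk_version

print("torch:", torch.__version__)
print("torchvision:", torchvision.__version__)
print("amazon-braket-sdk:", braket_sdk_version)
```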
### **3. The MNIST Dataset**
- The MNIST dataset is a classic dataset in [machine learning](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail) and the standard training set for handwritten-digit image recognition. It was built from handwriting samples collected by the US National Institute of Standards and Technology (NIST) and contains a large number of handwritten-digit images used to train and test all kinds of classification algorithms and models.
- MNIST contains 60,000 training samples and 10,000 test samples. Each sample is a 28×28-pixel grayscale image of a handwritten digit from 0 to 9, stored as a 28×28 matrix of pixel values between 0 and 255. For training and testing, these pixel values are usually normalized to the range 0 to 1 (torchvision's ToTensor transform does exactly this).
- A common approach to handwritten-digit recognition on MNIST is to flatten each image into a one-dimensional vector (28×28 = 784) and then train and evaluate a classifier on those vectors. Because the labels of the MNIST samples are known, we can assess the accuracy and reliability of an algorithm or model by comparing its predictions against the labels.
- MNIST is used throughout [machine learning](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail), computer vision, and deep learning for handwritten-digit recognition, and it is an indispensable benchmark for improving the accuracy, precision, and efficiency of algorithms and models. Handwritten-digit experiments based on MNIST are also a common exercise in computer-vision and deep-learning courses.
```
import numpy as np
import torch.utils.data
from torchvision import datasets, transforms


def trim_dataset(dataset, start_num, end_num, n_samples):
    trimmed_dataset = np.where(dataset.targets == start_num)[0][:n_samples]
    for i in range(start_num + 1, end_num + 1):
        trimmed_dataset = np.append(trimmed_dataset, np.where(dataset.targets == i)[0][:n_samples])
    return trimmed_dataset


def get_data_loader(start, end, n_samples, dataset_dir, train=True):
    dataset = datasets.MNIST(
        root=dataset_dir,
        train=train,
        download=True,
        transform=transforms.Compose([transforms.ToTensor()])
    )
    idx = trim_dataset(dataset, start, end, n_samples)
    dataset.data = dataset.data[idx]
    dataset.targets = dataset.targets[idx]
    loader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=True, pin_memory=train)
    return loader
```
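A minimal usage sketch of the loader above (the digit range, per-digit sample count, and ./data path are illustrative values, not taken from the original project):
```
# Hypothetical values: keep digits 0-3, 40 samples per digit, cache MNIST under ./data
train_loader = get_data_loader(0, 3, 40, './data', train=True)
data, target = next(iter(train_loader))
print(data.shape, target)  # torch.Size([1, 1, 28, 28]) and a one-element label tensor
```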
### **4. Defining the Quantum Circuit**
```
import itertools
import json
from functools import cached_property
from typing import List, Optional

import numpy as np

from braket.aws import AwsDevice, AwsSession
from qutrunk.backends import Backend  # base backend type (import path assumed)
from qutrunk.backends.braket.braket import BackendBraket
from qutrunk.circuit import QCircuit
from qutrunk.circuit.gates import All, Barrier, H, Measure, Ry

# Global counter of how many times a circuit has been executed on the backend.
circuit_runtimes = 0


def get_backend():
    session = AwsSession()
    session.add_braket_user_agent("QuTrunkBraketProvider/0.0.1")
    device = AwsDevice('arn:aws:braket:us-east-1::device/qpu/ionq/Harmony', aws_session=session)
    return BackendBraket(
        device,
        name='ionq',
        description=f"AWS Device: {device.provider_name} {device.name}.",
        online_date=device.properties.service.updatedAt,
    )


class Circuit:
    def __init__(self, n_qubits: int, shots: int, backend: Optional[Backend] = None):
        self.n_qubits = n_qubits
        self.shots = shots
        self.backend = backend if backend is not None else get_backend()

    @cached_property
    def outputs(self) -> List[str]:
        # All possible bit strings for n_qubits, e.g. ['00', '01', '10', '11'] for 2 qubits.
        measurements = list(itertools.product([0, 1], repeat=self.n_qubits))
        return [''.join([str(bit) for bit in measurement]) for measurement in measurements]

    def create_circuit(self) -> QCircuit:
        # H on every qubit, one parameterized Ry per qubit, then measure all qubits.
        circuit = QCircuit(self.backend)
        thetas = circuit.create_parameters([f'Theta{k}' for k in range(self.n_qubits)])
        qr = circuit.allocate(self.n_qubits)
        All(H) * qr
        Barrier * qr  # noqa
        for i, theta in enumerate(thetas):
            Ry(theta) * qr[i]
        for i in range(self.n_qubits):
            Measure * qr[i]  # noqa
        return circuit

    def calc_expectation_z(self, counts) -> np.ndarray:
        # Turn raw measurement counts into a probability vector over all bit strings.
        expects = np.zeros(len(self.outputs))
        for index, label in enumerate(self.outputs):
            expects[index] = counts.get(label, 0) / self.shots
        return expects

    def run(self, i) -> np.ndarray:
        circuit = self.create_circuit()
        circuit.bind_parameters({
            f'Theta{k}': i[k].item()
            for k in range(self.n_qubits)
        })
        result = circuit.run(shots=self.shots)
        counts = result.get_counts()
        counts = json.loads(counts)
        counts = {k: v for c in counts for k, v in c.items()}
        expects = self.calc_expectation_z(counts)
        global circuit_runtimes
        circuit_runtimes += 1
        return expects
```
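To see how Circuit turns raw measurement counts into the vector the network consumes, here is a small offline sketch that exercises outputs and calc_expectation_z with made-up counts; no device is contacted, and the backend argument is just a placeholder:
```
circuit = Circuit(n_qubits=2, shots=100, backend=object())  # placeholder backend, never executed here
print(circuit.outputs)                           # ['00', '01', '10', '11']
fake_counts = {'00': 55, '11': 45}               # hypothetical measurement counts
print(circuit.calc_expectation_z(fake_counts))   # [0.55 0.   0.   0.45]
```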
### **5. Defining the QNN**
Define the QuantumLayer:
```
import numpy as np
import torch
from torch.autograd import Function

# `settings` refers to the project's configuration module (NUM_QUBITS, NUM_SHOTS, SHIFT, ...).


class QuantumLayer(Function):
    SHIFT: float = None
    NUM_QUBITS: int = None
    circuit_cls = Circuit

    @staticmethod
    def forward(ctx, inputs):  # noqa (complex)
        if not hasattr(ctx, 'circuit'):
            ctx.circuit = Circuit(settings.NUM_QUBITS, shots=settings.NUM_SHOTS)
        exp_value = ctx.circuit.run(inputs)
        result = torch.tensor(np.array([exp_value]))
        ctx.save_for_backward(result, inputs)
        return result

    @staticmethod
    def backward(ctx, grad_output):  # noqa (complex)
        forward_tensor, input_numbers = ctx.saved_tensors
        gradients = torch.Tensor()
        # Shift each input angle up and down and difference the circuit outputs.
        for k in range(settings.NUM_QUBITS):
            shift_right = input_numbers.detach().clone()
            shift_right[k] = shift_right[k] + settings.SHIFT
            shift_left = input_numbers.detach().clone()
            shift_left[k] = shift_left[k] - settings.SHIFT
            expectation_right = ctx.circuit.run(shift_right)
            expectation_left = ctx.circuit.run(shift_left)
            gradient = torch.tensor(np.array([expectation_right])) - torch.tensor(np.array([expectation_left]))
            gradients = torch.cat((gradients, gradient.float()))
        result = torch.Tensor(gradients)
        return (result.float() * grad_output.float()).T
```
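The backward pass above is a parameter-shift-style gradient: each input angle is shifted up and down by settings.SHIFT and the circuit outputs are differenced. As a sanity check of the idea (using an ordinary sine function rather than the quantum circuit), the shift rule with an offset of π/2 recovers the exact derivative of a sinusoidal expectation value:
```
import numpy as np

theta = 0.7
shift = np.pi / 2
f = np.sin                                    # stand-in for a sinusoidal expectation value
grad_shift = (f(theta + shift) - f(theta - shift)) / 2
print(grad_shift, np.cos(theta))              # both ~0.7648: the shift rule is exact here
```
The layer above omits the constant 1/2 factor, which only rescales the gradients flowing back into the classical layers.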
Define the network and add the quantum circuit layer:
```
import torch.nn as nn
import torch.nn.functional as F


class QNN(nn.Module):
    def __init__(self):
        super(QNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, settings.NUM_QUBITS)
        self.qc = QuantumLayer.apply
        self.qcsim = nn.Linear(settings.NUM_QUBITS, 1)
        self.fc3 = nn.Linear(1, 4)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        x = np.pi * torch.tanh(x)  # squash the qubit angles into (-pi, pi)
        MODE = 'QC'  # 'QC' or 'QC_sim'
        if MODE == 'QC':
            x = self.qc(x[0])  # QUANTUM LAYER
        else:
            x = self.qcsim(x)
        return x

    def predict(self, x):
        # apply softmax
        pred = self.forward(x)
        ans = torch.argmax(pred[0]).item()
        return torch.tensor(ans)

    def load_checkpoint(self):
        checkpoint = torch.load(settings.CHECKPOINT_FILE)
        self.load_state_dict(checkpoint)
        self.eval()
        return self
```
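A quick shape check of the classical front end alone (conv1, conv2, and the two pooling steps) shows where the flattened size 320 used by fc1 comes from: 20 channels × 4 × 4 = 320. This sketch uses a random MNIST-sized input and does not touch the quantum layer:
```
import torch
import torch.nn as nn
import torch.nn.functional as F

conv1 = nn.Conv2d(1, 10, kernel_size=5)
conv2 = nn.Conv2d(10, 20, kernel_size=5)
x = torch.randn(1, 1, 28, 28)            # one MNIST-sized image
x = F.max_pool2d(conv1(x), 2)            # -> (1, 10, 12, 12)
x = F.max_pool2d(conv2(x), 2)            # -> (1, 20, 4, 4)
print(x.shape, x.view(-1, 320).shape)    # torch.Size([1, 20, 4, 4]) torch.Size([1, 320])
```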
### **6. Model Training**
```
import logging

import torch.optim as optim


def train(epochs, samples, test):
    logging.info('Training model with %d epochs and %d samples...', epochs, samples)
    network = QNN()
    network.to(settings.DEVICE)
    optimizer = optim.Adam(network.parameters(), lr=settings.LEARNING_RATE)
    loss_list = []
    loss_func = nn.CrossEntropyLoss()
    train_loader = get_data_loader(
        settings.START_DIGIT,
        settings.END_DIGIT,
        samples,
        settings.DATASETS_PATH,
        train=True
    )
    for epoch in range(epochs):
        total_loss = []
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad()
            output = network(data.to(settings.DEVICE))
            loss = loss_func(output, target.to(settings.DEVICE))
            loss.backward()
            optimizer.step()
            total_loss.append(loss.item())
        loss_list.append(sum(total_loss) / len(total_loss))
        logging.info('Training [%d%%]\tLoss: %.4f', 100 * (epoch + 1) / epochs, loss_list[-1])
```
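The snippets above refer to a project-level settings module that is not shown in this post. The sketch below is purely hypothetical; every value is illustrative and should be adjusted for your own experiment. It only makes explicit the fields the code expects:
```
# settings.py -- hypothetical sketch; all values are illustrative only
import math
import torch

NUM_QUBITS = 3                        # number of qubits / Ry angles produced by fc2
NUM_SHOTS = 100                       # shots per circuit execution
SHIFT = math.pi / 2                   # offset used in QuantumLayer.backward
START_DIGIT, END_DIGIT = 0, 3         # which MNIST digits to keep
LEARNING_RATE = 0.001
DATASETS_PATH = './data'
CHECKPOINT_FILE = './checkpoints/qnn.pt'
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
```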
The quantum tasks can be viewed in the [Amazon Braket](https://aws.amazon.com/cn/braket/?trk=cndc-detail) console:
![image.png](https://dev-media.amazoncloud.cn/915312d4e87343b6a03051f5c0b10514_image.png "image.png")
```
Training model with 10 epochs and 150 samples...
Training [10%] Loss: 2.0896
Training [20%] Loss: 2.0831
Training [30%] Loss: 2.0789
Training [40%] Loss: 2.0710
Training [50%] Loss: 2.0625
Training [60%] Loss: 2.0551
Training [70%] Loss: 2.0495
Training [80%] Loss: 2.0480
Training [90%] Loss: 2.0476
Training [100%] Loss: 2.0470
```
3-qubit loss curve (Ry layer, digits 0–6, 15 epochs, older architecture):
![image.png](https://dev-media.amazoncloud.cn/0466f4d1ce804733bbebc1988725e3bb_image.png "image.png")
### **7. GitLab CI/CD Deployment and Testing**
#### **7.1 Creating a Runner**
```
# Download the binary for your system
sudo curl -L --output /usr/local/bin/gitlab-runner https://gitlab-runner-downloads.s3.amazonaws.com/latest/binaries/gitlab-runner-linux-amd64
# Give it permission to execute
sudo chmod +x /usr/local/bin/gitlab-runner
# Create a GitLab Runner user
sudo useradd --comment 'GitLab Runner' --create-home gitlab-runner --shell /bin/bash
# Add gitlab-runner to docker group
sudo usermod -a -G docker gitlab-runner
# Install and run as a service
sudo gitlab-runner install --user=gitlab-runner --working-directory=/home/gitlab-runner
sudo gitlab-runner start
# Register runner
sudo gitlab-runner register --url $GITLAB_URL --registration-token $REGISTRATION_TOKEN
```
The GitLab CI/CD settings show that the Runner has been registered successfully:
![image.png](https://dev-media.amazoncloud.cn/62e7af52e24748a794132586240605a2_image.png "image.png")
#### **7.2 Writing the CI/CD Configuration**
.gitlab-ci.yml
```
stages:
  - deploy

docker-deploy:
  stage: deploy
  script:
    - docker compose up -d --build
  only:
    - develop
```
#### **7.3 Viewing the Pipeline**
![image.png](https://dev-media.amazoncloud.cn/7a27e5023d1248a397bddc3758e9fdec_image.png "image.png")
The pipeline has run successfully.
#### **7.4 Testing the Application**
![image.png](https://dev-media.amazoncloud.cn/c325c2f9f3054fe9ab336df5b33a3dba_image.png "image.png")
![image.png](https://dev-media.amazoncloud.cn/e6d222db2b7447aba34b0775f27e9c95_image.png "image.png")