基于 QuTrunk 和 PyTorch、Gitlab CICD 构建和部署混合量子神经网络进行手写数字识别应用

PyTorch
Amazon Braket
0
0
### **一、简介** 最近许多领域的进展加快了对少数二维图像/投影的分类、回归和检测问题的需求。通常,这些现代技术的核心是利用神经网络,神经网络可以通过深度学习算法实现。在我们的神经网络架构中,我们嵌入了一个动态可编程的量子电路,作为隐藏层,以学习正确的参数,从 MNIST 数据库中正确分类手写数字。 ### **二、环境准备** 1. Amazon IAM Console 中创建访问密钥 Access Key,并保持状态为 Active 活动状态 2. Amazon Braket Console 创建 AmazonBraketJobsExecutionPolicy 策略的角色,并启用第三方设备 3. 安装亚马逊云科技命令行界面(Amazon CLI),并使用 amazon configure 命令来配置凭证文件。 ``` [default] aws_access_key_id = YOUR_ACCESS_KEY_ID aws_secret_access_key = YOUR_SECRET_KEY region = us-east-1 ``` 4. 准备 Python 环境并安装 PyTorch、Torchvision、QuTrunk、AmazonBraketSDK 等 ### **三、MNIST 数据集** - MNIST 数据集是机器学习领域中,一个非常经典的数据集,也是手写数字图像识别任务的经典训练集。MNIST 数据集是由美国国家标准和技术研究院(NIST)创建,包含了大量的手写数字图像,用于训练和测试各种分类算法和模型。 - MNIST 数据集包含了 60000 个训练样本和 10000 个测试样本。每个样本都是一张 28_28 像素的灰度图像,表示了一个 0-9 的手写数字。这些图像已经被处理成 28_28 的像素值矩阵,每个像素点的值介于 0 到 255 之间。为方便训练与测试,这些像素值已经被归一化到 0 到 1 之间。 - 使用 MNIST 数据集进行手写数字识别任务的训练,通常的方法是将图像像素展平成一维向量(28*28=784),然后使用各种分类算法和模型进行训练和预测。由于 MNIST 数据集样本的标签是已知的,因此我们可以通过比较算法或模型预测结果和样本标签的差异程度,来评估算法或模型的准确性和可靠性。 - MNIST 数据集的应用范围广泛,它被广泛地应用于机器学习、计算机视觉、深度学习等领域的手写数字图像识别任务。对于提高算法或模型的准确性、精度和效率方面,它是一个不可或缺的数据集。同时,基于 MNIST 数据集的手写数字识别实验,也被广泛用于学生的计算机视觉和深度学习课程的教学。 ``` import numpy as np import torch.utils.data from torchvision import datasets, transforms def trim_dataset(dataset, start_num, end_num, n_samples): trimmed_dataset = np.where(dataset.targets == start_num)[0][:n_samples] for i in range(start_num + 1, end_num + 1): trimmed_dataset = np.append(trimmed_dataset, np.where(dataset.targets == i)[0][:n_samples]) return trimmed_dataset def get_data_loader(start, end, n_samples, dataset_dir, train=True): dataset = datasets.MNIST( root=dataset_dir, train=train, download=True, transform=transforms.Compose([transforms.ToTensor()]) ) idx = trim_dataset(dataset, start, end, n_samples) dataset.data = dataset.data[idx] dataset.targets = dataset.targets[idx] loader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=True, pin_memory=train) return loader ``` ### **四、定义量子线路** ``` from qutrunk.backends.braket.braket import BackendBraket from braket.aws import AwsDevice def get_backend(): session.add_braket_user_agent("QuTrunkBraketProvider/0.0.1") device = AwsDevice('arn:aws:braket:us-east-1::device/qpu/ionq/Harmony',) return BackendBraket( device, name='ionq', description=f"AWS Device: {device.provider_name} {device.name}.", online_date=device.properties.service.updatedAt, ) class Circuit: def __init__(self, n_qubits: int, shots: int, backend: Optional[Backend] = None): self.n_qubits = n_qubits self.shots = shots self.backend = backend if backend is not None else get_backend() @cached_property def outputs(self) -> List[str]: measurements = list(itertools.product([0, 1], repeat=self.n_qubits)) return [''.join([str(bit) for bit in measurement]) for measurement in measurements] def create_circuit(self) -> QCircuit: circuit = QCircuit(self.backend) thetas = circuit.create_parameters([f'Theta{k}' for k in range(self.n_qubits)]) qr = circuit.allocate(self.n_qubits) All(H) * qr Barrier * qr # noqa for i, theta in enumerate(thetas): Ry(theta) * qr[i] for i in range(self.n_qubits): Measure * qr[i] # noqa return circuit def calc_expectation_z(self, counts) -> np.ndarray: expects = np.zeros(len(self.outputs)) for index, label in enumerate(self.outputs): expects[index] = counts.get(label, 0) / self.shots return expects def run(self, i) -> np.ndarray: circuit = self.create_circuit() circuit.bind_parameters({ f'Theta{k}': i[k].item() for k in range(self.n_qubits) }) result = circuit.run(shots=self.shots) counts = result.get_counts() counts = json.loads(counts) counts = {k: v for c in counts for k, v in c.items()} expects = self.calc_expectation_z(counts) global circuit_runtimes circuit_runtimes += 1 return expects ``` ### **五、定义 QNN** 定义 QuantumLayer ``` class QuantumLayer(Function): SHIFT: float = None NUM_QUBITS: int = None circuit_cls = Circuit @staticmethod def forward(ctx, inputs): # noqa (complex) if not hasattr(ctx, 'circuit'): ctx.circuit = Circuit(settings.NUM_QUBITS, shots=settings.NUM_SHOTS) exp_value = ctx.circuit.run(inputs) result = torch.tensor(np.array([exp_value])) ctx.save_for_backward(result, inputs) return result @staticmethod def backward(ctx, grad_output): # noqa (complex) forward_tensor, input_numbers = ctx.saved_tensors gradients = torch.Tensor() for k in range(settings.NUM_QUBITS): shift_right = input_numbers.detach().clone() shift_right[k] = shift_right[k] + settings.SHIFT shift_left = input_numbers.detach().clone() shift_left[k] = shift_left[k] - settings.SHIFT expectation_right = ctx.circuit.run(shift_right) expectation_left = ctx.circuit.run(shift_left) gradient = torch.tensor(np.array([expectation_right])) - torch.tensor(np.array([expectation_left])) gradients = torch.cat((gradients, gradient.float())) result = torch.Tensor(gradients) return (result.float() * grad_output.float()).T ``` 定义网络,加入量子线路层 ``` class QNN(nn.Module): def __init__(self): super(QNN, self).__init__() self.conv1 = nn.Conv2d(1, 10, kernel_size=5) self.conv2 = nn.Conv2d(10, 20, kernel_size=5) self.conv2_drop = nn.Dropout2d() self.fc1 = nn.Linear(320, 50) self.fc2 = nn.Linear(50, settings.NUM_QUBITS) self.qc = QuantumLayer.apply self.qcsim = nn.Linear(settings.NUM_QUBITS, 1) self.fc3 = nn.Linear(1, 4) def forward(self, x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) x = x.view(-1, 320) x = F.relu(self.fc1(x)) x = F.dropout(x, training=self.training) x = self.fc2(x) x = np.pi * torch.tanh(x) MODE = 'QC' # 'QC' or 'QC_sim' if MODE == 'QC': x = self.qc(x[0]) # QUANTUM LAYER else: x = self.qcsim(x) return x def predict(self, x): # apply softmax pred = self.forward(x) ans = torch.argmax(pred[0]).item() return torch.tensor(ans) def load_checkpoint(self): checkpoint = torch.load(settings.CHECKPOINT_FILE) self.load_state_dict(checkpoint) self.eval() return self ``` ### **六、模型训练** ``` def train(epochs, samples, test): logging.info('Training model with %d epochs and %d samples...', epochs, samples) network = QNN() network.to(settings.DEVICE) optimizer = optim.Adam(network.parameters(), lr=settings.LEARNING_RATE) loss_list = [] loss_func = nn.CrossEntropyLoss() train_loader = get_data_loader( settings.START_DIGIT, settings.END_DIGIT, samples, settings.DATASETS_PATH, train=True ) for epoch in range(epochs): total_loss = [] for batch_idx, (data, target) in enumerate(train_loader): optimizer.zero_grad() output = network(data.to(settings.DEVICE)) loss = loss_func(output, target.to(settings.DEVICE)) loss.backward() optimizer.step() total_loss.append(loss.item()) loss_list.append(sum(total_loss) / len(total_loss)) logging.info('Training [%d%%]\tLoss: %.4f', 100 * (epoch + 1) / epochs, loss_list[-1]) ``` Amazon Braket 控制台查看量子任务 ![image.png](https://dev-media.amazoncloud.cn/915312d4e87343b6a03051f5c0b10514_image.png "image.png") ``` Training model with 10 epochs and 150 samples... Training [10%] Loss: 2.0896 Training [20%] Loss: 2.0831 Training [30%] Loss: 2.0789 Training [40%] Loss: 2.0710 Training [50%] Loss: 2.0625 Training [60%] Loss: 2.0551 Training [70%] Loss: 2.0495 Training [80%] Loss: 2.0480 Training [90%] Loss: 2.0476 Training [100%] Loss: 2.0470 ``` 3- qubit Loss Curve ryN - 0123456 - 15 epochs old archetecture.jpg ![image.png](https://dev-media.amazoncloud.cn/0466f4d1ce804733bbebc1988725e3bb_image.png "image.png") ### **七、Gitlab CICD 部署和测试** #### **7.1创建 Runner** ``` # Download the binary for your system sudo curl -L --output /usr/local/bin/gitlab-runner https://gitlab-runner-downloads.s3.amazonaws.com/latest/binaries/gitlab-runner-linux-amd64 # Give it permission to execute sudo chmod +x /usr/local/bin/gitlab-runner # Create a GitLab Runner user sudo useradd --comment 'GitLab Runner' --create-home gitlab-runner --shell /bin/bash # Add gitlab-runner to docker group sudo usermod -a -G docker gitlab-runner # Install and run as a service sudo gitlab-runner install --user=gitlab-runner --working-directory=/home/gitlab-runner sudo gitlab-runner start # Register runner sudo gitlab-runner register --url $GITLAB_URL --registration-token $REGISTRATION_TOKEN ``` Gitlab CICD 设置中查看 Runner 已注册成功: ![image.png](https://dev-media.amazoncloud.cn/62e7af52e24748a794132586240605a2_image.png "image.png") #### **7.2编写 CICD 配置** .gitlab-ci.yml ``` stages: - deploy docker-deploy: stage: deploy script: - docker compose up -d --build only: - develop ``` #### **7.3查看流水线** ![image.png](https://dev-media.amazoncloud.cn/7a27e5023d1248a397bddc3758e9fdec_image.png "image.png") 流水线已运行成功 #### **7.4测试应用** ![image.png](https://dev-media.amazoncloud.cn/c325c2f9f3054fe9ab336df5b33a3dba_image.png "image.png") ![image.png](https://dev-media.amazoncloud.cn/e6d222db2b7447aba34b0775f27e9c95_image.png "image.png")
0
目录
关闭