Building an Apache Kafka data processing Java application using the AWS CDK

{"value":"#### **Building an Apache Kafka data processing Java application using the AWS CDK**\n\nPiotr Chotkowski, Cloud Application Development Consultant, AWS Professional Services\n\nUsing a Java application to process data queued in Apache Kafka is a common use case across many industries. Event-driven and microservices architectures, for example, often rely on Apache Kafka for data streaming and component decoupling. You can use it as a message queue or an event bus, as well as a way to improve resilience and reproducibility of events occurring inside of the application.\n\nIn this post, I walk you through the process of creating a simple end-to-end data processing application using AWS tools and services as well as other industry standard techniques. We start with a brief architecture overview and an infrastructure definition. Then you see how with just a few lines of code you can set up an Apache Kafka cluster using [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/cn/msk/) (Amazon MSK) and the [AWS Cloud Development Kit](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) (AWS CDK). Next, I show you how to shape your project structure and package your application for deployment. We also look at the implementation details and how we can create Kafka topics in Amazon MSK cluster as well as send and receive messages from Apache Kafka using services such as [AWS Lambda](https://aws.amazon.com/cn/lambda/) and [AWS Fargate](https://aws.amazon.com/cn/fargate/).\n\nI use the AWS CDK to automate infrastructure creation and application deployment. The AWS CDK is an open-source software development framework to define your cloud application resources using familiar programming languages. 
For more information, see the [Developer Guide](https://docs.aws.amazon.com/cdk/v2/guide/home.html), the [AWS CDK Intro Workshop](https://cdkworkshop.com/), and the [AWS CDK Examples GitHub repo](https://github.com/aws-samples/aws-cdk-examples).

All the code presented in this post is open source and available on [GitHub](https://github.com/aws-samples/amazon-msk-java-app-cdk).

#### **Overview of solution**

The following diagram illustrates our overall architecture.

![image.png](https://dev-media.amazoncloud.cn/ad36374753c34b26ab9fbc84a9f37f4d_image.png)

Triggering the TransactionHandler Lambda function publishes messages to an Apache Kafka topic. The consumer application, packaged in a container and deployed to ECS Fargate, consumes messages from the Kafka topic, processes them, and stores the results in an [Amazon DynamoDB](https://aws.amazon.com/cn/dynamodb/) table. The KafkaTopicHandler Lambda function is called once during deployment to create the Kafka topic. Both the Lambda function and the consumer application publish logs to [Amazon CloudWatch](https://aws.amazon.com/cn/cloudwatch/).

To follow along with this post, you need the following prerequisites:

- An active AWS account
- Java SE Development Kit (JDK) 11
- Apache Maven
- The AWS CDK
- The AWS Command Line Interface (AWS CLI) version 2
- Docker

#### **Project structure and infrastructure definition**

The project consists of three main parts: the infrastructure (including the Kafka cluster and the Amazon DynamoDB table), a Spring Boot Java consumer application, and the Lambda producer code.

Let's start by exploring the infrastructure and deployment definition. It's implemented using a set of AWS CDK [stacks](https://docs.aws.amazon.com/cdk/v2/guide/stacks.html) and [constructs](https://docs.aws.amazon.com/cdk/v2/guide/constructs.html). I've chosen TypeScript as my language here mainly because of personal preference. However, you can use the AWS CDK with other languages if you prefer.
At the time of writing, the AWS CDK supports Python, TypeScript, Java, .NET, and Go. For more information, see [Working with the AWS CDK](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html).

Let's look at the project directory structure. All AWS CDK stacks are located in the [amazon-msk-java-app-cdk/lib](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lib) directory. In [amazon-msk-java-app-cdk/bin](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/bin), you can find the main AWS CDK app where all of the stacks are instantiated. [amazon-msk-java-app-cdk/lambda](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lambda) contains the code for `TransactionHandler`, which publishes messages to a Kafka topic, as well as the code for `KafkaTopicHandler`, which is responsible for creating the Kafka topic. The business logic for the Kafka consumer, which is a Java Maven project, is in the [consumer](https://github.com/aws-samples/amazon-msk-java-app-cdk/tree/main/consumer) directory. The Dockerfile necessary for Fargate container creation is located in [consumer/docker/Dockerfile](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/docker/Dockerfile). Finally, the [doc](https://github.com/aws-samples/amazon-msk-java-app-cdk/tree/main/doc) directory contains architecture diagrams and the scripts directory contains the deployment script.

#### **Setting up your Kafka cluster**

The central part of the architecture is the Kafka cluster created using Amazon MSK, which is relatively easy to define and deploy with the AWS CDK.
In the following code, I use the CfnCluster construct to set up my cluster:

```typescript
new msk.CfnCluster(this, "kafkaCluster", {
    brokerNodeGroupInfo: {
        securityGroups: [vpcStack.kafkaSecurityGroup.securityGroupId],
        clientSubnets: [...vpcStack.vpc.selectSubnets({
            subnetType: ec2.SubnetType.PRIVATE
        }).subnetIds],
        instanceType: "kafka.t3.small",
        storageInfo: {
            ebsStorageInfo: {
                volumeSize: 5
            }
        }
    },
    clusterName: "TransactionsKafkaCluster",
    kafkaVersion: "2.7.0",
    numberOfBrokerNodes: 2
});
```

`vpcStack` in the preceding code refers to the AWS CDK stack containing the VPC definition. Because we're using this cluster for demonstration purposes only, I limit the storage to 5 GB, the instance type to [kafka.t3.small](https://aws.amazon.com/cn/msk/pricing/), and the number of broker nodes to two, which is the minimum allowed number. We don't want to connect to this cluster from outside the VPC, so I place the cluster in a private subnet of my VPC. For more information about the allowed settings, see [interface CfnClusterProps](https://docs.aws.amazon.com/cdk/api/v1/docs/@aws-cdk_aws-msk.CfnClusterProps.html).
To learn more about Amazon MSK, check out the [Amazon MSK Labs workshop](https://amazonmsk-labs.workshop.aws/en/).

#### **Topic creation**

The following Lambda handler creates the Kafka topic when the stack is deployed and deletes it when the stack is destroyed:

```typescript
export const handler = async (event: any, context: any = {}): Promise<any> => {
    try {
        if (event.RequestType === 'Create' || event.RequestType === 'Update') {
            let result = await createTopic(event.ResourceProperties.topicConfig);
            response.send(event, context, response.SUCCESS, {alreadyExists: !result});
        } else if (event.RequestType === 'Delete') {
            await deleteTopic(event.ResourceProperties.topicConfig.topic);
            response.send(event, context, response.SUCCESS, {deleted: true});
        }
    } catch (e) {
        response.send(event, context, response.FAILED, {reason: e});
    }
}
```

The handler is called once when the [KafkaTopicStack](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lib/kafka-topic-stack.ts) is deployed and once when it's destroyed. I use the admin client from the [KafkaJS](https://kafka.js.org/) open-source library to create the Kafka topic on the 'Create' AWS CloudFormation event and to delete it on the 'Delete' event. Calling KafkaJS's `createTopics` method resolves to `true` if the topic was created successfully, or `false` if it already exists.

#### **Consumer implementation details**

The main purpose of the Kafka consumer part of this project is to process and validate incoming transaction messages and store the results in the DynamoDB table. The consumer application is written in Java using the Spring Boot framework. The core part of the functionality is implemented in the [KafkaConsumer](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/src/main/java/amazon/aws/samples/kafka/KafkaConsumer.java) class.
I use the [KafkaListener](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html) annotation to define the entry point for incoming messages. Spring takes care of most of the boilerplate code for us; in particular, we don't need to write the logic to manually pull messages from the Kafka topic or worry about deserialization. All you need to do is provide the necessary elements in the configuration class. In the following code, the Spring Boot configuration is located in the [ApplicationConfiguration](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/src/main/java/amazon/aws/samples/kafka/ApplicationConfig.java) class:

```java
@Bean
public ConsumerFactory<String, byte[]> consumerFactory(KafkaConsumerProperties properties) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapAddress());
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.getTrustStoreLocation());
    LOGGER.info(configs.toString());

    return new DefaultKafkaConsumerFactory<>(configs);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConsumerFactory<String, byte[]> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Wire in the consumer factory defined above (this call was missing from the original listing)
    factory.setConsumerFactory(consumerFactory);
    return factory;
}
```

The preceding code sets up the Kafka consumer configuration.
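The configuration above registers `ByteArrayDeserializer` for message values, and a JSON message converter later turns those bytes into POJOs. As a rough illustration of that transformation, here is a minimal sketch (in TypeScript rather than Java, purely for illustration; the `Transaction` shape is an assumption based on the payload used later in this post):

```typescript
// Illustrative only: mimics what ByteArrayDeserializer plus a JSON message
// converter effectively do on the Java side. The Transaction shape is assumed
// from the walkthrough payload, not taken from the repo.
interface Transaction {
  accountId: string;
  value: number;
}

// Kafka hands the consumer raw bytes; the converter parses them as UTF-8 JSON.
function decodeTransaction(messageBytes: Uint8Array): Transaction {
  const json = Buffer.from(messageBytes).toString("utf-8");
  return JSON.parse(json) as Transaction;
}

// Example: the same payload the CLI walkthrough sends later in this post.
const bytes = Buffer.from(JSON.stringify({ accountId: "account_123", value: 456 }));
const tx = decodeTransaction(bytes);
console.log(tx.accountId, tx.value);
```

On the Java side, Jackson's `ObjectMapper` plays the role of `JSON.parse` here.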
We get the bootstrap servers address string and the Kafka consumer group ID from environment variables that are set up during application deployment. By default, Amazon MSK uses TLS 1.2 for secure communication, so we need to set up the SSL configuration in our application as well. For more information about encryption, see [Amazon MSK Encryption](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html).

For the deserialization of incoming Kafka messages, I use classes provided by the Apache Kafka library. To enable Spring to deserialize Kafka JSON messages into POJOs, I use the `ByteArrayDeserializer` class combined with `ByteArrayJsonMessageConverter`. That way, Spring simply passes the bytes as is from the deserializer to the message converter, and the converter transforms the bytes into Java objects using Jackson's `ObjectMapper` underneath. I use this approach because it allows me to send plaintext JSON messages; we don't need anything more sophisticated for the purpose of this post. Depending on your needs, you can use different combinations of deserializers and message converters, or dedicated deserializers such as [KafkaAvroDeserializer](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html), which uses the schema registry to figure out the target type.

For more information about how to use Apache Kafka with the Spring Framework, see the [Spring documentation](https://docs.spring.io/spring-kafka/reference/html/).

#### **Consumer deployment**

We complete three high-level steps to deploy the consumer application to Fargate.

First, we need to build and package our application into an executable JAR. I use the [Apache Maven Shade plugin](https://maven.apache.org/plugins/maven-shade-plugin/) with a Spring Boot Maven plugin dependency. It's configured in the consumer application's [pom.xml](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/pom.xml).
The JAR is created during the package phase of the Maven project build and placed in the [consumer/docker](https://github.com/aws-samples/amazon-msk-java-app-cdk/tree/main/consumer/docker) directory next to the Dockerfile.

Next, we define the image used to create the ECS task container. To do that, we create a Dockerfile, which is a text file containing all the instructions and configuration necessary to assemble a Docker image. I use Amazon Linux 2 as the base for the image, additionally installing the Java 11 [Amazon Corretto](https://aws.amazon.com/cn/corretto/?filtered-posts.sort-by=item.additionalFields.createdDate&filtered-posts.sort-order=desc) distribution, awslogs, and the CloudWatch agent. For the SSL configuration, we also need to copy the `truststore` file. In [line 9](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/57edb764259629a66258ee968ad790c3c541bc4b/consumer/docker/Dockerfile#L9), we copy the executable JAR built in the previous step from its local location into the image. The last line in the Dockerfile is the entry point starting the consumer application. It's a standard Java command:

```bash
java -cp kafka-consumer-1.0-SNAPSHOT-shaded.jar amazon.aws.samples.kafka.ConsumerApplication
```

Finally, we reference the Dockerfile in the AWS CDK stack. We do this inside the fargate-stack.ts file, where we define the infrastructure necessary to run our containerized application in the ECS task.
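Putting the pieces described above together, the Dockerfile might look roughly like the following sketch. The exact package names, file paths, and truststore filename here are assumptions; see consumer/docker/Dockerfile in the repo for the authoritative version.

```dockerfile
# Illustrative sketch only; names and paths are assumptions.
FROM amazonlinux:2

# Java 11 (Amazon Corretto), awslogs, and the CloudWatch agent
RUN yum install -y java-11-amazon-corretto-headless awslogs amazon-cloudwatch-agent

# Truststore needed for TLS communication with Amazon MSK
COPY kafka.client.truststore.jks /tmp/kafka.client.truststore.jks

# The shaded JAR produced by the Maven package phase
COPY kafka-consumer-1.0-SNAPSHOT-shaded.jar kafka-consumer-1.0-SNAPSHOT-shaded.jar

# Start the consumer application
ENTRYPOINT ["java", "-cp", "kafka-consumer-1.0-SNAPSHOT-shaded.jar", "amazon.aws.samples.kafka.ConsumerApplication"]
```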
To use the local Dockerfile image definition inside the AWS CDK stack, you need to create a [DockerImageAsset](https://docs.aws.amazon.com/cdk/api/v1/docs/@aws-cdk_aws-ecr-assets.DockerImageAsset.html):

```typescript
const image = new assets.DockerImageAsset(this, "ConsumerImage", {
    directory: '../consumer/docker'
});
```

Next, we reference this image asset in the definition of the ECS task using the [ContainerImage.fromDockerImageAsset](https://docs.aws.amazon.com/cdk/api/v1/docs/@aws-cdk_aws-ecs.ContainerImage.html#static-fromwbrdockerwbrimagewbrassetasset) method:

```typescript
fargateTaskDefinition.addContainer("KafkaConsumer", {
    image: ecs.ContainerImage.fromDockerImageAsset(image),
    logging: ecs.LogDrivers.awsLogs({streamPrefix: 'KafkaConsumer'}),
    environment: {
        'TABLE_NAME': this.tableName,
        'GROUP_ID': this.groupId,
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'REGION': this.region,
        'TOPIC_NAME': topicName.valueAsString
    }
});
```

During the AWS CDK stack deployment, the image defined in the Dockerfile is created and uploaded to an [Amazon Elastic Container Registry](https://aws.amazon.com/cn/ecr/) (Amazon ECR) repository. That image is used to create and start the ECS task container, thereby starting our consumer application. For more information about other ways of obtaining images, see the [Amazon ECS Construct Library](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-ecs-readme.html#images).

#### **Producer implementation details**

Now let's deploy our Kafka producer code.
The AWS CDK stack definition for that part is located in the [lambda-stack.ts](https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lib/lambda-stack.ts) file.

```typescript
let transactionHandler = new NodejsFunction(this, "TransactionHandler", {
    runtime: Runtime.NODEJS_14_X,
    entry: 'lambda/transaction-handler.ts',
    handler: 'handler',
    vpc: vpcStack.vpc,
    securityGroups: [vpcStack.lambdaSecurityGroup],
    functionName: 'TransactionHandler',
    timeout: Duration.minutes(5),
    environment: {
        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,
        'TOPIC_NAME': topicName.valueAsString
    }
});
```

This is a relatively short piece of code. The AWS CDK `NodejsFunction` construct allows us to package our business logic code and deploy it as a Node.js Lambda function to the AWS Cloud. Due to the AWS CDK's internal packaging and deployment logic, it makes your life easier if you place the directory containing your Lambda code in the AWS CDK root directory next to the bin and lib directories. In the properties, the `entry` field has to point to the local file containing your code, as a relative path from the AWS CDK root directory. You can pass environment variables in the `environment` field. For this post, I pass the Kafka bootstrap address string and the topic name, which I need in order to communicate with the Kafka cluster and send messages from within the Lambda function. If esbuild is available, it's used to bundle your code in your environment. Otherwise, bundling occurs in a Docker container, which means that if you don't want to use esbuild, you have to start a Docker daemon before deploying your AWS CDK stack.
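To make the producer side concrete, here is a hypothetical sketch of how a handler like this might assemble a KafkaJS-style producer record from the invocation event and the `TOPIC_NAME` environment variable. This is not the repo's actual transaction-handler.ts; the helper name and record shape are assumptions, and connecting a producer to `BOOTSTRAP_ADDRESS` and sending the record are omitted.

```typescript
// Hypothetical sketch of the record-building step inside a transaction handler.
interface TransactionEvent {
  accountId: string;
  value: number;
}

// Minimal shape of the record a KafkaJS producer.send() call accepts.
interface ProducerRecord {
  topic: string;
  messages: { value: string }[];
}

function buildRecord(event: TransactionEvent, topicName: string): ProducerRecord {
  return {
    topic: topicName,
    // The payload is sent as plaintext JSON, matching the consumer's expectations.
    messages: [{ value: JSON.stringify(event) }],
  };
}

// Example using the walkthrough's payload; the fallback topic name is arbitrary.
const record = buildRecord(
  { accountId: "account_123", value: 456 },
  process.env.TOPIC_NAME ?? "transactions"
);
console.log(record.topic, record.messages[0].value);
```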
For more information about the `NodejsFunction` construct, see the [Amazon Lambda Node.js Library](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lambda-nodejs-readme.html).

#### **Execution walkthrough**

Once we deploy the application, it's time to test it. To trigger the Lambda function and send a message to the Kafka topic, you can use the following AWS CLI command:

```bash
aws lambda invoke --cli-binary-format raw-in-base64-out --function-name TransactionHandler --log-type Tail --payload '{ "accountId": "account_123", "value": 456}' /dev/stdout --query 'LogResult' --output text | base64 -d
```

Here you are adding 456 to the balance of the account `account_123`. The Lambda function sends a JSON message to the Amazon MSK cluster. The consumer application pulls the message from the Kafka topic in the form of bytes and transforms it into an instance of a POJO class. Next, the consumer business logic executes and the application stores the results in the Amazon DynamoDB table. You can run the following command to see the contents of the table:

```bash
aws dynamodb scan --table-name Accounts --query "Items[*].[id.S,Balance.N]" --output text
```

All the logs from the execution are stored in Amazon CloudWatch. To view them, you can go to the AWS Management Console or run the [aws logs tail](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/logs/tail.html) command with the CloudWatch Logs group specified.

You can experiment with the application by sending multiple messages with different values of the `accountId` and `value` fields of the JSON payload.

#### **Conclusion**

In this post, we discussed techniques to implement and deploy your application using AWS CDK constructs with Java and TypeScript application code.
High-level AWS CDK constructs enable you to quickly define the cloud infrastructure of your system and let you focus more on implementing your business logic. You can use a mix of programing languages that best fit your use case and keep all your code and infrastructure definitions in one place.\n\nTo run the code presented in this post, follow the prerequisites and usage steps described in the README file of the GitHub project.\n\nStay tuned for more content about cloud application development. If you have any questions or suggestions, please leave a comment. I hope you have enjoyed reading this post and learned something new. If you did, please share with your colleagues. Happy coding!\n\n[More from this author](https://aws.amazon.com/cn/blogs/developer/author/pcchotko/)\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n","render":"<h4><a id=\"Building_an_Apache_Kafka_data_processing_Java_application_using_the_AWS_CDK_0\"></a><strong>Building an Apache Kafka data processing Java application using the AWS CDK</strong></h4>\n<p>Piotr Chotkowski, Cloud Application Development Consultant, AWS Professional Services</p>\n<p>Using a Java application to process data queued in Apache Kafka is a common use case across many industries. Event-driven and microservices architectures, for example, often rely on Apache Kafka for data streaming and component decoupling. You can use it as a message queue or an event bus, as well as a way to improve resilience and reproducibility of events occurring inside of the application.</p>\n<p>In this post, I walk you through the process of creating a simple end-to-end data processing application using AWS tools and services as well as other industry standard techniques. We start with a brief architecture overview and an infrastructure definition. 
Then you see how with just a few lines of code you can set up an Apache Kafka cluster using <a href=\"https://aws.amazon.com/cn/msk/\" target=\"_blank\">Amazon Managed Streaming for Apache Kafka</a> (Amazon MSK) and the <a href=\"https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html\" target=\"_blank\">AWS Cloud Development Kit</a> (AWS CDK). Next, I show you how to shape your project structure and package your application for deployment. We also look at the implementation details and how we can create Kafka topics in Amazon MSK cluster as well as send and receive messages from Apache Kafka using services such as <a href=\"https://aws.amazon.com/cn/lambda/\" target=\"_blank\">AWS Lambda</a> and <a href=\"https://aws.amazon.com/cn/fargate/\" target=\"_blank\">AWS Fargate</a>.</p>\n<p>I use the AWS CDK to automate infrastructure creation and application deployment. The AWS CDK is an open-source software development framework to define your cloud application resources using familiar programming languages. For more information, see the <a href=\"https://docs.aws.amazon.com/cdk/v2/guide/home.html\" target=\"_blank\">Developer Guide</a>, <a href=\"https://cdkworkshop.com/\" target=\"_blank\">AWS CDK Intro Workshop</a>, and the <a href=\"https://github.com/aws-samples/aws-cdk-examples\" target=\"_blank\">AWS CDK Examples GitHub repo</a>.</p>\n<p>All the code presented in this post is open sourced and available on <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk\" target=\"_blank\">GitHub</a>.</p>\n<h4><a id=\"Overview_of_solution_12\"></a><strong>Overview of solution</strong></h4>\n<p>The following diagram illustrates our overall architecture.</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/ad36374753c34b26ab9fbc84a9f37f4d_image.png\" alt=\"image.png\" /></p>\n<p>Triggering the TransactionHandler Lambda function publishes messages to an Apache Kafka topic. 
The application is packaged in a container and deployed to ECS Fargate, consumes messages from the Kafka topic, processes them, and stores the results in an <a href=\"https://aws.amazon.com/cn/dynamodb/\" target=\"_blank\">Amazon DynamoDB</a> table. The KafkaTopicHandler Lambda function is called once during deployment to create Kafka topic. Both the Lambda function and the consumer application publish logs to <a href=\"https://aws.amazon.com/cn/cloudwatch/\" target=\"_blank\">Amazon CloudWatch</a>.</p>\n<p>To follow along with this post, you need the following prerequisites:</p>\n<ul>\n<li>An active AWS account</li>\n<li>Java SE Development Kit (JDK) 11</li>\n<li>Apache Maven</li>\n<li>The AWS CDK</li>\n<li>The AWS Command Line Interface (AWS CLI) version 2</li>\n<li>Docker</li>\n</ul>\n<h4><a id=\"Project_structure_and_infrastructure_definition_29\"></a><strong>Project structure and infrastructure definition</strong></h4>\n<p>The project consists of three main parts: the infrastructure (including Kafka cluster and Amazon DynamoDB), a Spring Boot Java consumer application, and Lambda producer code.</p>\n<p>Let’s start with exploring the infrastructure and deployment definition. It’s implemented using a set of AWS CDK <a href=\"https://docs.aws.amazon.com/cdk/v2/guide/stacks.html\" target=\"_blank\">stacks</a> and <a href=\"https://docs.aws.amazon.com/cdk/v2/guide/constructs.html\" target=\"_blank\">constructs</a>. I’ve chosen Typescript as my language here mainly because of personal preference. However if you prefer you can use CDK with other languages. At the time of writing, AWS CDK supports Python, TypeScript, Java, .NET and Go. For more information, see <a href=\"https://docs.aws.amazon.com/cdk/v2/guide/work-with.html\" target=\"_blank\">Working with the AWS CDK</a>.</p>\n<p>Let’s look at the project directory structure. 
All AWS CDK stacks are located in the <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lib\" target=\"_blank\">amazon-msk-java-app-cdk/lib</a>directory. In <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/bin\" target=\"_blank\">amazon-msk-java-app-cdk/bin</a>, you can find the main AWS CDK app where all of the stacks are instantiated. <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lambda\" target=\"_blank\">amazon-msk-java-app-cdk/lambda</a>ontains code for<code>TransactionHandler</code>,which publishes messages to a Kafka topic, as well as code for <code>KafkaTopicHandler</code>,which is responsible for creating Kafka topic. The business logic for the Kafka consumer, which is a Java Maven project, is in the <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/tree/main/consumer\" target=\"_blank\">consumer</a> directory. The Dockerfile necessary for Fargate container creation is located in <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/docker/Dockerfile\" target=\"_blank\">consumer/docker/Dockerfile</a>. Finally, <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/tree/main/doc\" target=\"_blank\">doc</a> contains architecture diagrams and scripts contains the deployment script.</p>\n<h4><a id=\"Setting_up_your_Kafka_cluster_37\"></a><strong>Setting up your Kafka cluster</strong></h4>\n<p>The central part of the architecture is the Kafka cluster created using Amazon MSK, which is relatively easy to define and deploy with the AWS CDK. 
In the following code, I use the CfnCluster construct to set up my cluster:</p>\n<pre><code class=\"lang-\">TypeScript\n</code></pre>\n<p>new msk.CfnCluster(this, “kafkaCluster”, {</p>\n<pre><code class=\"lang-\">brokerNodeGroupInfo: {\n</code></pre>\n<p>securityGroups: [vpcStack.kafkaSecurityGroup.securityGroupId],</p>\n<pre><code class=\"lang-\"> clientSubnets: [...vpcStack.vpc.selectSubnets({\n</code></pre>\n<p>subnetType: ec2.SubnetType.PRIVATE</p>\n<pre><code class=\"lang-\"> }).subnetIds],\n</code></pre>\n<p>instanceType: “kafka.t3.small”,</p>\n<pre><code class=\"lang-\">storageInfo: {\n</code></pre>\n<p>ebsStorageInfo: {</p>\n<pre><code class=\"lang-\"> volumeSize: 5\n</code></pre>\n<p>}</p>\n<pre><code class=\"lang-\"> }\n</code></pre>\n<p>},</p>\n<pre><code class=\"lang-\"> clusterName: &quot;TransactionsKafkaCluster&quot;,\n</code></pre>\n<p>kafkaVersion: “2.7.0”,</p>\n<pre><code class=\"lang-\"> numberOfBrokerNodes: 2\n</code></pre>\n<p>});</p>\n<pre><code class=\"lang-\"></code></pre>\n<p><code>vpcStack</code>n the preceding code refers to the AWS CDK stack containing the VPC definition. Because we’re using this cluster for demonstration purposes only, I limit storage to 5 GB, the instance type to <a href=\"https://aws.amazon.com/cn/msk/pricing/\" target=\"_blank\">kafka.t3.small</a>, and the number of broker nodes to two, which is the minimum allowed number. We don’t want to connect to this cluster from outside the VPC, so I place the cluster in a private subnet of my VPC. For more information about the allowed settings, see <a href=\"https://docs.aws.amazon.com/cdk/api/v1/docs/@aws-cdk_aws-msk.CfnClusterProps.html\" target=\"_blank\">interface CfnClusterProps</a>. 
To learn more about Amazon MSK, check out the <a href=\"https://amazonmsk-labs.workshop.aws/en/\" target=\"_blank\">Amazon MSK Labs workshop</a>.</p>\n<h4><a id=\"Topic_creation_82\"></a><strong>Topic creation</strong></h4>\n<p>export const handler = async (event: any, context: any = {}): Promise&lt;any&gt; =&gt; {</p>\n<pre><code class=\"lang-\"> try {\n</code></pre>\n<p>if (event.RequestType === ‘Create’ || event.RequestType === ‘Update’) {</p>\n<pre><code class=\"lang-\"> let result = await createTopic(event.ResourceProperties.topicConfig);\n</code></pre>\n<p>response.send(event, context, response.SUCCESS, {alreadyExists: !result});</p>\n<pre><code class=\"lang-\"> } else if (event.RequestType === 'Delete') {\n</code></pre>\n<p>await deleteTopic(event.ResourceProperties.topicConfig.topic);</p>\n<pre><code class=\"lang-\"> response.send(event, context, response.SUCCESS, {deleted: true});\n</code></pre>\n<p>}</p>\n<pre><code class=\"lang-\"> } catch (e) {\n</code></pre>\n<p>response.send(event, context, response.FAILED, {reason: e});</p>\n<pre><code class=\"lang-\"> }\n</code></pre>\n<p>}</p>\n<pre><code class=\"lang-\"></code></pre>\n<p>Handler is called once when the <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lib/kafka-topic-stack.ts\" target=\"_blank\">KafkaTopicStack</a> is deployed and once when it’s destroyed. I use the admin client from the <a href=\"https://kafka.js.org/\" target=\"_blank\">KafkaJS</a> open-source library to create Kafka topic on ‘Create’ AWS CloudFormation event and to destroy it on ‘Delete’ event. 
Calling KafkaJS’s <code>createTopics</code>method will resolve to<code>true</code>if the topic was created successfully or false if it already exists.</p>\n<h4><a id=\"Consumer_implementation_details_114\"></a><strong>Consumer implementation details</strong></h4>\n<p>The main purpose of the Kafka consumer part of this project is to process and validate incoming transaction messages and store results in the DynamoDB table. The consumer application is written in Java with the use of the Spring Boot framework. The core part of functionality is implemented in the <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/src/main/java/amazon/aws/samples/kafka/KafkaConsumer.java\" target=\"_blank\">KafkaConsumer</a> class. I use the <a href=\"https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html\" target=\"_blank\">KafkaListener</a> annotation to define the entry point for incoming messages. Spring takes care of most of the boilerplate code for us, in particular, we don’t need to write the logic to manually pull messages from the Kafka topic or worry about deserialization. All you need to do is provide the necessary elements in the configuration class. 
In the following code, the Spring Boot configuration is located in the <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/src/main/java/amazon/aws/samples/kafka/ApplicationConfig.java\" target=\"_blank\">ApplicationConfiguration</a> class:</p>\n<pre><code class=\"lang-\">Java\n</code></pre>\n<p>@Bean</p>\n<pre><code class=\"lang-\">public ConsumerFactory&lt;String, byte[]&gt; consumerFactory(KafkaConsumerProperties properties) {\n</code></pre>\n<p>Map&lt;String, Object&gt; configs = new HashMap&lt;&gt;();</p>\n<pre><code class=\"lang-\"> configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapAddress());\n</code></pre>\n<p>configs.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());</p>\n<pre><code class=\"lang-\"> configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);\n</code></pre>\n<p>configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);</p>\n<pre><code class=\"lang-\"> configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, &quot;SSL&quot;);\n</code></pre>\n<p>configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, properties.getTrustStoreLocation());</p>\n<pre><code class=\"lang-\"> LOGGER.info(configs.toString());\n</code></pre>\n<pre><code class=\"lang-\"> return new DefaultKafkaConsumerFactory&lt;&gt;(configs);\n</code></pre>\n<p>}</p>\n<pre><code class=\"lang-\"></code></pre>\n<p>@Bean</p>\n<pre><code class=\"lang-\">public ConcurrentKafkaListenerContainerFactory&lt;?, ?&gt; kafkaListenerContainerFactory(ConsumerFactory&lt;String, byte[]&gt; consumerFactory) {\n</code></pre>\n<p>ConcurrentKafkaListenerContainerFactory&lt;String, byte[]&gt; factory = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();</p>\n<pre><code class=\"lang-\">return factory;\n</code></pre>\n<p>}</p>\n<pre><code class=\"lang-\"></code></pre>\n<p>The preceding code sets up the Kafka consumer configuration. 
We get the bootstrap servers address string and the Kafka consumer group ID from environment variables that are set during application deployment. By default, Amazon MSK uses TLS 1.2 for secure communication, so we need to set up the SSL configuration in our application as well. For more information about encryption, see <a href=\"https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html\" target=\"_blank\">Amazon MSK Encryption</a>.</p>\n<p>For the deserialization of incoming Kafka messages, I use classes provided by the Apache Kafka library. To enable Spring to deserialize Kafka JSON messages into POJOs, I use the <code>ByteArrayDeserializer</code> class combined with <code>ByteArrayJsonMessageConverter</code>. That way, Spring passes the bytes as is from the deserializer to the message converter, and the converter transforms the bytes into Java objects using Jackson’s <code>ObjectMapper</code> underneath. I use this approach because it allows me to send plaintext JSON messages; we don’t need anything more sophisticated for the purposes of this post. Depending on your needs, you can use different combinations of deserializers and message converters, or dedicated deserializers such as <a href=\"https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html\" target=\"_blank\">KafkaAvroDeserializer</a>, which uses the schema registry to figure out the target type.</p>\n<p>For more information about how to use Apache Kafka with the Spring Framework, see the <a href=\"https://docs.spring.io/spring-kafka/reference/html/\" target=\"_blank\">Spring documentation</a>.</p>\n<h4><a id=\"Consumer_deployment_167\"></a><strong>Consumer deployment</strong></h4>\n<p>We complete three high-level steps to deploy the consumer application to Fargate.</p>\n<p>First, we need to build and package our application into an executable JAR. 
I use the <a href=\"https://maven.apache.org/plugins/maven-shade-plugin/\" target=\"_blank\">Apache Maven Shade plugin</a> with Spring Boot Maven plugin dependency. It’s configured in the consumer application <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/consumer/pom.xml\" target=\"_blank\">pom.xml</a>. The JAR is created during the package phase of the Maven project build and placed in the <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/tree/main/consumer/docker\" target=\"_blank\">consumer/docker</a> directory next to the Dockerfile.</p>\n<p>Next, we define the image used to create the ECS task container. To do that, we create a Dockerfile, which is a text file containing all the instructions and configuration necessary to assemble a Docker image. I use Amazon Linux 2 as a base for the image, additionally installing Java 11 <a href=\"https://aws.amazon.com/cn/corretto/?filtered-posts.sort-by=item.additionalFields.createdDate&amp;filtered-posts.sort-order=desc\" target=\"_blank\">Amazon Corretto</a> distribution, awslogs, and a CloudWatch agent. For the SSL configuration, we also need to copy the <code>truststore</code>file. In <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/57edb764259629a66258ee968ad790c3c541bc4b/consumer/docker/Dockerfile#L9\" target=\"_blank\">line 9</a>, we copy the executable JAR built in the previous step from the local location into the image. The last line in the Dockerfile is an entry point starting the consumer application. It’s a standard Java command:</p>\n<pre><code class=\"lang-\">Bash\n</code></pre>\n<p>java -cp kafka-consumer-1.0-SNAPSHOT-shaded.jar amazon.aws.samples.kafka.ConsumerApplication</p>\n<pre><code class=\"lang-\"></code></pre>\n<p>Finally, we reference the Dockerfile in the AWS CDK stack. We do this inside the <a href=\"\" target=\"_blank\">fargate-stack.ts</a> file. 
We define the infrastructure necessary to run our containerized application in the ECS task. To use the local Dockerfile image definition inside the AWS CDK stack, you need to create the asset <a href=\"https://docs.aws.amazon.com/cdk/api/v1/docs/@aws-cdk_aws-ecr-assets.DockerImageAsset.html\" target=\"_blank\">DockerImageAsset</a>:</p>\n<pre><code class=\"lang-typescript\">const image = new assets.DockerImageAsset(this, &quot;ConsumerImage&quot;, {\n    directory: '../consumer/docker'\n});\n</code></pre>\n<p>Next, we reference this image asset in the definition of the ECS task using the <a href=\"https://docs.aws.amazon.com/cdk/api/v1/docs/@aws-cdk_aws-ecs.ContainerImage.html#static-fromwbrdockerwbrimagewbrassetasset\" target=\"_blank\">ContainerImage.fromDockerImageAsset</a> method:</p>\n<pre><code class=\"lang-typescript\">fargateTaskDefinition.addContainer(&quot;KafkaConsumer&quot;, {\n    image: ecs.ContainerImage.fromDockerImageAsset(image),\n    logging: ecs.LogDrivers.awsLogs({streamPrefix: 'KafkaConsumer'}),\n    environment: {\n        'TABLE_NAME': this.tableName,\n        'GROUP_ID': this.groupId,\n        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,\n        'REGION': this.region,\n        'TOPIC_NAME': topicName.valueAsString\n    }\n});\n</code></pre>\n<p>During the AWS CDK stack deployment, the image defined in the Dockerfile is created and uploaded to an <a href=\"https://aws.amazon.com/cn/ecr/\" target=\"_blank\">Amazon Elastic Container Registry</a> (Amazon ECR) repository. That image is used to create and start the ECS task container, thereby starting our consumer application. 
For more information about other ways of obtaining images, see the <a href=\"https://docs.aws.amazon.com/cdk/api/v1/docs/aws-ecs-readme.html#images\" target=\"_blank\">Amazon ECS Construct Library</a>.</p>\n<h4><a id=\"Producer_implementation_details_225\"></a><strong>Producer implementation details</strong></h4>\n<p>Now let’s deploy our Kafka producer code. The AWS CDK stack definition for that part is located in the <a href=\"https://github.com/aws-samples/amazon-msk-java-app-cdk/blob/main/amazon-msk-java-app-cdk/lib/lambda-stack.ts\" target=\"_blank\">lambda-stack.ts</a> file.</p>\n<pre><code class=\"lang-typescript\">let transactionHandler = new NodejsFunction(this, &quot;TransactionHandler&quot;, {\n    runtime: Runtime.NODEJS_14_X,\n    entry: 'lambda/transaction-handler.ts',\n    handler: 'handler',\n    vpc: vpcStack.vpc,\n    securityGroups: [vpcStack.lambdaSecurityGroup],\n    functionName: 'TransactionHandler',\n    timeout: Duration.minutes(5),\n    environment: {\n        'BOOTSTRAP_ADDRESS': bootstrapAddress.valueAsString,\n        'TOPIC_NAME': topicName.valueAsString\n    }\n});\n</code></pre>\n<p>This is a relatively short piece of code. The AWS CDK <code>NodejsFunction</code> construct allows us to package our business logic code and deploy it as a Node.js Lambda function to the AWS Cloud. Because of the AWS CDK’s internal packaging and deployment logic, it makes your life easier if you place the directory containing your Lambda code in the AWS CDK root directory next to the bin and lib directories. In the properties, the <code>entry</code> field has to point to the local file containing your code, as a path relative to the AWS CDK root directory. 
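The handler code itself isn’t reproduced in this post. As a rough illustration, a hypothetical <code>lambda/transaction-handler.ts</code> could be shaped like the following sketch. The <code>toKafkaRecord</code> helper and all names in it are assumptions for illustration, not the repository’s actual code, and the Kafka client call is only indicated in a comment.</p>

```typescript
// Hypothetical sketch of a producer handler; names are illustrative assumptions.
interface TransactionEvent {
    accountId: string;
    value: number;
}

// Pure helper: keying records by accountId keeps all transactions for one
// account in the same Kafka partition, so they are consumed in order.
export function toKafkaRecord(event: TransactionEvent): { key: string; value: string } {
    return { key: event.accountId, value: JSON.stringify(event) };
}

export const handler = async (event: TransactionEvent): Promise<{ statusCode: number; body: string }> => {
    const record = toKafkaRecord(event);
    // A real implementation would create a Kafka producer here, connect it using
    // the BOOTSTRAP_ADDRESS environment variable, and send the record to TOPIC_NAME.
    return { statusCode: 200, body: record.value };
};
```

<p>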
You can pass environment variables in the <code>environment</code> field. For this post, I pass the Kafka bootstrap address string and the topic name, which I need in order to communicate with the Kafka cluster and send messages from within the Lambda function. If esbuild is available, it’s used to bundle your code in your environment; otherwise, bundling occurs in a Docker container. This means that if you don’t want to use esbuild, you have to start a Docker daemon before deploying your AWS CDK stack. For more information about the <code>NodejsFunction</code> construct, see the <a href=\"https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lambda-nodejs-readme.html\" target=\"_blank\">AWS Lambda Node.js Library</a>.</p>\n<h4><a id=\"Execution_walk_through_262\"></a><strong>Execution walkthrough</strong></h4>\n<p>Once we deploy the application, it’s time to test it. To trigger the Lambda function and send a message to the Kafka topic, you can use the following AWS CLI command:</p>\n<pre><code class=\"lang-bash\">aws lambda invoke --cli-binary-format raw-in-base64-out --function-name TransactionHandler --log-type Tail --payload '{ &quot;accountId&quot;: &quot;account_123&quot;, &quot;value&quot;: 456}' /dev/stdout --query 'LogResult' --output text | base64 -d\n</code></pre>\n<p>Here you are adding 456 to the balance of the account <code>account_123</code>. The Lambda function sends a JSON message to the Amazon MSK cluster. The consumer application pulls the message from the Kafka topic as bytes and transforms it into an instance of a POJO class. Next, the consumer business logic runs and the application stores the results in the Amazon DynamoDB table. 
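Conceptually, the consumer’s business logic is simple balance accounting: each transaction’s <code>value</code> is added to the running balance of its <code>accountId</code>. The following TypeScript sketch models that logic in memory purely for illustration; the real implementation lives in the Java consumer and persists balances to DynamoDB.</p>

```typescript
// In-memory model of the consumer's balance accounting, for illustration only.
// The actual application implements this in Java and writes results to DynamoDB.
export function applyTransactions(
    transactions: Array<{ accountId: string; value: number }>
): Map<string, number> {
    const balances = new Map<string, number>();
    for (const tx of transactions) {
        // Add the transaction value to the account's current balance (default 0).
        balances.set(tx.accountId, (balances.get(tx.accountId) ?? 0) + tx.value);
    }
    return balances;
}
```

<p>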
You can run the following command to see the contents of the table:</p>\n<pre><code class=\"lang-bash\">aws dynamodb scan --table-name Accounts --query &quot;Items[*].[id.S,Balance.N]&quot; --output text\n</code></pre>\n<p>All the logs from the execution are stored in Amazon CloudWatch. To view them, you can go to the AWS console or run the <a href=\"https://awscli.amazonaws.com/v2/documentation/api/latest/reference/logs/tail.html\" target=\"_blank\">aws logs tail</a> command with the CloudWatch Logs group specified.</p>\n<p>You can experiment with the application by sending multiple messages with different values in the <code>accountId</code> and <code>value</code> fields of the JSON payload.</p>\n<h4><a id=\"Conclusion_288\"></a><strong>Conclusion</strong></h4>\n<p>In this post, we discussed different techniques to implement and deploy your application using AWS CDK constructs together with Java and TypeScript application code. High-level AWS CDK constructs enable you to quickly define the cloud infrastructure of your system and let you focus more on implementing your business logic. You can use a mix of programming languages that best fit your use case and keep all your code and infrastructure definitions in one place.</p>\n<p>To run the code presented in this post, follow the prerequisites and usage steps described in the README file of the GitHub project.</p>\n<p>Stay tuned for more content about cloud application development. If you have any questions or suggestions, please leave a comment. I hope you have enjoyed reading this post and learned something new. If you did, please share it with your colleagues. Happy coding!</p>\n<p><a href=\"https://aws.amazon.com/cn/blogs/developer/author/pcchotko/\" target=\"_blank\">More from this author</a></p>\n"}