{"value":"Building a robust data lake is very beneficial because it enables organizations have a holistic view of their business and empowers data-driven decisions. The curated layer of a data lake is able to hydrate multiple homogeneous data products, unlocking limitless capabilities to address current and future requirements. However, some concepts of how data lakes work feel counter-intuitive for professionals with a traditional database background.\n\nData lakes are by design append-only, meaning that the new records with an existing primary key don’t update the existing values out-of-the box; instead the new values are appended, resulting in having multiple occurrences of the same primary key in the data lake. Furthermore, special care needs to be taken to handle row deletions, and even if there is a way to identify deleted primary keys, it’s not straightforward to incorporate them to the data lake.\n\nTraditionally, processing between the different layers of data lakes is performed using distributed processing engines such as Apache Spark that can be deployed in a managed or serverless way using services such as Amazon EMR or Amazon Web Services Glue. Spark has recently introduced frameworks that give data lakes a flavor of ACID properties, such as Apache Hudi, Apache Iceberg, and Delta Lake. However, if you’re coming from a database background, there is a significant learning curve to adopt these technologies that involves understanding the concepts, moving away from SQL to a more general-purpose language, and adopting a specific ACID framework and its complexities.\n\nIn this post, we discuss how to implement a data lake that supports updates and deletions of individual rows using the Amazon Redshift ANSI SQL compliant syntax as our main processing language. We also take advantage of its serverless offering to address scenarios where consumption patterns don’t justify creating a managed Amazon Redshift cluster, which makes our solution cost-efficient. We use Python to interact with the Amazon Web Services API, but you can also use any other Amazon Web Services SDK. Finally, we use Amazon Web Services Glue auto-generated code to ingest data to [Amazon Redshift Serverless](https://docs.aws.amazon.com/redshift/latest/mgmt/working-with-serverless.html) and Amazon Web Services Glue crawlers to create metadata for our datasets.\n\n\n#### **Solution overview**\n\n\nMost of the services we use for this post can be treated as placeholders. For instance, we use [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/) (with MongoDB compatibility) as our data source system, because one of the key features of a data lake is that it can support structured and unstructured data. We use [Amazon Web Services Database Migration Service](https://aws.amazon.com/cn/dms/) (Amazon Web Services DMS) to ingest data from Amazon DocumentDB to the data lake on [Amazon Simple Storage Service](https://aws.amazon.com/cn/s3/) (Amazon S3). The change data capture (CDC) capabilities of [Amazon Web Services Database Migration Service](https://aws.amazon.com/cn/dms/) (Amazon Web Services DMS) enable us to identify both updated and deleted rows that we want to propagate to the data lake. We use an Amazon Web Services Glue job to load raw data to Redshift Serverless to use job bookmarks, which allows each run of the load job to ingest new data. 

#### **Data ingestion**


The dataset we use for this post is available on [GitHub](https://github.com/aws-samples/amazon-documentdb-samples/tree/master/datasets). After we create our Amazon DocumentDB instance (for this post, we used engine version 4.0.0 and the db.t3.medium instance class), we ingest the sample dataset. Ingested records look like the following:

```
{
    "_id": ObjectId("61f998d9b7599a1e904ae84d"),
    "Case_Type": "Confirmed",
    "Cases": 1661,
    "Difference": 135,
    "Date": "5/12/2020",
    "Country_Region": "Sudan",
    "Province_State": "N/A",
    "Admin2": "",
    "Combined_Key": "Sudan",
    "Lat": 12.8628,
    "Long": 30.2176,
    "Prep_Flow_Runtime": "6/5/2022 11:15:39 PM"
}
```

We then create an Amazon Web Services DMS dms.t3.medium instance using engine version 3.4.6, a [source endpoint for Amazon DocumentDB](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.DocumentDB.html#CHAP_Source.DocumentDB.ConfigureCDC), and a [target endpoint for Amazon S3](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.S3.html), adding ```dataFormat=parquet``` as an extra connection attribute so that records are stored in Parquet format in the landing zone of the data lake. After we confirm the connectivity of the endpoints using the **Test connection** feature, we create a database migration task for full load and ongoing replication. We can confirm data is migrated successfully to Amazon S3 by browsing to the S3 location and choosing **Query with S3 Select** on the Parquet file that has been generated. The result looks like the following screenshot.

![image.png](https://dev-media.amazoncloud.cn/349b628d13844b42a92e1bc5c05f8c65_image.png)

We then catalog the ingested data using an Amazon Web Services Glue crawler on the landing zone S3 bucket, so that we can query the data with [Amazon Athena](https://aws.amazon.com/cn/athena/?whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc) and process it with Amazon Web Services Glue.
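
If you prefer to define the ingestion resources as code instead of on the console, the S3 target endpoint can also be created with Boto3. The following is a minimal sketch, assuming placeholder names for the endpoint identifier, bucket, folder, and service access role; it mirrors the ```dataFormat=parquet``` setting we configured above.

```
import boto3

# Hypothetical names: replace the identifier, bucket, folder, and role ARN with your own.
dms = boto3.client('dms')

response = dms.create_endpoint(
    EndpointIdentifier='docdb-to-s3-target',   # assumed endpoint identifier
    EndpointType='target',
    EngineName='s3',
    S3Settings={
        'ServiceAccessRoleArn': 'arn:aws:iam::<Amazon Web Services account-id>:role/<dms-s3-role>',
        'BucketName': '<LANDING-ZONE-BUCKET>',
        'BucketFolder': 'landing/cases',        # assumed landing zone prefix
        'DataFormat': 'parquet',                # equivalent to dataFormat=parquet
    },
)
print(response['Endpoint']['EndpointArn'])
```

The same approach works for the Amazon DocumentDB source endpoint and the replication task if you want the whole ingestion pipeline defined as code.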

#### **Set up Redshift Serverless**


Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more, which enables you to use your data to acquire new insights for your business and customers. It’s not uncommon to want to take advantage of the rich features Amazon Redshift provides without having a workload demanding enough to justify purchasing an Amazon Redshift provisioned cluster. To address such scenarios, we have launched [Redshift Serverless](https://docs.aws.amazon.com/redshift/latest/mgmt/working-with-serverless.html), which includes all the features provisioned clusters have but lets you pay only for the workloads you run rather than for the entire time your Amazon Redshift cluster is up.

Our next step is to [set up our Redshift Serverless instance](https://docs.aws.amazon.com/redshift/latest/mgmt/serverless-console-getting-started.html). On the Amazon Redshift console, we choose **Redshift Serverless**, select the required settings (similar to creating a provisioned Amazon Redshift cluster), and choose **Save configuration**. If you intend to access your Redshift Serverless instance via a JDBC client, you might need to [enable public access](https://aws.amazon.com/cn/premiumsupport/knowledge-center/redshift-cluster-private-public/). After the setup is complete, we can start using the Redshift Serverless endpoint.
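
To retrieve the endpoint details without opening the console, you can query the Redshift Serverless API. The following is a small sketch, assuming a workgroup named ```data-lake-wg``` (replace it with your own); the address and port it prints are the same values used to build the JDBC URL in the next section.

```
import boto3

# 'data-lake-wg' is an assumed workgroup name; use the one you configured.
serverless = boto3.client('redshift-serverless')

workgroup = serverless.get_workgroup(workgroupName='data-lake-wg')['workgroup']
endpoint = workgroup['endpoint']
print(f"jdbc:redshift://{endpoint['address']}:{endpoint['port']}/dev")
```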

#### **Load data to Redshift Serverless**


To confirm our serverless endpoint is accessible, we can create an Amazon Web Services Glue connection and test its connectivity. The type of the connection is JDBC, and we can find our serverless JDBC URL in the **Workgroup configuration** section of the Redshift Serverless console. For the connection to be successful, we need to configure connectivity between the Amazon Redshift and Amazon Web Services Glue security groups. For more details, refer to [Connecting to a JDBC Data Store in a VPC](https://docs.aws.amazon.com/glue/latest/dg/connection-JDBC-VPC.html). After connectivity is configured correctly, we can test the connection, and if the test is successful, we should see the following message.

![image.png](https://dev-media.amazoncloud.cn/b0a42be028e54d54b70a759a7e656c9b_image.png)

We use an Amazon Web Services Glue job to load the historical dataset to Amazon Redshift; however, we don’t apply any business logic at this step, because we want to implement our transformations using ANSI SQL in Amazon Redshift. Our Amazon Web Services Glue job is mostly auto-generated boilerplate code:


```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Resolve TempDir as well, because it's used as the staging directory for the Redshift write
args = getResolvedOptions(sys.argv, ["TempDir", "JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

job.commit()
```

The job reads the table ```cases``` from the database ```document-landing``` in our Amazon Web Services Glue Data Catalog, created by the crawler after data ingestion. It copies the underlying data to the table ```cases_stage``` in the database ```dev``` of the cluster defined in the Amazon Web Services Glue connection ```redshift-serverless```. We can run this job and use the Amazon Redshift query editor v2 on the Amazon Redshift console to confirm its success by seeing the newly created table, as shown in the following screenshot.

![image.png](https://dev-media.amazoncloud.cn/38c71a1506904d6a8b1904c8092cb16d_image.png)

We can now query and transform the historical data using Redshift Serverless.
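
Because we use Python to interact with the Amazon Web Services API, we can also verify the load without a JDBC client by using the Redshift Data API. The following is a rough sketch, assuming the workgroup name ```data-lake-wg``` and the default ```dev``` database; it simply counts the rows that landed in ```cases_stage```.

```
import time
import boto3

# Assumed workgroup name; the database and table come from the Glue job above.
client = boto3.client('redshift-data')

statement = client.execute_statement(
    WorkgroupName='data-lake-wg',
    Database='dev',
    Sql='SELECT COUNT(*) FROM public.cases_stage;',
)

# Poll until the statement finishes, then fetch the single-value result
while True:
    desc = client.describe_statement(Id=statement['Id'])
    if desc['Status'] in ('FINISHED', 'FAILED', 'ABORTED'):
        break
    time.sleep(1)

if desc['Status'] == 'FINISHED':
    result = client.get_statement_result(Id=statement['Id'])
    print('Rows staged:', result['Records'][0][0]['longValue'])
else:
    print('Query did not finish:', desc.get('Error'))
```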

#### **Transform stored procedure**


We have created a stored procedure that performs some common transformations, such as converting a text field to a date, extracting the year, month, and day from it, and creating some Boolean flag fields. After the transformations are performed and stored to the ```cases``` table, we want to unload the data to our data lake S3 bucket and finally empty the ```cases_stage``` table, preparing it to receive the next CDC load of our pipeline. See the following code:

```
CREATE OR REPLACE PROCEDURE public.historicalload() LANGUAGE plpgsql AS $$
DECLARE
 sql text;
 s3folder varchar(65535);
 iamrole varchar(1000);
 unload varchar(65535);
begin
 create table cases as (
 SELECT oid__id
 , case_type
 , cases
 , difference
 , TO_DATE(date, 'mm/dd/yyyy') date
 , country_region
 , province_state
 , admin2
 , combined_key
 , lat
 , long
 , TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') prep_flow_runtime
 , fips
 , DATE_PART(year, TO_DATE(date, 'mm/dd/yyyy'))::smallint as year
 , DATE_PART(month, TO_DATE(date, 'mm/dd/yyyy'))::smallint as month
 , DATE_PART(day, TO_DATE(date, 'mm/dd/yyyy'))::smallint as day
 , CASE WHEN case_type = 'Deaths' then 1 else 0 end is_death
 , CASE WHEN case_type = 'Confirmed' then 1 else 0 end is_confirmed
 FROM "dev"."public"."cases_stage");
 sql:='select * from "dev"."public"."cases"';
 s3folder:='s3://object-path/name-prefix';
 iamrole:='arn:aws:iam::<Amazon Web Services account-id>:role/<role-name>';
 unload := 'unload ('''||sql||''') to '''||s3folder||''' iam_role '''||iamrole||''' ALLOWOVERWRITE MAXFILESIZE 100 MB PARALLEL PARQUET PARTITION BY (year,month,day)';
 execute unload;
 truncate "dev"."public"."cases_stage";
END;
$$;
```

Note that for unloading data to Amazon S3, we need to create the [Amazon Web Services Identity and Access Management](https://aws.amazon.com/cn/iam/) (IAM) role ```arn:aws:iam::<Amazon Web Services account-id>:role/<role-name>```, give it the required Amazon S3 permissions, and [associate it with our Redshift Serverless instance](https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-add-role.html). Calling the stored procedure after ingesting the historical dataset to the ```cases_stage``` table loads the transformed and partitioned dataset to the specified S3 folder in Parquet format. After we crawl the folder with an Amazon Web Services Glue crawler, we can verify the results using Athena.

![image.png](https://dev-media.amazoncloud.cn/8b580392d5354578b8453fc9d46a8157_image.png)
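
You can run a similar verification programmatically. The following is a hedged sketch that submits a query through Athena with Boto3, assuming the crawler created the database ```document-data-lake``` with the table ```cases```, and assuming an S3 location of your own for query results.

```
import boto3

athena = boto3.client('athena')

# The database and table names come from the data lake crawler; the output
# location below is an assumed bucket for Athena query results.
response = athena.start_query_execution(
    QueryString="SELECT year, month, day, COUNT(*) AS records "
                "FROM cases GROUP BY year, month, day ORDER BY year, month, day;",
    QueryExecutionContext={'Database': 'document-data-lake'},
    ResultConfiguration={'OutputLocation': 's3://<ATHENA-RESULTS-BUCKET>/queries/'},
)
print('Query execution ID:', response['QueryExecutionId'])
```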

We can revisit the loading Amazon Web Services Glue job described earlier to automatically invoke the steps we triggered manually. We can use an Amazon Redshift [postactions](https://aws.amazon.com/cn/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/) configuration to trigger the stored procedure as soon as the loading of the staging table is complete, and we can use Boto3 to trigger the Amazon Web Services Glue crawler when data is unloaded to Amazon S3. Therefore, the revisited Amazon Web Services Glue job looks like the following code:

```
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["TempDir", "JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

glueClient = boto3.client('glue')

# Invoke the stored procedure as soon as the staging table has been loaded
post_query = "call historicalload();"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev", "postactions":post_query}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

# Crawl the unloaded data so that it becomes queryable through Athena
response = glueClient.start_crawler(Name='document-lake')
job.commit()
```


#### **CDC load**


Because the Amazon Web Services DMS migration task we configured is for full load and ongoing replication, it can capture updates and deletions at the record level. Updates come with an ```OP``` column with value ```U```, as in the following example:

```
U,61f998d9b7599a1e843038cc,Confirmed,25,1,3/24/2020,Sudan,N/A,,Sudan,12.8628,30.2176,6/4/2020 11:15:39 PM,
```

Deletions have an ```OP``` column with value ```D```, the primary key to be deleted, and all other fields empty:

```
D,61f998d9b7599a1e904ae941,,,,,,,,,,,,
```

Enabling [job bookmarks](https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html) for our Amazon Web Services Glue job guarantees that subsequent runs of the job only process data that arrived after the checkpoint set by the previous run, without any changes to our code. After the CDC data is loaded into the staging table, our next step is to delete rows with ```OP``` column ```D```, perform a [merge operation to replace existing rows](https://docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html), and unload the result to our S3 bucket. However, because S3 objects are immutable, we need to delete the partitions affected by the latest ingested dataset and rewrite them from Amazon Redshift. Amazon Redshift doesn’t have an out-of-the-box way to delete S3 objects; however, we can use a [scalar Lambda UDF](https://docs.aws.amazon.com/redshift/latest/dg/udf-creating-a-lambda-sql-udf.html) that takes as an argument the partition to be deleted and removes all the objects under that partition. The Python code of our Lambda function uses Boto3 and looks like the following example:

```
import json
import boto3

datalakeBucket = '<DATA-LAKE-BUCKET>'
folder = 'prod/cases/'

def lambda_handler(event, context):
    ret = {}
    res = []
    s3Resource = boto3.resource('s3')
    bucket = s3Resource.Bucket(datalakeBucket)
    # Redshift batches the UDF calls; each argument is a list holding one partition path
    for argument in event['arguments']:
        partition = argument[0]
        bucket.objects.filter(Prefix=folder + partition).delete()
        res.append(True)
    ret['success'] = True
    ret['results'] = res
    return json.dumps(ret)
```

We then need to register the Lambda UDF in our Amazon Redshift cluster by running the following code (the IAM role we specify must allow Amazon Redshift to invoke the Lambda function):

```
create or replace external function deletePartition(path VARCHAR)
returns boolean stable
LAMBDA 'deletePartition'
IAM_ROLE 'arn:aws:iam::<Amazon Web Services account-id>:role/<role-name>';
```
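
Before wiring the UDF into the upsert logic, it can be useful to test the Lambda function on its own. The following is a small sketch that invokes it directly with an event containing the ```arguments``` field the handler reads (one entry per row, each entry a list of the UDF’s arguments); the function name and partition value are assumptions for illustration.

```
import json
import boto3

lambdaClient = boto3.client('lambda')

# Minimal test event in the shape the handler expects
test_event = {'arguments': [['year=2020/month=5/day=12']]}

response = lambdaClient.invoke(
    FunctionName='deletePartition',   # assumed Lambda function name
    Payload=json.dumps(test_event),
)

# Print the raw JSON response returned by the handler
print(response['Payload'].read().decode())
```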

The last edge case we might need to take care of is a CDC dataset that contains multiple records for the same primary key. The approach we take here is to use another date field that is available in the dataset, ```prep_flow_runtime```, to keep only the latest record. To implement this logic in SQL, we use a nested query with the ```row_number``` window function.

At a high level, the upsert stored procedure includes the following steps:

1. Delete the partitions that are being updated from the S3 data lake, using the scalar Lambda UDF.
2. Delete the rows that are being updated from Amazon Redshift.
3. Implement the transformations and compute the latest values of the updated rows.
4. Insert them into the target table in Amazon Redshift.
5. Unload the affected partitions to the S3 data lake.
6. Truncate the staging table.

See the following code:

```
CREATE OR REPLACE PROCEDURE upsert()
AS $$
DECLARE
 sql text;
 deletePartition text;
 s3folder varchar(65535);
 iamrole varchar(1000);
 unload varchar(65535);
begin

 drop table if exists affected_dates;
 create temp table affected_dates as select date from cases where oid__id in (select distinct oid__id from cases_stage cs);
 deletePartition:='select deletePartition(partitionPath) from 
 (select distinct ''year='' || year::VARCHAR || ''/month='' || month::VARCHAR || ''/day='' || day::VARCHAR partitionPath
 from cases
 where date in (select date from affected_dates));';
 execute deletePartition;

 delete from cases using cases_stage 
 where cases.oid__id = cases_stage.oid__id;
 insert into cases 
 select oid__id
 , case_type
 , cases
 , difference
 , date
 , country_region
 , province_state
 , admin2
 , combined_key
 , lat
 , long
 , prep_flow_runtime
 , fips
 , year
 , month
 , day
 , is_death
 , is_confirmed
from
(SELECT row_number() over (partition by oid__id order by TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') desc) seq
 , oid__id
 , case_type
 , cases
 , difference
 , TO_DATE(date, 'mm/dd/yyyy') date
 , country_region
 , province_state
 , admin2
 , combined_key
 , lat
 , long
 , TO_DATE(prep_flow_runtime, 'mm/dd/yyyy') prep_flow_runtime
 , fips
 , DATE_PART(year, TO_DATE(date, 'mm/dd/yyyy'))::smallint as year
 , DATE_PART(month, TO_DATE(date, 'mm/dd/yyyy'))::smallint as month
 , DATE_PART(day, TO_DATE(date, 'mm/dd/yyyy'))::smallint as day
 , CASE WHEN case_type = 'Deaths' then 1 else 0 end is_death
 , CASE WHEN case_type = 'Confirmed' then 1 else 0 end is_confirmed
 from cases_stage where op = 'U') where seq = 1;

 sql:='select *
 from cases
 where date in (select date from affected_dates);';
 s3folder:='s3://<DATA-LAKE-BUCKET>/prod/cases/';
 iamrole:='arn:aws:iam::<Amazon Web Services account-id>:role/<role-name>';
 unload := 'unload ('''||sql||''') to ''' ||s3folder||''' iam_role '''||iamrole||''' ALLOWOVERWRITE MAXFILESIZE 100 MB PARALLEL PARQUET PARTITION BY (year,month,day)'; 
 execute unload;
 truncate cases_stage;
END;
$$ LANGUAGE plpgsql;
```

Finally, we can modify the aforementioned Amazon Web Services Glue job to decide whether this is a historical load or a CDC load by using Boto3 to check whether the table already exists in the Amazon Web Services Glue Data Catalog. The final version of the Amazon Web Services Glue job is as follows:

```
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
glueClient = boto3.client('glue')
tableExists = True
post_query="call upsert();"
# If the data lake table isn't in the Data Catalog yet, this is the historical load
try:
    response = glueClient.get_table(DatabaseName='document-data-lake', Name='cases')
except glueClient.exceptions.EntityNotFoundException:
    tableExists = False
    post_query="call historicalload();"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "document-landing", table_name = "cases", transformation_ctx = "datasource0")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "redshift-serverless", connection_options = {"dbtable": "cases_stage", "database": "dev", "postactions":post_query}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
if not tableExists:
    response = glueClient.start_crawler(Name='document-lake')

job.commit()
```

Running this job for the first time with job bookmarks enabled loads the historical data to the Redshift Serverless staging table and triggers the historical load stored procedure, which in turn performs the transformations we implemented in SQL and unloads the result to the S3 data lake. Subsequent runs ingest newly arrived data from the landing zone into the staging table and perform the upsert process, deleting and repopulating the affected partitions in the data lake.
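
If you want to trigger the pipeline outside the Amazon Web Services Glue console, the job can also be started with Boto3 while keeping job bookmarks enabled. The following is a small sketch, assuming the job is named ```document-lake-load```.

```
import boto3

glueClient = boto3.client('glue')

# 'document-lake-load' is an assumed job name; the bookmark option ensures each
# run only processes data that arrived since the previous checkpoint.
run = glueClient.start_job_run(
    JobName='document-lake-load',
    Arguments={'--job-bookmark-option': 'job-bookmark-enable'},
)
print('Started run:', run['JobRunId'])
```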

#### **Conclusion**


Building a modern data lake that maintains the most recent view of your data involves some edge cases that aren’t intuitive if you come from an RDBMS background. In this post, we described an Amazon Web Services native approach that takes advantage of the rich features and SQL syntax of Amazon Redshift, paying only for the resources used thanks to Redshift Serverless and without using any external framework.

The introduction of Redshift Serverless unlocks the hundreds of Amazon Redshift features released every year to users who don’t require a cluster that’s always up and running. You can start experimenting with this approach to managing your data lake with Amazon Redshift, as well as addressing other use cases that are now easier to solve with Redshift Serverless.


#### **About the author**


![image.png](https://dev-media.amazoncloud.cn/71a71e6c92bd4849997cec6e57bf3b59_image.png)

**George Komninos** is a solutions architect for the Amazon Web Services Data Lab. He helps customers convert their ideas to production-ready data products. Before Amazon Web Services, he spent three years in the Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.