Build THIS Real Time CDC in AWS
Important Links
Introduction
In this video, I am going to show you how to build a scalable CDC solution using AWS S3, Glue, and Lambda.
Architecture Overview
RDS (Source Database): We start with an RDS MySQL database, which acts as our source for transactional data.
AWS DMS (Database Migration Service): DMS is used to capture changes in real-time and push them to an S3 bucket as CDC files. Amazon S3 (Landing and Results Buckets):
An S3 bucket is used to store the incoming CDC files from DMS. Another bucket stores the transformed results after Glue job processing.
AWS Lambda: A Lambda function is triggered when new files arrive in the landing bucket. It identifies the file and starts a Glue job for further processing.
AWS Glue Job: The Glue job performs transformations on the CDC data. Final results are written back to the results bucket in S3.
IAM Roles: IAM roles ensure secure access between services (e.g., RDS to DMS, Lambda, and Glue). Monitoring:
CloudWatch monitors logs and performance for Lambda, Glue, and DMS tasks.
Creating the RDS MySQL Instance
Before creating the database, we need to configure a custom parameter group. This allows us to enable binary logging, which is critical for the CDC process.
(Step-by-step visuals and narration):
- Click on Parameter Groups on the left menu and select Create Parameter Group.
- Choose
MySQL
as the parameter group family and give it a name likecdc-mysql-parameters
. - Once created, find your parameter group in the list and click Edit Parameters.
- Search for the parameter
binlog_format
and set it toROW
. This ensures that every row change is logged, which DMS requires for CDC. - Save your changes, and the parameter group is ready.
Now, let’s create the RDS instance and attach our parameter group to it.
- Go back to the RDS dashboard and click Create Database.
- For the engine, select MySQL, and choose the version compatible with DMS (e.g.,
9.x
). - Under the database creation method, select Standard Create.
- In the settings section, enter a unique DB instance identifier like
cdc-demo-instance
. - Specify your username and password for the admin account.
- For the instance size, choose
db.t3.micro
for this demo, as it’s cost-efficient and sufficient for small workloads. - Under Additional Configurations, select the parameter group you just created:
cdc-mysql-parameters
. - Enable backups and logging if needed, then click Create Database."
It will take a few minutes for the instance to be created. Once the status changes to Available
, your RDS MySQL database is ready for use.
Setting Up S3 Buckets
In this step, we’ll create two S3 buckets. The first will store the source data from our RDS MySQL database. The second bucket will hold the processed results from our Glue job.
Let’s start by creating the source bucket. Navigate to the S3 service from the AWS Management Console.
- Click Create Bucket.
- Provide a unique bucket name. For example,
cdc-s3-source-bucket
. Make sure to follow AWS bucket naming conventions. - Select the desired AWS region. For best performance, choose the same region as your RDS and Glue services.
- Leave other settings as default for now, and click Create Bucket.
Our source bucket is now ready. This is where AWS DMS will replicate the data changes.
Next, we’ll create a second bucket for storing the results processed by AWS Glue.
- Click Create Bucket again.
- Enter a name like
cdc-s3-result-bucket
. This bucket will hold the transformed data after Glue processing. - Ensure it’s in the same AWS region as the source bucket and other services.
- Optionally, enable versioning for added safety. This allows you to keep track of changes to your results over time.
- Click Create Bucket to finish.
In the next step, we’ll set up AWS DMS to start replicating changes from our RDS MySQL instance to the source bucket.
Creating DMS Endpoints and IAM Role
In this step, we’ll create the necessary endpoints for AWS Database Migration Service (DMS) to transfer data from our RDS MySQL instance to the S3 source bucket. We’ll also configure the IAM role for DMS to access both the source and target services securely.
Create the Source Endpoint (for RDS MySQL)
Let’s start by creating the source endpoint. In the AWS Management Console, navigate to the DMS service.
- Click on Endpoints under the Database Migration section on the left-hand menu.
- Click Create endpoint to begin setting up the source endpoint for our RDS MySQL instance.
- Select Source endpoint as the endpoint type.
- For Endpoint identifier, give it a name like
rds-source-endpoint
. - Select MySQL for the database engine since we’re using RDS MySQL as our source.
- Enter the Server name, Port (default 3306), and Database name from our RDS instance.
- For the Username and Password, use the credentials we created earlier in the RDS setup.
- Test the connection to ensure the configuration is correct.
- Click Create endpoint once the test is successful.
Create IAM Role for Target Endpoint
Next, we’ll create an IAM role that will allow DMS to access the target S3 bucket. This IAM role needs the appropriate permissions to read and write to our S3 buckets.
- In the AWS Console, navigate to the IAM service.
- Click Roles on the left menu and then click Create role.
- Choose AWS service as the trusted entity type and select DMS as the service that will use this role.
- Next, attach the necessary policies. We’ll attach the AmazonS3FullAccess policy for full access to our S3 buckets.
- Optionally, you can create a custom policy with more restrictive permissions to limit access to only the buckets we created earlier.
- Give the role a name, like
dms-s3-access-role
, and click Create role.
Create the Destination Endpoint (for S3)
[Scene 4: Creating the Destination Endpoint]
Now, let’s create the destination endpoint, which is the S3 bucket where DMS will place the replicated data.
- Go back to the DMS console and navigate to Endpoints.
- Click Create endpoint again to set up the target endpoint.
- This time, select Target endpoint as the endpoint type.
- For the Endpoint identifier, enter rds-target-endpoint`.
- Select S3 as the target database engine.
- Under Bucket name, select the S3 bucket where the data will be written. This should be the
cdc-s3-source-bucket
we created earlier. - For IAM role, choose the IAM role we created in the previous step,
dms-s3-access-role
. - Click Create endpoint to complete the process.
Now, we have successfully created both the source and destination endpoints for DMS, as well as the IAM role that provides DMS the necessary permissions to interact with our S3 buckets.
In the next step, we’ll set up the replication instance in DMS and start the data migration process.
Creating the DMS Replication Instance
Now that we have configured the endpoints, it’s time to create the DMS replication instance. The replication instance will handle the actual data migration between our source RDS MySQL database and the target S3 bucket.
Creating the DMS Replication Instance
Let’s start by creating a replication instance in the AWS DMS service. From the DMS console, click on Replication instances under the Database migration section.
- Click on the Create replication instance button.
- Give your replication instance a name, like
dms-replication-instance
. - Under Replication instance class, choose an instance size that fits your migration needs. For small to medium datasets, a dms.r5.large instance should be sufficient.
- For Allocated storage, select the amount of storage you need. The default of 100 GB is usually enough, but you can adjust based on your dataset.
- Choose the VPC where your source RDS instance resides. You’ll likely use the default VPC, but ensure it’s the same network as your source database.
- Leave Multi-AZ set to No for now, unless you need high availability for your replication instance.
- Select Publicly accessible to allow the replication instance to connect to resources outside your VPC, like S3.
- Under Replication instance settings, choose the IAM role that DMS will use to interact with AWS services, which should be the same role we configured in the previous step.
- Click Create replication instance to launch the instance.
Once you click Create, the replication instance will start being provisioned. This might take a few minutes, so be patient as AWS spins up the instance.
When the status of the replication instance changes to Available, it means that it is ready for use and we can start the migration task.
Now that the replication instance is ready, in the next step, we’ll configure and start the actual migration task to move data from the RDS MySQL instance to our S3 bucket.
Connecting and Loading Data to RDS MySQL Database Using MySQL Workbench
Now that we’ve set up the RDS instance and replication, it’s time to connect to the database from MySQL Workbench. In this step, we’ll walk through adding a security rule, connecting to the database, and loading some sample data.
Adding Inbound Rule to Default Security Group
First, we need to ensure that our RDS instance is accessible from our local machine. To do that, we’ll modify the inbound rules of the security group attached to the RDS instance.
Adding the Inbound Rule
- Go to the EC2 Dashboard, and click on Security Groups in the left-hand menu.
- Find the security group associated with your RDS instance, which you can find in the VPC section of the RDS dashboard.
- Click on the security group, then go to the Inbound rules tab.
- Click on Edit inbound rules, and add a new rule:
- Type: MySQL/Aurora
- Protocol: TCP
- Port range: 3306
- Source: Select My IP to allow connections from your local machine.
- Click Save rules to apply the changes.
Now that we’ve updated the security group, we can securely connect to the RDS instance from MySQL Workbench.
Connecting to RDS Instance via MySQL Workbench
Open MySQL Workbench on your local machine. We’ll create a new connection to the RDS MySQL instance.
- Click on the + sign next to MySQL Connections to create a new connection.
- In the Connection Name field, give your connection a name, such as
RDS-MySQL-Connection
. - In the Hostname field, enter the Endpoint of the RDS instance. You can find this in the Connectivity & security tab of your RDS instance.
- For Port, enter
3306
, which is the default MySQL port. - Enter the Username (for example,
admin
), and provide the password that you created during the RDS setup. - Click Test Connection to make sure everything is configured correctly.
- If the connection is successful, click OK to save it.
Now that we’re connected, we can start working with the database.
Creating a Schema, Table, and Loading Dummy Data
To get started, let’s create a new schema to store our data. In the Navigator panel, click on Schemas, then right-click and choose Create Schema.
- Name the schema
cdc_schema
(or something meaningful for your use case). Click Apply to create the schema. - Click Finish to confirm the creation of the schema.
[Scene 6: Creating a Table in the Schema]
(Screen recording showing SQL script for creating a table):
Voiceover:
Now, let’s create a table inside the cdc_schema
schema. Use the following SQL command to create a simple Information
table.
USE cdc_schema;
CREATE TABLE Information (
id INT AUTO_INCREMENT PRIMARY KEY,
FullName VARCHAR(255),
City VARCHAR(255)
);
Voiceover:
Run the SQL script to create the Information
table.
[Scene 7: Inserting Dummy Data]
(Screen recording of inserting data into the table):
Voiceover:
Next, we’ll insert some dummy data into the Information
table. Use the following SQL command to insert a few rows.
<!-- Create Schema -->
CREATE SCHEMA data_schema;
CREATE TABLE data_schema.Information (
UserID int,
Name varchar(255),
JobDesc varchar(255),
PRIMARY KEY (UserID)
);
<!-- Insert Data -->
INSERT INTO data_schema.Information VALUES (1,'John Doe','Software Engineer');
INSERT INTO data_schema.Information VALUES (2,'Jane Doe','DevOps Engineer');
<!-- Check it -->
Select count(*) from data_schema.Information;
Creating DMS Task
In this step, we’ll be creating the AWS DMS task that will allow us to migrate the data from our source database in RDS to our target destination, typically an S3 bucket or another RDS instance. Let’s go through the process of setting up this DMS task.
First, navigate to the AWS DMS (Database Migration Service) in the AWS Management Console. You can search for DMS in the search bar and select it.
Creating a New Migration Task
- Once you’re in the DMS Dashboard, click on Database migration tasks on the left sidebar.
- Click on the Create task button to start creating your new DMS task.
Configuring the DMS Task Settings
Enter a Task identifier for the migration task. This is a unique name that will help you identify the task later. For example, cdc-migration-task
.
Select the Replication instance that you created in Step 6. This instance will handle the actual migration process.
Next, we’ll select the Source and Target endpoints. For the Source endpoint, select the RDS endpoint that you created earlier in Step 5. And for the Target endpoint, select the S3 or any other destination you set up.
Now, select the Migration type. If you are doing a one-time migration, choose Full load. If you need continuous replication with change data capture, select Full load and ongoing replication.
Next, you can configure Table mappings if you need to migrate specific tables or schemas. This step is optional and can be skipped if you want to migrate everything.
- Click on Add new mapping rule.
- Select the Schema and Table you want to include in the migration. If you want to migrate all tables, leave it blank.
After reviewing all the settings, click on the Create task button at the bottom to create your migration task. The task will appear in your Tasks list, and it will start running automatically.
You can monitor the progress of your task from the Task dashboard. Here, you’ll see the status of the migration, whether it’s running or completed, and if any issues arise, you’ll get detailed logs.
That’s it for setting up the DMS task. The data migration process is now running, and you can check back to see the progress. In the next step, we’ll validate the migrated data and ensure everything is correctly transferred.
Create Lambda Function
In this step, we’ll be creating an AWS Lambda function that will automatically trigger when a file is uploaded to our S3 bucket. The Lambda function will use the file name and S3 bucket details to start the data migration process. Let’s walk through the process.
Creating the IAM Role for Lambda Before creating the Lambda function, we first need to create an IAM role that grants the Lambda function necessary permissions. Go to the IAM Management Console, and under Roles, click Create role.
Choose Lambda as the trusted entity since we’re creating a role for a Lambda function.
Next, assign the required permissions. To allow the Lambda function to interact with S3, we’ll attach the AmazonS3ReadOnlyAccess policy. Additionally, if your Lambda needs to interact with other services like Glue or DMS, make sure to assign those permissions as well.
Give the role a name, for example, lambda-s3-role
, and click Create role. This role will be associated with the Lambda function to enable it to perform its tasks.
Creating the Lambda Function
Now, let’s create the Lambda function. Go to the Lambda Console and click Create function.
Choose the Author from Scratch option. Give your function a name like cdc-file-trigger
. For the runtime, choose Python 3.9 or your preferred version.
Under Permissions, select the Choose an existing role option, and choose the IAM role we created earlier (lambda-s3-role
). This will give the Lambda function access to the S3 bucket and other services.
Adding a Trigger to Lambda
Next, let’s configure a trigger for our Lambda function. We’ll use S3 as the trigger, so whenever a file is uploaded to a specific S3 bucket, the Lambda function will be triggered.
Select S3 from the trigger options. Choose the S3 bucket you want to trigger the Lambda function on, for example, cdc-s3-source-bucket
. We’ll set it up to trigger on ObjectCreated (All) events.
If you only want to trigger the Lambda function for certain files, such as files with a .csv
extension, you can add an event filter like this: *.csv
.
Now, let’s write the Lambda function code. This code will capture the S3 event and extract the bucket name and file name, which will be passed as part of the event.
Writing Lambda Code
Here’s the basic Lambda code you’ll use to get the S3 bucket name and the file name from the event.
import json
import boto3
def lambda_handler(event, context):
# Extract bucket name and file name from the S3 event
bucketName = event["Records"][0]["s3"]["bucket"]["name"]
fileName = event["Records"][0]["s3"]["object"]["key"]
print(f"Bucket Name: {bucketName}, File Name: {fileName}")
# Initialize the Glue and S3 clients
glue = boto3.client('glue')
s3_client = boto3.client('s3')
# Get the latest folder in the result bucket
result_bucket = 'cdc-s3-result-bucket'
latest_folder = ''
try:
result = s3_client.list_objects_v2(Bucket=result_bucket, Prefix='', Delimiter='/')
if 'CommonPrefixes' in result:
folder_list = [x['Prefix'] for x in result['CommonPrefixes']]
if folder_list:
latest_folder = sorted(folder_list)[-1]
print(f"Latest Folder: {latest_folder}")
else:
print("No folders found in the result bucket.")
else:
print("No CommonPrefixes found. Result bucket might be empty or lacks folder structure.")
except Exception as e:
print(f"Error retrieving the latest folder: {e}")
# Start the Glue job if we have a valid latest_folder
if latest_folder:
try:
response = glue.start_job_run(
JobName='cdc-transformation-job',
Arguments={
'--s3_target_path_key': fileName,
'--s3_target_path_bucket': bucketName,
'--s3_latest_folder': latest_folder
}
)
print(f"Glue job started with response: {response}")
except Exception as e:
print(f"Error starting Glue job: {e}")
else:
print("Latest folder not found; skipping Glue job trigger.")
return {
'statusCode': 200,
'body': json.dumps('Lambda completed')
}
Voiceover:
This code grabs the bucket name and file name from the incoming event and prints them to CloudWatch Logs. You can later use these variables to trigger your data processing pipeline or pass them to other services like AWS Glue or DMS.
Click Deploy to save and deploy your Lambda function. To test it, upload a file to your S3 bucket. The Lambda function will be triggered automatically, and you’ll see the output in CloudWatch Logs.
Now your Lambda function is up and running. It will be triggered every time a file is uploaded to the specified S3 bucket. In the next step, we’ll see how this function can interact with AWS Glue to process the data.
Create Glue Job
Create IAM Role for Glue Job Before creating the Glue job, we need an IAM role that grants it the necessary permissions to read from S3, write to S3, and possibly interact with other services like AWS Redshift or RDS. Go to the IAM Management Console, and click Create role.
Select Glue as the trusted entity. This grants AWS Glue the required permissions.
Attach the following policies to the role:
- AWSGlueServiceRole: This gives Glue permissions to run jobs and interact with other AWS services.
- AmazonS3FullAccess: This allows Glue to read from and write to S3 buckets. If you want to limit access, you can specify the exact S3 buckets for better security.
Name your role, for example, glue-job-role
, and click Create role. This role will be associated with your Glue job to allow it to interact with your S3 buckets and other services.
Create the Glue Job
Next, let’s create the Glue job. In the AWS Glue Console, click Jobs under ETL. Then, click Add job.
Give your job a name, like cdc-transformation-job
. Select Spark as the Job type (since we’ll be using Apache Spark for transformation). Choose Python as the script language.
Under IAM Role, choose the IAM role we just created (glue-job-role
). This grants Glue the required permissions to interact with S3 and perform its tasks.
Next, under Script file name, choose where the script will be stored. You can either store it in an S3 bucket or write it inline. For simplicity, let’s store it in an S3 bucket.
Adding Code to Lambda Function to Trigger Glue Job
Now that the Glue job is created, we need to add code to our Lambda function to trigger the Glue job when a file is uploaded to S3. Let’s go back to our Lambda function.
In the Lambda function, we’ll use the boto3 AWS SDK to start the Glue job.
This code invokes the Glue job cdc-transformation-job
and passes the S3 bucket name and file name as job arguments. The Glue job will process the data based on the parameters provided. [Code is Below With Transformation…]
Adding Transformation Code to Glue Job
Next, let’s add the transformation code in our Glue job. Navigate to the Glue job script editor and paste the following transformation logic. This script will read data from the S3 bucket, transform it, and load it into a destination like Redshift, RDS, or another S3 location.
from awsglue.utils import getResolvedOptions
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from datetime import datetime
# Get arguments passed from Lambda
args = getResolvedOptions(sys.argv, ['s3_target_path_key', 's3_target_path_bucket', 's3_latest_folder'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']
latest_folder = args['s3_latest_folder']
# Create a Spark session
spark = SparkSession.builder.appName("CDC").getOrCreate()
# Define the file paths for input and output
inputFilePath = f"s3://{bucket}/{fileName}"
resultFilePath = f"s3://cdc-s3-result-bucket/{latest_folder}"
finalFilePath = f"s3://cdc-s3-result-bucket/{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"
# CDC processing logic
if "LOAD" in fileName:
# For the first load, process the data and write to the final file path
fldf = spark.read.csv(inputFilePath, header=False)
fldf = fldf.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "FullName").withColumnRenamed("_c2", "City")
fldf.write.mode("overwrite").csv(finalFilePath)
else:
# For updates, read the incoming file and existing data, then perform insert/update/delete
udf = spark.read.csv(inputFilePath, header=False)
udf = udf.withColumnRenamed("_c0", "action").withColumnRenamed("_c1", "id").withColumnRenamed("_c2", "FullName").withColumnRenamed("_c3", "City")
ffdf = spark.read.csv(resultFilePath, header=False)
ffdf = ffdf.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "FullName").withColumnRenamed("_c2", "City")
for row in udf.collect():
if row["action"] == 'U': # Update
ffdf = ffdf.withColumn("FullName", when(ffdf["id"] == row["id"], row["FullName"]).otherwise(ffdf["FullName"]))
ffdf = ffdf.withColumn("City", when(ffdf["id"] == row["id"], row["City"]).otherwise(ffdf["City"]))
if row["action"] == 'I': # Insert
insertedRow = [list(row)[1:]]
columns = ['id', 'FullName', 'City']
newdf = spark.createDataFrame(insertedRow, columns)
ffdf = ffdf.union(newdf)
if row["action"] == 'D': # Delete
ffdf = ffdf.filter(ffdf.id != row["id"])
# Write the final result back to S3
ffdf.coalesce(1).write.mode("overwrite").csv(finalFilePath)
This script reads data from the source S3 location, filters it to only include rows where the age
is greater than 30, and then writes the result to a new S3 location (transformed/
). You can customize the transformation logic as needed.
Now that we’ve added the transformation code to the Glue job and integrated it with Lambda, the next step is to test the entire process by uploading a file to S3 and observing the data flow.
First, let’s insert a file into our S3 bucket. Go to the S3 dashboard, open your bucket cdc-s3-source-bucket
, and navigate to the folder where you want to upload the file. In this case, we are working with the cdc_schema/Information
folder.
Click Upload, select the file you want to insert, and then click Upload. This will trigger the S3 event that Lambda listens for, sending the file details to the Lambda function.
Once the file is uploaded, the Lambda function is triggered by the S3 event. The Lambda function then reads the bucket name and file name, then triggers the Glue job. Let’s head over to CloudWatch Logs to see this in action.
Here in the CloudWatch logs, you can see the Lambda function logging the bucket name and file name it received from the S3 event. The Lambda function will then use this data to trigger the Glue job with the relevant parameters.
Once the Glue job is triggered, let’s check the Glue dashboard to ensure that the job is running. Go to the Jobs section in the Glue console, and you should see your cdc-transformation-job
running. You can monitor the progress and check for any errors.
Here, we can see that the job has started successfully, and we can view the logs to track the data transformation process. If there are any issues, the logs will provide helpful error messages.
Once the job completes successfully, the transformed data is written back to S3 in the designated Result folder. Let’s go to the cdc-s3-result-bucket
and check the <latest-folder-name>/
folder to verify that the transformed data is present.
Congratulations! You’ve successfully run the CDC process, from uploading a file to S3, triggering the Lambda function, transforming the data with Glue, and loading it into the target database.
That’s it for today’s article! We’ve covered the entire process of setting up a Continuous Data Capture (CDC) pipeline in AWS, from uploading data to S3 and triggering Lambda, to transforming and loading data using Glue. You’ve also seen how to automate this with DMS for real-time replication and how to integrate it with an RDS instance.
Bye! Until Next Article! Take Care.