Say Goodbye to ETL Headaches: Build Your Cloud-Native Streaming Pipeline with AWS Managed Apache Flink!

Augusto de Nevrezé
16 min read · Aug 30, 2024


Introduction: Why Read This?

Streaming ETL is the backbone of modern data engineering, a topic we’ve explored in depth in our previous post. It empowers organizations to extract value from data in real-time, eliminating the lag associated with traditional batch processing. However, let’s face it: building and maintaining a robust streaming ETL pipeline can be a Herculean task, especially when you’re wrestling with the scale and complexity of cloud environments.

Before you dive headfirst into a DIY solution where you’re juggling scalability issues and drowning in monitoring alerts, take a breath. There’s a smarter way. Enter managed solutions — your ticket to focusing on what really matters: the streaming process itself. Why spend countless hours babysitting infrastructure when you could be crafting elegant data transformations and deriving actionable insights?

In this post, we’re cutting through the complexity and exploring Amazon Managed Service for Apache Flink, which I’ll call AWS Flink Managed Service throughout. It’s not just another tool; it’s your secret weapon for building cloud-native streaming ETL pipelines without the infrastructure headaches.

You’ll learn how to:

  1. Set up a complete streaming pipeline, from data generation to processing and storage.
  2. Leverage PyFlink to write powerful, yet easy-to-understand stream processing jobs.
  3. Use Maven packaging to simplify PyFlink dependency management.
  4. Integrate with AWS services like Kinesis and S3 for seamless data flow.
  5. Navigate common challenges and apply best practices for cloud-based stream processing.
  6. Monitor and optimize your streaming ETL pipeline for performance and cost-efficiency.

So, if you’re ready to elevate your data engineering skills and harness the power of real-time data processing in the cloud, let’s dive in!

Common Issues and Their Solutions

Let’s get real for a moment: setting up a streaming ETL pipeline isn’t always a smooth ride. I’ve hit a few bumps along the way, and I want to share those with you. My hope is that by talking about these hiccups, I can help you avoid the same headaches. Before jumping to a detailed step-by-step guide, I think it’s useful to showcase the main problems found. So, let’s dive into some of the issues I encountered and how I managed to overcome them.

Handling expired security tokens

I hit a snag right from the start when I realized that my AWS credentials weren’t properly set up in the IDE environment. If you’ve encountered an error log that looks something like this:

Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: The security token included in the request is expired (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredTokenException; Request ID:<quest-id>; Proxy: null) at … 

You’re definitely not alone. These error logs can be pretty overwhelming, often filled with a ton of details from the Java Virtual Machine and various library adapters. The crucial part is identifying the key issue — in this case, the ExpiredTokenException.

To resolve this, you’ll need to refresh your AWS credentials. If you’re using Visual Studio Code (VS Code) and prefer not to install the AWS Toolkit extension, you can manually refresh your credentials in the .vscode/launch.json file. Here’s how to do it:

  1. Open the .vscode/launch.json file.
  2. Add or update the “env” configuration block with your current AWS credentials:

"env": {
    "IS_LOCAL": "true",
    "AWS_ACCESS_KEY_ID": "<your-access-key-id>",
    "AWS_SECRET_ACCESS_KEY": "<your-secret-access-key>",
    "AWS_SESSION_TOKEN": "<your-session-token>"
}

  3. Make sure to include "IS_LOCAL": "true" as an environment variable. This ensures the code runs correctly on your local machine.

By keeping these credentials up-to-date, you can avoid the frustration of interrupted workflows due to expired tokens. This step might seem minor, but it’s crucial for maintaining a smooth development experience when working with AWS services.
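To catch a missing variable before Flink produces a long Java stack trace, a small pre-flight check can help. The sketch below is a hypothetical helper, not part of the repository. It only detects unset variables and cannot tell whether a token has already expired. How the job interprets IS_LOCAL is also an assumption here.

```python
import os

# Variables the launch.json "env" block above is expected to provide.
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN")


def missing_aws_vars(env=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


def is_local(env=os.environ):
    """Assumption: the job treats IS_LOCAL=true as 'running on my machine'."""
    return env.get("IS_LOCAL", "").lower() == "true"
```

Calling missing_aws_vars() at the top of the job and raising an error when the list is non-empty gives a much shorter message than the Kinesis exception above.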

Dealing with timestamp parsing in Flink SQL

Working with timestamps in Flink SQL can be a bit tricky, especially when your data sources generate timestamps in formats that Flink’s parser doesn’t recognize out of the box. I ran into this issue when generating timestamp values in Python like this:

datetime.now(timezone.utc).isoformat()

For a timezone-aware UTC datetime, this call produces a string that ends in a +00:00 offset, which Flink’s JSON parser did not accept in my pipeline. In contrast, if you format the datetime with the pattern %Y-%m-%dT%H:%M:%S.%fZ, the string ends in a literal Z, the ISO-8601 form that Flink can properly interpret as a UTC time.

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/
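To make the difference concrete, here is a minimal comparison of the two formats using only the standard library. The fixed datetime value is an arbitrary example chosen so the output is reproducible.

```python
from datetime import datetime, timezone

# Pattern that ends in a literal "Z" instead of a numeric UTC offset.
FLINK_TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# A fixed, timezone-aware instant (arbitrary example value).
moment = datetime(2024, 8, 30, 12, 0, 0, 123456, tzinfo=timezone.utc)

offset_style = moment.isoformat()            # ends in "+00:00"
z_style = moment.strftime(FLINK_TS_FORMAT)   # ends in "Z"

print(offset_style)  # 2024-08-30T12:00:00.123456+00:00
print(z_style)       # 2024-08-30T12:00:00.123456Z
```

Both strings describe the same instant. Only the suffix changes, and that suffix is what the parser trips over.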

By making this adjustment in the iot_producer.py, I ensured that when creating the DDL table statement to consume data from the sensor’s topic, the timestamp values would be correctly decoded.

For example, in your input_table creation statement, you should configure the timestamp field accordingly:
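The original statement lives in the repository. What follows is only a sketch of what such a DDL builder could look like. The column names device_id and event_time, the TIMESTAMP_LTZ(3) type, and the shape of the message row are assumptions for illustration. The connector options are the ones documented for the Flink Kinesis SQL connector and the JSON format.

```python
def create_input_table(table_name, stream_name, region, stream_initpos):
    """Build a CREATE TABLE statement for a Kinesis source (illustrative schema)."""
    return f"""
        CREATE TABLE {table_name} (
            device_id STRING,                  -- assumed column name
            message ROW<temperature DOUBLE>,   -- nested payload, assumed shape
            event_time TIMESTAMP_LTZ(3)        -- assumed column name and type
        )
        WITH (
            'connector' = 'kinesis',
            'stream' = '{stream_name}',
            'aws.region' = '{region}',
            'scan.stream.initpos' = '{stream_initpos}',
            'format' = 'json',
            'json.timestamp-format.standard' = 'ISO-8601'
        )
    """


ddl = create_input_table("input_table", "sensors", "us-east-1", "LATEST")
```

The json.timestamp-format.standard option is the line that tells the JSON format to expect ISO-8601 timestamps rather than the SQL-style default.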

S3 sink configuration challenges

Running PyFlink locally comes with its own set of requirements, particularly when you need to interact with AWS services like S3. To enable S3 access in your local PyFlink setup, you’ll need to download and configure the appropriate S3 File System Hadoop plugin.

First, you need to determine the home directory of PyFlink. This is essential because you’ll be copying the plugin file into this directory. To find the PyFlink home directory, run the following command in your terminal. Make sure your virtual environment is activated if you’re using one:

poetry run python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__)))"

Once you have the PyFlink home directory, the next step is to download the S3 File System Hadoop plugin. For Flink 1.19, download flink-s3-fs-hadoop-1.19.1.jar from the Maven repository website (check for a newer release if there have been updates since August 2024). If you’re using a different Flink version, make sure to download the plugin that matches your specific version (see the list of available plugin versions on the Flink downloads page).

After downloading the plugin, copy it into the [flink_home]/lib/ directory, where [flink_home] is the PyFlink home directory you found above. This step is crucial, as Flink needs to recognize the S3 file system to interact with S3 buckets.

By setting up the S3 File System Hadoop plugin, you ensure that PyFlink can read from and write to S3 buckets, allowing you to simulate a production environment locally. This setup is ideal for testing and debugging your streaming ETL pipelines without having to deploy them to AWS immediately.
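If you prefer to script the copy step, a minimal sketch could look like this. The function name install_jar is hypothetical, and the download itself is left out. The function only copies a jar you already have into the lib folder under the PyFlink home directory.

```python
import shutil
from pathlib import Path


def install_jar(jar_path, flink_home):
    """Copy a downloaded jar into <flink_home>/lib and return its new path."""
    lib_dir = Path(flink_home) / "lib"
    if not lib_dir.is_dir():
        raise FileNotFoundError(f"{lib_dir} not found; check the PyFlink home directory")
    return Path(shutil.copy2(jar_path, lib_dir))


# Example usage (requires PyFlink to be installed in the active environment):
#   import os, pyflink
#   flink_home = os.path.dirname(os.path.abspath(pyflink.__file__))
#   install_jar("flink-s3-fs-hadoop-1.19.1.jar", flink_home)
```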

Storing data to multiple sinks

Creating multiple sinks to write data using the Table API can be more challenging than it seems. While it’s relatively straightforward to set up locally, I encountered issues when trying to run the same setup in a managed session. Specifically, I noticed that data wasn’t being written to the S3 bucket as expected.

I found the answer in this StackOverflow question, which led me to the appropriate documentation. The technique involves creating a statement set, as the documentation describes:

Multiple INSERT statements can be executed through the add_insert_sql() method of the StatementSet which can be created by the TableEnvironment.create_statement_set() method. The add_insert_sql() method is a lazy execution, they will be executed only when StatementSet.execute() is invoked.

https://gist.github.com/augustodn/452142f90ba4efa819f9384f2d005f41
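In condensed form, the pattern looks roughly like the sketch below. The helper name run_inserts is hypothetical. The three calls it makes on the table environment and the statement set (create_statement_set, add_insert_sql, execute) are the ones named in the documentation quote above.

```python
def run_inserts(table_env, insert_statements, wait=False):
    """Submit several INSERT statements as a single Flink job.

    table_env is expected to be a PyFlink TableEnvironment.
    """
    statement_set = table_env.create_statement_set()
    for sql in insert_statements:
        # Lazy: nothing runs until execute() is called.
        statement_set.add_insert_sql(sql)
    table_result = statement_set.execute()
    if wait:
        # Block until the job finishes (useful for local runs).
        table_result.wait()
    return table_result
```

Because both inserts go through one statement set, they are submitted together instead of as separate execute_sql calls.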

Setting Up the Environment

Before we upload the code to the AWS cloud, we’ll start by working locally. Once everything works smoothly we’ll upload the code to AWS. As of the time of writing, the latest Flink version available in the Managed Service is 1.19, combined with Python 3.11. Ensure you meet the following prerequisites:

  • Python 3.11
  • Git client
  • Java Development Kit version 11 (recommended)
  • Apache Maven

If you need to install any of these, refer to the official AWS documentation.

A Managed Service for Apache Flink application typically includes the following components:

  • Runtime properties: Predefined values that influence application behavior without code changes.
  • Sources: Data fed into the application for processing.
  • Transformations: Data processing to filter, enrich, or aggregate the incoming stream.
  • Sinks: One or more destinations for the processed information.

In this case, we’ll consume data from a Kinesis Data Stream topic, filter it to detect IoT devices with temperatures above a certain threshold, and store the messages in two different sinks:

  • Alerts will be stored in a Kinesis Data Stream.
  • Raw messages will be sent to a data lake, specifically to a designated key in an S3 bucket.

This approach is similar to what we did in the previous post, where we stored raw data in a PostgreSQL table and sent alerts to a Kafka topic. One key advantage of storing raw information in the data lake is that it can be processed later. By running a crawler, the table resulting from crawling the bucket can be cataloged in a Glue catalog, making it available for querying using the Athena service.

Creating the Kinesis Data Stream topics

We’re going to create two Kinesis data streams, which I’ll also refer to as topics: sensors, which receives the incoming data from the IoT sensors, and alerts, which receives the filtered events produced by the Flink stream processor.

Step by Step Guide

Alongside these notes you can find detailed information in the README from the repository.

Creating the Kinesis Data Streams

As already mentioned, we’ll create 2 topics, one to receive the data from the IoT sensors and the other to send the alert messages when the temperature is above a certain threshold.

The easiest way to create them is with the AWS CLI. Alternatively, you can use the AWS console; if you prefer that route, please refer to this documentation.

aws kinesis create-stream \
--stream-name sensors \
--shard-count 1 \
--region us-east-1

Replace sensors with alerts to create the second Kinesis Data Stream.
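If you would rather create both streams from Python, the same operation is available in boto3 as create_stream. The sketch below is an assumption on my side rather than something the repository ships. It takes the client as a parameter so that the function itself has no hard dependency on boto3.

```python
def create_streams(kinesis_client, names=("sensors", "alerts"), shard_count=1):
    """Create one Kinesis data stream per name and return the names requested."""
    for name in names:
        kinesis_client.create_stream(StreamName=name, ShardCount=shard_count)
    return list(names)


# Example usage (requires boto3 and valid AWS credentials):
#   import boto3
#   create_streams(boto3.client("kinesis", region_name="us-east-1"))
```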

Creating a bucket to store the source files

We’re going to create a bucket to hold the code for the managed Flink application. As before, we’ll do it through the CLI, but feel free to use the console as well.

aws s3api create-bucket \
--bucket ka-app-code-[username] \
--region us-east-1

Replace [username] with your username in the bucket argument. This bucket name is only a suggestion; you can choose whatever name you consider most relevant for this lab.

Clone the repository

Make sure to clone the repository to your local machine using the following command:

git clone https://github.com/augustodn/pyflink-managed-aws.git

Installing dependencies with poetry

For managing dependencies, I highly recommend using Poetry. It’s an excellent tool for handling Python dependencies, and you can refer to the official documentation if you have any questions or need further guidance. Additionally, using Poetry in conjunction with pyenv is strongly advised. If you haven’t installed pyenv yet, you can follow this guide or refer to the official documentation for setup instructions.

Once you have both tools in place, navigate to your cloned repository and run the following command to install the necessary dependencies. This setup will ensure compatibility with the AWS Flink Managed Service:

poetry install

Generating simulated IoT sensor data

Now that you have cloned the repo, it’s time to start doing real work with it. The first thing we’re going to do is generate simulated IoT messages and put them into the sensors Kinesis Data Stream topic by running:

poetry run python iot_producer.py

You can control the rate of message sending by adjusting the SLEEP_TIME and MAX_WORKERS constants. By default, the system uses 10 threads (workers) and a sleep time of 2 seconds, which results in a total of 300 messages per minute. Reducing the SLEEP_TIME (which can be set to a float) or increasing the number of workers (though I don’t recommend increasing workers) will increase the message production rate and the load on the streaming pipeline components. The default settings are generally sufficient to capture a substantial number of alert events.
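The 300 messages per minute figure follows from simple arithmetic. The sketch below assumes that each worker sends one message and then sleeps for SLEEP_TIME seconds, and that the time spent sending is negligible.

```python
def messages_per_minute(max_workers, sleep_time):
    """Approximate throughput: each worker sends one message per sleep_time seconds."""
    return max_workers * 60 / sleep_time


print(messages_per_minute(10, 2))    # 300.0 (the defaults)
print(messages_per_minute(10, 0.5))  # 1200.0
```

Halving SLEEP_TIME doubles the rate, which is why adjusting the sleep time is usually enough and adding workers is rarely needed.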

Developing the PyFlink job

For this job, we’ll do something similar to what we did before: read data from one topic, filter messages, and publish them to another one. At the same time, we’ll store raw messages in a data lake so we can use them for later analysis or dashboarding. For simplicity’s sake, the initial architecture diagram is shown below.

Storing data permanently in an S3 bucket could be useful for building dashboards or even training a model to predict when IoT devices might fail. This bucket is part of a larger data lake where we keep data from various sources. It can also be the landing zone of a lakehouse using a medallion architecture.

We’re going to break down the code from the multi_sink.py Flink job, where all the stream processing logic is handled in one single file. For this particular lab, we’ll use the SQL API for the transformations, and I’ll walk you through the details of how it all works.

Reading from Kinesis input stream

After the initialization is done and the parameters are read, the following statement creates a source table connected to the proper Kinesis data stream.

table_env.execute_sql(
    create_input_table(input_table_name, input_stream, input_region, stream_initpos)
)

The create_input_table function returns a string that contains a DDL SQL statement defining how we’ll consume the data. It specifies the message structure. If you expect the message structure to change, it’s important to plan for this in advance; otherwise, the processor might fail to consume the messages properly from the Kinesis topic.

We use a similar approach to define the sink tables, setting them up with their own declarations.

Implementing filtering logic

In this case, there’s no need to implement a function that filters the messages traversing the pipeline. Instead, we’re using a simple SQL statement to pick the messages we want. Isn’t that cool? 😎 Notice the usage of message.temperature to select a nested element from the raw message coming from the IoT sensors.

In this statement, we also indicate the destination of the messages, which is parameterized in the output_table_name variable.
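As a rough sketch of such a statement, the helper below builds the filtering INSERT as a string. The function name, the threshold parameter, and the use of SELECT * are assumptions. The real query in multi_sink.py may select specific columns to match the alerts table schema.

```python
def alert_insert_sql(output_table_name, input_table_name, threshold):
    """Build an INSERT that forwards only readings above the temperature threshold."""
    return (
        f"INSERT INTO {output_table_name} "
        f"SELECT * FROM {input_table_name} "
        f"WHERE message.temperature > {float(threshold)}"  # nested field access
    )


print(alert_insert_sql("output_table", "input_table", 30))
# INSERT INTO output_table SELECT * FROM input_table WHERE message.temperature > 30.0
```

The resulting string can be passed to add_insert_sql on the statement set, next to the insert for the S3 sink.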

Storing raw data in S3 data lake

Raw messages are also stored in an S3 bucket. We have to define the bucket in application_properties.json, under the output.bucket key:

{
    "PropertyGroupId": "producer.config.1",
    "PropertyMap": {
        "output.bucket": "<datalake-bucket>"
    }
}

Note: Replace the output.bucket value with the name of your data lake bucket.
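Reading that value back in the job can be sketched as follows. The helper name load_property_map is hypothetical, and the sketch assumes the file holds a JSON list of property groups shaped like the one above.

```python
import json


def load_property_map(path, group_id):
    """Return the PropertyMap of the group whose PropertyGroupId equals group_id."""
    with open(path, encoding="utf-8") as handle:
        groups = json.load(handle)  # assumed: a list of property-group objects
    for group in groups:
        if group.get("PropertyGroupId") == group_id:
            return group.get("PropertyMap", {})
    raise KeyError(f"property group {group_id!r} not found in {path}")


# Example usage:
#   props = load_property_map("application_properties.json", "producer.config.1")
#   bucket = props["output.bucket"]
```

Raising an error for a missing group makes a misnamed PropertyGroupId visible at startup instead of surfacing later as an empty bucket name.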

In this particular case, since we’re not manipulating the incoming messages, a simple SELECT * is enough.

statement_set.add_insert_sql(
    f"INSERT INTO {s3_table_name} SELECT * FROM {input_table_name}"  # nosec
)

Finally, none of the accumulated statements run until we execute the statement set:

table_result = statement_set.execute()

Notice that if you’re running this job locally, it’s important to call the wait method on the table_result object returned by execute(); otherwise, the local process can exit while the job is still running.

if is_local:
    table_result.wait()

Running the job locally

You can run the application from the command line as a regular Python file through Poetry or by using your IDE. Typically the IDE recognizes the Poetry environment and takes it into account when running your files. As mentioned in the section on handling expired security tokens, I’ve been using VS Code as my IDE, and I showed how to set the IS_LOCAL environment variable there. If you use another IDE, please check how it manages environment variables. Note that you have to declare IS_LOCAL even if you’re running the application from the console. Then you can run the job with the following commands:

mvn package
poetry run python multi_sink.py

Notice that we’re using Maven here: mvn package generates a “big fat jar” file with all the dependencies the project needs. You can find similar information in the following AWS guide, under “Run your Python application locally”.

You can find the directory that holds the application logs by running the following command.

poetry run python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"

However, these logs are quite basic. For more advanced logging and monitoring, you might find tools like the one provided by Datorios more helpful.

Setting up AWS Managed Apache Flink application

After the previous walkthrough of the Flink job, it’s time to set up the managed application in AWS. You can find more information in the previously referenced guide, but for simplicity the steps are shown here.

Upload the application package to the S3 bucket

  1. Open the Amazon S3 console at https://console.aws.amazon.com/s3/.
  2. Choose the bucket you previously created for the application code.
  3. Choose Upload.
  4. Choose Add files.
  5. Navigate to the .zip file generated in the previous step: target/multi-sink-0.0.1.zip.
  6. Choose Upload without changing any other settings.

Creating the application in AWS console

  1. Open the Managed Service for Apache Flink console at https://console.aws.amazon.com/flink
  2. Verify that the correct Region is selected: US East (N. Virginia) us-east-1.
  3. Open the right-side menu and choose Apache Flink applications and then Create streaming application. Alternatively, choose Create streaming application from the Get started section of the initial page.
  4. On the Create streaming applications page:
    a. For “Choose a method to set up the stream processing application”, choose Create from scratch.
    b. For Apache Flink configuration, Application Flink version, choose Apache Flink 1.19.
  5. For Application configuration:
    a. For Application name, enter multi-sink-app.
    b. For Description, enter an appropriate description for the Flink job.
    c. In Access to application resources, choose Create / update IAM role kinesis-analytics-multi-sink-app-us-east-1 with required policies.
  6. For Template for applications settings:
    a. For Templates, choose Development.
  7. Choose Create streaming application.

Let the console setup create the IAM role and policy for you. The policy and role name should be similar to these ones:

  • Policy: kinesis-analytics-service-multi-sink-app-us-east-1
  • Role: kinesis-analytics-multi-sink-app-us-east-1

Configuring permissions and roles

Edit the IAM policy to add permissions to access the Amazon S3 bucket. The easiest way to access it is from the recently created Flink application.

  1. Head to Application Details.
  2. Click on the IAM role link to edit the permissions.

In the IAM role details page, you’ll see the recently created custom policy. Since this is a demo, we’re going to grant broad permissions on the services that we’ll be using. To do so, attach the following managed policies:

  • AmazonKinesisAnalyticsFullAccess
  • AmazonKinesisFullAccess
  • AmazonS3FullAccess
  • CloudWatchFullAccess
  • CloudWatchLogsFullAccess

You can review the official documentation to add more restrictive policies for a production environment. Check the Edit the IAM policy section.

Configuring the Application

We’re going to set up the same parameters as the ones in the local application_properties.json file for the current application in the AWS console. The managed service stores these parameters in the `/etc/flink/` folder, under the same filename. To do so:

  1. On the multi-sink-app page, choose Configure.
  2. In the Application code location section:
    a. For the Amazon S3 bucket, select the bucket you previously created for the application code. Choose Browse and select the correct bucket, then choose Choose. Don’t select the bucket name.
    b. For Path to Amazon S3 object, enter multi-sink-0.0.1.zip

  3. For Access permissions, choose Create / update IAM role kinesis-analytics-multi-sink-app-us-east-1 with required policies.
  4. Move to Runtime properties and keep the default values for all other settings.
  5. Choose Add new item and add each of the following parameters:

Starting the Flink application

  1. On the console for Amazon Managed Service for Apache Flink, choose multi-sink-app and choose Run.
  2. On the next page, the Application restore configuration page, choose Run with the latest snapshot and then choose Run.

The Status in Application details transitions from Ready to Starting and then to Running when the application has started.

Monitoring and verifying the results

You can monitor the configuration and check the Flink dashboard once the application is in running mode.

  1. Check the status value inside the application page to validate that the application is running.
  2. Under the Monitoring tab you can check the Metrics and Logs of the application.

Using the Flink Dashboard

You can monitor the Flink dashboard by clicking on the Open Apache Flink dashboard button inside the app details page. This will open a standard Apache Flink Dashboard.

  1. Check if the service is running OK.
  2. If you haven’t done so already, start iot_producer.py from your local machine to start sending messages.
  3. Check that the messages are being processed in Flink by opening the Jobs/Running Jobs page.

It should look like this

Checking the Kinesis output stream and S3 bucket

You can also check the data in the alerts topic. To do so:

  1. Open the Kinesis console at https://console.aws.amazon.com/kinesis.
  2. Verify that the Region is the same as the one you are using to run this tutorial. By default, it is us-east-1 US East (N. Virginia). Change the Region if necessary.
  3. Choose Data Streams.
  4. Select the alerts stream.
  5. Choose the Data viewer tab.
  6. Select any Shard (it should be only one). Keep Latest as Starting position, and then choose Get records. You might see a “no record found for this request” error. If so, choose Retry getting records. The newest records published to the stream display.
  7. Select the value in the Data column to inspect the content of the record in JSON format.

You should see something similar to this

Stopping the Application

  1. From the Action dropdown list, choose Stop.
  2. The Status in Application details transitions from Running to Stopping, and then to Ready when the application is completely stopped.

Conclusion

Recap of Accomplishments

In this post, we successfully built and deployed a cloud-native ETL pipeline using AWS Flink Managed Service. We started by setting up the necessary infrastructure, including Kinesis Data Streams and S3 buckets, followed by writing and testing a PyFlink job locally. We then deployed this job in a managed environment, allowing it to process real-time data from IoT sensors, filter critical events, and store the results in multiple destinations. Along the way, we tackled common challenges such as handling expired security tokens, parsing timestamps correctly, and managing S3 sink configurations. On this occasion, we also leveraged Maven’s packaging capabilities to zip the project and bundle all the necessary dependencies into a single jar file, which helps automate dependency management.

Benefits of Using AWS Flink Managed Service

AWS Flink Managed Service streamlines the complexities associated with managing and scaling real-time data processing pipelines. By offloading the infrastructure management to AWS, you can focus on developing your stream processing logic without worrying about the underlying architecture. The service seamlessly integrates with other AWS offerings like Kinesis and S3, enabling efficient data flow and storage. Moreover, it offers built-in monitoring and optimization tools that help you maintain cost-efficiency and high performance, making it a robust solution for modern data engineering needs. However, this is not a complete observability tool set.

Next Steps

Now that we’ve successfully built and deployed our streaming ETL pipeline using AWS Flink Managed Service, it’s time to extend our capabilities by testing window events. Windowing is a powerful concept in stream processing that allows us to aggregate or analyze data over specific time intervals, which is crucial for detecting trends, spikes, and patterns in real-time data streams.

In the upcoming steps, we will:

  1. Implement Tumbling Windows: We’ll configure tumbling windows to capture events within fixed time intervals, enabling us to perform operations like counting occurrences, calculating averages, or detecting anomalies within each window.
  2. Simulate and Monitor Events: To ensure our windowing strategy is effective, we’ll generate simulated IoT sensor data and monitor how the pipeline processes these events. We’ll focus on the accuracy and efficiency of event aggregation and alert generation. To help with this task we’re going to use Datorios observability tools.
  3. Optimize Window Performance: Finally, we’ll review and optimize the performance of our windowed operations, tweaking parameters to balance throughput and latency for our specific use case.

By testing these window events, we’ll unlock new insights from our data stream, making our pipeline even more robust and responsive to real-time conditions.

Building streaming pipelines can be tricky, but you don’t have to do it alone. At SquadraLabs, we’ve got a team of specialists ready to help you tackle your project and make the most of your data. If you’re looking for expert guidance or just want to chat about your options, feel free to reach out to us!
