Create a real-time alerting solution - Aiven console#
Warning
As with many beta products, we are currently redesigning the Aiven for Apache Flink® experience, APIs, and CLI calls.
We are also updating all examples in the documentation to reflect these changes. During this process, you may encounter error messages as we introduce new improvements.
This article demonstrates how to integrate Aiven for Apache Flink® with Aiven for Apache Kafka®, Aiven for PostgreSQL®, and Aiven for OpenSearch® to create real-time alerting data for CPU loads.
Architecture overview#
This example involves creating an Apache Kafka® source topic that provides a stream of metrics data, a PostgreSQL® database that contains data on the alerting thresholds, and an Apache Flink® service that combines these two services and pushes the filtered data to a separate Apache Kafka® topic, PostgreSQL® table or OpenSearch® index.
This article includes the steps you need when using the Aiven web console, along with a few different examples of how you can set thresholds for alerts. For connecting to your PostgreSQL® service, this example uses the Aiven CLI calling psql, but you can also use other tools if you prefer.
In addition, the instructions show you how to use a separate Python-based tool, Dockerized fake data producer for Aiven for Apache Kafka®, to create sample records for your Apache Kafka® topic that provides the streamed data.
Requirements#
An Aiven account
Dockerized fake data producer for Aiven for Apache Kafka® and Docker to generate sample data (optional)
Set up Aiven services#
Follow these instructions to create these services:
An Aiven for Apache Kafka® service with the Business-4 service plan, named demo-kafka. This service provides the CPU load input streams.
An Aiven for PostgreSQL® service with the Business-4 service plan, named demo-postgresql. This service contains the CPU alerting threshold values, and is also used as a data sink.
An Aiven for OpenSearch® service with the Business-4 service plan, named demo-opensearch. This service is going to contain filtered CPU data streams for further analysis and visualization.
An Aiven for Apache Flink® service with the Business-4 service plan, named demo-flink. This service defines the data transformation and aggregation pipelines.
Select the demo-kafka service and change the following settings on the Overview page:
Kafka REST API (Karapace) > Enable
Note
The Kafka REST API enables you to manage Apache Kafka® via REST APIs and also to view the data in your Apache Kafka® topics.
Advanced configuration > Add configuration option > kafka.auto_create_topics_enable, switch the setting on and then click Save advanced configuration.
Note
The kafka.auto_create_topics_enable setting allows you to create new Apache Kafka® topics as you configure your Apache Flink® data tables, so that you do not need to create the topics in advance.
Select the demo-flink service and add the data service integrations:
Click Get started on the banner at the top of the Overview page.
Select Aiven for Apache Kafka® and then select the demo-kafka service.
Click Integrate.
Click the + icon under Data Flow.
Select Aiven for PostgreSQL® and then select the demo-postgresql service.
Click Integrate.
Click the + icon under Data Flow.
Select Aiven for OpenSearch® and then select the demo-opensearch service.
Click Integrate.
Set up sample data#
These steps show you how to create sample records to provide streamed data that is processed by the data pipelines presented in this tutorial. You can also use other existing data, although many of the examples in this article are based on this sample data.
Before you start, clone the Dockerized fake data producer for Aiven for Apache Kafka® Git repository to your computer.
Follow these instructions to create an authentication token for your Aiven account.
This is required to allow the tool to connect to a service in your Aiven account.
Go to the data producer tool directory and copy the conf/env.conf.sample file to conf/env.conf.
Edit the conf/env.conf file and update the parameters with your Aiven account information and the authentication token that you created. Set TOPIC to cpu_load_stats_real, and set NR_MESSAGES to 0.
Note
The NR_MESSAGES option defines the number of messages that the tool creates when you run it. Setting this parameter to 0 creates a continuous flow of messages that never stops. See the instructions for the tool for details on the parameters.
Run the following command to build the Docker image:
docker build -t fake-data-producer-for-apache-kafka-docker .
Run the following command to run the Docker image:
docker run fake-data-producer-for-apache-kafka-docker
This command pushes the following type of events to the cpu_load_stats_real topic in your Apache Kafka® service:
{"hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198, "occurred_at": 1633956789277}
{"hostname": "sleepy", "cpu": "cpu2", "usage": 87.28240549074823, "occurred_at": 1633956783483}
{"hostname": "sleepy", "cpu": "cpu1", "usage": 85.3384018012967, "occurred_at": 1633956788484}
{"hostname": "sneezy", "cpu": "cpu1", "usage": 89.11518629380006, "occurred_at": 1633956781891}
{"hostname": "sneezy", "cpu": "cpu2", "usage": 89.69951046388306, "occurred_at": 1633956788294}
Create a pipeline for basic filtering#
The first example filters any instances of high CPU load based on a fixed threshold and pushes the high values into a separate Apache Kafka® topic.
You need to configure:
A source table to read the metrics data from your Apache Kafka® topic
A sink table to send the processed messages to a separate Apache Kafka® topic
A Flink application to process the data
To create the filtering data pipeline, follow the steps below:
In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.
Click Create new application to create your Flink application.
In the Add source table step, create the source Apache Kafka® table by selecting the related integration and pasting the following SQL:
CREATE TABLE CPU_IN (
    hostname STRING,
    cpu STRING,
    usage DOUBLE,
    occurred_at BIGINT,
    time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'topic' = 'cpu_load_stats_real',
    'value.format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
)
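The time_ltz column is derived from the occurred_at epoch value in milliseconds, and the watermark tells Flink to tolerate events arriving up to 10 seconds late. As a rough illustration of the conversion, you could run a hypothetical one-off query like the following (not part of the pipeline; the displayed value depends on the session time zone):
-- TO_TIMESTAMP_LTZ with precision 3 interprets the number as epoch milliseconds;
-- 1633956789277 corresponds to 2021-10-11 12:53:09.277 in UTC
SELECT TO_TIMESTAMP_LTZ(1633956789277, 3);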
In the Add sink table step, create the sink Apache Kafka® table by selecting the related integration and pasting the following SQL:
CREATE TABLE CPU_OUT_FILTER (
    time_ltz TIMESTAMP(3),
    hostname STRING,
    cpu STRING,
    usage DOUBLE
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'topic' = 'cpu_load_stats_real_filter',
    'value.format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
)
In the Create statement screen, enter simple_filter as the job name and select CPU_IN and CPU_OUT_FILTER as the tables. Enter the following as the filtering SQL statement:
INSERT INTO CPU_OUT_FILTER
SELECT
    time_ltz,
    hostname,
    cpu,
    usage
FROM CPU_IN
WHERE usage > 80
Click Save and deploy later and on the application landing screen, click Create deployment.
The new application deployment status will show Initializing and then Running.
When the application is running, you should start to see messages indicating hosts with high CPU loads in the cpu_load_stats_real_filter topic of your demo-kafka service.
Create a pipeline with windowing#
The second example aggregates the CPU load over a configured time using windows and event time.
The example reuses the CPU_IN Apache Kafka® source table previously created. In addition, you need to configure:
A new sink table to send the processed messages to a separate Apache Kafka® topic
A new version of the Flink application to process the data
To create the data pipeline, follow the steps below:
In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.
Click Create new version and then click Add sink tables.
Create the sink Apache Kafka® table:
Select your Apache Kafka® service.
Enter cpu_load_stats_agg as the topic.
Select Kafka as the connector type.
Select Key not used as the key.
Select JSON as the value data format.
Enter CPU_OUT_AGG as the name.
Enter the following as the CPU_OUT_AGG SQL schema:
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
hostname STRING,
cpu STRING,
usage_avg DOUBLE,
usage_max DOUBLE
Click Add table.
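For reference, the form fields above correspond roughly to a Kafka-backed table definition in the same style as the CPU_IN and CPU_OUT_FILTER tables created earlier. A sketch only: the console fills in the bootstrap servers from the integration, and the exact options it generates may differ.
-- sketch of the DDL implied by the form fields above; you do not need to paste this anywhere
CREATE TABLE CPU_OUT_AGG (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    hostname STRING,
    cpu STRING,
    usage_avg DOUBLE,
    usage_max DOUBLE
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'topic' = 'cpu_load_stats_agg',
    'value.format' = 'json'
)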
In the Create statement screen, enter simple_agg as the job name and select CPU_OUT_AGG and CPU_IN as the tables. Enter the following as the aggregation SQL statement:
INSERT INTO CPU_OUT_AGG
SELECT
    window_start,
    window_end,
    hostname,
    cpu,
    AVG(usage),
    MAX(usage)
FROM TABLE(
    TUMBLE(
        TABLE CPU_IN,
        DESCRIPTOR(time_ltz),
        INTERVAL '30' SECONDS))
GROUP BY
    window_start,
    window_end,
    hostname,
    cpu
Click Save and deploy later and on the application landing screen, click Create deployment.
The new application deployment status will show Initializing and then Running.
When the application is running, you should start to see messages containing the aggregated CPU load statistics in the cpu_load_stats_agg topic of your demo-kafka service.
Create a Flink SQL job using PostgreSQL® thresholds#
The third example defines host-specific thresholds in a PostgreSQL® table. The thresholds table is joined with the inbound stream of CPU measurements by hostname to filter instances of CPU load going over the defined thresholds.
This uses the same CPU_IN Apache Kafka® source table that you created earlier. In addition, you need to define:
A sink table to send the processed messages to a separate Apache Kafka® topic
A source table to get the PostgreSQL® threshold data
A Flink application to process the data.
To create the data pipeline, follow the steps below:
Note
For creating and configuring the tables in your PostgreSQL® service, these steps use the Aiven CLI to call psql. You can instead use other tools to complete these steps if you prefer.
If you haven’t yet logged in to the Aiven CLI, then use the authentication token generated earlier to do so:
avn user login YOUR_EMAIL_ADDRESS --token
The command will prompt for the authentication token.
In the Aiven CLI, run the following command to connect to the demo-postgresql service:
avn service cli demo-postgresql --project PROJECT_NAME
Enter the following commands to set up the PostgreSQL® table containing the threshold values:
CREATE TABLE cpu_thresholds (hostname VARCHAR, allowed_top INT);

INSERT INTO cpu_thresholds
VALUES ('doc', 20), ('grumpy', 30), ('sleepy', 40), ('bashful', 60),
       ('happy', 70), ('sneezy', 80), ('dopey', 90);
Enter the following command to check that the threshold values are created:
SELECT * FROM cpu_thresholds;
The output shows you the content of the table:
 hostname | allowed_top
----------+-------------
 doc      |          20
 grumpy   |          30
 sleepy   |          40
 bashful  |          60
 happy    |          70
 sneezy   |          80
 dopey    |          90
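Because the thresholds are ordinary rows in a PostgreSQL® table, you can tune them later with standard SQL rather than changing the Flink application. For example, a hypothetical adjustment (not required for this tutorial) could be:
-- hypothetical example: raise the threshold for the sleepy host
UPDATE cpu_thresholds SET allowed_top = 50 WHERE hostname = 'sleepy';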
In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.
Click Create new version and then click Add source tables.
Create the Flink source table pointing to the PostgreSQL® table:
Select your PostgreSQL® service.
Enter public.cpu_thresholds as the table.
Enter SOURCE_THRESHOLDS as the name.
Enter the following as the SOURCE_THRESHOLDS SQL schema:
hostname STRING,
allowed_top INT,
PRIMARY KEY (hostname) NOT ENFORCED
Click Add table.
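For reference, these form fields correspond roughly to a JDBC-backed Flink table. The following is a sketch of the resulting definition, assuming the standard Flink JDBC connector; the console derives the actual connector options, connection URL, and credentials from the PostgreSQL® integration, so the placeholders below are illustrative only.
-- sketch only: connection details are supplied by the integration
CREATE TABLE SOURCE_THRESHOLDS (
    hostname STRING,
    allowed_top INT,
    PRIMARY KEY (hostname) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:<port>/defaultdb',
    'table-name' = 'public.cpu_thresholds'
)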
Click Add sink table and create the Flink sink table pointing to the Apache Kafka® topic:
Select your Apache Kafka® service.
Enter cpu_load_stats_real_filter_pg as the topic.
Select Kafka as the connector type.
Select Key not used as the key.
Select JSON as the value data format.
Enter CPU_OUT_FILTER_PG as the name.
Enter the following as the CPU_OUT_FILTER_PG SQL schema:
time_ltz TIMESTAMP(3),
hostname STRING,
cpu STRING,
usage DOUBLE,
threshold INT
Click Add table.
Create the Flink data pipeline joining the stream of CPU measurements with the host-specific thresholds to filter high CPU samples. In the Create statement screen:
Enter simple_filter_pg as the name.
Select the CPU_OUT_FILTER_PG, CPU_IN, and SOURCE_THRESHOLDS tables.
Enter the following SQL statement to join the tables and filter:
INSERT INTO CPU_OUT_FILTER_PG
SELECT
    time_ltz,
    CPU.hostname,
    cpu,
    usage,
    allowed_top
FROM CPU_IN CPU
INNER JOIN SOURCE_THRESHOLDS FOR SYSTEM_TIME AS OF proctime AS ST
    ON CPU.hostname = ST.hostname
WHERE usage > allowed_top
Click Save and deploy later and on the application landing screen, click Create deployment.
The new application deployment status will show Initializing and then Running.
When the job is running, you should start to see messages indicating CPU loads that exceed the PostgreSQL®-defined thresholds in the cpu_load_stats_real_filter_pg topic of your demo-kafka service.
Create an aggregated data pipeline with Apache Kafka® and PostgreSQL®#
The fourth example highlights the instances where the average CPU load over a windowed interval exceeds the threshold and stores the results in PostgreSQL®.
This uses the same CPU_IN Apache Kafka® source table and SOURCE_THRESHOLDS PostgreSQL® source table that you created earlier. In addition, you need to define:
A new sink table to store the processed data in PostgreSQL®
A new Flink application to process the data
To create the data pipeline, follow the steps below:
Note
For creating and configuring the tables in your PostgreSQL® service, these steps use the Aiven CLI to call psql. You can instead use other tools to complete these steps if you prefer.
In the Aiven CLI, run the following command to connect to the demo-postgresql service:
avn service cli demo-postgresql --project PROJECT_NAME
Enter the following command to set up the PostgreSQL® table for storing the results:
CREATE TABLE CPU_LOAD_STATS_AGG_PG (
    time_ltz TIMESTAMP(3) PRIMARY KEY,
    nr_cpus_over_threshold INT
);
In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.
Click Create new version and then click Add sink tables.
Create a Flink table to sink data to the PostgreSQL® service:
Select your PostgreSQL® service.
Enter cpu_load_stats_agg_pg as the table.
Enter CPU_OUT_AGG_PG as the name.
Enter the following as the CPU_OUT_AGG_PG SQL schema:
time_ltz TIMESTAMP(3),
nr_cpus_over_threshold BIGINT,
PRIMARY KEY (time_ltz) NOT ENFORCED
Click Add table.
Create the Flink data pipeline calculating the CPU average over the time window and checking the value against the thresholds. In the Create statement screen:
Enter simple_filter_pg_agg as the name.
Select the CPU_OUT_AGG_PG, CPU_IN, and SOURCE_THRESHOLDS tables.
Enter the following SQL to join the tables, calculate the average over a window, and filter the high CPU average values:
INSERT INTO CPU_OUT_AGG_PG
WITH JOINING_INFO AS (
    SELECT
        time_ltz,
        CPU.hostname,
        cpu,
        usage,
        allowed_top
    FROM CPU_IN CPU
    INNER JOIN SOURCE_THRESHOLDS FOR SYSTEM_TIME AS OF proctime AS ST
        ON CPU.hostname = ST.hostname
),
WINDOWING AS (
    SELECT
        window_start,
        window_end,
        hostname,
        cpu,
        AVG(usage) USAGE,
        allowed_top
    FROM TABLE(
        TUMBLE(
            TABLE JOINING_INFO,
            DESCRIPTOR(time_ltz),
            INTERVAL '30' SECONDS))
    GROUP BY window_start, window_end, hostname, cpu, allowed_top
)
SELECT
    window_start,
    COUNT(*)
FROM WINDOWING
WHERE USAGE > allowed_top
GROUP BY window_start
Click Save and deploy later and on the application landing screen, click Create deployment.
The new application deployment status will show Initializing and then Running.
When the job is running, you should start to see entries indicating the number of CPUs exceeding their thresholds in each time window in the cpu_load_stats_agg_pg table of your demo-postgresql database.
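You can check the stored results from the same psql session used earlier with a query like:
SELECT * FROM cpu_load_stats_agg_pg ORDER BY time_ltz DESC LIMIT 10;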
Replicate the filtered stream of data to OpenSearch® for further analysis and data visualization#
The last example takes the list of filtered high CPU samples contained in the CPU_OUT_FILTER_PG Flink table and, after filtering for only the happy and sleepy hostnames, pushes the result to an Aiven for OpenSearch® index for further analysis and data visualization.
This uses the CPU_OUT_FILTER_PG Flink table defined during the third example, containing the list of CPU samples above the host-specific threshold defined in PostgreSQL®. In addition, you need to define:
A new sink table to store the filtered data in OpenSearch®
A new Flink application to process the data
To create the data pipeline, follow the steps below:
In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.
Click Create new version and then click Add sink tables.
Create a Flink table to sink data to the OpenSearch® service:
Select your OpenSearch® service.
Enter cpu_high_load as the index.
Enter CPU_OUT_OS as the name.
Enter the following as the CPU_OUT_OS SQL schema:
time_ltz STRING,
hostname STRING,
cpu STRING,
usage DOUBLE,
threshold INT
Note
We can reuse a similar definition to the CPU_OUT_FILTER_PG Flink table since they share the same columns. The only difference is the time_ltz column, which is now STRING, as we need to translate the Flink TIMESTAMP to the timestamp format accepted by OpenSearch®.
Click Add table.
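For reference, the form fields above correspond roughly to the following table definition, assuming an Elasticsearch-compatible connector; this is a sketch only, since the actual connector name, hosts, and credentials are generated by the console from the OpenSearch® integration.
-- sketch only: connector and connection details come from the integration
CREATE TABLE CPU_OUT_OS (
    time_ltz STRING,
    hostname STRING,
    cpu STRING,
    usage DOUBLE,
    threshold INT
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'https://<host>:<port>',
    'index' = 'cpu_high_load'
)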
Create the Flink data pipeline that filters the high CPU samples for the happy and sleepy hostnames and pushes them to the OpenSearch® index. In the Create statement screen:
Enter data_filtering_replication as the name.
Select the CPU_OUT_FILTER_PG and CPU_OUT_OS tables.
Enter the following SQL to select from the source table, filter the happy and sleepy hostnames, and push the data to CPU_OUT_OS:
INSERT INTO CPU_OUT_OS
SELECT
    DATE_FORMAT(time_ltz, 'yyyy/MM/dd hh:mm:ss'),
    hostname,
    cpu,
    usage,
    threshold
FROM CPU_OUT_FILTER_PG
WHERE hostname IN ('happy','sleepy')
The above SQL converts the time_ltz field to a string in the format yyyy/MM/dd hh:mm:ss, which is recognised by OpenSearch® as a timestamp.
Click Save and deploy later and on the application landing screen, click Create deployment.
The new application deployment status will show Initializing and then Running.
When the job is running, you should start to see entries for the sleepy and happy hostnames with high CPU loads in the cpu_high_load index of your demo-opensearch service. You can use OpenSearch Dashboards to discover more about the data points and build advanced visualizations.