Create a real-time alerting solution - Aiven console#

This tutorial shows an example of how to combine Aiven for Apache Flink® with Aiven for Apache Kafka®, Aiven for PostgreSQL®, and Aiven for OpenSearch® to build a solution that provides real-time alerting for CPU loads.

Architecture overview#

This example involves creating an Apache Kafka® source topic that provides a stream of metrics data, a PostgreSQL® database that contains the alerting thresholds, and an Apache Flink® service that combines the two data sources and pushes the filtered data to a separate Apache Kafka® topic, PostgreSQL® table, or OpenSearch® index.

Data flow: Apache Kafka® streams the metrics and PostgreSQL® supplies the threshold data to Apache Flink®, which pushes filtered data to Apache Kafka®, filtered/aggregated data to PostgreSQL®, and filtered data to OpenSearch®.

This article covers the steps that you need in the Aiven web console, along with a few different examples of how you can set alerting thresholds. For connecting to your PostgreSQL® service, this example uses the Aiven CLI to call psql, but you can also use other tools if you prefer.

In addition, the instructions show you how to use a separate Python-based tool, Dockerized fake data producer for Aiven for Apache Kafka®, to create sample records for your Apache Kafka® topic that provides the streamed data.

Requirements#

To complete this tutorial, you need:

  • An Aiven account

  • The Aiven CLI

  • Docker, to run the tool that generates the sample data

Set up Aiven services#

  1. Follow the steps in this article to create these services:

    • An Aiven for Apache Kafka® service with the Business-4 service plan, named demo-kafka. This service provides the CPU load input streams.

    • An Aiven for PostgreSQL® service with the Business-4 service plan, named demo-postgresql. This service contains the CPU alerting threshold values, and is also used as a data sink.

    • An Aiven for OpenSearch® service with the Business-4 service plan, named demo-opensearch. This service stores the filtered CPU data streams for further analysis and visualization.

    • An Aiven for Apache Flink® service with the Business-4 service plan, named demo-flink. This service defines the data transformation and aggregation pipelines.

  2. Select the demo-kafka service and change the following settings on the Overview page:

    • Kafka REST API (Karapace) > Enable

      Note

      The Kafka REST API enables you to manage Apache Kafka® via REST APIs and to view the data in your Apache Kafka® topics.

    • Advanced configuration > Add configuration option > kafka.auto_create_topics_enable, switch the setting on and then click Save advanced configuration

      Note

      The kafka.auto_create_topics_enable setting allows you to create new Apache Kafka® topics as you configure your Apache Flink® data tables, so that you do not need to create the topics in advance.
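
      Note

      If you prefer the Aiven CLI, a rough sketch of equivalent commands follows; this assumes the CLI is installed and authenticated, that PROJECT_NAME is your project, and that kafka_rest is the advanced configuration option behind the Kafka REST API toggle:

      avn service update demo-kafka --project PROJECT_NAME -c kafka_rest=true
      avn service update demo-kafka --project PROJECT_NAME -c kafka.auto_create_topics_enable=true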

  3. Select the demo-flink service and add the service integrations:

    1. Click Get started on the banner at the top of the Overview page.

    2. Select Aiven for Apache Kafka® and then select the demo-kafka service.

    3. Click Integrate.

    4. Click the + icon under Data Flow.

    5. Select Aiven for PostgreSQL® and then select the demo-postgresql service.

    6. Click Integrate.

    7. Click the + icon under Data Flow.

    8. Select Aiven for OpenSearch® and then select the demo-opensearch service.

    9. Click Integrate.

Set up sample data#

These steps show you how to create the sample records that provide the streamed data processed by the data pipelines in this tutorial. You can use other existing data instead, although many of the examples in this tutorial assume this sample data.

Before you start, clone the Dockerized fake data producer for Aiven for Apache Kafka® Git repository to your computer.

  1. Follow these instructions to create an authentication token for your Aiven account.

    This is required to allow the tool to connect to a service in your Aiven account.

  2. Go to the data producer tool directory and copy the conf/env.conf.sample file to conf/env.conf.

  3. Edit the conf/env.conf file and update the parameters with your Aiven account information and the authentication token that you created.

    Set TOPIC to be cpu_load_stats_real, and set NR_MESSAGES to be 0.

    Note

    The NR_MESSAGES option defines the number of messages that the tool creates when you run it. Setting this parameter to 0 creates a continuous flow of messages that never stops.

    See the instructions for the tool for details on the parameters.
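
    For reference, a minimal conf/env.conf sketch follows. The TOPIC and NR_MESSAGES values match this tutorial; the remaining keys are illustrative assumptions, so check conf/env.conf.sample for the exact parameter names that your version of the tool expects:

    PROJECT_NAME="your-project-name"
    SERVICE_NAME="demo-kafka"
    TOPIC="cpu_load_stats_real"
    NR_MESSAGES=0
    USERNAME="your-aiven-account-email"
    TOKEN="your-authentication-token"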

  4. Run the following command to build the Docker image:

    docker build -t fake-data-producer-for-apache-kafka-docker .
    
  5. Run the following command to run the Docker image:

    docker run fake-data-producer-for-apache-kafka-docker
    

    This command pushes the following type of events to the cpu_load_stats_real topic in your Apache Kafka® service:

    {"hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198, "occurred_at": 1633956789277}
    {"hostname": "sleepy", "cpu": "cpu2", "usage": 87.28240549074823, "occurred_at": 1633956783483}
    {"hostname": "sleepy", "cpu": "cpu1", "usage": 85.3384018012967, "occurred_at": 1633956788484}
    {"hostname": "sneezy", "cpu": "cpu1", "usage": 89.11518629380006, "occurred_at": 1633956781891}
    {"hostname": "sneezy", "cpu": "cpu2", "usage": 89.69951046388306, "occurred_at": 1633956788294}
    

Create a pipeline for basic filtering#

The first example filters any instances of high CPU load based on a fixed threshold and pushes the high values into a separate Apache Kafka® topic.

Data flow: the Apache Kafka® source streams the metrics to a Flink job, which pushes the high CPU values to an Apache Kafka® sink.

You need to configure:

  • A source table to read the metrics data from your Apache Kafka® topic

  • A sink table to send the processed messages to a separate Apache Kafka® topic

  • A Flink job to process the data

To create the filtering data pipeline, follow these steps:

  1. In the Aiven web console, select the Jobs & Data tab in your Aiven for Apache Flink® service.

  2. Go to the Data Tables subtab.

  3. Create the source Apache Kafka® table:

    1. Select your Apache Kafka® service.

    2. Select cpu_load_stats_real as the topic.

    3. Select Kafka as the connector type.

    4. Select Key not used as the key.

    5. Select JSON as the value data format.

    6. Enter CPU_IN as the name.

    7. Enter the following as the CPU_IN SQL schema:

      hostname STRING,
      cpu STRING,
      usage DOUBLE,
      occurred_at BIGINT,
      proctime AS PROCTIME(),
      time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
      WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
      
    8. Click Create Table.
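
      Note

      Behind the scenes, the console generates a Flink table backed by the Apache Kafka® topic. As a rough sketch, the settings above correspond to a DDL like the following; the Aiven integration supplies the actual connection properties, so the WITH options shown here are illustrative only:

      CREATE TABLE CPU_IN (
          hostname STRING,
          cpu STRING,
          usage DOUBLE,
          occurred_at BIGINT,
          proctime AS PROCTIME(),
          time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'cpu_load_stats_real',
          'value.format' = 'json',
          'scan.startup.mode' = 'earliest-offset'
      )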

  4. Create the sink Apache Kafka® table:

    1. Select your Apache Kafka® service.

    2. Enter cpu_load_stats_real_filter as the topic.

    3. Select Kafka as the connector type.

    4. Select Key not used as the key.

    5. Select JSON as the value data format.

    6. Enter CPU_OUT_FILTER as the name.

    7. Enter the following as the CPU_OUT_FILTER SQL schema:

      time_ltz TIMESTAMP(3),
      hostname STRING,
      cpu STRING,
      usage DOUBLE
      
    8. Click Create Table.

  5. Go to the Create SQL Job subtab.

  6. Enter simple_filter as the job name, and select CPU_IN and CPU_OUT_FILTER as the tables.

  7. Enter the following as the filtering SQL statement:

    INSERT INTO CPU_OUT_FILTER 
    SELECT 
        time_ltz, 
        hostname, 
        cpu, 
        usage 
    FROM CPU_IN 
    WHERE usage > 80
    
  8. Click Execute job.

    The new job is added to the list on the Jobs subtab and starts automatically once a task slot is available. The status changes to RUNNING once the job starts.

    When the job is running, you should start to see messages indicating hosts with high CPU loads in the cpu_load_stats_real_filter topic of your demo-kafka service.
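
    The messages follow the CPU_OUT_FILTER schema; an illustrative example, derived from the sample input shown earlier:

    {"time_ltz": "2021-10-11 12:53:09.277", "hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198}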

Create a pipeline with windowing#

The second example aggregates the CPU load over a configured time using windows and event time.

Data flow: the Apache Kafka® source streams the timestamped metrics to a Flink job, which pushes the 30-second average CPU values to an Apache Kafka® sink.

The example reuses the CPU_IN Apache Kafka® source table previously created. In addition, you need to configure:

  • A new sink table to send the processed messages to a separate Apache Kafka® topic

  • A new Flink job to process the data

To create the data pipeline, follow these steps:

  1. In the Aiven web console, select the Jobs & Data tab in your Aiven for Apache Flink® service.

  2. Go to the Data Tables subtab.

  3. Create the sink Apache Kafka® table:

    1. Select your Apache Kafka® service.

    2. Enter cpu_load_stats_agg as the topic.

    3. Select Kafka as the connector type.

    4. Select Key not used as the key.

    5. Select JSON as the value data format.

    6. Enter CPU_OUT_AGG as the name.

    7. Enter the following as the CPU_OUT_AGG SQL schema:

      window_start TIMESTAMP(3),
      window_end TIMESTAMP(3),
      hostname STRING,
      cpu STRING,
      usage_avg DOUBLE,
      usage_max DOUBLE
      
    8. Click Create Table.

  4. Go to the Create SQL Job subtab.

  5. Enter simple_agg as the job name, and select CPU_OUT_AGG and CPU_IN as the tables.

  6. Enter the following as the aggregating SQL statement:

    INSERT INTO CPU_OUT_AGG
    SELECT 
        window_start,
        window_end, 
        hostname, 
        cpu, 
        AVG(usage), 
        MAX(usage)
    FROM 
        TABLE( TUMBLE( TABLE CPU_IN, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
    GROUP BY 
        window_start,
        window_end, 
        hostname, 
        cpu
    
  7. Click Execute job.

    The new job is added to the list on the Jobs subtab and starts automatically once a task slot is available. The status changes to RUNNING once the job starts.

    When the job is running, you should start to see messages with the windowed CPU averages and maximums in the cpu_load_stats_agg topic of your demo-kafka service.
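
    The messages follow the CPU_OUT_AGG schema; an illustrative example:

    {"window_start": "2021-10-11 12:53:00", "window_end": "2021-10-11 12:53:30", "hostname": "sleepy", "cpu": "cpu2", "usage_avg": 86.31, "usage_max": 87.28}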

Create an aggregated data pipeline with Apache Kafka® and PostgreSQL®#

The fourth example highlights the instances where the average CPU load over a windowed interval exceeds the threshold and stores the results in PostgreSQL®.

Data flow: the Apache Kafka® source streams the timestamped metrics and the PostgreSQL® source supplies the host-specific thresholds to a Flink job, which pushes the high 30-second average CPU values to a PostgreSQL® sink.

This uses the same CPU_IN Kafka source table and SOURCE_THRESHOLDS PostgreSQL® source table that you created earlier. In addition, you need to define:

  • A new sink table to store the processed data in PostgreSQL®

  • A new Flink job to process the data

To create the data pipeline, follow these steps:

Note

For creating and configuring the tables in your PostgreSQL® service, these steps use the Aiven CLI to call psql. You can instead use other tools to complete these steps if you prefer.

  1. In the Aiven CLI, run the following command to connect to the demo-postgresql service:

    avn service cli demo-postgresql --project PROJECT_NAME
    
  2. Enter the following command to set up the PostgreSQL® table for storing the results:

    CREATE TABLE CPU_LOAD_STATS_AGG_PG (
        time_ltz TIMESTAMP(3) PRIMARY KEY, 
        nr_cpus_over_threshold INT
    );
    
  3. In the Aiven web console, select the Jobs & Data tab in your Aiven for Apache Flink® service.

  4. Go to the Data Tables subtab.

  5. Create a Flink table to sink data to the PostgreSQL® service:

    1. Select your PostgreSQL® service.

    2. Enter cpu_load_stats_agg_pg as the table.

    3. Enter CPU_OUT_AGG_PG as the name.

    4. Enter the following as the CPU_OUT_AGG_PG SQL schema:

      time_ltz TIMESTAMP(3),
      nr_cpus_over_threshold BIGINT,
      PRIMARY KEY (time_ltz) NOT ENFORCED

    5. Click Create Table.
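
      Note

      As with the Kafka tables, the console generates the underlying Flink DDL. A rough sketch of what the settings above correspond to, using Flink's JDBC connector, follows; the Aiven integration supplies the real url and credentials, so the WITH options are illustrative only:

      CREATE TABLE CPU_OUT_AGG_PG (
          time_ltz TIMESTAMP(3),
          nr_cpus_over_threshold BIGINT,
          PRIMARY KEY (time_ltz) NOT ENFORCED
      ) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:postgresql://HOST:PORT/defaultdb',
          'table-name' = 'cpu_load_stats_agg_pg'
      )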

  6. Create the Flink data pipeline that calculates the CPU average over the time window and checks the value against the thresholds:

    1. Go to the Create SQL Job subtab.

    2. Enter simple_filter_pg_agg as the name.

    3. Select the CPU_OUT_AGG_PG, CPU_IN, and SOURCE_THRESHOLDS tables.

    4. Enter the following SQL to join the tables, calculate the average over a window, and filter the high CPU average values:

      INSERT INTO CPU_OUT_AGG_PG 
      WITH JOINING_INFO AS (
          SELECT time_ltz, 
              CPU.hostname, 
              cpu, 
              usage, 
              allowed_top 
          FROM CPU_IN CPU INNER JOIN SOURCE_THRESHOLDS 
              FOR SYSTEM_TIME AS OF proctime AS ST 
              ON CPU.hostname = ST.hostname
      ),
      WINDOWING AS (
          SELECT 
              window_start,
              window_end, 
              hostname, 
              cpu, 
              AVG(usage) USAGE, 
              allowed_top
          FROM TABLE(TUMBLE(TABLE JOINING_INFO, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
          GROUP BY 
              window_start,
              window_end, 
              hostname, 
              cpu, 
              allowed_top
      )
      SELECT 
          window_start, 
          COUNT(*) 
      FROM WINDOWING
      WHERE USAGE > allowed_top
      GROUP BY 
          window_start
      
    5. Click Execute job.

      The new job is added to the list on the Jobs subtab and starts automatically once a task slot is available. The status changes to RUNNING once the job starts.

      When the job is running, you should start to see entries indicating hosts with high CPU loads in the cpu_load_stats_agg_pg table of your demo-postgresql database.
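
      To verify, you can query the table in the psql session that you opened earlier, for example:

      SELECT * FROM CPU_LOAD_STATS_AGG_PG ORDER BY time_ltz DESC LIMIT 5;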

Replicate the filtered data stream to OpenSearch® for further analysis and data visualization#

The last example takes the list of filtered high CPU samples contained in the CPU_OUT_FILTER_PG Flink table and, after filtering for only the happy and sleepy hostnames, pushes the result to an Aiven for OpenSearch® index for further analysis and data visualization.

Data flow: the Apache Kafka® source streams the hosts with high CPU to the current Flink job, which pushes them to an OpenSearch® sink.

This uses the CPU_OUT_FILTER_PG Flink table defined in the third example, which contains the list of CPU samples above the host-specific thresholds defined in PostgreSQL®. In addition, you need to define:

  • A new sink table to store the filtered data in OpenSearch®

  • A new Flink job to process the data

To create the data pipeline, follow these steps:

  1. In the Aiven web console, select the Jobs & Data tab in your Aiven for Apache Flink® service.

  2. Go to the Data Tables subtab.

  3. Create a Flink table to sink data to the OpenSearch® service:

    1. Select your OpenSearch® service.

    2. Enter cpu_high_load as the index.

    3. Enter CPU_OUT_OS as the name.

    4. Enter the following as the CPU_OUT_OS SQL schema:

      time_ltz STRING,
      hostname STRING,
      cpu STRING,
      usage DOUBLE,
      threshold INT
      

      Note

      We can reuse a definition similar to the CPU_OUT_FILTER_PG Flink table, since they share the same columns. The only difference is the time_ltz column, which is now STRING, as we need to translate the Flink TIMESTAMP to the timestamp format accepted by OpenSearch®.

    5. Click Create Table.
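
      Note

      A rough sketch of the DDL that the settings above correspond to, assuming Flink's elasticsearch-7 connector, which also works against OpenSearch®; the Aiven integration supplies the real hosts and credentials, so the WITH options are illustrative only:

      CREATE TABLE CPU_OUT_OS (
          time_ltz STRING,
          hostname STRING,
          cpu STRING,
          usage DOUBLE,
          threshold INT
      ) WITH (
          'connector' = 'elasticsearch-7',
          'hosts' = 'https://HOST:PORT',
          'index' = 'cpu_high_load'
      )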

  4. Create the Flink data pipeline that filters the high-CPU data and replicates it to the OpenSearch® index:

    1. Go to the Create SQL Job subtab.

    2. Enter data_filtering_replication as the name.

    3. Select the CPU_OUT_FILTER_PG and CPU_OUT_OS tables.

    4. Enter the following SQL to select the data from the source table, filter for the happy and sleepy hostnames, and push the result to CPU_OUT_OS:

      INSERT INTO CPU_OUT_OS
      SELECT
          DATE_FORMAT(time_ltz, 'yyyy/MM/dd HH:mm:ss'),
          hostname,
          cpu,
          usage,
          threshold
      FROM CPU_OUT_FILTER_PG
      WHERE hostname IN ('happy','sleepy')
      

      The above SQL converts the time_ltz field to a string in the format yyyy/MM/dd HH:mm:ss, which OpenSearch® recognizes as a timestamp. Note the use of HH for the 24-hour clock, since the formatted string carries no AM/PM marker.

    5. Click Execute job.

      The new job is added to the list on the Jobs subtab and starts automatically once a task slot is available. The status changes to RUNNING once the job starts.

      When the job is running, you should start to see entries for the sleepy and happy hostnames with high CPU loads in the cpu_high_load index of your demo-opensearch service. You can use OpenSearch Dashboards to explore the data points and build advanced visualizations.
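
      For a quick check outside the dashboard, you can also query the index over the OpenSearch® REST API; the connection details are on the service's Overview page, and the URI below is illustrative:

      curl "https://USER:PASSWORD@HOST:PORT/cpu_high_load/_search?size=5"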