Streaming anomaly detection with Apache Flink®, Apache Kafka® and PostgreSQL®

What you will learn

Follow this tutorial and you’ll learn how to build a streaming anomaly detection system. In particular, we’ll cover the following:

  • How to create a fake streaming dataset.

  • How to create and use Apache Kafka for data streaming.

  • How to create and use PostgreSQL® to store threshold data.

  • How to create and use Apache Flink® to define streaming data pipelines.

  • How to push the outcome of the anomaly detection system as a Slack notification.

What you will build

Anomaly detection is a way to find unusual or unexpected things in data. It is immensely helpful in a variety of fields, such as fraud detection, network security, quality control and others. By following this tutorial you will build your own streaming anomaly detection system.

For example: a payment processor might set up anomaly detection against an e-commerce store if it notices that the store – which sells its items in Indian Rupees and is only configured to sell to the Indian market – is suddenly receiving a high volume of orders from Spain. This behavior could indicate fraud. Another example is that of a domain hosting service implementing a CAPTCHA against an IP address it deems is interacting with one of its domains in rapid succession.

While it’s often easier to validate anomalies in data once they have been stored in the database, it’s more useful to check in-stream and address unwanted activity before it affects your dataset.

We can check for anomalies in data by creating filtering pipelines with Apache Flink®. Apache Flink is a flexible, open source tool for in-stream and batch processing of data. It lets us run SQL-like queries against a stream of data and perform actions based on the results of those queries.

In this tutorial you’ll use a fake Internet of Things (IoT) sensor that generates CPU usage data for various devices as your continuous flow of data. Once the data is flowing, you’ll create a basic filtering pipeline to separate the usage values surpassing a fixed threshold (80%).

This example mimics a scenario where you might want to separate and generate alerts for anomalies in single events. For instance, a device reporting 99% CPU utilization might develop a heating problem, so you might want to notify the team in charge of the inventory to schedule a replacement.

Flow: IoT device → sensor reading → usage > 80%? → (yes) notification

However, receiving a notification on every sensor reading that surpasses a fixed threshold can be overwhelming and create false positives for short usage spikes. Therefore you’ll create a second, more advanced pipeline to average the sensor reading values over 30-second windows and then compare the results with per-device thresholds stored in a reference table.

Reading the average CPU value in 30-second windows improves your initial implementation and helps avoid false-positive notifications for single spikes, while still letting you flag components at potential risk of overheating. The threshold lookup enables a more precise definition of alert ranges depending on the device type.

Flow: IoT device → sensor reading → 30-second average → over threshold? → (yes) notification, with the threshold table feeding the per-device thresholds into the comparison

The tutorial includes:

  • Apache Flink for data transformation.

  • Apache Kafka for data streaming.

  • PostgreSQL® for data storage/query.

  • Slack as the notification system.

In this tutorial we’ll be using Aiven services, specifically Aiven for Apache Flink®, Aiven for Apache Kafka®, and Aiven for PostgreSQL®. All of these are widely available open source tools. We encourage you to sign up for a free trial to follow along, as it reduces nearly to zero any issues you might have with networking and getting services to communicate with each other.

Architecture overview

To build a near real-time anomaly detection system, you’ll create a streaming data pipeline that processes the IoT sensor readings as soon as they are generated. The pipeline relies on two sources: an Apache Kafka topic that contains the fake stream of IoT metrics data, and a table in a PostgreSQL® database containing the alerting thresholds defined for each IoT device. An Apache Flink® service then combines the data, applies some transformation SQL to find the anomalies, and pushes the result to a separate Apache Kafka® topic or a Slack channel for team notification.

Flow: Apache Kafka (IoT metrics stream) and PostgreSQL (alerting threshold data) feed Apache Flink; Apache Flink pushes filtered/aggregated data back to Apache Kafka and filtered data to Slack

Prerequisites

The tutorial uses Aiven services, so you’ll need a valid Aiven account. On top of the Aiven account, you will also need the following three items:

  • Docker, needed for the fake data generator for Apache Kafka. Check out the related installation instructions.

  • Slack App and Token: the data pipeline output is a notification to a Slack channel; check out the steps needed to set up a Slack app and retrieve the Slack authentication token.

  • psql, a terminal-based tool to interact with PostgreSQL, where the threshold data will be stored.
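Before moving on, you can quickly confirm that the local tools are available; this is just a sanity check, not an Aiven-specific step:

    docker --version
    psql --version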

Create the Aiven services

In this section you’ll create all the services needed to define the anomaly detection system via the Aiven Console:

  • An Aiven for Apache Kafka® service named demo-kafka for data streaming; this is where the stream of IoT sensor readings will land.

  • An Aiven for Apache Flink® service named demo-flink for streaming data transformation, used to define the anomaly detection queries.

  • An Aiven for PostgreSQL® service named demo-postgresql to store and query the alerting thresholds.
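If you prefer working from the terminal, the same three services can also be created with the Aiven CLI. The following is only a sketch: it assumes the avn client is installed and authenticated against your project, and the cloud region used here (google-europe-west1) is an example you should replace with your preferred one.

    avn service create demo-kafka --service-type kafka --plan business-4 --cloud google-europe-west1
    avn service create demo-postgresql --service-type pg --plan startup-4 --cloud google-europe-west1
    avn service create demo-flink --service-type flink --plan business-4 --cloud google-europe-west1

The console steps below achieve the same result and also walk you through the extra configuration each service needs.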

Create an Aiven for Apache Kafka® service

The Aiven for Apache Kafka service is responsible for receiving the inbound stream of IoT sensor readings. Create the service with the following steps:

  1. Log in to the Aiven web console.

  2. On the Services page, click Create a new service.

    This opens a new page with the available service options.

    Aiven Console view for creating a new service
  3. Select Apache Kafka®.

  4. Select the cloud provider and region that you want to run your service on.

  5. Select business-4 as the service plan. The business-4 plan allows you to create the service integrations needed for Apache Flink streaming transformations over the Apache Kafka topic.

  6. Enter demo-kafka as the name for your service.

  7. Click Create Service under the summary on the right side of the console.

Customise the Aiven for Apache Kafka service

Now that your service is created, you need to customise its functionality. In the Overview tab of your freshly created service, you’ll see a bunch of toggles and properties. Change these two:

  1. Enable the Apache Kafka REST APIs to manage and query topic data via the Aiven Console.

    Navigate to Kafka REST API (Karapace) > Enable.

  2. Enable the automatic creation of Apache Kafka topics, so that new Apache Kafka® topics are created on the fly when the first record is pushed to them.

    Navigate to Advanced configuration > Add configuration option > kafka.auto_create_topics_enable, switch the setting on and then click Save advanced configuration.
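As an alternative to the console toggles, both settings can be applied in one go with the Aiven CLI; this is a sketch that assumes the avn client is installed and authenticated against the project hosting demo-kafka:

    # Enable the Karapace REST API and automatic topic creation
    avn service update demo-kafka \
      -c kafka_rest=true \
      -c kafka.auto_create_topics_enable=true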

Create an Aiven for PostgreSQL® service

The PostgreSQL database is where you’ll store the threshold data for each IoT device. These thresholds represent the alerting range of each IoT device: for example, one device might trigger an alert when the usage is over 90%, while for other devices the threshold could be 60%.

You can create the Aiven for PostgreSQL database with the following steps:

  1. Log in to the Aiven web console.

  2. On the Services page, click Create a new service.

  3. Select PostgreSQL®.

  4. Select the cloud provider and region that you want to run your service on.

  5. Select Startup-4 as the service plan. The Startup-4 plan allows you to create the service integrations needed for Apache Flink streaming transformations over the data in the PostgreSQL® table.

  6. Enter demo-postgresql as the name for your service.

  7. Click Create Service under the summary on the right side of the console.

Create an Aiven for Apache Flink service

The Apache Flink service is where you’ll define the streaming data pipelines to calculate and detect the anomalies.

You can create the Aiven for Apache Flink service with the following steps:

  1. Log in to the Aiven web console.

  2. On the Services page, click Create a new service.

  3. Select Apache Flink®.

  4. Select the cloud provider and region that you want to run your service on.

  5. Select business-4 as the service plan. business-4 is the smallest plan available for Aiven for Apache Flink and is enough to define all the data transformations in this tutorial.

  6. Enter demo-flink as the name for your service.

  7. Click Create Service under the summary on the right side of the console.

Integrate the Aiven for Apache Flink service with sources and sinks

After creating the service, you’ll be redirected to the service details page. Apache Flink doesn’t work in isolation; it needs data sources and sinks. Therefore, you’ll need to define the integrations between the Apache Flink service and:

  • Aiven for Apache Kafka®, which contains the stream of IoT sensor readings.

  • Aiven for PostgreSQL®, which contains the alerting thresholds.

You can define the service integrations in the Aiven for Apache Flink® Overview tab with the following steps:

  1. Click Get started on the banner at the top of the Overview page.

    Aiven for Apache Flink Overview tab, showing the **Get started** button
  2. Select Aiven for Apache Kafka® and then select the demo-kafka service.

  3. Click Integrate.

  4. Click the + icon under Data Flow.

  5. Check the Aiven for PostgreSQL checkbox in the Aiven Data Services section.

  6. Select Aiven for PostgreSQL® and then select the demo-postgresql service.

  7. Click Integrate.
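If you’d rather script the integrations, a rough equivalent with the Aiven CLI looks like the following; flag names are those of a recent aiven-client release, so double-check them with avn service integration-create -h before running:

    # Kafka -> Flink and PostgreSQL -> Flink integrations
    avn service integration-create --integration-type flink \
      --source-service demo-kafka --dest-service demo-flink
    avn service integration-create --integration-type flink \
      --source-service demo-postgresql --dest-service demo-flink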

Once the above steps are completed, your Data Flow section should be similar to the following:

Aiven for Apache Flink Overview tab, showing the Integrations to Aiven for Apache Kafka and Aiven for PostgreSQL

Set up the IoT metrics streaming dataset

Now that the plumbing of all the components is sorted, it’s time for you to create a continuous stream of fake IoT data that will land in an Aiven for Apache Kafka topic. There are various ways to generate fake data; for this tutorial you’ll use the Dockerized fake data producer for Aiven for Apache Kafka®, which lets you generate a continuous flow of data with minimal setup.

Create an Aiven authentication token

The Dockerized fake data producer for Aiven for Apache Kafka® requires an Aiven authentication token to fetch all the Apache Kafka connection parameters.

You can create an authentication token with the following steps:

  1. Log in to the Aiven web console.

  2. Click the user icon in the top-right corner of the page.

  3. Click the Authentication tab and scroll down to Authentication tokens.

    Aiven Console showing the authentication tokens
  4. Click the Generate token button.

  5. Enter a description (optional) and a time limit (optional) for the token. Leave the Max age hours field empty if you do not want the token to expire.

    Aiven Console showing the authentication tokens
  6. Click Generate token.

  7. Click the Copy icon or select and copy the access token.

    Note

    You cannot retrieve the token after you close this view.

  8. Store the token safely and treat this just like a password.

  9. Click Close.
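If you already have the Aiven CLI installed and are logged in, you can also create a token from the terminal; a minimal sketch:

    avn user access-token create --description "fake data producer token"

Copy the token value from the command output and store it safely, exactly as you would with a token created in the console.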

Start the fake IoT data generator

It’s time to start streaming the fake IoT data that you’ll later process with Apache Flink:

Note

You can also use other existing data, although the examples in this tutorial are based on the IoT sample data.

  1. Clone the Dockerized fake data producer for Aiven for Apache Kafka® repository to your computer:

    git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker.git
    
  2. Navigate to the fake-data-producer-for-apache-kafka-docker directory and copy the conf/env.conf.sample file to conf/env.conf.

  3. Edit the conf/env.conf file and update the following parameters (a filled-in sample file is shown after this list):

    • PROJECT_NAME to the Aiven project name where your services have been created.

    • SERVICE_NAME to the Aiven for Apache Kafka service name demo-kafka.

    • TOPIC to cpu_load_stats_real.

    • NR_MESSAGES to 0.

      Note

      The NR_MESSAGES option defines the number of messages that the tool creates when you run it. Setting this parameter to 0 creates a continuous flow of messages that never stops.

    • USERNAME to the username you use to log in to the Aiven Console.

    • TOKEN to the Aiven token generated in the previous step of this tutorial.

    Note

    See the Dockerized fake data producer for Aiven for Apache Kafka® instructions for details on the parameters.

  4. Run the following command to build the Docker image:

    docker build -t fake-data-producer-for-apache-kafka-docker .
    
  5. Run the following command to run the Docker image:

    docker run fake-data-producer-for-apache-kafka-docker
    

    You should now see the above command pushing IoT sensor reading events to the cpu_load_stats_real topic in your Apache Kafka® service:

    {"hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198, "occurred_at": 1633956789277}
    {"hostname": "sleepy", "cpu": "cpu2", "usage": 87.28240549074823, "occurred_at": 1633956783483}
    {"hostname": "sleepy", "cpu": "cpu1", "usage": 85.3384018012967, "occurred_at": 1633956788484}
    {"hostname": "sneezy", "cpu": "cpu1", "usage": 89.11518629380006, "occurred_at": 1633956781891}
    {"hostname": "sneezy", "cpu": "cpu2", "usage": 89.69951046388306, "occurred_at": 1633956788294}
    
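For reference, a filled-in conf/env.conf could look like the following. The project name, username, and token are placeholder values to replace with your own; any other settings already present in the sample file can be left as they are:

    PROJECT_NAME="my-project"
    SERVICE_NAME="demo-kafka"
    TOPIC="cpu_load_stats_real"
    NR_MESSAGES="0"
    USERNAME="you@example.com"
    TOKEN="<AIVEN_AUTHENTICATION_TOKEN>"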

Check the data in Apache Kafka

To check that your fake data producer is running, head to the Aiven for Apache Kafka service in the Aiven Console and look for the cpu_load_stats_real topic:

  1. Log in to the Aiven web console.

  2. Click on the Aiven for Apache Kafka service name demo-kafka.

  3. Click on the Topics tab.

  4. On the cpu_load_stats_real line, select the ... symbol and then click on Topic messages.

    Aiven for Apache Kafka Topic tab, showing the ``cpu_load_stats_real`` topic being created and the location of the ``...`` icon
  5. Click on the Fetch Messages button.

  6. Toggle the Decode from base64 option.

  7. You should see the messages being pushed to the Apache Kafka topic:

    detail of the messages in the ``cpu_load_stats_real`` topic including both key and value in JSON format
  8. Click again on the Fetch Messages button to refresh the visualization with new messages.
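If you prefer checking from a terminal instead of the console, kcat works as well. The sketch below assumes you have downloaded the service's CA certificate, access certificate, and access key (ca.pem, service.cert, service.key) from the demo-kafka Overview page and replaced the host and port with the values from your Service URI:

    # Consume the last five messages from the topic and exit
    kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
      -X security.protocol=ssl \
      -X ssl.ca.location=ca.pem \
      -X ssl.certificate.location=service.cert \
      -X ssl.key.location=service.key \
      -t cpu_load_stats_real -C -o -5 -e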

Create a basic anomaly detection pipeline with filtering

The first anomaly detection pipeline that you’ll create showcases a basic anomaly detection system: you want to flag any sensor reading exceeding a fixed 80% threshold, since it could represent a heating anomaly. You’ll read the IoT sensor readings from the cpu_load_stats_real topic in Apache Kafka, build a filtering pipeline in Apache Flink, and push the readings above the 80% threshold back to Apache Kafka, but to a separate cpu_load_stats_real_filter topic.

Flow: Kafka topic cpu_load_stats_real → Flink application filtering (is CPU high?) → Kafka topic cpu_load_stats_real_filter

The steps to create the filtering pipeline are the following:

  1. Create a new Aiven for Apache Flink application.

  2. Define a source table to read the metrics data from your Apache Kafka® topic.

  3. Define a sink table to send the processed messages to a separate Apache Kafka® topic.

  4. Define a SQL transformation definition to process the data.

  5. Create an application deployment to execute the pipeline.

If you feel brave, you can go ahead and try it yourself in the Aiven Console. Otherwise, follow the steps below:

  1. In the Aiven Console, open the Aiven for Apache Flink service named demo-flink and go to the Applications tab.

  2. Click Create new application to create your Flink application.

    The Apache Flink **Application** tab with the **Create Application** button
  3. Name the new application filtering and click Create application.

    The Apache Flink **Application** named ``filtering``
  4. Create the first version of the application by clicking the Create first version button.

  5. In the Add source tables tab, create the source table (named CPU_IN), pointing to the Apache Kafka® topic cpu_load_stats_real where the IoT sensor readings are stored by:

    • Select Aiven for Apache Kafka - demo-kafka as Integrated service

    • Paste the following SQL:

      CREATE TABLE CPU_IN (
          hostname STRING,
          cpu STRING,
          usage DOUBLE,
          occurred_at BIGINT,
          time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
          )
      WITH (
         'connector' = 'kafka',
         'properties.bootstrap.servers' = '',
         'topic' = 'cpu_load_stats_real',
         'value.format' = 'json',
         'scan.startup.mode' = 'earliest-offset'
      )
      

    Once created, the source table tab should look like the following:

    Source table tab with ``CPU_IN`` table defined

    Before saving the source table definition, you can check if it matches the data in the topic by clicking on the triangle next to Run. You should see the populated data.

    The Apache Flink source definition with SQL preview of the data
  6. Navigate to the Add sink table tab.

  7. Create the sink table (named CPU_OUT_FILTER), pointing to a new Apache Kafka® topic named cpu_load_stats_real_filter where the readings exceeding the 80% threshold will land, by:

    • Clicking on the Add your first sink table.

    • Selecting Aiven for Apache Kafka - demo-kafka as Integrated service.

    • Pasting the following SQL:

      CREATE TABLE CPU_OUT_FILTER (
          time_ltz TIMESTAMP(3),
          hostname STRING,
          cpu STRING,
          usage DOUBLE
          )
      WITH (
         'connector' = 'kafka',
         'properties.bootstrap.servers' = '',
         'topic' = 'cpu_load_stats_real_filter',
         'value.format' = 'json',
         'scan.startup.mode' = 'earliest-offset'
      )
      

    Once created, the sink table tab should look like the following:

    Sink table tab with ``CPU_OUT`` table defined
  8. Navigate to the Create statement tab.

  9. Enter the following as the transformation SQL statement, taking data from the CPU_IN table and pushing the samples over the 80% threshold to CPU_OUT_FILTER:

    INSERT INTO CPU_OUT_FILTER 
    SELECT 
        time_ltz, 
        hostname, 
        cpu, 
        usage 
    FROM CPU_IN 
    WHERE usage > 80
    

    If you’re curious, you can preview the output of the transformation by clicking on the triangle next to the Run section; the Create statement window should be similar to the following image.

    The Apache Flink data transformation with SQL preview of the data
  10. Click Save and deploy later.

  11. Click Create deployment.

  12. Accept the default deployment parameters and click on Deploy without a savepoint.

    Detail of the new deployment screen showing the default version, savepoint and parallelism parameters
  13. The new application deployment status will show Initializing and then Running: version 1.

Once the application is running, you should start to see messages indicating hosts with high CPU loads in the cpu_load_stats_real_filter topic of your demo-kafka Apache Kafka service.

The Apache Flink data transformation with SQL preview of the data

Important

Congratulations! You created your first streaming anomaly detection pipeline!

The data is now available in the Apache Kafka topic named cpu_load_stats_real_filter. From there, you could either write your own Apache Kafka consumer to read the high sensor readings or use Kafka Connect to sink the data to a wide range of technologies.
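For a quick look from the terminal, the same kcat approach shown earlier for the source topic also works here; only the topic name changes (a sketch, assuming the same certificate files):

    kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
      -X security.protocol=ssl \
      -X ssl.ca.location=ca.pem \
      -X ssl.certificate.location=service.cert \
      -X ssl.key.location=service.key \
      -t cpu_load_stats_real_filter -C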

Evolve the anomaly detection pipeline with windowing and threshold lookup

In most production environments, you wouldn’t want to send an alert on every measurement above the threshold. Sometimes CPUs spike momentarily, for example, and come back down in usage milliseconds later. What’s really useful in production is knowing whether a CPU spike is sustained over a certain period of time.

If a CPU usage spike happens continuously over a 30-second interval, there might be a problem. In this step, you’ll aggregate the CPU load over a configured time using windows and the event time. By averaging the CPU values over a time window you can filter out short-term spikes in usage and flag only anomaly scenarios where the usage is consistently above a pre-defined threshold for a longer period of time.

To add a bit of complexity and mimic a real scenario, you’ll also move away from the fixed 80% threshold and compare the average utilization figures with per-device thresholds, stored in a reference table in PostgreSQL and keyed by hostname. Every IoT device is different, and various devices usually have different alerting ranges; the reference table provides an example of variable, device-dependent thresholds.

Flow: Kafka topic cpu_load_stats_real → Flink aggregation application → Kafka topic cpu_agg_stats (30-second average CPU) → Flink comparison application, which also reads the per-device thresholds from the PostgreSQL table and sends a Slack notification when the average is over the threshold

Create the windowing pipeline

In this step, you’ll create a pipeline to average the CPU metrics figures over 30-second windows. Averaging the metric over a time window allows you to avoid notifications for temporary spikes.

Flow: Kafka topic cpu_load_stats_real → Flink aggregation application (cpu_agg) → Kafka topic cpu_agg_stats (30-second average CPU)

Note

In this section, you will be able to reuse the CPU_IN source table definition created previously. Importing a working table definition, rather than re-defining it, is a good practice to avoid mistakes.

To complete the section, you will perform the following steps:

  • Create a new Aiven for Apache Flink application.

  • Import the previously created CPU_IN source table to read the metrics data from your Apache Kafka® topic.

  • Define a sink table to send the processed messages to a separate Apache Kafka® topic.

  • Define a SQL transformation definition to process the data.

  • Create an application deployment to execute the pipeline.

You can go ahead and try to define the windowing pipeline yourself. If you prefer a step-by-step approach, follow the instructions below:

  1. In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.

  2. Click on Create new application and name it cpu_agg.

  3. Click on Create first version.

  4. To import the source CPU_IN table from the previously created filtering application:
    • Click on Import existing source table

    • Select filtering as application, Version 1 as version, CPU_IN as table and click Next

    • Click on Add table

  5. Navigate to the Add sink tables tab.

  6. Create the sink table (named CPU_OUT_AGG) pointing to a new Apache Kafka® topic named cpu_agg_stats, where the 30-second aggregated data will land, by:

    • Clicking on the Add your first sink table.

    • Selecting Aiven for Apache Kafka - demo-kafka as Integrated service.

    • Pasting the following SQL:

      CREATE TABLE CPU_OUT_AGG(
          window_start TIMESTAMP(3),
          window_end TIMESTAMP(3),
          hostname STRING,
          cpu STRING,
          usage_avg DOUBLE,
          usage_max DOUBLE
          )
      WITH (
         'connector' = 'kafka',
         'properties.bootstrap.servers' = '',
         'topic' = 'cpu_agg_stats',
         'value.format' = 'json',
         'scan.startup.mode' = 'earliest-offset'
      )
      
    • Click Add table.

  7. Navigate to the Create statement tab.

  8. Enter the following as the transformation SQL statement, taking data from the CPU_IN table, aggregating the data over a 30-second window, and pushing the output to CPU_OUT_AGG:

    INSERT INTO CPU_OUT_AGG
    SELECT 
        window_start,
        window_end, 
        hostname, 
        cpu, 
        AVG(usage), 
        MAX(usage)
    FROM 
        TABLE( TUMBLE( TABLE CPU_IN, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
    GROUP BY 
        window_start,
        window_end, 
        hostname, 
        cpu
    
  9. Click Save and deploy later.

  10. Click Create deployment.

  11. Accept the default deployment parameters and click on Deploy without a savepoint.

  12. The new application deployment status will show Initializing and then Running: version 1.

When the application is running, you should start to see messages containing the 30-second CPU average in the cpu_agg_stats topic of your demo-kafka service.

Create a threshold table in PostgreSQL

You will use a PostgreSQL table to store the various IoT thresholds based on the hostname. The table will later be used by a Flink application to compare the average CPU usage with the thresholds and send the notifications to a Slack channel.

You can create the thresholds table in the demo-postgresql service with the following steps:

Note

The instructions below assume psql is installed on your local machine.

  1. In the Aiven Console, open the Aiven for PostgreSQL service demo-postgresql.

  2. In the Overview tab locate the Service URI parameter and copy the value.

  3. Connect to demo-postgresql via psql with the following terminal command, replacing the <SERVICE_URI> placeholder with the Service URI string copied in the step above:

    psql "<SERVICE_URI>"
    
  4. Create the cpu_thresholds table and populate the values with the following code:

    CREATE TABLE cpu_thresholds
        (
        hostname VARCHAR, 
        allowed_top INT
        );
    
    INSERT INTO cpu_thresholds 
        VALUES  ('doc', 20),
                ('grumpy', 30),
                ('sleepy',40),
                ('bashful',60),
                ('happy',70),
                ('sneezy',80),
                ('dopey',90);
    
  5. Enter the following command to check that the threshold values are correctly populated:

    SELECT * FROM cpu_thresholds;
    

    The output shows you the content of the table:

    hostname | allowed_top
    ---------+------------
    doc      |     20
    grumpy   |     30
    sleepy   |     40
    bashful  |     60
    happy    |     70
    sneezy   |     80
    dopey    |     90
    
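If you prefer a one-off check without an interactive session, the same query can also be run directly from the shell, replacing <SERVICE_URI> as before:

    psql "<SERVICE_URI>" -c "SELECT * FROM cpu_thresholds;"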

Create the notification pipeline comparing average CPU data with the thresholds

At this point, you should have both a stream of the 30-second average CPU metrics coming from Apache Kafka and a set of per-device thresholds stored in the PostgreSQL database. This section showcases how you can compare the usage with the thresholds and send a Slack notification when the usage exceeds the threshold.

Flow: Kafka topic cpu_agg_stats (aggregated CPU data) and PostgreSQL table cpu_thresholds feed the Flink comparison application (cpu_notification), which sends a Slack notification when the average is over the threshold

You can complete the section with the following steps:

  • Create a new Aiven for Apache Flink application.

  • Create a source table to read the aggregated metrics data from your Apache Kafka® topic.

  • Define a sink table to send the processed messages to a Slack channel.

  • Define a SQL transformation definition to process the data.

  • Create an application deployment to execute the pipeline.

To create the notification data pipeline, you can go ahead and try it yourself or follow the steps below:

  1. In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.

  2. Click on Create new application and name it cpu_notification.

  3. Click on Create first version.

  4. To create a source table CPU_IN_AGG pointing to the Apache Kafka topic cpu_agg_stats:

    • Click on Add your first source table.

    • Select Aiven for Apache Kafka - demo-kafka as Integrated service.

    • Paste the following SQL:

      CREATE TABLE CPU_IN_AGG(
          window_start TIMESTAMP(3),
          window_end TIMESTAMP(3),
          hostname STRING,
          cpu STRING,
          usage_avg DOUBLE,
          usage_max DOUBLE
          )
      WITH (
         'connector' = 'kafka',
         'properties.bootstrap.servers' = '',
         'topic' = 'cpu_agg_stats',
         'value.format' = 'json',
         'scan.startup.mode' = 'earliest-offset'
      )
      
    • Click Add table.

  5. To create a source table CPU_THRESHOLDS pointing to the PostgreSQL table cpu_thresholds:

    • Click on Add new table.

    • Select Aiven for PostgreSQL - demo-postgresql as Integrated service.

    • Paste the following SQL:

      CREATE TABLE CPU_THRESHOLDS(
          hostname STRING,
          allowed_top INT,
          PRIMARY KEY (hostname) NOT ENFORCED
          )
      WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:postgresql://',
          'table-name' = 'public.cpu_thresholds'
      )
      
    • Click Add table.

  6. Navigate to the Add sink tables tab.

  7. To create a sink table SLACK_SINK pointing to a Slack channel for notifications:

    • Click on Add your first sink table.

    • Select No integrated service as Integrated service.

    • Paste the following SQL, replacing the <SLACK_TOKEN> placeholder with the Slack authentication token:

      create table SLACK_SINK (
          channel_id STRING,
          message STRING
      ) WITH (
          'connector' = 'slack',
          'token' = '<SLACK_TOKEN>'
      )
      
  8. Navigate to the Create statement tab.

  9. Enter the following as the transformation SQL statement, taking data from the CPU_IN_AGG table, comparing it with the threshold values from CPU_THRESHOLDS and pushing the samples over the threshold to SLACK_SINK:

    INSERT INTO SLACK_SINK
    SELECT
        '<CHANNEL_ID>', 
        'host:' || CPU.hostname || 
        ' CPU: ' || cpu || 
        ' avg CPU value:' ||  TRY_CAST(usage_avg as string) || 
        ' over the threshold ' || TRY_CAST(allowed_top as string)
    FROM CPU_IN_AGG CPU INNER JOIN CPU_THRESHOLDS
        ON CPU.hostname = CPU_THRESHOLDS.hostname 
    WHERE usage_avg > allowed_top
    

    Note

    The <CHANNEL_ID> placeholder needs to be replaced by the Slack channel ID parameter.

  10. Click Save and deploy later.

  11. Click Create deployment.

  12. Accept the default deployment parameters and click on Deploy without a savepoint.

  13. The new application deployment status will show Initializing and then Running: version 1.

When the application is running, you should start to see notifications in the Slack channel about the IoT devices whose CPU usage goes over the defined thresholds.

A list of Slack notifications driven by the anomaly detection data pipeline

Important

Congratulations! You created an advanced streaming data pipeline including windowing, joining data coming from different technologies, and a Slack notification system.
