Create a PostgreSQL®-based Apache Flink® table
To build data pipelines, Apache Flink® requires source and target data structures to be mapped as Flink tables. You can create these tables using the Aiven Console or the Aiven CLI.
A Flink table can be defined over an existing or new Aiven for PostgreSQL® table to source or sink streaming data. To define a Flink table over a PostgreSQL® table, specify the PostgreSQL® table name and column data formats, together with the Flink table name to use as a reference when building data pipelines.
To define Flink tables, an integration must already exist between the Aiven for Apache Flink® service and one or more Aiven for PostgreSQL® services.
Create a PostgreSQL®-based Apache Flink® table with Aiven Console
To create a Flink table based on Aiven for PostgreSQL® using the Aiven Console:
1. In the Aiven for Apache Flink® service page, select Application from the left sidebar.
2. Create a new application or select an existing one with Aiven for PostgreSQL® integration.
   If editing an existing application, create a new version to make changes to the source or sink tables.
3. In the Create new version screen, select Add source tables.
4. Select Add new table, or select Edit to edit an existing source table.
5. In the Add new source table or Edit source table screen, select the Aiven for PostgreSQL® service as the integrated service.
6. In the Table SQL section, enter the SQL statement to create the PostgreSQL®-based Apache Flink® table with the following details:
   When using a PostgreSQL® table as the target of a Flink data pipeline, the table must exist before the Flink application starts; otherwise, the application fails.
7. To create a sink table, select Add sink tables and repeat steps 4-6 for sink tables.
8. In the Create statement section, write the SQL schema that defines the fields retrieved from the PostgreSQL® table and any additional transformations, such as format casting or timestamp extraction.
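The steps above can be illustrated with a minimal sketch. It assumes the Create statement is an INSERT INTO ... SELECT query, and every table and column name in it (students_raw_tbl, students_enrolled_tbl, public.students_enrolled, enrolled_at) is hypothetical. The first statement would run in PostgreSQL® so that the target table exists before the application starts. The second would run in Flink and shows format casting, assuming both Flink tables are already defined and enrolled_at arrives as a string.

```sql
-- 1) Run in PostgreSQL before starting the Flink application:
--    the target table of the sink must already exist.
--    (Hypothetical table and columns, for illustration only.)
CREATE TABLE IF NOT EXISTS public.students_enrolled (
    student_id   INTEGER,
    student_name TEXT,
    enrolled_at  TIMESTAMP(3)
);

-- 2) Run in Flink (assumed content of the Create statement section):
--    read from a hypothetical source table, cast the fields,
--    and write the result to a hypothetical sink table.
INSERT INTO students_enrolled_tbl
SELECT
    CAST(student_id AS INT)           AS student_id,   -- format casting
    student_name,
    CAST(enrolled_at AS TIMESTAMP(3)) AS enrolled_at   -- string to timestamp
FROM students_raw_tbl;
```

Keeping the PostgreSQL® column types aligned with the types produced by the SELECT list reduces the chance of a type mismatch when the sink writes rows.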
Example: Define a Flink table over a PostgreSQL® table
The Aiven for PostgreSQL® service named pg-demo contains a table named students in the public schema. The following statement defines a Flink table named students_tbl over it:
CREATE TABLE students_tbl (
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://',
    'table-name' = 'public.students'
)

The url value is replaced with the appropriate address at runtime.
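The students_tbl statement above does not list any columns. As a rough sketch, assuming the students table has an integer student_id column and a text student_name column (both are hypothetical and not confirmed by this page), a complete definition might look like this:

```sql
-- Sketch only: the column names and types below are assumptions,
-- not the actual structure of the students table in pg-demo.
CREATE TABLE students_tbl (
    student_id   INT,     -- assumed to map to a PostgreSQL INTEGER column
    student_name STRING   -- assumed to map to a PostgreSQL TEXT or VARCHAR column
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://',        -- left incomplete, as in the example above
    'table-name' = 'public.students'
)
```

The Flink column names should match the PostgreSQL® column names, because the JDBC connector maps fields by name.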