Create a PostgreSQL®-based Apache Flink® table
To build data pipelines, Apache Flink® requires source and target data structures to be mapped as Flink tables. This functionality can be achieved via the Aiven console or Aiven CLI.
A Flink table can be defined over an existing or new Aiven for PostgreSQL® table to source or sink streaming data. To define a Flink table over a PostgreSQL® table, specify the PostgreSQL® table name and the column data formats, together with the Flink table name to use as a reference when building data pipelines.
To define Flink tables, an existing integration must be available between the Aiven for Flink service and one or more Aiven for PostgreSQL® services.
Create a PostgreSQL®-based Apache Flink® table with Aiven Console
To create a Flink table based on Aiven for PostgreSQL® via Aiven console:
In the Aiven for Apache Flink service page, open the Application tab.
Create a new application or select an existing one with Aiven for PostgreSQL® integration.
If editing an existing application, create a new version to make changes to the source or sink tables.
In the Create new version screen, click Add source tables.
Click Add new table or click Edit if you want to edit an existing source table.
In the Add new source table or Edit source table screen, select the Aiven for PostgreSQL® service as the integrated service.
In the Table SQL section, enter the SQL statement to create the PostgreSQL-based Apache Flink table with the following details:
Write the PostgreSQL® table name in the JDBC table field with the format
When using a PostgreSQL® table as the target of a Flink data pipeline, the table must exist before the Flink application starts; otherwise, the application fails.
Define the Flink table name; this name represents the Flink reference to the PostgreSQL® table and is used during the data pipeline definition.
To create a sink table, click Add sink tables and repeat steps 4-6 for sink tables.
In the Create statement section, write the SQL schema that defines the fields retrieved from the PostgreSQL® table and any additional transformations, such as format casting or timestamp extraction.
More details on data types mapping between Apache Flink® and PostgreSQL® are available at the dedicated JDBC Apache Flink® page.
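The sink-table requirement above can be illustrated with a minimal sketch. It assumes a hypothetical target table, public.students_archive, and a hypothetical Flink table name, students_archive_tbl; neither name comes from this page. The first statement runs in PostgreSQL® so that the target exists before the application starts; the second is the Flink table SQL entered for the sink table. The primary key is an optional assumption, not a requirement of the steps above.

```sql
-- Run in PostgreSQL (hypothetical target table): the sink table must
-- exist before the Flink application starts.
CREATE TABLE IF NOT EXISTS public.students_archive (
    student_id   INT PRIMARY KEY,
    student_name VARCHAR
);

-- Flink table SQL for the sink (hypothetical Flink table name).
-- STRING is Flink's unbounded character type.
-- Declaring a primary key makes the Flink JDBC sink write in upsert
-- mode; without one it appends rows.
CREATE TABLE students_archive_tbl (
    student_id   INT,
    student_name STRING,
    PRIMARY KEY (student_id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:postgresql://',
    'table-name' = 'public.students_archive'
)
```

The 'url' value is left as the same placeholder used elsewhere on this page.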
Example: Define a Flink table over a PostgreSQL® table
The Aiven for PostgreSQL® service named pg-demo contains a table named students in the public schema. A Flink table named students_tbl can be defined over it with the following statement:
CREATE TABLE students_tbl (
    student_id INT,
    student_name VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://',
    'table-name' = 'public.students'
)
The url value is substituted with the appropriate address at runtime.
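Once defined, the Flink table name is what a data pipeline statement refers to. The following is a rough sketch of such a statement, assuming a hypothetical sink table named students_upper_tbl with columns (student_id BIGINT, student_name STRING) that has already been defined over an existing PostgreSQL® table. It also shows the kind of transformation mentioned earlier, such as format casting.

```sql
-- students_tbl is the Flink source table from the example above.
-- students_upper_tbl is a hypothetical sink table, assumed to be
-- defined already with columns (student_id BIGINT, student_name STRING).
INSERT INTO students_upper_tbl
SELECT
    CAST(student_id AS BIGINT) AS student_id,   -- format casting
    UPPER(student_name)        AS student_name  -- simple transformation
FROM students_tbl
```

The statement refers only to Flink table names; the mapping to the underlying PostgreSQL® tables comes from the table definitions.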