Skip to main content

Bring your own Apache Kafka® Connect cluster

Aiven provides Apache Kafka® Connect as a managed service in combination with the Aiven for Apache Kafka® managed service. However, there are circumstances where you may want to roll your own Kafka Connect cluster.

The following article defines the necessary steps to integrate your own Apache Kafka Connect cluster with Aiven for Apache Kafka and use the schema registry offered by Karapace. The example shows how to create a JDBC sink connector to a PostgreSQL® database.

Prerequisites

To bring your own Apache Kafka Connector, you need an Aiven for Apache Kafka service up and running.

Furthermore, for the JDBC sink connector database example, you need to collect the following information about the Aiven for Apache Kafka service and the target database upfront:

  • APACHE_KAFKA_HOST: The hostname of the Apache Kafka service
  • APACHE_KAFKA_PORT: The port of the Apache Kafka service
  • REST_API_PORT: The Apache Kafka's REST API port, only needed when testing data flow with REST APIs
  • REST_API_USERNAME: The Apache Kafka's REST API username, only needed when testing data flow with REST APIs
  • REST_API_PASSWORD: The Apache Kafka's REST API password, only needed when testing data flow with REST APIs
  • SCHEMA_REGISTRY_PORT: The Apache Kafka's schema registry port, only needed when using Avro as data format
  • SCHEMA_REGISTRY_USER: The Apache Kafka's schema registry username, only needed when using Avro as data format
  • SCHEMA_REGISTRY_PASSWORD: The Apache Kafka's schema registry user password, only needed when using Avro as data format
  • PG_HOST: The PostgreSQL service hostname
  • PG_PORT: The PostgreSQL service port
  • PG_USERNAME: The PostgreSQL service username
  • PG_PASSWORD: The PostgreSQL service password
  • PG_DATABASE_NAME: The PostgreSQL service database name
note

If you're using Aiven for PostgreSQL and Aiven for Apache Kafka the above details are available in the Aiven console service Overview tab or via the dedicated avn service get command with the Aiven CLI.

Attach your own Apache Kafka Connect cluster to Aiven for Apache Kafka®

The following example demonstrates how to setup a local Apache Kafka Connect cluster with a working JDBC sink connector and attach it to an Aiven for Apache Kafka service.

Setup the truststore and keystore

Create a Java keystore and truststore for the Aiven for Apache Kafka service. For the following example we assume:

  • The keystore is available at KEYSTORE_PATH/client.keystore.p12
  • The truststore is available at TRUSTSTORE_PATH/client.truststore.jks
  • For simplicity, the same secret (password) is used for both the keystore and the truststore, and is shown here as KEY_TRUST_SECRET

Configure the Aiven for Apache Kafka service

You need to enable the schema registry features offered by Karapace. You can do it in the Aiven Console in the Aiven for Apache Kafka service Overview tab.

  1. Enable the Schema Registry (Karapace) and Apache Kafka REST API (Karapace)
  2. In the Topic tab, create a new topic called jdbc_sink, the topic will be used by the Apache Kafka Connect connector

Download the required binaries

The following binaries are needed to setup a Apache Kafka Connect cluster locally:

Setup the local Apache Kafka Connect cluster

The following process defines the setup required to create a local Apache Kafka Connect cluster. The example shows the steps needed with the Apache Kafka 3.1.0, Avro converter 7.1.0 and JDBC connector 6.7.0 versions:

  1. Extract the Apache Kafka binaries

    tar -xzf kafka_2.13-3.1.0.tgz
  2. Within the newly created kafka_2.13-3.1.0 folder, create a plugins folder containing a lib sub-folder

    cd kafka_2.13-3.1.0
    mkdir -p plugins/lib
  3. Unzip the JDBC and Avro binaries and copy the jar files in the plugins/lib folder

    # extract aiven connect jdbc
    unzip jdbc-connector-for-apache-kafka-6.7.0.zip
    # extract confluent kafka connect avro converter
    unzip confluentinc-kafka-connect-avro-converter-7.1.0.zip
    # copying plugins in the plugins/lib folder
    cp jdbc-connector-for-apache-kafka-6.7.0/*.jar plugins/lib/
    cp confluentinc-kafka-connect-avro-converter-7.1.0/*.jar plugins/lib/
  4. Create a properties file, my-connect-distributed.properties, under the main kafka_2.13-3.1.0 folder, for the Apache Kafka Connect settings. Change the following placeholders:

    • PATH_TO_KAFKA_HOME to the path to the kafka_2.13-3.1.0 folder
    • APACHE_KAFKA_HOST, APACHE_KAFKA_PORT, SCHEMA_REGISTRY_PORT, SCHEMA_REGISTRY_USER, SCHEMA_REGISTRY_PASSWORD, to the related parameters fetched in the prerequisite step
    • KEYSTORE_PATH, TRUSTSTORE_PATH and KEY_TRUST_SECRET to the keystore, truststore location and related secret as defined in the related step
    # Define the folders for plugins, including the JDBC and Avro
    plugin.path=PATH_TO_KAFKA_HOME/kafka_2.13-3.1.0/plugins

    # Defines the location of the Apache Kafka bootstrap servers
    bootstrap.servers=APACHE_KAFKA_HOST:APACHE_KAFKA_PORT

    # Defines the group.id used by the connection cluster
    group.id=connect-cluster

    # Defines the input data format for key and value: JSON without schema
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

    # Defines the internal data format for key and value: JSON without schema
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false

    # Connect clusters create three topics to manage offsets, configs, and status
    # information. Note that these contribute towards the total partition limit quota.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=3

    config.storage.topic=connect-configs
    config.storage.replication.factor=3

    status.storage.topic=connect-status
    status.storage.replication.factor=3

    # Defines the flush interval for the offset comunication
    offset.flush.interval.ms=10000

    # Defines the SSL endpoint
    ssl.endpoint.identification.algorithm=https
    request.timeout.ms=20000
    retry.backoff.ms=500
    security.protocol=SSL
    ssl.protocol=TLS
    ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
    ssl.truststore.password=KEY_TRUST_SECRET
    ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
    ssl.keystore.password=KEY_TRUST_SECRET
    ssl.key.password=KEY_TRUST_SECRET
    ssl.keystore.type=PKCS12

    # Defines the consumer SSL endpoint
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.security.protocol=SSL
    consumer.ssl.protocol=TLS
    consumer.ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
    consumer.ssl.truststore.password=KEY_TRUST_SECRET
    consumer.ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
    consumer.ssl.keystore.password=KEY_TRUST_SECRET
    consumer.ssl.key.password=KEY_TRUST_SECRET
    consumer.ssl.keystore.type=PKCS12

    # Defines the producer SSL endpoint
    producer.ssl.endpoint.identification.algorithm=https
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.security.protocol=SSL
    producer.ssl.protocol=TLS
    producer.ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
    producer.ssl.truststore.password=KEY_TRUST_SECRET
    producer.ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
    producer.ssl.keystore.password=KEY_TRUST_SECRET
    producer.ssl.key.password=KEY_TRUST_SECRET
    producer.ssl.keystore.type=PKCS12
  5. Start the local Apache Kafka Connect cluster, executing the following from the kafka_2.13-3.1.0 folder:

    ./bin/connect-distributed.sh ./my-connect-distributed.properties

Add the JDBC sink connector

The following steps define how you can add a JDBC connector to the local Apache Kafka Connect cluster:

  1. Create the JDBC sink connector JSON configuration file named jdbc-sink-pg.json with the following content, replacing the placeholders PG_HOST, PG_PORT, PG_USERNAME, PG_PASSWORD, PG_DATABASE_NAME, APACHE_KAFKA_HOST, SCHEMA_REGISTRY_PORT, SCHEMA_REGISTRY_USER, SCHEMA_REGISTRY_PASSWORD.

    {
    "name": "jdbc-sink-pg",
    "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://PG_HOST:PG_PORT/PG_DATABASE_NAME?user=PG_USERNAME&password=PG_PASSWORD&ssl=required",
    "tasks.max": "1",
    "topics": "jdbc_sink",
    "auto.create": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
    }
    }
  2. Create the JDBC sink connector instance using Kafka Connect REST APIs

    curl -s -H "Content-Type: application/json" -X POST \
    -d @jdbc-sink-pg.json \
    http://localhost:8083/connectors/
  3. Check the status of the JDBC sink connector instance, jq is used to beautify the output

    curl localhost:8083/connectors/jdbc-sink-pg/status | jq

    The result should be similar to the following

    {
    "name": "jdbc-sink-pg",
    "connector": {
    "state": "RUNNING",
    "worker_id": "10.128.0.12:8083"
    },
    "tasks": [
    {
    "id": 0,
    "state": "RUNNING",
    "worker_id": "10.128.0.12:8083"
    }
    ],
    "type": "sink"
    }
tip

Check the dedicated blog post for an end-to-end example of how to setup a Kafka Connect cluster to host a custom connector.

Verify the JDBC connector using Karapace REST APIs

To verify that the connector is working, you can write messages to the jdbc_sink topic in Avro format using Karapace REST APIs, by following the steps below:

  1. Create a new Avro schema using the /subjects/ endpoint, after changing the placeholders for REST_API_USER, REST_API_PASSWORD, APACHE_KAFKA_HOST, REST_API_PORT

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '''
    {"schema":
    "{\"type\": \"record\",\"name\": \"jdbcsinkexample\",\"namespace\": \"example\",\"doc\": \"example\",\"fields\": [{ \"type\": \"string\", \"name\": \"name\", \"doc\": \"person name\", \"namespace\": \"example\", \"default\": \"mario\"},{ \"type\": \"int\", \"name\": \"age\", \"doc\": \"persons age\", \"namespace\": \"example\", \"default\": 5}]}"
    }''' \
    https://REST_API_USER:REST_API_PASSWORD@APACHE_KAFKA_HOST:REST_API_PORT/subjects/jdbcsinkexample/versions/

    The above call creates a new schema called jdbcsinkexample with a schema containing two fields (name and age).

  2. Create a new message in the jdbc_sink topic using the jdbcsinkexample schema, after changing the placeholders for REST_API_USER, REST_API_PASSWORD, APACHE_KAFKA_HOST, REST_API_PORT

    curl -H "Content-Type: application/vnd.kafka.avro.v2+json" -X POST \
    -d '''
    {"value_schema":
    "{\"namespace\": \"test\", \"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}",
    "records": [{"value": {"name": "Eric","age":77}}]}''' \
    https://REST_API_USER:REST_API_PASSWORD@APACHE_KAFKA_HOST:REST_API_PORT/topics/jdbc_sink
  3. Verify the presence of a table called jdbc_sink in PostgreSQL containing the row with name Eric and age 77