Replicate data with Kafka (Confluent) and Debezium
Learn how to replicate data from Neon with Kafka (Confluent) and Debezium
Neon's logical replication feature allows you to replicate data from your Neon Postgres database to external destinations.
Confluent Cloud is a fully managed, cloud-native real-time data streaming service, built on Apache Kafka. It allows you to stream data from various sources, including Postgres, and build apps that consume messages from an Apache Kafka cluster.
In this guide, you will learn how to stream data from a Neon Postgres database to a Kafka cluster in Confluent Cloud. You will use the PostgreSQL CDC Source Connector (Debezium) for Confluent Cloud to read Change Data Capture (CDC) events from the Write-Ahead Log (WAL) of your Neon database in real-time. The connector will write events to a Kafka stream and auto-generate a Kafka topic. The connector performs an initial snapshot of the table and then streams any future change events.
note
Confluent Cloud Connectors can be set up using the Confluent Cloud UI or the Confluent command-line interface (CLI). This guide uses the Confluent Cloud UI.
Prerequisites
- A Confluent Cloud account
- A Neon account
Enable logical replication in Neon
important
Enabling logical replication modifies the PostgreSQL wal_level configuration parameter, changing it from replica to logical for all databases in your Neon project. Once the wal_level setting is changed to logical, it cannot be reverted. Enabling logical replication also restarts all computes in your Neon project, which means that active connections will be dropped and have to reconnect.
To enable logical replication in Neon:
- Select your project in the Neon Console.
- On the Neon Dashboard, select Settings.
- Select Logical Replication.
- Click Enable to enable logical replication.
You can verify that logical replication is enabled by checking the wal_level setting from the Neon SQL Editor; it should report logical.
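A minimal check, assuming the standard Postgres SHOW command is run against the database in the project where you enabled logical replication:

```sql
-- Report the current write-ahead log level.
SHOW wal_level;
```

If logical replication is enabled, the result is logical.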
Create a publication
In this example, we'll create a publication for a users table in the public schema of your Neon database.
- Create the users table in your Neon database. You can do this via the Neon SQL Editor or by connecting to your Neon database from an SQL client such as psql.
- Create a publication for the users table with a CREATE PUBLICATION statement.
The publication, named users_publication, will include all changes to the users table in your replication stream.
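The two steps above can be sketched as follows. The table and publication names come from this example. The column definitions are assumptions for illustration only; this guide relies on nothing more than the username and email columns used later on.

```sql
-- Hypothetical table definition: the column types and the id primary key are assumptions.
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL
);

-- Publish changes to the users table only.
CREATE PUBLICATION users_publication FOR TABLE users;

-- Optional check: list the tables included in the publication.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'users_publication';
```

A primary key gives the table a default replica identity, which Postgres needs in order to replicate UPDATE and DELETE operations.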
Create a Postgres role for replication
It is recommended that you create a dedicated Postgres role for replicating data. The role must have the REPLICATION privilege. The default Postgres role created with your Neon project and roles created using the Neon CLI, Console, or API are granted membership in the neon_superuser role, which has the required REPLICATION privilege.
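To see how an existing role is set up, a query along these lines may help. It is a sketch that assumes a role named alex (the example role used in this guide) and that the neon_superuser role exists, as described above for Neon projects.

```sql
-- Inspect the role's own REPLICATION attribute and its neon_superuser membership.
SELECT rolname,
       rolreplication,
       pg_has_role(rolname, 'neon_superuser', 'MEMBER') AS member_of_neon_superuser
FROM pg_roles
WHERE rolname = 'alex';
```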
Grant schema access to your Postgres role
If your replication role does not own the schemas and tables you are replicating from, make sure to grant access. For example, the following commands grant access to all tables in the public schema to Postgres role alex:
GRANT USAGE ON SCHEMA public TO alex;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO alex;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO alex;
Granting SELECT ON ALL TABLES IN SCHEMA instead of naming the specific tables avoids having to add privileges later if you add tables to your publication.
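To confirm that the grants took effect, a check such as the following can be run. It assumes the alex role and the public.users table from this guide.

```sql
-- Each expression returns true when alex holds the named privilege.
SELECT has_schema_privilege('alex', 'public', 'USAGE') AS can_use_schema,
       has_table_privilege('alex', 'public.users', 'SELECT') AS can_select_users;
```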
Create a replication slot
The Debezium connector requires a dedicated replication slot. Only one source should be configured to use this replication slot.
To create a replication slot called debezium, run the following command on your database using your replication role:
SELECT pg_create_logical_replication_slot('debezium', 'pgoutput');
- debezium is the name assigned to the replication slot. You will need to provide the slot name when you set up your source connector in Confluent.
- pgoutput is the logical decoder plugin used in this example. Neon supports both pgoutput and wal2json decoder plugins.
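To confirm that the slot exists, you can query the pg_replication_slots view. This is a sketch assuming the slot name debezium used above.

```sql
-- Show the slot, its decoder plugin, and whether a consumer is currently attached.
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'debezium';
```

Until the connector attaches to the slot, the active column is expected to be false.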
important
To prevent storage bloat, Neon automatically removes inactive replication slots after a period of time if there are other active replication slots. If you have or intend on having more than one replication slot, please see Unused replication slots to learn more.
Set up a Kafka cluster in Confluent Cloud
- Sign in to Confluent Cloud at https://confluent.cloud.
- Click Add cluster.
- On the Create cluster page, for the Basic cluster, select Begin configuration.
- On the Region/zones page, choose a cloud provider, a region, and select a single availability zone.
- Select Continue.
- Specify your payment details. You can select Skip payment for now if you're just trying out the setup.
- Specify a cluster name, review the configuration and cost information, and select Launch cluster. In this example, we use cluster_neon as the cluster name. It may take a few minutes to provision your cluster. After the cluster has been provisioned, the Cluster Overview page displays.
Set up a source connector
To set up a Postgres CDC source connector for Confluent Cloud:
- On the Cluster Overview page, under Set up connector, select Get started.
- On the Connector Plugins page, enter Postgres into the search field.
- Select the Postgres CDC Source connector. This is the PostgreSQL CDC Source Connector (Debezium) for Confluent Cloud. This connector will take a snapshot of the existing data and then monitor and record all subsequent row-level changes to that data.
- On the Add Postgres CDC Source connector page:
  - Select the type of access you want to grant the connector. For the purpose of this guide, we'll select Global access, but if you are configuring a production pipeline, Confluent recommends Granular access.
  - Click the Generate API key & download button to generate an API key and secret that your connector can use to communicate with your Kafka cluster. Your applications will need this API key and secret to make requests to your Kafka cluster. Store the API key and secret somewhere safe. This is the only time you’ll see the secret.
  Click Continue.
- On the Add Postgres CDC Source connector page:
  - Add the connection details for your Neon database. You can obtain the required details from your Neon connection string, which you can find in the Connection Details widget on the Neon Dashboard. Your connection string will look something like this:
    postgresql://alex:AbC123dEf@ep-cool-darkness-123456.us-east-2.aws.neon.tech/dbname?sslmode=require
    Enter the details from your connection string into the source connector fields. Based on the sample connection string above, the values would be specified as shown below. Your values will differ.
    - Database name: dbname
    - Database server name: neon_server (This is a user-specified value that represents the logical name of your Postgres server. Confluent uses this name as a namespace in all Kafka topic and schema names. It is also used for Avro schema namespaces if the Avro data format is used. The Kafka topic will be created with the prefix database.server.name. Only alphanumeric characters, underscores, hyphens, and dots are allowed.)
    - SSL mode: require
    - Database hostname: ep-cool-darkness-123456.us-east-2.aws.neon.tech (this example shows the portion of a Neon connection string that forms the database hostname)
    - Database port: 5432 (Neon uses port 5432)
    - Database username: alex
    - Database password: AbC123dEf
  - If you use Neon's IP Allow feature to limit IP addresses that can connect to Neon, you will need to add the Confluent cluster static IP addresses to your allowlist. For information about configuring allowed IPs in Neon, see Configure IP Allow. If you do not use Neon's IP Allow feature, you can skip this step.
  Click Continue.
- Under Output Kafka record value format, select an output format for Kafka record values. The default is JSON, so we'll use that format in this guide. Other supported values include AVRO, JSON_SR, and PROTOBUF, which are schema-based message formats. If you use any of these, you must also configure a Confluent Cloud Schema Registry.
  Expand the Show advanced configurations drop-down and set the following values:
  - Under Advanced configuration:
    - Ensure Slot name is set to debezium. This is the name of the replication slot you created earlier.
    - Set the Publication name to users_publication, which is the name of the publication you created earlier.
    - Set Publication auto-create mode to disabled. You've already created your publication.
  - Under Database details, set Tables included to public.users, which is the name of the Neon database table you are replicating from.
  Click Continue.
- For Connector sizing, accept the default for the maximum number of Tasks. Tasks can be scaled up at a later time for additional throughput capacity.
  Click Continue.
- Adjust your Connector name if desired, and review your Connector configuration, which is provided in JSON format, as shown below. We'll use the default connector name in this guide.
  {
    "connector.class": "PostgresCdcSource",
    "name": "PostgresCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "2WY3UABFDN7DDFIV",
    "kafka.api.secret": "****************************************************************",
    "schema.context.name": "default",
    "database.hostname": "ep-cool-darkness-123456.us-east-2.aws.neon.tech",
    "database.port": "5432",
    "database.user": "alex",
    "database.password": "************",
    "database.dbname": "dbname",
    "database.server.name": "neon_server",
    "database.sslmode": "require",
    "publication.name": "users_publication",
    "publication.autocreate.mode": "disabled",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "true",
    "plugin.name": "pgoutput",
    "slot.name": "debezium",
    "poll.interval.ms": "1000",
    "max.batch.size": "1000",
    "event.processing.failure.handling.mode": "fail",
    "heartbeat.interval.ms": "0",
    "provide.transaction.metadata": "false",
    "decimal.handling.mode": "precise",
    "binary.handling.mode": "bytes",
    "time.precision.mode": "adaptive",
    "cleanup.policy": "delete",
    "hstore.handling.mode": "json",
    "interval.handling.mode": "numeric",
    "schema.refresh.mode": "columns_diff",
    "output.data.format": "JSON",
    "after.state.only": "true",
    "output.key.format": "JSON",
    "json.output.decimal.format": "BASE64",
    "tasks.max": "1"
  }
  Click Continue to provision the connector, which may take a few moments to complete.
Verify your Kafka stream
To verify that events are now being published to a Kafka stream in Confluent:
- Insert a row into your users table from the Neon SQL Editor or a psql client connected to your Neon database. For example:
  -- Insert a new user
  INSERT INTO users (username, email) VALUES ('Zhang', 'zhang@example.com');
- In Confluent Cloud, navigate to your cluster (cluster_neon in this guide) and select Topics > neon_server.public.users > Messages. Your newly inserted data should appear at the top of the list of messages.
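To generate a further change event and check the Neon side of the pipeline, statements like these can be used. They are a sketch that assumes the users table has a primary key (so that updates can be replicated) and that the slot is named debezium; the new email value is arbitrary.

```sql
-- Produce an update event for the row inserted above.
UPDATE users SET email = 'zhang@example.org' WHERE username = 'Zhang';

-- With the connector running, the slot should be reported as active.
SELECT slot_name, active, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'debezium';
```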
Next steps
With events being published to a Kafka stream, you can now set up a connection between Confluent and a supported consumer using a Confluent connector. For example, you can stream data to Databricks, Snowflake, or one of the other supported consumers. Refer to the Confluent documentation for connector-specific instructions.