Real-Time Database to BigQuery Sync with Kafka and Debezium

Siva Gollapalli
Jan 9, 2025


Real Time Data sync using Kafka, Debezium & Big Query

Introduction

This post outlines a robust and scalable approach to synchronising database changes to Google BigQuery in real time using Apache Kafka, Debezium, and Kafka Connect. This architecture allows for seamless data integration, enabling powerful analytics on your rapidly changing datasets. We will cover the setup and deployment of the components needed to capture database changes and stream them to BigQuery.

Architecture Overview

The core idea involves capturing changes from a source database using Debezium, streaming these changes as events through Kafka topics, and then sinking those events into BigQuery using Kafka Connect. This setup includes the following key components:

  • Source Database (e.g., MySQL): The origin of the data that will be synchronised.
  • Debezium: A change data capture (CDC) tool that monitors database changes and publishes them as Kafka events.
  • Apache Kafka: A distributed streaming platform that acts as a message broker for the change events.
  • Kafka Connect: A framework for building scalable and reliable data pipelines, used here to integrate with Debezium and BigQuery.
  • Schema Registry: A component to manage and evolve schemas of the data being streamed in AVRO format.
  • Google BigQuery: The destination data warehouse for analysis.

Setting Up the Environment

For local development and testing, two Docker Compose files are used to spin up the necessary services. The first brings up Zookeeper, Kafka, Kafka Connect, and Schema Registry:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
    networks:
      - local

  kafka:
    image: confluentinc/cp-kafka:7.7.1
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    networks:
      - local

  connect:
    image: debezium/connect:3.0.0.Final
    container_name: container_kafka_connect
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: kafka_connect_configs
      OFFSET_STORAGE_TOPIC: kafka_connect_offsets
      BOOTSTRAP_SERVERS: kafka:9092
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on:
      - zookeeper
      - kafka
      - schema_registry
    volumes:
      - $PWD/files:/kafka/files
      - $PWD/wepay-kafka-connect-bigquery-2.5.7/lib:/kafka/connect/wepay-kafka-connect-bigquery-2.5.7
      - $PWD/confluentinc-kafka-connect-avro-converter-7.7.1/lib:/kafka/connect/confluentinc-kafka-connect-avro-converter-7.7.1
    networks:
      - local

  schema_registry:
    image: confluentinc/cp-schema-registry:7.7.1
    container_name: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
    networks:
      - local

networks:
  local:
    name: local
    driver: bridge
    external: true
The second compose file runs the ksqlDB server and CLI used for stream processing:

version: '2'
services:
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.7.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
    networks:
      - local

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.7.1
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    networks:
      - local

networks:
  local:
    name: local
    driver: bridge
    external: true
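
Because the local network is declared as external in both files, it must exist before either stack is started. A minimal startup sequence could look like the following (the file names docker-compose.yml and docker-compose-ksqldb.yml are assumptions; use whatever names you saved the two files under):

# Create the shared Docker network that both compose files declare as external
docker network create local

# Start Zookeeper, Kafka, Kafka Connect and Schema Registry
docker compose -f docker-compose.yml up -d

# Start the ksqlDB server and CLI
docker compose -f docker-compose-ksqldb.yml up -d

# Optionally, open an interactive ksqlDB session against the server
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088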

Key Points:

  • Zookeeper and Kafka: Provide the foundation for the distributed messaging layer. Note that in production you will want a multi-node Kafka setup for fault tolerance.
  • Kafka Connect: Hosts Debezium connectors for CDC and the BigQuery sink connector. The necessary connector libraries are mounted into the container.
  • Schema Registry: Manages AVRO schemas, essential for serialising and deserialising data consistently.
  • ksqlDB Server & CLI: Run from the second compose file; optional components for stream processing on the change-event topics.

Note: Ensure the wepay-kafka-connect-bigquery-2.5.7/lib and confluentinc-kafka-connect-avro-converter-7.7.1/lib directories exist alongside your compose file and contain the respective connector JARs. Download the connector archives, extract them, and mount them into the Kafka Connect container as shown in the volumes section above.
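
Once the stack is running, you can confirm that Kafka Connect picked up both plugins by querying its REST API. This is just a quick sanity check; jq is optional and only pretty-prints the output:

# List the connector plugins Kafka Connect discovered on its plugin path
curl -s localhost:8083/connector-plugins | jq '.[].class'

# The output should include io.debezium.connector.mysql.MySqlConnector
# and com.wepay.kafka.connect.bigquery.BigQuerySinkConnector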

Deploying Connectors

  1. Debezium Connector (MySQL Example):

Create a debezium-config.json file with the following JSON. Modify the values to match your database environment.

{
  "name": "mysql-connector-local",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": 1,
    "database.hostname": "<DB HOST NAME>",
    "database.port": 3306,
    "database.user": "<DB USERNAME>",
    "database.password": "<DB PASSWORD>",
    "database.include.list": "<DB>",
    "table.include.list": "<TABLES TO TRACK>",
    "topic.prefix": "dbchanges",
    "database.server.id": 123456,
    "schema.history.internal.kafka.topic": "schema_history",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "transforms": "topicRegexRouter",
    "transforms.topicRegexRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRegexRouter.regex": "dbchanges.source_database.(.*)",
    "transforms.topicRegexRouter.replacement": "new_dim_raw_$1",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal"
  }
}

Then, deploy the connector using:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors --data @debezium-config.json

Key points:

  • topic.prefix: Defines the prefix for the topics Debezium creates (dbchanges.<database>.<table>).
  • transforms.topicRegexRouter: Renames the generated topics. In the example above, dbchanges.source_database.<table> becomes new_dim_raw_<table>.
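
After the connector completes its initial snapshot, you can check that the router produced the renamed topics and peek at the Avro change events. The commands below are a sketch: they assume the first compose file is the active project (so docker compose exec resolves the kafka and schema_registry services) and use an illustrative table name.

# List topics; dbchanges.source_database.<table> should appear as new_dim_raw_<table>
docker compose exec kafka kafka-topics --bootstrap-server kafka:9092 --list | grep new_dim_raw_

# Read a few change events from one renamed topic (new_dim_raw_customers is illustrative)
docker compose exec schema_registry kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic new_dim_raw_customers \
  --from-beginning \
  --max-messages 5 \
  --property schema.registry.url=http://schema-registry:8081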

  2. BigQuery Sink Connector:

Create a bq-config.json file with the following JSON, and store your BigQuery service account key at /kafka/files/bq-auth.json inside the Kafka Connect container (the $PWD/files volume mounted above maps to /kafka/files). Replace placeholder values as appropriate.

{
  "name": "BQ-connector",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": 1,
    "topics": "<KAFKA TOPICS>",
    "sanitizeTopics": false,
    "autoCreateTables": true,
    "autoUpdateSchemas": true,
    "allBQFieldsNullable": true,
    "allowNewBigQueryFields": true,
    "allowBigQueryRequiredFieldRelaxation": true,
    "project": "<BIGQUERY PROJECT ID>",
    "defaultDataset": "demo",
    "keyfile": "/kafka/files/bq-auth.json",
    "transforms": "flattenValue,insertField,timestampConverter",
    "transforms.flattenValue.delimiter": "_",
    "transforms.flattenValue.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.insertField.timestamp.field": "ingest_time",
    "transforms.insertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.timestampConverter.field": "ingest_time",
    "transforms.timestampConverter.target.type": "unix",
    "transforms.timestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value"
  }
}

Deploy the connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors --data @bq-config.json

Key Points:

  • topics: Comma-separated list of Kafka topics to consume and sync to BigQuery.
  • transforms: Declares the chain of single message transforms (SMTs) applied to each record before it is written.
  • flattenValue: Flattens nested record structures so they map cleanly to BigQuery columns.
  • insertField: Adds an ingest_time field to every record.
  • timestampConverter: Converts ingest_time into a Unix timestamp.
  • keyfile: Path to the Google Cloud service account key inside the Kafka Connect container.
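
Once the sink connector is running, rows should start landing in the target dataset. A quick way to confirm, assuming you have the bq CLI installed and authenticated against the same project (the table name below is illustrative):

# Count rows and check the latest ingest time in an auto-created table
bq query --use_legacy_sql=false \
  'SELECT COUNT(*) AS row_count, MAX(ingest_time) AS latest_ingest
   FROM `demo.new_dim_raw_customers`'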

Verifying Connector Deployment

Kafka Connect exposes a REST API that you can use to verify connector deployments. Make sure to check connector and task status periodically, for example with the calls below.
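
These endpoints are part of the standard Connect REST API; the connector names match the examples above:

# List all registered connectors
curl -s localhost:8083/connectors

# Check the status of each connector and its tasks
curl -s localhost:8083/connectors/mysql-connector-local/status | jq
curl -s localhost:8083/connectors/BQ-connector/status | jq

# Restart a connector and any failed tasks if it ends up in a FAILED state
curl -s -X POST "localhost:8083/connectors/BQ-connector/restart?includeTasks=true&onlyFailed=true"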

Production Considerations

  • Multi-Node Kafka Cluster: Use a multi-node Kafka setup in production to ensure high availability and fault tolerance.
  • Schema Evolution: Implement a robust schema evolution strategy to handle changes in database schema.
  • Resource Allocation: Tune resource allocations (CPU, memory) for all components based on data throughput and load.
  • Monitoring: Set up monitoring dashboards for Kafka, Kafka Connect, and other services.
  • Security: Implement security measures, including authentication and authorization for all services.

Conclusion

This post walked through setting up real-time change synchronisation from a source database to BigQuery using Kafka, Debezium, and Kafka Connect. By following these steps, you can build a reliable and scalable pipeline that captures database changes and makes them available for real-time analysis.

Enjoyed this article? Please share it with your network to help others learn.

Stay connected for more insights:

  • Follow me on Medium for the latest articles.
  • Connect with me on LinkedIn for professional updates.
  • Join the conversation on Twitter and share your thoughts.
