How to sync data from MySQL to Google BigQuery using Debezium and Kafka Connect

Syncing data from a MySQL database to Google BigQuery can be a great way to keep your data up to date and easily accessible for analysis. In this article, we will explore the process of setting up Debezium and Kafka Connect to sync data from MySQL to BigQuery, providing you with all the information you need to get started.

Why use Debezium and Kafka Connect?

Debezium and Kafka Connect are open-source tools that together provide a powerful solution for streaming data changes between systems in real time. That real-time streaming lets you keep your data in sync and accessible for use cases such as real-time analytics, data warehousing, and data pipeline integration.

Technologies used

Before we go into the details of setting up Debezium and Kafka Connect to sync data from MySQL to BigQuery, it is important to understand the technologies that you will be using and how they are connected.

Change data capture

Change data capture (CDC) is a technique for capturing and recording all the changes made to a database over time. This allows for real-time data replication, making it easy to keep multiple systems in sync.

CDC does this by detecting row-level changes in database source tables, which are characterized as “Insert,” “Update,” and “Delete” events. CDC then notifies other systems or services that rely on the same data.
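
For example, a simplified change event for an update to a customer row might look like this (a sketch of the Debezium event shape with made-up values; real events carry additional schema and source metadata, as you will see later in this tutorial):

{
  "before": { "id": 1001, "email": "sally.thomas@acme.com" },
  "after":  { "id": 1001, "email": "sally.t@acme.com" },
  "op": "u",
  "ts_ms": 1673446748425
}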

Apache Kafka

Apache Kafka is a distributed streaming platform that is used for building real-time data pipelines and streaming applications. It allows for the storage and processing of streams of records in a fault-tolerant way.
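
For a sense of the model: producers append records to named topics, and consumers read those topics independently. With Kafka’s bundled CLI tools, that looks roughly like this (a sketch; assumes a broker listening on localhost:9092 and a hypothetical topic named “demo”):

$ bin/kafka-topics.sh --create --topic demo --bootstrap-server localhost:9092
$ echo "hello" | bin/kafka-console-producer.sh --topic demo --bootstrap-server localhost:9092
$ bin/kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server localhost:9092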

Kafka Connect

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems, using so-called connectors.

Kafka connectors are ready-to-use components that can help you import data from external systems into Kafka topics and export data from Kafka topics into external systems. You can use existing connector implementations for common data sources and sinks or implement your own connectors. This tutorial uses two of them: the Debezium MySQL source connector and the BigQuery sink connector.

Debezium

Debezium is an open-source platform that allows you to easily stream changes from your MySQL database to other systems using CDC. It works by reading the MySQL binlog to capture data changes in a transactional manner, so you can be sure that you’re always working with the most up-to-date data.
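
The example MySQL image used in the tutorial below already has the binlog enabled. On a self-managed server, Debezium needs row-based binary logging, configured along these lines in my.cnf (a minimal sketch; see the Debezium documentation for the full requirements):

[mysqld]
server-id        = 223344    # any ID that is unique in your replication topology
log_bin          = mysql-bin # enable binary logging
binlog_format    = ROW       # Debezium requires row-level change events
binlog_row_image = FULL      # log complete before/after row images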

By using Debezium, you can capture the changes made to the MySQL database and stream them to Kafka. Data on the changes can then be consumed by Kafka Connect to load the data into BigQuery.
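
The resulting pipeline looks like this:

MySQL binlog -> Debezium MySQL source connector -> Kafka topics -> BigQuery sink connector -> BigQuery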

BigQuery setup

To receive the data, you need a Google Cloud project with BigQuery enabled, a BigQuery dataset for the replicated tables, and a service account that is allowed to create and write tables in that dataset. Download the service account’s key as a JSON file; you will need it when configuring the sink connector.

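If you use the Google Cloud CLI, creating the service account, key file, and dataset might look like this (a sketch: the account name debezium-sync is an arbitrary example, the project ID mysql-bigquery and dataset debezium match the connector configuration used later in this tutorial, and you may want a narrower role than bigquery.admin):

$ gcloud iam service-accounts create debezium-sync --project=mysql-bigquery
$ gcloud projects add-iam-policy-binding mysql-bigquery \
    --member="serviceAccount:debezium-sync@mysql-bigquery.iam.gserviceaccount.com" \
    --role="roles/bigquery.admin"
$ gcloud iam service-accounts keys create bigquery-keyfile.json \
    --iam-account=debezium-sync@mysql-bigquery.iam.gserviceaccount.com
$ bq mk --dataset mysql-bigquery:debezium
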
Tutorial

To start syncing data from MySQL to BigQuery, we will need the following components: ZooKeeper, Kafka, a Kafka Connect worker with the Debezium MySQL source and BigQuery sink connector plugins, and a MySQL database. The docker-compose file below starts all of these services.

Start required services

    1. Let’s start by creating a new directory. Open a terminal and run:
      
      $ mkdir mysql-to-bigquery
      $ cd mysql-to-bigquery
      
      
    2. Create a plugins directory:
      
      $ mkdir plugins
      
    3. Download the Debezium MySQL connector plugin and extract it into the plugins directory:
      
      $ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.1.Final/debezium-connector-mysql-2.1.1.Final-plugin.tar.gz -O mysql-plugin.tar.gz
      
      
      $ tar -xzf mysql-plugin.tar.gz -C plugins
      
    4. Download the BigQuery sink connector plugin and put its contents into your plugins directory (in this tutorial we are using version 2.4.3 of the wepay kafka-connect-bigquery connector). Now your plugins directory should look like this:
      
      $ ls plugins
      debezium-connector-mysql		wepay-kafka-connect-bigquery-2.4.3
      
    5. Create a new file (“docker-compose.yml”) with these configurations:
      
      version: '2'
      services:
        zookeeper:
          container_name: zookeeper
          image: quay.io/debezium/zookeeper:2.1
          ports:
            - 2181:2181
            - 2888:2888
            - 3888:3888
        kafka:
          container_name: kafka
          image: quay.io/debezium/kafka:2.1
          ports:
            - 9092:9092
          links:
            - zookeeper
          environment:
            - ZOOKEEPER_CONNECT=zookeeper:2181
        mysql:
          # Debezium's example MySQL image, preloaded with the "inventory" database
          container_name: mysql
          image: quay.io/debezium/example-mysql:2.1
          ports:
            - 3306:3306
          environment:
            - MYSQL_ROOT_PASSWORD=debezium
            - MYSQL_USER=mysqluser
            - MYSQL_PASSWORD=mysqlpw
        connect:
          # Kafka Connect worker; connector plugins are mounted from ./plugins
          container_name: connect
          image: quay.io/debezium/connect-base:2.1
          volumes:
            - ./plugins:/kafka/connect
          ports:
            - 8083:8083
          links:
            - kafka
            - mysql
          environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses
      
    6. Let’s start the services:
      
      $ docker-compose up
      
    7. You should see output similar to the following:
      
      ...
      2023-01-16 15:48:33,939 INFO   ||  Kafka version: 3.0.0   [org.apache.kafka.common.utils.AppInfoParser]
      ...
      2023-01-16 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
      2023-01-16 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
      
    8. Check that Kafka Connect is running by querying its REST API. An empty array in the response shows that no connectors are currently registered with Kafka Connect.
      
       $ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors
      []
      
    9. We also have MySQL running with an example database called “inventory”. You can check which tables it contains by running:
      
       $ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SHOW TABLES;"
      +---------------------+
      | Tables_in_inventory |
      +---------------------+
      | addresses           |
      | customers           |
      | geom                |
      | orders              |
      | products            |
      | products_on_hand    |
      +---------------------+
      
      

Let’s check what’s inside the customers table:


 $ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SELECT * FROM customers;"  
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Configure Debezium to start syncing MySQL to Kafka

Now let’s configure Debezium to start syncing the inventory database with Kafka.

    1. Create a new file (“register-mysql.json”) with these configurations (you can find information about these configuration properties in the Debezium documentation):
      
      {
      	"name": "inventory-connector-mysql",
      	"config": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "mysql",
      		"database.port": "3306",
      		"database.user": "root",
      		"database.password": "debezium",
      		"database.server.id": "184054",
      		"topic.prefix": "debezium",
      		"database.include.list": "inventory",
      		"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      		"schema.history.internal.kafka.topic": "schemahistory.inventory"
      	}
      }
      
      
    2. Register a MySQL connector:
      
      $ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
      
      
    3. Verify that “inventory-connector-mysql” is included in the list of connectors:
      
      $ curl -H "Accept:application/json" localhost:8083/connectors/
      
      ["inventory-connector-mysql"]
      
      
    4. You can now see the database contents in Kafka. To list the topics, run:
      
      $ docker exec -it kafka bash bin/kafka-topics.sh --list --bootstrap-server kafka:9092
      ...
      debezium.inventory.addresses
      debezium.inventory.customers
      ...
      
      

Let’s check debezium.inventory.addresses:


$ docker exec -it kafka bash bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic debezium.inventory.addresses --from-beginning

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium.inventory.addresses.Envelope","version":1},"payload":{"before":null,"after":{"id":10,"customer_id":1001,"street":"3183 Moore Avenue","city":"Euless","state":"Texas","zip":"76036","type":"SHIPPING"},"source":{"version":"2.1.1.Final","connector":"mysql","name":"debezium","ts_ms":1673446748000,"snapshot":"first","db":"inventory","sequence":null,"table":"addresses","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1673446748425,"transaction":null}} 
...

For more information on Debezium change events (the “op” field is “r” for snapshot reads, “c” for creates, “u” for updates, and “d” for deletes), see the Debezium documentation.

Configure Kafka Connect to start syncing data to Google BigQuery

Before you start configuring the BigQuery connector, move the Google BigQuery service account key file (see the BigQuery setup section above) to your working directory and name it “bigquery-keyfile.json”.

    1. Once you have the key file, copy it to the Connect container:
      
      
      $ docker cp bigquery-keyfile.json connect:/bigquery-keyfile.json
      
    2. Now create a new file (“register-bigquery.json”) with these configurations (you can find information about these configuration properties in the connector’s official documentation):
      
      {
      	"name": "inventory-connector-bigquery",
      	"config": {
      		"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
      		"tasks.max": "1",
      		"consumer.auto.offset.reset": "earliest",
      		"topics.regex": "debezium.inventory.*",
      		"sanitizeTopics": "true",
      		"autoCreateTables": "true",
      		"keyfile": "/bigquery-keyfile.json",
      		"schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
      		"project": "mysql-bigquery",
      		"defaultDataset": "debezium",
      		"allBQFieldsNullable": true,
      		"allowNewBigQueryFields": true,
      		"transforms": "regexTopicRename,extractAfterData",
      		"transforms.regexTopicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
      		"transforms.regexTopicRename.regex": "debezium.inventory.(.*)",
      		"transforms.regexTopicRename.replacement": "$1",
      		"transforms.extractAfterData.type": "io.debezium.transforms.ExtractNewRecordState"
      	}
      }
      
      
    3. To register the BigQuery connector, run:
      
      $ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-bigquery.json
      
    4. Verify that “inventory-connector-bigquery” is included in the list of connectors:
      
      $ curl -H "Accept:application/json" localhost:8083/connectors/
      
      ["inventory-connector-mysql","inventory-connector-bigquery"]
      
      

In your BigQuery dataset, you will now be able to see tables matching those in MySQL.
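
This works because the ExtractNewRecordState transform unwraps the Debezium change-event envelope, so only the row state from the “after” field reaches BigQuery. For the addresses event shown earlier, the sink connector writes roughly this record:

{"id":10,"customer_id":1001,"street":"3183 Moore Avenue","city":"Euless","state":"Texas","zip":"76036","type":"SHIPPING"}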

Select data from your customers table in BigQuery and you will see the same rows as in MySQL. (The emails are for example purposes only and do not correspond to real individuals.)

You can create a new entry in the MySQL customers table:


$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "INSERT INTO customers VALUES(1005, 'Tom', 'Addams', 'tom.addams@mailer.net');"

You will see that the new entry has automatically synced to BigQuery.
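
You can also verify from the command line with the bq CLI (assuming the “debezium” dataset from the connector configuration):

$ bq query --use_legacy_sql=false 'SELECT id, first_name, last_name, email FROM debezium.customers ORDER BY id'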

Conclusion

You should now have a clear understanding of the benefits of syncing data from MySQL to BigQuery using Debezium and Kafka Connect, and with the tutorial in this article you should be able to set up and configure the pipeline yourself.

As a reminder, it’s important to test and monitor the pipeline to ensure that data is being synced as expected and to troubleshoot any issues that may arise.
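
For example, the Kafka Connect REST API exposes a status endpoint for each registered connector that reports the state of the connector and its tasks:

$ curl -H "Accept:application/json" localhost:8083/connectors/inventory-connector-mysql/status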

For more information on Debezium and Kafka Connect, see the official Debezium documentation (https://debezium.io/documentation/) and the Kafka Connect documentation (https://kafka.apache.org/documentation/#connect).