Leverage Flink CDC if you already replicate MySQL

Having gone through the Flink CDC quickstart guide for importing data from MySQL to Doris, I came across a few blockers. The fixes below will hopefully help a few people out there following the same instructions.

https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/get-started/quickstart/mysql-to-doris

Firstly, you need to understand that with those instructions Flink CDC behaves like a MySQL replica: it hooks into the same mechanism that an existing replication setup with a source and a replica (destination) MySQL server uses. The row changes are recorded in the binary log (binlog) of the source server, and the binlog is only written when binary logging is enabled there, which is what you configure when you set up replication.

Even if the YAML-specified job (mysql-to-doris.yaml) is submitted to Flink successfully and you can see it running, it may still fail (and retry and fail again) on the command "SHOW MASTER STATUS", because it is looking for the replication (binlog) information of the source server.

Click on the Job Name to get an overview of the job and switch to the Exceptions tab to see the error.

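Flink runs the command SHOW MASTER STATUS; and the exception may include "Make sure your server is correctly configured".

SHOW MASTER STATUS is deprecated (MySQL 8.2 introduced SHOW BINARY LOG STATUS as its replacement, and MySQL 8.4 removed the old name), but it still works on older versions. It returns an empty result when binary logging is not enabled on the server. SHOW REPLICA STATUS, which is meant to be run on a replica, equally comes up empty when no replication is set up. Flink needs the binlog file and position from this output to replay the changes into Doris.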
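To see what Flink will see, you can run the same checks by hand before submitting the job. This is a minimal sketch, assuming the mysql command-line client is installed; the function name check_binlog and the MYSQL_HOST / MYSQL_USER variables are placeholders of mine, not part of Flink CDC.

```shell
# Hypothetical helper: print the binlog settings and the current binlog position.
# MYSQL_HOST and MYSQL_USER are placeholders; -p makes the client prompt for the password.
# On MySQL 8.4 and later, replace SHOW MASTER STATUS with SHOW BINARY LOG STATUS.
check_binlog() {
  mysql -h "${MYSQL_HOST:-127.0.0.1}" -u "${MYSQL_USER:-root}" -p \
    -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'; SHOW MASTER STATUS;"
}
```

Run check_binlog in a shell on a machine that can reach the source server. log_bin should be ON, and the last statement should return one row with a File and a Position. The Flink CDC MySQL connector documentation also asks for binlog_format to be ROW.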

To set up MySQL replication, this tutorial on DigitalOcean helped a lot:

https://www.digitalocean.com/community/tutorials/how-to-set-up-replication-in-mysql#understanding-replication-in-mysql

Only once this is set up and SHOW MASTER STATUS returns a result will the job in Flink actually work. If you have trouble getting to this point, here are a few more tips.

Task Manager and Jobs should not be 0 in the dashboard


Your Flink conf file has entries such as taskmanager.numberOfTaskSlots; when you change them and restart Flink, the new values should be reflected in the dashboard. If Total Task Slots stays at 0, something is wrong, typically that no TaskManager has started and registered.
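To compare the dashboard with what is actually configured, you can read the value straight from the conf file. This is a rough sketch; the function name configured_slots is mine, and the file name depends on your Flink version (conf/flink-conf.yaml in older releases, conf/config.yaml in newer ones).

```shell
# Hypothetical helper: print the configured number of task slots.
# $1 is the path to your Flink conf file.
configured_slots() {
  awk -F': *' '
    /^[[:space:]]*#/ { next }                 # skip commented-out lines
    /numberOfTaskSlots:/ { print $2; exit }   # matches the flat and the nested key
  ' "$1"
}
```

For example, configured_slots "$FLINK_HOME/conf/config.yaml" prints the number that Total Task Slots should show for a single TaskManager after a restart (./bin/stop-cluster.sh followed by ./bin/start-cluster.sh).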

Check the logs (under the Flink log directory) to find out why the TaskManager is failing to start; for example, FLINK_HOME may not be set properly. If so, export FLINK_HOME in your .bashrc.
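A quick way to rule out the FLINK_HOME problem is a small sanity check like the one below. It is only a sketch: the function name check_flink_home is mine, and the path in the comment is an assumed example, not a required location.

```shell
# In ~/.bashrc you would add something like this (the path is an assumption, use your own):
#   export FLINK_HOME=/opt/flink

# Hypothetical helper: succeed only if FLINK_HOME points at a Flink installation.
check_flink_home() {
  if [ -z "${FLINK_HOME:-}" ]; then
    echo "FLINK_HOME is not set" >&2
    return 1
  fi
  if [ ! -x "$FLINK_HOME/bin/flink" ]; then
    echo "no executable bin/flink under $FLINK_HOME" >&2
    return 1
  fi
  echo "FLINK_HOME looks fine: $FLINK_HOME"
}
```

Open a new shell (so that .bashrc is re-read) and run check_flink_home before starting the cluster again.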

Make sure the indentation in the YAML file is consistent: 2 spaces per level and no tabs. If the logs indicate that a YAML parser is missing, install one:

sudo apt install libyaml-snake-java
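Indentation slips in the YAML file are easy to miss by eye. The following is a crude check, not a YAML parser: it only flags tab characters and lines whose leading spaces are not a multiple of 2, so multi-line string values may produce false alarms. The function name check_yaml_indent is mine.

```shell
# Hypothetical helper: flag tabs and odd-width indentation in a YAML file.
# Returns non-zero if any line looks suspicious.
check_yaml_indent() {
  awk '
    /\t/ { printf "line %d: tab character\n", NR; bad = 1 }
    {
      line = $0
      total = length(line)
      sub(/^ +/, "", line)                # strip leading spaces
      indent = total - length(line)
      if (indent % 2 != 0) {
        printf "line %d: indentation of %d spaces\n", NR, indent
        bad = 1
      }
    }
    END { exit bad + 0 }
  ' "$1"
}
```

Usage: check_yaml_indent mysql-to-doris.yaml prints nothing and returns 0 when every line passes.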

Key Flink BlobServer Error

The logs show that the Flink BlobServer cannot be reached at 127.0.0.1, even though your config for the Job Manager specifies your correct IPv4 address.

Modify the Flink conf file to bind to 0.0.0.0 instead of the actual IP address. For some reason, binding to the actual IP address triggered connection attempts to loopback (127.0.0.1), which is not allowed on my production server. You can check this by trying to telnet into the server on the BlobServer port that appears in your logs (45733 in my case). Telnet will be rejected if there is a blocker somewhere (firewall, iptables etc.).

jobmanager.bind-host: 0.0.0.0
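If telnet is not installed, the same reachability test can be scripted with netcat. This is a sketch assuming an nc build that supports the -z (scan only) and -w (timeout) options; the function name check_port is mine.

```shell
# Hypothetical helper: report whether a TCP port accepts connections.
# $1 = host, $2 = port (take the BlobServer port from your JobManager log).
check_port() {
  if nc -z -w 3 "$1" "$2"; then
    echo "$1:$2 is reachable"
  else
    echo "$1:$2 is NOT reachable (firewall, iptables or bind address?)"
    return 1
  fi
}
```

Run it once against 127.0.0.1 and once against the server's real IP address with the port from your log. Since the BlobServer picks a random port by default, Flink's blob.server.port option can be used to pin it if you need to write a firewall rule for it.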

Set server-time-zone in mysql-to-doris.yaml to a region-style time zone ID, for example Pacific/Auckland if you are in New Zealand. The abbreviations NZST and NZDT are not accepted.

MySQL JAR file
mysql-connector-java-<version>.jar should be placed into $FLINK_HOME/lib

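Using the MySQL Community Edition, create the user with % (wildcard) as the host, which allows it to connect from any host; otherwise it may only be allowed to connect from localhost. You can check this using: SELECT host FROM mysql.user; The statements below grant all privileges for simplicity. The Flink CDC MySQL connector documentation lists a narrower set (SELECT, SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT), which is the safer choice on a production server.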

CREATE USER 'rep-user'@'%' IDENTIFIED BY '<your_pwd>';
GRANT ALL PRIVILEGES ON *.* TO 'rep-user'@'%';

SELECT host FROM mysql.user WHERE User = 'rep-user';

Test Application WordCount.jar

An easy way to test whether your Flink setup works outside of the MySQL replication scenario is to run the WordCount test app.

./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data
