Replicating data into Clickhouse

Clickhouse is a relatively new analytics and data warehouse engine that provides very fast insertion and analysis of data. Like most analytics platforms it's built on column-oriented storage, and unlike many alternatives it's completely open source. It's also exceedingly fast, even on relatively modest hardware.

Clickhouse does differ from some other environments, though: for example, data that has been inserted cannot easily be updated, and it supports a number of different storage and table engine formats that are used to store and index the information. So how do we get data into it from our MySQL transactional store?

Well, you could do dumps and loads, or you could use Tungsten Replicator to do it for you. The techniques I'm going to describe here are not in an active release, but they use the same principles as other parts of our data loading.

We're going to use the CSV-based batch loading system that is employed by our Hadoop, Vertica and Amazon Redshift appliers to get the data in. Ordinarily we would run a materialization step that merges and updates the data from the staging tables, which hold the raw change information, turning it into 'base' or carbon-copy tables. We can't do that with Clickhouse, because the data cannot be modified once imported, but we can still use the information that gets imported.

If you are familiar with the way we load data in this method, you will know that we import information using a CSV file, and each row of the file is either an INSERT or a DELETE, with an UPDATE operation being simulated by a DELETE followed by an INSERT. All rows are also tagged with date, time, and transaction ID information, so we can always identify the latest update.
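As a sketch of what that means in practice, an UPDATE to a single row might appear in the staging CSV as a DELETE/INSERT pair like the following. The values here are invented, and the exact quoting and column order depend on the csvType setting; the columns mirror the staging table shown further down (opcode, sequence number, row ID, commit timestamp, then the table's own columns):

```
"D","42","1","2018-12-12 09:48:17.000","9","old message text"
"I","42","2","2018-12-12 09:48:17.000","9","new message text"
```

Because both rows carry the same transaction sequence number, the row ID tells us the INSERT came after the DELETE, so the 'I' row is the latest state for id 9.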

Finally, one other thing to note about the Clickhouse environment is that data types are defined slightly differently. In most databases we are familiar with INT, LONG or VARCHAR. Within Clickhouse, the data types you use for table fields more closely match the types in C, such as Int32 or Int64. That means creating a simple table uses a definition like this:

CREATE TABLE sales.stage_xxx_msg (
    tungsten_opcode String,
    tungsten_seqno Int32,
    tungsten_row_id Int32,
    tungsten_commit_timestamp String,
    id Int32,
    msg String
) ENGINE = Log;

You can also see that there's no CHAR or VARCHAR, just String, and that in this template the commit timestamp is stored as a String rather than a native timestamp type.
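For comparison, a hypothetical MySQL source table feeding the staging table above might look like this; note how INT maps to Int32 and VARCHAR to String, with the tungsten_* metadata columns added by the replicator rather than coming from the source schema:

```sql
-- Hypothetical MySQL source table; the tungsten_* columns in the
-- staging table are metadata added by the replicator, not source columns
CREATE TABLE sales.msg (
    id  INT NOT NULL PRIMARY KEY,
    msg VARCHAR(20)
);
```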

With all that in mind, let's try loading some data into Clickhouse using Tungsten Replicator!

First, a basic MySQL extraction recipe:

tools/tpm configure alpha \
--disable-relay-logs=true \
--enable-heterogeneous-service=true \
--install-directory=/opt/continuent \
--master=ubuntuheterosrc \
--mysql-allow-intensive-checks=true \
--replication-password=Tamsin \
--replication-user=root \
--skip-validation-check=MySQLMyISAMCheck

We're going to use a fairly standard replicator install, extracting from a basic MySQL 5.7 server and insert the change data into Clickhouse.

For the Clickhouse side, we'll use the batch applier with a different, custom, template:

tools/tpm configure alpha \
--batch-enabled=true \
--batch-load-template=clickhouse \
--datasource-mysql-conf=/dev/null \
--datasource-type=file \
--install-directory=/opt/continuent \
--master=ubuntuheterosrc \
--members=clickhouse2 \
--property=replicator.datasource.global.csvType=vertica \
--replication-password=password \
--replication-port=8123 \
--replication-user=tungsten \
--skip-validation-check=InstallerMasterSlaveCheck \
--start-and-report=true

That's it! We make one other change from other installations: because we cannot update information in Clickhouse, rather than using Clickhouse to store the Replicator status information, we use the File datasource type, which stores that information in a file on the local filesystem.

To generate the source data, I'll run about 18,000 transactions consisting of a mixture of INSERT, DELETE and UPDATE operations, loaded into MySQL in tandem across 20 threads.

Let's run the load and check Clickhouse:

clickhouse2 :) select * from stage_xxx_msg limit 10;
 
SELECT *
FROM stage_xxx_msg 
LIMIT 10
 
┌─tungsten_opcode─┬─tungsten_seqno─┬─tungsten_row_id─┬─tungsten_commit_timestamp─┬─id─┬─msg──────────────────┐
│ I               │             15 │               1 │ 2018-12-12 09:48:17.000   │  9 │ 4qwciTQiKdSrZKCwflf1 │
│ I               │             16 │               2 │ 2018-12-12 09:48:17.000   │ 10 │ Qorw8T10xLwt7R0h7PsD │
│ I               │             17 │               3 │ 2018-12-12 09:48:17.000   │ 11 │ hx2QIasJGShory3Xv907 │
│ I               │             19 │               1 │ 2018-12-12 09:48:17.000   │ 12 │ oMxnT7RhLWpvQSGYtE6V │
│ I               │             20 │               2 │ 2018-12-12 09:48:17.000   │ 13 │ fEuDvFWyanb1bV9Hq8iM │
│ I               │             23 │               1 │ 2018-12-12 09:48:17.000   │ 14 │ oLVGsNjMPfWcxnRMkpKI │
│ I               │             25 │               2 │ 2018-12-12 09:48:17.000   │ 15 │ w3rYUrzxXjb3o9iTHtnS │
│ I               │             27 │               3 │ 2018-12-12 09:48:17.000   │ 16 │ aDFjRpTOK6ruj3JaX2Na │
│ I               │             30 │               4 │ 2018-12-12 09:48:17.000   │ 17 │ SXDxPemQ5YI33iT1MVoZ │
│ I               │             32 │               5 │ 2018-12-12 09:48:17.000   │ 18 │ 8Ta8C0fjIMRYEfVZBZjE │
└─────────────────┴────────────────┴─────────────────┴───────────────────────────┴────┴──────────────────────┘
 
10 rows in set. Elapsed: 0.005 sec.
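Although we can't materialize base tables in Clickhouse, the tungsten_seqno and tungsten_row_id columns let us reconstruct the current state of each row at query time. A sketch, assuming a recent Clickhouse where argMax() accepts a tuple as the ordering value:

```sql
-- For each id, take the column values from the most recent change,
-- and keep only ids whose latest operation was an INSERT
SELECT
    id,
    argMax(msg, (tungsten_seqno, tungsten_row_id)) AS msg
FROM stage_xxx_msg
GROUP BY id
HAVING argMax(tungsten_opcode, (tungsten_seqno, tungsten_row_id)) = 'I';
```

Rows whose latest opcode is 'D' drop out in the HAVING clause, which gives us the same effect as the materialization step on other targets, just computed at read time.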

Analysing the overall times, I processed 358,980 transactions through MySQL and into Clickhouse using relatively modest virtual machines on my laptop, and it took 538 seconds. That's about 670 transactions a second. Bear in mind we're committing every 100 rows here; larger commit intervals would probably be quicker overall. This is using the default settings, and I know from past testing and imports that I can go much faster.

I'd count that as a success!

Bear in mind we're also writing to separate databases and tables here, but with the adddbname filter and the modified applier we can insert all of that data into a single table, so if you are concentrating data into a single database/table combination you can do this in one step with Tungsten Replicator.

As I said before, Clickhouse is not currently a supported target for the Replicator, but if you are interested please get in touch!

About the Author

MC Brown
Former VP of Products

MC Brown has been a professional writer and technologist for over 25 years, an author of and contributor to over 26 books covering a wide array of topics, and a technical and architectural advisor on databases, cloud and grid computing, and operating system development. During that time he has worked for and with Sun, MySQL, Oracle, Couchbase, VMware, Microsoft and IBM, and written for O'Reilly, IBM, Computerworld, IBM developerWorks and many others.

Comments

Thanks for the informative article. By the way, Clickhouse now also offers a mysql() table function; you can easily import all the data directly from MySQL using mysql() in Clickhouse.

Hi, Thank you for your comment! Yes, the mysql() function is a great way to read the data from a remote mysql database, and also a great way to do a CREATE TABLE … AS SELECT, and in fact this would be a recommended method for seeding your Clickhouse environment. What Tungsten Replicator adds to this is the ability to then have real-time replication in place, so that your newly created table can be kept up to date with changes!
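As an illustration, seeding a table that way might look like the following; the host, credentials and columns here are hypothetical:

```sql
-- Create and populate a Clickhouse table directly from a remote MySQL table
-- using the mysql() table function (host and credentials are placeholders)
CREATE TABLE sales.msg ENGINE = Log AS
SELECT id, msg
FROM mysql('mysql-host:3306', 'sales', 'msg', 'tungsten', 'password');
```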
