Syncing data to Elasticsearch with Canal via the MySQL binlog

Server environment

$ cat /etc/issue
Ubuntu 14.04.5 LTS

$ java -version
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)

$ mysqld --version
mysqld Ver 5.7.20 for Linux on x86_64 (MySQL Community Server (GPL))

My development machine environment

$ sw_vers 
ProductName: Mac OS X
ProductVersion: 10.12.6
BuildVersion: 16G29

$ brew -v
Homebrew 1.3.0
Homebrew/homebrew-core (git revision 83c2; last commit 2017-08-03)

$ mvn -v
Apache Maven 3.5.2

Install Java 8+

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer

Or Java 9

$ sudo apt-get install oracle-java9-installer

Set the MySQL binlog format to ROW

$ sudo vim /etc/mysql/mysql.conf.d/mysqld.cnf
...
server_id = 1

log-bin = /var/lib/mysql/binlog
log_bin_trust_function_creators=1
binlog_format = ROW
expire_logs_days = 99
sync_binlog = 0

slow-query-log=1
slow-query-log-file=/var/log/mysql/slow-queries.log
long_query_time = 10
log-queries-not-using-indexes
binlog-row-image=full

Create a Canal user in MySQL

mysql> CREATE USER canal IDENTIFIED BY 'canal';    
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;

Or you can grant the user all privileges

mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
mysql> FLUSH PRIVILEGES;

Install Canal

$ wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

$ mkdir -p canal
$ tar zxvf canal.deployer-1.0.24.tar.gz -C canal

$ tree canal -L 1
canal
├── bin
├── conf
├── lib
└── logs

Update the instance.properties config

$ cd canal
$ vim conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = app
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =

#################################################
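
The canal.instance.filter.regex value is a comma-separated list of regular expressions matched against schema.table names. In the properties file the backslash is doubled, so .*\\..* means "any schema, a literal dot, any table". The sketch below approximates that matching with java.util.regex so a filter can be tried out before restarting Canal. FilterRegexDemo and matches are hypothetical names, and Canal's own matcher may differ in details such as case sensitivity.

```java
import java.util.regex.Pattern;

class FilterRegexDemo {

    // Hypothetical helper: true if schemaDotTable fully matches any of the
    // comma-separated patterns in filter.
    static boolean matches(String filter, String schemaDotTable) {
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The Java literal ".*\\..*" is the regex .*\..* , the same value
        // that instance.properties yields after its own backslash unescaping.
        String everything = ".*\\..*";
        String appOnly = "app\\..*";
        String twoTables = "app\\.users,app\\.orders";

        System.out.println(matches(everything, "app.users"));  // true
        System.out.println(matches(appOnly, "app.users"));     // true
        System.out.println(matches(appOnly, "other.users"));   // false
        System.out.println(matches(twoTables, "app.orders"));  // true
    }
}
```

Narrowing the filter to the tables you actually index (for example app\\..*) keeps unrelated schemas out of the client's batches.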

Start Canal

$ ./bin/startup.sh

Check Canal's running status in the log file

$ tail -f logs/canal/canal.log 

...
2017-10-01 15:02:24.591 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-01 15:02:24.720 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.0.2.15:11111]
2017-10-01 15:02:25.541 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
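
The log shows the server listening on port 11111. As a complementary check from the machine that will run the client, the sketch below tries to open a TCP connection to that port. CanalPortCheck and isListening are hypothetical names, and the default host 127.0.0.1 is an assumption; pass the server's address as the first argument when the client runs elsewhere.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class CanalPortCheck {

    // Hypothetical helper: true if a TCP connection to host:port succeeds
    // within timeoutMillis, false on refusal, timeout, or an unknown host.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println("canal port 11111 open: " + isListening(host, 11111, 2000));
    }
}
```

A successful connection only shows that something accepts connections on that port; the log output above remains the authoritative sign that Canal started.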

OK, the Canal server is now running.
Next we need a Canal client to sync the data to Elasticsearch.

Install Maven

My development machine is a Mac, so I will install Maven with Homebrew.

$ brew install maven

$ brew info maven
maven: stable 3.5.2
Java-based project management
https://maven.apache.org/
Conflicts with:
mvnvm (because also installs a 'mvn' executable)
/usr/local/Cellar/maven/3.5.2 (104 files, 10.1MB) *
Built from source on 2017-10-01 at 23:45:23
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/maven.rb
==> Requirements
Required: java >= 1.7 ✔

Add a Maven mirror repository

$ cd /usr/local/Cellar/maven/3.5.2
$ vim libexec/conf/settings.xml

<mirrors>
  ...
  <mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
  </mirror>
</mirrors>
...

Create a Maven (Java) project

$ mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample

After you confirm a few prompts, an empty Maven project is created.

$ tree canal.sample/ -L 1
canal.sample/
├── pom.xml
└── src

1 directory, 1 file

Add the com.alibaba.otter dependency

$ cd canal.sample/; vim pom.xml
<dependencies>
  ...
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.22</version>
  </dependency>
</dependencies>

The full pom.xml file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.sample</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>canal.sample</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <!-- set these to match your JDK version, e.g. 1.8 for Java 8 -->
    <maven.compiler.source>1.9</maven.compiler.source>
    <maven.compiler.target>1.9</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <!-- canal client -->
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.0.22</version>
    </dependency>

    <!-- elasticsearch client -->
    <dependency>
      <groupId>io.searchbox</groupId>
      <artifactId>jest</artifactId>
      <version>2.0.0</version>
    </dependency>

  </dependencies>

  <!-- plugin that builds an executable jar -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.alibaba.otter.App</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

Initialize the Maven project

$ mvn install

Add the code

package com.alibaba.otter;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Delete;
import io.searchbox.core.Index;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Reads row changes from the Canal server and mirrors them into Elasticsearch.
 */
public class App {

    public static void main(String[] args) {
        // Connect to the "example" instance on port 11111 of this host's own IP,
        // with an empty username and password.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // every table in every schema
            connector.rollback();

            while (true) {
                // Fetch up to batchSize entries without acknowledging them
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    // Nothing new: wait a second before polling again
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    translateDatas(message.getEntries());
                }

                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // roll back if processing fails
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void translateDatas(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            // Transaction markers carry no row data
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;

            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            // The schema name becomes the index and the table name becomes the type
            String dbName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();

            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));

            JestClient client = getEsClient();

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {

                try {

                    if (eventType == CanalEntry.EventType.DELETE) {

                        // Delete the document whose id equals the row's "id" column
                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {

                            if (column.getName().equalsIgnoreCase("id")) {

                                client.execute(new Delete.Builder(column.getValue())
                                        .index(dbName)
                                        .type(tableName)
                                        .build());
                            }
                        }

                    } else {

                        // Any other event type: index the row's "after" image
                        Map<String, String> mapping = columnToMapping(rowData.getAfterColumnsList());

                        Index index = new Index.Builder(mapping).index(dbName).type(tableName).id(mapping.get("id")).build();
                        client.execute(index);

                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());

                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    private static Map<String, String> columnToMapping(List<CanalEntry.Column> afterColumnsList) {
        Map<String, String> data = new LinkedHashMap<String, String>();

        for (CanalEntry.Column column : afterColumnsList) {
            data.put(column.getName(), column.getValue());
        }
        return data;
    }

    // Lazily created, shared Elasticsearch client
    private static JestClient esClient;

    private static JestClient getEsClient() {

        if (esClient == null) {
            JestClientFactory factory = new JestClientFactory();
            factory.setHttpClientConfig(new HttpClientConfig
                    .Builder("http://localhost:9200")
                    .build());

            esClient = factory.getObject();
        }
        return esClient;
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
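
The loop in main acknowledges every batch, and translateDatas only prints an IOException from Elasticsearch, so a failed write is skipped rather than retried. One possible alternative is to ack a batch only when its handler succeeds and roll it back otherwise, so Canal delivers it again. The sketch below illustrates that pattern without a Canal server: BatchConnector, RecordingConnector and process are hypothetical stand-ins, not part of the Canal API, and it assumes the handler reports failure by throwing a RuntimeException.

```java
import java.util.ArrayList;
import java.util.List;

class AckOrRollbackDemo {

    // Hypothetical stand-in for the ack/rollback calls on CanalConnector.
    interface BatchConnector {
        void ack(long batchId);
        void rollback(long batchId);
    }

    // Runs the handler; acks the batch on success, rolls it back on failure.
    // Returns true if the batch was acknowledged.
    static boolean process(BatchConnector connector, long batchId, Runnable handler) {
        try {
            handler.run();
            connector.ack(batchId);
            return true;
        } catch (RuntimeException e) {
            connector.rollback(batchId);
            return false;
        }
    }

    // Records calls so the behaviour can be observed without a Canal server.
    static class RecordingConnector implements BatchConnector {
        final List<String> calls = new ArrayList<>();

        public void ack(long batchId) {
            calls.add("ack " + batchId);
        }

        public void rollback(long batchId) {
            calls.add("rollback " + batchId);
        }
    }

    public static void main(String[] args) {
        RecordingConnector connector = new RecordingConnector();
        process(connector, 1L, () -> { /* indexing succeeded */ });
        process(connector, 2L, () -> {
            throw new IllegalStateException("Elasticsearch unavailable");
        });
        System.out.println(connector.calls); // prints [ack 1, rollback 2]
    }
}
```

To use the same idea in App, translateDatas would have to rethrow the IOException (for example wrapped in a RuntimeException) instead of printing it, and the loop would call connector.rollback(batchId) in the failure branch.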

Full synchronization

http://shzhangji.com/cnblogs/2017/08/13/extract-data-from-mysql-with-binlog-and-canal/