DataX是阿里巴巴开发的用于离线数据同步的工具,它支持在MySQL、Oracle、SqlServer、HDFS、HBase等多个数据库之间进行数据的离线同步。

安装DataX

我们可以直接下载已经打包好的文件

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

当然,我们也可以选择从源码编译安装DataX。由于上面的包已经比较旧了,推荐从源码进行安装。

git clone https://github.com/alibaba/DataX.git

因为我们只需要针对一些指定的数据库,所以可以删除pom.xml文件中我们不需要使用的数据库子模块。我保留的子模块如下

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- reader -->
<module>mysqlreader</module>
<module>oraclereader</module>
<module>txtfilereader</module>
<module>streamreader</module>
<module>rdbmsreader</module>
<!-- writer -->
<module>mysqlwriter</module>
<module>tdenginewriter</module>
<module>txtfilewriter</module>
<module>streamwriter</module>
<module>rdbmswriter</module>
<module>elasticsearchwriter</module>

同时,我在使用中遇到了一个MySQL连接的问题,经确认应该是mysql驱动的问题。所以还需要修改pom.xml中的MySQL驱动的版本<mysql.driver.version>5.1.34</mysql.driver.version>。经过上面两步修改之后就可以执行编译命令了

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

执行结果如下

...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:42 min
[INFO] Finished at: 2021-06-09T14:32:08+08:00
[INFO] ------------------------------------------------------------------------

编译生成的包文件位于target/datax.tar.gz。解压工具包

tar -zxvf datax.tar.gz
cd datax

至此DataX就安装好了,它的目录如下

➜  datax tree -L 1
.
├── bin
├── conf
├── job
├── lib
├── plugin
├── script
└── tmp

7 directories, 0 files

读取MySQL数据并打印

接下来我们创建一个从MySQL读取数据并且把数据打印出来的任务,创建文件job/mysql_print.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
"job": {
"setting": {
"speed": {"channel": 3},
"errorLimit": {"record": 0, "percentage": 0.02}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1234",
"column": ["id", "name", "update_time", "message"],
"connection": [{
"table": ["script_version"],
"jdbcUrl": ["jdbc:mysql://172.19.34.19:3306/bugatti"]
}]}
},
"writer": {
"name": "streamwriter",
"parameter": {"print": true}
}
}]
}}

创建好任务文件之后就可以执行同步命令了

➜  datax python bin/datax.py job/mysql_print.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
...
14    dev    2021-10-09 14:54:46    f9d428b45e882506ce45fb77dbb8e96611554129
15    master    2021-10-09 14:55:41    f9d428b45e882506ce45fb77dbb8e96611554129
20    revert-a8a85b0e    2021-08-03 09:40:09    04b81ff4fbf976328d59d105681b8c05fa4f04b5
22    默认    2021-06-09 10:48:33    04b81ff4fbf976328d59d105681b8c05fa4f04b5
...
2021-06-09 14:39:36.845 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-06-09 14:39:25
任务结束时刻                    : 2021-06-09 14:39:36
任务总计耗时                    :                 10s
任务平均流量                    :               22B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   4
读写失败总数                    :                   0

如上所示,我们已经成功的把MySQL中的4条数据查询并打印出来了。

从MySQL读取数据并写入Elasticsearch

我们再创建一个类似于上面的同步任务,只是这一次我们不打印数据而是把数据同步到Elasticsearch中去。创建job/mysql_2_es.json文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{"job": {
"setting": {"speed": {"channel": 3}, "errorLimit": {"record": 0,"percentage": 0.02}},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root", "password": "1234", "column": ["id", "name", "update_time", "message"],
"connection": [{
"table": ["script_version"],
"jdbcUrl": ["jdbc:mysql://172.19.34.19:3306/bugatti"]}]}},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://172.19.40.171:9200",
"accessId": "1",
"accessKey": "1",
"index": "script-version",
"type": "_doc",
"settings": {"index": {"number_of_shards": 3, "number_of_replicas": 1}}, "batchSize": 2,
"column": [
{"name": "id", "type": "id"},
{"name": "name", "type": "keyword"},
{"name": "update_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
{"name": "message", "type": "keyword"}]}}}]}}

执行结果如下

➜  datax python bin/datax.py job/mysql_2_es.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2021-06-09 14:51:45.622 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-06-09 14:51:45.632 [main] INFO  Engine - the machine info  =>

    osInfo:    Oracle Corporation 1.8 25.321-b07
    jvmInfo:    Mac OS X x86_64 10.15.7
    cpu num:    4

    totalPhysicalMemory:    -0.00G
    freePhysicalMemory:    -0.00G
    maxFileDescriptorCount:    -1
    currentOpenFileDescriptorCount:    -1

    GC Names    [PS MarkSweep, PS Scavenge]

    MEMORY_NAME                    | allocation_size                | init_size
    PS Eden Space                  | 256.00MB                       | 256.00MB
    Code Cache                     | 240.00MB                       | 2.44MB
    Compressed Class Space         | 1,024.00MB                     | 0.00MB
    PS Survivor Space              | 42.50MB                        | 42.50MB
    PS Old Gen                     | 683.00MB                       | 683.00MB
    Metaspace                      | -0.00MB                        | 0.00MB


2021-06-09 14:51:45.658 [main] INFO  Engine -
{
    "content":[
        {
            "reader":{
                "name":"mysqlreader",
                "parameter":{
                    "column":[
                        "id",
                        "name",
                        "update_time",
                        "message"
                    ],
                    "connection":[
                        {
                            "jdbcUrl":[
                                "jdbc:mysql://172.19.34.19:3306/bugatti"
                            ],
                            "table":[
                                "script_version"
                            ]
                        }
                    ],
                    "password":"****",
                    "username":"root"
                }
            },
            "writer":{
                "name":"elasticsearchwriter",
                "parameter":{
                    "accessId":"1",
                    "accessKey":"*",
                    "batchSize":2,
                    "column":[
                        {
                            "name":"id",
                            "type":"id"
                        },
                        {
                            "name":"name",
                            "type":"keyword"
                        },
                        {
                            "format":"yyyy-MM-dd HH:mm:ss",
                            "name":"update_time",
                            "type":"date"
                        },
                        {
                            "name":"message",
                            "type":"keyword"
                        }
                    ],
                    "endpoint":"http://172.19.40.171:9200",
                    "index":"script-version",
                    "settings":{
                        "index":{
                            "number_of_replicas":1,
                            "number_of_shards":3
                        }
                    },
                    "type":"_doc"
                }
            }
        }
    ],
    "setting":{
        "errorLimit":{
            "percentage":0.02,
            "record":0
        },
        "speed":{
            "channel":3
        }
    }
}

2021-06-09 14:51:45.688 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2021-06-09 14:51:45.690 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-06-09 14:51:45.690 [main] INFO  JobContainer - DataX jobContainer starts job.
2021-06-09 14:51:45.693 [main] INFO  JobContainer - Set jobId = 0
2021-06-09 14:51:46.257 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2021-06-09 14:51:46.334 [job-0] INFO  OriginalConfPretreatmentUtil - table:[script_version] has columns:[id,name,update_time,message].
2021-06-09 14:51:46.345 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2021-06-09 14:51:46.347 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2021-06-09 14:51:46.347 [job-0] INFO  JobContainer - DataX Writer.Job [elasticsearchwriter] do prepare work .
2021-06-09 14:51:47.117 [job-0] INFO  AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.119 [job-0] INFO  JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.245 [job-0] INFO  ESWriter$Job - [{"name":"id","type":"id"},{"name":"name","type":"keyword"},{"format":"yyyy-MM-dd HH:mm:ss","name":"update_time","type":"date"},{"name":"message","type":"keyword"}]
2021-06-09 14:51:47.255 [job-0] INFO  ESWriter$Job - index:[script-version], type:[_doc], mappings:[{"_doc":{"properties":{"message":{"type":"keyword"},"name":{"type":"keyword"},"update_time":{"type":"date"}}}}]
2021-06-09 14:51:47.458 [job-0] INFO  ESClient - create index script-version
2021-06-09 14:51:47.488 [job-0] INFO  ESClient - index [script-version] already exists
2021-06-09 14:51:47.489 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2021-06-09 14:51:47.490 [job-0] INFO  JobContainer - Job set Channel-Number to 3 channels.
2021-06-09 14:51:47.503 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2021-06-09 14:51:47.504 [job-0] INFO  JobContainer - DataX Writer.Job [elasticsearchwriter] splits to [1] tasks.
2021-06-09 14:51:47.527 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2021-06-09 14:51:47.536 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-06-09 14:51:47.538 [job-0] INFO  JobContainer - Running by standalone Mode.
2021-06-09 14:51:47.549 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-06-09 14:51:47.555 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-06-09 14:51:47.555 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-06-09 14:51:47.674 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-06-09 14:51:47.687 [0-0-0-writer] INFO  AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.687 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name,update_time,message from script_version
] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:47.687 [0-0-0-writer] INFO  JestClientFactory - Using multi thread/connection supporting pooling connection manager
2021-06-09 14:51:47.709 [0-0-0-writer] INFO  JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.710 [0-0-0-writer] INFO  JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.710 [0-0-0-writer] INFO  JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.711 [0-0-0-writer] INFO  JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.775 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,update_time,message from script_version
] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:48.369 [0-0-0-writer] INFO  ESWriter$Job - task end, write size :4
2021-06-09 14:51:48.388 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[714]ms
2021-06-09 14:51:48.388 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2021-06-09 14:51:57.722 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.723 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-06-09 14:51:57.724 [job-0] INFO  JobContainer - DataX Writer.Job [elasticsearchwriter] do post work.
2021-06-09 14:51:57.725 [job-0] INFO  AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:57.725 [job-0] INFO  JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Using default GSON instance
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:57.728 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2021-06-09 14:51:57.728 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2021-06-09 14:51:57.729 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /Users/hourui/Downloads/111111/datax/hook
2021-06-09 14:51:57.735 [job-0] INFO  JobContainer -
    [total cpu info] =>
        averageCpu                     | maxDeltaCpu                    | minDeltaCpu
        -1.00%                         | -1.00%                         | -1.00%


    [total gc info] =>
        NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
        PS MarkSweep         | 1                  | 1                  | 1                  | 0.060s             | 0.060s             | 0.060s
        PS Scavenge          | 1                  | 1                  | 1                  | 0.031s             | 0.031s             | 0.031s

2021-06-09 14:51:57.735 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-06-09 14:51:57.736 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.737 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-06-09 14:51:45
任务结束时刻                    : 2021-06-09 14:51:57
任务总计耗时                    :                 12s
任务平均流量                    :               22B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   4
读写失败总数                    :                   0

上面的脚本执行完毕之后我们就可以在Elasticsearch中查询到相关的数据了

curl http://172.19.40.171:9200/script-version/_search

查询结果如下

1
2
3
4
5
{"took":3,"timed_out":false,"_shards":{"total":2,"successful":2,"skipped":0,"failed":0},"hits":{"total":{"value":4,"relation":"eq"},"max_score":1.0,"hits":[
{"_index":"script-version","_type":"_doc","_id":"22","_score":1.0,"_source":{"update_time":"2021-06-09T10:48:33.000+08:00","name":"默认","message":"04b81ff4fbf976328d59d105681b8c05fa4f04b5"}},
{"_index":"script-version","_type":"_doc","_id":"14","_score":1.0,"_source":{"update_time":"2021-10-09T14:54:46.000+08:00","name":"dev","message":"f9d428b45e882506ce45fb77dbb8e96611554129"}},
{"_index":"script-version","_type":"_doc","_id":"15","_score":1.0,"_source":{"update_time":"2021-10-09T14:55:41.000+08:00","name":"master","message":"f9d428b45e882506ce45fb77dbb8e96611554129"}},
{"_index":"script-version","_type":"_doc","_id":"20","_score":1.0,"_source":{"update_time":"2021-08-03T09:40:09.000+08:00","name":"revert-a8a85b0e","message":"04b81ff4fbf976328d59d105681b8c05fa4f04b5"}}]}}