ES跨集群数据迁移

根据业务需求,存在以下场景:

  • 迁移过程中,旧的集群可以暂时停止服务或者暂停写入,数据全部迁移到新的集群中后,业务切换到新的集群进行读取和写入
  • 迁移过程中,旧集群不能停止写入,业务不能停服

如果是第一种场景,数据迁移过程中可以停止写入,可以采用诸如elasticsearch-dump、logstash、reindex、snapshot等方式进行数据迁移。实际上这几种工具大体上可以分为两类:

  • scroll query + bulk: 批量读取旧集群的数据然后再批量写入新集群,elasticsearch-dump、logstash、reindex都是采用这种方式
  • snapshot: 直接把旧集群的底层的文件进行备份,在新的集群中恢复出来,相比较scroll query + bulk的方式,snapshot的方式迁移速度最快。

如果是第二种场景,数据迁移过程中旧集群不能停止写入,需要根据实际的业务场景解决数据一致性的问题:

  • 如果业务不是直接写ES, 而是把数据写入到了中间件,比如业务->kafka->logstash->es的架构,此时可以直接采用双写的策略,旧集群不停止读写,新的集群也直接写入,然后迁移旧集群的数据到新集群中去,等数据追平之后,新的集群再提供读服务;
  • 如果业务是直接写ES, 并且会进行删除doc操作;此时可以使用ES官方在6.5版本之后的CCR(跨集群复制)功能,把旧集群作为Leader, 新集群作为Follower, 旧集群不停止读写,新集群从旧集群中follow新写入的数据;另一方面使用第三方工具把存量的旧集群中的数据迁移到新集群中,存量数据迁移完毕后,业务再切换到新的集群进行读写。

1.1 ES数据离线迁移

可以根据自己的业务需要选择适合自己的迁移方案。如果业务可以停服或者可以暂停写操作,可以参考离线迁移的四种方案。

离线迁移需要先停止老集群的写操作,将数据迁移完毕后在新集群上进行读写操作。适合于业务可以停服的场景。

离线迁移大概有以下几种方式:

  • elasticsearch-dump
  • snapshot
  • reindex
  • logstash

1.1.1 elasticsearch-dump

适用场景

适合数据量不大,迁移索引个数不多的场景

使用方式

elasticsearch-dump是一款开源的ES数据迁移工具,github地址: https://github.com/taskrabbit/elasticsearch-dump

1 . 通过docker运行一个elasticdump容器,如果没有安装docker去百度找安装文档。

1
$ docker run -d --name dump taskrabbit/elasticsearch-dump:latest sh sleep 9999999999

进入到elasticsearch-dump容器内

1
$ docker exec -it dump sh
es到es迁移单个索引

以下操作通过elasticdump命令将集群172.16.0.39中的companydatabase索引迁移至集群172.16.0.20。注意第一条命令先将索引的settings先迁移,如果直接迁移mapping或者data将失去原有集群中索引的配置信息如分片数量和副本数量等,当然也可以直接在目标集群中将索引创建完毕后再同步mapping与data

1
2
3
$ elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=settings
$ elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=mapping
$ elasticdump --input=http://172.16.0.39:9200/companydatabase --output=http://172.16.0.20:9200/companydatabase --type=data --limit=2000
es-es迁移所有索引

以下操作通过elasticdump命令将将集群172.16.0.39中的所有索引迁移至集群172.16.0.20。 注意此操作并不能迁移索引的配置如分片数量和副本数量,必须对每个索引单独进行配置的迁移,或者直接在目标集群中将索引创建完毕后再迁移数据

1
$ elasticdump --input=http://172.16.0.39:9200 --output=http://172.16.0.20:9200 --limit=2000
导出索引到文件中

将目标es集群数据导出到本地文件,然后通过本地文件导入到目标集群

1
2
3
4
5
6
7
# 下载到本地
$ elasticdump --input=http://192.168.203.25:32221 --output=./32221_setting.json --type=settings --all=true
$ elasticdump --input=http://192.168.203.25:32221 --output=./32221_mapping.json --type=mapping --all=true
$ elasticdump --input=http://192.168.203.25:32221 --output=./32221_analyzer.json --type=analyzer --all=true

# 恢复本地文件到服务器
$ elasticdump --input=./9_175_setting.json --output=http://192.168.203.25:32222 --type=settings
导出索引到目录过滤包含索引

导出192.168.203.25:32221服务器中以risk.开头的索引,包含的导出类型为mapping,settings,analyzer

1
2
3
4
5
6
$ multielasticdump \
--direction=dump \
--match='^risk.*'\
--input=http://192.168.203.25:32221 \
--includeType='mapping,settings,analyzer' \
--output=/tmp/es_backup

恢复/tmp/es_backup目录下的以risk.开头的所有索引,包含的恢复类型为mapping,settings,analyzer

1
2
3
4
5
6
$ multielasticdump \
--direction=load \
--match='^risk.*'\
--input=/tmp/es_backup \
--includeType='mapping,settings,analyzer' \
--output=http://192.168.203.25:32222

主要参数说明

  • –input: 源地址,可为ES集群URL、文件或stdin,可指定索引,格式为:{protocol}://{host}:{port}/{index}
  • –input-index: 源ES集群中的索引
  • –output: 目标地址,可为ES集群地址URL、文件或stdout,可指定索引,格式为:{protocol}://{host}:{port}/{index}
  • –output-index: 目标ES集群的索引
  • –type: 迁移类型,默认为data,表明只迁移数据,可选settings, analyzer, data, mapping, alias
  • –limit:每次向目标ES集群写入数据的条数,默认100,不可设置的过大,以免bulk队列写满
  • –offset: 设置迁移数据的偏移量

1.1.2 snapshot

适用场景

适用数据量大的场景

snapshot api是Elasticsearch用于对数据进行备份和恢复的一组api接口,可以通过snapshot api进行跨集群的数据迁移,原理就是从源ES集群创建数据快照,然后在目标ES集群中进行恢复。需要注意ES的版本问题:

目标ES集群的主版本号(如5.6.4中的5为主版本号)要大于等于源ES集群的主版本号;
1.x版本的集群创建的快照不能在5.x版本中恢复;

  1. 源ES集群中创建repository

创建快照前必须先创建repository仓库,一个repository仓库可以包含多份快照文件,repository主要有一下几种类型

  • fs: 共享文件系统,将快照文件存放于文件系统中
  • url: 指定文件系统的URL路径,支持协议:http,https,ftp,file,jar
  • s3: AWS S3对象存储,快照存放于S3中,以插件形式支持
  • hdfs: 快照存放于hdfs中,以插件形式支持
  • cos: 快照存放于腾讯云COS对象存储中,以插件形式支持

如果需要从自建ES集群迁移至腾讯云的ES集群,可以直接使用fs类型仓库,注意需要在Elasticsearch配置文件elasticsearch.yml设置仓库路径:

1
path.repo: ["/tmp/data"]

之后调用snapshot api创建repository:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
		curl -XPUT "http://192.168.203.25:32222/_snapshot/my_backup" -H 	'Content-Type: application/json' -d '{
"type": "fs",
"settings": {
"location": "/tmp/data",
"compress": true
}
}'

curl -X PUT "localhost:9200/_snapshot/my_backup" -H 'Content-Type: application/json' -d'
{
"type": "fs",
"settings": {
"location": "my_backup_location"
}
}
'

如果需要从其它云厂商的ES集群迁移至ES集群,或者阿里云内部的ES集群迁移,可以使用对应云厂商他提供的仓库类型,如AWS的S3, 阿里云的OSS,腾讯云的COS

1
2
3
4
5
6
7
8
curl -XPUT http://172.16.0.39:9200/_snapshot/my_s3_repository
{
"type": "s3",
"settings": {
"bucket": "my_bucket_name",
"region": "us-west"
}
}

2 . 源ES集群中创建snapshot

调用snapshot api在创建好的仓库中创建快照

1
curl -XPUT http://172.16.0.39:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true

创建快照可以指定索引,也可以指定快照中包含哪些内容,具体的api接口参数可以查阅官方文档

3 . 目标ES集群中创建repository

目标ES集群中创建仓库和在源ES集群中创建仓库类似,用户可在腾讯云上创建COS对象bucket, 将仓库将在COS的某个bucket下。

4 . 移动源ES集群snapshot至目标ES集群的仓库

把源ES集群创建好的snapshot上传至目标ES集群创建好的仓库中

5 . 从快照恢复

1
curl -XPUT http://172.16.0.20:9200/_snapshot/my_backup/snapshot_1/_restore

6 . 查看快照恢复状态

1
curl http://172.16.0.20:9200/_snapshot/_status

(4)查看快照状态

查看仓库列表

1
curl -XGET 'http://10.10.10.1:9200/_snapshot/_all?pretty=true'

查看指定仓库下的所有快照状态

1
curl -XGET 'http://10.10.10.1:9200/_snapshot/elk_backup/_all?pretty=true'

查看指定仓库下指定快照状态

1
curl -XGET 'http://10.10.10.1:9200/_snapshot/elk_backup/elkdata/_all?pretty=true'

查看指定仓库下快照执行状态

1
curl -XGET 'http://10.10.10.1:9200/_snapshot/elk_backup/_status?pretty=true'

1.1.3 reindex

reindex是Elasticsearch提供的一个api接口,可以把数据从源ES集群导入到当前的ES集群,同样实现了数据的迁移

1 . 配置reindex.remote.whitelist参数,需要在目标ES集群中配置该参数,指明能够reindex的远程集群的白名单

1
2
3
4
5
6
7
# 修改elasticsearch.yml
#reindex.remote.whitelist: ["ip:9200","ip2:9201"] 迁移数据白名单
reindex.remote.whitelist: ["localhost:9200"]

#跨域问题
http.cors.enabled: true
http.cors.allow-origin: "*"

2 . 调用reindex api

以下操作表示从源ES集群中查询名为test1的索引,查询条件为title字段为elasticsearch,将结果写入当前集群的test2索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
curl -XPOST "http://192.168.203.25:32223/_reindex" -H 'Content-Type:application/json'  -d '
{
"conflicts": "proceed",
"source": {
"remote": {
"host": "http://192.168.203.25:32222/"
},
"index": "companynew",
"query": {
"match_all": {}
},
"size": 6000
},
"dest": {
"index": "companynew"
}
}'

1.1.4 logstash方式

logstash支持从一个ES集群中读取数据然后写入到另一个ES集群,因此可以使用logstash进行数据迁移,具体的配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@es-test logstash]# cat logstash.conf
input {
elasticsearch {
# 源es集群地址
hosts => ["http://192.168.21.4:31111"]
# *表示迁移所有索引,-代表排除.monitoring、.security和.kibana的索引
index => "*,-.monitoring*,-.security*,-.kibana*"
docinfo => true
}
}
filter {
mutate {
remove_field => ["@timestamp", "@version"] # 过滤掉logstash自动添加的字段
}
}
output {
elasticsearch {
hosts => ["http://192.168.21.4:32222"] # 目标es集群地址
index => "%{[@metadata][_index]}" # 和来源索引名称相同
document_type => "%{[@metadata][_type]}" # 和来源类型相同
document_id => "%{[@metadata][_id]}" # 和来源id相同
}
}
1
[root@es-test logstash]# docker run  --rm --name logstash -v /root/es/elasticsearch/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf docker.elastic.co/logstash/logstash:7.13.2

上述配置文件将源ES集群的所有索引同步到目标集群中,当然可以设置只同步指定的索引,logstash的更多功能可查阅logstash官方文档

总结

  1. elasticsearch-dump和logstash做跨集群数据迁移时,都要求用于执行迁移任务的机器可以同时访问到两个集群,不然网络无法连通的情况下就无法实现迁移。而使用snapshot的方式没有这个限制,因为snapshot方式是完全离线的。因此elasticsearch-dump和logstash迁移方式更适合于源ES集群和目标ES集群处于同一网络的情况下进行迁移,而需要跨云厂商的迁移,比如从阿里云ES集群迁移至腾讯云ES集群,可以选择使用snapshot的方式进行迁移,当然也可以通过打通网络实现集群互通,但是成本较高。
  2. elasticsearchdump工具和mysql数据库用于做数据备份的工具mysqldump工具类似,都是逻辑备份,需要将数据一条一条导出后再执行导入,所以适合数据量小的场景下进行迁移;
  3. snapshot的方式适合数据量大的场景下进行迁移。

1.2 ES跨集群CCR数据实时同步

CCR - Cross Cluster Replication - 跨集群复制是 Elasticsearch v6.5 发布的一个新的特性,这个特性可以让你将一个集群的索引数据同步复制到远程的另外一个集群上面去。或者反过来,将一个远程的集群的索引数据同步的复制到本地 Elasticsearch 集群中来。集群复制类似于数据订阅的方式,一个集群的数据可以被多个集群订阅,也就是可以被复制到多个集群上面去。CCR 有两个角色,一个是 Leader,表示数据的源头,另外一个Follower,表示数据的订阅方,得到的是数据副本。CCR 工作在索引层面,使用 Pull 的模式,Follower 索引主动的去 Pull Leader 的数据。

CCR在Elasticsearch 6.7和7.0版本正式发布GA版本.

这个特性是 Elasticsearch 的商业特性,需要白金订阅。

很多公司的业务可能已经是遍布整个国家,甚至全球化。借助 CCR 刚好就可以解决下面的几个场景的问题:

  • 集群高可用以及灾难恢复,数据迁移。
  • 实现数据的就近访问(地理)
  • 集中式的报告集群

我这里实验所用到是白金版免费试用30天来迁移数据。

  • ES集群版本:最新的7.12.3(博客中helm部署)
  • kibana版本:7.12.3(通过helm安装)

1.关闭源集群和目标集群的xpack.security功能。

1
2
# 修改集群所有节点的elasticsearch.yml配置后重启节点
xpack.security.enabled: false

2.两个集群分别安装kibana(如果数据源集群已经申请了试用白金版订阅30天可以不用安装)(如果集群已经安装kibana请从申请试用白金版开始)

我这里源集群已经申请了白金版订阅,就跳过源集群的kibana安装,只安装目标集群的bibana

1
2
3
4
5
$ helm add repo elastic https://helm.elastic.co
$ helm pull elastic/kibana --version 7.13.2
$ tar xzvf kibana-7.13.2.tgz
$ ls
kibana kibana-7.13.2.tgz

通过helm的chart包创建一个kibana

1
$ helm install kibana -n es --set elasticsearchHosts="http://YourESAddress:9200" --set service.type="NodePort"  --set service.nodePort=32232 kibana/

kibana的pod启动成功后访问http://nodeip:32232即可访问到kibana

申请白金版试用版licence

左上角菜单 —> stack management —> 许可管理 —> 申请试用

添加远程源ES集群信息

远程集群 —> 添加远程集群 —> 填写远程源集群9300地址 —> 保存后验证是否连接成功

新建一个跨集群复制,选择要同步远程集群的索引名称(这里目标集群不能存在索引)

跨集群复制 —> 创建Follower索引

数秒后刷新页面查看集群索引复制的状态

查看目标集群中索引就发现数据已经同步过来了,如果数据量比较大,取决于带宽大小(此CCR是实时同步的,当源集群中指定索引发现操作增、删、改将会同步操作到目标集群中)