Elasticsearch 使用reindex进行数据同步或索引重构
1、批量复制优化
POST _reindex
{"source": {"index": "source","size": 5000},"dest": {"index": "dest"}
}
2、提高scroll的并行度优化
POST _reindex?slices=5&refresh
{"source": {"index": "twitter"},"dest": {"index": "new_twitter"}
}
slices大小设置注意事项:
1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。
效果
实践证明,比默认设置reindex速度能提升10倍+。
3、条件查询以及部分字段同步
{"source": {"index": "maindata","_source": [ //查询字段"dataId","website"],"query": {"match_phrase": {"teamId": 3}},"excludes": [ "column1","column2" ] //排除字段},"dest": {"index": "maindatagroup","version_type": "internal"}
}
说明:
“version_type”: “internal”,internal表示内部的,省略version_type或version_type设置为 internal 将导致 Elasticsearch 盲目地将文档转储到目标中,覆盖任何具有相同类型和 ID 的文件。
这也是最常见的重建方式。
4、从远程中重建索引
POST _reindex
{"source": {"remote": {"host": "http://otherhost:9200","username": "user","password": "pass","socket_timeout": "1m","connect_timeout": "10s"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}
注:需要给新的es配置白名单:reindex.remote.whitelist: “172.16.76.147:9200”
5、重构数据之取余
将publicsentimenthot 数据通过organId 取余2 ,把数据分配到相应的索引上
POST _reindex
{"source": {"index": "publicsentimenthot","size": 1000},"dest": {"index": "pubtest_0","op_type": "create"},"script": {"lang": "painless","source": "ctx._index = 'pubtest_' + (ctx._source.organId ?: 0) % 2;"}
}
6、查询reindex任务
(1)获取reindex任务列表
GET _tasks?detailed=true&actions=*reindex
(2)根据任务id查看任务
GET _tasks/r1A2WoRbTwKZ516z6NEs5A:36619
注: r1A2WoRbTwKZ516z6NEs5A:36619 为任务列表的id
(2)取消任务
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel
7、logstash 按照数据id重构索引
input {elasticsearch {hosts => ["第一个集群地址"]index => "源索引名称"query => '{"query": {"match_all": {}}}'size => 1000scroll => "5m"docinfo => true}
}filter {ruby {code => "organ_id = event.get('organId').to_i rescue 0target_index = '目标索引前缀_' + (organ_id % 10).to_sevent.set('[@metadata][target_index]', target_index)"}
}output {elasticsearch {hosts => ["第二个集群地址"]index => "%{[@metadata][target_index]}"document_id => "%{[@metadata][_id]}"}
}