sybn sybn-util 项目说明文档 - 基于java的跨数据库联合查询

批量加载mysql数据到es

2022-10-24
sybn

某些场景下, 需要批量载入数据到其他数据源.

以 es 为例, 相关代码如下:

说明

批量复制数据
// 初始化各种数据源, 为了容易理解这里使用直接传入jdbc的实现类
SybnStreamDao dao1 = new DbutilStreamDaoImpl("jdbc:mysql://账户:密码@192.168.4.31:3306,192.168.4.32:3306/test"); // sql
// SybnStreamDao dao2 = new MongoStreamDaoImpl("mongodb://账户:密码@192.168.4.31:27017,192.168.4.32:27017/test"); // mongo
// SybnStreamDao dao3 = new SolrStreamDaoImpl("solr://192.168.7.71:2181,192.168.7.72:2181/solr"); // solr
EsStreamDaoImpl dao4 = new EsStreamDaoImpl("es", "es://192.168.7.71:9200,192.168.7.72:9200"); // es
// SybnStreamDao dao5 = new HBaseStreamDaoImpl("hbase://192.168.7.71,192.168.7.72/hbase-unsecure"); // HBase

// 设置 es 的数据落盘时间, 防止频繁落盘降低性能.
dao4.setEsRefreshInterval("xxx_es", 300);
// 设置每次个批次写入的数据量
dao4.setSaveBatchSize(3000);

// 流式读取数据
Stream<Map<String, Object>> mapStream = dao1.sqlFindStreamMap("select * from xxx where id > 123");
// 流式写入数据
dao4.commonSaveStream("xxx_es", mapStream);

// 恢复设置 es 的数据落盘时间为1.
dao4.setEsRefreshInterval("xxx_es", 1);
  • 语法

实测效率再 5000行/秒 左右, 此数据受 单行数据大小, 磁盘io 和 网络速度 等因素影响.


Similar Posts

Comments

暂不开放评论! 可微信联系