某些场景下, 需要批量载入数据到其他数据源.
以 es 为例, 相关代码如下:
说明
批量复制数据
// 初始化各种数据源, 为了容易理解这里使用直接传入jdbc的实现类
SybnStreamDao dao1 = new DbutilStreamDaoImpl("jdbc:mysql://账户:密码@192.168.4.31:3306,192.168.4.32:3306/test"); // sql
// SybnStreamDao dao2 = new MongoStreamDaoImpl("mongodb://账户:密码@192.168.4.31:27017,192.168.4.32:27017/test"); // mongo
// SybnStreamDao dao3 = new SolrStreamDaoImpl("solr://192.168.7.71:2181,192.168.7.72:2181/solr"); // solr
EsStreamDaoImpl dao4 = new EsStreamDaoImpl("es", "es://192.168.7.71:9200,192.168.7.72:9200"); // es
// SybnStreamDao dao5 = new HBaseStreamDaoImpl("hbase://192.168.7.71,192.168.7.72/hbase-unsecure"); // HBase
// 设置 es 的数据落盘时间, 防止频繁落盘降低性能.
dao4.setEsRefreshInterval("xxx_es", 300);
// 设置每次个批次写入的数据量
dao4.setSaveBatchSize(3000);
// 流式读取数据
Stream<Map<String, Object>> mapStream = dao1.sqlFindStreamMap("select * from xxx where id > 123");
// 流式写入数据
dao4.commonSaveStream("xxx_es", mapStream);
// 恢复设置 es 的数据落盘时间为1秒.
dao4.setEsRefreshInterval("xxx_es", 1);
- 语法
实测效率再 5000行/秒 左右, 此数据受 单行数据大小, 磁盘io 和 网络速度 等因素影响.