某些场景下,需要临时连接到,未提前定义的或者随时变更的数据库。
在 mysql 中可以创建远程表 (federated engine) 来实现.
本工具提供了类似的实现方案, 无需建虚拟表可直接链接临时数据源.
dynamicSourceQuery(select * from xxx) on url='jdbc:sqlite:/usr/local/junit/data/sqlite_3.db'
本工具包支持接近sql风格的 if 语句, 实际对于 if 语句的支持如下:
-- 判断子查询是否有值
if [not] exists (select code from xxx) [then (select 1)] [else (select 2)] [end if]
-- 判断表达式
if [not] (@a>1) select 1
-- 判断表是否存在
if [not] exists table (table1, table2, ...) [then (select 1)] [else (select 2)] [end if]
-- 判断字段是否存在
if [not] exists [table] field (table1.field1, table2[.field2], ...) [then (select 1)] [else (select 2)] [end if]
某些场景下, 需要批量载入数据到其他数据源.
以 es 为例, 相关代码如下:
// 初始化各种数据源, 为了容易理解这里使用直接传入jdbc的实现类
SybnStreamDao dao1 = new DbutilStreamDaoImpl("jdbc:mysql://账户:密码@192.168.4.31:3306,192.168.4.32:3306/test"); // sql
// SybnStreamDao dao2 = new MongoStreamDaoImpl("mongodb://账户:密码@192.168.4.31:27017,192.168.4.32:27017/test"); // mongo
// SybnStreamDao dao3 = new SolrStreamDaoImpl("solr://192.168.7.71:2181,192.168.7.72:2181/solr"); // solr
EsStreamDaoImpl dao4 = new EsStreamDaoImpl("es", "es://192.168.7.71:9200,192.168.7.72:9200"); // es
// SybnStreamDao dao5 = new HBaseStreamDaoImpl("hbase://192.168.7.71,192.168.7.72/hbase-unsecure"); // HBase
// 设置 es 的数据落盘时间, 防止频繁落盘降低性能.
dao4.setEsRefreshInterval("xxx_es", 300);
// 设置每次个批次写入的数据量
dao4.setSaveBatchSize(3000);
// 流式读取数据
Stream<Map<String, Object>> mapStream = dao1.sqlFindStreamMap("select * from xxx where id > 123");
// 流式写入数据
dao4.commonSaveStream("xxx_es", mapStream);
// 恢复设置 es 的数据落盘时间为1秒.
dao4.setEsRefreshInterval("xxx_es", 1);
实测效率再 5000行/秒 左右, 此数据受 单行数据大小, 磁盘io 和 网络速度 等因素影响.
某些场景下, 需要批量执行某些 sql, 其中 sql 中表名或者字段名是变量。
原生sql中, 一般使用存储过程实现。 本工具为了满足此场景, 引入了 execBatch 批量操作。
-- 前置sql, 列出所有表名
show tables;
-- 单线程批量操作, 获取所有表的表结构
execBatch(show create table @tables);
```sql
-- 前置sql, 列出所有表名
show tables;
-- 多线程批量操作, 获取所有表的表结构, 如果单个任务失败不影响其他任务继续执行
execBatch(show create table @tables) ON concurrent=5 and ignore_error=true;