LogUtil 是底层实用日志工具, 可以代理 self4j 打印日志.
SHOW VALUE STAT 可在数据库中统计数据概况, 比如统计最大值最小值和null值的数量. 不计算词频, 如需统计词频可使用 AnalyticalTable 工具.
可以对 jdbc(mysql, presto, gbase, hive, clickhouse) / solr / es / mongo / hbase / execl / csv / json 等与数据执行字段分析.
某些场景下,需要临时连接到,未提前定义的或者随时变更的数据库。
在 mysql 中可以创建远程表 (federated engine) 来实现.
本工具提供了类似的实现方案, 无需建虚拟表可直接链接临时数据源.
dynamicSourceQuery(select * from xxx) on url='jdbc:sqlite:/usr/local/junit/data/sqlite_3.db'
本工具包支持接近sql风格的 if 语句, 实际对于 if 语句的支持如下:
if [not] exists (select code from xxx) [then (select 1)] [else (select 2)] [end if]
if exists (select code from xxx) then (select 1) end if
if exists (select code from xxx) then (select 1)
if exists (select code from xxx) then select 1
if not exists (select code from xxx) then (select 1) end if
if not exists (select code from xxx) then (select 1)
if not exists (select code from xxx) then select 1
if exists (select code from xxx) else (select 2) end if
if exists (select code from xxx) else (select 2)
if exists (select code from xxx) else select 2
if not exists (select code from xxx) else (select 2) end if
if not exists (select code from xxx) else (select 2)
if not exists (select code from xxx) else select 2
if exists (select code from xxx) then (select 1) else (select 2) end if
if exists (select code from xxx) then (select 1) else (select 2)
if not exists (select code from xxx) then (select 1) else (select 2) end if
if not exists (select code from xxx) then (select 1) else (select 2)
某些场景下, 需要批量载入数据到其他数据源.
以 es 为例, 相关代码如下:
// 初始化各种数据源, 为了容易理解这里使用直接传入jdbc的实现类
SybnStreamDao dao1 = new DbutilStreamDaoImpl("jdbc:mysql://账户:密码@192.168.4.31:3306,192.168.4.32:3306/test"); // sql
// SybnStreamDao dao2 = new MongoStreamDaoImpl("mongodb://账户:密码@192.168.4.31:27017,192.168.4.32:27017/test"); // mongo
// SybnStreamDao dao3 = new SolrStreamDaoImpl("solr://192.168.7.71:2181,192.168.7.72:2181/solr"); // solr
EsStreamDaoImpl dao4 = new EsStreamDaoImpl("es", "es://192.168.7.71:9200,192.168.7.72:9200"); // es
// SybnStreamDao dao5 = new HBaseStreamDaoImpl("hbase://192.168.7.71,192.168.7.72/hbase-unsecure"); // HBase
// 设置 es 的数据落盘时间, 防止频繁落盘降低性能.
dao4.setEsRefreshInterval("xxx_es", 300);
// 设置每次个批次写入的数据量
dao4.setSaveBatchSize(3000);
// 流式读取数据
Stream<Map<String, Object>> mapStream = dao1.sqlFindStreamMap("select * from xxx where id > 123");
// 流式写入数据
dao4.commonSaveStream("xxx_es", mapStream);
// 恢复设置 es 的数据落盘时间为1秒.
dao4.setEsRefreshInterval("xxx_es", 1);
实测效率再 5000行/秒 左右, 此数据受 单行数据大小, 磁盘io 和 网络速度 等因素影响.