sybn sybn-util 项目说明文档 - 基于java的跨数据库联合查询

关于自定义函数的说明

2020-06-12
sybn

简介

在 mysql, hive, presto 等架构中都有自定义 udf 函数的功能.

本工具包同样也提供类似能力.

常见的自定义函数分为三种: UDF(一进一出) UDAF(多进一出) UDTF(一进多出)

本工具包目前支持自定义 UDF 和 UDAF 函数, 并且内置 unwind 这个 UDTF 函数.

查询当前可用的函数

show functions

函数名不区分大小写

自定义 UDF

  • 以下是注册自定义 UDF 函数的代码
import cn.sybn.util.conver.udf.SybnUdfPool;
import cn.sybn.util.stat.udaf.base.BaseCountUdaf;

SybnUdfPool.register(CalcUdf.getInstance());
  • 以下是 upper 函数的定义代码.
// upper("aAa") => "AAA"
public class SqlUpperUdf implements SybnUdf.SybnSingleArgUdf<String> {

	private static SqlUpperUdf instance = new SqlUpperUdf();

	private SqlUpperUdf() {

	}

	public static SqlUpperUdf getInstance() {
		return instance;
	}

	@Override
	public String getName() {
		return "upper";
	}

	/**
	 * 当前函数有多个名字
	 */
	@Override
	public List<String> getNames() {
		return ListUtil.toList("toUpperCase", "upper");
	}

	@Override
	public String getDesc() {
		return "返回参数转字符串后的大写格式";
	}

	@Override
	public String getRes(Object value) {
		if (value == null) {
			return "";
		}
		return value.toString().toUpperCase();
	}
}

非流式计算的自定义 UDAF

非流式计算时, udf 引擎不负责处理 group by, 只处理 group by 分组后的 subList. 所有的 udf 函数共享一次 group by 分组操作.

非流式计算时, 无法多线程计算, 自然也无法在执行半途中获取中间结果.

  • 以下是注册自定义 UDAF 函数的代码
import cn.sybn.util.conver.udf.SybnUdfPool;
import cn.sybn.util.conver.udf.singlearg.CalcUdf;

StatUdafPool.register(BaseCountUdaf.getInstance());
  • 以下是 upper 函数的定义代码.
public class BaseCountUdaf implements BaseUdaf {

    private static BaseCountUdaf instance = new BaseCountUdaf();

    private BaseCountUdaf() {

    }

    public static BaseCountUdaf getInstance() {
        return instance;
    }

    @Override
    public String getUdafName() {
        return "count";
    }

    @Override
    public <T> void udaf(@NonNull UdafConf udaf, @NonNull String udafKey, List<T> subList, Map<String, Object> newMap, Object defaults) {
        String aggKey = udaf.getArg(0, udafKey);
        int agg = DatasMatchUdfUtil.count(subList, aggKey);
        // 将新数据存数在udafKey中
        newMap.put(udafKey, agg);
    }
}

流式计算的自定义 UDAF

流式计算时需要考虑多线程结果集合并, 因此会出现 merge 函数合并不同线程的结果集. 相关设计参考了 spark 的 udaf 引擎.

流式计算引擎可以允许在 stram 流未结束时, 查询中间计算结果.

  • 以下是注册自定义 UDAF 函数的代码
import cn.sybn.util.java8.stat.udaf.StreamUdafPool;

StreamUdafPool.register(udaf);
  • 以下是 upper 函数的定义代码.
SybnStreamUdaf udaf = new AbsMyGroupUdaf() {
    private static final long serialVersionUID = 1L;
    AtomicLong agg = new AtomicLong(0);

    @Override
    public String getUdafName() {
        return "count";
    }

    @Override
    public void put(Object value) {
        if (value != null) {
            agg.addAndGet(1);
        }
    }

    @Override
    public void merge(MyGroupUdaf myGroupUdaf) {
        Integer intermedia = (Integer) myGroupUdaf.getIntermedia();
        agg.addAndGet(intermedia);
    }

    @Override
    public Object getIntermedia() {
        return agg.get();
    }

    @Override
    public Object get() {
        return agg.get();
    }
};

Similar Posts

Comments

暂不开放评论! 可微信联系