Hive自定义UDTF解析json不定长数组
Hive自定义UDTF解析json不定长数组
一、UDTF函数概述
UDTF(User-Defined Table-Generating Functions) 用来解决 输入一行输出多行(On-to-many maping) 的需求。
二、添加pom.xml依赖
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
三、继承GenericUDTF实现解析json不定长数组
package com.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import java.util.ArrayList;
import java.util.List;
public class myUDTF extends GenericUDTF {
/** * 作用:1.检查入参是否符合要求,如果不符合,可以进行通知 * 2.返回ObjectInspector,代表每行中内容的类型 * * 要求传入的必须是一个String * * @param argOIs * @return * @throws UDFArgumentException */
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//获取函数传入的所有字段
List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
//判断函数传入的是否是一个字段
if (inputFields.size() != 1){
throw new UDFArgumentException("此函数只能传入一列!");
}
//判断传入的字段是否是字符串
if (!"string".equals(inputFields.get(0).getFieldObjectInspector().getTypeName())){
throw new UDFArgumentException("此函数只能传入一列字符串");
}
//返回的是N行1列,每行都是只有一列(string)
//每行的每列的名称
List<String> fieldNames = new ArrayList<>();
fieldNames.add("jsonObjectStr");
//每行的每列的类型
List<ObjectInspector> fieldOIS = new ArrayList<>();
fieldOIS.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIS);
}
//构造一个容器,存放输出的每行内容
private String [] result = new String[1];
//process: 生成UDTF返回的每行内容,生成后,调用forward(),将结果传给其他的运算符运算!
public void process(Object[] args) throws HiveException {
//获取函数传入的JsonArrayStr
String jsonArrayStr = args[0].toString();
//将JsonArrayStr转换为JsonArray对象
JSONArray jsonArray = new JSONArray(jsonArrayStr);
//遍历jsonArray,取出每一个JsonObject代表的str
for (int i=0;i<jsonArray.length();i++){
String jsonObjectStr = jsonArray.getJSONObject(i).toString();
result[0] = jsonObjectStr;
forward(result);
}
}
//关闭资源
public void close() throws HiveException {
}
}
四、在Hive创建自定义函数
①将上述代码打包上传到hive家目录下的auxlib文件夹内($HIVE_HOME/auxlib)
如果不存在需要自己手动创建,重启hive的服务,hive会自动读取auxlib目录下的jar包
②创建永久函数
注意: 函数是有库的范围,自定义的函数,在哪个库定义,只能在哪个库用! 或使用 库名.函数名
create function 函数名 as '函数全类名';
create function explode_array as 'com.hive.myUDTF';
五、函数使用
SELECT
GET_JSON_OBJECT(line,'$.common.ar'),
GET_JSON_OBJECT(line,'$.common.ba'),
GET_JSON_OBJECT(line,'$.common.ch') ,
GET_JSON_OBJECT(line,'$.common.md') ,
GET_JSON_OBJECT(line,'$.common.mid'),
GET_JSON_OBJECT(line,'$.common.os') ,
GET_JSON_OBJECT(line,'$.common.uid') ,
GET_JSON_OBJECT(line,'$.common.vc') ,
GET_JSON_OBJECT(line,'$.page.during_time') ,
GET_JSON_OBJECT(line,'$.page.item') ,
GET_JSON_OBJECT(line,'$.page.item_type') ,
GET_JSON_OBJECT(line,'$.page.last_page_id') ,
GET_JSON_OBJECT(line,'$.page.page_id') ,
GET_JSON_OBJECT(line,'$.page.sourceType') ,
GET_JSON_OBJECT(line,'$.ts') ,
GET_JSON_OBJECT(jsonObjectStr,'$.displayType') ,
GET_JSON_OBJECT(jsonObjectStr,'$.item') ,
GET_JSON_OBJECT(jsonObjectStr,'$.item_type') ,
GET_JSON_OBJECT(jsonObjectStr,'$.order')
FROM ods_log
lateral view explore_arr(GET_JSON_OBJECT(line,'$.displays')) tmp as jsonObjectStr
WHERE GET_JSON_OBJECT(line ,'$.displays') is not null
AND dt='2021-01-04';
-—————————————————————————————更新——————————————————————————————————————
对json数组字符串数组进行处理,拆分后使用explore函数处理(explore函数也是UDTF函数)
SELECT
GET_JSON_OBJECT(line,'$.common.ar'),
GET_JSON_OBJECT(line,'$.common.ba'),
GET_JSON_OBJECT(line,'$.common.ch') ,
GET_JSON_OBJECT(line,'$.common.md') ,
GET_JSON_OBJECT(line,'$.common.mid'),
GET_JSON_OBJECT(line,'$.common.os') ,
GET_JSON_OBJECT(line,'$.common.uid') ,
GET_JSON_OBJECT(line,'$.common.vc') ,
GET_JSON_OBJECT(line,'$.page.during_time') ,
GET_JSON_OBJECT(line,'$.page.item') ,
GET_JSON_OBJECT(line,'$.page.item_type') ,
GET_JSON_OBJECT(line,'$.page.last_page_id') ,
GET_JSON_OBJECT(line,'$.page.page_id') ,
GET_JSON_OBJECT(line,'$.page.sourceType') ,
GET_JSON_OBJECT(line,'$.ts') ,
GET_JSON_OBJECT(jsonObjectStr,'$.displayType') ,
GET_JSON_OBJECT(jsonObjectStr,'$.item') ,
GET_JSON_OBJECT(jsonObjectStr,'$.item_type') ,
GET_JSON_OBJECT(jsonObjectStr,'$.order')
--REGEXP_REPLACE( GET_JSON_OBJECT(line,'$.displays') ,'\\[|\\]' ,'' )
--REGEXP_REPLACE( REGEXP_REPLACE( GET_JSON_OBJECT(line,'$.displays') ,'\\[|\\]' ,'' ),'\\}\\,\\{' ,'\\}\\!\\{')
--split( REGEXP_REPLACE( REGEXP_REPLACE( GET_JSON_OBJECT(line,'$.displays') ,'\\[|\\]' ,'' ),'\\}\\,\\{' ,'\\}\\!\\{'),'\\!' )
from ods_log
lateral view explode( split( REGEXP_REPLACE( REGEXP_REPLACE( GET_JSON_OBJECT(line,'$.displays') ,'\\[|\\]' ,'' ),'\\}\\,\\{' ,'\\}\\!\\{'),'\\!' ) ) tmp as jsonObjectStr
where GET_JSON_OBJECT(line ,'$.displays') is not null
and dt='2021-01-04'
;
还没有评论,来说两句吧...