Real-Time Data Warehouse: Identifying New vs. Returning Visitors with Flink
Identifying new vs. returning visitors
- Identify new vs. returning visitors
  The client already tags each user as new or returning, but the tag is not accurate enough, so we confirm it again with real-time computation (no business operation is involved; this is purely a status confirmation).
- Split the data
  - Write different kinds of data into different Kafka topics.
1. Encapsulate a Kafka utility class
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class MyKafkaUtil {
    private static final Properties props = new Properties();

    static {
        props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092");
        props.setProperty("auto.offset.reset", "latest");
    }

    // Build a Kafka source for the given consumer group and topic
    public static FlinkKafkaConsumer<String> getKafkaSource(String groupId, String topic) {
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    // Build an exactly-once Kafka sink for the given topic
    public static SinkFunction<String> getFlinkKafkaSink(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092");
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        props.setProperty("transaction.timeout.ms", 1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<String>(
                topic,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long timestamp) {
                        return new ProducerRecord<>(topic, null, s.getBytes());
                    }
                },
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}
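Before moving on, here is a minimal sketch of how the two helpers are typically wired together in a job. The class name MyKafkaUtilDemo and the topics test_source / test_sink are placeholders for illustration only, not part of the project.

public class MyKafkaUtilDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // The EXACTLY_ONCE sink commits its Kafka transactions on checkpoints,
        // so checkpointing must be enabled for those transactions to ever commit
        env.enableCheckpointing(5000);

        // Read one topic and echo every record into another
        env.addSource(MyKafkaUtil.getKafkaSource("test_group", "test_source"))
           .addSink(MyKafkaUtil.getFlinkKafkaSink("test_sink"));

        env.execute("MyKafkaUtilDemo");
    }
}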
2. Encapsulate a BaseApp class for consuming Kafka data
Every job that consumes Kafka data repeats a lot of boilerplate. BaseApp factors that boilerplate out, so each subclass only needs to implement its own business logic.
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public abstract class BaseApp {
    /**
     * Subclasses implement their business logic in this method.
     *
     * @param env          execution environment
     * @param sourceStream the stream obtained directly from Kafka
     */
    protected abstract void run(StreamExecutionEnvironment env,
                                DataStreamSource<String> sourceStream);

    /**
     * Common initialization work.
     *
     * @param defaultParallelism default parallelism
     * @param groupId            consumer group
     * @param topic              topic to consume
     */
    public void init(int defaultParallelism, String groupId, String topic) {
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(defaultParallelism);

        // Checkpoint settings
        // 1. Exactly-once guarantee (the default); start a checkpoint every 5000 ms
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // 2. A checkpoint must finish within one minute, otherwise it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 3. Keep externalized checkpoints after the job is cancelled
        env
            .getCheckpointConfig()
            .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 4. State backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop162:8020/gmall2021/flink/checkpoint"));

        DataStreamSource<String> sourceStream = env.addSource(MyKafkaUtil.getKafkaSource(groupId, topic));
        run(env, sourceStream);

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
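BaseApp follows the template-method pattern: init() owns the environment, checkpointing, and the Kafka source, while run() is the hook a concrete job fills in. A minimal, hypothetical subclass might look like the sketch below (the class name PrintLogApp and its group id are illustrative only):

public class PrintLogApp extends BaseApp {
    public static void main(String[] args) {
        // parallelism, consumer group, source topic
        new PrintLogApp().init(1, "print_log_app", "ods_log");
    }

    @Override
    protected void run(StreamExecutionEnvironment env,
                       DataStreamSource<String> sourceStream) {
        // Business logic goes here; printing is enough to verify the plumbing
        sourceStream.print();
    }
}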
3. DWDLogApp implementation
Identifying new vs. returning visitors
Approach:
- Use event-time semantics to handle out-of-order data
- Key the stream by mid
- Apply a 5-second tumbling event-time window
- Use keyed state to record the timestamp of the first visit
- If the state is empty, the event with the smallest timestamp in the window is the first visit and all other events are non-first visits;
  if the state is not empty, every event in the window is a non-first visit
public class DWDLogApp extends BaseApp {
    public static void main(String[] args) {
        // parallelism, consumer group, source topic
        new DWDLogApp().init(2, "DWDLogApp", "ods_log");
    }

    @Override
    public void run(StreamExecutionEnvironment env,
                    DataStreamSource<String> sourceStream) {
        // 1. Distinguish new vs. returning visitors
        SingleOutputStreamOperator<JSONObject> validateFlatDS = distinguishNewOrOld(sourceStream);
        // validateFlatDS.print();
        // 2. Split the stream: start-up logs stay in the main stream, the other logs go to side outputs
        Tuple3<SingleOutputStreamOperator<JSONObject>, DataStream<JSONObject>, DataStream<JSONObject>> streams = splitStream(validateFlatDS);
        // 3. Write the split streams to the DWD layer (Kafka)
        sendToKafka(streams);
    }
4. Identify new vs. returning users
    // 1. Distinguish new vs. returning visitors
    private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> sourceStream) {
        return sourceStream
            .map(JSON::parseObject)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                    .withTimestampAssigner((obj, ts) -> obj.getLong("ts")))
            .keyBy(obj -> obj.getJSONObject("common").getString("mid"))
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
                private ValueState<Long> firstVisitState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    firstVisitState = getRuntimeContext()
                        .getState(new ValueStateDescriptor<>("firstVisitState", Long.class));
                }

                @Override
                public void process(String mid,
                                    Context context,
                                    Iterable<JSONObject> elements,
                                    Collector<JSONObject> out) throws Exception {
                    // Decide for every record whether it belongs to a new or returning visitor
                    if (firstVisitState.value() == null) {
                        System.out.println("This is the first visit");
                        // First window for this mid: sort by timestamp, mark the earliest
                        // event as a new visitor and all other events as returning visitors
                        ArrayList<JSONObject> list = new ArrayList<>();
                        for (JSONObject obj : elements) {
                            list.add(obj);
                        }
                        list.sort(Comparator.comparing(obj -> obj.getLong("ts")));
                        for (int i = 0; i < list.size(); i++) {
                            if (i == 0) {
                                list.get(i).getJSONObject("common").put("is_new", "1");
                                firstVisitState.update(list.get(i).getLong("ts"));
                            } else {
                                list.get(i).getJSONObject("common").put("is_new", "0");
                            }
                            out.collect(list.get(i));
                        }
                    } else {
                        // Not the first window for this mid: every visit is a returning visit
                        for (JSONObject obj : elements) {
                            obj.getJSONObject("common").put("is_new", "0");
                            out.collect(obj);
                        }
                    }
                }
            });
    }
}
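Note that firstVisitState is kept forever for every mid, so the keyed state grows with the number of devices ever seen. If that becomes a concern, Flink's state TTL can drop entries that have not been written for a long time. The sketch below is optional and not part of the original implementation; the 30-day TTL is an assumed value, and the snippet would replace the state initialization in open():

// Optional: expire the per-mid "first visit" state after an assumed 30 days without writes
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.days(30))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Long> descriptor =
        new ValueStateDescriptor<>("firstVisitState", Long.class);
descriptor.enableTimeToLive(ttlConfig);
firstVisitState = getRuntimeContext().getState(descriptor);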
5. Split the stream
Based on the content of each record, the logs are divided into three categories: page logs, start-up logs, and display (exposure) logs.
In this implementation, start-up logs stay in the main stream, page logs go to a page side-output stream, and display logs go to a display side-output stream.
    // 2. Split the stream: start-up logs stay in the main stream, the other logs go to side outputs
    private Tuple3<SingleOutputStreamOperator<JSONObject>, DataStream<JSONObject>, DataStream<JSONObject>> splitStream(SingleOutputStreamOperator<JSONObject> validateFlatDS) {
        OutputTag<JSONObject> pageTag = new OutputTag<JSONObject>("page") {};
        OutputTag<JSONObject> displayTag = new OutputTag<JSONObject>("display") {};

        SingleOutputStreamOperator<JSONObject> startStream = validateFlatDS
            .process(new ProcessFunction<JSONObject, JSONObject>() {
                @Override
                public void processElement(JSONObject value,
                                           Context context,
                                           Collector<JSONObject> out) throws Exception {
                    JSONObject start = value.getJSONObject("start");
                    if (start != null) {
                        // Start-up log: emit to the main stream
                        out.collect(value);
                    } else {
                        // Page log: emit to the page side output
                        JSONObject page = value.getJSONObject("page");
                        if (page != null) {
                            context.output(pageTag, value);
                        }
                        // Display log: flatten each display entry and emit to the display side output
                        JSONArray displays = value.getJSONArray("displays");
                        if (displays != null && displays.size() > 0) {
                            for (int i = 0; i < displays.size(); i++) {
                                JSONObject displayObject = displays.getJSONObject(i);
                                // Attach the page_id and timestamp of the enclosing record to each display entry
                                displayObject.put("page_id", value.getJSONObject("page").getString("page_id"));
                                displayObject.put("ts", value.getLong("ts"));
                                context.output(displayTag, displayObject);
                            }
                        }
                    }
                }
            });

        return Tuple3.of(startStream,
                         startStream.getSideOutput(pageTag),
                         startStream.getSideOutput(displayTag));
    }
6. Write each stream to its own Kafka topic
    private void sendToKafka(Tuple3<SingleOutputStreamOperator<JSONObject>, DataStream<JSONObject>, DataStream<JSONObject>> streams) {
        // f0: start-up logs, f1: page logs, f2: display logs
        streams.f0.map(obj -> JSON.toJSONString(obj)).addSink(MyKafkaUtil.getFlinkKafkaSink("dwd_start_log")).setParallelism(1);
        streams.f1.map(obj -> JSON.toJSONString(obj)).addSink(MyKafkaUtil.getFlinkKafkaSink("dwd_page_log")).setParallelism(1);
        streams.f2.map(obj -> JSON.toJSONString(obj)).addSink(MyKafkaUtil.getFlinkKafkaSink("dwd_display_log")).setParallelism(1);
    }
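Finally, to spot-check what DWDLogApp writes to the DWD layer, a throwaway job can simply print one of the topics. This is only an illustrative sketch; the class name DwdTopicCheck and the group id check_group are placeholders. Because the sink runs with EXACTLY_ONCE semantics, consumers configured with isolation.level=read_committed will only see records after a checkpoint completes.

public class DwdTopicCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Print the start-up log topic produced by DWDLogApp
        env.addSource(MyKafkaUtil.getKafkaSource("check_group", "dwd_start_log"))
           .print();

        env.execute("DwdTopicCheck");
    }
}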