Real-Time Data Warehouse: Using Flink to Identify New vs. Returning Customers

向右看齐 · 2022-11-09 14:26

Identifying new vs. returning customers

  1. Identify new vs. returning customers
    The client already tags users as new or returning, but the tag is not accurate enough, so we confirm it again with real-time computation (no business operation is involved; it is purely a status confirmation).
  2. Split the data.
  3. Write the different kinds of data to different Kafka topics.

1. Encapsulating a Kafka utility class

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import javax.annotation.Nullable;
    import java.util.Properties;

    public class MyKafkaUtil {

        static Properties props = new Properties();

        static {
            props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092");
            props.setProperty("auto.offset.reset", "latest");
        }

        public static FlinkKafkaConsumer<String> getKafkaSource(String groupId, String topic) {
            props.setProperty("group.id", groupId);
            return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        }

        public static SinkFunction<String> getFlinkKafkaSink(String topic) {
            // Simpler at-least-once alternative:
            // return new FlinkKafkaProducer<String>("hadoop162:9092", topic, new SimpleStringSchema());
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092");
            // Flink's default transaction timeout (1 hour) exceeds the broker's default
            // transaction.max.timeout.ms (15 minutes), so cap it at 15 minutes here
            props.setProperty("transaction.timeout.ms", 1000 * 60 * 15 + "");
            return new FlinkKafkaProducer<String>(
                    topic,
                    new KafkaSerializationSchema<String>() {
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long timestamp) {
                            return new ProducerRecord<>(topic, null, s.getBytes());
                        }
                    },
                    props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        }
    }
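Just to show how the two helpers are meant to be wired together, here is a minimal sketch; the group id, topic names, and job name are placeholders, not part of the original project:

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MyKafkaUtilDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Read raw strings from a source topic...
            DataStreamSource<String> source =
                    env.addSource(MyKafkaUtil.getKafkaSource("demo_group", "ods_log"));
            // ...and forward them unchanged to a sink topic
            source.addSink(MyKafkaUtil.getFlinkKafkaSink("dwd_demo"));
            env.execute("MyKafkaUtilDemo");
        }
    }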

2. Encapsulating a BaseApp class for consuming Kafka data

Every job that consumes Kafka data repeats a lot of boilerplate code. BaseApp encapsulates that boilerplate, so its subclasses only need to implement their own business logic.

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public abstract class BaseApp {

        /**
         * Subclasses implement their business logic in this method.
         *
         * @param env          the execution environment
         * @param sourceStream the stream read directly from Kafka
         */
        protected abstract void run(StreamExecutionEnvironment env,
                                    DataStreamSource<String> sourceStream);

        /**
         * Does the common initialization work.
         *
         * @param defaultParallelism default parallelism
         * @param groupId            consumer group id
         * @param topic              topic to consume
         */
        public void init(int defaultParallelism, String groupId, String topic) {
            System.setProperty("HADOOP_USER_NAME", "atguigu");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(defaultParallelism);

            // Checkpoint settings
            // 1. Exactly-once guarantee (the default); start a checkpoint every 5000 ms
            env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
            // 2. A checkpoint must finish within one minute, otherwise it is discarded
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            // 3. Keep externalized checkpoints after the job is cancelled
            env.getCheckpointConfig()
               .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // 4. State backend
            env.setStateBackend(new FsStateBackend("hdfs://hadoop162:8020/gmall2021/flink/checkpoint"));

            DataStreamSource<String> sourceStream = env.addSource(MyKafkaUtil.getKafkaSource(groupId, topic));
            run(env, sourceStream);

            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
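A minimal sketch of what a subclass looks like, assuming the BaseApp and MyKafkaUtil classes above; the class name, parallelism, group id, and topic below are illustrative only:

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DemoLogApp extends BaseApp {

        public static void main(String[] args) {
            // parallelism = 2; consumer group and topic are placeholder values
            new DemoLogApp().init(2, "DemoLogApp", "ods_log");
        }

        @Override
        protected void run(StreamExecutionEnvironment env,
                           DataStreamSource<String> sourceStream) {
            // Only the business logic lives here; for this demo just print the raw records
            sourceStream.print();
        }
    }

init() then builds the environment, configures checkpoints, attaches the Kafka source, calls run(), and executes the job.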

3. DWDLogApp implementation

Identifying new vs. returning visitors

Implementation approach:

  1. To handle out-of-order data, use event-time semantics.
  2. Key the stream by mid.
  3. Apply a 5-second tumbling window.
  4. Use keyed state to record the timestamp of the first visit.
  5. If the state is empty, the event with the smallest timestamp in the window is the first visit; all other events are not first visits.
  6. If the state is not empty, every event in the window is a non-first visit.

For example, if a mid's first window contains events with ts 1000 and 3000, only the ts = 1000 event is marked is_new = "1"; every other event in that window, and every event in later windows for that mid, is marked is_new = "0".

    public class DWDLogApp extends BaseApp {

        public static void main(String[] args) {
            new DWDLogApp().init(2, "DWDLogApp", "ods_log");
        }

        @Override
        protected void run(StreamExecutionEnvironment env,
                           DataStreamSource<String> sourceStream) {
            // 1. Distinguish new vs. returning visitors
            SingleOutputStreamOperator<JSONObject> validateFlatDS = distinguishNewOrOld(sourceStream);
            // validateFlatDS.print();
            // 2. Split the stream: start logs stay in the main stream, the other logs go to side outputs
            Tuple3<SingleOutputStreamOperator<JSONObject>, DataStream<JSONObject>, DataStream<JSONObject>> streams = splitStream(validateFlatDS);
            // 3. Write the split streams to the DWD layer (Kafka)
            sendToKafka(streams);
        }

        // distinguishNewOrOld, splitStream and sendToKafka are shown in the sections below

4. Identifying new vs. returning users

    // 1. Distinguish new vs. returning visitors
    private SingleOutputStreamOperator<JSONObject> distinguishNewOrOld(DataStreamSource<String> sourceStream) {
        SingleOutputStreamOperator<JSONObject> result = sourceStream
                .map(JSON::parseObject)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((obj, ts) -> obj.getLong("ts")))
                .keyBy(obj -> obj.getJSONObject("common").getString("mid"))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {

                    // Keyed state: timestamp of this mid's first visit
                    private ValueState<Long> firstVisitedState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        firstVisitedState = getRuntimeContext()
                                .getState(new ValueStateDescriptor<>("firstVisitedState", Long.class));
                    }

                    @Override
                    public void process(String key,
                                        Context context,
                                        Iterable<JSONObject> elements,
                                        Collector<JSONObject> out) throws Exception {
                        // Decide for every event whether it comes from a new or a returning user
                        if (firstVisitedState.value() == null) {
                            System.out.println("This is the mid's first visit");
                            // 1. This mid's first window: sort by timestamp, mark the earliest event
                            //    as a new user and all the others as returning users
                            ArrayList<JSONObject> list = new ArrayList<>();
                            for (JSONObject obj : elements) {
                                list.add(obj);
                            }
                            list.sort(Comparator.comparing(o -> o.getLong("ts")));
                            for (int i = 0; i < list.size(); i++) {
                                if (i == 0) {
                                    list.get(i).getJSONObject("common").put("is_new", "1");
                                    firstVisitedState.update(list.get(i).getLong("ts"));
                                } else {
                                    list.get(i).getJSONObject("common").put("is_new", "0");
                                }
                                out.collect(list.get(i));
                            }
                        } else {
                            // Not this mid's first window: every visit is from a returning user
                            for (JSONObject obj : elements) {
                                obj.getJSONObject("common").put("is_new", "0");
                                out.collect(obj);
                            }
                        }
                    }
                });
        return result;
    }

5. Splitting the data stream

Based on its content, each log record is classified into one of three categories: page logs, start (launch) logs, and display (exposure) logs.
Start logs are emitted to the main stream, page logs to the page side-output stream, and display logs to the display side-output stream.

    // 2. Split the stream: start logs stay in the main stream, the other logs go to side outputs
    private Tuple3<SingleOutputStreamOperator<JSONObject>, DataStream<JSONObject>, DataStream<JSONObject>> splitStream(SingleOutputStreamOperator<JSONObject> validateFlatDS) {
        // The anonymous subclasses ({ }) preserve the generic type information that OutputTag needs at runtime
        OutputTag<JSONObject> pageTag = new OutputTag<JSONObject>("page") { };
        OutputTag<JSONObject> displayTag = new OutputTag<JSONObject>("display") { };

        SingleOutputStreamOperator<JSONObject> startStream = validateFlatDS
                .process(new ProcessFunction<JSONObject, JSONObject>() {
                    @Override
                    public void processElement(JSONObject value,
                                               Context context,
                                               Collector<JSONObject> out) throws Exception {
                        JSONObject start = value.getJSONObject("start");
                        if (start != null) {
                            // This record is a start log: keep it in the main stream
                            out.collect(value);
                        } else {
                            // Page log: route it to the page side output
                            JSONObject page = value.getJSONObject("page");
                            if (page != null) {
                                context.output(pageTag, value);
                            }
                            // Display (exposure) log: one record per display entry, routed to the display side output
                            JSONArray displays = value.getJSONArray("displays");
                            if (displays != null && displays.size() > 0) {
                                for (int i = 0; i < displays.size(); i++) {
                                    JSONObject displayObject = displays.getJSONObject(i);
                                    // Enrich each display record with its page_id and timestamp
                                    String pageId = value.getJSONObject("page").getString("page_id");
                                    displayObject.put("page_id", pageId);
                                    displayObject.put("ts", value.getLong("ts"));
                                    context.output(displayTag, displayObject);
                                }
                            }
                        }
                    }
                });

        return Tuple3.of(startStream,
                startStream.getSideOutput(pageTag),
                startStream.getSideOutput(displayTag));
    }

6. Writing the streams to different Kafka topics

    private void sendToKafka(Tuple3<SingleOutputStreamOperator<JSONObject>, DataStream<JSONObject>, DataStream<JSONObject>> streams) {
        // f0 = start logs, f1 = page logs, f2 = display logs
        streams.f0.map(line -> JSON.toJSONString(line)).addSink(MyKafkaUtil.getFlinkKafkaSink("dwd_start_log")).setParallelism(1);
        streams.f1.map(line -> JSON.toJSONString(line)).addSink(MyKafkaUtil.getFlinkKafkaSink("dwd_page_log")).setParallelism(1);
        streams.f2.map(line -> JSON.toJSONString(line)).addSink(MyKafkaUtil.getFlinkKafkaSink("dwd_display_log")).setParallelism(1);
    }
    } // end of DWDLogApp
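One consumer-side detail worth noting: because getFlinkKafkaSink uses FlinkKafkaProducer.Semantic.EXACTLY_ONCE, records are written inside Kafka transactions and are only committed when a checkpoint completes. A downstream consumer that must not see uncommitted (possibly aborted) data has to opt into read_committed isolation. A minimal sketch of that property, which could be added to the Properties used in MyKafkaUtil (Kafka's consumer default is read_uncommitted):

    // Read only records from committed Kafka transactions (pairs with the EXACTLY_ONCE sink above)
    props.setProperty("isolation.level", "read_committed");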
