Flink海量数据实时去重

男娘i 2022-11-07 14:56 249阅读 0赞

Flink海量数据实时去重

方案1: 借助redis的Set

具体实现代码

缺点

  1. 需要频繁连接Redis
  2. 如果数据量过大, 对redis的内存也是一种压力

具体实现代码

缺点

  1. 如果数据量过大, 状态后端最好选择 RocksDBStateBackend
  2. 如果数据量过大, 对存储也有一定压力

方案3: 使用布隆过滤器

布隆过滤器可以大大减少存储的数据的数据量

优点

  1. 不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据;
  2. 时间效率也较高,插入和查询的时间复杂度均为, 所以他的时间复杂度实际是
  3. 哈希函数之间相互独立,可以在硬件指令层面并行计算。

缺点

  1. 存在假阳性的概率,不适用于任何要求100%准确率的情境;
  2. 只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中。

使用场景

所以,BF在对查准度要求没有那么苛刻,而对时间、空间效率要求较高的场合非常合适.
另外,由于它不存在假阴性问题,所以用作“不存在”逻辑的处理时有奇效,比如可以用来作为缓存系统(如Redis)的缓冲,防止缓存穿透。

使用布隆过滤器实现去重

Flink已经内置了布隆过滤器的实现(使用的是google的Guava)

  1. import com.atguigu.flink.java.chapter_6.UserBehavior;
  2. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  3. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4. import org.apache.flink.api.common.state.ValueState;
  5. import org.apache.flink.api.common.state.ValueStateDescriptor;
  6. import org.apache.flink.api.common.typeinfo.TypeHint;
  7. import org.apache.flink.api.common.typeinfo.TypeInformation;
  8. import org.apache.flink.configuration.Configuration;
  9. import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
  10. import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
  11. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  12. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  13. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  16. import org.apache.flink.util.Collector;
  17. import java.time.Duration;
  18. public class Flink02_UV_BoomFilter {
  19. public static void main(String[] args) throws Exception {
  20. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21. // 创建WatermarkStrategy
  22. WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
  23. .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  24. .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
  25. @Override
  26. public long extractTimestamp(UserBehavior element, long recordTimestamp) {
  27. return element.getTimestamp() * 1000L;
  28. }
  29. });
  30. env
  31. .readTextFile("input/UserBehavior.csv")
  32. .map(line -> { // 对数据切割, 然后封装到POJO中
  33. String[] split = line.split(",");
  34. return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));
  35. })
  36. .filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为
  37. .assignTimestampsAndWatermarks(wms)
  38. .keyBy(UserBehavior::getBehavior)
  39. .window(TumblingEventTimeWindows.of(Time.minutes(60)))
  40. .process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {
  41. private ValueState<Long> countState;
  42. private ValueState<BloomFilter<Long>> bfState;
  43. @Override
  44. public void open(Configuration parameters) throws Exception {
  45. countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));
  46. bfState = getRuntimeContext()
  47. .getState(new ValueStateDescriptor<BloomFilter<Long>>("bfState", TypeInformation.of(new TypeHint<BloomFilter<Long>>() { })
  48. )
  49. );
  50. }
  51. @Override
  52. public void process(String key,
  53. Context context,
  54. Iterable<UserBehavior> elements, Collector<String> out) throws Exception {
  55. countState.update(0L);
  56. // 在状态中初始化一个布隆过滤器
  57. // 参数1: 漏斗, 存储的类型
  58. // 参数2: 期望插入的元素总个数
  59. // 参数3: 期望的误判率(假阳性率)
  60. BloomFilter<Long> bf = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.001);
  61. bfState.update(bf);
  62. for (UserBehavior behavior : elements) {
  63. // 查布隆
  64. if (!bfState.value().mightContain(behavior.getUserId())) {
  65. // 不存在 计数+1
  66. countState.update(countState.value() + 1L);
  67. // 记录这个用户di, 表示来过
  68. bfState.value().put(behavior.getUserId());
  69. }
  70. }
  71. out.collect("窗口: " + context.window() + " 的uv是: " + countState.value());
  72. }
  73. })
  74. .print();
  75. env.execute();
  76. }
  77. }

发表评论

表情:
评论列表 (有 0 条评论,249人围观)

还没有评论,来说两句吧...

相关阅读

    相关 flink sql

    消沉了一段时间,但是生活还是要继续的。 一直以来收集的数据都有重复的情况,flink sql如何去重呢? 1、 distinct统计 对devId去重

    相关 Flink第一弹:MapState

    点击上方蓝 字关注~         去重计算应该是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通