【MybatisPlus】怎么处理百万级以上数据开发案例

深藏阁楼爱情的钟 2023-10-09 19:14 24阅读 0赞

1.需求

数据库的表有一百多万条数据,现在要通过定时任务,去把这些数据发送到kafka 第三方。
在这里插入图片描述

2.遇到的问题

方案1(失败)

首先是想通过分页去一页一页的发送到kafka,但是出现了两个问题。

  • 第一是 分页越到后面越慢,经过分析是:当一个数据库表过于庞大,LIMIT offset, length中的offset值过大,则SQL查询语句会非常缓慢
  • 第二是 在查询过程中,因为使用的是封装框架的多数据源切换,分页插件也有问题,这个暂时没有解决,直接切换方案。

方案2(成功)

通过查询一些资料,最终使用通过 最大ID方式去 一批一批的拿数据;

之前的sql是:

  1. select * from alert where 1=1 and (first_alert_time BETWEEN ? AND ? ) limit 1000000,100

优化后是:通过每次获取比当前id大的数据,获取100条,处理后,下一次再根据之前最后一条为基础,作为下一次的比较id。

备注:字段已加索引

  1. select * from alert where 1=1 and (first_alert_time BETWEEN ? AND ? ) and alert_seq >1655600780346079 order by alert_seq asc LIMIT 100

如果主键不是自增序列的话,用时间也是可以的,根据创建时间:

  1. select * from alert where 1=1 and (first_alert_time BETWEEN ? AND ? ) and create_time >'2020-01-01 12:29:23' order by create_time asc LIMIT 100

3.解决

使用方案2,分页的问题也解决了,查询慢的问题也解决了。

最后上代码:

  1. public void ltAssetSafeProcess(String username, String password, Integer businessType, String topic, Boolean isToday) {
  2. Long startTime = null;
  3. Long endTime = null;
  4. Integer count = eventSearchTaskMapper.getCount(businessType);
  5. if (Objects.nonNull(count) && count == 0) {
  6. // 初次上报直接全量
  7. startTime = 946656000000L;
  8. endTime = new Date().getTime();
  9. } else {
  10. // 增量上报
  11. QueryWrapper<EventSearchTask> queryWrapper = new QueryWrapper();
  12. queryWrapper.eq("business_type", businessType);
  13. queryWrapper.orderByDesc("create_time");
  14. queryWrapper.last("limit 1");
  15. EventSearchTask selectOne = eventSearchTaskMapper.selectOne(queryWrapper);
  16. startTime = selectOne.getEndTime();
  17. endTime = new Date().getTime();
  18. }
  19. // 处理上报批次
  20. EventSearchTask eventSearchTask = new EventSearchTask();
  21. eventSearchTask.setId($.toString(SnowFlake.nextId()));
  22. eventSearchTask.setDataSource(FeignTemplateType.LT_ZC_INFO.getType());
  23. eventSearchTask.setStartTime(startTime);
  24. eventSearchTask.setEndTime(endTime);
  25. eventSearchTask.setTimeDimension("TODAY");
  26. eventSearchTask.setBusinessType(businessType);
  27. eventSearchTask.setCreateTime(SystemClock.nowDate());
  28. // 处理上报批次
  29. QueryWrapper<AlertEntity> queryWrapper = new QueryWrapper<>();
  30. queryWrapper.lambda().between(AlertEntity::getFirstAlertTime,new Timestamp(startTime),new Timestamp(endTime));
  31. Integer alertCounts = alertService.getCount(queryWrapper);//获取总数
  32. eventSearchTask.setAlertCount(alertCounts);
  33. eventSearchTaskMapper.insert(eventSearchTask);
  34. int pageSize = 1000;
  35. int pageCount = PageUtils.pageCount(alertCounts, pageSize);//计算页数
  36. // 开始上报
  37. String lastAlertSeq = null;
  38. for (int i = 0; i <= pageCount; i++) {
  39. log.info("处理到第:{}页",i);
  40. // 使用alertSeq 去增量获取
  41. Map<String,Object> params = new HashMap<>();
  42. params.put("startTime",new Timestamp(startTime));
  43. params.put("endTime",new Timestamp(endTime));
  44. params.put("pageSize",pageSize);
  45. if(i == 0){
  46. // 获取最早的一条数据 最为第一次基点
  47. AlertEntity alertEntity = alertService.getStratAlertOne();
  48. params.put("alertSeq",alertEntity.getAlertSeq());
  49. }else {
  50. params.put("alertSeq",lastAlertSeq);
  51. }
  52. List<AlertEntity> records = alertService.getAlertPages(params);
  53. if(records.size() == 0){
  54. log.info("数据:{} 条",records.size());
  55. }
  56. log.info("处理数据:{} 条",records.size());
  57. for(AlertEntity entity : records){
  58. lastAlertSeq = entity.getAlertSeq();
  59. // 异步发送kafka
  60. threadPoolTaskExecutor.submit(() -> {
  61. try {
  62. JSON.toJSONString(entity, SerializerFeature.WriteMapNullValue);
  63. } catch (Exception e) {
  64. log.error("数据存在问题--------------------",e);
  65. return;
  66. }
  67. kafkaAnalyzeProducer.send(topic, JSON.toJSONString(entity, SerializerFeature.WriteMapNullValue), new SendCallBack() {
  68. @Override
  69. public void sendSuccessCallBack(String topic, String msg) {
  70. log.info("sendSuccessCallBack--------------{}----------{}",topic,msg);
  71. }
  72. @Override
  73. public void sendFailCallBack(String topic, String msg, Throwable ex) {
  74. log.info("sendFailCallBack--------------{}----------{}---------{}",topic,msg,ex);
  75. }
  76. });
  77. });
  78. }
  79. }
  80. }

发表评论

表情:
评论列表 (有 0 条评论,24人围观)

还没有评论,来说两句吧...

相关阅读