消息队列动态负载

水深无声 2022-02-28 11:38 43阅读 0赞

关于reids中的消息队列,以及其他的队列的处理方式。(底层一定要是队列,而不是伪队列。)

1.Reid中的消息队列的长度控制原理

对于一个队列,他的长度会随着放入数据而增加,随着取出数据而减少,那么放入的速度不变的前提下,取出的速度增加则会导致队列的长度变短。依据这个原理可以实现动态调整队列的长度。

  1. 而放入和取出随着调整取出占用cpu时间片的变化就一定可以维持在一个稳定的范围内。

2.Reid中的消息队列控制长度实现

Redis中队列增加,则表明当前处理队列的线程以及跟不上放入的速度,所以可以通过增加处理线程数量来增加cpu处理pop队列的时间,以队列长度除以一个定值,动态算出当前处理队列的线程数,让这些线程一起处理队列,这样可以让队列的长度维持在一个范围内。

随着队列的放入数据量的增加,服务端会新启用更过的线程去处理队列,来维持一个可控范围。

而当队列长度下降后,服务端会根据队列长度来减少线程,这样就可以保证队列长度在一个可控的范围内。服务端的压力也会随之降低。

3.本思路的优势和缺点

优点在于系统可以动态的调整当前处理的线程数量,来适应当前来队列的压力,充分利用系统的资源,一定可以控制队列内的内容长度,确保实时性。

缺点在于当系统的负载较高时,会申请更多的cpu时间片去处理队列,而抢占别的任务在cpu上的处理时间,拖慢系统的速度。(如果服务器资源匮乏,会导致别的操作的相应速度变慢。)当然,关于系统变慢的问题,可以通过增加系统的配置来提升。

如果短时间内的高并发数据可以通过这种方式很好的及时处理。实现系统资源的动态分配。

代码:

  1. /**
  2. * 动态线程池去pop数据
  3. *
  4. * @param queueName queueName
  5. */
  6. private void dynamicPoolHandleData(String queueName) {
  7. //使用线程池
  8. dataThreadPool.execute(
  9. () -> {
  10. Vector<Thread> popThreadVector = new Vector<>();
  11. while (true) {
  12. try {
  13. //这里必不可少需要至少启动一个(为了阻塞住本线程)
  14. handleDataFromRedis(queueName);
  15. //判断队列长度
  16. Long queueLength = redisUtils.length(queueName);
  17. //需要额外启动的线程数
  18. int needThreadNum = queueLength.intValue() / 3000 + 1;
  19. needThreadNum = needThreadNum >= dataThreadPool.getMaximumPoolSize() ? dataThreadPool.getMaximumPoolSize() : needThreadNum;
  20. int beginSize = dataThreadPool.getActiveCount();
  21. int todoCount = needThreadNum - beginSize;
  22. if (todoCount > 0) {
  23. //增加
  24. //原子性计数器
  25. CountDownLatch countDownLatch = new CountDownLatch(todoCount);
  26. for (int i = 0; i < todoCount; i++) {
  27. dataThreadPool.execute(() -> {
  28. popThreadVector.add(Thread.currentThread());
  29. countDownLatch.countDown();
  30. while (true) {
  31. if (Thread.currentThread().isInterrupted()) {
  32. break;
  33. }
  34. handleDataFromRedis(queueName);
  35. }
  36. });
  37. }
  38. countDownLatch.await(20, TimeUnit.MILLISECONDS);
  39. logger.info("++++++++ 数据队列平衡池调整线程数,当前数量: " + beginSize +
  40. " ,本次增加线程:" + todoCount + "个,当前pop线程总数:" + dataThreadPool.getActiveCount() + "个 ++++++++");
  41. } else if (todoCount < -1) {
  42. //减少(这里-1个线程作为缓冲,防止多一个线程多了少一个线程少了,不断销毁创建,浪费系统资源)
  43. todoCount = 0 - todoCount;
  44. for (int i = 0, doCount = 0; i < popThreadVector.size() && doCount < todoCount; doCount++) {
  45. //关闭多余线程
  46. try {
  47. //从前面开始移除,Vector移除后会自动补位
  48. popThreadVector.get(0).interrupt();
  49. popThreadVector.remove(0);
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. //等待线程释放完成
  55. Thread.sleep(20);
  56. logger.info("-------- 数据队列平衡池调整线程数,当前数量: " + beginSize +
  57. " ,本次减少线程:" + todoCount + "个,当前pop线程总数:" + dataThreadPool.getActiveCount() + "个 --------");
  58. }
  59. } catch (Exception e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. });
  64. }

发表评论

表情:
评论列表 (有 0 条评论,43人围观)

还没有评论,来说两句吧...

相关阅读

    相关 消息队列

    对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费。下面我们以这个本质作为根,一起由浅入深地聊聊 MQ。 01 从

    相关 消息队列

    1. 消息队列在项目中的使用 背景:在分布式系统中是如何处理高并发的。 由于在高并发的环境下,来不及同步处理用户发送的请求,则会导致请求发生阻塞。比如说,大量的ins

    相关 消息队列

    消息队列是啥?我觉得大家都心知肚明,已经众所周知到不用解释的程度。不过,但凡学习、解释一样东西,都应该遵循  “它是什么?”、  “做什么用?”、  “为啥要用它”

    相关 消息队列动态负载

    关于reids中的消息队列,以及其他的队列的处理方式。(底层一定要是队列,而不是伪队列。) 1.Reid中的消息队列的长度控制原理 对于一个队列,他的长度会随着放入数据而增

    相关 消息队列

    消息队列介绍 维基百科上的描述:在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通

    相关 消息队列

    为什么写这篇文章? 博主有两位朋友分别是小A和小B: 1. 小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑。再不然就