消息队列动态负载
关于reids中的消息队列,以及其他的队列的处理方式。(底层一定要是队列,而不是伪队列。)
1.Reid中的消息队列的长度控制原理
对于一个队列,他的长度会随着放入数据而增加,随着取出数据而减少,那么放入的速度不变的前提下,取出的速度增加则会导致队列的长度变短。依据这个原理可以实现动态调整队列的长度。
而放入和取出随着调整取出占用cpu时间片的变化就一定可以维持在一个稳定的范围内。
2.Reid中的消息队列控制长度实现
Redis中队列增加,则表明当前处理队列的线程以及跟不上放入的速度,所以可以通过增加处理线程数量来增加cpu处理pop队列的时间,以队列长度除以一个定值,动态算出当前处理队列的线程数,让这些线程一起处理队列,这样可以让队列的长度维持在一个范围内。
随着队列的放入数据量的增加,服务端会新启用更过的线程去处理队列,来维持一个可控范围。
而当队列长度下降后,服务端会根据队列长度来减少线程,这样就可以保证队列长度在一个可控的范围内。服务端的压力也会随之降低。
3.本思路的优势和缺点
优点在于系统可以动态的调整当前处理的线程数量,来适应当前来队列的压力,充分利用系统的资源,一定可以控制队列内的内容长度,确保实时性。
缺点在于当系统的负载较高时,会申请更多的cpu时间片去处理队列,而抢占别的任务在cpu上的处理时间,拖慢系统的速度。(如果服务器资源匮乏,会导致别的操作的相应速度变慢。)当然,关于系统变慢的问题,可以通过增加系统的配置来提升。
如果短时间内的高并发数据可以通过这种方式很好的及时处理。实现系统资源的动态分配。
代码:
/**
* 动态线程池去pop数据
*
* @param queueName queueName
*/
private void dynamicPoolHandleData(String queueName) {
//使用线程池
dataThreadPool.execute(
() -> {
Vector<Thread> popThreadVector = new Vector<>();
while (true) {
try {
//这里必不可少需要至少启动一个(为了阻塞住本线程)
handleDataFromRedis(queueName);
//判断队列长度
Long queueLength = redisUtils.length(queueName);
//需要额外启动的线程数
int needThreadNum = queueLength.intValue() / 3000 + 1;
needThreadNum = needThreadNum >= dataThreadPool.getMaximumPoolSize() ? dataThreadPool.getMaximumPoolSize() : needThreadNum;
int beginSize = dataThreadPool.getActiveCount();
int todoCount = needThreadNum - beginSize;
if (todoCount > 0) {
//增加
//原子性计数器
CountDownLatch countDownLatch = new CountDownLatch(todoCount);
for (int i = 0; i < todoCount; i++) {
dataThreadPool.execute(() -> {
popThreadVector.add(Thread.currentThread());
countDownLatch.countDown();
while (true) {
if (Thread.currentThread().isInterrupted()) {
break;
}
handleDataFromRedis(queueName);
}
});
}
countDownLatch.await(20, TimeUnit.MILLISECONDS);
logger.info("++++++++ 数据队列平衡池调整线程数,当前数量: " + beginSize +
" ,本次增加线程:" + todoCount + "个,当前pop线程总数:" + dataThreadPool.getActiveCount() + "个 ++++++++");
} else if (todoCount < -1) {
//减少(这里-1个线程作为缓冲,防止多一个线程多了少一个线程少了,不断销毁创建,浪费系统资源)
todoCount = 0 - todoCount;
for (int i = 0, doCount = 0; i < popThreadVector.size() && doCount < todoCount; doCount++) {
//关闭多余线程
try {
//从前面开始移除,Vector移除后会自动补位
popThreadVector.get(0).interrupt();
popThreadVector.remove(0);
} catch (Exception e) {
e.printStackTrace();
}
}
//等待线程释放完成
Thread.sleep(20);
logger.info("-------- 数据队列平衡池调整线程数,当前数量: " + beginSize +
" ,本次减少线程:" + todoCount + "个,当前pop线程总数:" + dataThreadPool.getActiveCount() + "个 --------");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
还没有评论,来说两句吧...