内存队列+线程池实现异步处理 àì夳堔傛蜴生んèń 2021-12-21 15:23 280阅读 0赞 **ThreadPoolManagers接口** /** * 线程池管理 * @ClassName ThreadPoolManagers * @Author dlh * @Date 2019/6/27 0027 下午 4:44 * @Version 1.0 **/ public interface ThreadPoolManagers<T> { /** * 任务提交执行 * @param commonService * @param message */ void execute(T commonService, String message); /** * 最大队列容量 */ int getMaxThreadQueueSize(); /** * 核心容量 */ int getCorePoolSize(); /** * 最大容量 */ int getMaxPoolSize(); /** * 存活时间 */ int getKeepAliveTime(); /** * 最大缓存队列容量 */ int getMaxCacheQueueSize(); /** * 标识 */ boolean getOffline(); } **ThreadPoolManagersImpl实现类** /** * @ClassName ThreadPoolManagersImpl * @Author dlh * @Date 2019/6/26 0026 下午 6:00 * @Version 1.0 **/ public class ThreadPoolManagersImpl<T> implements ThreadPoolManagers<T> { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolManagersImpl.class); /** * 缓存队列 */ private volatile LinkedBlockingQueue<Runnable> cacheLinkedQueue; /** * 最大队列容量 */ private final int MAX_THREAD_QUEUE_SIZE = 1000; /** * 核心容量 */ private final int CORE_POOL_SIZE = 20; /** * 最大容量 */ private final int MAX_POOL_SIZE = 40; /** * 存活时间 */ private final int KEEP_ALIVE_TIME = 0; /** * 最大缓存队列容量 */ private final int MAX_CACHE_QUEUE_SIZE = 10000; /** * 线程池 */ private volatile ThreadPoolExecutor threadPoolExecutor; /** * 标识 */ private volatile static boolean offline = false; public ThreadPoolManagersImpl() { getCacheQueue(); getExecutorService(); } @Override public void execute(T commonService, String message){ int size = getExecutorService().getQueue().size(); logger.info("线程池queue,队列size :" + size); if (offline){ addQueue((CommonService) commonService,message); logger.info("缓存,队列开始缓存,size:{} " , cacheLinkedQueue.size()); if(size == 0){ logger.info("开始恢复"); new OfflineResumeThread().start(); } return; } if (size >= MAX_THREAD_QUEUE_SIZE){ setOffline(true); return; } getExecutorService().submit(addTask((CommonService)commonService,message)); } private void addQueue(CommonService commonService,String message){ getCacheQueue().add(addTask(commonService,message)); } /** * 生成任务 * @param commonService * @param message * @return */ private Runnable addTask(CommonService commonService,String message){ return new Runnable(){ @Override public void run() { commonService.process(message); } }; } private ThreadPoolExecutor getExecutorService(){ if (threadPoolExecutor == null){ synchronized (ThreadPoolManagersImpl.class){ if (threadPoolExecutor == null){ threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE , MAX_POOL_SIZE , KEEP_ALIVE_TIME , TimeUnit.MILLISECONDS , new LinkedBlockingQueue<>(MAX_THREAD_QUEUE_SIZE) ,new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { logger.info("线程被拒绝掉,核心线程队列容量{}",executor.getQueue().size()); // 可以做一些补偿机制 } } ); } } } return threadPoolExecutor; } private LinkedBlockingQueue getCacheQueue(){ if (cacheLinkedQueue == null){ synchronized (ThreadPoolManagersImpl.class){ if (cacheLinkedQueue == null){ cacheLinkedQueue = new LinkedBlockingQueue<>(MAX_CACHE_QUEUE_SIZE); } } } return cacheLinkedQueue; } class OfflineResumeThread extends Thread{ @Override public void run(){ while (true){ if (getExecutorService().getQueue().size() >= MAX_THREAD_QUEUE_SIZE){ continue; } Runnable runnable = null; try { runnable = cacheLinkedQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("移除 queue,size :{}" , cacheLinkedQueue.size()); if (runnable != null){ getExecutorService().submit(runnable); } if (cacheLinkedQueue.size() == 0){ setOffline(false); logger.info("恢复完毕"); return; } } } } @Override public int getMaxThreadQueueSize() { return MAX_THREAD_QUEUE_SIZE; } @Override public int getCorePoolSize() { return CORE_POOL_SIZE; } @Override public int getMaxPoolSize() { return MAX_POOL_SIZE; } @Override public int getKeepAliveTime() { return KEEP_ALIVE_TIME; } @Override public int getMaxCacheQueueSize() { return MAX_CACHE_QUEUE_SIZE; } @Override public boolean getOffline() { return offline; } private void setOffline(boolean offline) { ThreadPoolManagersImpl.offline = offline; } **CommonService 是我的一个业务处理的类,大家可以自己换成自己要用的** **调用结果就不展示了,有兴趣可以自己跑起来看看** **缓存队列借助备忘录模式思想,线程池的工作队列满了后,再来的请求会放入缓存队列,等线程池的队列空闲了,再恢复缓存队列的请求**
还没有评论,来说两句吧...