多线程实例
场景如下
有多个品牌,每个品牌下有多个商品,现在要过滤掉其中没有商品的品牌。
除了没有商品的可能,还有用户被屏蔽的可能,因此还有一道过滤逻辑。
比如康师傅品牌下有康师傅红茶、康师傅方便面等等,盼盼品牌下啥也没有,就要把盼盼过滤掉,康师傅留下。
过滤的线程如下
//得到品牌下商品的个数
class CountGoods implements Callable<Integer>{
//分页参数
PageRequest request;
//品牌id
Long brandId;
//所有品牌列表,如果传入的品牌无效,就将此列表的该品牌去掉
CopyOnWriteArrayList<Long> brandList;
public CountGoods(PageRequest request, Long brandId, CopyOnWriteArrayList<Long> brandList){
this.request = request;
this.brandId = brandId;
this.brandList = brandList;
}
@Override
public Integer call() throws Exception{
//获取所有品牌下商品
Map<Long,List<Long>> brandSkuMap = getBrand(request.getBpin());
//获取该品牌下的商品
List<Long> subSkuList = brandSkuMap.get(fBrandId);
//防止空数据
if(subSkuList == null){
return 0;
}
//对该品牌下的商品进行过滤
List<GoodsInfo> goodsInfo= goodsRpc.synFilterSku(request.getBpin(), subSkuList);
if(goodsInfo== null){
return 0;
}else{
if(goodsInfo.size() == 0){
brandList.remove(brandId);
}
return goodsInfo.size();
}
}
}
多线程进行过滤
public List<Long> filterBrandList(PageRequest request, List<Long> IdList){
List<Callable<Integer>> calls = new ArrayList();
//过滤逻辑
Iterator<Long> iterator = IdList.iterator();
while(iterator.hasNext()){
Long brandId =iterator.next();
Callable call = new countId(request, brandId , (CopyOnWriteArrayList<Long>) IdList);
calls.add(call);
}
ExecutorService cacheThreadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
List<Future<Integer>> callableResult = null;
try{
if(calls != null && calls.size() > 0){
callableResult = cacheThreadPool.invokeAll(calls);
}
}catch (Exception e){
e.printStackTrace();
LogUtil.printErrorLog(log, "多线程过滤品牌出现异常");
}finally {
cacheThreadPool.shutdown();
}
return fontBrandIdList;
}
缺陷
毫无疑问,这段代码一上到线上运行,就出了bug。
报错很奇怪,说是执行线程时,执行线程里的过滤操作出错了,但是有部分是执行成功。
百思不得其姐,终于周末想到一个问题,我似乎没给它阻塞队列……
注意了,这个线程池的阻塞队列有问题,需要修改,这样只能有10个任务过来,更多的会被拒绝。修改线程池的阻塞队列类型即可。
new SynchronousQueue<Runnable>()
这个是没有阻塞队列的意思。
修改如下:
ExecutorService cacheThreadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100))
但是还有一个问题:
注意了,这样将线程池直接在service方法中new出来是不对的,因为这样没法复用线程。比如,两个人同时访问这个方法,就分别new了一个线程池,没有用同一个线程池,而且,如果并发很高,线程数量激增,很可能会挂掉。应该给个static的线程池,或者单独写一个线程池的bean,在所有地方复用。
还没有评论,来说两句吧...