本文共 6560 字,大约阅读时间需要 21 分钟。
实际的java web开发过程中,在业务处理的过程中,需要调用多次外部的服务(可能是http服务,也可能是rpc服务),而这写调用是可以并行的。然而,目前网上能找到如何编写这类代码的资料极少,对于初学者来说想要实现起来会有一定的难度。因此,本篇文章将给出一个简单易用的并行处理代码,希望对你有帮助。
@Getter@AllArgsConstructorpublic enum ConcurrencyDataTagEnum { /** * 商品id与商品的映射标记 */ PRODUCT_ID_PRODUCT_MAP_TAG("ProductIdProductMap", "商品id与商品的映射标记"), /** * 商品id与品牌的映射标记 */ PRODUCT_ID_BRAND_MAP_TAG("ProductIdBrandMap", "商品id与品牌的映射标记"), /** * 商品id与类别list的映射标记 */ PRODUCT_ID_CATEGORY_LIST_MAP_TAG("ProductIdCategoryListMap", "品id与类别list的映射标记"), /** * 商品id与区域id的映射标记 */ PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG("ProductIdAreaZipAreaMap", "商品id与区域id的映射标记"), /** * 门店商品id与活动列表的映射标记 */ EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG("EffectiveStoreProductIdActivityESOMap", "门店商品id与活动列表的映射标记"); private String key; private String value;}
@Datapublic class ConcurrencyDataDTO{ /** * 数据的标签,便于后期在一堆并发数据中取得想要的数据 */ private ConcurrencyDataTagEnum concurrencyDataTagEnum; private T data; public static ConcurrencyDataDTO create(ConcurrencyDataTagEnum concurrencyDataTagEnum, Object data) { ConcurrencyDataDTO concurrencyDataDTO = new ConcurrencyDataDTO(); concurrencyDataDTO.setConcurrencyDataTagEnum(concurrencyDataTagEnum); concurrencyDataDTO.setData(data); return concurrencyDataDTO; }}
@Slf4jpublic class ExecutorTemplate { private volatile ThreadPoolTaskExecutor executor = null; private volatile Listfutures = null; public ExecutorTemplate(ThreadPoolTaskExecutor executor) { this.futures = Collections.synchronizedList(new ArrayList ()); this.executor = executor; } public void submit(Runnable task) { Future future = executor.submit(task); futures.add(future); check(future); } public void submit(Callable task) { Future future = executor.submit(task); futures.add(future); check(future); } private void check(Future future) { if (future.isDone()) { // 立即判断一次,因为使用了CallerRun可能当场跑出结果,针对异常时快速响应 try { future.get(); } catch (Throwable e) { // 取消完之后立马退出 cancelAllFutures(); throw new RuntimeException(e); } } } public synchronized List waitForResult() { List result = new ArrayList(); RuntimeException exception = null; for (Future future : futures) { try { Object object = future.get(); if (object instanceof ConcurrencyDataDTO) { result.add((ConcurrencyDataDTO) object); } else { log.warn("future.get result object type is'nt ConcurrencyDTO, return object:{}.", JsonUtils.toJson(object)); } } catch (Throwable e) { exception = new RuntimeException(e); // 如果一个future出现了异常,就退出 break; } } if (exception != null) { cancelAllFutures(); throw exception; } else { return result; } } public void cancelAllFutures() { for (Future future : futures) { if (!future.isDone() && !future.isCancelled()) { future.cancel(true); } } } public void clear() { futures.clear(); }}
我们将下述的4个本来想要串行的代码并行化:
storeProductRelativeService.getProductIdProductMap(productIdSet)) storeProductRelativeService.getProductIdBrandMap(productIdSet)) storeProductRelativeService.getProductIdCategoryListMap(productIdSet)) storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet))... MapproductIdProductMap = null; Map productIdBrandMap = null; Map > productIdCategoryListMap = null; Map productIdAreaZipAreaMap = null; Map > storeProductIdActivityListMap = null;try { // 并发执行相关的rpc调用 if (!CollectionUtils.isEmpty(productIdSet)) { executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_PRODUCT_MAP_TAG, storeProductRelativeService.getProductIdProductMap(productIdSet))); executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_BRAND_MAP_TAG, storeProductRelativeService.getProductIdBrandMap(productIdSet))); executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_CATEGORY_LIST_MAP_TAG, storeProductRelativeService.getProductIdCategoryListMap(productIdSet))); executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG, storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet))); } if (!CollectionUtils.isEmpty(storeProductIdSet)) { executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG, activityStoreProductMapsService.getEffectiveStoreProductIdActivityESOMap(storeProductIdSet))); } // 等待所有异步执行结果 List concurrencyDataDTOList = executorTemplate.waitForResult(); for (ConcurrencyDataDTO concurrencyDataDTO : concurrencyDataDTOList) { ConcurrencyDataTagEnum concurrencyDataTagEnum = concurrencyDataDTO.getConcurrencyDataTagEnum(); switch (concurrencyDataTagEnum) { case PRODUCT_ID_PRODUCT_MAP_TAG: productIdProductMap = (Map ) concurrencyDataDTO.getData(); break; case PRODUCT_ID_BRAND_MAP_TAG: productIdBrandMap = (Map ) concurrencyDataDTO.getData(); break; case PRODUCT_ID_CATEGORY_LIST_MAP_TAG: productIdCategoryListMap = (Map >) concurrencyDataDTO.getData(); break; case PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG: productIdAreaZipAreaMap = (Map ) concurrencyDataDTO.getData(); break; case EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG: storeProductIdActivityListMap = (Map >) concurrencyDataDTO.getData(); break; default: log.warn("concurrencyDataTagEnum[{}] is unknown, return data:{}.", concurrencyDataTagEnum, JsonUtils.toJson(concurrencyDataDTO)); break; } } } catch (Exception e) { log.error("exception, error message:{}.", e.getMessage(), e); throw e; } finally { // 注意:一定要添加这个,不然会导致内存泄漏 executorTemplate.clear(); } // 处理上述业务的返回值 // todo ...
executorTemplate 这里面的线程池的参数配置,可以根据业务进行显示的配置。
转载地址:http://umomi.baihongyu.com/