博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
让你的java业务代码并发的调用,并正确的处理返回结果
阅读量:4220 次
发布时间:2019-05-26

本文共 6560 字,大约阅读时间需要 21 分钟。

1 背景

实际的java web开发过程中,在业务处理的过程中,需要调用多次外部的服务(可能是http服务,也可能是rpc服务),而这写调用是可以并行的。然而,目前网上能找到如何编写这类代码的资料极少,对于初学者来说想要实现起来会有一定的难度。因此,本篇文章将给出一个简单易用的并行处理代码,希望对你有帮助。

2 实现的demo

2.1 工具类 ConcurrencyDataTagEnum.java

@Getter@AllArgsConstructorpublic enum ConcurrencyDataTagEnum {    /**     * 商品id与商品的映射标记     */    PRODUCT_ID_PRODUCT_MAP_TAG("ProductIdProductMap", "商品id与商品的映射标记"),    /**     * 商品id与品牌的映射标记     */    PRODUCT_ID_BRAND_MAP_TAG("ProductIdBrandMap", "商品id与品牌的映射标记"),    /**     * 商品id与类别list的映射标记     */    PRODUCT_ID_CATEGORY_LIST_MAP_TAG("ProductIdCategoryListMap", "品id与类别list的映射标记"),    /**     * 商品id与区域id的映射标记     */    PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG("ProductIdAreaZipAreaMap", "商品id与区域id的映射标记"),    /**     * 门店商品id与活动列表的映射标记     */    EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG("EffectiveStoreProductIdActivityESOMap", "门店商品id与活动列表的映射标记");    private String key;    private String value;}

2.2 ConcurrencyDataDTO.java

@Datapublic class ConcurrencyDataDTO
{ /** * 数据的标签,便于后期在一堆并发数据中取得想要的数据 */ private ConcurrencyDataTagEnum concurrencyDataTagEnum; private T data; public static ConcurrencyDataDTO create(ConcurrencyDataTagEnum concurrencyDataTagEnum, Object data) { ConcurrencyDataDTO concurrencyDataDTO = new ConcurrencyDataDTO(); concurrencyDataDTO.setConcurrencyDataTagEnum(concurrencyDataTagEnum); concurrencyDataDTO.setData(data); return concurrencyDataDTO; }}

2.3 ExecutorTemplate.java

@Slf4jpublic class ExecutorTemplate {    private volatile ThreadPoolTaskExecutor executor = null;    private volatile List
futures = null; public ExecutorTemplate(ThreadPoolTaskExecutor executor) { this.futures = Collections.synchronizedList(new ArrayList
()); this.executor = executor; } public void submit(Runnable task) { Future future = executor.submit(task); futures.add(future); check(future); } public void submit(Callable
task) { Future future = executor.submit(task); futures.add(future); check(future); } private void check(Future future) { if (future.isDone()) { // 立即判断一次,因为使用了CallerRun可能当场跑出结果,针对异常时快速响应 try { future.get(); } catch (Throwable e) { // 取消完之后立马退出 cancelAllFutures(); throw new RuntimeException(e); } } } public synchronized List
waitForResult() { List
result = new ArrayList(); RuntimeException exception = null; for (Future future : futures) { try { Object object = future.get(); if (object instanceof ConcurrencyDataDTO) { result.add((ConcurrencyDataDTO) object); } else { log.warn("future.get result object type is'nt ConcurrencyDTO, return object:{}.", JsonUtils.toJson(object)); } } catch (Throwable e) { exception = new RuntimeException(e); // 如果一个future出现了异常,就退出 break; } } if (exception != null) { cancelAllFutures(); throw exception; } else { return result; } } public void cancelAllFutures() { for (Future future : futures) { if (!future.isDone() && !future.isCancelled()) { future.cancel(true); } } } public void clear() { futures.clear(); }}

2.4 核心业务代码

我们将下述的4个本来想要串行的代码并行化:

storeProductRelativeService.getProductIdProductMap(productIdSet))
storeProductRelativeService.getProductIdBrandMap(productIdSet))
storeProductRelativeService.getProductIdCategoryListMap(productIdSet))
storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet))

... Map
productIdProductMap = null; Map
productIdBrandMap = null; Map
> productIdCategoryListMap = null; Map
productIdAreaZipAreaMap = null; Map
> storeProductIdActivityListMap = null;try { // 并发执行相关的rpc调用 if (!CollectionUtils.isEmpty(productIdSet)) { executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_PRODUCT_MAP_TAG, storeProductRelativeService.getProductIdProductMap(productIdSet))); executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_BRAND_MAP_TAG, storeProductRelativeService.getProductIdBrandMap(productIdSet))); executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_CATEGORY_LIST_MAP_TAG, storeProductRelativeService.getProductIdCategoryListMap(productIdSet))); executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG, storeProductRelativeService.getProductIdAreaZipAreaMap(productIdSet))); } if (!CollectionUtils.isEmpty(storeProductIdSet)) { executorTemplate.submit(() -> ConcurrencyDataDTO.create(ConcurrencyDataTagEnum.EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG, activityStoreProductMapsService.getEffectiveStoreProductIdActivityESOMap(storeProductIdSet))); } // 等待所有异步执行结果 List
concurrencyDataDTOList = executorTemplate.waitForResult(); for (ConcurrencyDataDTO concurrencyDataDTO : concurrencyDataDTOList) { ConcurrencyDataTagEnum concurrencyDataTagEnum = concurrencyDataDTO.getConcurrencyDataTagEnum(); switch (concurrencyDataTagEnum) { case PRODUCT_ID_PRODUCT_MAP_TAG: productIdProductMap = (Map
) concurrencyDataDTO.getData(); break; case PRODUCT_ID_BRAND_MAP_TAG: productIdBrandMap = (Map
) concurrencyDataDTO.getData(); break; case PRODUCT_ID_CATEGORY_LIST_MAP_TAG: productIdCategoryListMap = (Map
>) concurrencyDataDTO.getData(); break; case PRODUCT_ID_AREA_ZIP_AREA_MAP_TAG: productIdAreaZipAreaMap = (Map
) concurrencyDataDTO.getData(); break; case EFFECTIVE_STORE_PRODUCT_ID_ACTIVITY_ESO_MAP_TAG: storeProductIdActivityListMap = (Map
>) concurrencyDataDTO.getData(); break; default: log.warn("concurrencyDataTagEnum[{}] is unknown, return data:{}.", concurrencyDataTagEnum, JsonUtils.toJson(concurrencyDataDTO)); break; } } } catch (Exception e) { log.error("exception, error message:{}.", e.getMessage(), e); throw e; } finally { // 注意:一定要添加这个,不然会导致内存泄漏 executorTemplate.clear(); } // 处理上述业务的返回值 // todo ...

executorTemplate 这里面的线程池的参数配置,可以根据业务进行显示的配置。

转载地址:http://umomi.baihongyu.com/

你可能感兴趣的文章
poj 3863Business Center
查看>>
Android编译系统简要介绍和学习计划
查看>>
Android编译系统环境初始化过程分析
查看>>
user2eng 笔记
查看>>
DRM in Android
查看>>
ARC MRC 变换
查看>>
Swift cell的自适应高度
查看>>
【linux】.fuse_hiddenXXXX 文件是如何生成的?
查看>>
【LKM】整合多个LKM为1个
查看>>
【Windows C++】调用powershell上传指定目录下所有文件
查看>>
Java图形界面中单选按钮JRadioButton和按钮Button事件处理
查看>>
小练习 - 排序:冒泡、选择、快排
查看>>
SparkStreaming 如何保证消费Kafka的数据不丢失不重复
查看>>
Spark Shuffle及其调优
查看>>
数据仓库分层
查看>>
常见数据结构-TrieTree/线段树/TreeSet
查看>>
Hive数据倾斜
查看>>
TopK问题
查看>>
HQL排查数据倾斜
查看>>
DAG以及任务调度
查看>>