博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并行调度框架封装及演示样例
阅读量:6670 次
发布时间:2019-06-25

本文共 5100 字,大约阅读时间需要 17 分钟。

參考资料:  阿里巴巴开源项目 CobarClient  源代码实现。

分享作者:闫建忠 

分享时间:2014年5月7日

---------------------------------------------------------------------------------------

并行调度封装类设计: BXexample.java

package org.hdht.business.ordermanager.quartzjob;import java.util.ArrayList;import java.util.Collection;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.concurrent.BlockingQueue;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.apache.commons.lang.exception.ExceptionUtils;import org.springframework.dao.ConcurrencyFailureException;public class BXexample {	 private static ExecutorService createCustomExecutorService(int poolSize, final String method) {	        int coreSize = Runtime.getRuntime().availableProcessors();//返回系统CUP数量	        if (poolSize < coreSize) {	            coreSize = poolSize;	        }	        ThreadFactory tf = new ThreadFactory() {	            public Thread newThread(Runnable r) {	                Thread t = new Thread(r, "thread created at BXexample method [" + method + "]");	                t.setDaemon(true);	                return t;	            }	        };	        BlockingQueue
queueToUse = new LinkedBlockingQueue
(); final ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, poolSize, 60, TimeUnit.SECONDS, queueToUse, tf, new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } public static
List
getSubListPage(List
list, int skip,int pageSize) { if (list == null || list.isEmpty()) { return null; } int startIndex = skip; int endIndex = skip + pageSize; if (startIndex > endIndex || startIndex > list.size()) { return null; } if (endIndex > list.size()) { endIndex = list.size(); } return list.subList(startIndex, endIndex); } public static void BXfunction(Collection
paramCollection,final ExectueCallBack ecb){ //构建运行器 ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection"); try { //监视器 final CountDownLatch latch = new CountDownLatch(paramCollection.size()); final StringBuffer exceptionStaktrace = new StringBuffer(); Iterator
iter = paramCollection.iterator(); while (iter.hasNext()) { final Object entity = iter.next(); Runnable task = new Runnable() { public void run() { try { ecb.doExectue(entity); } catch (Throwable t) { exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t)); } finally { latch.countDown(); } } }; executor.execute(task);//并行调度 } try { latch.await();//监视器等待全部线程运行完成 } catch (InterruptedException e) { //调度异常 throw new ConcurrencyFailureException( "unexpected interruption when re-arranging parameter collection into sub-collections ",e); } if (exceptionStaktrace.length() > 0) { //业务异常 throw new ConcurrencyFailureException( "unpected exception when re-arranging parameter collection, check previous log for details.\n"+ exceptionStaktrace); } } finally { executor.shutdown();//运行器关闭 } } }

回调接口类设计:ExectueCallBack.java

package org.hdht.business.ordermanager.quartzjob;public interface ExectueCallBack {	void doExectue(Object executor) throws Exception;}

演示样例(hello 演示样例)

public static void main(String[] args) {		List
paramCollection = new ArrayList
(); paramCollection.add("9"); paramCollection.add("2"); paramCollection.add("18"); paramCollection.add("7"); paramCollection.add("6"); paramCollection.add("1"); paramCollection.add("3"); paramCollection.add("4"); paramCollection.add("14"); paramCollection.add("13"); int freesize = 3;//当前处理能力 for(int i=0;i
tl = BXexample.getSubListPage(paramCollection, i, freesize); BXexample.BXfunction(tl,new ExectueCallBack() { public void doExectue(Object executor) throws Exception { int k = Integer.parseInt((String)executor); for(int i=0;i

演示样例(实际业务应用演示样例)

/**		 * 并行调度相关处理		 * 		 * 按卫星*日期 ,将待处理的任务分解为 卫星+日期 粒度的子任务 加入到paramMapList列表中		 */		List
> paramMapList = new ArrayList
>(); for (Iterator
iterator = paramSatellites.iterator(); iterator.hasNext();) { OrderParamSatellite paramSatellite = iterator.next(); paramMapList.addAll(this.getParamMapList(paramSatellite)); } //依据集群最大处理能力,分页处理任务列表,作为list截取的步长 int fsize = HostServerQueue.getInstance().freeSize(); for(int i=0;i
> tl = BXexample.getSubListPage(paramMapList, i, fsize); //并行调度 BXexample.BXfunction(tl,new ExectueCallBack(){ public void doExectue(Object executor) throws Exception { ExecuteOrderBTask((Map
)executor); } }); //动态查找空暇节点数量,即集群最大处理能力 fsize = HostServerQueue.getInstance().freeSize(); }

转载地址:http://yclxo.baihongyu.com/

你可能感兴趣的文章
tomcat 服务不支持 chkconfig 以及其他服务不能添加到开机启动时的操作
查看>>
【转载】Winform开发框架之权限管理系统
查看>>
Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN
查看>>
让PowerShell用上Git
查看>>
XXXXX was compiled with optimization - stepping may behave oddly; variables may not be available.
查看>>
Linux0.11内核--几种地址(逻辑地址、线性地址、物理地址)的含义
查看>>
posix多线程有感--自旋锁
查看>>
静态库中如何包含资源文件
查看>>
NOIP2014 提高组 Day2——寻找道路
查看>>
设置Sysctl.conf用以提高Linux的性能(最完整的sysctl.conf优化方案)
查看>>
tp路由+伪静态+去掉index.php
查看>>
R.I.P. PK
查看>>
今日晚餐:姹紫嫣红阳春面
查看>>
【转载】使用铁哥SmartFlash快速开发方案:66行代码搞定抽奖程序!
查看>>
Map<key,value>泛型get(key)值为null问题解决
查看>>
ZendFramework学习第一章
查看>>
40种网页小技巧
查看>>
PHP 乱码解决方面
查看>>
在Linux中一个网卡绑定多个IP设定
查看>>
Ural 1519 Formula 1 (插头DP)
查看>>