參考资料: 阿里巴巴开源项目 CobarClient 源代码实现。
分享作者:闫建忠
分享时间:2014年5月7日
---------------------------------------------------------------------------------------
并行调度封装类设计: BXexample.java
package org.hdht.business.ordermanager.quartzjob;import java.util.ArrayList;import java.util.Collection;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.concurrent.BlockingQueue;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.apache.commons.lang.exception.ExceptionUtils;import org.springframework.dao.ConcurrencyFailureException;public class BXexample { private static ExecutorService createCustomExecutorService(int poolSize, final String method) { int coreSize = Runtime.getRuntime().availableProcessors();//返回系统CUP数量 if (poolSize < coreSize) { coreSize = poolSize; } ThreadFactory tf = new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r, "thread created at BXexample method [" + method + "]"); t.setDaemon(true); return t; } }; BlockingQueuequeueToUse = new LinkedBlockingQueue (); final ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, poolSize, 60, TimeUnit.SECONDS, queueToUse, tf, new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } public static List getSubListPage(List list, int skip,int pageSize) { if (list == null || list.isEmpty()) { return null; } int startIndex = skip; int endIndex = skip + pageSize; if (startIndex > endIndex || startIndex > list.size()) { return null; } if (endIndex > list.size()) { endIndex = list.size(); } return list.subList(startIndex, endIndex); } public static void BXfunction(Collection paramCollection,final ExectueCallBack ecb){ //构建运行器 ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection"); try { //监视器 final CountDownLatch latch = new CountDownLatch(paramCollection.size()); final StringBuffer exceptionStaktrace = new StringBuffer(); Iterator iter = paramCollection.iterator(); while (iter.hasNext()) { final Object entity = iter.next(); Runnable task = new Runnable() { public void run() { try { ecb.doExectue(entity); } catch (Throwable t) { exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t)); } finally { latch.countDown(); } } }; executor.execute(task);//并行调度 } try { latch.await();//监视器等待全部线程运行完成 } catch (InterruptedException e) { //调度异常 throw new ConcurrencyFailureException( "unexpected interruption when re-arranging parameter collection into sub-collections ",e); } if (exceptionStaktrace.length() > 0) { //业务异常 throw new ConcurrencyFailureException( "unpected exception when re-arranging parameter collection, check previous log for details.\n"+ exceptionStaktrace); } } finally { executor.shutdown();//运行器关闭 } } }
回调接口类设计:ExectueCallBack.java
package org.hdht.business.ordermanager.quartzjob;public interface ExectueCallBack { void doExectue(Object executor) throws Exception;}
演示样例(hello 演示样例)
public static void main(String[] args) { ListparamCollection = new ArrayList (); paramCollection.add("9"); paramCollection.add("2"); paramCollection.add("18"); paramCollection.add("7"); paramCollection.add("6"); paramCollection.add("1"); paramCollection.add("3"); paramCollection.add("4"); paramCollection.add("14"); paramCollection.add("13"); int freesize = 3;//当前处理能力 for(int i=0;i tl = BXexample.getSubListPage(paramCollection, i, freesize); BXexample.BXfunction(tl,new ExectueCallBack() { public void doExectue(Object executor) throws Exception { int k = Integer.parseInt((String)executor); for(int i=0;i
演示样例(实际业务应用演示样例)
/** * 并行调度相关处理 * * 按卫星*日期 ,将待处理的任务分解为 卫星+日期 粒度的子任务 加入到paramMapList列表中 */ List