批處理(lǐ)支持
實際業務(wù)當中(zhōng),我們除了會做單條規則計算外,還有(yǒu)可(kě)能(néng)需要運行規則引擎來處理(lǐ)一大批數據,這些數據可(kě)能(néng)有(yǒu)幾萬條,幾十萬條,甚至更多(duō)。在這種情況下,如果我們還是采用(yòng)普通的KnowledgeSession在一個線(xiàn)程裏處理(lǐ)大批量數據的話,那麽引擎還是隻能(néng)在當前線(xiàn)程裏運行,這樣就會需要很(hěn)長(cháng)的時間才能(néng)可(kě)能(néng)将這幾十萬條甚至更多(duō)的數據處理(lǐ)完成,在這個時候,為(wèi)了充分(fēn)利用(yòng)服務(wù)器較強的CPU性能(néng),我們可(kě)以使用(yòng)BatchSession利用(yòng)多(duō)線(xiàn)程并行處理(lǐ)這些數據。顧名(míng)思義BatchSession是用(yòng)來做批處理(lǐ)任務(wù)的會話對象,它是 URule Pro當中(zhōng)提供的多(duō)線(xiàn)程并行處理(lǐ)大批量業務(wù)數據的規則會話對象。
要得到一個BatchSession對象,我們也需要提供一個或多(duō)個KnowledgePackage對象,獲取KnowledgePackage對象的方法與上面介紹的方法相同,有(yǒu)了KnowledgePackage對象後,就可(kě)以利用(yòng)KnowledgeSessionFactory類創建一個BatchSession對象。
在com.bstek.urule.runtime.KnowledgeSessionFactory類中(zhōng)除上面提到的兩個構建KnowledgeSession的靜态方法外,還提供了另外八個可(kě)用(yòng)于構建BatchSession的靜态方法,其源碼如下:
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,這裏默認将開啓10個普通的線(xiàn)程池來運行提交的批處理(lǐ)任務(wù),默認将每100個任務(wù)放在一個線(xiàn)程裏處理(lǐ)
* @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSession(KnowledgePackage knowledgePackage){
return new BatchSessionImpl(knowledgePackage,BatchSession.DEFAULT_THREAD_SIZE,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,第二個參數來指定線(xiàn)程池中(zhōng)可(kě)用(yòng)線(xiàn)程個數,默認将每100個任務(wù)放在一個線(xiàn)程裏處理(lǐ)
* @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
* @param threadSize 線(xiàn)程池中(zhōng)可(kě)用(yòng)的線(xiàn)程個數
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSessionByThreadSize(KnowledgePackage knowledgePackage,int threadSize){
return new BatchSessionImpl(knowledgePackage,threadSize,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,這裏默認将開啓10個普通的線(xiàn)程池來運行提交的批處理(lǐ)任務(wù),第二個參數用(yòng)來決定單個線(xiàn)程處理(lǐ)的任務(wù)數
* @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
* @param batchSize 單個線(xiàn)程處理(lǐ)的任務(wù)數
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSessionByBatchSize(KnowledgePackage knowledgePackage,int batchSize){
return new BatchSessionImpl(knowledgePackage,BatchSession.DEFAULT_THREAD_SIZE,batchSize);
}
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,第二個參數來指定線(xiàn)程池中(zhōng)可(kě)用(yòng)線(xiàn)程個數,第三個參數用(yòng)來決定單個線(xiàn)程處理(lǐ)的任務(wù)數
* @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage對象
* @param threadSize 線(xiàn)程池中(zhōng)可(kě)用(yòng)的線(xiàn)程個數
* @param batchSize 單個線(xiàn)程處理(lǐ)的任務(wù)數
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSession(KnowledgePackage knowledgePackage,int threadSize,int batchSize){
return new BatchSessionImpl(knowledgePackage,threadSize,batchSize);
}
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,這裏默認将開啓10個普通的線(xiàn)程池來運行提交的批處理(lǐ)任務(wù),默認将每100個任務(wù)放在一個線(xiàn)程裏處理(lǐ)
* @param knowledgePackage 創建BatchSession對象所需要的KnowledgePackage集合對象
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSession(KnowledgePackage[] knowledgePackages){
return new BatchSessionImpl(knowledgePackages,BatchSession.DEFAULT_THREAD_SIZE,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,第二個參數來指定線(xiàn)程池中(zhōng)可(kě)用(yòng)線(xiàn)程個數,默認将每100個任務(wù)放在一個線(xiàn)程裏處理(lǐ)
* @param knowledgePackages 創建BatchSession對象所需要的KnowledgePackage集合對象
* @param threadSize 線(xiàn)程池中(zhōng)可(kě)用(yòng)的線(xiàn)程個數
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSessionByThreadSize(KnowledgePackage[] knowledgePackages,int threadSize){
return new BatchSessionImpl(knowledgePackages,threadSize,BatchSession.DEFAULT_BATCH_SIZE);
}
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,這裏默認将開啓10個普通的線(xiàn)程池來運行提交的批處理(lǐ)任務(wù),第二個參數用(yòng)來決定單個線(xiàn)程處理(lǐ)的任務(wù)數
* @param knowledgePackages 創建BatchSession對象所需要的KnowledgePackage集合對象
* @param batchSize 單個線(xiàn)程處理(lǐ)的任務(wù)數
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSessionByBatchSize(KnowledgePackage[] knowledgePackages,int batchSize){
return new BatchSessionImpl(knowledgePackages,BatchSession.DEFAULT_THREAD_SIZE,batchSize);
}
/**
* 創建一個用(yòng)于批處理(lǐ)的BatchSession對象,第二個參數來指定線(xiàn)程池中(zhōng)可(kě)用(yòng)線(xiàn)程個數,第三個參數用(yòng)來決定單個線(xiàn)程處理(lǐ)的任務(wù)數
* @param knowledgePackages 創建BatchSession對象所需要的KnowledgePackage集合對象
* @param threadSize 線(xiàn)程池中(zhōng)可(kě)用(yòng)的線(xiàn)程個數
* @param batchSize 單個線(xiàn)程處理(lǐ)的任務(wù)數
* @return 返回一個新(xīn)的BatchSession對象
*/
public static BatchSession newBatchSession(KnowledgePackage[] knowledgePackages,int threadSize,int batchSize){
return new BatchSessionImpl(knowledgePackages,threadSize,batchSize);
}
前面介紹規則流中(zhōng)的決策節點時,了解到決策節點中(zhōng)支持百分(fēn)比分(fēn)流,這種百分(fēn)比分(fēn)流就要求必須是在使用(yòng)BatchSession處理(lǐ)一批數據的時候,或者是一個用(yòng)一個普通的KnowledgeSession一次性處理(lǐ)多(duō)條數據才有(yǒu)效,否則規則流隻會走比例最高的那個分(fēn)支。
BatchSession接口比較簡單,它隻定義了兩個方法:
/**
* 添加一個具(jù)體(tǐ)要執行Business對象
* @param business Business對象實例
*/
void addBusiness(Business business);
/**
* 等待線(xiàn)程池中(zhōng)所有(yǒu)業務(wù)線(xiàn)程執行完成,在進行批處理(lǐ)操作(zuò)時一定要以此方法作(zuò)為(wèi)方法調用(yòng)結尾
*/
void waitForCompletion();
可(kě)以看到,它可(kě)以接收若幹個名(míng)為(wèi)com.bstek.urule.runtime.Business接口實例,Business接口比較簡單,它隻有(yǒu)一個方法:
package com.bstek.urule.runtime;
/**
* @author Jacky.gao
* @since 2015年9月29日
*/
public interface Business {
void execute(KnowledgeSession session);
}
在Business實現類中(zhōng),我們的業務(wù)寫在execute方法當中(zhōng),在這個方法中(zhōng),隻有(yǒu)一個KnowledgeSession對象,這個session對象就是我們與規則引擎操作(zuò)的對象,示例代碼如下:
//從Spring中(zhōng)獲取KnowledgeService接口實例
KnowledgeService service=(KnowledgeService)Utils.getApplicationContext().getBean(KnowledgeService.BEAN_ID);
//通過KnowledgeService接口獲取指定的知識包ID"213"
KnowledgePackage knowledgePackage=service.getKnowledge("213");
//通過取的KnowledgePackage對象創建BatchSession對象,在這個對象中(zhōng),我們将開啓5個線(xiàn)程,每個線(xiàn)程最多(duō)放置10個Bussiness接口實例運行
BatchSession batchSession=KnowledgeSessionFactory.newBatchSession(knowledgePackage, 5, 10);
for(int i=0;i<100;i++){
batchSession.addBusiness(new Business(){
@Override
public void execute(KnowledgeSession session) {
Employee employee=new Employee();
employee.setSalary(11080);
//将業務(wù)數據對象Employee插入到KnowledgeSession中(zhōng)
session.insert(employee);
session.startProcess("demo");
}
});
}
//等待所有(yǒu)的線(xiàn)程執行完成,對于BatchSession調用(yòng)來說,此行代碼必不可(kě)少,否則将導緻錯誤
batchSession.waitForCompletion();