服務端集群節點之間緩存更新通知
如果您部署的服務端集群節點服務之間可以通過确定的URL地址訪問,那麽您可以通過3.8.集群管理進行集群節點緩存更新的管理。
如果集群節點服務之間無法通過确定的URL地址訪問,比如每次節點重啓IP都不确定,或者節點個數可能随時随地發生變化,那麽可以參考如下實現,通過消息中間件通知的機制實現每個節點的緩存更新。
這種方式無需在URule Pro的集群管理中配置每個節點URL地址,需要實現 ClusterPacketCacheAdapter 和 JarCacheAdapter 兩個URule更新緩存的接口的方法生産消息:
package com.bstek.urule.console.cache.packet;
import java.util.List;
import java.util.Map;
//服務端集群節點知識包緩存
/**
 * Knowledge-package cache adapter for server-side cluster nodes.
 */
public interface ClusterPacketCacheAdapter {

    /** Bean id under which the adapter implementation is registered. */
    String BEAN_ID = "urule.clusterPacketCacheAdapter";

    /** Stores packet data keyed by the numeric packet id. */
    void putPacket(long packetId, PacketData paramPacketData);

    /** Stores packet data keyed by the packet code. */
    void putPacket(String packetCode, PacketData paramPacketData);

    /** Removes the entry identified by the numeric packet id. */
    void remove(long packetId);

    /** Removes the entry identified by the packet code. */
    void remove(String packetCode);

    /** Triggered when a knowledge-package cache is refreshed. */
    List<Map<String, Object>> refreshPacket(String groupId, long packetId);

    /** Triggered when all knowledge-package caches are reset. */
    List<Map<String, Object>> recacheAllPackets(String groupId);

    /** Triggered when a project is deleted. */
    List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list);
}
/**
 * Base class for the adapter that is notified when dynamically loaded jars change.
 */
public abstract class JarCacheAdapter {

    /**
     * Bean id of the adapter implementation.
     * Declared {@code final} so that it cannot be reassigned at runtime and so that it is a
     * compile-time constant that may be used as an annotation value.
     */
    public static final String BEAN_ID = "urule.jarCacheAdapter";

    /**
     * Triggered when a jar hot-deployment is updated.
     *
     * @param groupId group identifier passed in by the caller
     * @param urlType URL type passed in by the caller
     * @return a list of result maps (contents are defined by the implementation)
     * @throws Exception if the implementation fails
     */
    public abstract List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception;
}
接收到消息的消費類中,通過 com.bstek.urule.console.cache.ServerCacheManager 裏的方法進行當前節點的緩存更新:
| 方法 | 說明 |
|---|---|
| reloadPacket(String systemId, long packetId) | 重新加載指定id知識包 |
| syncPacketForRemoveProject(String systemId, long projectId) | 删除指定項目的知識包緩存 |
| recacheAllPackets(String systemId) | 重新加載所有知識包緩存 |
| reloadDynamicJars(String systemId) | 重新加載所有Jar緩存 |
下面做一個參考實現,前面我們使用Redis實現了Session共享,為了方便我們可以繼續使用Redis的消息通知功能,你也可以使用更專業的消息中間件來實現,比如Kafka、RabbitMQ等。
1、定義消息通知的常量類
package com.bstek.urule.sample.mq.constant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Constants required by the message-queue based cache notification.
 *
 * <p>The message-type identifiers are true constants. The broker address and topic names are
 * static fields populated by Spring through the {@code @Value} setters below, so they are
 * {@code null} until this bean has been initialised; code that reads them during its own
 * bean creation must make sure this bean is created first.
 */
@Component
public class MQConstant {

    // Message types sent on the cluster topic.
    public static final String QUEUE_CLUSTER_PACKET_REFRESHALL = "urule.cluster.knowledge.refreshall";
    public static final String QUEUE_CLUSTER_PACKET_REFRESH = "urule.cluster.knowledge.refresh";
    public static final String QUEUE_CLUSTER_PROJECT_REMOVE = "urule.cluster.project.remove";
    public static final String QUEUE_CLUSTER_JAR_SYNC = "urule.cluster.jar.sync";

    // Message types sent on the client topic.
    public static final String QUEUE_CLIENT_PACKET_DISABLE = "urule.client.knowledge.disable";
    public static final String QUEUE_CLIENT_PACKET_ENABLE = "urule.client.knowledge.enable";
    public static final String QUEUE_CLIENT_PACKET_REFRESH = "urule.client.knowledge.refresh";
    public static final String QUEUE_CLIENT_JAR_SYNC = "urule.client.jar.sync";

    /** Value of {@code project.urule.mq.ip}; defaults to {@code 127.0.0.1:9092}. */
    public static String MQIP;

    /** Value of {@code project.urule.mq.clusterTopic}; defaults to {@code urule-cluster-topic}. */
    public static String CLUSTER_TOPIC;

    /** Value of {@code project.urule.mq.clientTopic}; defaults to {@code urule-client-topic}. */
    public static String CLIENT_TOPIC;

    /** Instance setter so that Spring can inject the property into the static field. */
    @Value("${project.urule.mq.ip:127.0.0.1:9092}")
    public void setMQIP(String mqIP) {
        MQIP = mqIP;
    }

    /** Instance setter so that Spring can inject the property into the static field. */
    @Value("${project.urule.mq.clusterTopic:urule-cluster-topic}")
    public void setClusterTopic(String clusterTopic) {
        CLUSTER_TOPIC = clusterTopic;
    }

    /** Instance setter so that Spring can inject the property into the static field. */
    @Value("${project.urule.mq.clientTopic:urule-client-topic}")
    public void setClientTopic(String clientTopic) {
        CLIENT_TOPIC = clientTopic;
    }
}
2、yaml配置
# Topic names read by MQConstant through project.urule.mq.clusterTopic / clientTopic.
project:
  urule:
    mq:
      clusterTopic: urule-cluster-topic
      clientTopic: urule-client-topic
3、配置RedisSessionConfig類
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.session.data.redis.config.ConfigureRedisAction;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
/**
 * Enables Redis-backed HTTP sessions and exposes a {@link RedisTemplate} whose keys, values,
 * hash keys and hash values are all serialized as strings.
 */
@Configuration
// Session expiry time; the default is 1800 seconds.
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 1800)
public class RedisSessionConfig {

    @Autowired
    private RedisConnectionFactory factory;

    /** Registers {@link ConfigureRedisAction#NO_OP} as the Redis configuration action. */
    @Bean
    public static ConfigureRedisAction configureRedisAction() {
        return ConfigureRedisAction.NO_OP;
    }

    /** Builds the template on top of the injected connection factory. */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        final StringRedisSerializer stringSerializer = new StringRedisSerializer();
        final RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(stringSerializer);
        return template;
    }
}
4、配置RedisConsumerConfig消息消費類
package com.bstek.urule.sample.mq.redis.config;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.listener.RedisConsumerListener;
/**
 * Redis consumer channel configuration: registers the listener adapter and subscribes it to the
 * cluster topic.
 */
@Component
public class RedisConsumerConfig {

    /**
     * Declares an explicit dependency on the {@link MQConstant} bean.
     *
     * <p>{@code MQConstant.CLUSTER_TOPIC} is a static field that is only assigned when Spring
     * initialises the {@code MQConstant} bean. Requesting that bean here makes sure it exists
     * before any {@code @Bean} method of this class reads the static field, instead of relying
     * on the order in which the container happens to create beans.
     *
     * @param mqConstant injected only to enforce initialisation order; not used otherwise
     */
    public RedisConsumerConfig(MQConstant mqConstant) {
        // Intentionally empty: the parameter exists solely to order bean creation.
    }

    /** Wraps a new {@link RedisConsumerListener} in a listener adapter. */
    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(new RedisConsumerListener());
    }

    /**
     * Creates the listener container and subscribes the adapter to the cluster topic.
     *
     * @throws IllegalStateException if the cluster topic has not been initialised
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        String clusterTopic = MQConstant.CLUSTER_TOPIC;
        if (clusterTopic == null) {
            // Fail with a message that names the real cause rather than a generic null-topic error.
            throw new IllegalStateException("MQConstant.CLUSTER_TOPIC has not been initialised");
        }
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic(clusterTopic));
        return container;
    }
}
5、為了切換消息中間件方便,定義了一個生産消息的接口
package com.bstek.urule.sample.mq;
/**
 * Abstraction over the message producer, so that the underlying middleware can be swapped
 * without touching the callers.
 */
public interface CustomProducerService {

    /** Identifier constant for this service (presumably its Spring bean id; not referenced in the visible sample). */
    String BEAN_ID = "urule.ext.CustomProducerService";

    /**
     * Sends a message to a topic.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    void sendMessage(String topic, String message);
}
6、定義生産消息的實現類
package com.bstek.urule.sample.mq.redis.service;
import javax.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.bstek.urule.sample.mq.CustomProducerService;
/**
* Reids消息發送類
*/
@Service
public class RedisProducerServiceImpl implements CustomProducerService{
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void sendMessage(String topic, String message){
redisTemplate.convertAndSend(topic,message);
}
}
7、定義服務端某一個集群節點知識包緩存發生變化時,發送消息的通知類
package com.bstek.urule.sample.mq.adapter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties.Tomcat.Threads;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.packet.ClientPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.ClusterPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.PacketCache;
import com.bstek.urule.console.cache.packet.PacketConfig;
import com.bstek.urule.console.cache.packet.PacketData;
import com.bstek.urule.console.database.manager.packet.PacketManager;
import com.bstek.urule.console.database.model.Packet;
import com.bstek.urule.exception.RuleException;
import com.bstek.urule.runtime.KnowledgePackage;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * Sends a notification message on the cluster topic whenever a knowledge-package cache event
 * is raised on this node.
 *
 * <p>Every message is a JSON object carrying {@code groupId}, {@code systemId}, optional
 * event-specific fields and finally {@code messageType}.
 */
@Slf4j
@Component(ClusterPacketCacheAdapter.BEAN_ID)
public class MsgClusterPacketCacheAdapter implements ClusterPacketCacheAdapter {

    @Autowired
    private CustomProducerService customProducerService;

    /**
     * Broadcasts a "refresh all packets" message.
     *
     * @param groupId group identifier copied into the message
     * @return always an empty list
     */
    @Override
    public List<Map<String, Object>> recacheAllPackets(String groupId) {
        log.info("recacheAllPackets(String groupId):{}", groupId);
        ObjectNode msg = newEnvelope(groupId);
        publishToCluster(msg, MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL);
        return new ArrayList<Map<String, Object>>();
    }

    /**
     * Broadcasts a "refresh packet" message for one packet.
     *
     * @param groupId  group identifier copied into the message
     * @param packetId id of the packet to refresh
     * @return always an empty list
     */
    @Override
    public List<Map<String, Object>> refreshPacket(String groupId, long packetId) {
        Packet packet = PacketManager.ins.load(packetId);
        // NOTE(review): whether load() can return null is not visible here; guard so that a
        // missing packet does not prevent the refresh notification from being sent.
        String packetCode = null;
        if (packet != null) {
            packetCode = packet.getCode();
        } else {
            log.warn("refreshPacket: no packet found for id {}, sending message without packetCode", packetId);
        }
        log.info("refreshPacket(String groupId, long packetId):{}:{}:{}", groupId, packetId, packetCode);

        ObjectNode clustermsg = newEnvelope(groupId);
        clustermsg.put("packetId", String.valueOf(packetId));
        clustermsg.put("packetCode", packetCode);
        publishToCluster(clustermsg, MQConstant.QUEUE_CLUSTER_PACKET_REFRESH);

        /* If client caches are also updated through the message middleware, add the following code:
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clientmsg.put("groupId", groupId);
        clientmsg.put("systemId", Utils.SystemId);
        clientmsg.put("packetId", String.valueOf(packetId));
        clientmsg.put("packetCode", packetCode);
        clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
        */
        return new ArrayList<Map<String, Object>>();
    }

    /**
     * Broadcasts a "project removed" message.
     *
     * @param groupId   group identifier copied into the message
     * @param projectId id of the removed project
     * @param list      packet configurations of the project (only used by the optional client code)
     * @return always an empty list
     */
    @Override
    public List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list) {
        log.info("removeProject(String paramString, long paramLong, List<PacketConfig> paramList):{}", projectId);
        /* If client caches are also updated through the message middleware, add the following code:
        for(PacketConfig pc:list) {
            disableClientsPacket(groupId,pc.getId(),pc.getCode());
        } */
        ObjectNode msg = newEnvelope(groupId);
        msg.put("projectId", String.valueOf(projectId));
        publishToCluster(msg, MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE);
        return new ArrayList<Map<String, Object>>();
    }

    /** No-op in this sample; only the broadcast callbacks are implemented. */
    @Override
    public void putPacket(long packetId, PacketData paramPacketData) {
    }

    /** No-op in this sample; only the broadcast callbacks are implemented. */
    @Override
    public void putPacket(String packetCode, PacketData paramPacketData) {
    }

    /** No-op in this sample; only the broadcast callbacks are implemented. */
    @Override
    public void remove(long packetId) {
    }

    /** No-op in this sample; only the broadcast callbacks are implemented. */
    @Override
    public void remove(String packetCode) {
    }

    /** Creates a message object pre-filled with the fields common to every notification. */
    private ObjectNode newEnvelope(String groupId) {
        ObjectNode node = JsonUtils.getObjectJsonMapper().createObjectNode();
        node.put("groupId", groupId);
        node.put("systemId", Utils.SystemId);
        return node;
    }

    /** Stamps the message type last (keeping the original field order) and sends it on the cluster topic. */
    private void publishToCluster(ObjectNode payload, String messageType) {
        payload.put("messageType", messageType);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, payload.toString());
    }

    /* If client caches are also updated through the message middleware, add the following code:
    public List<Map<String, Object>> disableClientsPacket(String groupId, long packetId,String packetCode) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        log.info("disableClientsPacket(String groupId:{}, long packetId):{},code:{}" ,groupId,packetId,packetCode);
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("packetId", String.valueOf(packetId));
        msg.put("packetCode", packetCode);
        msg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_DISABLE);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, msg.toString());
        return result;
    }
    */
}
8、定義服務端某一個集群節點Jar包緩存發生變化時,發送消息的通知類
package com.bstek.urule.sample.mq.adapter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.database.model.UrlType;
import com.bstek.urule.console.editor.jar.JarCacheAdapter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * Sends a jar-sync notification on the cluster topic when dynamic jars are loaded on this node.
 */
@Slf4j
@Component("urule.jarCacheAdapter")
public class MsgJarCacheAdapter extends JarCacheAdapter {

    @Autowired
    private CustomProducerService customProducerService;

    /**
     * Publishes a JSON message with {@code groupId}, {@code systemId} and the jar-sync message type.
     *
     * @param groupId group identifier copied into the message
     * @param urlType not used by this implementation
     * @return always an empty list
     */
    @Override
    public List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception {
        log.info("loadDynamicJars(String groupId, UrlType urlType)"+groupId);

        final ObjectNode payload = JsonUtils.getObjectJsonMapper().createObjectNode();
        payload.put("groupId", groupId);
        payload.put("systemId", Utils.SystemId);
        payload.put("messageType", MQConstant.QUEUE_CLUSTER_JAR_SYNC);

        final String json = payload.toString();
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, json);

        /* If client caches are also updated through the message middleware, add the following code:
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clientmsg.put("groupId", groupId);
        clientmsg.put("systemId", Utils.SystemId);
        clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_JAR_SYNC);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
        */
        return new ArrayList<Map<String, Object>>();
    }
}
9、定義接收到消息的監聽類
package com.bstek.urule.sample.mq.redis.listener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.ServerCacheManager;
import com.bstek.urule.runtime.cache.ClientCacheManager;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.bstek.urule.sample.mq.constant.MQConstant;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Resource;
/**
* Redis消息監聽處理(lǐ)類
*/
@Slf4j
public class RedisConsumerListener implements MessageListener {
private static Map<String, Consumer<String>> RULE = new HashMap<>();
{
RULE.put(MQConstant.CLUSTER_TOPIC, this::consumerClusterMessage);
// RULE.put(MQConstant.CLIENT_TOPIC, this::consumerClusterMessage);
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] b_channel = message.getChannel();
byte[] b_body = message.getBody();
try {
String channel = new String(b_channel);
String body = new String(b_body);
log.info("channel is:" + channel + " , body is: " + body);
RULE.get(channel).accept(body);
} catch (Exception e) {
}
}
public void consumerClusterMessage(String message) {
ServerCacheManager serverCacheManager = (ServerCacheManager) Utils.getApplicationContext().getBean("urule.serverCacheManager");
log.info("consumerClusterMessage exec params is :" + message);
try {
HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
String messageType = mapMessage.get("messageType");
String systemId = mapMessage.get("systemId");
String groupId = mapMessage.get("groupId");
String packetId = mapMessage.get("packetId");
String projectId = mapMessage.get("projectId");
if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESH.equals(messageType)) {
serverCacheManager.reloadPacket(systemId, Long.valueOf(packetId));
}else if(MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE.equals(messageType)) {
serverCacheManager.syncPacketForRemoveProject(systemId, Long.valueOf(projectId));
}else if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL.equals(messageType)) {
serverCacheManager.recacheAllPackets(systemId);
}else if(MQConstant.QUEUE_CLUSTER_JAR_SYNC.equals(messageType)) {
serverCacheManager.reloadDynamicJars(systemId);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*如果客戶端的消息消費監聽類,可(kě)以添加如下代碼
public void consumerClinetMessage(String message) {
ClientCacheManager clientCacheManager = (ClientCacheManager) Utils.getApplicationContext().getBean("urule.clientCacheManager");
log.info("consumerClinetMessage exec params is :" + message);
try {
HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
String messageType = mapMessage.get("messageType");
String systemId = mapMessage.get("systemId");
String groupId = mapMessage.get("groupId");
String packetId = mapMessage.get("packetId");
String projectId = mapMessage.get("projectId");
if(MQConstant.QUEUE_CLIENT_PACKET_DISABLE.equals(messageType)) {
clientCacheManager.disableKnowledge(packetId);
}else if(MQConstant.QUEUE_CLIENT_PACKET_ENABLE.equals(messageType)) {
clientCacheManager.enableKnowledge(packetId);
}else if(MQConstant.QUEUE_CLIENT_PACKET_REFRESH.equals(messageType)) {
clientCacheManager.reloadKnowledge(packetId);
}else if(MQConstant.QUEUE_CLIENT_JAR_SYNC.equals(messageType)) {
clientCacheManager.reloadDynamicJars();
log.info("=====jar重新(xīn)加載完成=====");
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
*/
至此,通過Redis實現的集群節點之間的緩存通知功能就完成了。