package com.primeton.dgs.kernel.core.message;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Message service that provides JMS-like messaging functionality.
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2019/11/11
 * Time: 13:53
 *
 * @author zhaopx
 */
public class MessageService implements Runnable {
/**
 * Name under which this bean is registered in Spring.
 */
public static final String BEAN_NAME = "messageService";

private static final Logger logger = LoggerFactory.getLogger(MessageService.class);

/**
 * Mapping from each consumer to the (upper-cased) topics it subscribes to.
 * Access is guarded by {@link #CONSUMER_LOCK}.
 */
private final Multimap<MessageConsumer, String> CONSUMER_AND_TOPIC_MAP = HashMultimap.create();

/**
 * Internal thread that executes {@link #run()}. Its monitor is also used
 * for wait/notify signalling between producers and the consumer loop.
 */
private final Thread CONSUMER_THREAD = new Thread(this);

/**
 * Queue of pending messages.
 */
private final Queue<Event> MESSAGE_QUEUE = new LinkedBlockingQueue<>();

/**
 * Whether the previous poll returned a message. Starts as {@code true} so
 * the first loop iteration polls without waiting. In the visible code it is
 * read and written only by {@link #run()}.
 */
private boolean hasMessage = true;

/**
 * Prevents consumers from being added or removed while they are iterated.
 */
private final ReentrantLock CONSUMER_LOCK = new ReentrantLock();

/**
 * Shutdown flag. Written by the thread calling {@link #shutdown()} and read
 * by the consumer thread, hence {@code volatile} for cross-thread visibility.
 */
private volatile boolean shutdown = false;
/**
 * Configures and starts the internal consumer thread.
 * <p>
 * The thread is named {@code message_consumer_thread} and marked as a
 * daemon thread before it is started.
 */
public void init() {
    final Thread worker = CONSUMER_THREAD;
    worker.setName("message_consumer_thread");
    worker.setDaemon(true);
    worker.start();
}
/**
 * Blocks the calling thread until the internal consumer thread terminates.
 * <p>
 * If the calling thread is interrupted while waiting, its interrupt flag is
 * restored and this method returns.
 */
public void serve() {
    final Thread worker = CONSUMER_THREAD;
    try {
        worker.join();
    } catch (InterruptedException interrupted) {
        // Propagate the interruption to the caller by re-setting the flag.
        Thread.currentThread().interrupt();
    }
}
/**
 * Consumer loop: polls the message queue and broadcasts each message to
 * every consumer subscribed to the message's topic (compared in upper case).
 * <p>
 * The loop runs until the shutdown flag is set or the thread is interrupted.
 * When the previous poll found nothing, the thread waits on the
 * {@code CONSUMER_THREAD} monitor for up to 5 seconds or until notified.
 * Consumers are invoked while {@code CONSUMER_LOCK} is held; an exception
 * thrown by one consumer is logged and does not stop delivery to the others.
 */
@Override
public void run() {
    // Keep going until shutdown is requested.
    while (!shutdown) {
        if (!hasMessage) {
            synchronized (CONSUMER_THREAD) {
                try {
                    // Re-check under the monitor: a producer enqueues first and
                    // then notifies while holding this monitor, so a message (or
                    // shutdown request) that arrived after the last poll is seen
                    // here instead of being missed for the full timeout.
                    if (!shutdown && MESSAGE_QUEUE.isEmpty()) {
                        // No message: wait up to 5s.
                        CONSUMER_THREAD.wait(5 * 1000L);
                    }
                } catch (InterruptedException e) {
                    logger.error("Interrupted!", e);
                    Thread.currentThread().interrupt();
                    // Stop the loop: continuing with the interrupt flag set would
                    // make every subsequent wait() throw immediately (busy spin).
                    return;
                }
            }
        }

        Event message = MESSAGE_QUEUE.poll();
        // Remember whether a message was found; if not, the next iteration waits.
        hasMessage = (message != null);
        if (message == null) {
            continue;
        }

        // The normalized topic does not depend on the consumer; compute it once.
        String topic = StringUtils.upperCase(message.getTopic());
        if (topic == null) {
            // A message without a topic matches no subscription.
            continue;
        }

        CONSUMER_LOCK.lock();
        try {
            for (MessageConsumer consumer : CONSUMER_AND_TOPIC_MAP.keySet()) {
                // Broadcast only to consumers subscribed to this topic.
                if (CONSUMER_AND_TOPIC_MAP.get(consumer).contains(topic)) {
                    try {
                        consumer.onMessage(message.getTopic(), message.getPayload());
                    } catch (Exception e) {
                        logger.error(consumer.getClass().getName() + " 处理消息异常。", e);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("消息处理异常。", e);
        } finally {
            CONSUMER_LOCK.unlock();
        }
    }
}
/**
 * Stops the message service.
 * <p>
 * Sets the shutdown flag, wakes any thread waiting on the consumer thread's
 * monitor, interrupts the consumer thread and then waits for it to finish.
 * If the calling thread is interrupted while waiting, the interruption is
 * logged and the caller's interrupt flag is restored.
 */
public void shutdown() {
    shutdown = true;

    final Thread worker = CONSUMER_THREAD;
    // Wake the consumer thread so it can observe the shutdown flag.
    synchronized (worker) {
        worker.notifyAll();
    }
    worker.interrupt();

    try {
        worker.join();
    } catch (InterruptedException interrupted) {
        logger.error("Interrupted!", interrupted);
        Thread.currentThread().interrupt();
    }
}
/**
 * Enqueues a message and wakes the consumer thread.
 *
 * @param event the message to enqueue
 */
public void sendMessage(Event event) {
    this.MESSAGE_QUEUE.offer(event);
    // New message: wake the consumer thread.
    synchronized (CONSUMER_THREAD) {
        // notifyAll() rather than notify(): serve() and shutdown() block in
        // Thread.join(), which is documented as waiting on the Thread
        // instance itself. A single notify() could therefore wake a joining
        // thread instead of the consumer loop and leave the message
        // unprocessed until the consumer's wait times out.
        CONSUMER_THREAD.notifyAll();
    }
}
/**
 * Creates a message provider bound to this service.
 *
 * @return a new {@code DefaultMessageProvider} instance on every call
 */
public MessageProvider getMessageProvider() {
    final MessageProvider provider = new DefaultMessageProvider();
    return provider;
}
/**
 * Subscribes a consumer to a topic. The topic is stored in upper case.
 *
 * @param topic    the topic to subscribe to; must not be {@code null}
 * @param consumer the consumer to register; must not be {@code null}
 * @throws NullPointerException if {@code topic} or {@code consumer} is {@code null}
 */
public void addMessageConsumer(String topic, MessageConsumer consumer) {
    if (topic == null) {
        throw new NullPointerException("topic must not be null");
    }
    if (consumer == null) {
        // A null consumer would be stored silently and later fail inside the
        // dispatch loop, aborting delivery to the remaining consumers.
        throw new NullPointerException("consumer must not be null");
    }

    // Normalize to upper case before taking the lock.
    final String normalizedTopic = topic.toUpperCase();

    CONSUMER_LOCK.lock();
    try {
        CONSUMER_AND_TOPIC_MAP.put(consumer, normalizedTopic);
    } finally {
        CONSUMER_LOCK.unlock();
    }
}
/**
 * Removes a message listener together with all of its topic subscriptions.
 *
 * @param consumer the consumer to unregister
 */
public void removeMessageConsumer(MessageConsumer consumer) {
    final ReentrantLock guard = CONSUMER_LOCK;
    guard.lock();
    try {
        // Drop every topic mapped to this consumer.
        CONSUMER_AND_TOPIC_MAP.removeAll(consumer);
    } finally {
        guard.unlock();
    }
}
class DefaultMessageProvider implements MessageProvider {
@Override
public void send(String topic, Object message) {
sendMessage(new Event(topic, message));
}
@Override
public void send(Event