package com.primeton.dgs.kernel.core.message; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantLock; /** * * 消息服务。 实现类似 JMS 的消息功能 * * *
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2019/11/11
 * Time: 13:53
 *
 * 
* * @author zhaopx */ public class MessageService implements Runnable { /** * 在 Spring 中注入的 Bean 名称 */ public static final String BEAN_NAME = "messageService"; private static Logger logger = LoggerFactory.getLogger(MessageService.class); /** * Consumer 和 topic 的映射 */ private final Multimap CONSUMER_AND_TOPIC_MAP = HashMultimap.create(); /** * 内部线程 */ private final Thread CONSUMER_THREAD = new Thread(this); /** * 消息队列 */ private final Queue MESSAGE_QUEUE = new LinkedBlockingQueue<>(); /** * 第一次启动,无需等待 */ private boolean hasMessage = true; /** * 防止遍历 Consumer 时,对 Consumer add, remove */ private final ReentrantLock CONSUMER_LOCK = new ReentrantLock(); /** * 当前系统状态 */ private boolean shutdown = false; /** * 初始化,线程 */ public void init() { CONSUMER_THREAD.setDaemon(true); CONSUMER_THREAD.setName("message_consumer_thread"); CONSUMER_THREAD.start(); } /** * 提供监听服务,阻塞当前线程 */ public void serve() { try { CONSUMER_THREAD.join(); } catch (InterruptedException e) { // 当前线程执行中断、向上传导 Thread.currentThread().interrupt(); } } @Override public void run() { // 只要没有 shutdown,一直执行 while (!shutdown) { if(!hasMessage) { synchronized (CONSUMER_THREAD) { try { // 无消息,等待 5s CONSUMER_THREAD.wait(5 * 1000L); } catch (InterruptedException e) { logger.error("Interrupted!", e); Thread.currentThread().interrupt(); } } } Event message = MESSAGE_QUEUE.poll(); // 是否还有消息,如果没有消息下一次等待时间会偏长 hasMessage = (message != null); if(message == null) { continue; } // 获取到消息 CONSUMER_LOCK.lock(); try { for (MessageConsumer consumer : CONSUMER_AND_TOPIC_MAP.keySet()) { Collection topics = CONSUMER_AND_TOPIC_MAP.get(consumer); String topic = StringUtils.upperCase(message.getTopic()); // topic 符合 Consumer 的 topic 订阅,则广播 if(topic != null && topics.contains(topic)){ // 广播消息 try { consumer.onMessage(message.getTopic(), message.getPayload()); } catch (Exception e) { logger.error(consumer.getClass().getName() + " 处理消息异常。", e); } } } } catch (Exception e) { logger.error("消息处理异常。", e); } finally { CONSUMER_LOCK.unlock(); } } } /** * 停止消息服务 */ public void shutdown() { shutdown = true; // 新的消息,唤醒 Consumer 线程 synchronized (CONSUMER_THREAD) { CONSUMER_THREAD.notifyAll(); } CONSUMER_THREAD.interrupt(); try { CONSUMER_THREAD.join(); } catch (InterruptedException e) { logger.error("Interrupted!", e); Thread.currentThread().interrupt(); } } /** * 发消息 * @param event */ public void sendMessage(Event event) { this.MESSAGE_QUEUE.offer(event); // 新的消息,唤醒 Consumer 线程 synchronized (CONSUMER_THREAD) { CONSUMER_THREAD.notify(); } } /** * 获得一个消息发送提供者 * @return */ public MessageProvider getMessageProvider() { return new DefaultMessageProvider(); } /** * 添加 Consumer * @param topic * @param consumer */ public void addMessageConsumer(String topic, MessageConsumer consumer) { CONSUMER_LOCK.lock(); try { // 改变为大写 CONSUMER_AND_TOPIC_MAP.put(consumer, topic.toUpperCase()); } finally { CONSUMER_LOCK.unlock(); } } /** * 移除 消息 监听者 * @param consumer * @param consumer */ public void removeMessageConsumer(MessageConsumer consumer) { CONSUMER_LOCK.lock(); try { CONSUMER_AND_TOPIC_MAP.removeAll(consumer); } finally { CONSUMER_LOCK.unlock(); } } class DefaultMessageProvider implements MessageProvider { @Override public void send(String topic, Object message) { sendMessage(new Event(topic, message)); } @Override public void send(Event event) { sendMessage(event); } } public static void main(String[] args) throws InterruptedException { MessageService messageService = new MessageService(); messageService.init(); messageService.addMessageConsumer("hello", ((topic, message) -> System.out.println(message))); messageService.getMessageProvider().send("hello", "AAAAAAAa"); messageService.getMessageProvider().send("hello", "BBBBBBBB"); Thread.sleep(3000); messageService.getMessageProvider().send("hello", "CCCCCCCC"); Thread.sleep(100000); } }