123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- package com.primeton.dgs.kernel.core.message;
- import com.google.common.collect.HashMultimap;
- import com.google.common.collect.Multimap;
- import org.apache.commons.lang.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.Collection;
- import java.util.Queue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.locks.ReentrantLock;
- /**
- *
- * 消息服务。 实现类似 JMS 的消息功能
- *
- *
- * <pre>
- *
- * Created by zhaopx.
- * User: zhaopx
- * Date: 2019/11/11
- * Time: 13:53
- *
- * </pre>
- *
- * @author zhaopx
- */
- public class MessageService implements Runnable {
- /**
- * 在 Spring 中注入的 Bean 名称
- */
- public static final String BEAN_NAME = "messageService";
- private static Logger logger = LoggerFactory.getLogger(MessageService.class);
- /**
- * Consumer 和 topic 的映射
- */
- private final Multimap<MessageConsumer, String> CONSUMER_AND_TOPIC_MAP = HashMultimap.create();
- /**
- * 内部线程
- */
- private final Thread CONSUMER_THREAD = new Thread(this);
- /**
- * 消息队列
- */
- private final Queue<Event> MESSAGE_QUEUE = new LinkedBlockingQueue<>();
- /**
- * 第一次启动,无需等待
- */
- private boolean hasMessage = true;
- /**
- * 防止遍历 Consumer 时,对 Consumer add, remove
- */
- private final ReentrantLock CONSUMER_LOCK = new ReentrantLock();
- /**
- * 当前系统状态
- */
- private boolean shutdown = false;
- /**
- * 初始化,线程
- */
- public void init() {
- CONSUMER_THREAD.setDaemon(true);
- CONSUMER_THREAD.setName("message_consumer_thread");
- CONSUMER_THREAD.start();
- }
- /**
- * 提供监听服务,阻塞当前线程
- */
- public void serve() {
- try {
- CONSUMER_THREAD.join();
- } catch (InterruptedException e) {
- // 当前线程执行中断、向上传导
- Thread.currentThread().interrupt();
- }
- }
- @Override
- public void run() {
- // 只要没有 shutdown,一直执行
- while (!shutdown) {
- if(!hasMessage) {
- synchronized (CONSUMER_THREAD) {
- try {
- // 无消息,等待 5s
- CONSUMER_THREAD.wait(5 * 1000L);
- } catch (InterruptedException e) {
- logger.error("Interrupted!", e);
- Thread.currentThread().interrupt();
- }
- }
- }
- Event message = MESSAGE_QUEUE.poll();
- // 是否还有消息,如果没有消息下一次等待时间会偏长
- hasMessage = (message != null);
- if(message == null) {
- continue;
- }
- // 获取到消息
- CONSUMER_LOCK.lock();
- try {
- for (MessageConsumer consumer : CONSUMER_AND_TOPIC_MAP.keySet()) {
- Collection<String> topics = CONSUMER_AND_TOPIC_MAP.get(consumer);
- String topic = StringUtils.upperCase(message.getTopic());
- // topic 符合 Consumer 的 topic 订阅,则广播
- if(topic != null && topics.contains(topic)){
- // 广播消息
- try {
- consumer.onMessage(message.getTopic(), message.getPayload());
- } catch (Exception e) {
- logger.error(consumer.getClass().getName() + " 处理消息异常。", e);
- }
- }
- }
- } catch (Exception e) {
- logger.error("消息处理异常。", e);
- } finally {
- CONSUMER_LOCK.unlock();
- }
- }
- }
- /**
- * 停止消息服务
- */
- public void shutdown() {
- shutdown = true;
- // 新的消息,唤醒 Consumer 线程
- synchronized (CONSUMER_THREAD) {
- CONSUMER_THREAD.notifyAll();
- }
- CONSUMER_THREAD.interrupt();
- try {
- CONSUMER_THREAD.join();
- } catch (InterruptedException e) {
- logger.error("Interrupted!", e);
- Thread.currentThread().interrupt();
- }
- }
- /**
- * 发消息
- * @param event
- */
- public void sendMessage(Event event) {
- this.MESSAGE_QUEUE.offer(event);
- // 新的消息,唤醒 Consumer 线程
- synchronized (CONSUMER_THREAD) {
- CONSUMER_THREAD.notify();
- }
- }
- /**
- * 获得一个消息发送提供者
- * @return
- */
- public MessageProvider getMessageProvider() {
- return new DefaultMessageProvider();
- }
- /**
- * 添加 Consumer
- * @param topic
- * @param consumer
- */
- public void addMessageConsumer(String topic, MessageConsumer consumer) {
- CONSUMER_LOCK.lock();
- try {
- // 改变为大写
- CONSUMER_AND_TOPIC_MAP.put(consumer, topic.toUpperCase());
- } finally {
- CONSUMER_LOCK.unlock();
- }
- }
- /**
- * 移除 消息 监听者
- * @param consumer
- * @param consumer
- */
- public void removeMessageConsumer(MessageConsumer consumer) {
- CONSUMER_LOCK.lock();
- try {
- CONSUMER_AND_TOPIC_MAP.removeAll(consumer);
- } finally {
- CONSUMER_LOCK.unlock();
- }
- }
- class DefaultMessageProvider implements MessageProvider {
- @Override
- public void send(String topic, Object message) {
- sendMessage(new Event(topic, message));
- }
- @Override
- public void send(Event<Object> event) {
- sendMessage(event);
- }
- }
- public static void main(String[] args) throws InterruptedException {
- MessageService messageService = new MessageService();
- messageService.init();
- messageService.addMessageConsumer("hello", ((topic, message) -> System.out.println(message)));
- messageService.getMessageProvider().send("hello", "AAAAAAAa");
- messageService.getMessageProvider().send("hello", "BBBBBBBB");
- Thread.sleep(3000);
- messageService.getMessageProvider().send("hello", "CCCCCCCC");
- Thread.sleep(100000);
- }
- }
|