MessageService.java 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package com.primeton.dgs.kernel.core.message;
  2. import com.google.common.collect.HashMultimap;
  3. import com.google.common.collect.Multimap;
  4. import org.apache.commons.lang.StringUtils;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.util.Collection;
  8. import java.util.Queue;
  9. import java.util.concurrent.LinkedBlockingQueue;
  10. import java.util.concurrent.locks.ReentrantLock;
  11. /**
  12. *
  13. * 消息服务。 实现类似 JMS 的消息功能
  14. *
  15. *
  16. * <pre>
  17. *
  18. * Created by zhaopx.
  19. * User: zhaopx
  20. * Date: 2019/11/11
  21. * Time: 13:53
  22. *
  23. * </pre>
  24. *
  25. * @author zhaopx
  26. */
  27. public class MessageService implements Runnable {
  28. /**
  29. * 在 Spring 中注入的 Bean 名称
  30. */
  31. public static final String BEAN_NAME = "messageService";
  32. private static Logger logger = LoggerFactory.getLogger(MessageService.class);
  33. /**
  34. * Consumer 和 topic 的映射
  35. */
  36. private final Multimap<MessageConsumer, String> CONSUMER_AND_TOPIC_MAP = HashMultimap.create();
  37. /**
  38. * 内部线程
  39. */
  40. private final Thread CONSUMER_THREAD = new Thread(this);
  41. /**
  42. * 消息队列
  43. */
  44. private final Queue<Event> MESSAGE_QUEUE = new LinkedBlockingQueue<>();
  45. /**
  46. * 第一次启动,无需等待
  47. */
  48. private boolean hasMessage = true;
  49. /**
  50. * 防止遍历 Consumer 时,对 Consumer add, remove
  51. */
  52. private final ReentrantLock CONSUMER_LOCK = new ReentrantLock();
  53. /**
  54. * 当前系统状态
  55. */
  56. private boolean shutdown = false;
  57. /**
  58. * 初始化,线程
  59. */
  60. public void init() {
  61. CONSUMER_THREAD.setDaemon(true);
  62. CONSUMER_THREAD.setName("message_consumer_thread");
  63. CONSUMER_THREAD.start();
  64. }
  65. /**
  66. * 提供监听服务,阻塞当前线程
  67. */
  68. public void serve() {
  69. try {
  70. CONSUMER_THREAD.join();
  71. } catch (InterruptedException e) {
  72. // 当前线程执行中断、向上传导
  73. Thread.currentThread().interrupt();
  74. }
  75. }
  76. @Override
  77. public void run() {
  78. // 只要没有 shutdown,一直执行
  79. while (!shutdown) {
  80. if(!hasMessage) {
  81. synchronized (CONSUMER_THREAD) {
  82. try {
  83. // 无消息,等待 5s
  84. CONSUMER_THREAD.wait(5 * 1000L);
  85. } catch (InterruptedException e) {
  86. logger.error("Interrupted!", e);
  87. Thread.currentThread().interrupt();
  88. }
  89. }
  90. }
  91. Event message = MESSAGE_QUEUE.poll();
  92. // 是否还有消息,如果没有消息下一次等待时间会偏长
  93. hasMessage = (message != null);
  94. if(message == null) {
  95. continue;
  96. }
  97. // 获取到消息
  98. CONSUMER_LOCK.lock();
  99. try {
  100. for (MessageConsumer consumer : CONSUMER_AND_TOPIC_MAP.keySet()) {
  101. Collection<String> topics = CONSUMER_AND_TOPIC_MAP.get(consumer);
  102. String topic = StringUtils.upperCase(message.getTopic());
  103. // topic 符合 Consumer 的 topic 订阅,则广播
  104. if(topic != null && topics.contains(topic)){
  105. // 广播消息
  106. try {
  107. consumer.onMessage(message.getTopic(), message.getPayload());
  108. } catch (Exception e) {
  109. logger.error(consumer.getClass().getName() + " 处理消息异常。", e);
  110. }
  111. }
  112. }
  113. } catch (Exception e) {
  114. logger.error("消息处理异常。", e);
  115. } finally {
  116. CONSUMER_LOCK.unlock();
  117. }
  118. }
  119. }
  120. /**
  121. * 停止消息服务
  122. */
  123. public void shutdown() {
  124. shutdown = true;
  125. // 新的消息,唤醒 Consumer 线程
  126. synchronized (CONSUMER_THREAD) {
  127. CONSUMER_THREAD.notifyAll();
  128. }
  129. CONSUMER_THREAD.interrupt();
  130. try {
  131. CONSUMER_THREAD.join();
  132. } catch (InterruptedException e) {
  133. logger.error("Interrupted!", e);
  134. Thread.currentThread().interrupt();
  135. }
  136. }
  137. /**
  138. * 发消息
  139. * @param event
  140. */
  141. public void sendMessage(Event event) {
  142. this.MESSAGE_QUEUE.offer(event);
  143. // 新的消息,唤醒 Consumer 线程
  144. synchronized (CONSUMER_THREAD) {
  145. CONSUMER_THREAD.notify();
  146. }
  147. }
  148. /**
  149. * 获得一个消息发送提供者
  150. * @return
  151. */
  152. public MessageProvider getMessageProvider() {
  153. return new DefaultMessageProvider();
  154. }
  155. /**
  156. * 添加 Consumer
  157. * @param topic
  158. * @param consumer
  159. */
  160. public void addMessageConsumer(String topic, MessageConsumer consumer) {
  161. CONSUMER_LOCK.lock();
  162. try {
  163. // 改变为大写
  164. CONSUMER_AND_TOPIC_MAP.put(consumer, topic.toUpperCase());
  165. } finally {
  166. CONSUMER_LOCK.unlock();
  167. }
  168. }
  169. /**
  170. * 移除 消息 监听者
  171. * @param consumer
  172. * @param consumer
  173. */
  174. public void removeMessageConsumer(MessageConsumer consumer) {
  175. CONSUMER_LOCK.lock();
  176. try {
  177. CONSUMER_AND_TOPIC_MAP.removeAll(consumer);
  178. } finally {
  179. CONSUMER_LOCK.unlock();
  180. }
  181. }
  182. class DefaultMessageProvider implements MessageProvider {
  183. @Override
  184. public void send(String topic, Object message) {
  185. sendMessage(new Event(topic, message));
  186. }
  187. @Override
  188. public void send(Event<Object> event) {
  189. sendMessage(event);
  190. }
  191. }
  192. public static void main(String[] args) throws InterruptedException {
  193. MessageService messageService = new MessageService();
  194. messageService.init();
  195. messageService.addMessageConsumer("hello", ((topic, message) -> System.out.println(message)));
  196. messageService.getMessageProvider().send("hello", "AAAAAAAa");
  197. messageService.getMessageProvider().send("hello", "BBBBBBBB");
  198. Thread.sleep(3000);
  199. messageService.getMessageProvider().send("hello", "CCCCCCCC");
  200. Thread.sleep(100000);
  201. }
  202. }