MQBrockerService.java 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package com.yiidata.amc.server.service;
  2. import com.yiidata.amc.api.utils.ServerConfig;
  3. import org.apache.activemq.broker.BrokerService;
  4. import org.apache.activemq.broker.region.policy.PolicyEntry;
  5. import org.apache.activemq.broker.region.policy.PolicyMap;
  6. import org.apache.activemq.command.ActiveMQQueue;
  7. import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
  8. import javax.inject.Inject;
  9. import javax.inject.Named;
  10. import java.io.File;
  11. /**
  12. * <p>
  13. * commons of this class
  14. * </p>
  15. * <p>
  16. * Created by ZhenQin on 2018/3/27 0027-10:58
  17. * Vendor: yiidata.com
  18. */
  19. public class MQBrockerService {
  20. final String mqBrokerUrl;
  21. final BrokerService brocker;
  22. @Inject
  23. public MQBrockerService(@Named("brokerUrl") String mqBrokerUrl, ServerConfig serverConfig) {
  24. this.mqBrokerUrl = mqBrokerUrl;
  25. try {
  26. brocker = new BrokerService();
  27. brocker.addConnector(mqBrokerUrl);
  28. // 消息持久化
  29. brocker.setPersistent("true".equals(serverConfig.getProperty("mq.persistent", "false")));
  30. brocker.setUseJmx(false);
  31. PolicyMap policy = new PolicyMap();
  32. PolicyEntry entry = new PolicyEntry();
  33. entry.setAdvisoryForConsumed(true);
  34. policy.put(new ActiveMQQueue(">"), entry);
  35. brocker.setDestinationPolicy(policy);
  36. // 持久化
  37. if(brocker.isPersistent()) {
  38. KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
  39. File dir = new File(ServerConfig.getAppHome(), "KahaDB");
  40. if(!dir.exists()) {
  41. dir.mkdirs();
  42. }
  43. adaptor.setDirectory(dir);
  44. brocker.setPersistenceAdapter(adaptor);
  45. }
  46. } catch (Exception e) {
  47. throw new IllegalStateException(e);
  48. }
  49. }
  50. public void start() {
  51. try {
  52. brocker.start();
  53. } catch (Exception e) {
  54. throw new IllegalStateException(e);
  55. }
  56. }
  57. public void stop() {
  58. try {
  59. brocker.stop();
  60. } catch (Exception e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }