package com.yiidata.amc.server.service; import com.yiidata.amc.api.utils.ServerConfig; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import javax.inject.Inject; import javax.inject.Named; import java.io.File; /** *
* commons of this class *
** Created by ZhenQin on 2018/3/27 0027-10:58 * Vendor: yiidata.com */ public class MQBrockerService { final String mqBrokerUrl; final BrokerService brocker; @Inject public MQBrockerService(@Named("brokerUrl") String mqBrokerUrl, ServerConfig serverConfig) { this.mqBrokerUrl = mqBrokerUrl; try { brocker = new BrokerService(); brocker.addConnector(mqBrokerUrl); // 消息持久化 brocker.setPersistent("true".equals(serverConfig.getProperty("mq.persistent", "false"))); brocker.setUseJmx(false); PolicyMap policy = new PolicyMap(); PolicyEntry entry = new PolicyEntry(); entry.setAdvisoryForConsumed(true); policy.put(new ActiveMQQueue(">"), entry); brocker.setDestinationPolicy(policy); // 持久化 if(brocker.isPersistent()) { KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter(); File dir = new File(ServerConfig.getAppHome(), "KahaDB"); if(!dir.exists()) { dir.mkdirs(); } adaptor.setDirectory(dir); brocker.setPersistenceAdapter(adaptor); } } catch (Exception e) { throw new IllegalStateException(e); } } public void start() { try { brocker.start(); } catch (Exception e) { throw new IllegalStateException(e); } } public void stop() { try { brocker.stop(); } catch (Exception e) { e.printStackTrace(); } } }