1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- package com.yiidata.amc.server.service;
- import com.yiidata.amc.api.utils.ServerConfig;
- import org.apache.activemq.broker.BrokerService;
- import org.apache.activemq.broker.region.policy.PolicyEntry;
- import org.apache.activemq.broker.region.policy.PolicyMap;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
- import javax.inject.Inject;
- import javax.inject.Named;
- import java.io.File;
- /**
- * <p>
- * commons of this class
- * </p>
- * <p>
- * Created by ZhenQin on 2018/3/27 0027-10:58
- * Vendor: yiidata.com
- */
- public class MQBrockerService {
- final String mqBrokerUrl;
- final BrokerService brocker;
- @Inject
- public MQBrockerService(@Named("brokerUrl") String mqBrokerUrl, ServerConfig serverConfig) {
- this.mqBrokerUrl = mqBrokerUrl;
- try {
- brocker = new BrokerService();
- brocker.addConnector(mqBrokerUrl);
- // 消息持久化
- brocker.setPersistent("true".equals(serverConfig.getProperty("mq.persistent", "false")));
- brocker.setUseJmx(false);
- PolicyMap policy = new PolicyMap();
- PolicyEntry entry = new PolicyEntry();
- entry.setAdvisoryForConsumed(true);
- policy.put(new ActiveMQQueue(">"), entry);
- brocker.setDestinationPolicy(policy);
- // 持久化
- if(brocker.isPersistent()) {
- KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
- File dir = new File(ServerConfig.getAppHome(), "KahaDB");
- if(!dir.exists()) {
- dir.mkdirs();
- }
- adaptor.setDirectory(dir);
- brocker.setPersistenceAdapter(adaptor);
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- public void start() {
- try {
- brocker.start();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- public void stop() {
- try {
- brocker.stop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
|