ActiveMQ消息通知,使用java获取MQ中消费者数量等信息
第一种方法:activeMQ发送消息时获取对应用户的消费者个数,根据该数量循环发送消息,保证每个消费者都可以收到消息。通过api接口也可以获取MQ服务器消息信息,具体接口请自行查询activeMQ开发文档。完整代码见下文。
·
第一种方法
activeMQ发送消息时获取对应用户的消费者个数,根据该数量循环发送消息,保证每个消费者都可以收到消息
api接口也可以获取MQ服务器消息信息,具体接口请自行查询activeMQ开发文档
package cn.sdpjw;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.*;
/**
 * Utility that queries an ActiveMQ broker's Jolokia REST endpoint over HTTP
 * and returns the raw response text (for example the statistics of one queue).
 */
public class ActiveMQUtil {

    /** Connect and read timeout, so an unreachable broker cannot block the caller forever. */
    private static final int TIMEOUT_MILLIS = 10_000;

    /**
     * Demo entry point: reads the MBean of one queue through Jolokia and prints the response.
     * The host and destination name inside the URL are placeholders to be replaced.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The original literal was broken across two source lines, which does not compile.
        String url = "http://你的远程消息服务器ip:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,"
                + "destinationType=Queue,destinationName=消费者id";
        String mpa = sendPost(url);
        System.err.println(mpa);
    }

    /**
     * Performs an HTTP GET (despite the historical method name) against the given URL using
     * HTTP Basic authentication with the credentials {@code admin:admin}, and returns the
     * response body decoded as UTF-8.
     *
     * <p>For backward compatibility the result keeps the original format: every line of the
     * response is prefixed with {@code "\n"}, so a non-empty result starts with a newline.
     *
     * @param url the absolute URL to request
     * @return the response body, or whatever was read before an I/O failure; the empty string
     *         if the URL is malformed or the request fails before any line was read
     */
    public static String sendPost(String url) {
        StringBuilder result = new StringBuilder();
        HttpURLConnection conn = null;
        try {
            URL realUrl = new URL(url);
            // NOTE(review): credentials are hard-coded; they are intentionally no longer printed.
            String authorization = Base64.getEncoder()
                    .encodeToString("admin:admin".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            conn = (HttpURLConnection) realUrl.openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(TIMEOUT_MILLIS);
            conn.setReadTimeout(TIMEOUT_MILLIS);
            conn.setRequestProperty("Accept", "application/json");
            conn.setRequestProperty("Accept-Language", "zh-CN,zh;q=0.9");
            conn.setRequestProperty("Authorization", "Basic " + authorization);
            conn.setRequestProperty("Cache-Control", "max-age=0");
            // Deliberately NOT sent:
            //  - "Accept-Encoding: gzip, deflate": the body is read as plain UTF-8 text below and
            //    HttpURLConnection does not decompress, so a compressed reply would be garbage.
            //  - "Host": the original set it to a MIME type; the connection derives Host from the URL.
            //  - "Content-Type" / "Connection" / "Upgrade-Insecure-Requests": browser leftovers
            //    with no meaning for a body-less GET.
            try (BufferedReader in = new BufferedReader(new InputStreamReader(
                    conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    result.append('\n').append(line);
                }
            }
        } catch (IOException e) {
            // Best-effort contract kept from the original: report and return what we have.
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
        return result.toString();
    }
}
第二种方法
第一步下载apache-activemq-5.15.11 解压后进入
主要关注bin和conf两个目录
在conf目录中编辑activemq.xml文件,修改
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
添加 useJmx="true"
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useJmx="true">
之后在修改
<managementContext>
<managementContext createConnector="false" />
</managementContext>
为
<managementContext>
<managementContext createConnector="false" connectorPath="/jmxrmi" connectorHost="本机ip地址" jmxDomainName="org.apache.activemq" connectorPort="1093" />
</managementContext>
保存后进入bin目录
修改activemq文件
在checkRunning()方法之前添加
ACTIVEMQ_CONF="/app/apache-activemq-5.15.11/conf" # 配置路径
ACTIVEMQ_SUNJMX_START="-Dcom.sun.management.jmxremote.port=1093" # 端口保持一致
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Djava.rmi.server.hostname=ip地址"
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.password.file=/app/apache-activemq-5.15.11/conf/jmx.password"
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.access.file=/app/apache-activemq-5.15.11/conf/jmx.access"
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.ssl=false"
之后可以通过以下代码(JMX方式)获取队列的消费者数量等信息:
import com.alibaba.fastjson.JSON;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Message-queue utility that reads queue statistics from an ActiveMQ broker over JMX.
 *
 * <p>In {@code activemq/conf/activemq.xml}, under the {@code broker} node, change the
 * {@code managementContext} node to:
 * <pre>{@code
 * <managementContext>
 *   <managementContext createConnector="true" connectorPort="1093" connectorPath="/jmxrmi" jmxDomainName="org.apache.activemq"/>
 * </managementContext>
 * }</pre>
 */
public class ActiveMQUtil {
    /** Host of the broker's JMX connector (placeholder; replace with the real address). */
    private String connectorHost = "ip地址";
    /** Port of the JMX connector; previously declared but ignored in favour of a hard-coded 1093. */
    private String connectorPort = "1093";
    private String connectorPath = "/jmxrmi";
    private String jmxDomain = "org.apache.activemq";

    public ActiveMQUtil() {
    }

    /**
     * Demo entry point: looks up one queue and prints its statistics.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // getAllQueue is an instance method, so it cannot be called from a static context directly.
        Map mpa = new ActiveMQUtil().getAllQueue("USERS.98ba6c2a13da45bba9f38db7c63a40f7");
        System.err.println(mpa.toString());
    }

    /**
     * Fetches the statistics of the queue with the given name from the broker.
     *
     * <p>Keys of the returned map:
     * <ul>
     *   <li>{@code numberOfConsumers} - consumer count; per the original author, the number of
     *       valid logins for the current token (including open browser tabs)</li>
     *   <li>{@code queueName} - name of the queue (user name USER.ID)</li>
     *   <li>{@code queueSize} - number of messages remaining in the queue</li>
     *   <li>{@code numberOfDequeue} - dequeue count, i.e. messages already delivered</li>
     * </ul>
     *
     * @param queue the exact queue name to look up
     * @return a map with the keys above, or an empty map if the queue does not exist or the
     *         JMX connection could not be established
     */
    public Map getAllQueue(String queue) {
        // Values are mixed (Long counts and a String name), hence Object rather than Long.
        Map<String, Object> queueMap = new HashMap<String, Object>();
        try {
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://" + connectorHost + ":" + connectorPort + connectorPath);
            Map<String, Object> auth = new HashMap<String, Object>();
            // NOTE(review): credentials are hard-coded; consider externalising them.
            auth.put(JMXConnector.CREDENTIALS, new String[] { "admin", "123456" });
            // JMXConnectorFactory.connect already returns a connected connector, so no extra
            // connect() call is needed; try-with-resources closes it once the proxies are done.
            try (JMXConnector connector = JMXConnectorFactory.connect(url, auth)) {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                ObjectName name = new ObjectName(jmxDomain + ":brokerName=localhost,type=Broker");
                BrokerViewMBean mBean = MBeanServerInvocationHandler.newProxyInstance(
                        connection, name, BrokerViewMBean.class, true);
                for (ObjectName queueName : mBean.getQueues()) {
                    QueueViewMBean queueMBean = MBeanServerInvocationHandler.newProxyInstance(
                            connection, queueName, QueueViewMBean.class, true);
                    if (queue.equals(queueMBean.getName())) {
                        queueMap.put("numberOfConsumers", queueMBean.getConsumerCount());
                        queueMap.put("queueName", queueMBean.getName());
                        queueMap.put("queueSize", queueMBean.getQueueSize());
                        queueMap.put("numberOfDequeue", queueMBean.getDequeueCount());
                        break;
                    }
                }
            }
        } catch (IOException | MalformedObjectNameException e) {
            // Best-effort contract kept from the original: report and return what we have.
            System.err.println("ActiveMQUtil.getAllQueue" + e);
        }
        return queueMap;
    }
}
欢迎大家访问我的个人博客:http://blog.ycsn.xyz/
更多推荐
所有评论(0)