- 浏览: 448049 次
- 性别:
- 来自: 广州
文章分类
- 全部博客 (369)
- javascript html (20)
- java (31)
- jquery (15)
- jcrop (0)
- JEECG (1)
- ajax (3)
- 反射 (3)
- VI (1)
- mysql (48)
- easyui (1)
- svn (2)
- MD5 加密 (1)
- spring (14)
- ORACLE (8)
- 经验总结 (1)
- TCP/IP协议 (1)
- ICMP协议 (1)
- eclipse (1)
- Reflect (1)
- linux (21)
- android (5)
- excel 操作 (1)
- java tree (1)
- html (1)
- plupload (1)
- mongodb (9)
- aes (1)
- python (1)
- java relax (1)
- highcharts (2)
- json (2)
- java 多线程 (30)
- maven (2)
- 设计模式 (1)
- jsp+js (2)
- 面向对象 (1)
- jvm (16)
- 缓存 (1)
- proxy (1)
- 聊侃 (1)
- 面经 (1)
- java 字节 (1)
- java 类加载器 (2)
- java 基础 (2)
- java 语法糖 (1)
- java 位运算 (1)
- 排序 (3)
- java 服务器性能优化 (19)
- 网络编程 (2)
- jvm 参数设置 (0)
- jersey (1)
- webservice (2)
- nginx+多tomcat 集成 (5)
- nginx (16)
- squid (3)
- memcached (5)
- 正则表达式 (1)
- 常用免费接口 (1)
- jpa (1)
- win7 (1)
- java处理大文件 (1)
- js正则表达式 (1)
- tomcat (1)
- java 敏感字 (1)
- 系统架构优化 (4)
- 学习 (1)
- 本地测试QQ微博第三方登陆 (1)
- java 错误 (1)
- 微信支付 (1)
- https (1)
- httpclient (1)
- awk (2)
- loadrunner (1)
- sql server 2008 (3)
- git (4)
- sql server2008 (1)
- solr (2)
- centos (1)
- 数据存储架构 (3)
- log4j (1)
- weboffice (1)
- 并发编程 (1)
- postgreSQL (0)
- ssl (1)
- openssl (1)
- activeMQ (2)
- IDEA (1)
- shell (1)
- ansible (4)
- docker (2)
- grafana (1)
- jmeter (1)
- TLS (1)
- 将博客搬至CSDN (1)
最新评论
-
dida1990:
啊喔,过去了这么久,不过还是评一个。谁说uuid的hashCo ...
高并发生成订单号(二) -
annan211:
yclovesun 写道使用了uuid,为什么还要machin ...
高并发生成订单号(二) -
yclovesun:
使用了uuid,为什么还要machineId?uuid已经可以 ...
高并发生成订单号(二) -
u013280917:
太深奥,看不懂
mysql优化特定类型的查询
基于activeMQ broker cluster 集群 的高可用 多协议 物联网消息的架构设计
- 博客分类:
- activeMQ
activeMQ是一款功能十分强大的消息中间件。
支持包括MQTT NIO 在内的多种协议,而且是jms的完美实现。当有数以百万计的终端设备需要连接到服务器时,适当处理和架构就可以对外提供功能强劲的服务能力。
首先需要解决activeMQ 单节点服务性能问题,切不可直接使用默认配置上生产。
可以自己百度 或者 参照 日志
http://m.blog.csdn.net/truong/article/details/73718621
http://blog.csdn.net/yinwenjie/article/details/50955502
http://blog.csdn.net/yinwenjie/article/details/50991443
http://blog.csdn.net/yinwenjie/article/details/51064242
单节点完成 则进行 集群搭建 一样参照博文
http://www.cnblogs.com/leihenqianshang/articles/5623858.html
搭建完成之后,高性能的服务架构已经出来了。这时候需要写一些测试代码。
1 先写针对客户端和消费端都基于activeMQ API 的情况。生产者代码如下:
package com.sunshine.mq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) throws JMSException { // 连接到ActiveMQ服务器 //ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:61616)"); //这里可以使用 failover 机制 进行多节点配置 达到高可用和后端负载均衡 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1883)"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Topic topic = session.createTopic("VirtualTopic.virtual-T.100990"); MessageProducer producer = session.createProducer(topic); // NON_PERSISTENT 非持久化 PERSISTENT 持久化,发送消息时用使用持久模式 //producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage(); for(int i = 0;i<10;i++){ message.setText("topic 消息。"+i); message.setStringProperty("property", "消息Property"); // 发布主题消息 producer.send(message); System.out.println("Sent message: " + message.getText()); } session.close(); connection.close(); } }
我们需要多个消费者来消费消息以达到 分布式部署的目的
消费者1
package com.sunshine.mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerA { public static void main(String[] args) throws JMSException, InterruptedException { // 连接到ActiveMQ服务器 // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.virtual-T.*"); // 消费者A组创建订阅 MessageConsumer consumerA1 = session.createConsumer(topicA); Integer count = 0; while(true){ Message message = consumerA1.receive(); count++; TextMessage msg = (TextMessage)message; final String messageText = msg.getText(); System.out.println(messageText +",消费消息总数为:"+count); } /**consumerA1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } });**/ //session.close(); //connection.close(); } }
消费者2
package com.sunshine.mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerB { public static void main(String[] args) throws JMSException, InterruptedException { // 连接到ActiveMQ服务器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(nio://172.16.7.17:1888,nio://172.16.7.18:1889)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.virtual-T.*"); // 消费者A组创建订阅 MessageConsumer consumerA1 = session.createConsumer(topicA); Integer count = 0; while(true){ Message message = consumerA1.receive(); count++; TextMessage msg = (TextMessage)message; final String messageText = msg.getText(); System.out.println(messageText +",消费消息总数为:"+count); } /**consumerA1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("B 收到消息: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } });**/ //session.close(); //connection.close(); } }
消费者3
package com.sunshine.mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerC { public static void main(String[] args) throws JMSException, InterruptedException { // 连接到ActiveMQ服务器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(nio://172.16.7.17:1888,nio://172.16.7.18:1889)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(mqtt://172.16.7.17:1883,mqtt://172.16.7.18:1883)"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.virtual-T.*"); // 消费者A组创建订阅 MessageConsumer consumerA1 = session.createConsumer(topicA); Integer count = 0; while(true){ Message message = consumerA1.receive(); count++; TextMessage msg = (TextMessage)message; final String messageText = msg.getText(); System.out.println(messageText +",消费消息总数为:"+count); } /**consumerA1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("B 收到消息: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } });**/ //session.close(); //connection.close(); } }
测试的结果是 生产者生成的消息会负载到不同的消费者客户端,各个消费者客户端消费了不同的消息。这里不管生产者生产的消息是走队列还是走topic 其结果都是实现了消息的路由。
假设出现一种情况,设备端只支持MQTT协议,使用了MQTTClient 发送消息或者只想走MQTT,那么针对这个架构该如何应对呢?道理是一样的,由于linux-c或者C++ 客户端开发人员的尿性,所以 生产者模拟程序
package com.sunshine.mqtt; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MyMqttClient { private String host="tcp://172.16.7.15:1887"; private String userName="admin"; private String passWord = "activemq.123"; private MqttConnectOptions options; private MqttClient client; private MqttMessage message ; private String[] myTopics={"test_result/20179112982783"}; private int[] myQos={2}; private MqttTopic topic; private String myTopic = "test/20179112982783"; public MyMqttClient(){ try { client=new MqttClient(host,"test99990000",new MemoryPersistence()); options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); client.setCallback(new MqttCallback(){ @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topicName, MqttMessage message) throws Exception { System.out.println("topicName is :"+topicName); System.out.println("Message is:"+message.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { }}); client.connect(options); client.subscribe(myTopics,myQos); } catch (Exception e) { e.printStackTrace(); } } public void sendMessage(String topicT,String messageTest){ try { message = new MqttMessage(); message.setQos(0); message.setRetained(true); message.setPayload(messageTest.getBytes()); topic = client.getTopic(topicT); MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println(" 发送消息:"+messageTest); } catch (Exception e) { e.printStackTrace(); } } }
package com.sunshine.mqtt; import java.text.SimpleDateFormat; import java.util.Date; import org.junit.Test; public class MyMqttTest { @Test public void testMQTT(){ MyMqtt mqtt = new MyMqtt(); mqtt.sendMessage("我向客户端发送了一条消息"); } @Test public void testMQTTClient(){ MyMqttClient client=new MyMqttClient(); //client.sendMessage("我向服务端发送了一条消息,我的SN是 20179112982783"); } @Test public void testIotConnect() throws InterruptedException{ MyMqttClient client=new MyMqttClient(); for(int i=0;i<10;i++){ client.sendMessage("VirtualTopic/100990"+i,"mqtt-msg:"+i+">"+("100990"+i)); } } }
三个消费者
package com.sunshine.mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicConnectionFactory; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQBytesMessage; public class ConsumerTopicA { public static void main(String[] args) throws JMSException, InterruptedException { // 连接到ActiveMQ服务器 // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)"); //ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.*"); // 消费者A组创建订阅 MessageConsumer consumerA1 = session.createConsumer(topicA); Integer count = 0; while(true){ //在实际生产过程中不建议使用receive 会造成阻塞并损耗大量资源 建议直接使用spring jms 监听器 ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumerA1.receive(); count++; // TextMessage msg = (TextMessage)message; // String messageText = msg.getText(); System.out.println(new String(message.getMessage().getContent().getData()) +",消费消息总数为:"+count); } /**consumerA1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } });**/ //session.close(); //connection.close(); } }
package com.sunshine.mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicConnectionFactory; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQBytesMessage; public class ConsumerTopicB { public static void main(String[] args) throws JMSException, InterruptedException { // 连接到ActiveMQ服务器 // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)"); //ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.*"); // 消费者A组创建订阅 MessageConsumer consumerA1 = session.createConsumer(topicA); Integer count = 0; while(true){ ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumerA1.receive(); count++; // TextMessage msg = (TextMessage)message; // String messageText = msg.getText(); System.out.println(new String(message.getMessage().getContent().getData()) +",消费消息总数为:"+count); } /**consumerA1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } });**/ //session.close(); //connection.close(); } }
package com.sunshine.mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicConnectionFactory; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQBytesMessage; public class ConsumerTopicC { public static void main(String[] args) throws JMSException, InterruptedException { // 连接到ActiveMQ服务器 // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.15:1887)"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:61616,tcp://172.16.7.18:61616)"); //ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin","failover:(tcp://172.16.7.17:1883,tcp://172.16.7.18:1883)"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://172.16.7.17:1888,tcp://172.16.7.18:1889)"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建主题 Queue topicA = session.createQueue("Consumer.GG.VirtualTopic.*"); // 消费者A组创建订阅 MessageConsumer consumerA1 = session.createConsumer(topicA); Integer count = 0; while(true){ ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumerA1.receive(); count++; // TextMessage msg = (TextMessage)message; // String messageText = msg.getText(); System.out.println(new String(message.getMessage().getContent().getData()) +",消费消息总数为:"+count); } /**consumerA1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("A 收到消息: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } });**/ //session.close(); //connection.close(); } }
此时客户端只支持单个broker地址,可以通过一些策略分配给客户端相对比较空闲的broker
相关推荐
基于kahadb的activemq高可用集群部署配置示例,两个broker分别部署在两台机器上,一台机子上面,两个实例组成一个broker对外提供高可用服务,两个broker通过桥接形成集群服务
ActiveMQ 集群中JDBC Master Slave + Broker Cluster的整合
activemqBroker插件:activemqBroker-2.14-SNAPSHOT.war
高可用之ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置、高可用测试
亲自安装的笔记
高可用之ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置(伪集群)
amq-聚类Activemq 代理动态集群,基于 ZooKeeper。
ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置、高可用测试
ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试
ActiveMQ高可用集群(ZooKeeper+LevelDB)安装、配置(伪集群)
构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供 了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为...
《RabbitMQ集群环境生产实例部署》《ActiveMQ集群》《ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试》
activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程,activemq分布式集群视频教程
activemq性能与高可用性测试,activemq性能与高可用性测试。
基于ActiveMQ的消息中间件架构设计.docx
ActiveMQ具有强大和灵活的集群功能,ActiveMQ的集群方式主要由两种:Master-Slave和Broker Cluster。
50多页超详细,ActiveMQ消息总线设计方案,包含各种消息队列技术对比
window搭建activeMQ集群(linux系统搭建集群的方式和window的一样),还有自己写的搭建集群的文档和我自己亲手搭建的一个三个mq集群
activemq-broker-5.9.1.jar,activemq-broker-5.9.1.jar,activemq-broker-5.9.1.jar
高可用之ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试--java源码