博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ详细入门使用教程
阅读量:4694 次
发布时间:2019-06-09

本文共 13684 字,大约阅读时间需要 45 分钟。

ActiveMQ介绍

     MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 

特点: 
1、支持多种语言编写客户端 
2、对spring的支持,很容易和spring整合 
3、支持多种传输协议:TCP,SSL,NIO,UDP等 
4、支持AJAX 
消息形式: 
1、点对点(queue) 
2、一对多(topic) 

ActiveMQ安装

这里写图片描述

我这里提供一个安装好的虚拟机: 
服务器运行后,我们可以直接访问到activeMQ的界面: 
这里写图片描述
然后点击queues可以看到现在没有一条消息: 
这里写图片描述

ActiveMQ测试

      编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:

 

org.apache.activemq
activemq-all

 

queue的发送代码如下:

public void testMQProducerQueue() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Queue queue = session.createQueue("test-queue");        //6、使用会话对象创建生产者对象        MessageProducer producer = session.createProducer(queue);        //7、使用会话对象创建一个消息对象        TextMessage textMessage = session.createTextMessage("hello!test-queue");        //8、发送消息        producer.send(textMessage);        //9、关闭资源        producer.close();        session.close();        connection.close();    }

接收代码:

public void TestMQConsumerQueue() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Queue queue = session.createQueue("test-queue");        //6、使用会话对象创建生产者对象        MessageConsumer consumer = session.createConsumer(queue);        //7、向consumer对象中设置一个messageListener对象,用来接收消息        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                // TODO Auto-generated method stub                if(message instanceof TextMessage){                    TextMessage textMessage = (TextMessage)message;                    try {                        System.out.println(textMessage.getText());                    } catch (JMSException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            }        });        //8、程序等待接收用户消息        System.in.read();        //9、关闭资源        consumer.close();        session.close();        connection.close();    }

然后当我们运行queue发送的时候可以看到队列里已经有一条消息了,但没有发送出去: 
这里写图片描述
然后在运行queue 的接收端,可以看到消息已经发出了: 
这里写图片描述
这里写图片描述
接着对topic进行测试,发送代码如下:

public void TestTopicProducer() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Topic topic = session.createTopic("test-topic");        //6、使用会话对象创建生产者对象        MessageProducer producer = session.createProducer(topic);        //7、使用会话对象创建一个消息对象        TextMessage textMessage = session.createTextMessage("hello!test-topic");        //8、发送消息        producer.send(textMessage);        //9、关闭资源        producer.close();        session.close();        connection.close();    }

接收代码:

public void TestTopicConsumer() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Topic topic = session.createTopic("test-topic");        //6、使用会话对象创建生产者对象        MessageConsumer consumer = session.createConsumer(topic);        //7、向consumer对象中设置一个messageListener对象,用来接收消息        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                // TODO Auto-generated method stub                if(message instanceof TextMessage){                    TextMessage textMessage = (TextMessage)message;                    try {                        System.out.println(textMessage.getText());                    } catch (JMSException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            }        });        //8、程序等待接收用户消息        System.in.read();        //9、关闭资源        consumer.close();        session.close();        connection.close();    }

然后运行topic发送: 

这里写图片描述
可以看到消息已经发送出去。再运行topic接收: 
这里写图片描述
可以看到有了一个消费者,但是没有接收的消息,这是因为正常情况下我们的topic消息不会再服务器持久化,所以要先打开消费者,再打开生产者,这个时候我们再运行生产者发送一条消息看到消息已经接收到了: 
这里写图片描述
这里写图片描述

ActiveMQ整合spring及项目中运用

      activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:

org.springframework
spring-context

然后编写applicationContext-activemq.xml文件, 
代码如下:

然后在我们淘淘商城中,商品添加到数据库的时候,对应也要添加数据到我们的solr索引中,所以生产者应该在插入数据后创建: 
这里写图片描述
当然,在xml文件中配置好的jmstemplate和destination也要注入进来:

@Autowired    private JmsTemplate jmsTemplate;    @Resource(name="itemAddTopic")    private Destination destination;

然后消费者应该写在我们的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件,再编写一个监听器,当接收到消息时,就讲数据存入索引库,xml文件代码如下:

接收消息代码: 
这里写图片描述
最后同时打开测试即可。

 

转载地址:

 

ActiveMQ介绍

     MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 

特点: 
1、支持多种语言编写客户端 
2、对spring的支持,很容易和spring整合 
3、支持多种传输协议:TCP,SSL,NIO,UDP等 
4、支持AJAX 
消息形式: 
1、点对点(queue) 
2、一对多(topic) 

ActiveMQ安装

这里写图片描述

我这里提供一个安装好的虚拟机: 
服务器运行后,我们可以直接访问到activeMQ的界面: 
这里写图片描述
然后点击queues可以看到现在没有一条消息: 
这里写图片描述

ActiveMQ测试

      编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:

org.apache.activemq
activemq-all
  • 1
  • 2
  • 3
  • 4

queue的发送代码如下:

public void testMQProducerQueue() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Queue queue = session.createQueue("test-queue");        //6、使用会话对象创建生产者对象        MessageProducer producer = session.createProducer(queue);        //7、使用会话对象创建一个消息对象        TextMessage textMessage = session.createTextMessage("hello!test-queue");        //8、发送消息        producer.send(textMessage);        //9、关闭资源        producer.close();        session.close();        connection.close();    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

接收代码:

public void TestMQConsumerQueue() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Queue queue = session.createQueue("test-queue");        //6、使用会话对象创建生产者对象        MessageConsumer consumer = session.createConsumer(queue);        //7、向consumer对象中设置一个messageListener对象,用来接收消息        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                // TODO Auto-generated method stub                if(message instanceof TextMessage){                    TextMessage textMessage = (TextMessage)message;                    try {                        System.out.println(textMessage.getText());                    } catch (JMSException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            }        });        //8、程序等待接收用户消息        System.in.read();        //9、关闭资源        consumer.close();        session.close();        connection.close();    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

然后当我们运行queue发送的时候可以看到队列里已经有一条消息了,但没有发送出去: 

这里写图片描述
然后在运行queue 的接收端,可以看到消息已经发出了: 
这里写图片描述
这里写图片描述
接着对topic进行测试,发送代码如下:

public void TestTopicProducer() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Topic topic = session.createTopic("test-topic");        //6、使用会话对象创建生产者对象        MessageProducer producer = session.createProducer(topic);        //7、使用会话对象创建一个消息对象        TextMessage textMessage = session.createTextMessage("hello!test-topic");        //8、发送消息        producer.send(textMessage);        //9、关闭资源        producer.close();        session.close();        connection.close();    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

接收代码:

public void TestTopicConsumer() throws Exception{        //1、创建工厂连接对象,需要制定ip和端口号        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");        //2、使用连接工厂创建一个连接对象        Connection connection = connectionFactory.createConnection();        //3、开启连接        connection.start();        //4、使用连接对象创建会话(session)对象        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)        Topic topic = session.createTopic("test-topic");        //6、使用会话对象创建生产者对象        MessageConsumer consumer = session.createConsumer(topic);        //7、向consumer对象中设置一个messageListener对象,用来接收消息        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                // TODO Auto-generated method stub                if(message instanceof TextMessage){                    TextMessage textMessage = (TextMessage)message;                    try {                        System.out.println(textMessage.getText());                    } catch (JMSException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            }        });        //8、程序等待接收用户消息        System.in.read();        //9、关闭资源        consumer.close();        session.close();        connection.close();    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

然后运行topic发送: 

这里写图片描述
可以看到消息已经发送出去。再运行topic接收: 
这里写图片描述
可以看到有了一个消费者,但是没有接收的消息,这是因为正常情况下我们的topic消息不会再服务器持久化,所以要先打开消费者,再打开生产者,这个时候我们再运行生产者发送一条消息看到消息已经接收到了: 
这里写图片描述
这里写图片描述

ActiveMQ整合spring及项目中运用

      activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:

org.springframework
spring-context
  • 1
  • 2
  • 3
  • 4

然后编写applicationContext-activemq.xml文件, 

代码如下:

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

然后在我们淘淘商城中,商品添加到数据库的时候,对应也要添加数据到我们的solr索引中,所以生产者应该在插入数据后创建: 

这里写图片描述
当然,在xml文件中配置好的jmstemplate和destination也要注入进来:

@Autowired    private JmsTemplate jmsTemplate;    @Resource(name="itemAddTopic")    private Destination destination;
  • 1
  • 2
  • 3
  • 4

然后消费者应该写在我们的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件,再编写一个监听器,当接收到消息时,就讲数据存入索引库,xml文件代码如下:

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

接收消息代码: 

这里写图片描述
最后同时打开测试即可。

转载于:https://www.cnblogs.com/loong-hon/p/9661946.html

你可能感兴趣的文章
IT常用单词
查看>>
拓扑排序
查看>>
NYOJ--32--SEARCH--组合数
查看>>
JMS
查看>>
gulpfile 压缩模板
查看>>
【34.14%】【BZOJ 3110】 [Zjoi2013]K大数查询
查看>>
【 henuacm2016级暑期训练-动态规划专题 A 】Cards
查看>>
第五篇:白话tornado源码之褪去模板的外衣
查看>>
设备常用框架framework
查看>>
bootstrap模态框和select2合用时input无法获取焦点(转)
查看>>
21世纪经济网APP
查看>>
解决NetworkOnMainThreadException
查看>>
1039 到底买不买
查看>>
农银电商项目学习笔记(一)
查看>>
MockObject
查看>>
Chukwa
查看>>
(转)Maven仓库——私服介绍
查看>>
设计模式之工厂模式
查看>>
仿复制粘贴功能,长按弹出tips的实现
查看>>
Kubernetes-Host网络模式应用
查看>>