ActiveMQ介绍
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种语言编写客户端 2、对spring的支持,很容易和spring整合 3、支持多种传输协议:TCP,SSL,NIO,UDP等 4、支持AJAX 消息形式: 1、点对点(queue) 2、一对多(topic)
ActiveMQ安装
我这里提供一个安装好的虚拟机: 服务器运行后,我们可以直接访问到activeMQ的界面: 然后点击queues可以看到现在没有一条消息:ActiveMQ测试
编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:
org.apache.activemq activemq-all
queue的发送代码如下:
public void testMQProducerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(queue); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-queue"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
接收代码:
public void TestMQConsumerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(queue); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
然后当我们运行queue发送的时候可以看到队列里已经有一条消息了,但没有发送出去:
然后在运行queue 的接收端,可以看到消息已经发出了:
接着对topic进行测试,发送代码如下:
public void TestTopicProducer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(topic); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-topic"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
接收代码:
public void TestTopicConsumer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(topic); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
然后运行topic发送:
可以看到消息已经发送出去。再运行topic接收: 可以看到有了一个消费者,但是没有接收的消息,这是因为正常情况下我们的topic消息不会再服务器持久化,所以要先打开消费者,再打开生产者,这个时候我们再运行生产者发送一条消息看到消息已经接收到了:ActiveMQ整合spring及项目中运用
activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:
org.springframework spring-context
然后编写applicationContext-activemq.xml文件,
代码如下:
然后在我们淘淘商城中,商品添加到数据库的时候,对应也要添加数据到我们的solr索引中,所以生产者应该在插入数据后创建:
当然,在xml文件中配置好的jmstemplate和destination也要注入进来:
@Autowired private JmsTemplate jmsTemplate; @Resource(name="itemAddTopic") private Destination destination;
然后消费者应该写在我们的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件,再编写一个监听器,当接收到消息时,就讲数据存入索引库,xml文件代码如下:
接收消息代码:
最后同时打开测试即可。
转载地址:
ActiveMQ介绍
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种语言编写客户端 2、对spring的支持,很容易和spring整合 3、支持多种传输协议:TCP,SSL,NIO,UDP等 4、支持AJAX 消息形式: 1、点对点(queue) 2、一对多(topic)
ActiveMQ安装
我这里提供一个安装好的虚拟机: 服务器运行后,我们可以直接访问到activeMQ的界面: 然后点击queues可以看到现在没有一条消息:ActiveMQ测试
编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:
org.apache.activemq activemq-all
- 1
- 2
- 3
- 4
queue的发送代码如下:
public void testMQProducerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(queue); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-queue"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
接收代码:
public void TestMQConsumerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(queue); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
然后当我们运行queue发送的时候可以看到队列里已经有一条消息了,但没有发送出去:
然后在运行queue 的接收端,可以看到消息已经发出了: 接着对topic进行测试,发送代码如下:public void TestTopicProducer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(topic); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-topic"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
接收代码:
public void TestTopicConsumer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(topic); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
然后运行topic发送:
可以看到消息已经发送出去。再运行topic接收: 可以看到有了一个消费者,但是没有接收的消息,这是因为正常情况下我们的topic消息不会再服务器持久化,所以要先打开消费者,再打开生产者,这个时候我们再运行生产者发送一条消息看到消息已经接收到了:ActiveMQ整合spring及项目中运用
activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:
org.springframework spring-context
- 1
- 2
- 3
- 4
然后编写applicationContext-activemq.xml文件,
代码如下:- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
然后在我们淘淘商城中,商品添加到数据库的时候,对应也要添加数据到我们的solr索引中,所以生产者应该在插入数据后创建:
当然,在xml文件中配置好的jmstemplate和destination也要注入进来:@Autowired private JmsTemplate jmsTemplate; @Resource(name="itemAddTopic") private Destination destination;
- 1
- 2
- 3
- 4
然后消费者应该写在我们的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件,再编写一个监听器,当接收到消息时,就讲数据存入索引库,xml文件代码如下:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
接收消息代码:
最后同时打开测试即可。