本文共 5342 字,大约阅读时间需要 17 分钟。
发送方编码
package pres.zhang.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://localhost:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.通过连接工厂,获得连接connection,并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3.创建回话session //两个参数 1.事务 2.签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创建目的地(具体是队列还是主题topic) Queue queue = session.createQueue(QUEUE_NAME); //创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); //通过使用messageProducer生产3条消息发送到MQ的队列里面 for (int i = 0; i < 3; i++) { //7.创建消息 TextMessage textMessage = session.createTextMessage("msg---" + i);//可以理解为字符串 //8.通过messageProducer发送给MQ messageProducer.send(textMessage); } //9.关闭资源 messageProducer.close(); session.close(); connection.close();; System.out.println("消息发布到MQ完成!"); }}
运行代码,然后访问localhost:8161:
控制台的说明:
接收方消息编码
package pres.zhang.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://localhost:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.通过连接工厂,获得连接connection,并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3.创建回话session //两个参数 1.事务 2.签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创建目的地(具体是队列还是主题topic) Queue queue = session.createQueue(QUEUE_NAME); //5.创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(); if(textMessage != null){ System.out.println("*****消费者接收到消息:" + textMessage.getText()); }else{ break; } } messageConsumer.close(); session.close(); connection.close(); }}
运行程序,输出:
*****消费者接收到消息:msg---0*****消费者接收到消息:msg---1*****消费者接收到消息:msg---2*****消费者接收到消息:msg---0*****消费者接收到消息:msg---1*****消费者接收到消息:msg---2
控制台:
第一中接收方式:receive()方法
第二种接收方式:监听
package pres.zhang.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import javax.xml.soap.Text;import java.io.IOException;public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://localhost:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.通过连接工厂,获得连接connection,并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3.创建回话session //两个参数 1.事务 2.签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创建目的地(具体是队列还是主题topic) Queue queue = session.createQueue(QUEUE_NAME); //5.创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); //第一种方式:receive() // while(true){ // TextMessage textMessage = (TextMessage) messageConsumer.receive(); // if(textMessage != null){ // System.out.println("*****消费者接收到消息:" + textMessage.getText()); // }else{ // break; // } // } // messageConsumer.close(); // session.close(); // connection.close(); //第二中方式:监听 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message != null && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("监听器接收消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); //保证控制台运行 System.in.read(); messageConsumer.close(); session.close(); connection.close(); }}
运行输出:
监听器接收消息:msg---0监听器接收消息:msg---1监听器接收消息:msg---2
先生产: 只启动1号消费者。问题:1号消费者能消费吗?
可以先生产:先启动1号消费者再启动2号消费者。问题:2号消费者还能消费消息吗?
1号可以,2号没有,都被一号消费了。先启动两个消费者,再生产6条消息。问题,消费情况如何?
1号消费者:第1,3,5条消息。2号消费者:第2,4,6条。类似轮询。转载地址:http://hypqb.baihongyu.com/