ActiveMQ是Apache的出品,主要是用于消息管理,完全支持J2EE1.4的规范,并且对spring支持,非常容易的融入到spring框架中,支持多种传送协议。
activeMQ的物种传输数据格式:TextMessage mapMessage streamMessage objectMessage byteMessage
activeMQ主要分为俩种方式,一种是点对点的方式,即一个生产者对应一个消费者。
//点对点方式发消息
@Test public void test1() throws JMSException { //消息发送到ActiveMQ,所以需要先连接到ActiveMQ //创建连接工厂 ConnectionFactory factory =new ActiveMQConnectionFactory("tcp://192.168.25.131:61616"); //从工厂中获取连接 Connection conn= factory.createConnection(); //连接到ActiveMQ conn.start(); //连接上之后就可以发消息了 /** * 第一个参数表示是否使用事务,如果使用事务(在分布式中使用)那么第二个参数无效 * 第一个参数为false,则第二个参数有效,表示应答方式:1:自动应答,2:手动应答 */ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建消息发送的目的地Destination Queue queue = session.createQueue("msgqueue");//点对点 //创建消息的发送者--生产者 MessageProducer producer = session.createProducer(queue); //发送的内容 TextMessage message = session.createTextMessage("hello,你好"); //发消息 producer.send(message); producer.close(); session.close(); conn.close(); } //点对点方式接受消息 @Test public void test2() throws JMSException, IOException { //连接到ActiveMQ服务 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.131:61616"); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE); //收消息 //创建收消息的目的地 Queue queue = session.createQueue("msgqueue");//点对点 //创建接收消息者--消费者 MessageConsumer consumer = session.createConsumer(queue); //时刻监听是否有消息过来 consumer.setMessageListener(new MessageListener() { //处理消息的方法 @Override public void onMessage(Message message) { // 处理消息的代码---同步缓存 try { TextMessage msg = (TextMessage)message; String text = msg.getText(); System.out.println(text+"*************"); } catch (JMSException e) { e.printStackTrace(); } } }); System.in.read();//阻塞式方法 consumer.close(); session.close(); conn.close(); }另一种方式的发布/订阅模式,即一个生产者对应多个消费者
//发布/订阅方式发送消息
@Test public void test3() throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.131:61616"); Connection conn = factory.createConnection(); conn.start(); Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("msgtopic");//群发 MessageProducer producer = session.createProducer(topic); TextMessage msg = session.createTextMessage("hello,this is a topic message"); producer.send(msg); producer.close(); session.close(); conn.close(); } //发布订阅方式接收消息 @Test public void test4() throws JMSException, IOException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.131:61616"); Connection conn = factory.createConnection(); conn.start(); Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("msgtopic"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage msg=(TextMessage) message; String ss = msg.getText(); System.out.println(ss); } catch (JMSException e) { e.printStackTrace(); } } }); System.in.read(); consumer.close(); session.close(); conn.close(); }
pom.xml文件
<dependencies>
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <!-- 注意jar包的版本 --> <version>5.11.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.7</source> <target>1.7</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
注意:点对点模式可以可以把消息保存到MQ中,等到消费者打开服务时,还是可以接受到的,但是发布/订阅这种模式就不可以,他必须先把接受端先打开,否则消息发出去后,接受端没有开启的话是永远接受不到的,也就是说,点对点模式可以消息存储到MQ中,二发布或订阅模式却不行。