博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ消息管理
阅读量:4957 次
发布时间:2019-06-12

本文共 3879 字,大约阅读时间需要 12 分钟。

ActiveMQ是Apache的出品,主要是用于消息管理,完全支持J2EE1.4的规范,并且对spring支持,非常容易的融入到spring框架中,支持多种传送协议。

activeMQ的物种传输数据格式:TextMessage    mapMessage   streamMessage   objectMessage   byteMessage

activeMQ主要分为俩种方式,一种是点对点的方式,即一个生产者对应一个消费者。

//点对点方式发消息

@Test
public void test1() throws JMSException
{
//消息发送到ActiveMQ,所以需要先连接到ActiveMQ
//创建连接工厂
ConnectionFactory factory =new ActiveMQConnectionFactory("tcp://192.168.25.131:61616");
//从工厂中获取连接
Connection conn= factory.createConnection();
//连接到ActiveMQ
conn.start();
//连接上之后就可以发消息了
/**
* 第一个参数表示是否使用事务,如果使用事务(在分布式中使用)那么第二个参数无效
* 第一个参数为false,则第二个参数有效,表示应答方式:1:自动应答,2:手动应答
*/
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息发送的目的地Destination
Queue queue = session.createQueue("msgqueue");//点对点
//创建消息的发送者--生产者
MessageProducer producer = session.createProducer(queue);
//发送的内容
TextMessage message = session.createTextMessage("hello,你好");
//发消息
producer.send(message);
producer.close();
session.close();
conn.close();
}
//点对点方式接受消息
@Test
public void test2() throws JMSException, IOException
{
//连接到ActiveMQ服务
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.131:61616");
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//收消息
//创建收消息的目的地
Queue queue = session.createQueue("msgqueue");//点对点
//创建接收消息者--消费者
MessageConsumer consumer = session.createConsumer(queue);
//时刻监听是否有消息过来
consumer.setMessageListener(new MessageListener() {
//处理消息的方法
@Override
public void onMessage(Message message) {
// 处理消息的代码---同步缓存
try {
TextMessage msg = (TextMessage)message;
String text = msg.getText();
System.out.println(text+"*************");
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();//阻塞式方法
consumer.close();
session.close();
conn.close();
}

另一种方式的发布/订阅模式,即一个生产者对应多个消费者

 

//发布/订阅方式发送消息

@Test
public void test3() throws JMSException
{
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.131:61616");
Connection conn = factory.createConnection();
conn.start();
Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("msgtopic");//群发
MessageProducer producer = session.createProducer(topic);
TextMessage msg = session.createTextMessage("hello,this is a topic message");
producer.send(msg);
producer.close();
session.close();
conn.close();
}
//发布订阅方式接收消息
@Test
public void test4() throws JMSException, IOException
{
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.131:61616");
Connection conn = factory.createConnection();
conn.start();
Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("msgtopic");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage msg=(TextMessage) message;
String ss = msg.getText();
System.out.println(ss);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
consumer.close();
session.close();
conn.close();
}

 

pom.xml文件

<dependencies>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<!-- 注意jar包的版本 -->
<version>5.11.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>

 

注意:点对点模式可以可以把消息保存到MQ中,等到消费者打开服务时,还是可以接受到的,但是发布/订阅这种模式就不可以,他必须先把接受端先打开,否则消息发出去后,接受端没有开启的话是永远接受不到的,也就是说,点对点模式可以消息存储到MQ中,二发布或订阅模式却不行。

 

转载于:https://www.cnblogs.com/hzq-/p/8921347.html

你可能感兴趣的文章
mariadb BINLOG_FORMAT = STATEMENT 异常
查看>>
C3P0 WARN: Establishing SSL connection without server's identity verification is not recommended
查看>>
iPhone在日本最牛,在中国输得最慘
查看>>
动态方法决议 和 消息转发
查看>>
WPF自定义搜索框代码分享
查看>>
js 基础拓展
查看>>
C#生成随机数
查看>>
iOS CoreData介绍和使用(以及一些注意事项)
查看>>
Android应用程序与SurfaceFlinger服务的连接过程分析
查看>>
Java回顾之多线程
查看>>
sqlite
查看>>
机电行业如何进行信息化建设
查看>>
9、总线
查看>>
Git 笔记 - section 1
查看>>
HDU6409 没有兄弟的舞会
查看>>
2018 Multi-University Training Contest 10 - TeaTree
查看>>
2018 Multi-University Training Contest 10 - Count
查看>>
HDU6203 ping ping ping
查看>>
《人人都是产品经理》书籍目录
查看>>
如何在git bash中运行mysql
查看>>