文章目录
  1. 1.基于官方 API
  2. 2.基于 Spring 开发
  3. 3.基于 SpringBoot 开发
    1. 3.1 依赖包 pom.xml
    2. 3.2 配置文件 application.properties
    3. 3.3 应用类 MessageApplication.java
    4. 3.4 生产类 MessageProducer.java
    5. 3.5 消费类 MessageConsumer.java
    6. 3.6 运行结果

上篇文章 ActiveMQ 服务器的部署 实现了 ActiveMQ 服务器的部署,本文分别以官方 API、Spring、SpringBoot 三种方式,实现 ActiveMQ 消息的生成者和消费者。

作者:王克锋
出处:https://kefeng.wang/2017/10/18/activemq-development/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。

1.基于官方 API

ActiveMQ 官方实现了 JMS 接口,但使用很繁琐,不建议直接使用。
https://mvnrepository.com/artifact/org.apache.activemq/activemq-client

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.3</version>
</dependency>

2.基于 Spring 开发

采用 spring.xml 配置的方式,比 JMS 简单,但也比较繁琐。
参见 《Spring实战》/ 12.Spring 消息

1
2
3
4
5
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-version}</version>
</dependency>

3.基于 SpringBoot 开发

IDEA 新建模块的 Spring Initializr 向导中,选中 I/O, JMS(ActiveMQ);
相应的 starter 是 spring-boot-starter-activemq

3.1 依赖包 pom.xml

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

3.2 配置文件 application.properties

1
2
3
4
5
6
# ActiveMQ 地址
spring.activemq.broker-url=tcp://centos:61616

# 参数 spring.jms.pub-sub-domain 用于指定消息模型是否为发布/订阅方式
# 默认情况下(false),是点对占方式(queue),如果要使用发布/订阅方式(topic),必须设置为 true
spring.jms.pub-sub-domain=false

3.3 应用类 MessageApplication.java

其中定义了 4 个 Bean:

  • queue/topic: MessageProducer 中使用,作为消息发送的目标(分别是 queue/topic);
  • containerFactoryQueue/containerFactoryTopic: MessageConsumer 中使用,以便同时启用两种消息模型。

需要特别说明的是:

  • 只启用“点对点”模型:可配置 spring.jms.pub-sub-domain=false
  • 只启用“发布/订阅”模型:可配置 spring.jms.pub-sub-domain=false
  • 若要同时启用两种消息模型:必须定义 containerFactory Bean 并在消费者的 @JmsListener 中引用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@SpringBootApplication
public class MessageApplication {
@Bean
public Queue queue() { // javax.jms.Queue
return new ActiveMQQueue("queueName");
}

@Bean
public Topic topic() { // javax.jms.Topic
return new ActiveMQTopic("topicName");
}

@Bean
public JmsListenerContainerFactory<?> containerFactoryQueue(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}

@Bean
public JmsListenerContainerFactory<?> containerFactoryTopic(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
bean.setPubSubDomain(true);
return bean;
}

public static void main(String[] args) {
SpringApplication.run(MessageApplication.class, args);
}
}

3.4 生产类 MessageProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
@EnableScheduling
public class MessageProducer {
@Resource
private Queue queue; // javax.jms.Queue

@Resource
private Topic topic; // javax.jms.Topic

@Resource
private JmsMessagingTemplate messagingTemplate;

@Scheduled(fixedDelay = 3000)
public void sendQueue() { // 每隔 3 秒,发送一个“点对点”消息
messagingTemplate.convertAndSend(queue, "Hi, queue!");
}

@Scheduled(fixedDelay = 6000)
public void sendTopic() { // 每隔 6 秒,发送一个“发布/订阅”消息
messagingTemplate.convertAndSend(topic, "Hi, topic!");
}
}

3.5 消费类 MessageConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class MessageConsumer {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@JmsListener(destination = "queueName", containerFactory = "containerFactoryQueue")
public void receiveQueue(String message) {
logger.info("receiveQueue: " + message);
}

@JmsListener(destination = "topicName", containerFactory = "containerFactoryTopic")
public void receiveTopic(String message) {
logger.info("receiveTopic: " + message);
}
}

3.6 运行结果

1
2
3
4
5
6
14:43:27.981  INFO  [MessageConsumer.java:22] - receiveTopic: Hi, topic!
14:43:27.993 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue!
14:43:31.028 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue!
14:43:34.008 INFO [MessageConsumer.java:22] - receiveTopic: Hi, topic!
14:43:34.048 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue!
14:43:37.169 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue!

同时,可在 http://centos:8161/admin 中看到相关信息。

文章目录
  1. 1.基于官方 API
  2. 2.基于 Spring 开发
  3. 3.基于 SpringBoot 开发
    1. 3.1 依赖包 pom.xml
    2. 3.2 配置文件 application.properties
    3. 3.3 应用类 MessageApplication.java
    4. 3.4 生产类 MessageProducer.java
    5. 3.5 消费类 MessageConsumer.java
    6. 3.6 运行结果