上篇文章 ActiveMQ 服务器的部署 实现了 ActiveMQ 服务器的部署,本文分别以官方 API、Spring、SpringBoot 三种方式,实现 ActiveMQ 消息的生成者和消费者。
作者:王克锋
出处:https://kefeng.wang/2017/10/18/activemq-development/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。
1.基于官方 API
ActiveMQ 官方实现了 JMS 接口,但使用很繁琐,不建议直接使用。
https://mvnrepository.com/artifact/org.apache.activemq/activemq-client
1 2 3 4 5
| <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.3</version> </dependency>
|
2.基于 Spring 开发
采用 spring.xml 配置的方式,比 JMS 简单,但也比较繁琐。
参见 《Spring实战》/ 12.Spring 消息
1 2 3 4 5
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring-version}</version> </dependency>
|
3.基于 SpringBoot 开发
IDEA 新建模块的 Spring Initializr 向导中,选中 I/O, JMS(ActiveMQ);
相应的 starter 是 spring-boot-starter-activemq
3.1 依赖包 pom.xml
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
|
3.2 配置文件 application.properties
1 2 3 4 5 6
| # ActiveMQ 地址 spring.activemq.broker-url=tcp://centos:61616
# 参数 spring.jms.pub-sub-domain 用于指定消息模型是否为发布/订阅方式 # 默认情况下(false),是点对占方式(queue),如果要使用发布/订阅方式(topic),必须设置为 true spring.jms.pub-sub-domain=false
|
3.3 应用类 MessageApplication.java
其中定义了 4 个 Bean:
- queue/topic: MessageProducer 中使用,作为消息发送的目标(分别是 queue/topic);
- containerFactoryQueue/containerFactoryTopic: MessageConsumer 中使用,以便同时启用两种消息模型。
需要特别说明的是:
- 只启用“点对点”模型:可配置 spring.jms.pub-sub-domain=false
- 只启用“发布/订阅”模型:可配置 spring.jms.pub-sub-domain=false
- 若要同时启用两种消息模型:必须定义 containerFactory Bean 并在消费者的 @JmsListener 中引用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @SpringBootApplication public class MessageApplication { @Bean public Queue queue() { return new ActiveMQQueue("queueName"); }
@Bean public Topic topic() { return new ActiveMQTopic("topicName"); }
@Bean public JmsListenerContainerFactory<?> containerFactoryQueue(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean; }
@Bean public JmsListenerContainerFactory<?> containerFactoryTopic(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); bean.setPubSubDomain(true); return bean; }
public static void main(String[] args) { SpringApplication.run(MessageApplication.class, args); } }
|
3.4 生产类 MessageProducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component @EnableScheduling public class MessageProducer { @Resource private Queue queue;
@Resource private Topic topic;
@Resource private JmsMessagingTemplate messagingTemplate;
@Scheduled(fixedDelay = 3000) public void sendQueue() { messagingTemplate.convertAndSend(queue, "Hi, queue!"); }
@Scheduled(fixedDelay = 6000) public void sendTopic() { messagingTemplate.convertAndSend(topic, "Hi, topic!"); } }
|
3.5 消费类 MessageConsumer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class MessageConsumer { private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@JmsListener(destination = "queueName", containerFactory = "containerFactoryQueue") public void receiveQueue(String message) { logger.info("receiveQueue: " + message); }
@JmsListener(destination = "topicName", containerFactory = "containerFactoryTopic") public void receiveTopic(String message) { logger.info("receiveTopic: " + message); } }
|
3.6 运行结果
1 2 3 4 5 6
| 14:43:27.981 INFO [MessageConsumer.java:22] - receiveTopic: Hi, topic! 14:43:27.993 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue! 14:43:31.028 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue! 14:43:34.008 INFO [MessageConsumer.java:22] - receiveTopic: Hi, topic! 14:43:34.048 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue! 14:43:37.169 INFO [MessageConsumer.java:17] - receiveQueue: Hi, queue!
|
同时,可在 http://centos:8161/admin 中看到相关信息。