新聞中心
JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件(MOM)的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。Java消息服務(wù)是一個與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。

公司主營業(yè)務(wù):網(wǎng)站設(shè)計制作、網(wǎng)站設(shè)計、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。成都創(chuàng)新互聯(lián)公司是一支青春激揚、勤奮敬業(yè)、活力青春激揚、勤奮敬業(yè)、活力澎湃、和諧高效的團隊。公司秉承以“開放、自由、嚴謹、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團隊有機會用頭腦與智慧不斷的給客戶帶來驚喜。成都創(chuàng)新互聯(lián)公司推出福貢免費做網(wǎng)站回饋大家。
當(dāng)前環(huán)境
-
Mac OS 10.11.x
-
docker 1.12.1
-
JDK 1.8
-
SpringBoot 1.5
前言
基于之前一篇“一個故事告訴你什么是消息隊列”,了解了消息隊列的使用場景以及相關(guān)的特性。本文主要講述消息服務(wù)在 JAVA 中的使用。
市面上的有關(guān)消息隊列的技術(shù)選型非常多,如果我們的代碼框架要支持不同的消息實現(xiàn),在保證框架具有較高擴展性的前提下,我們勢必要進行一定的封裝。
在 JAVA 中,大可不必如此。因為 JAVA 已經(jīng)制定了一套標(biāo)準(zhǔn)的 JMS 規(guī)范。該規(guī)范定義了一套通用的接口和相關(guān)語義,提供了諸如持久、驗證和事務(wù)的消息服務(wù),其最主要的目的是允許Java應(yīng)用程序訪問現(xiàn)有的消息中間件。就和 JDBC 一樣。
基本概念
在介紹具體的使用之前,先簡單介紹一下 JMS 的一些基本知識。這里我打算分為 3 部分來介紹,即 消息隊列(MQ)的連接、消息發(fā)送與消息接收。
這里我們的技術(shù)選型是 SpringBoot、JMS、ActiveMQ
為了更好的理解 JMS,這里沒有使用 SpringBoot 零配置來搭建項目
MQ 的連接
使用 MQ 的第一步一定是先連接 MQ。因為這里使用的是 JMS 規(guī)范,對于任何遵守 JMS 規(guī)范的 MQ 來說,都會實現(xiàn)相應(yīng)的ConnectionFactory接口,因此我們只需要創(chuàng)建一個ConnectionFactory工廠類,由它來實現(xiàn) MQ 的連接,以及封裝一系列特性的 MQ 參數(shù)。
例子:這里我們以 ActiveMQ 為例,
maven 依賴:
org.springframework.boot
spring-boot-starter-parent
1.5.3.RELEASE
org.springframework.boot
spring-boot-starter-activemq
創(chuàng)建 ActiveMQ 連接工廠:
@Bean
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(ActiveMQ_URL);
connectionFactory.setUserName(ActiveMQ_USER);
connectionFactory.setPassword(ActiveMQ_PASSWORD);
return connectionFactory;
}
消息發(fā)送
關(guān)于消息的發(fā)送,是通過 JMS 核心包中的JmsTemplate類來實現(xiàn)的,它簡化了 JMS 的使用,因為在發(fā)送或同步接收消息時它幫我們處理了資源的創(chuàng)建和釋放。從它的作用也不難推測出,它需要引用我們上面創(chuàng)建的連接工廠,具體代碼如下:
@Bean
public JmsTemplate jmsQueueTemplate(){
return new JmsTemplate(connectionFactory());
}
JmsTemplate創(chuàng)建完成后,我們就可以調(diào)用它的方法來發(fā)送消息了。這里有兩個概念需要注意:
-
消息會發(fā)送到哪里?-> 即需要指定發(fā)送隊列的目的地(Destination),是可以在 JNDI 中進行存儲和提取的 JMS 管理對象。
-
發(fā)送的消息體具體是什么?-> 實現(xiàn)了javax.jms.Message的對象,類似于 JAVA RMI 的 Remote 對象。
代碼示例:
@Autowired
private JmsTemplate jmsQueueTemplate;
/**
* 發(fā)送原始消息 Message
*/
public void send(){
jmsQueueTemplate.send("queue1", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("我是原始消息");
}
});
}
優(yōu)化:當(dāng)然,我們不用每次都通過MessageCreator匿名類的方式來創(chuàng)建Message對象,JmsTemplate類中提供了對象實體自動轉(zhuǎn)換為Message對象的方法,convertAndSend(String destinationName, final Object message)。
優(yōu)化代碼示例:
/**
* 發(fā)送消息自動轉(zhuǎn)換成原始消息
*/
public void convertAndSend(){
jmsQueueTemplate.convertAndSend("queue1", "我是自動轉(zhuǎn)換的消息");
}
注:關(guān)于消息轉(zhuǎn)換,還可以通過實現(xiàn)MessageConverter接口來自定義轉(zhuǎn)換內(nèi)容
消息接收
講完了消息發(fā)送,我們最后來說說消息是如何接收的。消息既然是以Message對象的形式發(fā)送到指定的目的地,那么消息的接收勢必會去指定的目的地上去接收消息。這里采用的是監(jiān)聽者的方式來監(jiān)聽指定地點的消息,采用注解@JmsListener來設(shè)置監(jiān)聽方法。
代碼示例:
@Component
public class Listener1 {
@JmsListener(destination = "queue1")
public void receive(String msg){
System.out.println("監(jiān)聽到的消息內(nèi)容為: " + msg);
}
}
有了監(jiān)聽的目標(biāo)和方法后,監(jiān)聽器還得和 MQ 關(guān)聯(lián)起來,這樣才能運作起來。這里的監(jiān)聽器可能不止一個,如果每個都要和 MQ 建立連接,肯定不太合適。所以需要一個監(jiān)聽容器工廠的概念,即接口JmsListenerContainerFactory,它會引用上面創(chuàng)建好的與 MQ 的連接工廠,由它來負責(zé)接收消息以及將消息分發(fā)給指定的監(jiān)聽器。當(dāng)然也包括事務(wù)管理、資源獲取與釋放和異常轉(zhuǎn)換等。
代碼示例:
@Bean
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
//設(shè)置連接數(shù)
factory.setConcurrency("3-10");
//重連間隔時間
factory.setRecoveryInterval(1000L);
return factory;
}
場景
代碼地址:https://github.com/jasonGeng88/springboot-jms
對 JMS 有了基本的理解后,我們就來在具體的場景中使用一下。
首先,我們需要先啟動 ActiveMQ,這里我們以 Docker 容器化的方式進行啟動。
啟動命令:
docker run -d -p 8161:8161 -p 61616:61616 --name activemq webcenter/activemq
啟動成功后,在 ActiveMQ 可視化界面查看效果(http://localhost:8161):
點對點模式(單消費者)
下面介紹消息隊列中最常用的一種場景,即點對點模式?;靖拍钊缦拢?/p>
-
每個消息只能被一個消費者(Consumer)進行消費。一旦消息被消費后,就不再在消息隊列中存在。
-
發(fā)送者和接收者之間在時間上沒有依賴性,也就是說當(dāng)發(fā)送者發(fā)送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發(fā)送到隊列。
-
接收者在成功接收消息之后需向隊列應(yīng)答成功。
代碼實現(xiàn)(為簡化代碼,部分代碼沿用上面所述): 啟動文件(Application.java)
@SpringBootApplication
@EnableJms
public class Application {
...
/**
* JMS 隊列的模板類
* connectionFactory() 為 ActiveMQ 連接工廠
*/
@Bean
public JmsTemplate jmsQueueTemplate(){
return new JmsTemplate(connectionFactory());
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
注解@EnableJms設(shè)置在@Configuration類上,用來聲明對 JMS 注解的支持。
消息生產(chǎn)者(PtpProducer.java)
@Component
public class PtpProducer {
@Autowired
private JmsTemplate jmsQueueTemplate;
/**
* 發(fā)送消息自動轉(zhuǎn)換成原始消息
*/
public void convertAndSend(){
jmsQueueTemplate.convertAndSend("ptp", "我是自動轉(zhuǎn)換的消息");
}
}
生產(chǎn)者調(diào)用類(PtpController.java)
@RestController
@RequestMapping(value = "/ptp")
public class PtpController {
@Autowired
private PtpProducer ptpProducer;
@RequestMapping(value = "/convertAndSend")
public Object convertAndSend(){
ptpProducer.convertAndSend();
return "success";
}
}
消息監(jiān)聽容器工廠
@SpringBootApplication
@EnableJms
public class Application {
...
/**
* JMS 隊列的監(jiān)聽容器工廠
*/
@Bean(name = "jmsQueueListenerCF")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
//設(shè)置連接數(shù)
factory.setConcurrency("3-10");
//重連間隔時間
factory.setRecoveryInterval(1000L);
return factory;
}
...
}
消息監(jiān)聽器
@Component
public class PtpListener1 {
/**
* 消息隊列監(jiān)聽器
* destination 隊列地址
* containerFactory 監(jiān)聽器容器工廠, 若存在2個以上的監(jiān)聽容器工廠,需進行指定
*/
@JmsListener(destination = "ptp", containerFactory = "jmsQueueListenerCF")
public void receive(String msg){
System.out.println("點對點模式1: " + msg);
}
}
演示
啟動項目啟動后,通過 REST 接口的方式來調(diào)用消息生產(chǎn)者發(fā)送消息,請求如下:
curl -XGET 127.0.0.1:8080/ptp/convertAndSend
消費者控制臺信息:
ActiveMQ 控制臺信息:
列表說明:
-
Name:隊列名稱。
-
Number Of Pending Messages:等待消費的消息個數(shù)。
-
Number Of Consumers:當(dāng)前連接的消費者數(shù)目,因為我們采用的是連接池的方式連接,初始連接數(shù)為 3,所以顯示數(shù)字為 3。
-
Messages Enqueued:進入隊列的消息總個數(shù),包括出隊列的和待消費的,這個數(shù)量只增不減。
-
Messages Dequeued:出了隊列的消息,可以理解為是已經(jīng)消費的消息數(shù)量。
點對點模式(多消費者)
基于上面一個消費者消費的模式,因為生產(chǎn)者可能會有很多,同時像某個隊列發(fā)送消息,這時一個消費者可能會成為瓶頸。所以需要多個消費者來分攤消費壓力(消費線程池能解決一定壓力,但畢竟在單機上,做不到分布式分布,所以多消費者是有必要的),也就產(chǎn)生了下面的場景。
代碼實現(xiàn)
添加新的監(jiān)聽器
@Component
public class PtpListener2 {
@JmsListener(destination = Constant.QUEUE_NAME, containerFactory = "jmsQueueListenerCF")
public void receive(String msg){
System.out.println("點對點模式2: " + msg);
}
}
演示 這里我們發(fā)起 10 次請求,來觀察消費者的消費情況:
這里因為監(jiān)聽容器設(shè)置了線程池的緣故,在實際消費過程中,監(jiān)聽器消費的順序會有所差異。
發(fā)布訂閱模式
除了點對點模式,發(fā)布訂閱模式也是消息隊列中常見的一種使用。試想一下,有一個即時聊天群,你在群里發(fā)送一條消息。所有在這個群里的人(即訂閱了該群的人),都會收到你發(fā)送的信息。
基本概念:
-
每個消息可以有多個消費者。
-
發(fā)布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱者之后,才能消費發(fā)布者的消息。
-
為了消費消息,訂閱者必須保持運行的狀態(tài)。
代碼實現(xiàn) 修改 JmsTemplate 模板類,使其支持發(fā)布訂閱功能
@SpringBootApplication
@EnableJms
public class Application {
...
@Bean
public JmsTemplate jmsTopicTemplate(){
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setPubSubDomain(true);
return jmsTemplate;
}
...
}
消息生產(chǎn)者(PubSubProducer.java)
@Component
public class PtpProducer {
@Autowired
private JmsTemplate jmsTopicTemplate;
public void convertAndSend(){
jmsTopicTemplate.convertAndSend("topic", "我是自動轉(zhuǎn)換的消息");
}
}
生產(chǎn)者調(diào)用類(PubSubController.java)
@RestController
@RequestMapping(value = "/pubsub")
public class PtpController {
@Autowired
private PubSubProducer pubSubProducer;
@RequestMapping(value = "/convertAndSend")
public String convertAndSend(){
pubSubProducer.convertAndSend();
return "success";
}
}
修改 DefaultJmsListenerContainerFactory 類,使其支持發(fā)布訂閱功能
@SpringBootApplication
@EnableJms
public class Application {
...
/**
* JMS 隊列的監(jiān)聽容器工廠
*/
@Bean(name = "jmsTopicListenerCF")
public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1");
factory.setPubSubDomain(true);
return factory;
}
...
}
消息監(jiān)聽器(這里設(shè)置2個訂閱者)
@Component
public class PubSubListener1 {
@JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF")
public void receive(String msg){
System.out.println("訂閱者1 - " + msg);
}
}
@Component
public class PubSubListener2 {
@JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF")
public void receive(String msg){
System.out.println("訂閱者2 - " + msg);
}
}
演示
curl -XGET 127.0.0.1:8080/pubSub/convertAndSend
消費者控制臺信息:
ActiveMQ 控制臺信息:
總結(jié)
這里只是對 SpringBoot 與 JMS 集成的簡單說明與使用,詳細的介紹可以查看 Spring 的官方文檔,我這里也有幸參與 并發(fā)編程網(wǎng) 發(fā)起的 Spring 5 的翻譯工作,我主要翻譯了 Spring 5 的 JMS 章節(jié),其內(nèi)容對于上述 JMS 的基本概念,都有詳細的展開說明,有興趣的可以看一下,當(dāng)然翻譯水平有限,英文好的建議看原文。
文章名稱:SpringBoot中的使用SpringBoot中的使用
當(dāng)前URL:http://m.fisionsoft.com.cn/article/djgosic.html


咨詢
建站咨詢
