0%

JMS

JMS简介

JMS是什么

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

为什么需要JMS

在JAVA中,如果两个应用程序之间对各自都不了解,甚至这两个程序可能部署在不同的大洲上,那么它们之间如何发送消息呢?举个例子,一个应用程序A部署在印度,另一个应用程序部署在美国,然后每当A触发某件事后,B想从A获取一些更新信息。
当然,也有可能不止一个B对A的更新信息感兴趣,可能会有N个类似B的应用程序想从A中获取更新的信息。

在这种情况下,JAVA提供了最佳的解决方案-JMS,完美解决了上面讨论的问题。

JMS同样适用于基于事件的应用程序,如聊天服务,它需要一种发布事件机制向所有与服务器连接的客户端发送消息。JMS与RMI不同,发送消息的时候,接收者不需要在线。服务器发送了消息,然后就不管了;
等到客户端上线的时候,能保证接收到服务器发送的消息。这是一个很强大的解决方案,能处理当今世界很多普遍问题。

JMS的优势

1)异步:JMS天生就是异步的,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。
2)可靠:JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题,只是避免而不是杜绝,所以在一些糟糕的环境下还是有可能会出现重复。

JMS体系架构

JMS由以下元素组成。

1)JMS提供者
连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。

2)JMS客户
生产或消费消息的基于Java的应用程序或对象。

3)JMS生产者
创建并发送消息的JMS客户。

4)JMS消费者
接收消息的JMS客户。

5)JMS消息
包括可以在JMS客户之间传递的数据的对象

6)JMS队列
一个容纳那些被发送的等待阅读的消息的区域。队列暗示,这些消息将按照顺序发送。一旦一个消息被阅读,该消息将被从队列中移走。

7)JMS主题
一种支持发送消息给多个订阅者的机制。

JMS消息模型

JMS消息模型

1)Point-to-Point(P2P)
2)Publish/Subscribe(Pub/Sub)

在JMS API出现之前,大部分产品使用“点对点”和“发布/订阅”中的任一方式来进行消息通讯。JMS定义了这两种消息发送模型的规范,它们相互独立。
任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。

P2P(点对点)

P2P模式

涉及的概念

1)消息队列(Queue)
2)提供者(Sender)
3)消费者(Receiver)
4)每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特点

1)每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
2)提供者和消费者之间在时间上没有依赖性,也就是说当提供者发送了消息之后,不管消费者有没有正在运行,它不会影响到消息被发送到队列
3)每条消息仅会传送给一个消费者。可能会有多个消费者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个消费者所消费。
4)消息存在先后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者。当已被消费时,就会从队列头部将它们删除(除非使用了消息优先级)。
5)消费者在成功接收消息之后需向队列应答成功

注意:如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式。

Pub/Sub(发布/订阅模式)

Pub/Sub模式

涉及的概念

1)主题(Topic)
2)发布者(Publisher)
3)订阅者(Subscriber)
4)客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点

1)每个消息可以有多个消费者。
2)发布者和订阅者之间有时间上的依赖性。针对某个主题的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
3)为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
4)每条消息都会传送给称为订阅者的多个消息消费者。订阅者有许多类型,包括持久型、非持久型和动态型。
5)发布者通常不会知道、也意识不到哪一个订阅者正在接收主题消息。
6)消息被推送给消费者,这意味着消息会传送给消费者,而无须请求。

如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型

JMS消息

JMS消息结构

JMS客户端使用JMS消息与系统通讯,JMS消息虽然格式简单但是非常灵活。
JMS消息由三部分组成:消息头,消息属性,消息体;

消息头

设置消息的通用属性,类似http消息头,可以设置消息的过期时间,是否持久化,投递的目的地(队列名/topic名),最主要的设置消息的唯一标识messageId来标识消息的唯一性(JMS会模认生成,你也可以使用自己的id格式来生成唯一标识)。

JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

  • JMSDestination:消息发送的目的地,主要是指Queue和Topic,由session创建而由生产者的send方法设置。
  • JMSDeliveryMode:传送模式:有两种即久模式和非持久模式。一条持久性的消息应该被传输”一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传递一次,这意味着服务器出现故障,该消息将永远丢失。由session穿件由消息生产者的send方法设置。
  • JMSMessageID:唯一识别每个消息的标识,由JMS消息生产者产生。由send方法设置。
  • JMSTimestamp:一个JMS Provider在调用send()方法时自动设置,它是消息被发送和消费者实际接收的时间差。由客户端设置。
  • JMSCorrelationID:用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID可以是任何值,不仅仅是JMSMessageID。由客户端设置
  • JMSReplyTo:提供本消息回复消息的目的地址,由客户端设置
  • JMSRedelivered:如果一个客户端收到一个设置了JMSRedelivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。如果该消息被重新传送,JMSRedelivered=true 否则 JMSRedelivered=flase 。由JMS Provider设置
  • JMSType:消息类型的标识符,由客户端设置
  • JMSExpiration:消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT的时间值。如果timeToLive值等于零,则JMSExpiration被设置为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。由send方法设置
  • JMSPriority:消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。JMS不要求JMS Provider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达,默认是4级。由send方法设置
    一个消息的消息头有这些属性,我们可以按照需要对这个消息的消息进行设计,在将这个消息使用消息生产者的send()方法发送到消息服务上。

消息体

在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。

不同的消息类型如下:

  • Text message : javax.jms.TextMessage,表示一个文本对象。
  • Object message : javax.jms.ObjectMessage,表示一个JAVA对象。
  • Bytes message : javax.jms.BytesMessage,表示字节数据。
  • Stream message :javax.jms.StreamMessage,表示java原始值数据流。
  • Map message : javax.jms.MapMessage,表示键值对。

消息属性

我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。
对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。

消息消费

在JMS中,消息的产生和消息是异步的。
对于消费来说,JMS的消息者可以通过两种方式来消费消息:

  • 同步:订阅者或消费者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
  • 异步:订阅者或消费者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

消息持久化(可靠性)

场景问题:服务器断电重启,未被消费的消息是否会在重启之后被继续消费?

JMS提供两种选择:(非持久性模式/持久性模式)

  • 非持久性模式: 服务器断电(关闭)之后,使用非持久性模式时,没有被消费的消息不会继续消费全部丢失;
    程序会报一个连接关闭异常停止运行,继续启动服务器运行程序,不会接收任何消息。
  • 持久性模式: 服务器断电(关闭)后,使用持久性模式时,没有被消费的消息会继续消费;
    程序也会报连接关闭异常,但再次启动服务器和程序后,接收方还能继续原来的消息再次接收。

设置方式:

1
2
PERSISTENT: 指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失
NON_PERSISTENT: 不要求JMS provider持久保存消息

注意:topic持久化时,消费者需要在MQ注册一个自己的身份id标识,在消费者宕机时,生产者会为对应的id保留数据。

消息确认机制(可靠性)

JMS消息只有在被确认之后,才认为已经被成功的消费了,消息的成功消费通常包含三个阶段:客户接收消息,客户处理消息和消息被确认.

在事务性会话中,当一个事务被提交的时候,确认自动发生。
在非事务性会话中,消息何时被确认取决于创建会话时的应答模式。改参数有三个可选值:

  • Session.AUTO_ACKNOWLEDGE:自动签收
    当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
  • Session.CLIENT_ACKNOWLEDGE:手动签收
    客户通过调用消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息,将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
  • Session.DUPS_ACKNOWLEDGE:
    该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true

消息过期

可以设置消息在一定时间后过期,默认是永不过期。

消息优先级

可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。

消息的临时目的地

可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保存的时间。只有创建该临时的地方的连接上的消息消费者才能够从临时目的地中提取消息。

JMS应用程序接口

  1. ConnectionFactory 接口(连接工厂)
    • 创建Connection对象的工厂,根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂
    • 分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
  2. Destination 接口(目标)
    • Destination是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。
    • 是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);
    • 对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
    • 所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。
  3. Connection 接口(连接)
    • Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。
    • Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
  4. Session 接口(会话)
    • Session是我们操作消息的接口。表示一个单线程的上下文,用于发送和接收消息。
    • 由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。
    • 可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。
    • 同样,也分QueueSession和TopicSession。
  5. MessageProducer 接口(消息的生产者)
    • 消息生产者由Session创建,并用于将消息发送到Destination。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
    • 同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
  6. MessageConsumer 接口(消息消费者)
    • 消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。
    • 可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。
    • 当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
  7. Message 接口(消息)
    • 是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。
    • 一个消息有三个主要部分:
      • 1、消息头(必须):包含用于识别和为消息寻找路由的操作设置。
      • 2、一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
      • 3、一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口非常灵活,并提供了许多方式来定制消息的内容。
    • 消息接口非常灵活,并提供了许多方式来定制消息的内容。
  8. MessageListener
    • 消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
    • EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

JMS 应用程序接口

JMS提供者实现

要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。现在既有开源的提供者也有专有的提供者。
开源的提供者包括:Apache ActiveMQ、Kafka、WebMethods、阿里的RocketMQ等。

参考资料

https://www.cnblogs.com/molao-doing/articles/6557305.html
https://blog.csdn.net/pengchenghui/article/details/80934723
https://mp.weixin.qq.com/s/WCeYaDjK9dWQ95m3Wv3vPw