如何采用MQTT协议实现android消息推送
发布时间:2025-05-17 20:13:30 发布人:远客网络
一、如何采用MQTT协议实现android消息推送
MQTT协议实现android消息推送,我想每个Android开发人员对它应该都是比较熟悉的。 MQ遥测传输(MQTT)是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:网络代价昂贵,带宽低、不可靠。在嵌入设备中运行,处理器和内存资源有限。该协议的特点有:使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。对负载内容屏蔽的消息传输。使用 TCP/IP提供网络连接。有三种消息发布服务质量:“至多一次”,消息发布完全依赖底层 TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。“至少一次”,确保消息到达,但消息重复可能会发生。“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。小型传输,开销很小(固定长度的头部是 2字节),协议交换最小化,以降低网络流量。使用 Last Will和 Testament特性通知有关各方客户端异常中断的机制MQTT最简单的使用包括两种,一种是发消息,一种是订阅消息。发消息就是向一个固定IP地址的某个主题发送消息(publish)订阅消息是向服务器端订阅某些主题,当其他客户端向服务器的这个主题广播消息时,那么所有订阅这个主题的客户端就都能收到了MQTT是一项消息传递技术,由IBM再2001年发布。总结一下,机制就是使用一个代理服务器message broker,客户端client连接上这个服务器,然后告诉服务器说,我可以接收哪些类型的消息,同时,client也可以发布自己的消息,这些消息根据协议的内容,可以被其他client获取。只要手机客户端,连上服务器,然后就可以接收和发布消息了,不用自己写socket什么了,低带宽,低耗电量,代码量也少,很简单吧。 package com.pig.test.mqtt;002003 import com.ibm.mqtt.MqttClient;004 import com.ibm.mqtt.MqttException;005 import com.ibm.mqtt.MqttSimpleCallback;006007 public class SubscribeClient{008 private final static String CONNECTION_STRING="tcp://192.168.1.60:1883";009 private final static boolean CLEAN_START= true;010 private final static short KEEP_ALIVE= 30;//低耗网络,但是又需要及时获取数据,心跳30s011 private final static String CLIENT_ID="client1";012 private final static String[] TOPICS={013"Test/TestTopics/Topic1",014"Test/TestTopics/Topic2",015"Test/TestTopics/Topic3",016"tokudu/client1"017};018 private final static int[] QOS_VALUES={0, 0, 2, 0};019020//////////////////021 private MqttClient mqttClient= null;022023 public SubscribeClient(String i){024 try{025 mqttClient= new MqttClient(CONNECTION_STRING);026 SimpleCallbackHandler simpleCallbackHandler= new SimpleCallbackHandler();027 mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法028 mqttClient.connect(CLIENT_ID+i, CLEAN_START, KEEP_ALIVE);029 mqttClient.subscribe(TOPICS, QOS_VALUES);//订阅接主题030031/**032*完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息033*/034035 mqttClient.publish(PUBLISH_TOPICS,"keepalive".getBytes(), QOS_VALUES[0], true);036037} catch(MqttException e){038// TODO Auto-generated catch block039 e.printStackTrace();040}041}042043/**044*简单回调函数,处理client接收到的主题消息045*@author pig046*047*/048 class SimpleCallbackHandler implements MqttSimpleCallback{ 049050/**051*当客户机和broker意外断开时触发052*可以再此处理重新订阅053*/054@Override055 public void connectionLost() throws Exception{056// TODO Auto-generated method stub057 System.out.println("客户机和broker已经断开");058}059060/**061*客户端订阅消息后,该方法负责回调接收处理消息062*/063@Override064 public void publishArrived(String topicName, byte[] payload, int Qos, booleanretained) throws Exception{065// TODO Auto-generated method stub066 System.out.println("订阅主题:"+ topicName);067 System.out.println("消息数据:"+ new String(payload));068 System.out.println("消息级别(0,1,2):"+ Qos);069 System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息):"+ retained);070}071072}073074/**075*高级回调076*@author pig077*078*/079 class AdvancedCallbackHandler implements MqttSimpleCallback{080081@Override082 public void connectionLost() throws Exception{083// TODO Auto-generated method stub084085}086087@Override088 public void publishArrived(String arg0, byte[] arg1, int arg2,089 boolean arg3) throws Exception{090// TODO Auto-generated method stub091092}093094}095096/**097*@param args098*/099 public static void main(String[] args){100// TODO Auto-generated method stub101 new SubscribeClient(""+ i);102103}104105}
二、为什么每份 Android 简历都说 “熟悉 MQTT 协议”
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种基于 TCP/IP协议族的应用层协议。MQTT协议是专门针对硬件性能低下&网络状况不稳定的场景设计的,这使得 MQTT在物联网和移动应用等受限场景得到广泛应用。
目前,MQTT主要分为两个大版本:
物联网和移动应用场景的特点是硬件性能低下和网络状况不稳定,而 MQTT协议就是专门针对这种环境设计的,主要在四个方面有优势:
结论:这三种协议并没有绝对的优胜者,最好的协议取决于具体的需求和限制条件。但如果只从带宽、电池、功能多样性这些基本条件看,MQTT在其中是更占优的选择。
MQTT协议的设计特性中包含了一项“高可靠性交付”,它需要一个保证可靠的底层传输层协议,因此 TCP协议、TLS协议、WebSocket协议都可以作为 MQTT的底层协议。而无连接的 UDP协议会丢失或重排数据,不能满足 MQTT协议的传输需要。
MQTT是基于发布-订阅模型(pub/sub)的消息传递协议,与请求-响应模型不同,发布-订阅模型主要有三种角色: publisher& subscriber& subscriber:
当 client发布某个主题的消息时,broker会将该消息分发给任何已订阅该主题的 client。通常来说,client不会存储消息,一旦消息被发送到这些 client,消息就会从 broker上删除。另外,保留消息、持久连接和服务质量 QoS可能会导致消息临时存储在 broker上。
发布-订阅模式使得消息的发布者和订阅者解耦,主要体现为空间解耦和时间解耦:
图片引用自 —— cxuan著
1、固定报头:每一个 MQTT消息都包含一个固定报头,包含消息类型、标志位和剩余长度三个部分。固定报头长度为 2~ 5字节,具体取决于“剩余长度”的大小,格式如下:
2、可变报头:不同消息的可变报头内容不一样,不过其中有一个比较通用的字段:
3、载荷:某些 MQTT消息会包含一个有效载荷,对于 PUBLISH消息来说,有效载荷就是应用消息。
MQTT的连接总是发生在 client和 broker之间,两个 client之间不会互相感知。请求连接时,client会向 broker发送 CONNECT连接消息,broker接受连接后会响应 CONNACK连接确认消息。一旦连接建立,连接会一直保持打开状态,直到 client发送 DISCONNECT断开连接消息或连接异常中断。
CONNECT是 client发送给 broker的首个消息,并且在一次连接中,client只能发送一次 CONNECT消息,发送的第二个 CONNECT消息会被 broker当作违反协议处理,并断开连接。在 CONNECT消息中,主要包含以下内容:
CONNACK消息用于确认 CONNECT消息。CONNECT是 client发送给 broker的首个消息,相应地,broker发送给 client的首个消息一定是 CONNACK消息。在 CONNACK消息中,主要包含以下内容:
DISCONNECT消息由 client发送给 broker,用于断开连接。 DISCONNECT消息没有可变报头和有效载荷,也没有对应的确认应答消息,表示一个干净利索地断开连接操作。断开连接后,client不能再发送除 CONNECT消息之外的消息,broker也需要丢弃和当前会话有环的遗嘱消息。
MQTT是基于发布订阅模型的协议,在建立连接后,client可以向 broker订阅感兴趣的一个或多个话题。
SUBSCRIBE消息由 client发送给 broker,用于订阅感兴趣的话题,SUBSCRIBE消息主要包含以下内容:
SUBACK消息用于确认 SUBSCRIBE消息。SUBACK消息主要包含以下内容:
UNSUBSCRIBE消息由 client发送给 broker,用于退订不感兴趣的话题,UNSUBSCRIBE消息主要包含以下内容:
UNSUBACK消息用于确认 UNSUBSCRIBE消息。UNSUBACK消息非常简单,只有一个包唯一标识(位于可变报头)。
当 MQTT client在连接到 broker之后就可以发送消息了,每条 PUBLISH消息都包含一个 topic,broker会根据 topic将消息发送给感兴趣的 client。除此之外,每条消息还会包含一个 Payload,Payload是真正发布的应用消息,载荷的内容和格式由应用层决定,MQTT协议层不关心。
PUBLISH消息可以由 client发送给 broker,也可以由 broker发送给 client,用来运送应用层消息。PUBLISH消息主要包含以下内容:
PUBLISH消息的接收方需要发送确认应答,不同 QoS等级的 PUBLISH消息响应的消息不同:
当 client和 broker在一段时间内没有数据交互时,client会发送 PINGREQ探测消息,用于判断连接是否正常,来决定是否要关闭该连接,这就是 MQTT协议的保活机制。
PINGREQ消息由 client发送给 broker。
PINGRESP消息由 broker发送给 client,代表 client是存活的。
MQTT主题本质上是一种“寻址形式”,用于将应用层消息分发到期望的客户端。MQTT主题是一种类似于文件系统的分层结构,使用“/”正斜杠作为分隔符。
客户端订阅主题时,可以订阅确定的主题(例如“group/group123”),也可以使用“通配符”来同时订阅多个主题。需要注意的是:在发布消息是不允许使用主题通配符,client每次发布消息只能发布到单个主题。
$SYS主题是 broker上默认创建的只读主题,除此之外,broker不会默认创建任何主题,所有主题都是由客户端订阅或发布才创建的,都不是永久性的。关于$SYS主题的更多介绍在这里
当 client连接到 broker时,可以使用持久连接或非持久连接,这是通过 CONNECT消息中的 CleanSession标志来决定的(当 CleanSession= 0时表示持久连接)。对于持久会话,broker会存储会话状态;而对于非持久会话,broker不会存储 client的任何内容。会话状态主要包含以下内容:
QoS 0等级的 PUBLISH消息的交付能力完全依赖于底层传输层,QoS 1和 QoS 2等级开始在应用层提高 PUBLISH消息的交付能力。当消息丢失时,发送端会重新发送早前尝试发送过的 PUBLISH消息(DUP= 1),接收者收到消息也会发送确认响应消息。
在 QoS 0的等级的 PUBLISH消息中不包含包唯一标识。发送者不考虑消息交付结果,接收者也不发送响应。接收者最多只能收到一次消息,也有可能一次也收不到。
在 QoS 1等级的 PUBLISH消息中包含包唯一标识,发送方会一直将该消息当作“未确认”的消息,直到收到对应的 PUBACK确认消息。具体消息流如下:
QoS 2是最高的服务质量,保证消息不会丢失也不会重复,缺点是会增加开销。在 QoS 2等级的 PUBLISH消息中包含包唯一标识,发送者会一直将该消息当作“未确认”的消息,知道收到对应的 PUBCOMP确认消息。
当 client发布某个主题的消息时,broker会将该消息分发给任何已订阅该主题的 client,随后这条消息会从 broker上删除。可以设置 RETAIN保留标志设置该 PUBLISH消息为保留消息,broker会存储该主题的最后一条保留消息,当新的 client注册订阅时,并且匹配该消息主题时,该保留消息会发送给订阅者。需要注意:broker只会为每个主题保存最近一条保留消息,新收到的 RETAIN= 1的消息会覆盖原本那条保留消息;
持久会话&服务质量等级&保留消息都会影响新订阅者是否接受消息,总结如下表:
标记 DUP= 1的消息是重复发送的消息,MQTT消息重传有两种场景:
需要注意:DUP标志只对 OoS> 0的消息有效,所有 QoS= 0的消息 DUP标志必须设置为 0;
TCP协议的报文重传机制是对所有 TCP报文有效的重传机制,而 MQTT协议的消息重传机制只对一小部分消息有效,用于实现更可靠的消息交付保证。虽然 TCP协议在一般情况下可以保证不丢包,但是这并不是绝对的,依然存在请求超时或者连接中断等情况。而 MQTT协议的 QoS 1和 QoS 2要求更可靠的交付能力,并且需要在客户端重连后也能保证交付。因此,MQTT协议也定义了一个消息重传机制。
到这里,关于 MQTT协议的工作原理&协议消息格式&核心特性等内容就介绍完了。我知道你应该会对 MQTT协议的实战应用更加感兴趣,下一篇文章里,我将带你实现基于 MQTT协议的 IM服务,请关注。
三、Android MQTT 通信
MQTT协议是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。
常用于 IOT物联网和一些需要服务端主动通知客户端的场景。
2.创建 MqttHelper辅助类,设置回调监听
连接成功或失败,以及中途的连接掉线,会触发 OnMqttStatusChangeListener回调
onSubMessage订阅的消息回调,因为存在订阅多个 topic的情况,所以回调能知道是来自哪个 Topic的消息;
onPubMessage发布的消息回调,用于确认发布的消息是否发送成功。
需要在 MQTT连接成功后才能订阅 topic,否则订阅 Topic不成功,收不到对应消息
由于 MQTT启动了一个 Service,而 Android 8.0以上对于后台 Service限制时长 5秒;所以将 MqttService绑定到 Notification上成为了一个前台通知;通知的标题和内容显示可以在 strings.xml中设置,对应属性如下:
Android 8.0及以上开启前台服务绑定到通知,8.0以下默认不启用,可将 mqtt_foreground_notification_low_26设为 true,将 8.0以下设备也开启前台通知服务
创建 MQTT实例时需要传送参数 MqttOptions,下面将介绍下部分参数;
MQTT是一种发布/订阅的消息协议,通过设定的主题 Topic,
发布者向 Topic发送的 payload负载消息会经过服务器,转发到所有订阅
通配符:假想移动端消息推送场景,有的系统消息是全体用户接收,有的消息是 Android或 iOS设备接收,又或者是某些消息具体推送到用户,当然,对应的多种类型消息可以通过多订阅几个对应的 Topic解决,也可以使用通配符;
通配符有两个,"+"和"#",与正斜杠"/"组合使用;加号只能表示一级Topic,井号可以表示任意层级 Topic;例如:订阅 Topic为" System/+",发布者发布的 Topic可以是 System、System/Android、System/iOS;但是不能是 System/iOS/123,而订阅的 Topic如果是" System/#"则可以收到;
注意,只有订阅的 Topic才可以使用通配符,发布和遗嘱的 Topic不能包含通配符.
发布者和订阅者都是属于客户端,客户端与服务端建立连接之后,发送的第一个报文消息必须是 Connect消息,而 Connect的消息载荷中必须包含 clientID客户端唯一标识;
如果两个客户端的 clientID一样,则服务端记录第一个客户端连接之后再收到第二个客户端连接请求,则会向一个客户端发送 Disconnect报文断开连接,并连接第二个客户端,而如果此时设置了自动重连,第一个客户端再次连接,服务端又断开与第二个的连接,连上第一个客户端,如此将导致两个客户端不断的被挤掉重连.
注意: clientID使用的字符最好是大小写字母和数字,长度最好限制在[1, 23]之间;
可选参数,客户端没有主动向服务端发起 disconnect断开连接消息,然而服务端检测到和客户端之间的连接已断开,此时服务端将该客户端设置的遗嘱消息发送出去
应用场景:客户端因网络等情况掉线之后,可以及时通知到所有订阅该遗嘱 Topic的客户端;
遗嘱 Topic中不能存在通配符.
客户端和服务端之间建立的会话状态,一般用于消息保存,如果设置清除 Session,则每次客户端和服务端建立连接会创建一个新的会话,之前连接中的消息不能恢复,
而设置不清除会话,对应发布者发送的 qos为 1和2的消息,还未被订阅者接收确认,则需要保存在会话中,以便订阅者下次连接可以恢复这些消息;
注意: Session存储的消息是保存在内容中的,所以如果不是重要的消息,最好是设置清除 Session,或者设置 qos= 0;
标识客户端传输一次控制报文到下一次传输之间允许的空闲时间;在这段时间内,如果客户端没有其他任何报文发送,必须发送一个 PINGREQ报文到服务器,而如果服务端在 1.5倍心跳时间内没有收到客户端消息,则会主动断开客户端的连接,发送其遗嘱消息给所有订阅者。而服务端收到 PINGREQ报文之后,立即返回 PINGRESP报文给客户端
心跳时间单位为秒,占用2个字节,最大 2^16- 1= 65535秒(18小时12分钟15秒),设置为 0表示不使用心跳机制;心跳时间一般设置为几分钟或几十秒即可,时间短点可以更快的发出遗嘱消息通知掉线,但是时间短会增加消息频率,影响服务端并发;微信长连接为 300秒,而三大运营商貌似也有个连接时间最小的为 5分钟。
服务质量等级 qos对应两部分,一是客户端到服务端发送的消息,一是服务端到客户端订阅的消息;从发布者到订阅者实际 qos为两段路中 qos最小的。
qos可选值 0(最多交付一次)、1(最少交付一次)、2(正好交付一次);
qos= 0:接收方不发送响应,发送方不进行重试;发送方只管发一次,不管是否发成功,也不管接收方是否成功接收,适用于不重要的数据传输;
qos= 1:确保消息至少有一次到达接收方,发送方向接收方发送消息,需要等待接收方返回应答消息,如果发送方在一定时间之内没有收到应答,发送方继续下一次消息发送,直到收到应答消息,删除本地消息缓存,不再发送;所以接收方可能收到1-n次消息;适用于需要收到所有消息,客户端可以处理重复消息。
qos= 2:确保消息只一次到达接收方,发送方和接收方之间消息处理流程最复杂;
Mqtt Qos深度解读和 MQTT协议QoS2准确一次送达的实现
字节流类型,是 MQTT通信传输的真实数据
发布消息时设置,对应参数 retain,服务端将保留对应 Topic最新的一条消息记录;保留消息的作用是每次客户端连接上线都会收到其 Topic的最后一条保留消息,所以可能存在网络不稳定,频繁掉线重连,每次重连重复收到保留消息;
可以向对应的 Topic发送一条空消息,用于清除保留消息。
MQTT服务搭建 Apache Apollo服务器搭建 MQTT服务