IT技术之家

首页 > 数据库

数据库

Android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)_雪の星空朝酱_手机mqtt

发布时间:2022-10-24 16:48:00 数据库 0次 标签:android MQTT kotlin 即时通讯 消息协议
??可能有很多小伙伴和我一样是初次知道mqtt,然后它是啥,用来干什么那就更不清楚了,前段时间公司要求调研这方面,所以今天这篇文章就来介绍mqtt是啥,以及Android可以用它来干啥。??实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(payloa...

Android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)

关于MQTT介绍MQTT协议实现方式MQTT服务器MQTT协议中的订阅、主题、会话MQTT协议中的方法MQTT服务质量 (QoS)MQTT服务端(Broker)MQTT客户端 mqtt服务选取Android连接mqtt配置修改mainActivity文件订阅 topic取消订阅 topic发布消息publish断开 MQTT 连接

关于

??可能有很多小伙伴和我一样是初次知道mqtt,然后它是啥,用来干什么那就更不清楚了,前段时间公司要求调研这方面,所以今天这篇文章就来介绍mqtt是啥,以及Android可以用它来干啥。

MQTT介绍

MQTT协议实现方式

??实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

MQTT服务器

??MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

(1)接受来自客户的网络连接;(2)接受客户发布的应用信息;(3)处理来自客户端的订阅和退订请求;(4)向订阅的客户转发应用程序消息。

MQTT协议中的订阅、主题、会话

一、订阅(Subscription)
??订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
二、会话(Session)
??每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
三、主题名(Topic Name)
??连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
??一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
五、负载(Payload)
??消息订阅者所具体接收的内容。

MQTT协议中的方法

??MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

(1)Connect。等待与服务器建立连接。(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。(3)Subscribe。等待完成订阅。(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

MQTT服务质量 (QoS)

??qos为0"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
qos为1"至少一次",确保消息到达,但消息重复可能会发生。
qos为2"只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
可以在订阅/发布消息的时候设置服务质量。

MQTT服务端(Broker)

??网上有开源的服务端项目代码可以在电脑上进行搭建,也可以试用一些供应商的服务器。

MQTT客户端

经调研,Android开发mqtt客户端主流使用的是eclipse提供的paho.mqtt,项目引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

mqtt服务选取

??因为是调研,所以就不考虑自己去打一个mqtt的服务,网上搜能搜到很多搭建的mqtt,这里我是用了一个三方可以免费试用14天的服务方EMQ,使用前需要注册一个账号,如果你有github账号的话可以直接提供使用。
然后我们选中试用的服务(基础版和专业版都可以试用14天,这边建议试用专业版的),我因为之前已经试用过了专业版,所以现在只能试用基础版的了,专业版的好处就是提供的ip:

??然后我们等待它自动部署好项目即可:

??完成之后我们需要添加一个认证用户,这个认证用户用来连接时候判断身份用的:


??认证的用户及密码需要记一下。

Android连接mqtt

配置

??首先要在项目的build文件里添加如下引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

其实这里有个问题,target为Android 12的(31),需要去做很多修改,包括去掉引用,当然这部分我准备放到第二篇里面来讲如何适配Android12版本手机实现mqtt使用。
??修改Androidmanifest.xml文件,添加权限和mqttservice的的注册:

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<service android:name="org.eclipse.paho.android.service.MqttService" />

修改mainActivity文件

private val TAG = "MqttClient"
private lateinit var mqttClient: MqttAndroidClient
override fun onCreate(savedInstanceState: Bundle?) {
   //....
   val serverURI = "tcp://pc1c6e1c.cn-shenzhen.emqx.cloud:11838"
   mqttClient = MqttAndroidClient(this, serverURI, "kotlin_mqtt_test1") //"kotlin_mqtt_test1"是作为连接客户端的名称来使用,所以要注意避免重复
}
fun connect() {
        mqttClient.setCallback(object : MqttCallback {
            override fun messageArrived(topic: String?, message: MqttMessage?) {
                Log.d(TAG, "Receive message: ${message.toString()} from topic: $topic")
            }

            override fun connectionLost(cause: Throwable?) {
                Log.d(TAG, "Connection lost ${cause.toString()}")
            }

            override fun deliveryComplete(token: IMqttDeliveryToken?) {

            }
        })
        val options = MqttConnectOptions()
        options.apply {
                userName = "tobeyr1"
                this.password = "1234".toCharArray()
                connectionTimeout = 12
                this.keepAliveInterval = 0
                this.isAutomaticReconnect = false
                this.isCleanSession = true
            }
        try {
            mqttClient.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Connection success")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Connection failure")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }

    }

??其中我们的serverurl可以在橄榄中看到:

??然后我们可以在调试台的监控里面看到已经连接到了mqtt服务:

订阅 topic

private fun subscribe(topic: String, qos: Int = 1) {
        try {
            mqttClient.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

??订阅成功之后也可以在控制台看到订阅的主题:

取消订阅 topic

private fun unsubscribe(topic: String) {
        try {
            mqttClient.unsubscribe(topic, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Unsubscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to unsubscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

发布消息publish

private fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained
            mqttClient.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

断开 MQTT 连接

private fun disconnect() {
        try {
            mqttClient.disconnect(null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Disconnected")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to disconnect")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

??好了,本篇简单介绍Android(11及以下版本)连接mqtt服务就到此结束了,下篇《Android使用MQTT订阅及发布消息((二)兼容Android12 封装Mqtt客户端service)》将会介绍兼容Android12版本调用mqtt服务。有问题欢迎批评指正,觉得不错的也请点个赞,多谢。