如何在 Arduino 上使用 MQTT--PubSubClient 库使用指南

PubSubClient 库使用简介 源地址

本文是 PubSubClient 库文档的中文翻译,本库的作者是 Nick O’Leary - @knolleary

PubSubClient库允许你创建一个客户端与支持MQTT的服务器通信,进行简单的发布/订阅消息

Github地址

关于 MQTT 的更多信息,请访问mqtt.org

适用的硬件

PubSubClient库使用Arduino Ethernet Client提供的接口与底层网络硬件交互,这意味着越来越多的开发板得到了支持,包括:

  • Arduino Ethernet
  • Arduino Ethernet Shield
  • Arduino YUN – use the included YunClient in place ofEthernetClient, and be sure to do a Bridge.begin() first
  • Arduino WiFi Shield - if you want to send packets greater than 90 bytes with this shield, enable the MQTT_MAX_TRANSFER_SIZE option in PubSubClient.h.
  • Sparkfun WiFly Shield – when used with this library
  • Intel Galileo/Edison
  • ESP8266
  • ESP32

需要注意的是,这个库无法在基于 ENC28J60 芯片的硬件上正常运行,例如the Nanode or the Nuelectronics Ethernet Shield,对于这些硬件,我们有一个可供替代的库

使用简介

  1. 在第一步,我们应调用构造函数创建一个对象,一个典型的构造函数如下,构造函数还可以接受多种参数,见下

     PubSubClient client(espClient)
    
  2. 如果我们需要接收信息,则必须定义一个回调函数,回调函数定义了硬件接收到信息时需要怎么做,下面是一个实例

     void callback(char* topic, byte* payload, unsigned int length) {
         Serial.print("Message arrived [");
         Serial.print(topic);
         Serial.print("] ");
         for (int i=0;i<length;i++) {
             Serial.print((char)payload[i]);
         }
         Serial.println();
     }
    
  3. 如果你在第一步调用构造函数时省略了许多信息,那么在进行下去之前最好先把它们补齐,运用下列函数将完成这一项工作

     PubSubClient setServer (server, port)
     PubSubClient setCallback (callback)
     PubSubClient setClient (client)
     PubSubClient setStream (stream) //相关说明见下
    
  4. 完成这一切后,我们可以使用 connect() 函数来开始客户端的运行,但我们一般不这样做,因为总有断开连接的可能,最好的办法是编写一个reconnect() 函数并在每一轮循环开始时调用。 connect() 同样可以接受多种参数。

     void reconnect() {
         while (!client.connected()) {
         Serial.print("Attempting MQTT connection...");
         if (client.connect("arduinoClient")) {
            Serial.println("connected");
            client.publish("outTopic","hello world");
            client.subscribe("inTopic");
         } 
         else {
           Serial.print("failed, rc=");
           Serial.print(client.state());
           Serial.println(" try again in 5 seconds");
           delay(5000);
         }
       }
     }
    
  5. 完成连接以后,我们可以开始发布消息了,最简单的一个例子是

      publish (topic, payload)
    

    也可以发布储存在 FLASH 中的消息

      publish_P (topic, payload, length, retained)
    

    还可以使用一系列函数调用,逐字节或多字节的分次确定发布内容

      boolean beginPublish (topic, payloadLength, retained)
      size_t write (uint8_t)
      size_t write (payload, length)
      boolean endPublish ()
    
  6. 订阅特定主题只需调用 subscribe (topic, [qos])

  7. 循环中必须定期调用 loop() 保持连接,而 state() 可检查状态

源API地址

总览

Constructors

PubSubClient ()  
PubSubClient (client)  
PubSubClient (server, port, [callback], client, [stream])  

Functions

* boolean connect (clientID)  
* boolean connect (clientID, willTopic, willQoS, willRetain,willMessage)  
* boolean connect (clientID, username, password)  
* boolean connect (clientID, username, password, willTopic, willQoS, willRetain, willMessage)  
* boolean connect (clientID, username, password, willTopic, willQoS, willRetain, willMessage, cleanSession)    

* void disconnect ()  

* int publish (topic, payload)  
* int publish (topic, payload, retained)  
* int publish (topic, payload, length)  
* int publish (topic, payload, length, retained)  
* int publish_P (topic, payload, length, retained) 
 
* boolean beginPublish (topic, payloadLength, retained)  
* boolean endPublish ()

* size_t write (uint8_t)  
* size_t write (payload, length) 


​ * boolean subscribe (topic, [qos])
​ * boolean unsubscribe (topic)

* boolean loop ()
* int connected ()
* int state ()

* PubSubClient setServer (server, port)  
* PubSubClient setCallback (callback)  
* PubSubClient setClient (client)  
* PubSubClient setStream (stream) 

Other

* Configuration Options  
* Subscription Callback  

详细内容

构造函数Constructors

  1. 创建一个未初始化的实例,该客户端在使用之前必须以正确的设置初始化

     PubSubClient()
    
  2. 创建一个部分初始化的实例,同样需要在使用前初始化,client参数的一个例子是EthernetClient

     PubSubClient(client)
    
  3. 创建一个完全初始化的实例

     PubSubClient((server, port, [callback], client, [stream])
        
         server:服务器 ip 地址,uint8_t[]或者const int[]
         port:服务器端口号,int
         callback:可选参数,一个指向讯息回调函数的指针,当此客户端创建的预订的消息到达时调用该函数
         client:见上
         stream:可选参数,当需要储存接受到的信息时设置,详见示例 mqtt_stream
    

函数Fountions

连接客户端

  1. 连接客户端,参数是一个连接服务器时所使用的ID

     boolean connect (clientID)
    
  2. 使用指定的遗嘱消息连接客户端

     boolean connect (clientID, willTopic, willQoS, willRetain, willMessage)     
           
         willTopic:发布遗嘱消息的主题,const char[]
         willQoS:发布遗嘱消息的等级,int 0,1,2
         willRetain:是否使用保留标志(retain flag)发布遗嘱消息,boolean
         willMessage:想要发送的实际信息(Payload)
    

    备注:保留标志1表示发送的消息需要一直持久保存,不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送,不是所有。 而0则代表仅仅为当前订阅者推送此消息。

  3. 使用指定的用户名与密码连接客户端

     boolean connect(clientID, username, password)
         clientID : the client ID to use when connecting to the server.
         username : the username to use. If NULL, no username or password is used (const char[])
         password : the password to use. If NULL, no password is used (const char[])
    
  4. 上两种的综合

     boolean connect (clientID, username, password, willTopic, willQoS, willRetain, willMessage)
    
  5. 使用指定的….和指定的标志位连接客户端

     boolean connect (clientID, username, password, willTopic, willQoS, willRetain, willMessage, cleanSession)
         cleanSession:boolean
    

    在连接的过程中, 一个客户端设置“cleansession”标记位。如果该位被设置为false,则该连接则被认为是持久连接,其具体表现为:当该客户 断开后,任何订阅的主题和QoS被设置为1或2的信息都会保存,直到该客户端再次连接上server端。若 cleansession 被设置为 true ,所有的订阅主题都会被移除。

    注意:即便标志位被设为false或者0,客户端也不会重试失败的qos值为1的发布,该标志位仅用于维护代理上的订阅

    资料来源

  6. 与客户端断开连接

     void disconnect ()
    

发布信息

  1. 发布一个字符串信息到指定的主题,尽管该函数的返回值是 int 类型,实际上代表了布尔值

     int publish (topic, payload)
            
         topic - the topic to publish to (const char[])
         payload - the message to publish (const char[])
    
  2. 发布一个字符串信息到指定的主题

     int publish (topic, payload, retained)
            
         retained - 信息是否需要保留 (boolean)
    
  3. 发布一个字符串信息到指定的主题

     int publish (topic, payload, length, retained)
            
         length - the length of the message (byte)
    
  4. 发布一个储存在Flash中的消息至指定主题,并带有指定的保留标识符( PROGMEN 待查)。尽管该函数的返回值是 int 类型,实际上代表了布尔值
    Publishes a message stored in PROGMEN to the specified topic, with the retained flag as specified.

     int publish_P (topic, payload, length, retained)
    
         topic - the topic to publish to (const char[])
         payload - the message to publish (PROGMEM byte[])
         length - the length of the message (byte)
         retained - whether the message should be retained (boolean)
    
  5. 开始发送发布内容,实际发布的信息将由一次或多次对 write 函数的调用给出,最后将以 endPublish 函数的调用结束,返回值表示发送是否成功
    Begins sending a publish message. The payload of the message is provided by one or more calls to write followed by a call to endPublish.

     boolean beginPublish (topic, payloadLength, retained)
    
         payloadLength - the length of the message to publish
         retained - whether the message should be retained (boolean)
    
  6. 将一个字节写入发布内容的一部分
    Writes a byte as a component of a publish started with a call to beginPublish.

     size_t write (uint8_t)
    
         uint8_t - the byte to write
    
  7. 将一个字节数组写入发布内容的一部分

     size_t write (payload, length)
        
         payload - the bytes to write (byte[])
         length - the length of the byte array (byte)
    
  8. Finishing sending a message that was started with a call to beginPublish.

     boolean endPublish ()
    

订阅频道

无论是订阅还是取消订阅的操作都是异步完成的

  1. 订阅发布到特定主题的消息

     boolean subscribe (topic, [qos])
        
         qos - optional the qos to subscribe at (int: 0 or 1 only)
    
  2. 取消对指定主题的订阅

     boolean unsubscribe (topic)
            
         topic - the topic to unsubscribe from (const char[])
    

保持连接

  1. 该函数必须定时调用,使客户端接受信息并保持与服务端的连接。返回false表示客户端已经与服务端失去连接

     boolean loop ()
    

检查状态

  1. Checks whether the client is connected to the server.
    尽管该函数的返回值是 int 类型,实际上代表了布尔值

    int connected ()
    
  2. Returns the current state of the client. If a connection attempt fails, this can be used to get more information about the failure.

     int state ()
    

    Returns:
    int - the client state, which can take the following values (constants defined in PubSubClient.h):
    -4 : MQTT_CONNECTION_TIMEOUT - the server didn’t respond within the keepalive time
    -3 : MQTT_CONNECTION_LOST - the network connection was broken 无网络连接
    -2 : MQTT_CONNECT_FAILED - the network connection failed 网络连接失败
    -1 : MQTT_DISCONNECTED - the client is disconnected cleanly 客户端已断开连接
    0 : MQTT_CONNECTED - the client is connected
    1 : MQTT_CONNECT_BAD_PROTOCOL - the server doesn’t support the requested version of MQTT 服务器不支持请求的MQTT版本
    2 : MQTT_CONNECT_BAD_CLIENT_ID - the server rejected the client identifier 服务器拒绝客户端身份
    3 : MQTT_CONNECT_UNAVAILABLE - the server was unable to accept the connection 服务器未接收连接
    4 : MQTT_CONNECT_BAD_CREDENTIALS - the username/password were rejected 用户名/密码错误
    5 : MQTT_CONNECT_UNAUTHORIZED - the client was not authorized to connect 客户端被拒绝连接

设置

  1. 设置服务器详细信息

     PubSubClient setServer(server, port)
        
         server : the address of the server (IPAddress, uint8_t[] or const char[])
         port : the port to connect to (int)
    
  2. 设置消息回调函数

     PubSubClient setCallback(callback)
        
         callback : a pointer to a message callback function called when a message arrives for a subscription created by this client.
    
  3. 设置客户端信息

     PubSubClient setClient(client)
            
         client : an instance of Client, typically EthernetClient.
    
  4. Sets the stream.

     PubSubClient setStream(stream)
        
         stream : an instance of Stream, used to store received messages. See the mqtt_stream example for more information.
    

一个实例

    #include <SPI.h>
    #include <Ethernet.h>
    #include <PubSubClient.h>
    #include <SRAM.h>

    // Update these with values suitable for your network.
    byte mac[]    = {  0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
    IPAddress ip(172, 16, 0, 100);
    IPAddress server(172, 16, 0, 2);

    SRAM sram(4, SRAM_1024);

    void callback(char* topic, byte* payload, unsigned int length) {
      sram.seek(1);

      // do something with the message
      for(uint8_t i=0; i<length; i++) {
        Serial.write(sram.read());
      }
      Serial.println();

      // Reset position for the next message to be stored
      sram.seek(1);
    }

    EthernetClient ethClient;
    PubSubClient client(server, 1883, callback, ethClient, sram);

    void setup()
    {
      Ethernet.begin(mac, ip);
      if (client.connect("arduinoClient")) {
        client.publish("outTopic","hello world");
        client.subscribe("inTopic");
      }

      sram.begin();
      sram.seek(1);

      Serial.begin(9600);
    }

    void loop()
    {
      client.loop();
    }

其他

Configuration Options

下列设置项可被用于设置本库,它们被包含在PubSubClient.h里面

MQTT_MAX_PACKET_SIZE
    Sets the largest packet size, in bytes, the client will handle. Any packet received that exceeds this size will be ignored.
    Default: 128 bytes

MQTT_KEEPALIVE
    Sets the keepalive interval, in seconds, the client will use. This is used to maintain the connection when no other packets are being sent or received.
    Default: 15 seconds

MQTT_VERSION
    Sets the version of the MQTT protocol to use.
    Default: MQTT 3.1.1

MQTT_MAX_TRANSFER_SIZE
    Sets the maximum number of bytes passed to the network client in each write call. Some hardware has a limit to how much data can be passed to them in one go, such as the Arduino Wifi Shield.
    Default: undefined (complete packet passed in each write call)

MQTT_SOCKET_TIMEOUT
    Sets the timeout when reading from the network. This also applies as the timeout for calls to connect.
    Default: 15 seconds

订阅返回

Subscription Callback

如果客户端订阅了一个主题,则必须提供一个返回函数给构造函数,该函数将会在新消息到达客户端时被调用

回调函数形式如下

void callback(const char[] topic, byte* payload, unsigned int length)

    topic - the topic the message arrived on (const char[])
    payload - the message payload (byte array)
    length - the length of the message payload (unsigned int)

在内部,客户端对入站和出站消息使用相同的缓冲区。在回调函数返回之后,或者如果在回调函数中调用publish 或subscribe,则传递给该函数的topic 和payload值将被覆盖。如果超出此范围,则应用程序应创建自己的值副本
Internally, the client uses the same buffer for both inbound and outbound messages. After the callback function returns, or if a call to either publish or subscribe is made from within the callback function, the topic and payload values passed to the function will be overwritten. The application should create its own copy of the values if they are required beyond this.

一个实例

void callback(char* topic, byte* payload, unsigned int length) {
    // In order to republish this payload, a copy must be made
    // as the orignal payload buffer will be overwritten whilst
    // constructing the PUBLISH packet.
    // Allocate the correct amount of memory for the payload copy
    byte* p = (byte*)malloc(length);
    // Copy the payload to the new buffer
    memcpy(p,payload,length);
    client.publish("outTopic", p, length);
    // Free the memory
    free(p);
}