MQTT Client Library Encyclopedia – Paho Go

Guest post by Al Stockdill-Mander

Paho Go Client
Language Go
License EPL and EDL
Website http://www.eclipse.org/paho/
API Style Asynchronous

Description

Back in October 2013 my job involved working on the other Paho MQTT client libraries, and having picked Go as a new language to learn, what better way to learn it than to write an MQTT client? 🙂 It started as a side project with two colleagues; the code was contributed to Paho in January 2014 and has continued to be developed in the open.

The Paho Go library also includes a packet library that can be used independently for reading and writing MQTT packets.

The Paho Go library is currently at version 0.9, with a stable 1.0 release imminent. It is actively maintained and has been picked up by commercial and open source projects such as Gobot (http://gobot.io/).

Features

MQTT 3.1 ok
MQTT 3.1.1 ok
LWT ok
SSL/TLS ok
Automatic Reconnect ok
QoS 0 ok
QoS 1 ok
QoS 2 ok
Authentication ok
Throttling nok

Usage

Installation

Assuming you already have a working Go installation, the easiest way to get the Paho Go library is to run:

go get git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git

This will download the library to your $GOPATH/src directory. To make use of the library in your application, add

`git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git`

to your list of imports.

Connect

opts := mqtt.NewClientOptions().AddBroker("tcp://broker.hivemq.com:1883").SetClientID("sample")

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
	panic(token.Error())
}

To connect to an MQTT broker, two things must be provided: the URL of the broker and a client ID to use. To do this we create a new ClientOptions struct, add a broker URL and set the client ID. The functions that operate on a ClientOptions struct return a pointer to the altered struct, allowing you to chain the functions together.
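The chaining described above can be illustrated with a minimal sketch. The `options` type below is a hypothetical stand-in written for this example, not the library's ClientOptions; it only shows why returning the receiver pointer from each setter lets the calls be strung together.

```go
package main

import "fmt"

// options is a hypothetical stand-in for the library's ClientOptions struct.
type options struct {
	brokers  []string
	clientID string
}

// AddBroker appends a broker URL and returns the same pointer so that
// further calls can be chained onto the result.
func (o *options) AddBroker(url string) *options {
	o.brokers = append(o.brokers, url)
	return o
}

// SetClientID stores the client ID and returns the same pointer.
func (o *options) SetClientID(id string) *options {
	o.clientID = id
	return o
}

func main() {
	// Each call returns the struct it modified, so the calls read as one expression.
	opts := (&options{}).AddBroker("tcp://broker.hivemq.com:1883").SetClientID("sample")
	fmt.Println(opts.brokers, opts.clientID)
}
```

Because every setter hands back the same pointer, chained and unchained calls end up modifying the same struct, which is why the samples in this post can mix both styles.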

Inspired by the Paho Java library the Paho Go library allows you to optionally receive a token when performing actions that can be used to determine when the action is complete. Calling `token.Wait()` is a blocking call that will only return when the action is finished, `token.WaitTimeout(

Connect with MQTT 3.1 or MQTT 3.1.1

opts := mqtt.NewClientOptions().AddBroker("tcp://broker.hivemq.com:1883").SetClientID("sample")
opts.SetProtocolVersion(4)

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
	panic(token.Error())
}

By default the Paho Go library will attempt to connect to an MQTT broker using 3.1.1. If that fails, it will automatically fall back and attempt to connect using 3.1. `SetProtocolVersion()` allows you to explicitly set the version to connect with: 4 is 3.1.1 and 3 is 3.1. If you do explicitly set a version, the fallback mechanism is disabled.

Connect with LWT

opts := mqtt.NewClientOptions().AddBroker("tcp://broker.hivemq.com:1883").SetClientID("sample")
opts.SetWill("my/will/topic", "Goodbye", 1, true)

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
	panic(token.Error())
}

There are two functions for setting the LWT in the Paho Go library, `SetWill()` and `SetBinaryWill()`, and both take 4 parameters. The first in both is a string of the topic to which the LWT will be published. The second is the payload of the message: in `SetWill()` this is a string value, in `SetBinaryWill()` it is a []byte. The third parameter is the QoS level of the message and the fourth is a bool for the retained flag of the LWT.
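To make the relationship between the two variants concrete, here is a small sketch in which the string form simply converts its payload and delegates to the binary form. The `will` type, `newWill()` and `newBinaryWill()` are hypothetical names for this example, and the QoS and topic checks are assumptions of the sketch rather than documented library behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// will is a hypothetical container for the four LWT parameters.
type will struct {
	topic    string
	payload  []byte
	qos      byte
	retained bool
}

// newBinaryWill takes the payload as raw bytes.
func newBinaryWill(topic string, payload []byte, qos byte, retained bool) (will, error) {
	if topic == "" {
		return will{}, errors.New("will topic must not be empty")
	}
	if qos > 2 {
		return will{}, errors.New("qos must be 0, 1 or 2")
	}
	return will{topic: topic, payload: payload, qos: qos, retained: retained}, nil
}

// newWill takes the payload as a string and converts it to bytes.
func newWill(topic, payload string, qos byte, retained bool) (will, error) {
	return newBinaryWill(topic, []byte(payload), qos, retained)
}

func main() {
	w, err := newWill("my/will/topic", "Goodbye", 1, true)
	fmt.Println(string(w.payload), w.qos, w.retained, err)

	_, err = newWill("my/will/topic", "Goodbye", 3, true) // QoS 3 does not exist
	fmt.Println(err)
}
```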

Connect with Username / Password

opts := mqtt.NewClientOptions().AddBroker("tcp://broker.hivemq.com:1883").SetClientID("sample")
opts.SetUsername("username")
opts.SetPassword("password")

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
	panic(token.Error())
}

Publish

c.Publish("test/topic", 1, false, "Example Payload")

if token := c.Publish("test/topic", 1, false, "Example Payload"); token.Wait() && token.Error() != nil {
	fmt.Println(token.Error())
}

In the above sample c is an mqtt.Client as returned by mqtt.NewClient(). Publish takes 4 parameters: a string of the topic to publish to, the QoS value for the message, a bool for the retain property of the message, and either a string or []byte for the payload. The sample also demonstrates how to use Publish() with and without a token.

Publish a retained message

c.Publish("test/topic", 1, true, "Example Payload")

Subscribe

var msgRcvd = func(client *mqtt.Client, message mqtt.Message) {
	fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())
}

if token := c.Subscribe("example/topic", 0, msgRcvd); token.Wait() && token.Error() != nil {
	fmt.Println(token.Error())
}

`Subscribe()` takes 3 parameters: a string of the topic to subscribe to, the QoS level of the subscription, and a function to be called when a message matching this subscription is received. The callback function must have the signature `func(*mqtt.Client, mqtt.Message)`.
It is acceptable to pass nil as the callback function. In this case, when a message is received the library will call the client's default message handler (if set), which can be set by calling `SetDefaultPublishHandler()` on a ClientOptions struct.
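The fallback from a per-subscription callback to the default handler can be pictured with the sketch below. It is a simplification under stated assumptions: the `router` and `handler` types are hypothetical, the callback omits the client argument, and topics are matched by exact string comparison only, so MQTT wildcards are not handled.

```go
package main

import "fmt"

// handler is a hypothetical, simplified callback type.
type handler func(topic string, payload []byte)

// router maps subscribed topics to callbacks and holds an optional default.
type router struct {
	routes         map[string]handler
	defaultHandler handler
}

// subscribe registers h for topic; h may be nil.
func (r *router) subscribe(topic string, h handler) {
	if r.routes == nil {
		r.routes = make(map[string]handler)
	}
	r.routes[topic] = h
}

// dispatch calls the topic's own handler if one was given, otherwise the
// default handler if set. It reports whether any handler was called.
func (r *router) dispatch(topic string, payload []byte) bool {
	if h := r.routes[topic]; h != nil {
		h(topic, payload)
		return true
	}
	if r.defaultHandler != nil {
		r.defaultHandler(topic, payload)
		return true
	}
	return false
}

func main() {
	r := &router{defaultHandler: func(topic string, payload []byte) {
		fmt.Printf("default handler: %s -> %s\n", topic, payload)
	}}
	r.subscribe("example/topic", func(topic string, payload []byte) {
		fmt.Printf("specific handler: %s -> %s\n", topic, payload)
	})
	r.subscribe("other/topic", nil) // nil callback: falls through to the default

	r.dispatch("example/topic", []byte("hello"))
	r.dispatch("other/topic", []byte("world"))
}
```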

Unsubscribe

c.Unsubscribe("example/topic")

if token := c.Unsubscribe("example/topic"); token.Wait() && token.Error() != nil {
	fmt.Println(token.Error())
}

`Unsubscribe()` will accept more than one topic to unsubscribe from; each should be a separate string parameter.

Disconnect

c.Disconnect(250)

`Disconnect()` takes one parameter which is the number of milliseconds to allow for the library to finish any work in progress.

Using SSL / TLS

opts := mqtt.NewClientOptions().AddBroker("ssl://iot.eclipse.org:8883").SetClientID("sample")

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
	panic(token.Error())
}

Connecting via SSL/TLS to a broker that supports it is as simple as changing the scheme part of the broker URL; ssl, tls or tcps are all accepted as indicating that the client should connect securely. This assumes you are connecting to a broker that uses a certificate known by the system. If you are using self-signed certificates you will need to create a tls.Config struct and pass it to ClientOptions with `SetTLSConfig()`. The samples directory of the Paho Go library has an example of doing this.
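As a rough illustration of the self-signed case, the sketch below builds a tls.Config that trusts a given CA certificate using only the Go standard library. The `newTLSConfig()` helper is a hypothetical name, and the minimum TLS version is an assumption of this sketch; the library's own sample may do this differently.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig builds a tls.Config that trusts the CA certificates
// contained in caPEM (PEM-encoded), for example a self-signed broker CA.
func newTLSConfig(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid certificates found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,             // trust only the supplied CA certificates
		MinVersion: tls.VersionTLS12, // assumption: refuse older protocol versions
	}, nil
}

func main() {
	// In a real program caPEM would be read from the broker's CA file.
	// Invalid input is used here to show the error path.
	_, err := newTLSConfig([]byte("not a certificate"))
	fmt.Println(err)
}
```

The resulting config would then be handed to `SetTLSConfig()` on the ClientOptions struct before creating the client.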

Author Information

Al Stockdill-Mander
Al is a Software Engineer at IBM and has worked on the Paho C, Java, JavaScript and Embedded C clients, as well as being one of the originators and a maintainer of the Paho Go client. He is also a member of the OASIS MQTT TC.

2 comments

  1. Shubham says:

    Is the callback function called asynchronously through a go routine ? or is it synchronous?

    I need to write a message parser that takes messages from my mqtt broker and massages them and writes them to my ES server. This is be able to serve 1 million requests per second. Will a single go script work ?

    1. We recommend to ask this question on the Paho mailing list, since the authors may get the question earlier this way.

      In general I am pretty sure that a single MQTT client is not able to process 1 million messages per second. This would mean your application only has 1 microsecond for executing all your business logic, including parsing. Scaling out your client by using shared subscriptions (http://www.hivemq.com/blog/mqtt-client-load-balancing-with-shared-subscriptions/) may help here.

      Hope this helps,
      Dominik
