Easy MQTT application with Fyne

by | Jan 10, 2022 | IoT

This small blog will introduce you to how easy it is to use Fyne to quickly get a visual and portable go application that connect to the network and IoT devices.

For this example application, we will be connecting the application to a MQTT broker which publishes data from a Tempest weather station thanks to a home assistant integration which is usually used for a home assistant setup. All of that integration is the result of the work of open source contributors which makes complex projects like this easy to create. I won’t cover the installation and configuration of this setup here as there is enough already published on writing an app GUI with Fyne – you can find more at the Fyne developer website.

So, what is the goal here. I want to make a simple application that will display in real time the value that the Tempest gets. I won’t be spending a lot of time of making it fancy, but it should still look clean and always be interactive, never blocking the user flow. Finally it should be portable and run on every platform currently supported by Fyne. Sounds easy right? And it really should, you can find the entire source code and its 400 lines of code here now, but we should dive in first together.

There is really only two important parts in this application, first the form to get the information on the MQTT broker to connect to which is in server.go that will look like this:

Connection form

And the second important file that defines the widgets to display the MQTT information and refresh its content. This is in weather.go and will look like this:

The main screen that display live data from the Tempest weather station

So let’s look first at setting up the connection to the MQTT broker inside server.go . The first step is to create a dialog which contains a form and will validate user input before trying to make a connection. This is done inside connectionDialogShow(). We first create the 3 entries needed for getting the broker address, the login and password. The only special one is the broker address that we want to validate against a regex. This is done with the following code:

	broker := widget.NewEntry()
	broker.SetPlaceHolder("tcp://broker.emqx.io:1883/")
	broker.Validator = validation.NewRegexp(`(tcp|ws)://[a-z0-9-._-]+:\d+/`, "not a valid broker address")

The place holder is just a greyed out text in the background that is there to guide the user into what can be expected in that entry. The regexp is a classic regexp and the form will only be valid once all the validator passes (there are more validators available and you can write custom one too). This means that normally there isn’t much validation to be done once the form is submitted.

The step is what do we do once the user submits the connection form. In this case, we want to connect to the MQTT broker. There is nothing very different from any go program connecting to a MQTT broker, you can look here for a more in depth tutorial just on MQTT in Go or here for the source of the MQTT Go library. Once the connection is set up we need to let the user know what is going on as we wait for it to establish. To do this we create a standby dialog with an infinite progress bar as we do not know how long things are going to take.

func (app *application) standbyDialogShow(broker string) (dialog.Dialog, *widget.Label) {
	info := widget.NewLabel("Connecting to MQTT broker: " + broker)
	infinite := widget.NewProgressBarInfinite()
	infinite.Start()

	container := container.NewVBox(container.NewCenter(info), infinite)

	d := dialog.NewCustom("Setting up MQTT connection", "Cancel", container, app.window)
	d.Show()

	return d, action
}

This function is straightforward – we first create the label that will display the current action information and create an infinite progress bar. This is then packed into one vertical box which is used as the content for a dialog with just a Cancel button.

Before we jump into the difficult bit (establishing the asynchronous connection) let’s look at some helper function that are going to be useful to understand the process.

A complex task that we will have to repeat at pretty much every step of our process is to wait on an MQTT token result and yet allow user cancellation. This is done by waiting on a Done channel when trying to do anything asynchronous with the mqtt library. We also use a channel to notify cancellation from the standby dialog. This makes it simple to use the following function to handle wait during different operations:

func (app *application) waitCancelOrStepSuccess(token mqtt.Token, d dialog.Dialog, card *weatherCard) bool {
	select {
	case <-card.cancel:
		card.stop = true

	case <-token.Done():
		if token.Error() != nil {
			card.stop = true
		}
	}

	if card.stop {
		app.card.stopMqtt(d)

		app.connectionDialogShow()
	}

	return true
}

The main trick here is that their is two case where we will have to go back to the connection dialog (when the user clicked the cancel button on the standby dialog or the token notifies us of an error). Both of them are notified via a channel which make it easy to select on both and take action as necessary. As well as displaying the connection dialog, we should remember to close cleanly the MQTT connection.

Now that we have our helper function, let’s look at the center piece: asynchronousConnect in server.go which is more understandable with all the comments.

func (app *application) asynchronousConnect(d dialog.Dialog, standbyAction *widget.Label, broker string) {
	// Setup action on dialog close action
	d.SetOnClosed(func() {
		if !app.card.stop {
			app.card.cancel <- struct{}{}
		}
	})

	// Connect to MQTT and wait on either user cancel or success
	if !app.waitCancelOrStepSuccess(app.card.client.Connect(), d, app.card) {
		return
	}

	topicMatch := regexp.MustCompile(`homeassistant/sensor/weatherflow2mqtt_ST-(\d+)/status/attributes`)

	standbyAction.SetText("Waiting for MQTT sensor identification.")

	// Subscribe to a topic that will give us the serial number of a Tempest weather station
	token := app.card.client.Subscribe("homeassistant/sensor/+/status/attributes", 1, func(client mqtt.Client, msg mqtt.Message) {
		r := topicMatch.FindStringSubmatch(msg.Topic())
		if len(r) == 0 {
			return
		}

		// Stop looking for any additional serial number
		app.card.client.Unsubscribe("homeassistant/sensor/+/status/attributes")

		standbyAction.SetText("Waiting for first MQTT data.")

		// Connect the MQTT session to the widget
		json, err := app.card.connectWeather2Mqtt(r[1])
		if err != nil {
			app.card.stopMqtt(d)

			app.connectionDialogShow()
			return
		}

		// Wait for the first valid live data to arrive
		var listener binding.DataListener

		listener = binding.NewDataListener(func() {
			if json.IsEmpty() {
				return
			}

			json.RemoveListener(listener)

			app.card.stop = true
			close(app.card.cancel)

			app.card.Enable()
			app.card.action.SetText("Disconnect")

			d.Hide()
		})

		json.AddListener(listener)

		// This goroutine wait for the chanel to notify a cancellation or to be close as a synchronization point.
		go func() {
			<-app.card.cancel

			if !app.card.stop {
				app.card.stopMqtt(nil)

				app.connectionDialogShow()
			}
		}()
	})
	// Wait for Subscribe to successful set up or user cancel
	app.waitCancelOrStepSuccess(token, d, app.card)
}

This function shows things in almost a linear way even if it is asynchronous in nature. It sets up the cancel dialog action, connects and waits for the mqtt session to establish. It then sets up an MQTT subscribtion to a topic that will let us know the serial number of a Tempest weather station. We only wait for the first response before unsubscribing from the topic. Once we know that serial number we can connect the widget using data binding to the live stream. We still need to wait for the first live data to arrive because MQTT doesn’t always provide a stored value. This is why we call AddListener on the JSON data binding. That listener just waits for the JSON data to not be empty, once we have live data to display we can simlply hide the overlay dialog.

The last step of the subscribe callback is to handle disconnection. We use a goroutine for that to not block the processing of further MQTT topics and just wait on the cancel channel. That channel is either notified by the user cancelling the action in the progress dialog or by the listener closing the channel.

So far, we have not discussed how we get the data displayed and synchronized with the Fyne widgets. Let’s look at that magic function connectWeather2Mqtt in weather.go.

func (card *weatherCard) connectWeather2Mqtt(serial string) (xbinding.JSONValue, error) {
	mqtt, err := xbinding.NewMqttString(card.client, "homeassistant/sensor/weatherflow2mqtt_ST-"+serial+"/observation/state")
	if err != nil {
		return nil, err
	}
	json, err := xbinding.NewJSONFromDataString(mqtt)
	if err != nil {
		return nil, err
	}
	temperature, err := json.GetItemFloat("air_temperature")
	if err != nil {
		return nil, err
	}

	temperatureFeel, err := json.GetItemFloat("feelslike")
	if err != nil {
		return nil, err
	}

	temperatureLabel, err := binding.NewSprintf("%.2f°C, feels like %.2f°C", temperature, temperatureFeel)
	if err != nil {
		return nil, err
	}

	humidity, err := json.GetItemFloat("relative_humidity")
	if err != nil {
		return nil, err
	}
	humidityLabel := binding.FloatToStringWithFormat(humidity, "%.1f%%")

	pressure, err := json.GetItemString("pressure_trend")
	if err != nil {
		return nil, err
	}

	windSpeed, err := json.GetItemFloat("wind_speed")
	if err != nil {
		return nil, err
	}

	windBurst, err := json.GetItemFloat("wind_gust")
	if err != nil {
		return nil, err
	}

	windDirection, err := json.GetItemFloat("wind_direction")
	if err != nil {
		return nil, err
	}

	windLabel, err := binding.NewSprintf("%.2f kph (%.2f kph) from %.2f°", windSpeed, windBurst, windDirection)
	if err != nil {
		return nil, err
	}

	uv, err := json.GetItemString("uv_description")
	if err != nil {
		return nil, err
	}

	rain, err := json.GetItemString("rain_intensity")
	if err != nil {
		return nil, err
	}

	card.temperature.Bind(temperatureLabel)
	card.humidity.Bind(humidityLabel)
	card.pressure.Bind(pressure)
	card.wind.Bind(windLabel)
	card.uv.Bind(uv)
	card.rain.Bind(rain)

	return json, nil
}

The function is straightforward thanks mostly to the magic of the data binding that do all the work for us, though we do need to handle many possible errors. We first connect a data binding to the correct MQTT topic for our case. This provides us with a String data binding that we pass to a JSON data binding. This second binding will parse the string every time it receive a new string and automatically notify all the bound items that are linked to it. All the GetItemFloat and GetItemString are used to set up those connections from the JSON object to a widget. We also use the Sprintf data binding, which allow to build entire sentence that get update when any of the source data binding change. Finally all of the data bindings are connected to the widget and they will automatically update when a new message arrive on the MQTT topic.

All the difficult work of synchronisation is done for us by the data binding. This really helps to keep the application simple and requires only around 400 lines of code. It is not difficult to write your own data binding as the API provides a clean pattern to fetch and write back data asynchronously. Maybe the content for a follow up blog article.

This was really a quick introduction to a lot of topic. If you want to learn more about Fyne, I can really recommend the documentation as a starting point. I also haven’t even described the setup of the hardware and software used to make this demo possible. I wonder if people would be interested by a video showing the entire things, so let us know what you are interested to see here next.

0 Comments

Leave a Reply

Previous Article

Next Article

%d bloggers like this: