Publishing and Subscribing with Halcyon
As you may recall, Halcyon is a multiplatform XMPP library written in Kotlin. In a previous article, “A look at Halcyon”, we looked at the basic concepts of the library and created a simple client.
This time we will dive into something more complex. We will create a simple solution for monitoring the temperature at home :-) In this article we will not focus on measuring the temperature itself. Instead, we will create a command-line tool that publishes a temperature passed as a parameter.
The first letter of the XMPP acronym comes from the word “eXtensible”. There are many extensions for the XMPP protocol. One of them is XEP-0060: Publish-Subscribe, the specification for publish-subscribe functionality. We will use it to create our temperature monitor.
You need an XMPP server with a PubSub component. You can use your own deployment (for example Tigase XMPP Server) or one of the publicly available servers, for example sure.im and its PubSub component pubsub.sure.im. A PubSub node with a unique name (to avoid conflicts) will have to be created in the PubSub component. Please note that a node created with the default configuration is open, which means that everyone can subscribe to it (but only you will be able to publish data there).
Data structure
First of all, we have to define the data structure. In our case, it will be as simple as possible:
<temperature timestamp="1597946187562">23.8</temperature>
The timestamp attribute is the time represented as the number of milliseconds since January 1, 1970 00:00:00 GMT.
We can use the DSL defined in Halcyon to create such an XML fragment:
val payload = element("temperature") {
    attributes["timestamp"] = (Date()).time.toString()
    +temperature.toString()
}
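To make the wire format concrete without the DSL, here is a minimal sketch in plain Kotlin. The helpers `temperatureXml` and `timestampToInstant` are hypothetical names introduced only for illustration; they are not part of Halcyon, and real code should keep using the DSL so that values are escaped properly.

```kotlin
import java.time.Instant

// Hypothetical helper (not part of Halcyon): renders the payload as a plain string.
// No XML escaping is done here because both values are numeric.
fun temperatureXml(timestampMillis: Long, celsius: Double): String =
    "<temperature timestamp=\"$timestampMillis\">$celsius</temperature>"

// Converts the millisecond timestamp back into a point in time (UTC).
fun timestampToInstant(timestampMillis: Long): Instant =
    Instant.ofEpochMilli(timestampMillis)
```

Calling `temperatureXml(1597946187562L, 23.8)` reproduces the sample fragment, and `timestampToInstant(1597946187562L)` corresponds to 2020-08-20T17:56:27.562Z.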
Publisher
Publisher is a simple XMPP client that connects to the server, sends information to the PubSub component and immediately disconnects.
First of all, let's define global values to keep the node name and the PubSub JID:
val PUBSUB_JID = "pubsub.tigase.org".toJID()
val PUBSUB_NODE = "temperature_in_my_house"
This is not good practice, but it is good enough for us right now :-)
In the previous article we explained how to create a simple client. Now we will focus on PubSubModule. This module allows publishing and receiving events as well as managing PubSub nodes and subscriptions.
This is the main code that publishes events:
pubSubModule.publish(PUBSUB_JID, PUBSUB_NODE, null, payload).handle {
    success { request, iq, result ->
        println("YAY! Published with id=${result!!.id}")
    }
    error { request, iq, errorCondition, s ->
        System.err.println("ERROR $errorCondition! $s")
    }
}.send()
But what if the PubSub node doesn’t exist (e.g. it wasn’t created yet)? It’s simple: we have to create it using the create() method:
pubSubModule.create(PUBSUB_JID, PUBSUB_NODE).handle {
    success { _: IQRequest<Unit>, _: IQ, _: Unit? ->
        println("Got it! Node created!")
    }
    error { _: IQRequest<PubSubModule.PublishingInfo>, _: IQ?, errorCondition: ErrorCondition, msgs: String? ->
        println("OOPS! Cannot create node $errorCondition$msgs")
    }
}.send()
The question is: under what conditions should we call this code and automatically create the node? One possibility is the moment when publishing an item fails with the error item-not-found.
pubSubModule.publish(PUBSUB_JID, PUBSUB_NODE, null, payload).handle {
    success { request, iq, result ->
        println("YAY! Published with id=${result!!.id}")
    }
    error { request, iq, errorCondition, s ->
        if (errorCondition == ErrorCondition.ItemNotFound) {
            println("Node not found! We need to create it!")
            pubSubModule.create(PUBSUB_JID, PUBSUB_NODE).handle {
                success { _: IQRequest<Unit>, _: IQ, _: Unit? ->
                    println("Got it! Node created!")
                }
                error { _: IQRequest<PubSubModule.PublishingInfo>, _: IQ?, errorCondition: ErrorCondition, msgs: String? ->
                    println("OOPS! Cannot create node $errorCondition$msgs")
                }
            }.send()
        } else System.err.println("ERROR $errorCondition! $s")
    }
}.send()
To simplify the code, publishing will not be repeated after node creation.
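If you do want to repeat publishing after the node has been created, the control flow could look roughly like the sketch below. It is deliberately independent of Halcyon: `PublishOutcome`, `publishWithAutoCreate` and the two callback-style parameters are hypothetical stand-ins for the real publish and create requests, so only the "retry exactly once" logic is shown.

```kotlin
// Simplified stand-in for the possible results; an assumption made for illustration.
enum class PublishOutcome { PUBLISHED, NODE_NOT_FOUND, FAILED }

// publish and createNode are hypothetical asynchronous operations that
// report their result through a callback, mimicking request handlers.
fun publishWithAutoCreate(
    publish: (onResult: (PublishOutcome) -> Unit) -> Unit,
    createNode: (onResult: (Boolean) -> Unit) -> Unit,
    onDone: (PublishOutcome) -> Unit
) {
    publish { first ->
        if (first != PublishOutcome.NODE_NOT_FOUND) {
            // Success or an unrelated error: report it unchanged.
            onDone(first)
        } else {
            createNode { created ->
                if (created) {
                    // Retry exactly once; the second result is final,
                    // so a repeated failure cannot cause a loop.
                    publish(onDone)
                } else {
                    onDone(PublishOutcome.FAILED)
                }
            }
        }
    }
}
```

In a real client, the two lambdas would wrap the publish and create calls shown earlier and invoke the callbacks from their success and error handlers.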
It is a good idea to call client.waitForAllResponses() before disconnect(), so that the connection is not closed before all responses have come back.
Listener
Listener is also a client (it should run on a different account) that subscribes to events from specific nodes of the PubSub component. PubSub items received by PubSubModule are distributed in the client as PubSubEventReceivedEvent on the Event Bus. To receive those events you have to register an event listener:
client.eventBus.register<PubSubEventReceivedEvent>(PubSubEventReceivedEvent.TYPE) {
    if (it.pubSubJID == PUBSUB_JID && it.nodeName == PUBSUB_NODE) {
        it.items.forEach { item ->
            val publishedContent = item.getFirstChild("temperature")!!
            val date = Date(publishedContent.attributes["timestamp"]!!.toLong())
            val value = publishedContent.value!!
            println("Received update: $date :: $value°C")
        }
    }
}
Note that this listener will be called on every received PubSub event (such as OMEMO key distribution, PEP events, etc.). That’s why you need to check the node name and the JabberID of the PubSub component.
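The filtering idea can be illustrated in isolation. The sketch below uses a hypothetical `ReceivedEvent` data class with plain strings instead of the Halcyon event and JID types, so it shows only why both the service address and the node name have to match.

```kotlin
// Minimal stand-in for a received PubSub event; this is not the Halcyon class.
data class ReceivedEvent(val pubSubJid: String, val nodeName: String, val payload: String)

// An event is relevant only if it comes from the expected PubSub service
// AND from the expected node; checking just one of them is not enough.
fun isTemperatureEvent(event: ReceivedEvent, expectedJid: String, expectedNode: String): Boolean =
    event.pubSubJid == expectedJid && event.nodeName == expectedNode
```

An event from another node of the same service, or from a node with the same name on a different service, is rejected by this predicate.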
Your client will not receive anything from PubSub unless it subscribes to the specific node. Because a subscription is persistent (at least with the default node configuration), the client doesn’t need to subscribe every time it connects to the server. It should, however, be able to check whether it is already subscribed to the node. To do that, retrieve the list of subscriptions and see whether the JabberID of the client is on the list:
val myOwnJID = client.getModule<BindModule>(BindModule.TYPE)!!.boundJID!!
pubSubModule.retrieveSubscriptions(PUBSUB_JID, PUBSUB_NODE).response {
    if (!it.get()!!.any { subscription -> subscription.jid.bareJID == myOwnJID.bareJID }) {
        println("We have to subscribe")
        pubSubModule.subscribe(PUBSUB_JID, PUBSUB_NODE, myOwnJID).send()
    }
}.send()
NOTE: In this example we intentionally skipped checking response errors.
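The comparison is done on bare JIDs because the resource part changes between connections while the subscription belongs to the account. A minimal sketch of that check on plain strings follows; `bareJid` and `needsSubscription` are hypothetical helpers, and the sketch ignores JID normalization (case folding and similar rules), which a real JID type handles.

```kotlin
// Strips the resource part: "user@domain/resource" -> "user@domain".
// A JID without a resource is returned unchanged.
fun bareJid(jid: String): String = jid.substringBefore('/')

// True when none of the existing subscriptions belongs to our account.
fun needsSubscription(subscriberJids: List<String>, ownJid: String): Boolean =
    subscriberJids.none { bareJid(it) == bareJid(ownJid) }
```

With this logic, a client connected as `listener@example.com/laptop` does not subscribe again if `listener@example.com` is already on the list.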
The PubSub component can keep some history of published items. We can retrieve that list easily:
pubSubModule.retrieveItem(PUBSUB_JID, PUBSUB_NODE).response {
    when (it) {
        is IQResult.Success -> {
            println("Previously published temperatures:")
            it.get()!!.items.forEach {
                val date = Date(it.content!!.attributes["timestamp"]!!.toLong())
                val value = it.content!!.value!!
                println(" - $date :: $value°C")
            }
        }
        is IQResult.Error -> println("OOPS! Error " + it.error)
    }
}.send()
The length of the history is defined in the node configuration.
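Because the node is open, stored items may not always have the expected shape, and the `!!` operators used above would throw on a missing attribute or value. A more defensive variant could parse each item into a small value class first. The sketch below assumes the attribute and text content have already been extracted as nullable strings; `Reading` and `parseReading` are hypothetical names, not part of Halcyon.

```kotlin
import java.time.Instant

// A single decoded measurement.
data class Reading(val at: Instant, val celsius: Double)

// Returns null instead of throwing when the timestamp or the value
// is missing or is not a valid number.
fun parseReading(timestamp: String?, value: String?): Reading? {
    val millis = timestamp?.toLongOrNull() ?: return null
    val celsius = value?.trim()?.toDoubleOrNull() ?: return null
    return Reading(Instant.ofEpochMilli(millis), celsius)
}
```

Items for which `parseReading` returns null can simply be skipped, for example with `mapNotNull`.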
Sample output
Submitting a new temperature in Publisher …:
yields notifications received in Listener:
Summary
We presented a simple way to create a PubSub publisher and consumer. You can extend it: for example, you can run the publisher on a Raspberry Pi connected to some weather sensors. Possible applications of the PubSub component are limited only by your imagination.
All source code for this article can be found in the GitHub repository.