venerdì 28 settembre 2018

MQTT - MQ Telemetry Transport

Non è proprio un argomento specifico system-i ma rimane un interessante spunto.

Chi conosce MQ difficlmente avrà sentito parlare di MQTT, un protocollo publish/subscribe creato quasi apposta per l'IoT.
L'architettura tipo:

client1-+                                 +--| subscriber 1
        |                                 |
client2-+---(publish)--|SERVER MQTT|------+--| subscriber1 2
        |                                 |
clientn-+                                 +--|
                                          |
                                          +--|


Un client è qualsiasi dispositivo (da un microcontroller foìino ad un super-server) che esegue una libreria MQTT e si connette ad un server MQTT tramite rete

uno o più client che pubblicano informazioni si connettono ad un server in modalità [publisher]
uno o più client che vogliono ricevere le informazioni si connettono in modalità [subscriber]
lo stesso client può essere al contempo publisher e subscriber

immaginiamo per esempio uno scenario applicativo
i client sono contemporaneamente pub/sub
un client invia un messaggio ed i sottoscrittori lo possono vedere
un sottoscrittore risponde e gli altri vedono la risposta
ricorda qualcosa? per esempio Facebook Messenger(a)  (no, Whats'app(b) usa un altra tecnologia)

I client possono interagire col server MQQ tramite vari linguaggi, non ultimo java (c).

Attualmente il MQ Telemetry Server non è disponibile su system-i (d)  ma il mondo open fornisce valide soluzioni
Va però detto che esistono diversi server da poter installare in-house o utilizzabili come servizio in cloud

per il mio POC  ho utilizzato un server in cloud [http://test.mosquitto.org] disponibile appunto per i test ed un client per le verifiche su quanto inviato al server [MQTT.fx]

Un client Java invia json contenti l'equivalente di un record al Broker (serverMQTT), utilizzando librerie standard: org.eclipse.paho.client.mqttv3-1.1.1.jar

==
            IMqttClient publisher = new MqttClient("tcp://test.mosquitto.org:1883", publisherId);
           
            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
            publisher.connect(options);
            System.out.println("Connected");
            String message = "{\n" +
                    "    \"name\": \"Gaetano\",\n" +
                    "    \"surname\": \"Donizetti\",\n" +
                    "    \"active\": true,\n" +
                    "    \"favoriteNumber\": 42,\n" +
                    "    \"birthday\": {\n" +
                    "        \"day\": 1,\n" +
                    "        \"month\": 1,\n" +
                    "        \"year\": 2000\n" +
                    "    },\n" +
                    "    \"languages\": [ \"it\", \"en\" ]\n" +
                    "}";
            publisher.publish(
                "Clienti", // topic
                message.getBytes(), // payload
                2, // QoS
                true); // retained

==
caratteristiche principali dei messaggi, oltre ai contenuti (che possono essere di qualsiasi tipo, ho usato json) sono (e)
topic: è l'equivalente di una coda
QoS: 0 - At most once delivery
   : 1 - At least Once Delivery
   : 2 - Exactly once delivery
Retain: Persistenza del messaggio (f)





ho disegnato quindi un client - sempre in java - su system-i in ascolto sul server MQTT (è il server che effettua push dei messaggi)

Poche righe: si dichiara il server da interrogare e una callback function che si attiva ogni volta che si riceve un messaggio


            MqttClient client=new MqttClient("tcp://test.mosquitto.org:1883", MqttClient.generateClientId());
            client.setCallback(new SimpleMqttCallBack());
            client.connect();
            client.subscribe("Clienti", 2);

ad ogni messaggio vioene quindi invocata la SimpleMqttCallBack che provvede a
effettuare il parse del json mappandolo in un pojo
lanciare l'inserimento dell'oggetto pojo come riga di database


==
public class SimpleMqttCallBack implements MqttCallback {

      public void connectionLost(Throwable throwable) {
        System.out.println("Connection to MQTT broker lost!");
      }
      @Override
      public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        Gson gson = new Gson();
        Person person = gson.fromJson(new String(mqttMessage.getPayload()), Person.class);
        System.out.println(person);
        // insert into DB Table
        SaveData sd = new SaveData();
        sd.insertRecord(person.getName(), person.getSurname());
       }
      @Override
      public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // not used
      }
    }





(a) http://mqtt.org/2011/08/mqtt-used-by-facebook-messenger
(b) https://www.ictsecuritymagazine.com/articoli/whatsapp-forensics/
(c) https://www.hivemq.com/blog/mqtt-essentials-part-3-client-broker-connection-establishment

(d) https://www.ibm.com/software/reports/compatibility/clarity-reports/report/html/softwareReqsForProductByComponent?deliverableId=1350550241693&duComponent=Server_568CF0B01AEF11E38F7D228059AF1569)
(e) https://www.ibm.com/developerworks/community/blogs/aimsupport/entry/what_is_mqtt_and_how_does_it_work_with_websphere_mq?lang=en
(f) https://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages

Nessun commento:

Posta un commento