From 4dbba2b44c25bcb232e71fd24116872d90a11615 Mon Sep 17 00:00:00 2001 From: esensar Date: Thu, 3 May 2018 22:30:34 +0200 Subject: [PATCH] Encapsulate Mqtt client in a class --- app/mod_devices/__init__.py | 7 +- app/mod_devices/mqtt_client.py | 122 +++++++++++++++++---------------- 2 files changed, 67 insertions(+), 62 deletions(-) diff --git a/app/mod_devices/__init__.py b/app/mod_devices/__init__.py index b7f6d07..5d85860 100644 --- a/app/mod_devices/__init__.py +++ b/app/mod_devices/__init__.py @@ -1,13 +1,14 @@ import atexit from flask import Blueprint -from .mqtt_client import tear_down_mqtt, setup_mqtt +from .mqtt_client import MqttClient devices = Blueprint('devices', __name__) +mqtt_client = MqttClient() # When app dies, stop mqtt connection def on_stop(): - tear_down_mqtt() + mqtt_client.tear_down() atexit.register(on_stop) @@ -21,4 +22,4 @@ def hello(): @devices.record def on_blueprint_setup(setup_state): - setup_mqtt(setup_state.app) + mqtt_client.setup(setup_state.app) diff --git a/app/mod_devices/mqtt_client.py b/app/mod_devices/mqtt_client.py index 52b92cc..59ce95d 100644 --- a/app/mod_devices/mqtt_client.py +++ b/app/mod_devices/mqtt_client.py @@ -4,74 +4,78 @@ from flask_mqtt import Mqtt from .models import Recording from app import db, app -mqtt = Mqtt() +class MqttClient: + def __init__(self): + self.mqtt = Mqtt() + self.initialized = False -# Mqtt setup -def setup_mqtt(app): - mqtt.init_app(app) - mqtt.client.on_message = handle_mqtt_message - mqtt.client.on_subscribe = handle_subscribe - print('MQTT client initialized') + # Mqtt setup + def setup(self, app): + if not self.initialized: + self.mqtt.init_app(app) + self.mqtt.client.on_message = self.handle_mqtt_message + self.mqtt.client.on_subscribe = self.handle_subscribe + initialized = True + + @self.mqtt.on_connect() + def handle_connect(client, userdata, flags, rc): + print('MQTT client connected') + self.mqtt.subscribe('device/+') + + @self.mqtt.on_disconnect() + def handle_disconnect(): + print('MQTT client disconnected') + + print('MQTT client initialized') -def tear_down_mqtt(): - mqtt.unsubscribe_all() - if hasattr(mqtt, 'client') and mqtt.client is not None: - mqtt.client.disconnect() - print('MQTT client destroyed') + def tear_down(self): + self.mqtt.unsubscribe_all() + if hasattr(self.mqtt, 'client') and self.mqtt.client is not None: + self.mqtt.client.disconnect() + print('MQTT client destroyed') -@mqtt.on_connect() -def handle_connect(client, userdata, flags, rc): - print('MQTT client connected') - mqtt.subscribe('device/+') + def handle_subscribe(self, client, userdata, mid, granted_qos): + print('MQTT client subscribed') -@mqtt.on_disconnect() -def handle_disconnect(): - print('MQTT client disconnected') + def handle_mqtt_message(self, client, userdata, message): + print("Received message!") + print("Topic: " + message.topic) + print("Payload: " + message.payload.decode()) + try: + # If type is JSON + recording = self.parse_json_message(message.topic, message.payload.decode()) + with app.app_context(): + recording.save() + except ValueError: + print("ERROR!") + error_type, error_instance, traceback = sys.exc_info() + print("Type: " + str(error_type)) + print("Instance: " + str(error_instance)) + return -def handle_subscribe(client, userdata, mid, granted_qos): - print('MQTT client subscribed') + def parse_json_message(self, topic, payload) -> Recording: + try: + json_msg = json.loads(payload) + device_id = self.get_device_id(topic) + return Recording(device_id=device_id, + record_type=json_msg["record_type"], + record_value=json_msg["record_value"], + recorded_at=json_msg["recorded_at"], + raw_json=json_msg) + except KeyError: + error_type, error_instance, traceback = sys.exc_info() + raise ValueError("JSON parsing failed! Key error: " + + str(error_instance)) -def handle_mqtt_message(client, userdata, message): - print("Received message!") - print("Topic: " + message.topic) - print("Payload: " + message.payload.decode()) - try: - # If type is JSON - recording = parse_json_message(message.topic, message.payload.decode()) - with app.app_context(): - recording.save() - except ValueError: - print("ERROR!") - error_type, error_instance, traceback = sys.exc_info() - print("Type: " + str(error_type)) - print("Instance: " + str(error_instance)) - return - - -def parse_json_message(topic, payload) -> Recording: - try: - json_msg = json.loads(payload) - device_id = get_device_id(topic) - return Recording(device_id=device_id, - record_type=json_msg["record_type"], - record_value=json_msg["record_value"], - recorded_at=json_msg["recorded_at"], - raw_json=json_msg) - except KeyError: - error_type, error_instance, traceback = sys.exc_info() - raise ValueError("JSON parsing failed! Key error: " - + str(error_instance)) - - -def get_device_id(topic) -> int: - device_token, device_id = topic.split("/") - if device_token == "device": - return int(device_id) - else: - raise ValueError("Topic is in invalid format") + def get_device_id(self, topic) -> int: + device_token, device_id = topic.split("/") + if device_token == "device": + return int(device_id) + else: + raise ValueError("Topic is in invalid format")