Connect MQTT client to database and implement basic JSON parsing

master
esensar 2018-05-02 21:05:30 +02:00
parent d82d46109b
commit 1dea242bce
2 changed files with 45 additions and 11 deletions

View File

@ -18,12 +18,13 @@ class Recording(db.Model):
def __init__(self, device_id, record_type, def __init__(self, device_id, record_type,
record_value, recorded_at, raw_json): record_value, recorded_at, raw_json):
self.device_id = device_id self.device_id = int(device_id)
self.record_type = record_type self.record_type = int(record_type)
self.record_value = record_value self.record_value = str(record_value)
self.recorded_at = recorded_at self.recorded_at = datetime.fromtimestamp(int(recorded_at))
self.received_at = datetime.utcnow() self.received_at = datetime.utcnow()
self.raw_record = raw_json self.raw_record = raw_json
def __repr__(self): def __repr__(self):
return '<Recording %r>' % self.recorded_at return '<Recording (value=%s, recorded_at=%s)>' % (
self.record_value, self.recorded_at)

View File

@ -1,4 +1,5 @@
from datetime import datetime import sys
import json
from flask_mqtt import Mqtt from flask_mqtt import Mqtt
from .models import Recording from .models import Recording
from . import db from . import db
@ -24,7 +25,7 @@ def tear_down_mqtt():
@mqtt.on_connect() @mqtt.on_connect()
def handle_connect(client, userdata, flags, rc): def handle_connect(client, userdata, flags, rc):
print('MQTT client connected') print('MQTT client connected')
mqtt.subscribe('topic/state') mqtt.subscribe('device/+')
@mqtt.on_disconnect() @mqtt.on_disconnect()
@ -37,7 +38,39 @@ def handle_subscribe(client, userdata, mid, granted_qos):
def handle_mqtt_message(client, userdata, message): def handle_mqtt_message(client, userdata, message):
recording = Recording(1, 1, "315", datetime.utcnow(), "{}") print("Received message!")
db.session.add(recording) print("Topic: " + message.topic)
db.session.commit() print("Payload: " + message.payload.decode())
print(message.payload.decode()) try:
# If type is JSON
recording = parse_json_message(message.topic, message.payload.decode())
db.session.add(recording)
db.session.commit()
print(recording)
except ValueError:
print("ERROR!")
error_type, error_instance, traceback = sys.exc_info()
print("Type: " + str(error_type))
print("Instance: " + str(error_instance))
return
def parse_json_message(topic, payload) -> Recording:
try:
json_msg = json.loads(payload)
device_id = get_device_id(topic)
return Recording(device_id, json_msg["record_type"],
json_msg["record_value"], json_msg["recorded_at"],
json_msg)
except KeyError:
error_type, error_instance, traceback = sys.exc_info()
raise ValueError("JSON parsing failed! Key error: "
+ str(error_instance))
def get_device_id(topic) -> int:
device_token, device_id = topic.split("/")
if device_token == "device":
return int(device_id)
else:
raise ValueError("Topic is in invalid format")