university-final-iot-backend/app/api/resources/device.py

154 lines
5.0 KiB
Python
Raw Normal View History

from flask_restful import abort
2018-05-08 14:45:09 +00:00
from marshmallow import Schema, fields
from webargs.flaskparser import use_args
from flasgger import swag_from
2018-09-19 21:31:21 +00:00
from flask import g, request
2018-05-08 14:45:09 +00:00
import app.devices as devices
from app.api import ProtectedResource
2018-09-17 22:51:26 +00:00
class DeviceTypeSchema(Schema):
    """Serialization schema for a single device type."""

    # Emitted on dump only; never accepted from incoming payloads.
    id = fields.Integer(dump_only=True)
    # Mandatory name of the device type.
    name = fields.String(required=True)
2018-05-08 14:45:09 +00:00
class DeviceSchema(Schema):
    """Serialization schema for a single device.

    On dump the device type is emitted as a nested ``device_type`` object;
    on load the type is referenced through ``device_type_id`` instead.
    """

    # Emitted on dump only; never accepted from incoming payloads.
    id = fields.Integer(dump_only=True)
    # Mandatory device name.
    name = fields.String(required=True)
    # Nested device type, output only.
    device_type = fields.Nested(DeviceTypeSchema, dump_only=True)
    # Input only; ``missing=1`` supplies the value used on load when the
    # client omits the field.
    device_type_id = fields.Integer(missing=1, load_only=True)
2018-05-08 14:45:09 +00:00
2018-09-19 22:23:22 +00:00
class DeviceWithConfigurationSchema(DeviceSchema):
    """Device schema extended with the device's configuration."""

    # Passed through unmodified (raw) and emitted on dump only.
    configuration = fields.Raw(dump_only=True)
2018-05-08 14:45:09 +00:00
class DeviceWrapperSchema(Schema):
    """Envelope holding one device under the ``device`` key."""

    # Required nested device (with configuration), read from the JSON body.
    device = fields.Nested(
        DeviceWithConfigurationSchema,
        location='json',
        required=True,
    )
2018-05-08 14:45:09 +00:00
2018-09-16 20:50:54 +00:00
2018-08-24 12:32:31 +00:00
class DevicesWrapperSchema(Schema):
    """Envelope holding a list of devices under the ``devices`` key."""

    # Required list (``many=True``) of nested devices, read from the JSON
    # body.
    devices = fields.Nested(
        DeviceSchema,
        many=True,
        location='json',
        required=True,
    )
2018-05-08 14:45:09 +00:00
2018-09-17 22:51:26 +00:00
class DeviceTypeWrapperSchema(Schema):
    """Envelope holding one device type under the ``device_type`` key."""

    # Required nested device type, read from the JSON body.
    device_type = fields.Nested(
        DeviceTypeSchema,
        location='json',
        required=True,
    )
class DeviceTypesWrapperSchema(Schema):
    """Envelope holding a list of device types under ``device_types``."""

    # Required list (``many=True``) of nested device types, read from the
    # JSON body.
    device_types = fields.Nested(
        DeviceTypeSchema,
        many=True,
        location='json',
        required=True,
    )
2018-05-08 14:45:09 +00:00
class RecordingsSchema(Schema):
    """Serialization schema for a single device recording."""

    # Timestamp of the recording.
    recorded_at = fields.DateTime()
    # Integer code of the record type.
    record_type = fields.Integer()
    # Recorded value, serialized as a string.
    record_value = fields.String()
class RecordingsWrapperSchema(Schema):
    """Envelope holding a list of recordings under ``recordings``."""

    # Required list (``many=True``) of nested recordings, read from the
    # JSON body.
    recordings = fields.Nested(
        RecordingsSchema,
        many=True,
        location='json',
        required=True,
    )
def validate_device_ownership(device_id):
    """Abort the request with 403 unless the current account may use the device.

    The account is taken from ``g.current_account``; the access decision is
    delegated to ``devices.can_user_access_device``.

    Args:
        device_id: Identifier of the device being accessed.
    """
    account_id = g.current_account.id
    if devices.can_user_access_device(account_id, device_id):
        return
    abort(
        403,
        message='You are not allowed to access this device',
        status='error',
    )
2018-05-08 14:45:09 +00:00
class DeviceResource(ProtectedResource):
    """Endpoint for reading or deleting a single device."""

    @swag_from('swagger/get_device_spec.yaml')
    def get(self, device_id):
        """Return the device wrapped under ``device`` with status 200.

        Access is checked first via ``validate_device_ownership``.
        """
        validate_device_ownership(device_id)
        schema = DeviceWrapperSchema()
        payload = {'device': devices.get_device(device_id)}
        return schema.dump(payload), 200

    @swag_from('swagger/delete_device_spec.yaml')
    def delete(self, device_id):
        """Delete the device and answer with an empty 204 response.

        Access is checked first via ``validate_device_ownership``.
        """
        validate_device_ownership(device_id)
        devices.delete_device(device_id)
        return '', 204
2018-05-08 14:45:09 +00:00
2018-09-17 22:51:26 +00:00
class DeviceTypeResource(ProtectedResource):
    """Endpoint for reading a single device type."""

    @swag_from('swagger/get_device_type_spec.yaml')
    def get(self, device_type_id):
        """Return the device type wrapped under ``device_type`` with 200."""
        schema = DeviceTypeWrapperSchema()
        payload = {'device_type': devices.get_device_type(device_type_id)}
        return schema.dump(payload), 200
class DeviceTypeListResource(ProtectedResource):
    """Collection endpoint for device types: create one or list all."""

    @use_args(DeviceTypeWrapperSchema())
    @swag_from('swagger/create_device_type_spec.yaml')
    def post(self, args):
        """Create a device type; only allowed for admin accounts.

        Args:
            args: Parsed request arguments; the new type is read from
                ``args['device_type']``.

        Returns:
            An empty body with status 201 when the type was created.

        Aborts with 403 when the current account is not an admin and with
        500 when ``devices.create_device_type`` reports failure.
        """
        # role_id 1 is the value this endpoint treats as "admin".
        if g.current_account.role_id != 1:
            abort(403, message='Only admin may create device types',
                  status='error')

        device_type = args['device_type']
        created = devices.create_device_type(device_type['name'])
        if not created:
            # Without this branch the method would fall through and return
            # None, which Flask-RESTful renders as a 200 response with a
            # null body, so a failed create would look like a success.
            abort(500, message='Device type could not be created',
                  status='error')
        return '', 201

    @swag_from('swagger/get_device_types_spec.yaml')
    def get(self):
        """Return all device types wrapped under ``device_types`` with 200."""
        return DeviceTypesWrapperSchema().dump(
            {'device_types': devices.get_device_types()}), 200
2018-05-08 14:45:09 +00:00
class DeviceRecordingResource(ProtectedResource):
    """Endpoint for listing and creating recordings of a single device."""

    @swag_from('swagger/get_device_recordings_spec.yaml')
    def get(self, device_id):
        """Return the device's recordings, optionally filtered.

        The ``record_type``, ``start_date`` and ``end_date`` query
        parameters are forwarded as given to
        ``devices.get_device_recordings_filtered``; a parameter that is
        absent from the query string is forwarded as ``None``.
        """
        validate_device_ownership(device_id)
        query = request.args
        recordings = devices.get_device_recordings_filtered(
            device_id,
            query.get('record_type'),
            query.get('start_date'),
            query.get('end_date'))
        return RecordingsWrapperSchema().dump(
            {'recordings': recordings}), 200

    @swag_from('swagger/create_device_recording_spec.yaml')
    def post(self, device_id):
        """Create a recording for the device from the JSON request body.

        Returns:
            An empty body with status 201 when the recording was created.

        Aborts with 500 when ``devices.create_recording`` reports failure.
        """
        validate_device_ownership(device_id)
        created = devices.create_recording(device_id, request.json)
        if not created:
            # Without this branch the method would fall through and return
            # None, which Flask-RESTful renders as a 200 response with a
            # null body, so a failed create would look like a success.
            abort(500, message='Recording could not be created',
                  status='error')
        return '', 201
2018-05-08 14:45:09 +00:00
class DeviceListResource(ProtectedResource):
    """Collection endpoint for devices: create one or list the account's."""

    @use_args(DeviceWrapperSchema())
    @swag_from('swagger/create_device_spec.yaml')
    def post(self, args):
        """Create a device owned by the current account.

        Args:
            args: Parsed request arguments; the new device is read from
                ``args['device']`` (``name`` and ``device_type_id``).

        Returns:
            An empty body with status 201 when the device was created.

        Aborts with 500 when ``devices.create_device`` reports failure.
        """
        device = args['device']
        created = devices.create_device(
            device['name'],
            g.current_account.id,
            device['device_type_id'])
        if not created:
            # Without this branch the method would fall through and return
            # None, which Flask-RESTful renders as a 200 response with a
            # null body, so a failed create would look like a success.
            abort(500, message='Device could not be created',
                  status='error')
        return '', 201

    @swag_from('swagger/get_devices_spec.yaml')
    def get(self):
        """Return the current account's devices under ``devices`` with 200."""
        return DevicesWrapperSchema().dump(
            {'devices': devices.get_devices(g.current_account.id)}), 200
2018-09-19 21:31:21 +00:00
class DeviceConfigurationResource(ProtectedResource):
    """Endpoint for reading and replacing a device's configuration."""

    @swag_from('swagger/update_device_configuration_spec.yaml')
    def put(self, device_id):
        """Store the JSON request body as the device's configuration.

        Returns:
            An empty body with status 204 when the configuration was stored.

        Aborts with 500 when ``devices.set_device_configuration`` reports
        failure.
        """
        validate_device_ownership(device_id)
        updated = devices.set_device_configuration(device_id, request.json)
        if not updated:
            # Without this branch the method would fall through and return
            # None, which Flask-RESTful renders as a 200 response with a
            # null body, so a failed update would look like a success.
            abort(500, message='Device configuration could not be updated',
                  status='error')
        return '', 204

    @swag_from('swagger/get_device_configuration_spec.yaml')
    def get(self, device_id):
        """Return the device's configuration as the response body with 200."""
        validate_device_ownership(device_id)
        return devices.get_device_configuration(device_id), 200