Protect devices from unauthorized access
parent
6cee752f2b
commit
6f3826f006
|
@ -1,3 +1,4 @@
|
||||||
|
from flask_restful import abort
|
||||||
from marshmallow import Schema, fields
|
from marshmallow import Schema, fields
|
||||||
from webargs.flaskparser import use_args
|
from webargs.flaskparser import use_args
|
||||||
from flasgger import swag_from
|
from flasgger import swag_from
|
||||||
|
@ -53,14 +54,22 @@ class RecordingsWrapperSchema(Schema):
|
||||||
location='json', many=True)
|
location='json', many=True)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_device_ownership(device_id):
|
||||||
|
if not devices.can_user_access_device(g.current_account.id, device_id):
|
||||||
|
abort(403, message='You are not allowed to access this device',
|
||||||
|
status='error')
|
||||||
|
|
||||||
|
|
||||||
class DeviceResource(ProtectedResource):
|
class DeviceResource(ProtectedResource):
|
||||||
@swag_from('swagger/get_device_spec.yaml')
|
@swag_from('swagger/get_device_spec.yaml')
|
||||||
def get(self, device_id):
|
def get(self, device_id):
|
||||||
|
validate_device_ownership(device_id)
|
||||||
return DeviceWrapperSchema().dump(
|
return DeviceWrapperSchema().dump(
|
||||||
{'device': devices.get_device(device_id)}), 200
|
{'device': devices.get_device(device_id)}), 200
|
||||||
|
|
||||||
@swag_from('swagger/delete_device_spec.yaml')
|
@swag_from('swagger/delete_device_spec.yaml')
|
||||||
def delete(self, device_id):
|
def delete(self, device_id):
|
||||||
|
validate_device_ownership(device_id)
|
||||||
devices.delete_device(device_id)
|
devices.delete_device(device_id)
|
||||||
return '', 204
|
return '', 204
|
||||||
|
|
||||||
|
@ -76,6 +85,9 @@ class DeviceTypeListResource(ProtectedResource):
|
||||||
@use_args(DeviceTypeWrapperSchema())
|
@use_args(DeviceTypeWrapperSchema())
|
||||||
@swag_from('swagger/create_device_type_spec.yaml')
|
@swag_from('swagger/create_device_type_spec.yaml')
|
||||||
def post(self, args):
|
def post(self, args):
|
||||||
|
if g.current_account.role_id != 1:
|
||||||
|
abort(403, message='Only admin may create device types',
|
||||||
|
status='error')
|
||||||
args = args['device_type']
|
args = args['device_type']
|
||||||
success = devices.create_device_type(
|
success = devices.create_device_type(
|
||||||
args['name'])
|
args['name'])
|
||||||
|
@ -91,6 +103,7 @@ class DeviceTypeListResource(ProtectedResource):
|
||||||
class DeviceRecordingResource(ProtectedResource):
|
class DeviceRecordingResource(ProtectedResource):
|
||||||
@swag_from('swagger/get_device_recordings_spec.yaml')
|
@swag_from('swagger/get_device_recordings_spec.yaml')
|
||||||
def get(self, device_id):
|
def get(self, device_id):
|
||||||
|
validate_device_ownership(device_id)
|
||||||
return RecordingsWrapperSchema().dump(
|
return RecordingsWrapperSchema().dump(
|
||||||
{'recordings': devices.get_device_recordings(device_id)}), 200
|
{'recordings': devices.get_device_recordings(device_id)}), 200
|
||||||
|
|
||||||
|
@ -116,10 +129,12 @@ class DeviceListResource(ProtectedResource):
|
||||||
class DeviceConfigurationResource(ProtectedResource):
|
class DeviceConfigurationResource(ProtectedResource):
|
||||||
@swag_from('swagger/update_device_configuration_spec.yaml')
|
@swag_from('swagger/update_device_configuration_spec.yaml')
|
||||||
def put(self, device_id):
|
def put(self, device_id):
|
||||||
|
validate_device_ownership(device_id)
|
||||||
success = devices.set_device_configuration(device_id, request.json)
|
success = devices.set_device_configuration(device_id, request.json)
|
||||||
if success:
|
if success:
|
||||||
return '', 204
|
return '', 204
|
||||||
|
|
||||||
@swag_from('swagger/get_device_configuration_spec.yaml')
|
@swag_from('swagger/get_device_configuration_spec.yaml')
|
||||||
def get(self, device_id):
|
def get(self, device_id):
|
||||||
|
validate_device_ownership(device_id)
|
||||||
return devices.get_device_configuration(device_id), 200
|
return devices.get_device_configuration(device_id), 200
|
||||||
|
|
|
@ -96,6 +96,21 @@ def get_device(device_id):
|
||||||
return Device.get(id=device_id)
|
return Device.get(id=device_id)
|
||||||
|
|
||||||
|
|
||||||
|
def can_user_access_device(account_id, device_id):
|
||||||
|
"""
|
||||||
|
Checks if user with given account_id can access device with given device_id
|
||||||
|
|
||||||
|
:param account_id: Id of account
|
||||||
|
:param device_id: Id of device
|
||||||
|
:type account_id: int
|
||||||
|
:type device_id: int
|
||||||
|
:returns: true if device is accessible by this account, false otherwise
|
||||||
|
:rtype: Boolean
|
||||||
|
"""
|
||||||
|
return len(DeviceAssociation.get_many(account_id=account_id,
|
||||||
|
device_id=device_id)) > 0
|
||||||
|
|
||||||
|
|
||||||
def get_device_type(device_type_id):
|
def get_device_type(device_type_id):
|
||||||
"""
|
"""
|
||||||
Tries to get device type with given parameters. Raises error on failure
|
Tries to get device type with given parameters. Raises error on failure
|
||||||
|
|
Loading…
Reference in New Issue