Source code for velbus.modules.vmb4ry

"""
:author: Thomas Delaet <thomas@delaet.org>
"""
from velbus.module import Module
from velbus.module_registry import register_module
from velbus.messages.switch_relay_off import SwitchRelayOffMessage
from velbus.messages.switch_relay_on import SwitchRelayOnMessage
from velbus.messages.relay_status import RelayStatusMessage


[docs]class VMB4RYModule(Module): """ Velbus Relay module. """ def __init__(self, module_type, module_name, module_address, controller): Module.__init__(self, module_type, module_name, module_address, controller) self._is_on = {} self._callbacks = {}
[docs] def number_of_channels(self): return 5
[docs] def is_on(self, channel): """ Check if a switch is turned on :return: bool """ if channel in self._is_on: return self._is_on[channel] return False
[docs] def turn_on(self, channel, callback=None): """ Turn on switch. :return: None """ if callback is None: def callb(): """No-op""" pass callback = callb message = SwitchRelayOnMessage(self._address) message.relay_channels = [channel] self._controller.send(message, callback)
[docs] def turn_off(self, channel, callback=None): """ Turn off switch. :return: None """ if callback is None: def callb(): """No-op""" pass callback = callb message = SwitchRelayOffMessage(self._address) message.relay_channels = [channel] self._controller.send(message, callback)
def _on_message(self, message): if isinstance(message, RelayStatusMessage): self._is_on[message.channel] = message.is_on() if message.channel in self._callbacks: for callback in self._callbacks[message.channel]: callback(message.is_on())
[docs] def on_status_update(self, channel, callback): """ Callback to execute on status of update of channel """ if channel not in self._callbacks: self._callbacks[channel] = [] self._callbacks[channel].append(callback)
[docs] def get_categories(self, channel): return ["switch"]
[docs]class VMB4RY(VMB4RYModule):
[docs] def number_of_channels(self): return 4
def _on_message(self, message): if isinstance(message, RelayStatusMessage): self._is_on[message.channel] = message.channel_is_on() if message.channel in self._callbacks: for callback in self._callbacks[message.channel]: callback(message.channel_is_on())
[docs]class VMB1RY(VMB4RYModule):
[docs] def number_of_channels(self): return 1
def _on_message(self, message): if isinstance(message, RelayStatusMessage): self._is_on[message.channel] = message.channel_is_on() if message.channel in self._callbacks: for callback in self._callbacks[message.channel]: callback(message.channel_is_on())
register_module("VMB4RYLD", VMB4RYModule) register_module("VMB4RYNO", VMB4RYModule) register_module("VMB1RYNO", VMB4RYModule) register_module("VMB1RYNOS", VMB4RYModule) register_module("VMB1RY", VMB1RY) register_module("VMB4RY", VMB4RY) register_module("VMB1RYS", VMB1RY)