Source code for velbus.module

"""
:author: Thomas Delaet <thomas@delaet.org>
"""
import string
import struct
from velbus.messages.read_data_from_memory import ReadDataFromMemoryMessage
from datetime import datetime, timedelta
from velbus.messages.memory_data import MemoryDataMessage
from velbus.messages.channel_name_part1 import ChannelNamePart1Message
from velbus.messages.channel_name_part1 import ChannelNamePart1Message2
from velbus.messages.channel_name_part2 import ChannelNamePart2Message
from velbus.messages.channel_name_part2 import ChannelNamePart2Message2
from velbus.messages.channel_name_part3 import ChannelNamePart3Message
from velbus.messages.channel_name_part3 import ChannelNamePart3Message2
from velbus.messages.module_type import ModuleTypeMessage
from velbus.messages.module_subtype import ModuleSubTypeMessage
from velbus.messages.module_status_request import ModuleStatusRequestMessage
from velbus.messages.channel_name_request import ChannelNameRequestMessage


[docs]class Module(object): """ Abstract class for Velbus hardware modules. """ def __init__(self, module_type, module_name, module_address, controller): self._type = module_type self._model_name = module_name self._name = False self._address = module_address self._master_address = 0xFF self.sub_module = 0 self.serial = 0 self.memory_map_version = 0 self.build_year = 0 self.build_week = 0 self._channel_names = {} self._name_data = {} self._loaded_callbacks = [] self.loaded = False self._loading_triggered = False self._last_channel_name_msg = datetime.utcnow() self._controller = controller self._controller.subscribe(self.on_message) if not self._is_submodule(): self._data = controller._module_data["0x{:02x}".format(module_type)] else: self._data = {} self._memoryRead = {}
[docs] def get_module_name(self): """ Returns the module model name :return: str """ if self._name: return self._name return self._model_name
[docs] def get_module_address(self): """ Returns the module address :return: int """ return self._address
[docs] def get_name(self, channel): """ Get name for one of the channels :return: str """ return self._channel_names[channel]
[docs] def get_module_type_name(self): return self._model_name
[docs] def get_type(self): return self._type
[docs] def get_categories(self, channel): """ Get type of functionality of channel :return: str """ return []
[docs] def on_message(self, message): """ Process received message """ if isinstance(message, ChannelNamePart1Message) or isinstance( message, ChannelNamePart1Message2 ): if (message.address == self._address) or ( self._is_submodule() and (message.address == self._master_address) ): self._process_channel_name_message(1, message) elif isinstance(message, ChannelNamePart2Message) or isinstance( message, ChannelNamePart2Message2 ): if (message.address == self._address) or ( self._is_submodule() and (message.address == self._master_address) ): self._process_channel_name_message(2, message) elif isinstance(message, ChannelNamePart3Message) or isinstance( message, ChannelNamePart3Message2 ): if (message.address == self._address) or ( self._is_submodule() and (message.address == self._master_address) ): self._process_channel_name_message(3, message) elif isinstance(message, MemoryDataMessage): if message.address == self._address: for typ, item in self._memoryRead.items(): if (message.high_address, message.low_address) in item: if message.data == 0xFF: self._memoryRead[typ].remove( (message.high_address, message.low_address) ) if typ == "moduleName": self._moduleName_is_complete() else: idx = [ i for i, x in enumerate(self._memoryRead[typ]) if x == (message.high_address, message.low_address) ] self._memoryRead[typ][idx[0]] = chr(message.data) break else: if message.address == self._address: if isinstance(message, ModuleTypeMessage): self._process_module_type_message(message) elif isinstance(message, ModuleSubTypeMessage): self._process_module_subtype_message(message) else: self._on_message(message)
def _on_message(self, message): pass
[docs] def load(self, callback): """ Retrieve names of channels """ if not self.loaded: if not self._loading_triggered: self._loading_triggered = True if not self._is_submodule(): # load the data from memory ( the stuff that we need) self._load_memory() # load the module status self._request_module_status() if not self._is_submodule(): # load the channel names self._request_channel_name() # load the module specific stuff self._load() else: # Request channel names if last received if ( not self._is_submodule() and not self._name_messages_complete() and self._last_channel_name_msg < datetime.utcnow() - timedelta(seconds=10) ): self._request_channel_name() if callback: self._loaded_callbacks.append(callback) else: if callback: callback()
[docs] def loading_in_progress(self): return not self._name_messages_complete()
def _load(self): pass
[docs] def number_of_channels(self): """ Retrieve the number of available channels in this module :return: int """ raise NotImplementedError
[docs] def light_is_buttonled(self, channel): return False
def _is_submodule(self): return False def _name_count_needed(self): return self.number_of_channels() * 3 def _process_channel_name_message(self, part, message): self._last_channel_name_msg = datetime.utcnow() channel = message.channel if self._is_submodule(): channel = channel - (self.number_of_channels() * self.sub_module) if 1 <= channel <= self.number_of_channels(): if channel not in self._name_data: self._name_data[channel] = {} self._name_data[channel][part] = message.name if self._name_messages_complete(): self._generate_names() else: if channel <= self.number_of_channels(): if channel not in self._name_data: self._name_data[channel] = {} self._name_data[channel][part] = message.name if self._name_messages_complete(): self._generate_names() def _process_module_type_message(self, message): self.serial = message.serial self.memory_map_version = int(message.memory_map_version) self.build_year = int(message.build_year) self.build_week = int(message.build_week) def _process_module_subtype_message(self, message): self.serial = message.serial def _generate_names(self): assert self._name_messages_complete() for channel in range(1, self.number_of_channels() + 1): name_parts = self._name_data[channel] name = name_parts[1] + name_parts[2] + name_parts[3] self._channel_names[channel] = "".join( filter(lambda x: x in string.printable, name) ) self._name_data = {} self.loaded = True for callback in self._loaded_callbacks: callback() self._loaded_callbacks = [] def _name_messages_complete(self): """ Check if all name messages have been received """ for channel in range(1, self.number_of_channels() + 1): try: for name_index in range(1, 4): if not isinstance(self._name_data[channel][name_index], str): return False except KeyError: return False return True def _moduleName_is_complete(self): self._name = "" for char in self._memoryRead["moduleName"]: if type(char) is str: self._name = self._name + char del self._memoryRead["moduleName"] def _request_module_status(self): message = ModuleStatusRequestMessage(self._address) message.channels = list(range(1, self.number_of_channels() + 1)) self._controller.send(message) def _request_channel_name(self): message = ChannelNameRequestMessage(self._address) message.channels = list(range(1, self.number_of_channels() + 1)) self._controller.send(message) def _load_memory(self): if "memory" not in self._data: return for memoryType, matchData in self._data["memory"].items(): self._memoryRead[memoryType] = [] # TODO matchbuild + matchversion for addrRange in matchData["address"].split(";"): addrR = addrRange.split("-") for addr in range(int(addrR[0], 0), int(addrR[1], 0)): addr = struct.unpack(">BB", struct.pack(">h", addr)) self._memoryRead[memoryType].append(addr) message = ReadDataFromMemoryMessage(self._address) message.high_address = addr[0] message.low_address = addr[1] self._controller.send(message)