"""
:author: Tom Dupré <gitd8400@gmail.com>
"""
import json
import struct
from velbus.message import Message
from velbus.command_registry import register_command
COMMAND_CODE = 0x05
[docs]class CoverUpMessage(Message):
"""
sent by:
received by: VMB2BLE
"""
def __init__(self, address=None):
Message.__init__(self)
self.channel = 0
self.delay_time = 0
self.set_defaults(address)
[docs] def populate(self, priority, address, rtr, data):
"""
:return: None
"""
assert isinstance(data, bytes)
self.needs_high_priority(priority)
self.needs_no_rtr(rtr)
self.needs_data(data, 4)
self.set_attributes(priority, address, rtr)
self.channel = self.byte_to_channel(data[0])
self.needs_valid_channel(self.channel, 2)
(self.delay_time,) = struct.unpack(">L", bytes([0]) + data[1:])
[docs] def to_json(self):
"""
:return: str
"""
json_dict = self.to_json_basic()
json_dict["channel"] = self.channel
json_dict["delay_time"] = self.delay_time
return json.dumps(json_dict)
[docs] def set_defaults(self, address):
if address is not None:
self.set_address(address)
self.set_high_priority()
self.set_no_rtr()
[docs] def data_to_binary(self):
"""
:return: bytes
"""
return (
bytes([COMMAND_CODE, self.channels_to_byte([self.channel])])
+ struct.pack(">L", self.delay_time)[-3:]
)
[docs]class CoverUpMessage2(Message):
"""
sent by:
received by: VMB1BL VMB2BL
"""
def __init__(self, address=None):
Message.__init__(self)
self.channel = 0
self.delay_time = 0
self.set_defaults(address)
[docs] def populate(self, priority, address, rtr, data):
"""
:return: None
"""
assert isinstance(data, bytes)
self.needs_high_priority(priority)
self.needs_no_rtr(rtr)
self.needs_data(data, 4)
self.set_attributes(priority, address, rtr)
# 00000011 = channel 1
# 00001100 = channel 2
# so shift 1 bit to the right + and with 03
tmp = (data[0] >> 1) & 0x03
self.channel = self.byte_to_channel(tmp)
self.needs_valid_channel(self.channel, 2)
(self.delay_time,) = struct.unpack(">L", bytes([0]) + data[1:])
[docs] def to_json(self):
"""
:return: str
"""
json_dict = self.to_json_basic()
json_dict["channel"] = self.channel
json_dict["delay_time"] = self.delay_time
return json.dumps(json_dict)
[docs] def set_defaults(self, address):
if address is not None:
self.set_address(address)
self.set_high_priority()
self.set_no_rtr()
[docs] def data_to_binary(self):
"""
:return: bytes
"""
if self.channel == 0x01:
tmp = 0x03
else:
tmp = 0x0C
return bytes([COMMAND_CODE, tmp]) + struct.pack(">L", self.delay_time)[-3:]
register_command(COMMAND_CODE, CoverUpMessage2, "VMB1BL")
register_command(COMMAND_CODE, CoverUpMessage2, "VMB2BL")
register_command(COMMAND_CODE, CoverUpMessage, "VMB1BLE")
register_command(COMMAND_CODE, CoverUpMessage, "VMB2BLE")
register_command(COMMAND_CODE, CoverUpMessage, "VMB1BLS")