Source code for JciHitachi.status

import base64

from . import utility as util
from .model import JciHitachiAC, JciHitachiDH, JciHitachiHE


[docs] class JciHitachiCommand: # pragma: no cover """Abstract class for sending job command. Parameters ---------- gateway_mac_address : str Gateway mac address. """ def __init__(self, gateway_mac_address): self.job_info_base = bytearray.fromhex( "d0d100003c6a9dffff03e0d4ffffffff \ 00000100000000000000002000010000 \ 000000000000000002000d278050f0d4 \ 469dafd3605a6ebbdb130d278052f0d4 \ 469dafd3605a6ebbdb13060006000000 \ 0000" ) self.job_info_base[32:40] = bytearray.fromhex(hex(int(gateway_mac_address))[2:]) def get_command(self, command, value): raise NotImplementedError
[docs] def get_b64command(self, command, value): """A wrapper of get_command, generating base64 command. Parameters ---------- command : str Status name. value : int Status value. Returns ------- str Base64 command. """ return base64.b64encode(self.get_command(command, value)).decode()
[docs] class JciHitachiCommandAC(JciHitachiCommand): # pragma: no cover """Sending job command to air conditioner. Parameters ---------- gateway_mac_address : str Gateway mac address. """ def __init__(self, gateway_mac_address): super().__init__(gateway_mac_address)
[docs] def get_command(self, command, value): """Get job command. Parameters ---------- command : str Status name. value : int Status value. Returns ------- bytearray Bytearray command. """ job_info = self.job_info_base.copy() # Device type job_info[77] = 1 # Command (eg. target_temp) job_info[78] = 128 + JciHitachiAC.idx[command] # Value (eg. 27) job_info[80] = value # Checksum # Original algorithm: # xor job_info 76~80 # Since byte 76(0x06), 77(device type), and 79(0x00) are constants, # here is the simplified algorithm: # command ^ value ^ 0x07 (flip last 3 bits) job_info[81] = job_info[78] ^ job_info[80] ^ 0x07 return job_info
[docs] class JciHitachiCommandDH(JciHitachiCommand): # pragma: no cover """Sending job command to dehumidifier. Parameters ---------- gateway_mac_address : str Gateway mac address. """ def __init__(self, gateway_mac_address): super().__init__(gateway_mac_address)
[docs] def get_command(self, command, value): """Get job command. Parameters ---------- command : str Status name. value : int Status value. Returns ------- bytearray Bytearray command. """ job_info = self.job_info_base.copy() # Device type job_info[77] = 4 # Command (eg. target_temp) job_info[78] = 128 + JciHitachiDH.idx[command] # Value (eg. 27) job_info[80] = value # Checksum # Original algorithm: # xor job_info 76~80 # Since byte 76(0x06), 77(device type), and 79(0x00) are constants, # here is the simplified algorithm: # command ^ value ^ 0x02 job_info[81] = job_info[78] ^ job_info[80] ^ 0x02 return job_info
[docs] class JciHitachiCommandHE(JciHitachiCommand): # pragma: no cover """Sending job command to heat exchanger. Parameters ---------- gateway_mac_address : str Gateway mac address. """ def __init__(self, gateway_mac_address): super().__init__(gateway_mac_address)
[docs] def get_command(self, command, value): """Get job command. Parameters ---------- command : str Status name. value : int Status value. Returns ------- bytearray Bytearray command. """ job_info = self.job_info_base.copy() # Device type job_info[77] = 14 # Command (eg. target_temp) job_info[78] = 128 + JciHitachiHE.idx[command] # Value (eg. 27) job_info[80] = value # Checksum # Original algorithm: # xor job_info 76~80 # Since byte 76(0x06), 77(device type), and 79(0x00) are constants, # here is the simplified algorithm: # command ^ value ^ 0x08 job_info[81] = job_info[78] ^ job_info[80] ^ 0x08 return job_info
[docs] class JciHitachiStatusInterpreter: # pragma: no cover """Interpreting received status code. Parameters ---------- code : str status code. """ def __init__(self, code): self.base64_bytes = base64.standard_b64decode(code) def _decode_status_number(self): if 6 < self.base64_bytes[0] and ( self.base64_bytes[1], self.base64_bytes[2], ) == (0, 8): return int((self.base64_bytes[0] - 4) / 3) else: return 0 def _decode_support_number(self): if 9 < self.base64_bytes[0]: return int((self.base64_bytes[0] - 26) / 3) else: return 0 def _decode_single_status(self, max_func_number, while_counter): stat_idx = while_counter * 3 + 3 if stat_idx + 3 <= self.base64_bytes[0] - 1: status_bytes = bytearray(4) status_bytes[0] = (self.base64_bytes[stat_idx] & 0x80) != 0 status_bytes[1] = self.base64_bytes[stat_idx] & 0xFFFF7FFF status_bytes[2:4] = self.base64_bytes[stat_idx + 1 : stat_idx + 3] output = int.from_bytes(status_bytes, byteorder="little") return output else: output = util.bin_concat(0xFF, max_func_number) output = (output << 16) & 0xFFFF0000 | max_func_number return output def _decode_single_support(self, max_func_number, while_counter, init): stat_idx = while_counter * 3 + init if stat_idx + 3 <= self.base64_bytes[0] - 1: status_bytes = bytearray(4) status_bytes[0] = (self.base64_bytes[stat_idx] & 0x80) != 0 status_bytes[1] = self.base64_bytes[stat_idx] & 0xFFFF7FFF status_bytes[2:4] = self.base64_bytes[stat_idx + 1 : stat_idx + 3] output = int.from_bytes(status_bytes, byteorder="little") else: output = util.bin_concat(0xFF, max_func_number) output = (output << 16) & 0xFFFF0000 | max_func_number return output def _get_strs(self, start_idx, num_strs): strs = [] idx = start_idx while len(strs) < num_strs: char = self.base64_bytes[idx] if char == 0: strs.append(self.base64_bytes[start_idx:idx].decode()) idx += 1 start_idx = idx else: idx += 1 return idx, strs
[docs] def decode_status(self): """Decode all status codes of a peripheral. Returns ------- dict Decoded status. """ table = {} num_idx = self._decode_status_number() for i in range(num_idx): ret = self._decode_single_status(num_idx, i) idx = util.cast_bytes(ret >> 8, 1) table[idx] = ret >> 0x18 + (ret >> 0x10 * 0x100) return table
[docs] def decode_support(self): """Decode all support codes of a peripheral. Returns ------- dict Decoded support. """ init, (brand, model) = self._get_strs(8, 2) table = {"brand": brand, "model": model} num_idx = self._decode_support_number() for i in range(num_idx): ret = self._decode_single_support(num_idx, i, init) idx = util.cast_bytes(ret >> 8, 1) if idx >= 128: idx = idx - 128 # save raw value, the extraction procedure is performed in model. table[idx] = ret return table