from __future__ import annotations
import random
import time
import warnings
from typing import Optional, Union
from . import aws_connection, connection, mqtt_connection
from .model import (
JciHitachiAC,
JciHitachiACSupport,
JciHitachiAWSStatus,
JciHitachiAWSStatusSupport,
JciHitachiDH,
JciHitachiDHSupport,
JciHitachiStatus,
JciHitachiStatusSupport,
)
from .status import (
JciHitachiCommand,
JciHitachiCommandAC,
JciHitachiCommandDH,
JciHitachiStatusInterpreter,
)
[docs]
class Peripheral: # pragma: no cover
"""Peripheral (Device) Information.
Parameters
----------
peripheral_json : dict
Peripheral json of specific device.
"""
supported_device_type = {
144: "AC",
146: "DH",
# 148: "HE",
}
def __init__(self, peripheral_json: dict) -> None:
self._json = peripheral_json
self._available = False
self._status_code = ""
self._support_code = ""
self._supported_status = None
warnings.warn("This API has been deprecated.", DeprecationWarning)
def __repr__(self) -> str:
ret = (
f"name: {self.name}\n"
f"brand: {self.brand}\n"
f"model: {self.model}\n"
f"type: {self.type}\n"
f"available: {self.available}\n"
f"status_code: {self.status_code}\n"
f"support_code: {self.support_code}\n"
f"gateway_id: {self.gateway_id}\n"
f"gateway_mac_address: {self.gateway_mac_address}"
)
return ret
[docs]
@classmethod
def from_device_names(
cls, peripherals_json: dict, device_names: Optional[Union[list[str], str]]
) -> dict[str, object]:
"""Use device names to pick peripheral_jsons accordingly.
Parameters
----------
peripherals_json : dict
Peripherals_json retrieved from GetPeripheralsByUser.
device_names : list of str or str or None
Device name. If None is given, all available devices will be included, by default None.
Returns
-------
dict
A dict of Peripheral instances with device name key.
"""
peripherals = {}
if isinstance(device_names, str):
device_names = [device_names]
for result in peripherals_json["results"]:
device_name = result["DeviceName"]
device_type = result["Peripherals"][0]["DeviceType"]
if device_names is None or (device_names and device_name in device_names):
if device_type in cls.supported_device_type:
peripherals[device_name] = cls(result)
assert device_names is None or len(device_names) == len(peripherals), (
"Some of device_names are not available from the API."
)
return peripherals
@property
def available(self) -> bool:
"""Whether the device is available.
Returns
-------
bool
Return True if the device is available.
"""
return self._available
@available.setter
def available(self, x) -> None:
self._available = x
@property
def brand(self) -> str:
"""Device brand.
Returns
-------
str
Device brand.
"""
return getattr(self.supported_status, "brand")
@property
def commander(self) -> Union[JciHitachiCommand, None]:
"""Return a new JciHitachiCommand instance based on peripheral's type.
Returns
-------
JciHitachiCommand or None
JciHitachiCommand instance.
"""
if self.type == "AC":
return JciHitachiCommandAC(self.gateway_mac_address)
elif self.type == "DH":
return JciHitachiCommandDH(self.gateway_mac_address)
else:
return None
@property
def gateway_id(self) -> int:
"""Gateway ID.
Returns
-------
int
Gateway ID.
"""
return self._json["ObjectID"]
@property
def gateway_mac_address(self) -> str:
"""Gateway mac address.
Returns
-------
str
Gateway mac address.
"""
return self._json["GMACAddress"]
@property
def model(self) -> str:
"""Device model.
Returns
-------
str
Device model.
"""
return getattr(self.supported_status, "model")
@property
def name(self) -> str:
"""Device name.
Returns
-------
str
Device name.
"""
return self._json["DeviceName"]
@property
def picked_peripheral(self) -> dict:
"""Picked peripheral.
Returns
-------
dict
Picked peripheral.
"""
return self._json
@property
def status_code(self) -> str:
"""Peripheral's status code (LValue) reported by the API.
Returns
-------
str
Status code.
"""
return self._status_code
@status_code.setter
def status_code(self, x: str) -> None:
self._status_code = x
@property
def support_code(self) -> str:
"""Peripheral's support code (LValue) reported by the API.
Returns
-------
str
Status code.
"""
return self._support_code
@support_code.setter
def support_code(self, x: str) -> None:
self._support_code = x
if self.type == "AC":
self._supported_status = JciHitachiACSupport(
JciHitachiStatusInterpreter(x).decode_support()
)
elif self.type == "DH":
self._supported_status = JciHitachiDHSupport(
JciHitachiStatusInterpreter(x).decode_support()
)
@property
def supported_status(self) -> JciHitachiStatusSupport:
"""Peripheral's supported status converted from support_code.
Returns
-------
JciHitachiStatusSupport
Supported status.
"""
return self._supported_status
@property
def type(self) -> str:
"""Device type.
Returns
-------
str
Device type.
If not supported, 'unknown' will be returned. (currently supports: `AC`, `DH`)
"""
return self.supported_device_type.get(
self._json["Peripherals"][0]["DeviceType"], "unknown"
)
[docs]
class JciHitachiAPI: # pragma: no cover
"""Jci-Hitachi API.
Parameters
----------
email : str
User email.
password : str
User password.
device_names : list of str or str or None, optional
Device names. If None is given, all available devices will be included, by default None.
max_retries : int, optional
Maximum number of retries when setting status, by default 5.
device_offline_timeout: float, optional
Device offline timeout, by default 45.0.
print_response : bool, optional
If set, all responses of httpx and MQTT will be printed, by default False.
"""
def __init__(
self,
email: str,
password: str,
device_names: Optional[Union[list[str], str]] = None,
max_retries: int = 5,
device_offline_timeout: float = 45.0,
print_response: bool = False,
) -> None:
self.email = email
self.password = password
self.device_names = device_names
self.max_retries = max_retries
self.device_offline_timeout = device_offline_timeout
self.print_response = print_response
self._mqtt = None
self._device_id = random.randint(1000, 6999)
self._peripherals = {}
self._session_token = None
self._user_id = None
self._task_id = 0
warnings.warn("This API has been deprecated.", DeprecationWarning)
@property
def peripherals(self) -> dict[str, Peripheral]:
"""Picked peripherals.
Returns
-------
dict
A dict of Peripherals.
"""
return self._peripherals
@property
def user_id(self) -> Optional[int]:
"""User ID.
Returns
-------
int
User ID.
"""
return self._user_id
@property
def task_id(self) -> int:
"""Task ID.
Returns
-------
int
Serial number counted from 0, with maximum 999.
"""
self._task_id += 1
if self._task_id >= 1000:
self._task_id = 1
return self._task_id
def _sync_peripherals_availablity(self) -> None:
device_access_time = self._mqtt.mqtt_events.device_access_time
for peripheral in self._peripherals.values():
if (
peripheral.gateway_id in device_access_time
and abs(time.time() - device_access_time[peripheral.gateway_id])
< self.device_offline_timeout
):
peripheral.available = True
else:
peripheral.available = False
[docs]
def login(self) -> None:
"""Login API.
Raises
------
RuntimeError
If a login error occurs, RuntimeError will be raised.
"""
conn = connection.GetPeripheralsByUser(
self.email, self.password, print_response=self.print_response
)
conn_status, conn_json = conn.get_data()
self._session_token = conn.session_token
if conn_status == "OK":
if len(conn_json["results"]) != 0:
self._user_id = conn_json["results"][0]["Owner"]
# peripherals
self._peripherals = Peripheral.from_device_names(
conn_json, self.device_names
)
self.device_names = list(self._peripherals.keys())
# mqtt
self._mqtt = mqtt_connection.JciHitachiMqttConnection(
self.email,
self.password,
self._user_id,
print_response=self.print_response,
)
self._mqtt.configure()
self._mqtt.connect()
# status
self.refresh_status()
else:
raise RuntimeError(f"An error occurred when API login: {conn_status}.")
[docs]
def logout(self) -> None:
"""Logout API."""
self._mqtt.disconnect()
[docs]
def change_password(self, new_password: str) -> None:
"""Change password.
Parameters
----------
new_password : str
New password.
Raises
------
RuntimeError
If an error occurs, RuntimeError will be raised.
"""
conn = connection.UpdateUserCredential(
self.email,
self.password,
session_token=self._session_token,
print_response=self.print_response,
)
conn_status, conn_json = conn.get_data(new_password)
self._session_token = conn.session_token
[docs]
def get_status(
self, device_name: Optional[str] = None
) -> dict[str, JciHitachiStatus]:
"""Get device status after refreshing status.
Parameters
----------
device_name : str, optional
Getting a device's status by its name.
If None is given, all devices' status will be returned,
by default None.
Returns
-------
dict of JciHitachiStatus.
Return a dict of JciHitachiStatus instances according to device type.
For example, if the device type is `AC`, then return JciHitachiAC instance.
"""
statuses = {}
for name, peripheral in self._peripherals.items():
if (device_name and name != device_name) or peripheral.type == "unknown":
continue
dev_status = JciHitachiStatusInterpreter(
peripheral.status_code
).decode_status()
if peripheral.type == "AC":
statuses[name] = JciHitachiAC(dev_status)
elif peripheral.type == "DH":
statuses[name] = JciHitachiDH(dev_status)
return statuses
[docs]
def get_supported_status(
self, device_name: Optional[str] = None
) -> dict[str, JciHitachiStatusSupport]:
"""Get supported device status after refreshing status.
Parameters
----------
device_name : str, optional
Getting a device's status by its name.
If None is given, all devices' status will be returned,
by default None.
Returns
-------
dict of JciHitachiStatusSupport.
Return a dict of JciHitachiStatusSupport instances according to device type.
For example, if the device type is `AC`, then return JciHitachiACSupport instance.
"""
supported_statuses = {}
for name, peripheral in self._peripherals.items():
if (device_name and name != device_name) or peripheral.type == "unknown":
continue
supported_statuses[name] = peripheral.supported_status
return supported_statuses
[docs]
def refresh_status(self, device_name: Optional[str] = None) -> None:
"""Refresh device status from the API.
Parameters
----------
device_name : str, optional
Refreshing a device's status by its name.
If None is given, all devices' status will be refreshed,
by default None.
Raises
------
RuntimeError
If an error occurs, RuntimeError will be raised.
"""
self._sync_peripherals_availablity()
conn = connection.GetDataContainerByID(
self.email,
self.password,
session_token=self._session_token,
print_response=self.print_response,
)
for name, peripheral in self._peripherals.items():
if (device_name and name != device_name) or peripheral.type == "unknown":
continue
conn_status, conn_json = conn.get_data(peripheral.picked_peripheral)
self._session_token = conn.session_token
if conn_status == "OK":
peripheral.support_code = conn_json["results"]["DataContainer"][0][
"ContDetails"
][0]["LValue"]
peripheral.status_code = conn_json["results"]["DataContainer"][0][
"ContDetails"
][1]["LValue"]
else:
raise RuntimeError(
f"An error occurred when refreshing status: {conn_status}"
)
[docs]
def set_status(self, status_name: str, status_value: int, device_name: str) -> bool:
"""Set status to a peripheral.
Parameters
----------
status_name : str
Status name, which has to be in idx dict. E.g. JciHitachiAC.idx
status_value : int
Status value.
device_name : str
Device name.
Returns
-------
bool
Return True if the command has been successfully executed. Otherwise, return False.
Raises
------
RuntimeError
If an error occurs, RuntimeError will be raised.
"""
self._sync_peripherals_availablity()
if not self._peripherals[device_name].available:
return False
commander = self._peripherals[device_name].commander
conn = connection.CreateJob(
self.email,
self.password,
session_token=self._session_token,
print_response=self.print_response,
)
conn2 = connection.GetJobDoneReport(
self.email,
self.password,
session_token=self._session_token,
print_response=self.print_response,
)
# The mqtt events occurring order:
# job (occurs one time) ->
# job_done_report (occurs many times until the job status is successful) ->
# peripheral (occurs one time, if this step is timed out, the job command fails)
self._mqtt.mqtt_events.job.clear()
self._mqtt.mqtt_events.job_done_report.clear()
self._mqtt.mqtt_events.peripheral.clear()
conn_status, conn_json = conn.get_data(
gateway_id=self._peripherals[device_name].gateway_id,
device_id=self._device_id,
task_id=self.task_id,
job_info=commander.get_b64command(status_name, status_value),
)
self._session_token = conn.session_token
if not self._mqtt.mqtt_events.job.wait(timeout=10.0):
return False
for _ in range(self.max_retries):
if not self._mqtt.mqtt_events.job_done_report.wait(timeout=10.0):
self._mqtt.mqtt_events.job_done_report.clear()
continue
conn_status, conn_json = conn2.get_data(device_id=self._device_id)
if (
conn_status == "OK"
and len(conn_json["results"]) != 0
and conn_json["results"][0]["JobStatus"] == 0
):
if not self._mqtt.mqtt_events.peripheral.wait(timeout=10.0):
return False
time.sleep(0.5) # still needs to wait a moment.
return True
return False
[docs]
class AWSThing:
"""AWS thing (device) information.
Parameters
----------
thing_json : dict
Thing json of a specific device.
"""
supported_device_type = {
"1": "AC",
"2": "DH",
"3": "HE",
# "4": "PM25_PANEL",
}
def __init__(self, thing_json: dict) -> None:
self._json: dict = thing_json
self._available: bool = True
self._shadow: Optional[dict] = None
self._status_code: Optional[JciHitachiAWSStatus] = None
self._support_code: Optional[JciHitachiAWSStatusSupport] = None
self._monthly_data: Optional[list[dict]] = None
def __repr__(self) -> str:
ret = (
f"name: {self.name}\n"
f"brand: {self.brand}\n"
f"model: {self.model}\n"
f"type: {self.type}\n"
f"firmware_code: {self.firmware_code}\n"
f"firmawre_version: {self.firmware_version}\n"
f"available: {self.available}\n"
f"status_code: {self.status_code}\n"
f"support_code: {self.support_code}\n"
f"shadow: {self.shadow}\n"
f"gateway_mac_address: {self.gateway_mac_address}"
)
return ret
[docs]
@classmethod
def from_device_names(
cls, things_json: dict, device_names: Optional[Union[list[str], str]]
) -> dict[str, object]:
"""Use device names to pick things_json accordingly.
Parameters
----------
things_json : dict
things_json retrieved from aws_connection.GetAllDevice.
device_names : list of str or str or None
Device name. If None is given, all available devices will be included, by default None.
Returns
-------
dict
A dict of AWSThing instances with device name key.
"""
things = {}
if isinstance(device_names, str):
device_names = [device_names]
for thing in things_json["results"]["Things"]:
device_name = thing["CustomDeviceName"]
device_type = thing["DeviceType"]
if device_names is None or (device_names and device_name in device_names):
if device_type in cls.supported_device_type:
things[device_name] = cls(thing)
assert device_names is None or len(device_names) == len(things), (
"Some of device_names are not available from the API."
)
return things
@property
def available(self) -> bool:
"""Whether the device is available.
Returns
-------
bool
Return True if the device is available.
"""
return self._available
@available.setter
def available(self, x: bool) -> None:
self._available = x
@property
def brand(self) -> str:
"""Device brand.
Returns
-------
str
Device brand.
"""
return getattr(self._support_code, "Brand")
@property
def firmware_version(self) -> str:
"""Firmware version.
Returns
-------
str
Device firmware version.
"""
return getattr(self._support_code, "FirmwareVersion")
@property
def firmware_code(self) -> str:
"""Firmware code.
Returns
-------
str
Device firmware code.
"""
return getattr(self._support_code, "FirmwareCode")
@property
def gateway_mac_address(self) -> str:
"""Gateway mac address.
Returns
-------
str
Gateway mac address.
"""
return self._json["ThingName"].split("_")[-1]
@property
def model(self) -> str:
"""Device model.
Returns
-------
str
Device model.
"""
return getattr(self._support_code, "Model")
@property
def name(self) -> str:
"""Device name.
Returns
-------
str
Device name.
"""
return self._json["CustomDeviceName"]
@property
def picked_thing(self) -> dict:
"""Picked thing.
Returns
-------
dict
Picked thing.
"""
return self._json
@property
def shadow(self) -> Optional[dict]:
"""Thing's shadow reported by the API.
Returns
-------
dict
Shadow.
"""
return self._shadow
@shadow.setter
def shadow(self, x: dict) -> None:
self._shadow = x
@property
def status_code(self) -> Optional[JciHitachiAWSStatus]:
"""Thing's status code reported by the API.
Returns
-------
JciHitachiAWSStatus
Status code.
"""
return self._status_code
@status_code.setter
def status_code(self, x: JciHitachiAWSStatus) -> None:
self._status_code = x
@property
def support_code(self) -> Optional[JciHitachiAWSStatusSupport]:
"""Thing's support code reported by the API.
Returns
-------
JciHitachiAWSStatusSupport
Status code.
"""
return self._support_code
@support_code.setter
def support_code(self, x: JciHitachiAWSStatusSupport) -> None:
self._support_code = x
@property
def monthly_data(self) -> Optional[list[dict]]:
"""Thing's monthly data reported by the API.
Returns
-------
Optional[list[dict]]
Monthly data.
"""
return self._monthly_data
@monthly_data.setter
def monthly_data(self, x: Optional[list[dict]]) -> None:
self._monthly_data = x
@property
def thing_name(self) -> str:
return self._json["ThingName"]
@property
def type(self) -> str:
"""Device type.
Returns
-------
str
Device type.
If not supported, 'unknown' will be returned.
"""
return self.supported_device_type.get(self._json["DeviceType"], "unknown")
[docs]
class JciHitachiAWSAPI:
"""Jci-Hitachi API.
Parameters
----------
email : str
User email.
password : str
User password.
device_names : list of str or str or None, optional
Device names. If None is given, all available devices will be included, by default None.
max_retries : int, optional
Maximum number of retries when setting status, by default 5.
device_offline_timeout: float, optional
For future use.
print_response : bool, optional
If set, all responses of httpx and MQTT will be printed, by default False.
"""
def __init__(
self,
email: str,
password: str,
device_names: Optional[Union[list[str], str]] = None,
max_retries: int = 5,
device_offline_timeout: float = 10.0,
print_response: bool = False,
) -> None:
self.email: str = email
self.password: str = password
self.device_names: Optional[Union[list[str], str]] = device_names
self.max_retries: int = max_retries
self.print_response: bool = print_response
self._mqtt: Optional[aws_connection.JciHitachiAWSMqttConnection] = None
self._mqtt_timeout: float = device_offline_timeout
self._shadow_names: Union[str, list] = ["info"]
self._device_id: int = random.randint(1000, 6999)
self._things: dict[str, AWSThing] = {}
self._aws_tokens: Optional[aws_connection.AWSTokens] = None
self._aws_identity: Optional[aws_connection.AWSIdentity] = None
self._task_id: int = 0
@property
def things(self) -> dict[str, AWSThing]:
"""Picked things.
Returns
-------
dict of AWSThing
A dict of AWSThing instances.
"""
return self._things
@property
def task_id(self) -> int:
"""Task ID.
Returns
-------
int
Serial number counted from 0, with maximum 999.
"""
self._task_id += 1
if self._task_id >= 1000:
self._task_id = 1
return self._task_id
def _check_before_publish(self) -> None:
# Reauthenticate 5 mins before AWSTokens expiration.
current_time = time.time()
if self._aws_tokens is None or (
self._aws_tokens.expiration - current_time <= 300
):
self.reauth()
if self._mqtt.mqtt_events.mqtt_error_event.is_set():
self._mqtt.mqtt_events.mqtt_error_event.clear()
self.reauth()
def _get_valid_things(
self, device_name: Optional[str] = None
) -> tuple[str, AWSThing]:
for name, thing in self._things.items():
if (device_name and name != device_name) or thing.type == "unknown":
continue
yield name, thing
def _delay(self) -> None:
time.sleep(0.2)
[docs]
def login(self) -> None:
"""Login API.
Raises
------
RuntimeError
If a login error occurs, RuntimeError will be raised.
"""
conn = aws_connection.GetUser(
email=self.email,
password=self.password,
print_response=self.print_response,
)
self._aws_tokens = conn.aws_tokens
conn_status, self._aws_identity = conn.get_data()
conn = aws_connection.GetAllDevice(
self._aws_tokens, print_response=self.print_response
)
conn_status, conn_json = conn.get_data()
if conn_status == "OK":
# things
self._things = AWSThing.from_device_names(conn_json, self.device_names)
self.device_names = list(self._things.keys())
thing_names = [value.thing_name for value in self._things.values()]
# mqtt
def get_credential_callable():
self._check_before_publish()
conn = aws_connection.GetCredentials(
email=self.email,
password=self.password,
aws_tokens=self._aws_tokens,
print_response=self.print_response,
)
conn_status, aws_credentials = conn.get_data(self._aws_identity)
if conn_status != "OK":
aws_connection._LOGGER.error(
f"An error occurred when acquiring a new AwsCredentials: {conn_status}"
)
return aws_credentials
self._mqtt = aws_connection.JciHitachiAWSMqttConnection(
get_credential_callable, print_response=self.print_response
)
self._mqtt.configure(self._aws_identity.identity_id)
if not self._mqtt.connect(
self._aws_identity.host_identity_id, self._shadow_names, thing_names
):
raise RuntimeError(
"An error occurred when connecting to MQTT endpoint."
)
# status
self.refresh_status(refresh_support_code=True, refresh_shadow=True)
else:
raise RuntimeError(
f"An error occurred when retrieving devices info: {conn_status}"
)
[docs]
def logout(self) -> None:
"""Logout API."""
self._mqtt.disconnect()
[docs]
def reauth(self) -> None:
"""Reauthenticate with AWS Cognito Service."""
conn = aws_connection.JciHitachiAWSCognitoConnection(
email=self.email,
password=self.password,
aws_tokens=self._aws_tokens,
print_response=self.print_response,
)
conn_status, self._aws_tokens = conn.login(use_refresh_token=False)
if conn_status != "OK":
raise RuntimeError(
f"An error occurred when reauthenticating with AWS Cognito Service: {conn_status}"
)
[docs]
def change_password(self, new_password: str) -> None:
"""Change password.
Warning:
Use this function carefully, be sure you specify a strong enough password;
otherwise, your password might be accepted by the Hitachi account management but not be accepted by AWS Cognito or vice versa,
which will result in a login failure in the APP.
Parameters
----------
new_password : str
New password.
Raises
------
RuntimeError
If an error occurs, RuntimeError will be raised.
"""
# We have to change the password in both AWS Cognito and Hitachi owned account management simultaneously.
conn = aws_connection.ChangePassword(
self.email,
self.password,
aws_tokens=self._aws_tokens,
print_response=self.print_response,
)
aws_conn_status, _ = conn.get_data(new_password)
if aws_conn_status != "OK":
raise RuntimeError(
f"An error occurred when changing AWS Cognito password: {aws_conn_status}"
)
conn = connection.UpdateUserCredential(
self.email, self.password, print_response=self.print_response
)
hitachi_conn_status, _ = conn.get_data(new_password)
if hitachi_conn_status != "OK":
raise RuntimeError(
f"An error occurred when changing Hitachi password: {hitachi_conn_status}"
)
[docs]
def refresh_monthly_data(self, months: int, device_name: str) -> None:
"""Refresh available monthly data (power consumption) from the API.
Parameters
----------
months : int
Number of months to get.
device_name : str
Device name.
Raises
------
RuntimeError
If an error occurs, RuntimeError will be raised.
"""
thing = self._things[device_name]
current_timestamp_millis = time.time() * 1000
conn = aws_connection.GetAvailableAggregationMonthlyData(
self._aws_tokens, print_response=self.print_response
)
conn_status, response = conn.get_data(
thing.thing_name,
int(
current_timestamp_millis - months * 2678400000
), # 2678400000 ms == 31 days
int(current_timestamp_millis),
)
if conn_status != "OK":
raise RuntimeError(
f"An error occurred when getting monthly data: {conn_status}"
)
thing.monthly_data = sorted(
response["results"]["Data"], key=lambda x: x["Timestamp"]
)
[docs]
def refresh_status(
self,
device_name: Optional[str] = None,
refresh_support_code: bool = False,
refresh_shadow: bool = False,
) -> None:
"""Refresh device status from the API.
Parameters
----------
device_name : str, optional
Refreshing a device's status by its name.
If None is given, all devices' status will be refreshed,
by default None.
refresh_support_code : bool, optional
Whether or not to refresh support code, by default False.
refresh_shadow : bool, optional
Whether or not to refresh AWS IoT Shadow, by default False.
Raises
------
RuntimeError
If an error occurs, RuntimeError will be raised.
"""
# queue tasks
for name, thing in self._get_valid_things(device_name):
self._check_before_publish()
if refresh_support_code:
self._mqtt.publish(
self._aws_identity.host_identity_id,
thing.thing_name,
"support",
self._mqtt_timeout,
)
if refresh_shadow:
self._mqtt.publish_shadow(thing.thing_name, "get", shadow_name="info")
self._mqtt.publish(
self._aws_identity.host_identity_id,
thing.thing_name,
"status",
self._mqtt_timeout,
)
# execute
support_results, shadow_results, status_results, _ = self._mqtt.execute()
# gather results
for name, thing in self._get_valid_things(device_name):
if refresh_support_code:
if thing.thing_name in support_results:
if thing.thing_name not in self._mqtt.mqtt_events.device_support:
raise RuntimeError(
f"An event occurred but wasn't accompanied with data when refreshing {name} support code."
)
thing.support_code = self._mqtt.mqtt_events.device_support[
thing.thing_name
]
else:
raise RuntimeError(
f"Timed out refreshing {name} support code. Please ensure the device is online and avoid opening the official app."
)
if refresh_shadow:
if thing.thing_name in shadow_results:
if thing.thing_name not in self._mqtt.mqtt_events.device_shadow:
raise RuntimeError(
f"An event occurred but wasn't accompanied with data when refreshing {name} shadow."
)
thing.shadow = self._mqtt.mqtt_events.device_shadow[
thing.thing_name
]
else:
raise RuntimeError(
f"Timed out refreshing {name} shadow. Please ensure the device is online and avoid opening the official app."
)
if thing.thing_name in status_results:
if thing.thing_name not in self._mqtt.mqtt_events.device_status:
raise RuntimeError(
f"An event occurred but wasn't accompanied with data when refreshing {name} status code."
)
thing.status_code = self._mqtt.mqtt_events.device_status[
thing.thing_name
]
else:
raise RuntimeError(
f"Timed out refreshing {name} status code. Please ensure the device is online and avoid opening the official app."
)
[docs]
def get_status(
self, device_name: Optional[str] = None, legacy: bool = False
) -> dict[str, JciHitachiAWSStatus]:
"""Get device status after refreshing status.
Parameters
----------
device_name : str, optional
Getting a device's status by its name.
If None is given, all devices' status will be returned,
by default None.
legacy : bool, optional
Whether or not to return status with legacy status name, by default False.
Returns
-------
dict of JciHitachiAWSStatus.
A dict of JciHitachiAWSStatus instances.
"""
statuses = {}
for name, thing in self._get_valid_things(device_name):
if legacy:
statuses[name] = thing.status_code.legacy_status
else:
statuses[name] = thing.status_code
# inject temp and humidity limitations from the support code
if thing.type == "AC":
statuses[name]._status["max_temp"] = thing.support_code.max_temp
statuses[name]._status["min_temp"] = thing.support_code.min_temp
elif thing.type == "DH":
statuses[name]._status["max_humidity"] = thing.support_code.max_humidity
statuses[name]._status["min_humidity"] = thing.support_code.min_humidity
return statuses
[docs]
def set_status(
self,
status_name: str,
device_name: str,
status_value: int = None,
status_str_value: str = None,
) -> bool:
"""Set status to a thing. Either status_value or status_str_value must be specified.
Parameters
----------
status_name : str
Status name.
device_name : str
Device name.
status_value : int, optional
Status value, by default None.
status_str_value : str, optional
Status string value, by default None.
Returns
-------
bool
Return True if the command has been successfully executed. Otherwise, return False.
Raises
------
RuntimeError
If an error occurs, RuntimeError will be raised.
"""
self._check_before_publish()
thing = self._things[device_name]
is_valid, status_name, status_value = JciHitachiAWSStatus.str2id(
device_type=thing.type,
status_name=status_name,
status_value=status_value,
status_str_value=status_str_value,
support_code=thing.support_code,
)
if not is_valid:
return False
shadow_publish_mapping = {
"CleanFilterNotification": "filter",
"CleanNotification": "filter",
"CleanSecondaryFilterNotification": "filter",
"FrontFilterNotification": "filter",
"Pm25FilterNotification": "filter",
"enableQAMode": "qa",
}
if False: # status_name in shadow_publish_mapping: # TODO: replace False cond after shadow function is completed.
shadow_publish_schema = {}
if (
shadow_publish_mapping[status_name] == "filter"
or shadow_publish_mapping[status_name] == "qa"
):
shadow_publish_schema.update({status_name: bool(status_value)})
# if thing.type == "AC": # there is an additional parameter for AC
# shadow_publish_schema.update("FilterElapsedHour", 0 if status_value == 0 else status_value)
self._mqtt.publish_shadow(
thing.thing_name,
"update",
{"state": {"reported": {**shadow_publish_schema}}},
shadow_name="info",
)
if self._mqtt.mqtt_events.device_control_event.wait(
timeout=self._mqtt_timeout
):
device_control = self._mqtt.mqtt_events.device_control.get(
thing.thing_name
)
if device_control["state"]["reported"][status_name] == bool(
status_value
):
self._mqtt.mqtt_events.device_control_event.clear()
self._delay()
return True
return False
self._mqtt.publish(
self._aws_identity.host_identity_id,
thing.thing_name,
"control",
self._mqtt_timeout,
{
status_name: status_value,
"TaskID": self.task_id,
"Timestamp": int(time.time()),
},
)
_, _, _, control_results = self._mqtt.execute(control=True)
if thing.thing_name in control_results:
device_control = self._mqtt.mqtt_events.device_control.get(thing.thing_name)
if device_control.get(status_name) == status_value:
thing.status_code.set_new_status(status_name, status_value)
return True
return False