from machine import Timer # type: ignore from machine import ExtInt # type: ignore from machine import WDT # type: ignore from machine import RTC # type: ignore from misc import Power # type: ignore import machine # type: ignore import time import net # type: ignore import modem # type: ignore import _thread import uos # type: ignore import ujson # type: ignore import utime # type: ignore import checkNet # type: ignore import gc import ntptime # type: ignore import ubinascii # type: ignore import uhashlib # type: ignore import request # type: ignore import ql_fs # type: ignore try: from usr.umqtt import MQTTClient except: pass ##### Constants ##### class Constants: C_FW_VERSION = "0.15" C_FW_DATE = "2026-01-27" C_LOG_LEVEL = 1 # 0=trace, 1=debug, 2=info, 3=warn, 4=error C_MQTT_TOPIC_TELEMETRY = "telemetry" C_MQTT_TOPIC_STATUS = "status" C_MQTT_TOPIC_EVENT = "event" C_MQTT_TOPIC_RESPONSE = "response" ##### Utility Functions ##### def is_int(s): try: int(s) return True except ValueError: return False def is_float(s): try: float(s) return True except ValueError: return False def file_exists(filename): try: uos.stat(filename) return True except OSError: return False def remove_files(filenames): if not isinstance(filenames, list): filenames = [filenames] for filename in filenames: try: uos.remove(filename) except: pass def get_file_md5_checksum(path, chunk_size=1024): if path is None: return None if not file_exists(path): return None hash = uhashlib.md5() try: with open(path, "rb") as f: while True: data = f.read(chunk_size) if not data: break hash.update(data) except Exception as e: print_error("File MD5 checksum error:", e) return None return ubinascii.hexlify(hash.digest()).decode() ##### Class Definitions ##### DEFAULT_CONFIG_FILE = "usr/fw_config_main.json" BACKUP_CONFIG_FILE = "usr/fw_config_backup.json" class Config: def __init__(self): config = ql_fs.read_json(DEFAULT_CONFIG_FILE) config = config if config else dict() self.data = { "mqtt_server": "mqtt.smartfire.ai", "mqtt_port": 1883, "mqtt_user": "elight", "mqtt_pass": "123", "mqtt_client_id": "{IMEI}", "mqtt_group_id": "group1", "mqtt_topic_sub": [ "FAST/ELS/{IMEI}/command", "FAST/ELS/{IMEI}/config", "FAST/ELS/{IMEI}/ota", "FAST/ELS/group/{GROUP_ID}/command", "FAST/ELS/group/{GROUP_ID}/config", "FAST/ELS/group/{GROUP_ID}/ota", "FAST/ELS/all/command", "FAST/ELS/all/config", "FAST/ELS/all/ota", ], "mqtt_topic_pub": "FAST/ELS/{IMEI}/status", "mqtt_topic_pub_telemetry": "FAST/ELS/{IMEI}/telemetry", "mqtt_topic_pub_status": "FAST/ELS/{IMEI}/status", "mqtt_topic_pub_event": "FAST/ELS/{IMEI}/event", "mqtt_topic_pub_response": "FAST/ELS/{IMEI}/response", "mqtt_qos": 0, "mqtt_keepalive": 60, "network_check_interval": 30, "mqtt_reconnect_interval": 10, "no_msg_timeout": 3600, "telemetry_interval": 300, "telemetry_enable": True, "timezone": 8, "ntp_server": "stdtime.gov.hk", } def get(self, key, default=None): return self.data.get(key, default) def set(self, key, value): if key not in self.data: raise ValueError("Unknown key: " + key) else: try: value = type(self.data[key])(value) # cast to original type except: raise ValueError("Type mismatch for key: " + key) self.data[key] = value def load(self, filename): try: with open(filename, "r") as f: tmp_data = ujson.load(f) for k in tmp_data: if k not in self.data: print_warn("Config load warning: Unknown key:", k) self.data[k] = tmp_data[k] return True except Exception as e: print_error("Config load error:", e) return False def save(self, filename): try: with open(filename, "w") as f: ujson.dump(self.data, f) return True except Exception as e: print_error("Config save error:", e) return False class MqttPayload: # type: str, 'ACK', 'SYS', 'TEST' # seq: int # status: 0=successful, 1=warn, else error # msg: str # details: dict|str def __init__(self, type, seq, code, msg, details=None): self.type = type self.seq = seq self.code = code self.msg = msg self.details = details def __str__(self): return ujson.dumps( { "type": self.type, "seq": self.seq, "code": self.code, "msg": self.msg, "details": self.details, } ) ##### Global Constants/Variables ##### class PreviousGlobals: client = Globals.client if "Globals" in globals() else None mqtt_listen_thread = Globals.mqtt_listen_thread if "Globals" in globals() else None mqtt_tx_thread = Globals.mqtt_tx_thread if "Globals" in globals() else None mqtt_rx_data_lock = Globals.mqtt_rx_data_lock if "Globals" in globals() else None mqtt_tx_data_lock = Globals.mqtt_tx_data_lock if "Globals" in globals() else None class Globals: global g_board global g_wdt global g_fw_filename global FW_FILE_LIST LOG_LEVEL = Constants.C_LOG_LEVEL board = g_board wdt = ( g_wdt if ("g_wdt" in globals()) and (g_wdt is not None) else WDT(30) ) # 30 seconds watchdog fw_filename = g_fw_filename fw_file_list = ( FW_FILE_LIST if "g_fw_file_list" in globals() else ["usr/fw1.py", "usr/fw2.py", "usr/fw3.py", "usr/fw4.py"] ) led_timer = Timer(0) main_fsm_timer = Timer(1) main_fsm_lock = None th_sensor = None config = None thread_running = False mqtt_rx_enable = False # MQTT variables client = PreviousGlobals.client # MQTTClient mqtt_listen_thread = PreviousGlobals.mqtt_listen_thread mqtt_tx_thread = PreviousGlobals.mqtt_tx_thread mqtt_rx_data_lock = PreviousGlobals.mqtt_rx_data_lock mqtt_tx_data_lock = PreviousGlobals.mqtt_tx_data_lock class States: dev_imei = str(modem.getDevImei()) # IMEI dev_fw_ver = str(modem.getDevFwVersion()) # FW Version dev_sn = str(modem.getDevSN()) # Serial Number dev_uname = str(uos.uname()) # Device uname power_on_reason = Power.powerOnReason() power_down_reason = Power.powerDownReason() led_state = 0 network_flag = False mqtt_connected = False reboot_flag = False deadlock_flag = False uptime = 0 fsm_cntr = 0 mqtt_reconnect_cntr = 0 led_cntr = 0 pbtn1_cntr = 0 pbtn2_cntr = 0 extint_pbtn1 = None extint_pbtn2 = None extint_chg_state = None extint_pg_inv_state = None no_msg_timeout_cntr = 0 telemetry_cntr = 0 force_telemetry = False system_error = False network_error = False mqtt_error = False system_warn = False network_warn = False mqtt_warn = False mqtt_rx_que = [] mqtt_tx_que = [] test_id = None test_cmd_start = False test_cmd_stop = False test_enabled = False test_duration = 0 test_time_cntr = 0 test_report_interval = 10 ota_filename = None ota_file_handle = None ota_file_checksum = None ota_transfer_offset = 0 ota_transfer_size = 0 @staticmethod def reset(): States.led_state = 0 States.network_flag = False States.mqtt_connected = False States.reboot_flag = False States.deadlock_flag = False States.uptime = 0 States.fsm_cntr = 0 States.mqtt_reconnect_cntr = 0 States.led_cntr = 0 States.pbtn1_cntr = 0 States.pbtn2_cntr = 0 States.extint_pbtn1 = None States.extint_pbtn2 = None States.extint_chg_state = None States.extint_pg_inv_state = None States.no_msg_timeout_cntr = 0 States.telemetry_cntr = 0 States.force_telemetry = False States.system_error = False States.network_error = False States.mqtt_error = False States.system_warn = False States.network_warn = False States.mqtt_warn = False # Clear MQTT queues if Globals.mqtt_rx_data_lock: Globals.mqtt_rx_data_lock.acquire() States.mqtt_rx_que.clear() if Globals.mqtt_rx_data_lock: Globals.mqtt_rx_data_lock.release() if Globals.mqtt_tx_data_lock: Globals.mqtt_tx_data_lock.acquire() States.mqtt_tx_que.clear() if Globals.mqtt_tx_data_lock: Globals.mqtt_tx_data_lock.release() States.test_cmd_start = False States.test_cmd_stop = False States.test_enabled = False States.test_duration = 0 States.test_time_cntr = 0 States.test_report_interval = 10 States.ota_filename = None States.ota_file_handle = None States.ota_file_checksum = None States.ota_transfer_offset = 0 States.ota_transfer_size = 0 class THSensor: MODEL = "HDC1080" I2C_SLAVE_ADDR = 0x40 # I2C device address REG_TEMP = 0x00 # temperature register REG_HUMI = 0x01 # humidity register REG_MANUF_ID = 0xFE # manufacturer ID register REG_DEVICE_ID = 0xFF # device ID register MANUF_ID_VERIFY = 0x5449 # manufacturer ID verification value DEVICE_ID_VERIFY = 0x1050 # device ID verification value SENSOR_DELAY = 16 # sensor measurement delay in milliseconds def __init__(self, i2c): self.i2c = i2c def check_communication(self): manuf_id = self.get_manuf_id() device_id = self.get_device_id() return (manuf_id == THSensor.MANUF_ID_VERIFY) and ( device_id == THSensor.DEVICE_ID_VERIFY ) def read_register(self, reg, delay_ms=0): buffer = bytearray(2) self.i2c.read(THSensor.I2C_SLAVE_ADDR, bytearray([reg]), 1, buffer, 2, delay_ms) return buffer[0] << 8 | buffer[1] def get_manuf_id(self): return self.read_register(THSensor.REG_MANUF_ID) def get_device_id(self): return self.read_register(THSensor.REG_DEVICE_ID) def get_temperature(self): try: raw_temp = self.read_register(THSensor.REG_TEMP, THSensor.SENSOR_DELAY) temperature = (raw_temp / 65536) * 165 - 40 except: return 0.0 return temperature def get_humidity(self): try: raw_humi = self.read_register(THSensor.REG_HUMI, THSensor.SENSOR_DELAY) humidity = (raw_humi / 65536) * 100 except: return 0.0 return humidity def get_ibatt_voltage(): return Power.getVbatt() def get_ibatt_chg_state(): return 1 - Globals.board.chg_stat.read() def get_ibatt_pg_state(): return 1 - Globals.board.chg_pg_inv.read() def get_ebatt_voltage(): adc_raw = Globals.board.adc.read(0) # mV bv_ctl1 = Globals.board.bv_crl1.read() bv_ctl2 = Globals.board.bv_crl2.read() r_f = 120 r_1 = 18 r_2 = 27 r_3 = 10 r_eqv = 0 if bv_ctl1 and bv_ctl2: # (1, 1) r_eqv = 1 / (1 / r_1 + 1 / r_2 + 1 / r_3) elif bv_ctl1 and (not bv_ctl2): # (1, 0) r_eqv = 1 / (1 / r_1 + 1 / r_2) elif (not bv_ctl1) and bv_ctl2: # (0, 1) r_eqv = 1 / (1 / r_1 + 1 / r_3) else: # (0, 0) r_eqv = r_1 v_batt = adc_raw * (r_eqv + r_f) / r_eqv return int(v_batt) # mV def get_ebatt_current(): adc_raw = Globals.board.adc.read(1) # mV tare_val = 600 amp_gain = 50 r_shunt = 0.01 total_gain = amp_gain * r_shunt # V per A current_mA = (adc_raw - tare_val) / total_gain return round(current_mA, 2) ##### Logging Functions ##### def print_newline(): print() def print_trace(*args): if Globals.LOG_LEVEL <= 0: print("[TRACE]", *args) def print_debug(*args): if Globals.LOG_LEVEL <= 1: print("[DEBUG]", *args) def print_info(*args): if Globals.LOG_LEVEL <= 2: print("[INFO]", *args) def print_warn(*args): if Globals.LOG_LEVEL <= 3: print("[WARN]", *args) def print_error(*args): if Globals.LOG_LEVEL <= 4: print("[ERROR]", *args) ##### App functions ##### def check_network(): r = net.getConfig() if r != -1 and r[1] != True: print_debug("Enabling roaming...") net.setConfig(r[0], True) r = net.getSignal() if r != -1 and len(r) >= 2: gw = r[0] lte = r[1] if gw[0] == 99 and lte[0] == 99: return False ret = False r = net.getState() if r != -1 and len(r) >= 2: voice = r[0] data = r[1] if data[0] == 1 or data[0] == 5: print_trace( "Network connected. Type =", data[0], "GW signal =", gw[0], "LTE signal =", lte[0], ) States.network_warn = True if lte[0] < -80 else False States.network_error = False ret = True else: States.network_error = True if States.network_flag and not ret: print_trace("Network disconnected.") return ret ##### MQTT Process Functions ##### def process_mqtt_message_topic_command_relay(rsp_type, action, msg_data): if action == "on" or action == "enable": Globals.board.relay_en.write(1) elif action == "off" or action == "disable": Globals.board.relay_en.write(0) elif action == "toggle" or action == "flip": Globals.board.relay_en.write(1 - Globals.board.relay_en.read()) else: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Unknown RELAY action: " + action ) return MqttPayload(rsp_type, 0, 0, "OK", {"RELAY": Globals.board.relay_en.read()}) def process_mqtt_message_topic_command_adc(rsp_type, action, msg_data): if action == "read_0": adc_value = Globals.board.adc.read(0) return MqttPayload(rsp_type, 0, 0, "OK", {"ADC0": adc_value}) if action == "read_1": adc_value = Globals.board.adc.read(1) return MqttPayload(rsp_type, 0, 0, "OK", {"ADC1": adc_value}) if action == "read_all": adc_value_0 = Globals.board.adc.read(0) adc_value_1 = Globals.board.adc.read(1) return MqttPayload( rsp_type, 0, 0, "OK", {"ADC0": adc_value_0, "ADC1": adc_value_1} ) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "Unknown ADC action: " + action) def process_mqtt_message_topic_command_echo(rsp_type, action, msg_data): if "message" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Missing 'message' field for ECHO command." ) echo_message = msg_data["message"] return MqttPayload(rsp_type, 0, 0, "OK", {"message": echo_message}) def process_mqtt_message_topic_command_test_start(rsp_type, msg_data): if "parameters" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "TEST/START requires 'parameters' field." ) parameters = msg_data["parameters"] if ( ("duration" not in parameters) or ("report_interval" not in parameters) or ("test_id" not in parameters) ): return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "TEST/START requires 'duration', 'report_interval', and 'test_id' fields in parameters.", ) try: duration = int(parameters["duration"]) report_interval = int(parameters["report_interval"]) States.test_duration = duration States.test_report_interval = report_interval States.test_cmd_start = True States.test_id = parameters.get("test_id") return MqttPayload( rsp_type, 0, 0, "OK", { "duration": duration, "report_interval": report_interval, "test_id": States.test_id, }, ) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Invalid parameters for TEST/START. Error: " + str(e), ) def process_mqtt_message_topic_command_test_stop(rsp_type): States.test_cmd_stop = True States.test_id = None return MqttPayload(rsp_type, 0, 0, "OK", "Test stopping.") def process_mqtt_message_topic_command_test(rsp_type, action, msg_data): if action == "start": return process_mqtt_message_topic_command_test_start(rsp_type, msg_data) if action == "stop": return process_mqtt_message_topic_command_test_stop(rsp_type) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "Unknown TEST action: " + action) def process_mqtt_message_topic_command_system_reboot(rsp_type, action, msg_data): States.reboot_flag = True return MqttPayload(rsp_type, 0, 0, "OK", "Rebooting device...") def process_mqtt_message_topic_command_system_deadlock(rsp_type, action, msg_data): States.deadlock_flag = True return MqttPayload(rsp_type, 0, 0, "OK", "Entering deadlock...") def process_mqtt_message_topic_command_system_set_fw_good(rsp_type, action, msg_data): if Globals.fw_filename is not None: try: with open(Globals.fw_filename + ".good", "w") as f: pass except Exception as e: pass return MqttPayload( rsp_type, 0, 0, "OK", "Firmware marked as good: " + Globals.fw_filename ) return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "No firmware filename available to mark as good." ) def process_mqtt_message_topic_command_system_set_fw_bad(rsp_type, action, msg_data): if Globals.fw_filename is not None: remove_files(Globals.fw_filename + ".good") return MqttPayload( rsp_type, 0, 0, "OK", "Firmware marked as bad: " + Globals.fw_filename ) return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "No firmware filename available to mark as bad." ) def process_mqtt_message_topic_command_system_fw_cntr(rsp_type, action, msg_data): if Globals.fw_filename is not None: cntr = -1 try: with open(Globals.fw_filename + ".cntr", "r") as f: cntr = int(f.readline().strip()) except Exception as e: pass return MqttPayload( rsp_type, 0, 0, "OK", {"FW_FILENAME": Globals.fw_filename, "BOOT_CNTR": cntr}, ) return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "No firmware filename available for reboot counter.", ) def process_mqtt_message_topic_command_system_status(rsp_type, action, msg_data): sys_error = States.system_error sys_warn = States.system_warn network_flag = States.network_flag mqtt_flag = States.mqtt_connected fw_ver = Constants.C_FW_VERSION fw_date = Constants.C_FW_DATE fw_filename = Globals.fw_filename fw_md5sum = get_file_md5_checksum(fw_filename) return MqttPayload( rsp_type, 0, 0, "OK", { "IMEI": States.dev_imei, "DEV_FW_VER": States.dev_fw_ver, "DEV_SN": States.dev_sn, "DEV_UNAME": States.dev_uname, "IBATT": {"VOLTAGE": get_ibatt_voltage(), "CHG": get_ibatt_chg_state()}, "PG": get_ibatt_pg_state(), "SYS_ERR": sys_error, "SYS_WARN": sys_warn, "NET_CONNECTED": network_flag, "MQTT_CONNECTED": mqtt_flag, "FW_VERSION": fw_ver, "FW_DATE": fw_date, "FW_FILENAME": fw_filename, "FW_MD5SUM": fw_md5sum, "USR_FS_STATS": uos.statvfs("/usr"), "BAK_FS_STATS": uos.statvfs("/bak"), "MEM_FREE": gc.mem_free(), "UPTIME": States.uptime, "POWER_ON_REASON": States.power_on_reason, "POWER_DOWN_REASON": States.power_down_reason, "DATETIME": str(RTC().datetime()), }, ) def process_mqtt_message_topic_command_system_clr_sys_err(rsp_type, action, msg_data): States.system_error = False return MqttPayload(rsp_type, 0, 0, "OK", "System errors cleared.") def process_mqtt_message_topic_command_system_clr_sys_warn(rsp_type, action, msg_data): States.system_warn = False return MqttPayload(rsp_type, 0, 0, "OK", "System warnings cleared.") def process_mqtt_message_topic_command_system_touch_file(rsp_type, action, msg_data): if "filename" not in msg_data or not msg_data["filename"]: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Missing 'filename' field for TOUCH_FILE command.", ) filename = msg_data["filename"] try: with open(filename, "a"): pass return MqttPayload(rsp_type, 0, 0, "OK", "File touched/created: " + filename) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to touch/create file: " + filename + ". Error: " + str(e), ) def process_mqtt_message_topic_command_system_remove_file(rsp_type, action, msg_data): protect_files = [Globals.fw_filename.split("/")[-1], "main.py", "brd.py"] if "filename" not in msg_data or not msg_data["filename"]: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Missing 'filename' field for REMOVE_FILE command.", ) filename = msg_data["filename"] if not file_exists(filename): return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "File does not exist: " + filename ) if ("force" not in msg_data) or not msg_data["force"]: for file in protect_files: if filename.endswith(file): return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Cannot remove protected file: " + filename, ) try: uos.remove(filename) return MqttPayload(rsp_type, 0, 0, "OK", "File removed: " + filename) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to remove file: " + filename + ". Error: " + str(e), ) def process_mqtt_message_topic_command_system_list_file(rsp_type, action, msg_data): file_list = [] try: for filename in uos.listdir("/usr"): file_info = uos.stat("/usr/" + filename) file_list.append({"filename": "/usr/" + filename, "size": file_info[6]}) for filename in uos.listdir("/bak"): file_info = uos.stat("/bak/" + filename) file_list.append({"filename": "/bak/" + filename, "size": file_info[6]}) return MqttPayload(rsp_type, 0, 0, "OK", file_list) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to list files. Error: " + str(e) ) def process_mqtt_message_topic_command_system(rsp_type, action, msg_data): if action == "reboot": return process_mqtt_message_topic_command_system_reboot( rsp_type, action, msg_data ) if action == "deadlock": return process_mqtt_message_topic_command_system_deadlock( rsp_type, action, msg_data ) if action == "set_fw_good": return process_mqtt_message_topic_command_system_set_fw_good( rsp_type, action, msg_data ) if action == "set_fw_bad": return process_mqtt_message_topic_command_system_set_fw_bad( rsp_type, action, msg_data ) if action == "get_fw_cntr": return process_mqtt_message_topic_command_system_fw_cntr( rsp_type, action, msg_data ) if action == "status": return process_mqtt_message_topic_command_system_status( rsp_type, action, msg_data ) if action == "clr_sys_err": return process_mqtt_message_topic_command_system_clr_sys_err( rsp_type, action, msg_data ) if action == "clr_sys_warn": return process_mqtt_message_topic_command_system_clr_sys_warn( rsp_type, action, msg_data ) if action == "touch_file": return process_mqtt_message_topic_command_system_touch_file( rsp_type, action, msg_data ) if action == "remove_file": return process_mqtt_message_topic_command_system_remove_file( rsp_type, action, msg_data ) if action == "list_file": return process_mqtt_message_topic_command_system_list_file( rsp_type, action, msg_data ) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "Unknown SYS action: " + action) def process_mqtt_message_topic_command_ebatt(rsp_type, action, msg_data): voltage = get_ebatt_voltage() current = get_ebatt_current() return MqttPayload(rsp_type, 0, 0, "OK", {"VOLTAGE": voltage, "CURRENT": current}) def process_mqtt_message_topic_command_sensor_get_temp(rsp_type): if Globals.th_sensor is not None: temperature = Globals.th_sensor.get_temperature() return MqttPayload(rsp_type, 0, 0, "OK", {"TEMP": "{:.2f}".format(temperature)}) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "No temperature sensor detected.") def process_mqtt_message_topic_command_sensor_get_humi(rsp_type): if Globals.th_sensor is not None: humidity = Globals.th_sensor.get_humidity() return MqttPayload(rsp_type, 0, 0, "OK", {"HUMI": "{:.2f}".format(humidity)}) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "No humidity sensor detected.") def process_mqtt_message_topic_command_sensor_get_all(rsp_type): if Globals.th_sensor is not None: temperature = Globals.th_sensor.get_temperature() humidity = Globals.th_sensor.get_humidity() return MqttPayload( rsp_type, 0, 0, "OK", {"TEMP": "{:.2f}".format(temperature), "HUMI": "{:.2f}".format(humidity)}, ) return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "No temperature/humidity sensor detected." ) def process_mqtt_message_topic_command_sensor(rsp_type, action, msg_data): if action == "get_temp": return process_mqtt_message_topic_command_sensor_get_temp(rsp_type) if action == "get_humi": return process_mqtt_message_topic_command_sensor_get_humi(rsp_type) if action == "get_all": return process_mqtt_message_topic_command_sensor_get_all(rsp_type) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "Unknown SENSOR action: " + action) def process_mqtt_message_topic_command_help(rsp_type, action, msg_data): return MqttPayload( rsp_type, 0, 0, "OK", "Supported commands: RELAY, ADC, PING, ECHO, TEST, SYSTEM, EBATT, SENSOR, NOOP.", ) def process_mqtt_message_topic_command(msg_data): if "type" not in msg_data: return MqttPayload( "N/A", 0, 2, "CMD_ERROR", "Missing 'type' field in topic command message." ) cmd_type = msg_data["type"].lower() action = ( msg_data["action"].lower().replace("-", "_") if "action" in msg_data else None ) rsp_type = cmd_type + "/" + action if action is not None else cmd_type if cmd_type == "relay": return process_mqtt_message_topic_command_relay(rsp_type, action, msg_data) if cmd_type == "adc": return process_mqtt_message_topic_command_adc(rsp_type, action, msg_data) if cmd_type == "echo": return process_mqtt_message_topic_command_echo(rsp_type, action, msg_data) if cmd_type == "test": return process_mqtt_message_topic_command_test(rsp_type, action, msg_data) if cmd_type == "system": return process_mqtt_message_topic_command_system(rsp_type, action, msg_data) if cmd_type == "ebatt": return process_mqtt_message_topic_command_ebatt(rsp_type, action, msg_data) if cmd_type == "sensor": return process_mqtt_message_topic_command_sensor(rsp_type, action, msg_data) if cmd_type == "help": return process_mqtt_message_topic_command_help(rsp_type, action, msg_data) if cmd_type == "ping": States.force_telemetry = True return MqttPayload("pong", 0, 0, "OK") if cmd_type == "noop": return MqttPayload(rsp_type, 0, 0, "OK", "No operation performed.") return MqttPayload("N/A", 0, 2, "CMD_ERROR", "Unknown command type: " + cmd_type) def process_mqtt_message_topic_config_get(rsp_type, msg_data): if "key" not in msg_data: return MqttPayload(rsp_type, 0, 0, "OK", ujson.dumps(Globals.config.data)) cfg_key = msg_data["key"] cfg_value = Globals.config.get(cfg_key, None) if cfg_value is not None: return MqttPayload(rsp_type, 0, 0, "OK", {cfg_key: cfg_value}) else: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Unknown CONFIG key: " + cfg_key ) def process_mqtt_message_topic_config_set(rsp_type, msg_data): if "key" not in msg_data or "value" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "config/set requires 'key' and 'value' fields." ) cfg_key = msg_data["key"] cfg_value = msg_data["value"] if cfg_key == "": return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "CONFIG/SET requires a valid key." ) try: Globals.config.set(cfg_key, cfg_value) return MqttPayload(rsp_type, 0, 0, "OK", {cfg_key: Globals.config.get(cfg_key)}) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to set CONFIG key: " + cfg_key + ". Error: " + str(e), ) def process_mqtt_message_topic_config_reset(rsp_type, msg_data): Globals.config = Config() return MqttPayload(rsp_type, 0, 0, "OK", "Configuration reset to defaults.") def process_mqtt_message_topic_config_save(rsp_type, msg_data): filename = DEFAULT_CONFIG_FILE if "filename" in msg_data: filename = msg_data["filename"] if not filename.lower().endswith(".json"): return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Requires a .json filename." ) if not filename.startswith("usr/"): filename = "usr/" + filename if Globals.config.save(filename): return MqttPayload(rsp_type, 0, 0, "OK", "Saved to: " + filename) else: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to save config to: " + filename ) def process_mqtt_message_topic_config_load(rsp_type, msg_data): filename = DEFAULT_CONFIG_FILE if "filename" in msg_data: filename = msg_data["filename"] if not filename.lower().endswith(".json"): return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Requires a .json filename." ) if not filename.startswith("usr/"): filename = "usr/" + filename if Globals.config.load(filename): return MqttPayload(rsp_type, 0, 0, "OK", "Loaded from: " + filename) return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to load config from: " + filename ) def process_mqtt_message_topic_config_remove(rsp_type, msg_data): filename = DEFAULT_CONFIG_FILE if "filename" in msg_data: filename = msg_data["filename"] if not filename.lower().endswith(".json"): return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Requires a .json filename." ) if not filename.startswith("usr/"): filename = "usr/" + filename try: uos.remove(filename) return MqttPayload(rsp_type, 0, 0, "OK", "Removed config: " + filename) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to remove file: " + filename + ". Error: " + str(e), ) def process_mqtt_message_topic_config(msg_data): cmd_type = "config" if "action" not in msg_data: return MqttPayload( cmd_type, 0, 2, "CMD_ERROR", "Missing 'action' field in config message." ) action = msg_data["action"].lower() rsp_type = cmd_type + "/" + action if action is not None else cmd_type if action == "get": return process_mqtt_message_topic_config_get(rsp_type, msg_data) if action == "set": return process_mqtt_message_topic_config_set(rsp_type, msg_data) if action == "reset": return process_mqtt_message_topic_config_reset(rsp_type, msg_data) if action == "save": return process_mqtt_message_topic_config_save(rsp_type, msg_data) if action == "load": return process_mqtt_message_topic_config_load(rsp_type, msg_data) if action == "remove": return process_mqtt_message_topic_config_remove(rsp_type, msg_data) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "Unknown CONFIG action: " + action) def get_ota_fw_filename(): usable_candidates = [ candidate for candidate in Globals.fw_file_list if candidate != Globals.fw_filename ] if len(usable_candidates) == 0: return None # 1) Check for candidates that do not exist for candidate in usable_candidates: if not file_exists(candidate): return candidate # 2) Check for candidates that lack .cntr file (never booted) for candidate in usable_candidates: if not file_exists(candidate + ".cntr"): return candidate # 3) Check for candidates that lack .good file (not marked good) for candidate in usable_candidates: if not file_exists(candidate + ".good"): return candidate # 4) Check for candidates that lack .last file (not last used) for candidate in usable_candidates: if not file_exists(candidate + ".last"): return candidate # 5) Return candidate with lowest reboot counter min_cntr = 0 selected_candidate = usable_candidates[0] for candidate in usable_candidates: cntr = 0 try: with open(candidate + ".cntr", "r") as f: cntr = int(f.readline().strip()) except: cntr = 0 if min_cntr is None or cntr < min_cntr: min_cntr = cntr selected_candidate = candidate return selected_candidate def process_mqtt_message_topic_ota_upgrade_from_url(rsp_type, action, msg_data): if "url" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/upgrade-from-url requires 'url' field." ) if "checksum" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/upgrade-from-url requires 'checksum' field.", ) ota_url = msg_data["url"] response = request.get(ota_url, timeout=60) if response.status_code != 200: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to download OTA firmware from URL: " + ota_url + ". HTTP Status Code: " + str(response.status_code), ) ota_filename = get_ota_fw_filename() print_info("Selected OTA firmware filename:", ota_filename) if ota_filename is None: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "No available firmware filename for OTA upgrade.", ) # Clean up any previous OTA/boot markers remove_files( [ ota_filename, ota_filename + ".upd", ota_filename + ".good", ota_filename + ".cntr", ota_filename + ".last", ] ) try: with open(ota_filename, "wb") as f: for chunk in response.content: if isinstance(chunk, str): f.write(chunk.encode("utf-8")) else: f.write(chunk) checksum = get_file_md5_checksum(ota_filename) if checksum != msg_data["checksum"]: remove_files(ota_filename) return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Checksum mismatch for downloaded OTA firmware. Expected: " + msg_data["checksum"] + ", Received: " + checksum, ) with open(ota_filename + ".upd", "w") as f: pass return MqttPayload( rsp_type, 0, 0, "OK", "OTA firmware downloaded and saved to: " + ota_filename + ", checksum: " + checksum + ". Reboot to apply update.", ) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to save OTA firmware to file: " + ota_filename + ". Error: " + str(e), ) def process_mqtt_message_topic_ota_upgrade_from_payload(rsp_type, action, msg_data): if "checksum" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/upgrade-from-payload requires 'checksum' field.", ) if "size" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/upgrade-from-payload requires 'size' field.", ) ota_filename = get_ota_fw_filename() print_info("Selected OTA firmware filename:", ota_filename) if ota_filename is None: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "No available firmware filename for OTA upgrade.", ) # Close any previous OTA file handle if States.ota_file_handle is not None: try: States.ota_file_handle.close() except: pass if States.ota_filename is not None: remove_files(States.ota_filename) States.ota_file_handle = None States.ota_filename = ota_filename try: # Clean up any previous OTA/boot markers remove_files( [ ota_filename, ota_filename + ".upd", ota_filename + ".good", ota_filename + ".cntr", ota_filename + ".last", ] ) States.ota_file_checksum = msg_data["checksum"] States.ota_transfer_offset = 0 States.ota_transfer_size = msg_data["size"] return MqttPayload( rsp_type, 0, 0, "OK", "Starting OTA to firmware: " + ota_filename + ", checksum: " + States.ota_file_checksum, ) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to open OTA file for writing: " + ota_filename + ". Error: " + str(e), ) def process_mqtt_message_topic_ota_write_from_payload(rsp_type, action, msg_data): if States.ota_filename is None: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/WRITE_FILE called without starting transaction.", ) if "data" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/WRITE_FILE requires 'data' field." ) if "offset" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/WRITE_FILE requires 'offset' field." ) if "size" not in msg_data: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA/WRITE_FILE requires 'size' field." ) if States.ota_file_handle is None: try: States.ota_file_handle = open(States.ota_filename, "w") except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to open OTA file for writing: " + States.ota_filename + ". Error: " + str(e), ) try: ota_data_b64 = msg_data["data"] ota_offset = int(msg_data["offset"]) ota_size = int(msg_data["size"]) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Invalid parameters for OTA/WRITE_FILE. Error: " + str(e), ) if ota_offset != States.ota_transfer_offset: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA data chunk offset mismatch. Expected: " + str(States.ota_transfer_offset) + ", Received: " + str(ota_offset), ) try: ota_data = ubinascii.a2b_base64(ota_data_b64) States.ota_file_handle.write(ota_data) States.ota_transfer_offset += ota_size except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to write OTA data chunk. Error: " + str(e), ) if "last" in msg_data and msg_data["last"]: # Finalize OTA if this is the last chunk try: States.ota_file_handle.close() States.ota_file_handle = None if not file_exists(States.ota_filename): filename = States.ota_filename States.ota_filename = None States.ota_file_checksum = None States.ota_transfer_offset = 0 States.ota_transfer_size = 0 return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to write file: " + filename + ". Please check available space.", ) try: checksum = get_file_md5_checksum(States.ota_filename) print_info("Calculated OTA file checksum:", checksum) if checksum != States.ota_file_checksum: remove_files(States.ota_filename) expected_checksum = States.ota_file_checksum States.ota_filename = None States.ota_file_checksum = None States.ota_transfer_offset = 0 States.ota_transfer_size = 0 return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "OTA file checksum mismatch. Expected: " + expected_checksum + ", Calculated: " + checksum, ) except Exception as e: remove_files(States.ota_filename) return MqttPayload( rsp_type, 0, 0, "OK", "OTA update finalized, but failed to calculate checksum. File removed: " + States.ota_filename + ". Error: " + str(e), ) try: with open(States.ota_filename + ".upd", "w") as f: pass except Exception as e: return MqttPayload( rsp_type, 0, 0, "OK", "OTA update finalized, but failed to create .upd marker file. File saved: " + States.ota_filename + ". Error: " + str(e), ) return MqttPayload( rsp_type, 0, 0, "OK", "OTA update finalized. File saved: " + States.ota_filename + ", checksum: " + States.ota_file_checksum + ". Reboot to apply update.", ) except Exception as e: return MqttPayload( rsp_type, 0, 2, "CMD_ERROR", "Failed to finalize OTA update. Error: " + str(e), ) return MqttPayload( rsp_type, 0, 0, "OK", "OTA data chunk written successfully, offset: " + str(States.ota_transfer_offset) + ", size: " + str(ota_size), ) def process_mqtt_message_topic_ota(msg_data): cmd_type = "ota" if "action" not in msg_data: return MqttPayload( cmd_type, 0, 2, "CMD_ERROR", "Missing 'action' field in OTA message." ) action = msg_data["action"].lower().replace("-", "_") rsp_type = cmd_type + "/" + action if action is not None else cmd_type if action == "upgrade_from_url": return process_mqtt_message_topic_ota_upgrade_from_url( rsp_type, action, msg_data ) if action == "upgrade_from_payload": return process_mqtt_message_topic_ota_upgrade_from_payload( rsp_type, action, msg_data ) if action == "write_from_payload": return process_mqtt_message_topic_ota_write_from_payload( rsp_type, action, msg_data ) return MqttPayload(rsp_type, 0, 2, "CMD_ERROR", "Unknown OTA action: " + action) ##### MQTT Process Entry ##### def process_mqtt_message(topic, msg): print_debug("Processing MQTT message. Topic:", topic, "Message:", msg) msg_data = None try: msg_data = ujson.loads(msg) except: return MqttPayload( "N/A", 0, 2, "CMD_ERROR", "Invalid JSON format in MQTT message." ) finally: pass if topic.endswith("/command"): return process_mqtt_message_topic_command(msg_data) if topic.endswith("/config"): return process_mqtt_message_topic_config(msg_data) if topic.endswith("/ota"): return process_mqtt_message_topic_ota(msg_data) return MqttPayload("N/A", 0, 2, "CMD_ERROR", "Unknown MQTT topic: " + topic) ##### Callback ##### def task_status(timer): # Uptime States.uptime += 1 # LED flash States.led_cntr += 1 if States.led_cntr % 2 == 0: Globals.board.led_sys.write(1) else: Globals.board.led_sys.write(0) # Update status LEDs Globals.board.led_net.write(States.network_flag) Globals.board.led_err.write( States.system_error or States.network_error or States.mqtt_error ) Globals.board.led_warn.write( States.system_warn or States.network_warn or States.mqtt_warn ) # Push button debounce counter if States.pbtn1_cntr < 10: States.pbtn1_cntr += 1 if States.pbtn2_cntr < 10: States.pbtn2_cntr += 1 def main_fsm_timer_proc(timer): # Network connection if (not States.network_flag) or ( States.fsm_cntr % Globals.config.get("network_check_interval") == 0 ): if check_network(): if not States.network_flag: print_info("Network connection established.") States.mqtt_connected = False States.mqtt_reconnect_cntr = Globals.config.get( "mqtt_reconnect_interval" ) # immediate reconnect States.network_flag = True else: if States.network_flag: print_warn("Network connection lost.") States.network_flag = False States.mqtt_connected = False # MQTT connection if States.network_flag: # MQTT connection if not States.mqtt_connected: States.mqtt_reconnect_cntr += 1 if States.mqtt_reconnect_cntr > Globals.config.get( "mqtt_reconnect_interval" ): States.mqtt_reconnect_cntr = 0 if mqtt_connect(): States.mqtt_connected = True # Test sequence processing if States.test_enabled: if (States.test_report_interval > 0) and ( (States.test_time_cntr == 0) or (States.test_time_cntr % States.test_report_interval == 0) or (States.test_time_cntr == States.test_duration) ): sq = int(States.test_time_cntr / States.test_report_interval) mqtt_publish( Constants.C_MQTT_TOPIC_RESPONSE, MqttPayload( "test", sq, 0, "UPDATE", { "test_id": States.test_id, "elapsed": States.test_time_cntr, "ADC0": Globals.board.adc.read(0), "ADC1": Globals.board.adc.read(1), "EBATT": { "VOLTAGE": get_ebatt_voltage(), "CURRENT": get_ebatt_current(), }, "IBATT": { "VOLTAGE": get_ibatt_voltage(), "CHG": get_ibatt_chg_state(), }, "SENSOR": { "TEMP": ( "{:.2f}".format(Globals.th_sensor.get_temperature()) if Globals.th_sensor is not None else None ), "HUMI": ( "{:.2f}".format(Globals.th_sensor.get_humidity()) if Globals.th_sensor is not None else None ), }, "PG": get_ibatt_pg_state(), "RELAY": Globals.board.relay_en.read(), }, ), ) if States.test_time_cntr >= States.test_duration: mqtt_publish( Constants.C_MQTT_TOPIC_RESPONSE, MqttPayload( "test", 0, 0, "COMPLETE", {"elapsed": States.test_duration, "test_id": States.test_id}, ), ) States.test_cmd_stop = True States.test_time_cntr += 1 if States.test_cmd_start: States.test_cmd_start = False Globals.board.led_run.write(1) # indicate test running States.test_enabled = True States.test_time_cntr = 0 Globals.board.relay_en.write(1) # for testing, turn on relay at test start mqtt_publish( Constants.C_MQTT_TOPIC_RESPONSE, MqttPayload( "test", 0, 0, "STARTED", { "test_id": States.test_id, "duration": States.test_duration, "report_interval": States.test_report_interval, }, ), ) if States.test_cmd_stop: States.test_cmd_stop = False Globals.board.led_run.write(0) # indicate test stopped States.test_enabled = False Globals.board.relay_en.write(0) # for testing, turn off relay at test stop mqtt_publish( Constants.C_MQTT_TOPIC_RESPONSE, MqttPayload( "test", 0, 0, "STOPPED", {"test_id": States.test_id, "elapsed": States.test_time_cntr}, ), ) States.test_id = None States.test_time_cntr = 0 def task_main_fsm(timer): Globals.main_fsm_lock.acquire() try: if timer is not None: States.fsm_cntr += 1 # Timer callback context (1 second interval) main_fsm_timer_proc(timer) # Feed watchdog Globals.wdt.feed() # No message with certain period of time, consider restarting States.no_msg_timeout_cntr += 1 if (Globals.config.get("no_msg_timeout") > 0) and ( States.no_msg_timeout_cntr >= Globals.config.get("no_msg_timeout") ): States.no_msg_timeout_cntr = 0 threshold = str(Globals.config.get("no_msg_timeout")) print_warn( "No MQTT messages received for " + threshold + " seconds. Restarting device..." ) mqtt_publish( Constants.C_MQTT_TOPIC_STATUS, MqttPayload( "system", 0, 1, "WARN", "No MQTT messages received for " + threshold + " seconds. Restarting device...", ), ) # time.sleep(2) # Power.powerRestart() # Telemetry processing if Globals.config.get("telemetry_enable"): States.telemetry_cntr += 1 if ( States.telemetry_cntr >= Globals.config.get("telemetry_interval") or States.force_telemetry ): States.telemetry_cntr = 0 States.force_telemetry = False # Gather telemetry data telemetry_data = { "ADC0": Globals.board.adc.read(0), "ADC1": Globals.board.adc.read(1), "EBATT": { "VOLTAGE": get_ebatt_voltage(), "CURRENT": get_ebatt_current(), }, "IBATT": { "VOLTAGE": get_ibatt_voltage(), "CHG": get_ibatt_chg_state(), }, "SENSOR": { "TEMP": ( "{:.2f}".format(Globals.th_sensor.get_temperature()) if Globals.th_sensor is not None else None ), "HUMI": ( "{:.2f}".format(Globals.th_sensor.get_humidity()) if Globals.th_sensor is not None else None ), }, "PG": get_ibatt_pg_state(), "RELAY": Globals.board.relay_en.read(), "UPTIME": States.uptime, } mqtt_publish( Constants.C_MQTT_TOPIC_TELEMETRY, MqttPayload("telemetry", 0, 0, "OK", telemetry_data), ) else: States.telemetry_cntr = 0 # MQTT RX message processing while len(States.mqtt_rx_que) > 0: Globals.mqtt_rx_data_lock.acquire() topic, msg = States.mqtt_rx_que.pop(0) Globals.mqtt_rx_data_lock.release() States.no_msg_timeout_cntr = 0 # Process message payload = None try: payload = process_mqtt_message(topic, msg) except Exception as e: print_error("MQTT message processing error:", e) payload = MqttPayload( "N/A", 0, 2, "CMD_ERROR", "Internal error processing MQTT message:" + str(e), ) States.mqtt_error = True mqtt_publish( Constants.C_MQTT_TOPIC_RESPONSE, payload, ) except Exception as e: print_error("Main FSM error:", e) States.system_error = True Globals.main_fsm_lock.release() if States.reboot_flag: States.reboot_flag = False print_warn("System rebooting as requested...") time.sleep(2) Power.powerRestart() if States.deadlock_flag: States.deadlock_flag = False print_warn("System deadlock as requested, waiting watchdog to reboot") while True: time.sleep(1) # External interrupt service routine def isr_extint(args): pin = args[0] is_rising = args[1] == 0 if pin == BoardPinDef.PBTN1: if States.pbtn1_cntr <= 1: return print_info("Push button 1 pressed") Globals.board.relay_en.write(1 - Globals.board.relay_en.read()) States.pbtn1_cntr = 0 mqtt_publish( Constants.C_MQTT_TOPIC_EVENT, MqttPayload("interrupt", 0, 0, "PBTN1", "Pbtn1 pressed"), ) return if pin == BoardPinDef.PBTN2: if States.pbtn2_cntr <= 1: return print_info("Push button 2 pressed") States.pbtn2_cntr = 0 mqtt_publish( Constants.C_MQTT_TOPIC_EVENT, MqttPayload("interrupt", 0, 0, "PBTN2", "Pbtn2 pressed"), ) if Globals.board.pbtn1.read() == 0: # for testing: remove firmware file when there is deadlock print_warn("Removing current firmware for testing purposes...") remove_files(Globals.fw_filename) return if pin == BoardPinDef.CHG_STAT: print_info("Charger status changed. State:", is_rising) mqtt_publish( Constants.C_MQTT_TOPIC_EVENT, MqttPayload( "interrupt", 0, 0, "CHG_STAT_" + str(int(not is_rising)), "Charger status changed. Charging: " + str(not is_rising), ), ) return if pin == BoardPinDef.CHG_PG_INV: print_info("Power-good status changed. State:", is_rising) mqtt_publish( Constants.C_MQTT_TOPIC_EVENT, MqttPayload( "interrupt", 0, 0, "CHG_PG_" + str(int(not is_rising)), "Power-good status changed. Power Good: " + str(not is_rising), ), ) return # MQTT message callback def mqtt_msg_callback(topic, msg): if not Globals.mqtt_rx_enable: return Globals.mqtt_rx_data_lock.acquire() try: topic_str = topic.decode("utf-8") msg_str = msg.decode("utf-8") States.mqtt_rx_que.append((topic_str, msg_str)) except Exception as e: print_error("MQTT message callback error:", e) States.mqtt_error = True Globals.mqtt_rx_data_lock.release() print_debug("MQTT message received. Topic:", topic, "Message:", msg) task_main_fsm(None) # process immediately # MQTT error callback def mqtt_error_callback(err): if not Globals.mqtt_rx_enable: return print_error("MQTT error:", err) States.mqtt_error = True # if err == 'reconnect_success': # States.mqtt_connected = False ##### MQTT functions ##### def mqtt_connect(): if Globals.client is None: return False try: Globals.client.connect() print_info("MQTT connected to server:", Globals.config.get("mqtt_server")) mqtt_subscribe() mqtt_publish( Constants.C_MQTT_TOPIC_STATUS, MqttPayload( "system", 0, 0, "CONNECTED", { "DEV_ID": Globals.config.get("mqtt_client_id"), "IMEI": States.dev_imei, "POWER_ON_REASON": States.power_on_reason, "POWER_DOWN_REASON": States.power_down_reason, }, ), ) except Exception as e: print_error("MQTT connection error:", e) return False return True def mqtt_disconnect(): if Globals.client is None: return True try: Globals.client.disconnect() print_info("MQTT disconnected from server.") except Exception as e: print_error("MQTT disconnection error:", e) return False Globals.client = None return True def mqtt_listen_func(): print_info("MQTT listen thread started.") state = 0 while state == 0: _, state = checkNet.waitNetworkReady(10) print_debug("MQTT listen thread entered main loop.") while Globals.thread_running: try: if (Globals.client is None) or (not States.mqtt_connected): time.sleep(1) continue Globals.client.wait_msg() time.sleep(0.1) except Exception as e: print_error("MQTT check message error:", e) States.mqtt_error = True print_info("MQTT listen thread exits.") def mqtt_tx_func(): print_info("MQTT TX thread started.") state = 0 while state == 0: stage, state = checkNet.waitNetworkReady(10) print_debug("MQTT TX thread entered main loop.") while Globals.thread_running: if len(States.mqtt_tx_que) == 0: time.sleep(0.1) continue Globals.mqtt_tx_data_lock.acquire() topic, msg = States.mqtt_tx_que.pop(0) Globals.mqtt_tx_data_lock.release() try: Globals.client.publish(topic, msg, qos=Globals.config.get("mqtt_qos")) print_debug("MQTT message published. Topic:", topic, "Message:", msg) except Exception as e: print_error("MQTT publish error:", e) States.mqtt_error = True print_info("MQTT TX thread exits.") def get_subs_data(): return { "DEV_ID": Globals.config.get("mqtt_client_id"), "GROUP_ID": Globals.config.get("mqtt_group_id"), "IMEI": States.dev_imei, } def mqtt_subscribe(mqtt_topic=None): topic_list = mqtt_topic if topic_list is None: topic_list = Globals.config.get("mqtt_topic_sub") if not isinstance(topic_list, list): topic_list = [topic_list] ret = True subs_data = get_subs_data() for topic in topic_list: resolved_topic = resolve_variable(topic, subs_data) try: Globals.client.subscribe(resolved_topic, qos=Globals.config.get("mqtt_qos")) print_info("MQTT subscribed to topic:", resolved_topic) except Exception as e: print_error("MQTT subscribe error:", e) ret = False return ret def resolve_variable(s, data): for subs_key in data: subs_str = "{" + subs_key + "}" if subs_str in s: s = s.replace(subs_str, data[subs_key]) return s def mqtt_publish(type_str, data): topic = None if type_str is not None: topic = Globals.config.get("mqtt_topic_pub_" + type_str, None) if topic is None: # default topic topic = Globals.config.get("mqtt_topic_pub") if topic is None: print_error("Cannot determine topic") return False subs_data = get_subs_data() for subs_key in subs_data: subs_str = "{" + subs_key + "}" if subs_str in topic: topic = topic.replace(subs_str, subs_data[subs_key]) if isinstance(data, MqttPayload): data = data.__str__() Globals.mqtt_tx_data_lock.acquire() States.mqtt_tx_que.append((topic, data)) Globals.mqtt_tx_data_lock.release() return True ##### Main functions ##### def reset(): # stop timers Globals.led_timer.stop() Globals.main_fsm_timer.stop() # reset global states States.reset() def init(): # external interrupts States.extint_pbtn1 = machine.ExtInt( BoardPinDef.PBTN1, ExtInt.IRQ_FALLING, ExtInt.PULL_DISABLE, isr_extint ) States.extint_pbtn2 = machine.ExtInt( BoardPinDef.PBTN2, ExtInt.IRQ_FALLING, ExtInt.PULL_DISABLE, isr_extint ) States.extint_chg_state = machine.ExtInt( BoardPinDef.CHG_STAT, ExtInt.IRQ_RISING_FALLING, ExtInt.PULL_DISABLE, isr_extint ) States.extint_pg_inv_state = machine.ExtInt( BoardPinDef.CHG_PG_INV, ExtInt.IRQ_RISING_FALLING, ExtInt.PULL_DISABLE, isr_extint, ) print_debug("Device IMEI:", States.dev_imei) # configuration Globals.config = Config() print_info("Loading default config file:", DEFAULT_CONFIG_FILE) if not Globals.config.load(DEFAULT_CONFIG_FILE): print_warn("Failed. Trying backup config file: ", BACKUP_CONFIG_FILE) if not Globals.config.load(BACKUP_CONFIG_FILE): print_warn( "Failed. Saving defaults as backup config file:", BACKUP_CONFIG_FILE ) Globals.config.save(BACKUP_CONFIG_FILE) print_info("Saving to default config file:", DEFAULT_CONFIG_FILE) Globals.config.save(DEFAULT_CONFIG_FILE) # print configuration print_debug( "MQTT Server:", Globals.config.get("mqtt_server"), "Port:", Globals.config.get("mqtt_port"), ) print_debug("MQTT Client ID:", Globals.config.get("mqtt_client_id")) print_debug("MQTT Topic Sub:", Globals.config.get("mqtt_topic_sub")) print_debug("MQTT Topic Pub:", Globals.config.get("mqtt_topic_pub")) print_debug( "Network Check Interval (s):", Globals.config.get("network_check_interval") ) print_debug( "MQTT Reconnect Interval (s):", Globals.config.get("mqtt_reconnect_interval") ) # print volatile information print_debug("Device IMEI:", States.dev_imei) print_debug("Power On Reason:", States.power_on_reason) print_debug("Power Down Reason:", States.power_down_reason) # main FSM lock Globals.main_fsm_lock = _thread.allocate_lock() # Temperature/Humidity sensor Globals.th_sensor = THSensor(Globals.board.i2c0) if Globals.th_sensor.check_communication(): print_info( "Temperature/Humidity sensor check passed. Model:", Globals.th_sensor.MODEL ) else: print_warn("Temperature/Humidity sensor check failed.") States.system_warn = True # Timezone try: print_info("Setting timezone and synchronizing time via NTP...") utime.setTimeZone(Globals.config.get("timezone")) ntptime.sethost(Globals.config.get("ntp_server")) ntptime.settime() except: print_warn("Failed to set timezone or synchronize time via NTP.") # MQTT client if Globals.mqtt_listen_thread or Globals.mqtt_tx_thread: # wait for previous threads to exit if Globals.client: Globals.client.timerFlag = False time.sleep(1) if Globals.client: mqtt_disconnect() del Globals.client Globals.client = None print_info("Initializing MQTT client...") subs_data = get_subs_data() client_id = resolve_variable(Globals.config.get("mqtt_client_id"), subs_data) Globals.client = MQTTClient( client_id=client_id, server=Globals.config.get("mqtt_server"), port=Globals.config.get("mqtt_port"), user=Globals.config.get("mqtt_user"), password=Globals.config.get("mqtt_pass"), keepalive=Globals.config.get("mqtt_keepalive"), ) print_info("MQTT info: Client ID:", client_id) print_info("MQTT info: Server:", Globals.config.get("mqtt_server")) print_info("MQTT info: Port:", Globals.config.get("mqtt_port")) print_info("MQTT info: User:", Globals.config.get("mqtt_user")) print_info("MQTT info: Password:", Globals.config.get("mqtt_pass")) Globals.client.set_callback(mqtt_msg_callback) Globals.client.error_register_cb(mqtt_error_callback) if not Globals.mqtt_rx_data_lock: Globals.mqtt_rx_data_lock = _thread.allocate_lock() if not Globals.mqtt_tx_data_lock: Globals.mqtt_tx_data_lock = _thread.allocate_lock() def run(): Globals.led_timer.start(period=1000, mode=Timer.PERIODIC, callback=task_status) Globals.main_fsm_timer.start( period=1000, mode=Timer.PERIODIC, callback=task_main_fsm ) States.extint_pbtn1.enable() States.extint_pbtn2.enable() States.extint_chg_state.enable() States.extint_pg_inv_state.enable() Globals.thread_running = True Globals.mqtt_listen_thread = _thread.start_new_thread(mqtt_listen_func, ()) Globals.mqtt_tx_thread = _thread.start_new_thread(mqtt_tx_func, ()) def main(): print_newline() print_info("Firmware version:", Constants.C_FW_VERSION) print_info("Firmware date:", Constants.C_FW_DATE) # Indicate initialization Globals.board.led_warn.write(1) init() # Indicate normal operation Globals.board.led_warn.write(0) Globals.board.led_err.write(0) Globals.board.led_run.write(0) Globals.board.led_sys.write(1) # Start main tasks run() # Indicate system ready print("System initialized and running.") Globals.mqtt_rx_enable = True ##### Entry point ##### if __name__ == "__main__": reset() main()