import _thread import math import sys import dataCall # type: ignore import fota # type:ignore import log # type: ignore import modem # type: ignore import net # type: ignore import ntptime # type: ignore import pm # type: ignore import ql_fs # type: ignore import SecureData # type: ignore import ujson # type: ignore import uos # type: ignore import utime # type: ignore from machine import ( UART, # type: ignore Pin, # type: ignore ) from misc import Power # type: ignore from umqtt import MQTTClient # type: ignore ENV_FILE = "/usr/.env" LOG_LEVEL = log.DEBUG # FALL_CHECK_DURATION = 10 GET_DATA_RETRY = 3 def read_setting_file(file=ENV_FILE): """Read configuration settings from a JSON file.""" try: print("Reading configuration settings from file: {}".format(file)) data = ql_fs.read_json(file) if data is None: print("Failed to read configuration settings.") return None return data except Exception as e: print("Failed to read configuration settings: {}".format(e)) return None try: SYS_SETTING = read_setting_file() if SYS_SETTING is None: raise Exception("Cannot read system setting!") except Exception as e: print("Read setting file error:", e) RESET_FLAG_INDEX = 1 GPS_INFO_INDEX = 2 VERSION = SYS_SETTING["_v"] MQTT_USERNAME = SYS_SETTING["username"] MQTT_PASSWORD = SYS_SETTING["pwd"] MQTT_SERVER = SYS_SETTING["ip"] MQTT_PORT = SYS_SETTING["port"] PROJECT_ID = SYS_SETTING["topic"] TDC = SYS_SETTING["TDC"] STDC = SYS_SETTING["STDC"] ANGLE_THRESHOLD = SYS_SETTING["angleThreshold"] PHRASE = SYS_SETTING["p"] NUMBER_OF_LIGHTS = SYS_SETTING["n"] CYCLE_LENGTH = SYS_SETTING["l"] SCHEDULE = SYS_SETTING["schedule"] # LIGHT_MODE = SYS_SETTING["lightmode"] NTP_SERVER = "stdtime.gov.hk" SLEEP_DURATION = 30 # seconds, adjust as needed LED_GPIO = Pin.GPIO23 GPS_EN = True if GPS_EN: import quecgnss # type: ignore GPS_TIMEOUT = 30 class MqttClient: """ mqtt init """ def __init__( self, clientid, server=MQTT_SERVER, port=MQTT_PORT, user=MQTT_USERNAME, password=MQTT_PASSWORD, keepalive=60, ssl=False, ssl_params={}, reconn=True, ): self.__clientid = clientid self.__pw = password self.__server = server self.__port = port self.__username = user self.__keepalive = keepalive self.__ssl = ssl self.__ssl_params = ssl_params log.basicConfig(level=LOG_LEVEL) self.log = log.getLogger("MQTT") self.qos = None # Network Status Flag self.__nw_flag = True # thread lock for mqtt reconnect self.mp_lock = _thread.allocate_lock() self.mqtt_task = False # Flag to control the listening thread # mqtt client self.client = MQTTClient( self.__clientid, self.__server, self.__port, self.__username, self.__pw, keepalive=self.__keepalive, ssl=self.__ssl, ssl_params=self.__ssl_params, reconn=reconn, ) def connect(self): """ Connect to mqtt Server """ # connect to MQTT server self.log.debug("Connecting to MQTT server...") connection_failed = self.client.connect() if connection_failed: raise Exception( "Failed to connect to MQTT server: {}, port: {}. {}".format( self.__server, self.__port, connection_failed ) ) # register network callback flag = dataCall.setCallback(self.nw_cb) if flag != 0: # register network callback failed raise Exception("Network callback registration failed") self.log.debug( "Connected to MQTT server: {}, port: {}".format(self.__server, self.__port) ) self.mqtt_task = True return True def set_callback(self, sub_cb): """ set mqtt subscribe callback function :param sub_cb: callback function to handle subscribed messages """ self.client.set_callback(sub_cb) def error_register_cb(self, func): """ Register a callback function to handle errors in the MQTT client. :param func: Callback function that will be called with the error message. """ self.client.error_register_cb(func) def subscribe(self, topic, qos=0): """ Subscribe to a topic :param topic: String, Topic to subscribe to """ self.topic = topic self.qos = qos self.client.subscribe(topic, qos) def publish(self, msg, topic, qos=0): """ Publish a message to a topic :param topic: String, Topic to publish to """ self.topic = topic self.client.publish(topic, msg, qos) self.log.debug( "Published to topic: {}, msg: {}, qos: {}".format(topic, msg, qos) ) def disconnect(self): """ Disconnect from the MQTT server and stop the listening thread. This method will stop the listening thread and close the MQTT connection. """ try: self.log.debug("disconnect(): Stopping MQTT listening thread...") self.mqtt_task = False # _thread.exit_thread(listen_thread_id) self.client.disconnect() self.log.debug( "disconnect(): Disconnected from MQTT server. mqtt_task flag: {}".format( self.mqtt_task ) ) except Exception as e: self.log.error( "disconnect(): Error disconnecting from MQTT server: {}".format(e) ) def reconnect(self): """ Reconnect to the MQTT server. This method will check the network status and attempt to reconnect to the MQTT server. If the network is not available, it will wait until the network is restored before attempting to reconnect. It will also handle the case where the MQTT client is already connected. """ try: # if lock is already acquired, return immediately if self.mp_lock.locked(): return self.mp_lock.acquire() # close the existing mqtt connection self.client.close() # reconnect logic for attempt in range(MQTT_RECONNECT_COUNT): MQTT_RECONNECT_COUNT -= 1 self.log.debug( "Reconnecting to MQTT server, attempt {}/{}".format( attempt + 1, MQTT_RECONNECT_COUNT ) ) net_sta = net.getState() # get network status if net_sta != -1 and net_sta[1][0] == 1: call_state = dataCall.getInfo(1, 0) # get dataCall state if (call_state != -1) and (call_state[2][0] == 1): try: # reconnect to mqtt server self.log.debug("Reconnecting to MQTT server...") self.connect() except Exception as e: # failed to reconnect, log the error and wait before retrying self.log.error("Reconnect failed: {}".format(e)) self.client.close() utime.sleep(5) continue else: # network not connected, wait for network to be ready self.log.info("Network not connected, waiting for network...") utime.sleep(10) continue # reconnect successful, subscribe to topic self.log.info("Reconnected to MQTT server successfully.") try: if self.topic is not None: self.client.subscribe(self.topic, self.qos) self.mp_lock.release() except: # subscribe failed, log the error and wait before retrying self.log.error("Subscribe failed, retrying...") self.client.close() utime.sleep(5) continue else: utime.sleep(5) continue return True # if we reach here, it means all attempts failed self.log.error( "Failed to reconnect to MQTT server after {} attempts.".format( MQTT_RECONNECT_COUNT ) ) self.mp_lock.release() return False except Exception as e: self.log.error("Error in reconnect(): {}".format(sys.print_exception(e))) if self.mp_lock.locked(): self.mp_lock.release() return False def nw_cb(self, args): """ dataCall network callback function This function is called when the network status changes. """ nw_sta = args[1] if nw_sta == 1: # network connected self.log.info("*** network connected! ***") self.__nw_flag = True else: # network not connected self.log.info("*** network not connected! ***") self.__nw_flag = False def __listen(self): while True: try: self.log.debug("__listen(): mqtt_task flag: {}".format(self.mqtt_task)) if not self.mqtt_task: self.log.info("__listen(): Stopped MQTT listening thread.") break self.log.debug("__listen(): Waiting for MQTT messages...") self.client.wait_msg() except OSError as e: # if the socket is closed or an error occurs, handle the exception if not self.__nw_flag: # network is not connected, wait for network to be ready self.log.error("Network not connected, waiting for network...") self.reconnect() # if the error is due to a closed socket, try to reconnect elif self.client.get_mqttsta() != 0 and self.mqtt_task: self.log.error("MQTT client error: {}".format(e)) self.reconnect() else: self.log.error("MQTT client error: {}".format(e)) return -1 except Exception as e: self.log.error("MQTT client error: {}".format(e)) return -1 def loop_forever(self): global listen_thread_id self.log.debug("Starting MQTT client loop forever.") task_stacksize = _thread.stack_size() name, platform = uos.uname()[1].split("=", 1) if ( platform == "EC600E" or platform == "EC800E" or platform == "EG800K" or platform == "EC800K" ): _thread.stack_size(8 * 1024) elif platform == "FCM362K": _thread.stack_size(3 * 1024) else: _thread.stack_size(16 * 1024) # Start the listening thread listen_thread_id = _thread.start_new_thread(self.__listen, ()) self.log.debug("Listening thread started with ID: {}".format(listen_thread_id)) # Reset the stack size to the original value _thread.stack_size(task_stacksize) class LED: """LED control class.""" def __init__( self, pin_number, mode=0, phrase=PHRASE, number_of_lights=NUMBER_OF_LIGHTS, cycle_length=CYCLE_LENGTH, schedule=SCHEDULE, ): """Initialize the LED with the specified pin number.""" log.basicConfig(level=LOG_LEVEL) self.log = log.getLogger("LED") self.pin = Pin(pin_number, Pin.OUT) self.state = False # 初始状态为关闭 self.phrase = phrase # phase self.number_of_lights = number_of_lights # number of lights self.cycle_length = cycle_length # length of cycle in ms self.schedule = ( schedule if schedule is not None else [60] ) # List of minutes in the day self.light_mode = mode self.init_ts = 0 self.init_ticks = 0 def on(self): """Turn the LED on.""" self.pin.write(1) self.state = True def off(self): """Turn the LED off.""" self.pin.write(0) self.state = False def toggle(self): """Toggle the LED state.""" if self.state: self.off() else: self.on() def blink(self): self.blinking_flag = True while self.blinking_flag: current_ticks = utime.ticks_ms() ticks_diff = utime.ticks_diff(current_ticks, self.init_ticks) time_in_ms = self.init_ts + ticks_diff # self.log.debug("current_ticks: {}".format(current_ticks)) # self.log.debug("self.init_ticks: {}".format(self.init_ticks)) # self.log.debug("ticks_diff: {}".format(ticks_diff)) # self.log.debug("time_in_ms: {}".format(time_in_ms)) if self.light_mode == 1 and ((time_in_ms // 500 % 2) == 1): # Blink on according to phase self.on() elif ( self.light_mode == 2 and self.in_schedule(self.schedule) and ( (time_in_ms // self.cycle_length % self.number_of_lights) == self.phrase ) ): # Scheduled Blink on according to phase self.on() elif self.light_mode == 3: # Long On self.on() else: # Blink off self.off() utime.sleep_ms(20) self.off() _thread.exit() def in_schedule(self, schedule): """Check if the current time is in the schedule.""" if not schedule: return False # Cannot be empty # Get current hour and minute, convert to minutes time = utime.localtime_ex() minute_of_day = time[3] * 60 + time[4] intervals = [] # Build intervals from schedule for i in range(0, len(schedule), 2): start = schedule[i] if i + 1 < len(schedule): end = schedule[i + 1] intervals.append((start, end)) else: # Odd length: last interval is from start to 24:00 intervals.append((start, 24 * 60)) # Check if current minute is in any "on" interval for start, end in intervals: if start <= minute_of_day < end: return True return False class Sen_Uart: """UART communication class.""" def __init__( self, channel=UART.UART2, baudrate=115200, databits=8, partity=0, stopbits=1, flowctl=0, ): """Initialize the UART with the specified pins and baudrate.""" log.basicConfig(level=LOG_LEVEL) self.log = log.getLogger("esp32-UART") self.uart = UART(channel, baudrate, databits, partity, stopbits, flowctl) self.uart.set_callback(self.callback) self.data = None self.get_data_retry = GET_DATA_RETRY self.get_data_flag = False def callback(self, para): """Callback function for UART read events. Callback will be triggered when data is received. """ self.log.debug("call para:{}".format(para)) if 0 == para[0]: self.data = self.decode_data(self.uartRead(para[2])) self.get_data_flag = True def uartWrite(self, msg): self.log.debug("write msg:{}".format(msg)) self.uart.write(msg) def uartRead(self, len=1024): msg = self.uart.read(len) utf8_msg = msg.decode() return utf8_msg def _get_data(self): """Get angle data from the UART.""" self.log.info("Getting data from UART...") while self.get_data_retry > 0 and not self.get_data_flag: self.uartWrite("getdata\n") utime.sleep(3) self.get_data_retry -= 1 if self.get_data_retry == 0: self.log.warning("get_data(): Failed to get data from UART") return self.get_data_retry = GET_DATA_RETRY # Reset retry count self.get_data_flag = False def decode_data(self, data): """Decode the received data.""" try: decoded_data = ujson.loads(data) print("Decoded data: {}".format(decoded_data)) # return ( # decoded_data.get("bat", 0), # decoded_data.get("roll", 0), # decoded_data.get("pitch", 0), # ) return decoded_data except ValueError as e: self.log.error("Failed to decode data: {}".format(e)) return None class SSC: def __init__(self): self.version = VERSION self.IMEI = modem.getDevImei() self.project_id = PROJECT_ID self.fota_obj = fota() self.task_enable = True self.log = None self.setup_logging() self.send_data_flag = False # Flag to indicate if data sending is enabled self.fall_check_flag = False # Flag to indicate if fall checking is enabled self.fall_flag = False # Flag to indicate if a fall has been detected # Sensor data self.tdc = TDC # TDC value, default is 10 self.stdc = STDC # STDC value, default is 0 self.roll = 0 # Roll angle, default is 0 self.pitch = 0 # Pitch angle, default is 0 self.tilt = 0 # tilt_angle, default is 0 self.angle_threshold = ANGLE_THRESHOLD # GPS data # self._lat = 0.0 # Latitude, default is 0.0 # self._long = 0.0 # Longitude, default is 0.0 # self.gps = [self._lat, self._long] # GPS coordinates, default is [0.0, 0.0] # Battery data self.battery = 0 # Battery level, default is 0 # NTP self.ntp_code = 0 # LED self.light = LED( LED_GPIO, mode=0, phrase=PHRASE, number_of_lights=NUMBER_OF_LIGHTS, cycle_length=CYCLE_LENGTH, schedule=SCHEDULE, ) # UART Communication self.esp32_uart = Sen_Uart() # MQTT client self.pub_topic = "spvtms-data" self.individual_topic = "sensor/{}".format(self.IMEI) if self.project_id: self.group_topic = self.get_group_topic(self.project_id) else: self.group_topic = "" self.mqtt_client = MqttClient( clientid="SSC-{}".format(self.IMEI), user=MQTT_USERNAME, password=MQTT_PASSWORD, ssl=False, ) self.init_gps() # Initialize module power management self.wakelock = pm.create_wakelock("wake_lock", len("wake_lock")) pm.autosleep(1) # Enable Autosleep """ Sensor Methods""" def setTDC(self, tdc): if tdc is None: self.log.warning("TDC not found in payload") return self.tdc = tdc self.log.debug("TDC set to: {}".format(tdc)) self.update_setting_file("TDC", tdc) p_send_data_flag = self.send_data_flag if p_send_data_flag: self.send_data_flag = False self.start_send_data_loop() def setSTDC(self, stdc): if stdc is None: self.log.warning("STDC not found in payload") return self.stdc = stdc self.log.debug("STDC set to: {}".format(stdc)) self.update_setting_file("STDC", stdc) p_fall_check_flag = self.fall_check_flag if p_fall_check_flag: self.fall_check_flag = False self.start_check_fall_loop() # def calculate_tilt_angle(self, roll, pitch): # if roll > 0: # tilt_angle = math.degrees( # math.atan( # math.sqrt( # (math.tan(math.radians(90 - roll)) ** 2) # + (math.tan(math.radians(pitch)) ** 2) # ) # ) # ) # elif roll < 0: # tilt_angle = -180 - math.degrees( # math.atan( # math.sqrt( # (math.tan(math.radians(90 - roll)) ** 2) # + (math.tan(math.radians(pitch)) ** 2) # ) # ) # ) # else: # tilt_angle = 0 # return tilt_angle def calculate_tilt_angle(self, roll_deg, pitch_deg, return_deg=True): # 将角度限制在 [-180, 180] 范围内 roll_deg = ((roll_deg + 180) % 360) - 180 pitch_deg = ((pitch_deg + 180) % 360) - 180 # 当角度接近 ±90° 时,直接返回 90° 以避免 tan 函数发散 if abs(roll_deg) >= 89.9 or abs(pitch_deg) >= 89.9: return 90.0 if return_deg else math.pi / 2 roll_rad = math.radians(roll_deg) pitch_rad = math.radians(pitch_deg) tan_roll = math.tan(roll_rad) tan_pitch = math.tan(pitch_rad) total_slope = math.sqrt(tan_roll**2 + tan_pitch**2) tilt_rad = math.atan(total_slope) if return_deg: return math.degrees(tilt_rad) else: return tilt_rad """ Module Sensor Method""" def get_data(self): self.esp32_uart._get_data() if self.esp32_uart.data is None: self.log.warning("get_data(): No data received from UART") return self.battery = self.esp32_uart.data.get("bat", 0) self.roll = self.esp32_uart.data.get("roll", 0) self.pitch = self.esp32_uart.data.get("pitch", 0) # def ang_compare(self): # """Compare the roll and pitch angles with the threshold. # return True if a fall is detected, False otherwise. # """ # # fall = ( # # abs( # # abs(self.calculate_tilt_angle(self.roll, self.pitch)) # # - abs(self.p_tilt_angle) # # ) # # > self.angle_threshold # # ) # tilt_angle = self.calculate_tilt_angle( # self.roll - self.p_roll, self.pitch - self.p_pitch # ) # fall = tilt_angle > self.angle_threshold # return fall def check_fall_loop(self): """Get sensor data from the UART and check for falls. This method will continuously read data from the UART and check if a fall is detected. """ duration = self.stdc while True: self.get_data() p_fall_flag = self.fall_flag self.tilt = self.calculate_tilt_angle( self.roll - self.p_roll, self.pitch - self.p_pitch ) self.fall_flag = self.tilt > self.angle_threshold self.log.debug( "check_fall_loop(): Roll: {}, Pitch: {}, Fall: {}".format( self.roll, self.pitch, self.fall_flag ) ) if p_fall_flag != self.fall_flag: self.log.warning("check_fall_loop(): Fall change detected!") self.mqtt_pub_data() self.mqtt_pub_data(True) for _ in range(duration): if self.fall_check_flag: utime.sleep(1) else: _thread.exit() def send_data_loop(self): """Send regular data to the MQTT server.""" duration = self.tdc while True: self.mqtt_pub_data() self.log.debug("send_data_loop(): Sleep for {} seconds".format(duration)) for _ in range(duration): if self.send_data_flag: utime.sleep(1) else: _thread.exit() def init(self): self.sync_time() self.log.debug("init(): Time synchronized. TS: {}".format(self.get_timestamp())) self.align_timestamps() self.log.debug("init_ts: {}".format(self.light.init_ts)) self.log.debug("init_ticks: {}".format(self.light.init_ticks)) # Initialize MQTT client if self.mqtt_init(): self.log.info("MQTT client initialized and ready.") self.mqtt_client.loop_forever() else: self.power_reset() # Initial MQTT publish self.mqtt_pub_data() # self.mqtt_pub_data(True) utime.sleep(1) def start(self): """Start the SSC device task loops.""" # Initialize roll and pitch self.get_data() self.p_roll = self.roll self.p_pitch = self.pitch self.p_tilt_angle = self.calculate_tilt_angle(self.p_roll, self.p_pitch) self.start_send_data_loop() self.start_check_fall_loop() self.log.info("Started SSC device task loops.") def start_send_data_loop(self): self.send_data_flag = True # Restart send data loop _thread.start_new_thread(self.send_data_loop, ()) self.log.info("Started send data task loop.") def start_check_fall_loop(self): self.fall_check_flag = True # Restart fall checking loop _thread.start_new_thread(self.check_fall_loop, ()) self.log.info("Started check fall task loop.") def stop(self): """Stop the SSC device task loops.""" try: self.send_data_flag = False self.fall_check_flag = False except Exception as e: self.log.error("stop(): Error stopping threads: {}".format(e)) """ Logging Methods""" def setup_logging(self): log.basicConfig(level=LOG_LEVEL) self.log = log.getLogger("SSC") """ Storage Methods""" def write_storage(self, databuf, index=RESET_FLAG_INDEX): # 在index为1的存储区域中存储长度为8的数据databuf if type(databuf) is not bytearray: databuf = bytearray(databuf) if index <= 0: return -1 elif index <= 8: length = 52 elif index <= 12: length = 100 elif index <= 14: length = 500 elif index <= 16: length = 1000 if SecureData.Store(index, databuf, length) < 0: return -1 return 0 def read_storage(self, index=RESET_FLAG_INDEX): # 定义一个长度为20的数组用于读取存储的数据 buf = bytearray(20) # 读取index为1的存储区域中的数据至buf中,将读取到数据的长度存储在变量length中 length = SecureData.Read(index, buf, 20) if length < 0: return None return buf[:length] def update_setting_file(self, key, data, file=ENV_FILE): """Update configuration settings in a JSON file.""" try: settings = read_setting_file(file) settings[key] = data self.log.debug("Updating configuration settings: {}".format(settings)) uos.remove(file) # Remove the existing file ql_fs.touch(file, settings) self.log.info("Configuration settings updated successfully.") except Exception as e: self.log.error( "update_setting_file(): Failed to update configuration settings: {}".format( e ) ) return False return True """ Power Management Methods""" def power_reset(self): """Reset the device power. This function will reset the device and reinitialize all components. """ self.log.info("Resetting the device power...") Power.powerRestart() self.done_pin.write(0) # Reset done signal """ Firmware Update Methods """ def firemware_udpate_callback(self, args): self.log.info("Download status:", args[0], "Download process:", args[1]) # TODO -> Implement firmware integrity check logic return 0 # Placeholder for actual firmware update logic """GPS Related Methods""" def init_gps(self): self.log.debug("Initializing GPS module...") quecgnss.configSet(0, 0) quecgnss.configSet(2, 1) if quecgnss.init() < 0: self.log.error("Failed to initialize GPS module.") self._gps_off() return False self.log.info("GPS module initialized successfully.") self._gps_off() return True def _gps_on(self): self.log.debug("Turning on GPS...") if quecgnss.gnssEnable(1) < 0: self.log.error("Failed to turn on GPS.") def _gps_off(self): self.log.debug("Turning off GPS...") if quecgnss.gnssEnable(0) < 0: self.log.error("Failed to turn off GPS.") def get_gps(self): """Get GPS data from the QuecGNSS module. return [lat, long] | None if no data is available.""" try: self._gps_on() self.log.debug("get_gps(): Getting GPS data") if quecgnss.get_state() < 2: utime.sleep(1) for _ in range(GPS_TIMEOUT): gps_info = quecgnss.read(4096) self.log.debug("GPS info: {}".format(gps_info)) if gps_info != -1: gps_data = self.decode_gps_info(gps_info) if ( gps_data is not None and gps_data["lat"] != 0.0 and gps_data["long"] != 0.0 ): self.log.debug( "get_gps(): Decoded GPS data: {}".format(gps_data) ) self._gps_off() return gps_data self.log.error("'get_gps()': Failed to decode GPS information.") else: self.log.error("'get_gps()': No GPS info available.") utime.sleep(1) self._gps_off() return None except Exception as e: self.log.error("'get_gps()': Error getting GPS information: {}".format(e)) self._gps_off() return None def decode_gps_info(self, gps_info): """ Decode the GPS information. return [lat, long] """ try: gngsa = None gpgga = None if gps_info[1]: res = "".join(gps_info[1]) res = ( res.replace("\r\n", "\n") .replace("\r", "\n") .replace("\n\n", "\n") .replace("\n", " ") .replace("$", "\n") ) res = res.split("\n") self.log.debug("'decode_gps_info()': res: {}".format(res)) for data in res: if data.startswith("GNGGA"): gngsa = data gngsa = gngsa.split(",") elif data.startswith("GPGGA"): gpgga = data gpgga = gpgga.split(",") if gngsa is not None: de_gngsa = { "lat": float(gngsa[2]) if gngsa[2] else 0.0, "long": float(gngsa[4]) if gngsa[4] else 0.0, } return de_gngsa elif gpgga is not None: de_gpgga = { "lat": float(gpgga[2]) if gpgga[2] else 0.0, "long": float(gpgga[4]) if gpgga[4] else 0.0, } return de_gpgga self.log.debug( "'decode_gps_info()': decoded GPS information: {}".format(gngsa) ) return gngsa except Exception as e: self.log.error( "'decode_gps_info()': Error decoding GPS information: {}".format(e) ) return None """ MQTT related methods""" def decode_mqtt_msg(self, msg): """ Decode the MQTT message. :param msg: The MQTT message to decode. :return: Decoded message as a dictionary. """ self.log.debug("Decoding MQTT message: {}".format(msg)) try: decoded_msg = ujson.loads(msg) command = decoded_msg.get("command") payload = decoded_msg.get("payload") if command is None: self.log.debug("Ignoring uplink message.") return None, None elif ( command == "setTDC" or command == "setSTDC" or command == "setMQTTConfig" or command == "setConfig" ) and payload is None: self.log.warning("Received command without payload: {}".format(msg)) return None, None else: return command, payload except Exception as e: self.log.error("Failed to decode MQTT message: {}".format(e)) return None, None def re_subscribe_group_topic(self, group_topic, qos=0): """ Change the group topic for the MQTT client. :param group_topic: String, New group topic """ try: self.log.debug( "re_subscribe_group_topic(): Changing group topic to: {}".format( group_topic ) ) self.mqtt_client.disconnect() # Disconnect from MQTT server if self.mqtt_init(): self.log.info("Reinitialized MQTT client with new group topic.") else: self.log.error( "Failed to reinitialize MQTT client with new group topic." ) Power.powerRestart() except Exception as e: self.log.error( "re_subscribe_group_topic(): Error changing group topic: {}".format(e) ) Power.powerRestart() def sub_cb(self, topic, msg): self.log.debug("Received message on topic: {}, msg: {}".format(topic, msg)) # Decode the topic to ASCII topic = topic.decode("ascii") # Check if the topic is a valid individual or group topic if "SPVTMS-project-" not in topic and "sensor/" not in topic: # topic = "SPVTMS-project-{}".format(topic) topic = self.get_group_topic(topic) # Check if the topic is either individual or group topic if topic != self.individual_topic and topic != self.group_topic: self.log.warning("Received message on unknown topic: {}".format(topic)) self.log.warning( "Current individual topic: {}".format(self.individual_topic) ) self.log.warning("Current group topic: {}".format(self.group_topic)) return # Decode the MQTT message (command, payload) = self.decode_mqtt_msg(msg) # If command is None, ignore the message self.log.debug("Decoded MQTT command: {}, payload: {}".format(command, payload)) # If command is not None, process the command if command is not None: if command == "setTDC": self.log.info("Received command to set TDC.") self.setTDC(payload.get("TDC")) self.mqtt_pub_data() elif command == "setSTDC": self.log.info("Received command to set STDC.") self.setSTDC(payload.get("STDC")) self.mqtt_pub_data() elif command == "start": self.log.info("Received command to start fall detection.") self.light.light_mode = 0 self.fall_flag = False self.send_data_flag = True self.start() self.mqtt_pub_data(True) elif command == "stop": self.log.info("Received command to stop fall detection.") self.light.light_mode = 0 self.fall_flag = False self.stop() self.mqtt_pub_data() elif command == "setMQTTConfig": self.log.info("Received command to set MQTT configuration.") self.setMQTTConfig(payload) self.mqtt_pub_data() elif command == "setConfig": self.log.info("Received command to set configuration.") self.set_task_config(payload) if self.light.light_mode in [1, 2, 3]: self.light.blinking_flag = False _thread.start_new_thread(self.light.blink, ()) elif self.light.light_mode == 0: self.light.blinking_flag = False self.mqtt_pub_data() elif command == "ping": self.log.info("Received ping command.") self.mqtt_pub_data() self.mqtt_pub_data(True) elif command == "reboot": self.log.info("Received reboot command.") Power.powerRestart() else: self.log.error("Unknown MQTT command: {}".format(command)) def err_cb(self, error): self.log.info(error) # self.mqtt_err_blink() if not self.mqtt_client.reconnect(): self.log.error( "'mqtt_err_cb()': Failed to reconnect to MQTT server after 10 attempts." ) return False def mqtt_init(self): try: self.mqtt_client.set_callback(self.sub_cb) self.mqtt_client.error_register_cb(self.err_cb) self.mqtt_client.connect() self.log.info("MQTT client connected.") # Subscribe to individual and group topics self.mqtt_client.subscribe(self.individual_topic) self.log.info( "Subscribed to individual MQTT topic: {}".format(self.individual_topic) ) if self.group_topic: self.mqtt_client.subscribe(self.group_topic) self.log.info( "Subscribed to group MQTT topic: {}".format(self.group_topic) ) return True except Exception as e: self.log.error("Failed to initialize MQTT client: {}".format(e)) return False def pub_individual(self, msg, qos=0): """Publish a message to the individual topic""" self.mqtt_client.publish(msg, self.pub_topic, qos) def mqtt_pub_data(self, gps=False): self.get_data() data = { "IMEI": self.IMEI, "project": self.project_id, "ts": self.get_timestamp(), "bat": self.battery, "CSQ": net.csqQueryPoll(), "TDC": self.tdc, "STDC": self.stdc, "roll": self.roll, "pitch": self.pitch, "tilt": self.tilt, "start": self.send_data_flag, "mode": self.light.light_mode, "fall": self.fall_flag, } config = { "_v": self.version, "ip": self.mqtt_client.__server, "port": self.mqtt_client.__port, "username": self.mqtt_client.__username, "topic": self.group_topic, "angleThreshold": self.angle_threshold, "p": self.light.phrase, "n": self.light.number_of_lights, "l": self.light.cycle_length, "schedule": self.light.schedule, } if self.mqtt_client.__username is None: config.pop("username") if gps and GPS_EN: gps_data = self.get_gps() data["gps"] = ( [gps_data.get("lat"), gps_data.get("long")] if gps_data is not None else None ) mqtt_msg = {"data": data, "config": config} mqtt_msg = ujson.dumps(mqtt_msg) self.pub_individual(str(mqtt_msg)) self.log.debug("'mqtt_pub_data()': Published data & config to MQTT topic.") def test_mqtt(self, tmp_setting): """Test MQTT config""" self.log.info("Testing MQTT configuration...") self.log.debug("Temporary MQTT settings: {}".format(tmp_setting)) self.log.debug( "Testing MQTT connection with server: {}, port: {}".format( tmp_setting.get("ip"), tmp_setting.get("port") ) ) self.log.debug("Testing MQTT username: {}".format(tmp_setting.get("username"))) self.log.debug("Testing MQTT password: {}".format(tmp_setting.get("pwd"))) try: tmp_mqtt_client = MqttClient( clientid="SSC-TEMP-{}".format(self.IMEI), user=tmp_setting.get("username"), password=tmp_setting.get("pwd"), server=tmp_setting.get("ip"), port=tmp_setting.get("port"), ) tmp_mqtt_client.connect() self.log.info("MQTT client connected successfully.") tmp_mqtt_client.set_callback(self.sub_cb) tmp_mqtt_client.error_register_cb(self.err_cb) tmp_mqtt_client.subscribe(self.individual_topic) self.log.info( "MQTT client subscribe successfully to individual topic: {}".format( self.individual_topic ) ) tmp_mqtt_client.disconnect() self.log.info("MQTT client disconnected successfully.") return True except Exception as e: self.log.error( "test_mqtt(): Failed to connect or subscribe to MQTT: {}".format(e) ) return False def setMQTTConfig(self, payload, file=ENV_FILE): """ Set MQTT configuration from the payload. :param payload: Dictionary containing MQTT configuration parameters. """ if payload: if self.test_mqtt(payload): self.log.info("MQTT configuration is valid.") try: settings = read_setting_file() settings["ip"] = payload.get("ip") settings["port"] = payload.get("port") settings["username"] = ( payload.get("username") if payload.get("username") else None ) settings["pwd"] = payload.get("pwd") if payload.get("pwd") else None self.log.debug("Updating MQTT settings: {}".format(settings)) uos.remove(file) # Remove the existing file ql_fs.touch(file, settings) self.log.info("MQTT settings updated successfully.") except Exception as e: self.log.error( "setMQTTConfig(): Failed to update MQTT settings: {}".format(e) ) return False else: self.log.error( "setMQTTConfig(): MQTT configuration is invalid. Please check the settings." ) def set_task_config(self, payload, file=ENV_FILE): """ Set configuration parameters from the payload. :param payload: Dictionary containing configuration parameters. """ try: self.log.debug("Setting task settings: {}".format(payload)) if payload: self.log.info("Setting configuration parameters...") new_project_id = payload.get("project") new_angle_threshold = payload.get("angleThreshold") new_phrase = payload.get("p") new_number_of_lights = payload.get("n") new_cycle_length = payload.get("l") new_schedule = payload.get("schedule") new_mode = payload.get("mode", None) settings = read_setting_file() if new_project_id is not None and "SPVTMS-project-" in new_project_id: new_project_id = new_project_id.replace("SPVTMS-project-", "") self.log.debug( "Old project ID: {}, New project ID: {}".format( self.project_id, new_project_id ) ) if new_project_id is not None and new_project_id != self.project_id: self.project_id = new_project_id self.group_topic = self.get_group_topic(new_project_id) settings["topic"] = new_project_id self.log.debug( "set_task_config(): Updating group topic to: {}".format( self.group_topic ) ) if new_angle_threshold is not None: self.angle_threshold = new_angle_threshold settings["angleThreshold"] = self.angle_threshold self.log.debug( "set_task_config(): Updating angle threshold to: {}".format( self.angle_threshold ) ) else: self.log.warning( "No angle threshold provided, using previous value: {}".format( self.angle_threshold ) ) if new_phrase is not None and new_phrase >= 0: self.light.phrase = new_phrase settings["p"] = self.light.phrase self.log.debug( "set_task_config(): Updating phrase to: {}".format( self.light.phrase ) ) else: self.log.warning( "No phrase provided or invalid value, using previous value: {}".format( self.light.phrase ) ) if new_number_of_lights is not None and new_number_of_lights > 0: self.light.number_of_lights = new_number_of_lights settings["n"] = self.light.number_of_lights self.log.debug( "set_task_config(): Updating number of lights to: {}".format( self.light.number_of_lights ) ) else: self.log.warning( "No number of lights provided or invalid value, using previous value: {}".format( self.light.number_of_lights ) ) if new_cycle_length is not None: self.light.cycle_length = new_cycle_length settings["l"] = self.light.cycle_length self.log.debug( "set_task_config(): Updating cycle length to: {}".format( self.light.cycle_length ) ) else: self.log.warning( "No cycle length provided or invalid value, using previous value: {}".format( self.light.cycle_length ) ) if new_schedule is not None and new_schedule != []: self.light.schedule = new_schedule settings["schedule"] = self.light.schedule self.log.debug( "set_task_config(): Updating schedule to: {}".format( self.light.schedule ) ) else: self.log.warning( "No schedule provided or invalid value, using previous value: {}".format( self.light.schedule ) ) if new_mode is not None and new_mode in [0, 1, 2, 3]: self.light.light_mode = new_mode # settings["lightmode"] = self.light.light_mode self.log.debug( "set_task_config(): Updating light mode to: {}".format( self.light.light_mode ) ) else: self.log.warning( "No light mode provided or invalid value, using previous value: {}".format( self.light.light_mode ) ) uos.remove(file) # Remove the existing file ql_fs.touch(file, settings) self.log.info("Configuration settings updated successfully.") else: self.log.error("No payload provided for configuration update.") except Exception as e: self.log.error( "set_task_config(): Failed to update configuration settings: {}".format( sys.print_exception(e) ) ) return False """ Blinking Related Methods""" def network_err_blink(self, status): """Blink LEDs based on network error status. LED will blink for 10 times Blink pattern: - NO SIM: 1x blink - Module Error: 2x blink - SIM Error: 3x blink - PDP Error: 4x blink - Firmware Error: 5x blink """ self.status_led.off() # Turn off external LED if status == "no_sim": self.status_led.blink(1, self.blink_cycle) elif status == "module_error": self.status_led.blink(2, self.blink_cycle) elif status == "sim_error": self.status_led.blink(3, self.blink_cycle) elif status == "pdp_error": self.status_led.blink(4, self.blink_cycle) elif status == "firmware_error": self.status_led.blink(5, self.blink_cycle) def mqtt_err_blink(self): self.status_led.off() # Turn off external LED self.status_led.blink(10, self.blink_cycle) def mqtt_pub_blink(self): self.network_led.off() self.network_led.blink(1, self.blink_cycle) """ Helper Methods """ def get_timestamp(self): """ Get the current timestamp in a human-readable format. The timestamp is returned with UTC+8 timezone adjustment. """ return utime.mktime(utime.localtime_ex()) def sync_time(self): # Synchronize time ntptime.sethost(NTP_SERVER) code = ntptime.settime(timezone=8) # Set timezone to Hong Kong (UTC+8) self.log.info("NTP Sync Code: {}".format(code)) self.ntp_code = code def align_timestamps(self): init_time_in_ms = int(utime.mktime(utime.localtime_ex()) * 1000) # Wait in a loop until the second changes while True: current_time_in_ms = int(utime.mktime(utime.localtime_ex()) * 1000) current_ticks = utime.ticks_ms() if current_time_in_ms != init_time_in_ms: self.light.init_ts = current_time_in_ms self.light.init_ticks = current_ticks break utime.sleep_ms(10) # Sleep for 10 ms def get_group_topic(self, project_id): """ Generate the group topic based on the project ID. :param project_id: String, Project ID :return: String, Group topic """ return "SPVTMS-project-{}".format(project_id) """""" def main(): try: # Initialize FES instance if network is ready ssc = SSC() # ssc.status_led.on() # Turn on status LED ssc.init() ssc.log.info("SSC device initialized successfully.") while True: utime.sleep(1) # Sleep to prevent busy-waiting except KeyboardInterrupt: print("Program interrupted by user.") ssc.stop() ssc.mqtt_client.disconnect() ssc.log.info("SSC device task loops stopped.") ssc.log.info("Exiting program.") if __name__ == "__main__": try: main() except Exception as e: print("An error occurred: {}".format(e))