"""OTA bootstrap script.

Reads the local settings file, waits for the network, checks the server-side
configuration for a newer firmware version, downloads the new scripts if
needed, and finally launches ``/usr/ssc.py``.
"""

import request  # type: ignore
import uio  # type: ignore
from misc import Power  # type: ignore
import uos  # type: ignore
import log  # type: ignore
import sys  # type: ignore
import ql_fs  # type: ignore
import ujson  # type: ignore
import utime  # type: ignore
import example  # type: ignore
import checkNet  # type: ignore

LOG_LEVEL = log.DEBUG
log.basicConfig(level=LOG_LEVEL)
ota_log = log.getLogger("OTA")

NETWORK_TIMEOUT = 60  # seconds, adjust as needed
PROJECT_NAME = "SSC"
ENV_FILE = "/usr/.env"


def read_setting_file(file=ENV_FILE):
    """Read configuration settings from a JSON file.

    Args:
        file: Path of the JSON settings file.

    Returns:
        The parsed settings, or ``None`` if the file could not be read
        (either ``ql_fs.read_json`` returned ``None`` or it raised).
    """
    try:
        print("Reading configuration settings from file: {}".format(file))
        data = ql_fs.read_json(file)
        if data is None:
            print("Failed to read configuration settings.")
            return None
        return data
    except Exception as e:
        print("Failed to read configuration settings: {}".format(e))
        return None


# The rest of the module cannot work without the settings, so fail loudly
# here instead of swallowing the error and crashing on the first subscript
# of ``None`` a line later.
SYS_SETTING = read_setting_file()
if SYS_SETTING is None:
    print("Read setting file error:", "Cannot read system setting!")
    raise RuntimeError("Cannot read system setting!")

VERSION = SYS_SETTING["_v"]
OTA_URL = SYS_SETTING["otaUrl"]
CONFIG_URL = SYS_SETTING["configUrl"]
MAIN_URL = SYS_SETTING["mainUrl"]
SSC_URL = SYS_SETTING["sscUrl"]

CHECKNET = checkNet.CheckNetwork(PROJECT_NAME, VERSION)


def update_setting_file(key, data, file=ENV_FILE):
    """Update one key of the JSON settings file.

    The current settings are re-read from ``file``, ``key`` is set to
    ``data`` and the whole mapping is written back.

    Args:
        key: Settings key to set.
        data: New value for ``key``.
        file: Path of the JSON settings file.

    Returns:
        ``True`` on success, ``False`` if reading or writing failed.
    """
    try:
        settings = read_setting_file(file)
        if settings is None:
            # Never touch the file on disk if it could not be read.
            raise ValueError("current settings could not be read")
        settings[key] = data
        ota_log.debug("Updating configuration settings: {}".format(settings))
        uos.remove(file)  # Remove the existing file
        ql_fs.touch(file, settings)
        ota_log.info("Configuration settings updated successfully.")
    except Exception as e:
        ota_log.error(
            "update_setting_file(): Failed to update configuration settings: {}".format(
                e
            )
        )
        return False
    return True


def firmware_check_latest(_version) -> bool:
    """Decide whether the firmware scripts must be (re)downloaded.

    Args:
        _version: Version string advertised by the server configuration.

    Returns:
        ``True`` if ``/usr/ssc.py`` does not exist or if ``_version`` differs
        from the locally recorded ``VERSION``; ``False`` if the versions
        match, if either version is missing, or if the check itself failed.
    """
    try:
        if not ql_fs.path_exists("/usr/ssc.py"):
            ota_log.info("No existing firmware found. Proceeding with update.")
            return True
        if _version and VERSION:
            if _version == VERSION:
                ota_log.info("Firmware is up to date.")
                return False
            ota_log.info("New firmware version available.")
            return True
        # One of the versions is missing: nothing to compare, no update.
        return False
    except Exception as e:
        ota_log.error("OTA check failed: {}".format(e))
        return False


def retry(func, max_attempts: int = 5, delay: int = 2):
    """Call ``func`` until it succeeds or ``max_attempts`` is exhausted.

    Args:
        func: Zero-argument callable to run.
        max_attempts: Maximum number of calls.
        delay: Pause between attempts, passed to ``utime.sleep``.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        The exception of the final attempt if every attempt failed.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as e:
            ota_log.error("Attempt {} failed: {}".format(attempt + 1, e))
            if attempt < max_attempts - 1:
                utime.sleep(delay)
            else:
                raise  # Re-raise the last exception after max attempts


def _http_get(url):
    """GET ``url`` and raise if the server did not answer with a 2xx status."""
    response = request.get(url)
    status = response.status_code
    if not 200 <= status < 300:
        # Without this check an error page would be parsed as configuration
        # or written to disk as if it were the firmware script.
        raise Exception("HTTP {} while fetching {}".format(status, url))
    return response


def _download_to_file(url, path):
    """Download ``url`` and write the body to ``path`` piece by piece."""
    response = _http_get(url)
    with uio.open(path, mode="w", encoding="UTF-8") as f:
        ota_log.debug("Writing new script to {}".format(path))
        # Write each piece as it arrives instead of first building the whole
        # script in RAM through repeated string concatenation.
        for chunk in response.text:
            f.write(chunk)


def firmware_update(
    config_url: str = OTA_URL + "/" + CONFIG_URL,
    main_url: str = OTA_URL + "/" + MAIN_URL,
    ssc_url: str = OTA_URL + "/" + SSC_URL,
) -> None:
    """Check the server for a new firmware version and install it.

    The server configuration is fetched from ``config_url``. Any URL settings
    it carries that differ from the local ones are persisted. If
    ``firmware_check_latest`` reports that an update is needed, the scripts
    at ``main_url`` and ``ssc_url`` are downloaded to temporary files, moved
    into place, the new version is recorded and the module is restarted.

    The whole procedure is retried via ``retry``; a final failure is logged
    and not propagated to the caller.

    Args:
        config_url: URL of the server-side JSON configuration.
        main_url: URL of the new ``main.py``.
        ssc_url: URL of the new ``ssc.py``.
    """

    def update():
        ota_log.debug("Check firmware update")
        config_response = _http_get(config_url)
        _config = ujson.loads("".join(config_response.text))
        _version = _config.get("_v")
        ota_log.debug("SSC version on server: {}".format(_version))
        ota_log.debug("SSC current version on config file: {}".format(VERSION))

        url_settings = (
            ("otaUrl", "OTA", OTA_URL),
            ("configUrl", "Config", CONFIG_URL),
            ("mainUrl", "Main", MAIN_URL),
            ("sscUrl", "SSC", SSC_URL),
        )
        for key, label, current in url_settings:
            new_value = _config.get(key)
            # Skip keys the server did not send: persisting None would break
            # the URL concatenation at the next boot.
            if new_value and new_value != current:
                ota_log.debug("Updating {} URL to: {}".format(label, new_value))
                update_setting_file(key, new_value)

        if firmware_check_latest(_version):
            _download_to_file(main_url, "/usr/main.py.tmp")
            _download_to_file(ssc_url, "/usr/ssc.py.tmp")
            ota_log.info("Update to new version: {}".format(_version))
            # Move the scripts into place first and record the version last,
            # so a failed rename cannot leave the device claiming a version
            # it is not actually running.
            uos.rename("/usr/main.py.tmp", "/usr/main.py")
            uos.rename("/usr/ssc.py.tmp", "/usr/ssc.py")
            if _version:
                update_setting_file("_v", _version)
            Power.powerRestart()

    try:
        retry(update)
    except Exception as e:
        ota_log.error("OTA update failed after multiple attempts: {}".format(e))
        # print_exception writes the traceback itself and returns None, so it
        # must not be used as a format argument.
        sys.print_exception(e)


def main():
    """Wait for the network, run the OTA check, then start ``/usr/ssc.py``.

    If the network does not reach the ready state ``(3, 1)`` within
    ``NETWORK_TIMEOUT``, the reason is logged and the module is restarted.
    """
    try:
        CHECKNET.poweron_print_once()
        # Check network status
        network_status = CHECKNET.wait_network_connected(NETWORK_TIMEOUT)
        if network_status != (3, 1):
            ota_log.error("Network is not ready.")
            if network_status == (1, 0):
                ota_log.error("No SIM card detected or card slot is loose.")
            elif network_status == (2, -1):
                ota_log.error("Module cannot register to the network.")
            elif network_status == (2, 0) or network_status == (2, 2):
                ota_log.error("SIM card cannot register to the network.")
            elif network_status == (3, 0):
                ota_log.error("PDP context activation failed.")
            Power.powerRestart()
        else:
            ota_log.info("Network check completed.")
            firmware_update()
            example.exec("/usr/ssc.py")
    except KeyboardInterrupt:
        print("Program interrupted by user.")
        ota_log.info("Exiting program.")


try:
    main()
except Exception as e:
    print("An error occurred: {}".format(e))