import sys from machine import Pin, UART, ExtInt # type: ignore from misc import Power, ADC # type: ignore import usr.uasyncio as asyncio import ubinascii # type: ignore import quecgnss # type: ignore import ql_fs # type: ignore import modem # type: ignore import utime # type: ignore import ujson # type: ignore from umqtt import MQTTClient # type: ignore import log # type: ignore from usr.hal import * import math # ----------------------------------------------------------------------------------------------------------------------------------------------------------- # Files # ----------------------------------------------------------------------------------------------------------------------------------------------------------- LOAD_CELL_PARAM_PATH = "/usr/load_cell_parameters.json" CONFIG_PATH = "/usr/config.json" # ----------------------------------------------------------------------------------------------------------------------------------------------------------- # Config # ----------------------------------------------------------------------------------------------------------------------------------------------------------- config = ql_fs.read_json(CONFIG_PATH) config = config if config else dict() config["gnss_timeout"] = ( config.get("gnss_timeout", 60) ) config["mqtt_host"] = ( config.get("mqtt_host", "mqtt.smartfire.ai") ) config["mqtt_port"] = config.get("mqtt_port", 1883) config["mqtt_username"] = ( config.get("mqtt_username", "admin") ) config["mqtt_password"] = ( config.get("mqtt_password", "123456789") ) config["mqtt_topic"] = config.get("mqtt_topic", "fes") config["network_sim_apn"] = ( config.get("network_sim_apn", "") ) config["feature_rfid_enabled"] = ( config.get("feature_rfid_enabled", True) ) config["rfid_antenna_power"] = ( config.get("rfid_antenna_power", 9) ) config["feature_gnss_enabled"] = ( config.get("feature_gnss_enabled", False) ) config["battery_adc_ratio"] = ( config.get("battery_adc_ratio", 0.012844) ) config["log_level"] = config.get("log_level", "info") ql_fs.touch(CONFIG_PATH, config) log.basicConfig( { "debug": log.DEBUG, "info": log.INFO, "warning": log.WARNING, "error": log.ERROR, "critical": log.CRITICAL, }.get(config["log_level"], "info") ) # ----------------------------------------------------------------------------------------------------------------------------------------------------------- # HAL # ----------------------------------------------------------------------------------------------------------------------------------------------------------- class LED: def __init__(self): self.pin = Pin(SIGNAL_LED_PIN, Pin.OUT, Pin.PULL_DISABLE, 0) self.pin.write(0) self.powerOn = 0 self.logger = log.getLogger("LED") def on(self): # self.logger.debug("LED On") self.pin.write(1) self.powerOn = 1 def off(self): # self.logger.debug("LED Off") self.pin.write(0) self.powerOn = 0 def toggle(self): # self.logger.debug("LED toggle") if self.powerOn: self.off() else: self.on() async def blink(self, pattern, repeat=-1): i = 0 self.logger.debug("LED start blinking with {} pattern".format(pattern)) if len(pattern) % 2 != 0: raise Exception("Length of pattern must be even to form repeatable pattern") while i < repeat * 2 or repeat == -1: self.toggle() await asyncio.sleep(pattern[i % len(pattern)]) # type: ignore i = i + 1 async def wait_and_blink(self, pattern, task): self.logger.debug("LED blinking while task running") blinking_task = asyncio.create_task(self.blink(pattern)) result = await task blinking_task.cancel() self.off() return result class MFB: def __init__(self): self.pin = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_PD, 0) # Due to hardware design flaw, trigger will cost signal generator to flow back signal to MFB pin # self.trigger = ExtInt(TRIGGER_PIN, ExtInt.IRQ_RISING_FALLING, ExtInt.PULL_DISABLE, self._disable_mfb) # type: ignore # self.trigger.enable() self.last_triggered = 0 def _disable_mfb(self, args): self.last_triggered = utime.time() def pressed(self): return self.pin.read() async def wait_press(self): while True: if self.pressed(): await asyncio.sleep_ms(100) # type: ignore if (utime.time() - self.last_triggered) > 2: return await asyncio.sleep_ms(2) # type: ignore class Trigger: def __init__(self): self.pin = Pin(TRIGGER_PIN, Pin.IN, Pin.PULL_PU, 1) def read(self): return self.pin.read() class LoadCell: def __init__(self, adc: ADC): self.adc = adc self.switch = Pin(LC_SWITCH_PIN, Pin.OUT, Pin.PULL_DISABLE, 0) async def power_on(self): self.switch.write(1) await asyncio.sleep(2) # type: ignore def power_off(self): self.switch.write(0) def read(self): return self.adc.read(LC_ADC) class Balance: def __init__(self, adc: ADC, signal_led: LED): self.load_cell = LoadCell(adc) self.trigger = Trigger() self.signal_led = signal_led self.logger = log.getLogger("Balance") self.parameters: dict[str, float] = ql_fs.read_json(LOAD_CELL_PARAM_PATH) def calibrated(self): return self.parameters is not None def loaded(self): return self.trigger.read() == 0 async def wait_for_load(self, duration=0): start = None while True: if self.loaded(): now = utime.time() start = start if start != None else now hold_duration = now - start if hold_duration >= duration: self.logger.debug("Trigger loaded for {}s".format(hold_duration)) return else: start = None await asyncio.sleep_ms(2) # type: ignore async def wait_for_release(self, duration=0): start = None while True: if not self.loaded(): now = utime.time() start = start if start != None else now hold_duration = now - start if hold_duration >= duration: self.logger.debug("Trigger released for {}s".format(hold_duration)) return else: start = None await asyncio.sleep_ms(2) # type: ignore async def calibrate(self): samples = [] for y in [1, 2, 3, 4, 5, 6, 7, 8, 9]: pattern = [0.1 for _ in range(y * 2)] pattern[-1] = 2 await self.signal_led.wait_and_blink(pattern, self.wait_for_load(10)) for _ in range(5): x = self.load_cell.read() data_point = (x, y) samples.append(data_point) self.logger.debug("Calibration sample collected: {}".format(data_point)) await asyncio.sleep_ms(500) # type: ignore await self.signal_led.wait_and_blink([0.5, 0.5], self.wait_for_release(3)) self.parameters = self.calculate_parameters(samples) ql_fs.touch(LOAD_CELL_PARAM_PATH, self.parameters) def calculate_parameters(self, samples): self.logger.debug("Start calibrating the load cell") n = len(samples) # Calculate means mean_x = sum(x for x, _ in samples) / n mean_y = sum(y for _, y in samples) / n # Calculate numerator and denominator for slope dx_data = [x - mean_x for x, _ in samples] denominator = sum(dx**2 for dx in dx_data) # Handle cases where denominator is zero (e.g., all x values are the same) if denominator == 0: raise ValueError("Cannot calculate slope: all x values are identical.") numerator = sum(dx * (y - mean_y) for dx, (_, y) in zip(dx_data, samples)) m = numerator / denominator c = mean_y - m * mean_x self.logger.debug( "Save calibration result to file, m = {}, c = {}".format(m, c) ) return {"m": m, "c": c} async def measure(self): await self.load_cell.power_on() if not self.parameters: raise Exception("Load cell is not calibrated") self.logger.debug("Start sampling for reading") samples = [] for _ in range(10): try: samples.append(self.load_cell.read()) await asyncio.sleep_ms(500) # type: ignore except Exception as e: self.logger.error("Failed to read load cell: {}".format(e)) self.load_cell.power_off() self.logger.debug("Samples collected: {}".format(samples)) avg = sum(samples) / len(samples) self.logger.debug("Average raw value: {}".format(avg)) weight = avg * self.parameters["m"] + self.parameters["c"] self.logger.debug( "LoadCell read {}kg (m = {}, c = {})".format( weight, self.parameters["m"], self.parameters["c"] ) ) return [weight, samples] # async def read_raw(self): # await self.load_cell.power_on() # raw = self.load_cell.read() # self.load_cell.power_off() # return raw class GNSS: def __init__(self): self.gnss_pin = Pin(GNSS_PIN, Pin.OUT, Pin.PULL_DISABLE, 0) self.gnss_pin.write(0) self.logger = log.getLogger("GNSS") quecgnss.configSet(0, 1) quecgnss.configSet(2, 1) if quecgnss.init() < 0: self.logger.error("Failed to initialize GNSS") # raise Exception("Failed to initialize GNSS") async def _get_location(self): location = None while location is None: if quecgnss.get_state() == 2: location = self.decode_gnss_info(quecgnss.read(4096)) if location is None: await asyncio.sleep(1) # type: ignore self.gnss_pin.write(0) return location def get_location(self, timeout=600): self.logger.debug("Getting GNSS location with timeout {}s".format(timeout)) self.gnss_pin.write(1) return asyncio.wait_for(self._get_location(), timeout) def decode_gnss_info(self, gnss_info): self.logger.debug("Decoding GNSS info") try: if gnss_info == -1: return None if gnss_info[1]: res = "".join(gnss_info[1]) res = ( res.replace("\r\n", "\n") .replace("\r", "\n") .replace("\n\n", "\n") .replace("\n", " ") .replace("$", "\n") ) res = res.split("\n") location_data = None for row in res: if row.startswith("GNGGA") or row.startswith("GPGGA"): location_data = row.split(",") if location_data is not None and location_data[2] and location_data[4]: lat = float(location_data[2]) long = float(location_data[4]) lat_degree = math.floor(lat / 100) lat_minutes = lat % 100 lat = lat_degree + (lat_minutes / 60) long_degree = math.floor(long / 100) long_minutes = long % 100 long = long_degree + (long_minutes / 60) self.logger.debug( "Decoded GNSS location: lat {}, long {}".format(lat, long) ) return { "lat": lat, "long": long, } else: return None except Exception as e: self.logger.error( "Error decoding GPS information: {}".format(sys.print_exception(e)) ) return None class BatteryMeter: def __init__(self, adc: ADC): self.switch = Pin(BAT_SWITCH_PIN, Pin.OUT, Pin.PULL_DISABLE, 0) self.adc = adc self.logger = log.getLogger("BatteryMeter") async def get_level(self): self.switch.write(1) samples = [] for _ in range(10): try: samples.append(self.adc.read(BAT_ADC)) await asyncio.sleep_ms(500) # pyright: ignore[reportGeneralTypeIssues] except Exception as e: self.logger.error("Failed to read battery voltage: {}".format(e)) self.switch.write(0) adc_value = sum(samples) / len(samples) battery_voltage = round( adc_value * config["battery_adc_ratio"], 2 ) # Convert mV to V battery_level = 0 if battery_voltage > 3.3: battery_level = 100 elif battery_voltage > 3.0: battery_level = (1 - (3.3 - battery_voltage) / 0.3273) * 100 elif battery_voltage > 2.7: battery_level = (1 - (6.3 - battery_voltage) / 3.6) * 100 else: battery_level = 0 return [round(battery_level, 2), samples] class RFIDReader: def __init__(self): self.uart = UART( RFID_UART, 115200, 8, 0, 1, 0, ) self.uart.set_callback(self.on_message) self.data = None self.logger = log.getLogger("RFIDReader") self._setting_antenna_power = False self._reading_tags = False self.tags = [] def on_message(self, para): if 0 != para[0]: return msg = self.uart.read(para[2]) self.logger.debug("Received RFID message: {}".format(msg)) msg_type = msg[1] command = msg[2] if msg_type == 1 and command == 0xB6: self.logger.debug("Antenna power set successful") self._setting_antenna_power = False if msg_type == 2 and command == 0x22: self.logger.debug("RFID Tag detected") self.tags = self.parse_card_data(msg) self._reading_tags = False if msg_type == 1 and command == 0x22: self.logger.debug("No RFID Tag detected") self.tags = [] self._reading_tags = False def parse_card_data(self, msg): tags = [] rssi_uint8 = msg[5] if rssi_uint8 & 0x80: # Check if the 8th bit (0x80 = 128) is set rssi = rssi_uint8 - 256 # Convert from unsigned to signed two's complement else: rssi = rssi_uint8 tag = {"rssi": rssi, "id": ubinascii.hexlify(msg[8:20])} tags.append(tag) return tags async def _read_tags(self): await asyncio.wait_for(self.set_antenna_power(config["rfid_antenna_power"]), 5) self.send_command(0x22, []) self.logger.debug("Reading card...") self._reading_tags = True while self._reading_tags: await asyncio.sleep_ms(50) # type: ignore return self.tags[0]["id"] if len(self.tags) > 0 else None async def read_tags(self): tag_id = None retry = 0 while not tag_id and retry < 5: try: tag_id = await asyncio.wait_for(self._read_tags(), 2) except: tag_id = None finally: retry = retry + 1 return tag_id def send_command(self, command, payload): payload_length = len(payload) pl_upper = payload_length >> 8 pl_lower = payload_length & 255 content = [command, pl_upper, pl_lower] content.extend(payload) cks = sum(content) & 255 encoded = [0xBB, 0x00, command, pl_upper, pl_lower] encoded.extend(payload) encoded.extend([cks, 0x7E]) encoded = bytearray(encoded) self.logger.debug("Encoded command: {}".format(encoded)) self.uart.write(encoded) async def set_antenna_power(self, power): if not (9 <= power <= 26): raise Exception( "Invalid power level: {}. Must be between 9 and 26.".format(power) ) power = power * 100 power_upper = power >> 8 power_lower = power & 255 self.send_command(0xB6, [power_upper, power_lower]) self._setting_antenna_power = True while self._setting_antenna_power: await asyncio.sleep_ms(100) # type: ignore return class Timer: def __init__(self): self.done_pin = Pin(DONE_PIN, Pin.OUT, Pin.PULL_PD, 0) self.done_pin.write(0) def power_off(self): self.done_pin.write(1) # Power.powerDown() # self.done_pin.write(0) async def noop(): return None class FES: def __init__(self): self.IMEI = modem.getDevImei() self.button = MFB() self.signal_led = LED() self.adc = ADC() self.adc.open() self.balance = Balance(self.adc, self.signal_led) self.battery_meter = BatteryMeter(self.adc) self.timer = Timer() if config["feature_gnss_enabled"]: self.gnss = GNSS() else: self.gnss = None if config["feature_rfid_enabled"]: self.rfid_reader = RFIDReader() else: self.rfid_reader = None self.mqtt_client = MQTTClient( client_id="FES-{}".format(self.IMEI), server=config["mqtt_host"], port=config["mqtt_port"], user=config["mqtt_username"], password=config["mqtt_password"], ssl=False, ssl_params=None, keepalive=30, reconn=True, ) self.logger = log.getLogger("FES") async def publish(self, msg): self.mqtt_client.connect() self.mqtt_client.publish( topic=config["mqtt_topic"], msg=msg, retain=False, qos=0 ) self.mqtt_client.disconnect() async def sampling(self): tasks = [ self.balance.measure(), self.battery_meter.get_level(), self.rfid_reader.read_tags() if self.rfid_reader else noop(), self.gnss.get_location() if self.gnss else noop(), ] result = await asyncio.gather(*tasks, return_exceptions=True) weight = result[0][0] battery = result[1][0] rfid_id = result[2] if not isinstance(result[2], Exception) else None location = result[3] if not isinstance(result[3], Exception) else None loadcell_raw = result[0][1] battery_raw = result[1][1] msg = { "id": self.IMEI, "loaded": bool(self.balance.loaded()), "weight": weight, "rfid_id": rfid_id, "battery": battery, "ts": utime.mktime(utime.localtime_ex()), # type: ignore "reset": False, "lat": location["lat"] if location else None, "long": location["long"] if location else None, "load_raw": loadcell_raw, "battery_raw": battery_raw, } msg = ujson.dumps(msg) self.logger.debug("Collected data: {}".format(msg)) return msg async def uplink(self): msg = await self.sampling() await self.publish(msg) async def run(self): self.signal_led.on() utime.sleep(5) self.signal_led.off() if not self.balance.calibrated(): await self.balance.calibrate() try: await self.signal_led.wait_and_blink([0.1, 0.1], self.uplink()) except Exception as e: self.logger.error("Failed to uplink: {}".format(e)) finally: await asyncio.sleep(5) # type: ignore self.timer.power_off() if __name__ == "__main__": fes = FES() asyncio.run(fes.run())