ESP32+BME280+AI-RA08H Revision 2.0

Somsak Lima
4 min readMar 5, 2024

--

อุปกรณ์ที่ใช้

1.ESP32 Dev Board แบบแคบ เสียบลง Breadboard แล้วเหลือช่องด้านข้างด้านละหนึ่งรู
2. C ขนาด 2.2 uF ช่วยแก้ bug ให้สามารถ Flash โปรแกรมได้อัตโนมัติ
3. LED ไว้ทดสอบการ Downlink (ไม่จำเป็น)
4.ฺSensor อุณหภูมิความชื้น BME280 แบบ I2C มี 4 ขา
5.มอดูล AI-RA08H บัดกรีอยู่บน PCB และลง Firmware สำหรับความถี่ AS923 เรียบร้อยแล้ว

เชื่อมต่อวงจรตามภาพ

ขั้นตอนการดำเนินการ

1.ใช้โปรแกรม Thonny เพื่อ Flash Firmware MicroPython ลงใน ESP32 (เขียนรายละเอียดไว้บทความอื่น)
2.ใส่โปรแกรมด้านล่างลงใน ESP32 อาจจะตั้งชื่อเป็น main.py เพื่อให้ทำงานทันทีหลัง ESP บูท


import struct
import ubinascii
import bme280
import time
from machine import UART, Pin, SoftI2C
from LoRaWAN import lora
from cayennelpp import CayenneLPP
stop = False
motion = 0
led = Pin(2, Pin.OUT)
temp = 0
pa = 0
hum = 0
i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=10000)
bme = bme280.BME280(i2c=i2c)
lora = lora()
DEV_EUI = "1200000000000002"
APP_EUI = "0000000000000000"
APP_KEY = "20000000000000000000000000000001"

lora.configure(DEV_EUI, APP_EUI, APP_KEY)

lora.startJoin()
print("Start Join.....")
while not lora.checkJoinStatus():
print("Joining....")
time.sleep(10)
print("Join success!")

c = CayenneLPP()
c.addTemperature(1, float(temp))
c.addRelativeHumidity(3, float(hum))
payload= (ubinascii.hexlify(c.getBuffer()))
cnt = 1
relay1 = Pin(12, Pin.OUT)
button = Pin(4, Pin.IN)
button.value()

while True:
print("----------Wait 5 Second---------")
time.sleep(5.0)
print("\r\n\r\nPacket No #{}".format(cnt))
temp, pa, hum = bme.values
print("-----------------------------------")
print("BME280/BMP280 values:")
temp, pa, hum = bme.values
print('temp:', temp, ' Hum:', hum , 'PA:', pa)
print("-----------------------------------")

c = CayenneLPP()
c.addTemperature(1, float(temp))
c.addRelativeHumidity(3, float(hum))
payload= (ubinascii.hexlify(c.getBuffer()))
led.value(1)
print("Message to be Sent:", payload.decode('ascii'))
lora.sendMsg(payload.decode('ascii'))
response = lora.receiveMsg()

if (response != ""):
print("Received: ", end=": ")
print(response)

led.value(0)
time.sleep(1)
cnt = cnt + 1

3. Copy LoRaWAN.py ด้านล่างลงใน ESP32

import time
import binascii
from machine import UART

# https://m5stack.oss-cn-shenzhen.aliyuncs.com/resource/docs/datasheet/unit/lorawan/ASR650X%20AT%20Command%20Introduction-20190605.pdf
# https://github.com/iot-lnu/
class lora:

def __init__(self, debug=False):
self._serial = UART(2, 9600) # use RPI PICO GP0 and GP1
self.debug = debug
self.init()

def Init(self, serial, RX, TX):
self._serial = serial
self._serial.begin(9600, serial.SERIAL_8N1, RX, TX)
self._serial.flush()

def checkDeviceConnect(self):
restr = ""
time.sleep(2)
self.writeCMD("AT+CGMI?\r\n")
restr = self.getResponse()
if "OK" not in restr:
return False
else:
return True

def checkJoinStatus(self):
restr = ""
self.writeCMD("AT+CSTATUS?\r\n")
restr = self.getResponse()
if restr.find("+CSTATUS:") != -1:
if restr.find("04") != -1: # or restr.find("07") != -1 or restr.find("08") != -1:
return True
else:
return False
else:
return False

def waitMsg(self, t):

restr = ""
start = round(time.time() * 1000)
while True:
if (round(time.time() * 1000) - start) < t:
res = self._serial.readline()
if res:
if len(res) > 0:
restr += str(res)
else:
break
return restr

def writeCMD(self, command):
self._serial.write(command)
print ("\r\nCommand:",command)
time.sleep(0.1)

def sendMsg(self, data, confirm=1, nbtrials=1):

cmd = f"AT+DTRX={confirm},{nbtrials},{len(data)},{data}\r\n"
if self.debug:
print("SENT", cmd)
self.writeCMD(cmd)

def setSpreadingFactor(self, sf):

cmd = f"AT+CDATARATE={sf}\r\n"
self.writeCMD(cmd)
self.getResponse()

def receiveMsg(self):
restr = self.getResponse()
if restr.find("OK+RECV:") != -1 and restr.find("02,00,00") == -1:
data = restr[restr.find("OK+RECV:") + 17:-2]
return self.decodeMsg(data)
else:
return ""

def configOTTA(self, device_eui, app_eui, app_key, ul_dl_mode):
self.writeCMD("AT+CJOINMODE=0\r\n")
self.getResponse()
self.writeCMD("AT+CDEVEUI=" + device_eui + "\r\n")
self.getResponse()
self.writeCMD("AT+CAPPEUI=" + app_eui + "\r\n")
self.getResponse()
self.writeCMD("AT+CAPPKEY=" + app_key + "\r\n")
self.getResponse()
self.writeCMD("AT+CULDLMODE=" + ul_dl_mode + "\r\n")
self.getResponse()

def configABP(self, device_addr, app_skey, net_skey, ul_dl_mode):
self.writeCMD("AT+CJOINMODE=1\r\n")
self.getResponse()
self.writeCMD("AT+CDEVADDR=" + device_addr + "\r\n")
self.getResponse()
self.writeCMD("AT+CAPPSKEY=" + app_skey + "\r\n")
self.getResponse()
self.writeCMD("AT+CNWKSKEY=" + net_skey + "\r\n")
self.getResponse()
self.writeCMD("AT+CULDLMODE=" + ul_dl_mode + "\r\n")
self.getResponse()

def setClass(self, mode):
self.writeCMD("AT+CCLASS=" + mode + "\r\n")
self.getResponse()
time.sleep(2)

def setRxWindow(self, freq):
self.writeCMD("AT+CRXP=0,0," + freq + "\r\n")
self.getResponse()
time.sleep(2)

def setFreqMask(self, mask):
self.writeCMD("AT+CFREQBANDMASK=" + mask + "\r\n")
self.getResponse()
time.sleep(2)

def startJoin(self):
self.writeCMD("AT+CJOIN=1,0,10,8\r\n")

def decodeMsg(self, hexEncoded):
if len(hexEncoded) % 2 == 0:
buf = hexEncoded
tempbuf = [None] * len(hexEncoded)
i = 0
loop = 2
while loop < len(hexEncoded) + 1:
tmpstr = buf[loop - 2:loop]
tempbuf[i] = chr(int(tmpstr, 16))
i += 1
loop += 2
return "".join(tempbuf)
else:
return hexEncoded

def getResponse(self):
time.sleep(0.05)
restr = self.waitMsg(200)
if self.debug:
newrestr=restr

newrestr=restr.replace("\\r\\n'", "")
newrestr=newrestr.replace("b'", " ")
print("getResponse restr:",newrestr )
return restr

def init(self):
while not self.checkDeviceConnect():
pass
if self.debug:
print("Module Connected")

self.writeCMD("AT+CRESTORE\r\n")
self.getResponse()
# Disable Log Information
self.writeCMD("AT+ILOGLVL=1\r\n")
self.getResponse()
time.sleep(2)
self.writeCMD("AT+CSAVE\r\n")
self.getResponse()
time.sleep(2)
self.writeCMD("AT+IREBOOT=0\r\n")
self.getResponse()
time.sleep(2)
self.writeCMD('AT+DRX?')
self.getResponse()
time.sleep(2)

while not self.checkDeviceConnect():
pass

def configure(self, devui, appeui, appkey):
print("Module Config...")
self.configOTTA(devui, # Device EUI
appeui, # APP EUI
appkey, # APP KEY
"2" # Upload Download Mode
)

# Set Class Mode
self.setClass("2")
time.sleep(2)
self.writeCMD("AT+CWORKMODE=2\r\n")
self.getResponse()
time.sleep(2)
self.setFreqMask("0001")
time.sleep(2)

4. ต้องติดตั้ง Lib BME280.py ไว้ด้วย

ตัวอย่างการ RUN

ให้ Add Device แบบ OTAA เพิ่มใน TheThingsNetwork หรือบน Chirpstack หาก Join แบบ OTAA สำเร็จ จะเห็น Join success! และจะเริ่มมีการส่งข้อมูล

เมื่อเปิดดู Gateway Traffic ที่ LoRaWAN Gateway Console จะเห็น Packet เข้าที่ Gateway ตามตัวอย่าง (ผู้เขียนใช้ Multichannel LoRaWAN Gateway ยี่ห้อ Dragino รุ่น LPS8N-AS923-TH)

ทดสอบการ Downlink

ป้อนข้อมูลตามภาพใช้ FPort 2

หน้าจอ Thonny จะเห็นข้อมูล

AABBCCDDD ตามหลัง OK+RECV

หมายเหตุ
1.Choose between debug=True and debug=False on line 9 ของ File LoRaWAN.py เพื่อเปิดหรือปิด Debug โหมด

Reference: https://github.com/iot-lnu/applied-iot/tree/master/Raspberry%20Pi%20Pico%20(W)%20Micropython/network-examples/N4_LoRaWAN_Connection

--

--

Somsak Lima
Somsak Lima

Written by Somsak Lima

สนับสนุนและส่งเสริมให้ผู้สนใจสามารถใช้งานเทคโนโลยี LoRa และ LoRaWAN ได้ โดยนำความรู้ที่ได้ไปต่อยอดเพื่อใช้งาน

No responses yet