ESP32+BME280+AI-RA08H Revision 2.0
อุปกรณ์ที่ใช้
1.ESP32 Dev Board แบบแคบ เสียบลง Breadboard แล้วเหลือช่องด้านข้างด้านละหนึ่งรู
2. C ขนาด 2.2 uF ช่วยแก้ bug ให้สามารถ Flash โปรแกรมได้อัตโนมัติ
3. LED ไว้ทดสอบการ Downlink (ไม่จำเป็น)
4.ฺSensor อุณหภูมิความชื้น BME280 แบบ I2C มี 4 ขา
5.มอดูล AI-RA08H บัดกรีอยู่บน PCB และลง Firmware สำหรับความถี่ AS923 เรียบร้อยแล้ว
เชื่อมต่อวงจรตามภาพ
ขั้นตอนการดำเนินการ
1.ใช้โปรแกรม Thonny เพื่อ Flash Firmware MicroPython ลงใน ESP32 (เขียนรายละเอียดไว้บทความอื่น)
2.ใส่โปรแกรมด้านล่างลงใน ESP32 อาจจะตั้งชื่อเป็น main.py เพื่อให้ทำงานทันทีหลัง ESP บูท
import struct
import ubinascii
import bme280
import time
from machine import UART, Pin, SoftI2C
from LoRaWAN import lora
from cayennelpp import CayenneLPP
stop = False
motion = 0
led = Pin(2, Pin.OUT)
temp = 0
pa = 0
hum = 0
i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=10000)
bme = bme280.BME280(i2c=i2c)
lora = lora()
DEV_EUI = "1200000000000002"
APP_EUI = "0000000000000000"
APP_KEY = "20000000000000000000000000000001"
lora.configure(DEV_EUI, APP_EUI, APP_KEY)
lora.startJoin()
print("Start Join.....")
while not lora.checkJoinStatus():
print("Joining....")
time.sleep(10)
print("Join success!")
c = CayenneLPP()
c.addTemperature(1, float(temp))
c.addRelativeHumidity(3, float(hum))
payload= (ubinascii.hexlify(c.getBuffer()))
cnt = 1
relay1 = Pin(12, Pin.OUT)
button = Pin(4, Pin.IN)
button.value()
while True:
print("----------Wait 5 Second---------")
time.sleep(5.0)
print("\r\n\r\nPacket No #{}".format(cnt))
temp, pa, hum = bme.values
print("-----------------------------------")
print("BME280/BMP280 values:")
temp, pa, hum = bme.values
print('temp:', temp, ' Hum:', hum , 'PA:', pa)
print("-----------------------------------")
c = CayenneLPP()
c.addTemperature(1, float(temp))
c.addRelativeHumidity(3, float(hum))
payload= (ubinascii.hexlify(c.getBuffer()))
led.value(1)
print("Message to be Sent:", payload.decode('ascii'))
lora.sendMsg(payload.decode('ascii'))
response = lora.receiveMsg()
if (response != ""):
print("Received: ", end=": ")
print(response)
led.value(0)
time.sleep(1)
cnt = cnt + 1
3. Copy LoRaWAN.py ด้านล่างลงใน ESP32
import time
import binascii
from machine import UART
# https://m5stack.oss-cn-shenzhen.aliyuncs.com/resource/docs/datasheet/unit/lorawan/ASR650X%20AT%20Command%20Introduction-20190605.pdf
# https://github.com/iot-lnu/
class lora:
def __init__(self, debug=False):
self._serial = UART(2, 9600) # use RPI PICO GP0 and GP1
self.debug = debug
self.init()
def Init(self, serial, RX, TX):
self._serial = serial
self._serial.begin(9600, serial.SERIAL_8N1, RX, TX)
self._serial.flush()
def checkDeviceConnect(self):
restr = ""
time.sleep(2)
self.writeCMD("AT+CGMI?\r\n")
restr = self.getResponse()
if "OK" not in restr:
return False
else:
return True
def checkJoinStatus(self):
restr = ""
self.writeCMD("AT+CSTATUS?\r\n")
restr = self.getResponse()
if restr.find("+CSTATUS:") != -1:
if restr.find("04") != -1: # or restr.find("07") != -1 or restr.find("08") != -1:
return True
else:
return False
else:
return False
def waitMsg(self, t):
restr = ""
start = round(time.time() * 1000)
while True:
if (round(time.time() * 1000) - start) < t:
res = self._serial.readline()
if res:
if len(res) > 0:
restr += str(res)
else:
break
return restr
def writeCMD(self, command):
self._serial.write(command)
print ("\r\nCommand:",command)
time.sleep(0.1)
def sendMsg(self, data, confirm=1, nbtrials=1):
cmd = f"AT+DTRX={confirm},{nbtrials},{len(data)},{data}\r\n"
if self.debug:
print("SENT", cmd)
self.writeCMD(cmd)
def setSpreadingFactor(self, sf):
cmd = f"AT+CDATARATE={sf}\r\n"
self.writeCMD(cmd)
self.getResponse()
def receiveMsg(self):
restr = self.getResponse()
if restr.find("OK+RECV:") != -1 and restr.find("02,00,00") == -1:
data = restr[restr.find("OK+RECV:") + 17:-2]
return self.decodeMsg(data)
else:
return ""
def configOTTA(self, device_eui, app_eui, app_key, ul_dl_mode):
self.writeCMD("AT+CJOINMODE=0\r\n")
self.getResponse()
self.writeCMD("AT+CDEVEUI=" + device_eui + "\r\n")
self.getResponse()
self.writeCMD("AT+CAPPEUI=" + app_eui + "\r\n")
self.getResponse()
self.writeCMD("AT+CAPPKEY=" + app_key + "\r\n")
self.getResponse()
self.writeCMD("AT+CULDLMODE=" + ul_dl_mode + "\r\n")
self.getResponse()
def configABP(self, device_addr, app_skey, net_skey, ul_dl_mode):
self.writeCMD("AT+CJOINMODE=1\r\n")
self.getResponse()
self.writeCMD("AT+CDEVADDR=" + device_addr + "\r\n")
self.getResponse()
self.writeCMD("AT+CAPPSKEY=" + app_skey + "\r\n")
self.getResponse()
self.writeCMD("AT+CNWKSKEY=" + net_skey + "\r\n")
self.getResponse()
self.writeCMD("AT+CULDLMODE=" + ul_dl_mode + "\r\n")
self.getResponse()
def setClass(self, mode):
self.writeCMD("AT+CCLASS=" + mode + "\r\n")
self.getResponse()
time.sleep(2)
def setRxWindow(self, freq):
self.writeCMD("AT+CRXP=0,0," + freq + "\r\n")
self.getResponse()
time.sleep(2)
def setFreqMask(self, mask):
self.writeCMD("AT+CFREQBANDMASK=" + mask + "\r\n")
self.getResponse()
time.sleep(2)
def startJoin(self):
self.writeCMD("AT+CJOIN=1,0,10,8\r\n")
def decodeMsg(self, hexEncoded):
if len(hexEncoded) % 2 == 0:
buf = hexEncoded
tempbuf = [None] * len(hexEncoded)
i = 0
loop = 2
while loop < len(hexEncoded) + 1:
tmpstr = buf[loop - 2:loop]
tempbuf[i] = chr(int(tmpstr, 16))
i += 1
loop += 2
return "".join(tempbuf)
else:
return hexEncoded
def getResponse(self):
time.sleep(0.05)
restr = self.waitMsg(200)
if self.debug:
newrestr=restr
newrestr=restr.replace("\\r\\n'", "")
newrestr=newrestr.replace("b'", " ")
print("getResponse restr:",newrestr )
return restr
def init(self):
while not self.checkDeviceConnect():
pass
if self.debug:
print("Module Connected")
self.writeCMD("AT+CRESTORE\r\n")
self.getResponse()
# Disable Log Information
self.writeCMD("AT+ILOGLVL=1\r\n")
self.getResponse()
time.sleep(2)
self.writeCMD("AT+CSAVE\r\n")
self.getResponse()
time.sleep(2)
self.writeCMD("AT+IREBOOT=0\r\n")
self.getResponse()
time.sleep(2)
self.writeCMD('AT+DRX?')
self.getResponse()
time.sleep(2)
while not self.checkDeviceConnect():
pass
def configure(self, devui, appeui, appkey):
print("Module Config...")
self.configOTTA(devui, # Device EUI
appeui, # APP EUI
appkey, # APP KEY
"2" # Upload Download Mode
)
# Set Class Mode
self.setClass("2")
time.sleep(2)
self.writeCMD("AT+CWORKMODE=2\r\n")
self.getResponse()
time.sleep(2)
self.setFreqMask("0001")
time.sleep(2)
4. ต้องติดตั้ง Lib BME280.py ไว้ด้วย
ตัวอย่างการ RUN
ให้ Add Device แบบ OTAA เพิ่มใน TheThingsNetwork หรือบน Chirpstack หาก Join แบบ OTAA สำเร็จ จะเห็น Join success! และจะเริ่มมีการส่งข้อมูล
เมื่อเปิดดู Gateway Traffic ที่ LoRaWAN Gateway Console จะเห็น Packet เข้าที่ Gateway ตามตัวอย่าง (ผู้เขียนใช้ Multichannel LoRaWAN Gateway ยี่ห้อ Dragino รุ่น LPS8N-AS923-TH)
ทดสอบการ Downlink
ป้อนข้อมูลตามภาพใช้ FPort 2
หน้าจอ Thonny จะเห็นข้อมูล
AABBCCDDD ตามหลัง OK+RECV
หมายเหตุ
1.Choose between debug=True and debug=False on line 9 ของ File LoRaWAN.py เพื่อเปิดหรือปิด Debug โหมด