Oliv'

I am currently working with BLE beacons (only iBeacons for now) using Espressif's esp-idf libraries. The aim is to use the ESP32 as a gateway that publishes detected beacons to an MQTT broker. It was surprisingly easy, and I will write something about it once the code is more polished.

Anyway, out of curiosity and to speed up development, I wanted to try the MicroPython port for the ESP32, and I did. Unfortunately, BLE support is not yet reliable enough: a lot of advertisements were lost with only 4 to 6 beacons. This post is to keep track of my work and, hopefully, to reuse it in the future when the MicroPython port is more reliable :-)

MicroPython compilation

While compiling MicroPython was really straightforward, I had one issue afterward: I was unable to use the MQTT library, whether I imported it as mqtt or umqtt, or copied it manually to flash. I did not investigate why, but I found that adding it to the "frozen" scripts worked.
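To check from the REPL which module name actually made it into a given firmware build, a small probe like the one below can help. This is only a sketch: `import_first` is a hypothetical helper, not part of MicroPython or the Pycom firmware, and the candidate names in the usage comment are assumptions based on the names tried above and on the file that gets frozen.

```python
def import_first(candidates):
    """Return (name, module) for the first module name that can be imported."""
    for name in candidates:
        try:
            # A non-empty fromlist makes __import__ return the submodule itself
            # for dotted names such as "umqtt.simple", not the top-level package.
            return name, __import__(name, None, None, ("MQTTClient",))
        except ImportError:
            continue
    raise ImportError("none of these modules could be imported: " + ", ".join(candidates))


# Possible use on the board (the module names are assumptions):
# name, module = import_first(["mqtt", "umqtt.simple", "simple"])
# MQTTClient = module.MQTTClient
```

The helper only reports which name works; it does not explain why the others fail.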

The code is Pycom's MicroPython port, built with the WiPy 2.0 as target. I used this commit against this version of esp-idf.

  • Clone both repositories and update esp-idf's submodules
  • Add the xtensa compilation tools to PATH. Be careful to use only version 1.22.0-61 with the Pycom & esp-idf commits mentioned above.
  • Set IDF_PATH to the esp-idf directory
  • Change directory to pycom-micropython-sigfox
  • Run:
    • cd mpy-cross && make clean && make
    • cd ../ESP32/frozen
    • wget https://raw.githubusercontent.com/micropython/micropython-lib/a09b3ec20a745cfd1bccb7bd9db513bfe36968db/umqtt.simple/umqtt/simple.py
    • cd ..
    • make BOARD=WIPY TARGET=boot
    • make BOARD=WIPY TARGET=app
    • make BOARD=WIPY flash

You can now connect to the MicroPython REPL on your ESP32!

Python BLE tracker

The following is the quick and dirty code I used to test it. It does the following:

  • Connect to WiFi
  • Connect to MQTT server
  • Scan for BLE advertisements
  • Send one MQTT message per discovered device

from network import Bluetooth, WLAN
from mqtt import MQTTClient
from binascii import hexlify
import machine
import time
import ujson


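def hexlifyNone(value):
    # Hex-encode value, passing None through unchanged
    # (resolve_adv_data() may return None when a field is absent).
    return None if value is None else hexlify(value)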


def scan():
    devices = {}
    print("***   Starting   ***")
    bt = Bluetooth()
    print("Scan...", end="")
    bt.start_scan(10)
    time.sleep(10)
    print("OK")
    advs = bt.get_advertisements()
    print(advs)

    for adv in advs:
        devices[hexlify(adv.mac)] = {
            "Mac": hexlify(adv.mac),
            "Rssi": adv.rssi,
            "Data": hexlifyNone(adv.data),
            "MfrData": hexlifyNone(
                bt.resolve_adv_data(adv.data, Bluetooth.ADV_MANUFACTURER_DATA)
            ),
            "NameShort": hexlifyNone(
                bt.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_SHORT)
            ),
            "NameComplete": hexlifyNone(
                bt.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_CMPL)
            ),
            "Flag": bt.resolve_adv_data(adv.data, Bluetooth.ADV_FLAG),
            "Appearance": hexlifyNone(
                bt.resolve_adv_data(adv.data, Bluetooth.ADV_APPEARANCE)
            ),
        }
        # Debug output, disabled:
        # print('MAC: {}\n\tRSSI: {}\n\tData: {}\n\tName: {}\n\tMfrData: {}'.format(
        #     hexlify(adv.mac),
        #     adv.rssi,
        #     hexlify(adv.data),
        #     bt.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_SHORT),
        #     hexlify(bt.resolve_adv_data(adv.data, Bluetooth.ADV_MANUFACTURER_DATA))
        # ))
    print("\n***********\nDevices: ", devices)
    for data in devices.values():
        print("\n***********\n", data)
        msgJson = ujson.dumps(data)
        client.publish(topic="/test", msg=msgJson)

    print("***   End   ***")


def sub_cb(topic, msg):
    print(msg)


wlan = WLAN(mode=WLAN.STA)
wlan.connect("SSID name", auth=(WLAN.WPA2, "password"), timeout=5000)
while not wlan.isconnected():
    machine.idle()
print("Connected to Wifi\n")

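client = MQTTClient(
    "device_id",
    "MQTT broker address",
    user="user if any",
    password="password if any",
    # 1883 is the standard unencrypted MQTT port; brokers usually serve TLS
    # on 8883, so make sure port and ssl match your broker's configuration.
    port=1883,
    ssl=True,
)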
client.set_callback(sub_cb)
client.connect()
client.subscribe(topic="/test")
client.publish(topic="/test", msg="Started")
scan()
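
Since the beacons here are iBeacons, the MfrData field published above can be decoded further on the receiving side. Below is a minimal sketch of such a decoder, written for regular Python on the MQTT consumer (on the board, `struct` may only be available as `ustruct`). The function name `parse_ibeacon` and the `SAMPLE` frame are hypothetical, and the sketch assumes that the manufacturer data starts with the 2-byte company identifier, followed by the usual iBeacon layout: type 0x02, length 0x15, a 16-byte UUID, big-endian major and minor, and a signed TX power byte.

```python
import struct
from binascii import hexlify, unhexlify

APPLE_COMPANY_ID = 0x004C  # Bluetooth SIG company identifier for Apple
IBEACON_TYPE = 0x02
IBEACON_LENGTH = 0x15      # 21 bytes follow: UUID (16) + major (2) + minor (2) + TX power (1)


def parse_ibeacon(mfr_data):
    """Return a dict describing an iBeacon frame, or None if mfr_data is not one."""
    if mfr_data is None or len(mfr_data) < 25:
        return None
    # The company identifier is little-endian, then one byte each for type and length.
    company_id, beacon_type, length = struct.unpack("<HBB", mfr_data[:4])
    if (company_id, beacon_type, length) != (APPLE_COMPANY_ID, IBEACON_TYPE, IBEACON_LENGTH):
        return None
    uuid_hex = hexlify(mfr_data[4:20]).decode()
    # Major and minor are big-endian unsigned shorts; TX power is a signed byte (dBm at 1 m).
    major, minor, tx_power = struct.unpack(">HHb", mfr_data[20:25])
    return {
        "uuid": "-".join(
            (uuid_hex[0:8], uuid_hex[8:12], uuid_hex[12:16], uuid_hex[16:20], uuid_hex[20:32])
        ),
        "major": major,
        "minor": minor,
        "tx_power": tx_power,
    }


# Made-up frame for illustration: header, UUID, major=1, minor=2, TX power=0xC5.
SAMPLE = unhexlify("4c000215" "e2c56db5dffb48d2b060d0f5a71096e0" "0001" "0002" "c5")
print(parse_ibeacon(SAMPLE))
```

Because the script hex-encodes MfrData before publishing it, a consumer would call something like `parse_ibeacon(unhexlify(payload["MfrData"]))` on each received message, skipping messages where MfrData is null.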
