123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
## RadonEye uses the BLE LED Button Service (LBS), UUID 00001523-1212-EFDE-1523-785FEABCD123
## [!] BLE only supports a single connection; if the RadonEye app is running, this will fail
- import BLE_GATT
- from gi.repository import GLib
- import subprocess
- import signal
- import time
- import struct
- import sqlite3
# UUIDs of the LED Button Service (LBS) exposed by the sensor.
# Despite its name, the first constant holds the UUID of the LBS service itself.
BLUETOOTH_LBS_SERVICE_CHARACTERISTIC = "00001523-1212-EFDE-1523-785FEABCD123"
# Characteristic subscribed to for value-change notifications.
BLUETOOTH_LBS_NOTIFY_READ_CHARACTERISTIC = "00001525-1212-EFDE-1523-785FEABCD123"
# Characteristic that the poll command byte (0x50) is written to.
BLUETOOTH_LBS_WRITE_CHARACTERISTIC = "00001524-1212-EFDE-1523-785FEABCD123"
def connect(radonEyeMac='CF:CD:27:79:55:6B', dev=None):
    """
    Use Central Role BlueZ D-Bus API (BLE-GATT) to connect to peripheral (RadonEye)

    Retries forever: after each failed attempt a message is printed, the
    function sleeps one second, runs ble_ctl() for the same address and
    then tries again.

    radonEyeMac -- Bluetooth address of the sensor
    dev         -- ignored (always overwritten); kept so existing callers
                   that pass it keep working

    Returns the connected BLE_GATT.Central object.
    """
    print("connecting to radon sensor: ", radonEyeMac)
    while True:
        try:
            dev = BLE_GATT.Central(radonEyeMac)
            dev.connect()
        except Exception:
            # Exception rather than a bare except: the SIGINT handler ends the
            # program by raising SystemExit, which has to escape this retry
            # loop or [ctrl] + [c] would be swallowed while connecting
            print("device not found, retrying in a second...\n")
            time.sleep(1)
            ble_ctl(radonEyeMac)
        else:
            return dev
- # [!] To get this to work bluetoothctl needs to connect to the device before using BLE_GATT.connect
def ble_ctl(addr: str, retry=50):
    """
    use bluetoothctl interactively: scan on, ... wait ..., scan off, connect <addr>, exit

    addr  -- Bluetooth address handed to the bluetoothctl "connect" command
    retry -- number of 0.1 second intervals to wait for bluetoothctl to quit
             once "exit" has been sent (default 50, i.e. five seconds);
             after that the child process is terminated

    stdin is flushed explicitly after every command because subprocess has a
    relevant bug in line-buffering, see issue 21471
    """
    def send(proc, command, delay=1):
        # write one command, push it through the pipe, then give
        # bluetoothctl `delay` seconds to act on it
        proc.stdin.write(command.encode('utf-8'))
        proc.stdin.flush()
        time.sleep(delay)

    # the context manager closes the stdin pipe and reaps the child on exit
    with subprocess.Popen(['bluetoothctl'], stdin=subprocess.PIPE) as ctl:
        # send bluetoothctl commands interactively
        send(ctl, "scan on\n", 4)
        send(ctl, "scan off\n")
        send(ctl, "connect " + addr + "\n", 3)
        send(ctl, "exit\n")

        # wait for child subprocess to terminate, or terminate it once the
        # retry budget is used up (Python has no "retry--" decrement operator)
        try:
            ctl.wait(timeout=max(retry, 0) * 0.1)
        except subprocess.TimeoutExpired:
            print("bluetoothctl did not close in time, terminating...")
            ctl.terminate()
            ctl.wait()
- # convert RD200 LBS response to integer value
def radonEye_hundredths(res):
    """
    Convert an RD200 LBS response to hundredths of picocuries per liter.

    radonEye can only display 0.20 - 99.00 distinct values, internally they store each value
    as a IEEE754 value (4-byte float, decoded here as little-endian) -- 32 bits is a bit
    overkill when 12 would suffice, but we are not going to save space, instead, to simplify
    communication, we are going to send not picocuries per liter, and instead hundredths of
    picocuries per liter since this is the best resolution the radonEye can support and
    conversion to picocuries is divide by one hundred

    res -- sequence of byte values (bytes, bytearray or list of ints); the
           float is read from positions 2..5, everything else is ignored

    Returns the reading multiplied by 100 and rounded to the nearest integer.
    Raises struct.error when fewer than four bytes are available at positions 2..5.
    """
    return round(100 * struct.unpack('<f', bytes(res[2:6]))[0])
# write 0x50 to LBS; if the device is disconnected, tear the connection down instead
def radonEye_poll(LBS_device):
    """
    Timer callback: write 0x50 to the LBS write characteristic of LBS_device.

    LBS_device -- connected device object offering char_write() and cleanup()

    Returns True when the write went through and False when it failed, in
    which case cleanup() has been attempted on the device. The function is
    registered with GLib.timeout_add_seconds, which stops calling a callback
    once it returns False.
    """
    try:
        print("timeout, pressing button")
        LBS_device.char_write(BLUETOOTH_LBS_WRITE_CHARACTERISTIC, b'\x50')
    except Exception:
        # Exception rather than a bare except, so SystemExit raised by the
        # SIGINT handler is not mistaken for a lost connection
        print("device is not connected - attempting to cleanup")
        try:
            # remove notifications, exit event loop, disconnect device
            LBS_device.cleanup()
        except Exception as e:
            print("failed to cleanup BLE_GATT objects because: ", e)
        return False
    return True
def notify_handler(value):
    """
    Notification callback: print the raw response and store the reading.

    value -- iterable of byte values received from the notify characteristic

    The response is printed as hex, converted with radonEye_hundredths() and
    printed in picocuries per liter. The converted reading is inserted into
    the samples table of radonEye.db, together with the current Unix time,
    only when the table is empty or the reading differs from the most
    recently inserted row. Insert errors are printed, not raised.
    """
    print(" ".join(hex(x) for x in value))
    hundredths = radonEye_hundredths(value)
    print(hundredths / 100, " picocuries per liter")
    db = sqlite3.connect('radonEye.db')
    try:
        cu = db.cursor()
        cu.execute('SELECT value FROM samples WHERE rowid = (SELECT MAX(rowid) FROM samples)')
        last = cu.fetchone()
        if last is None or hundredths != last[0]:
            print("saving...")
            try:
                cu.execute('INSERT INTO samples VALUES (?, ?)', (int(time.time()), hundredths))
                db.commit()
            except Exception as e:
                print("python3 SQLite3 wrapper raised an error: ", e)
    finally:
        # close the connection even when the SELECT fails (e.g. missing table)
        db.close()
def handler(signum, frame):
    """
    Signal handler: say goodbye and end the program on SIGINT.

    signum -- number of the delivered signal
    frame  -- current stack frame (unused)

    Raises SystemExit for SIGINT; any other signal number is ignored.
    """
    if signum == signal.SIGINT:
        print("goodbye")
        # raise SystemExit directly: exit() is a helper injected by the site
        # module for interactive use and is not guaranteed to exist
        raise SystemExit
# terminate on [ctrl] + [c]
signal.signal(signal.SIGINT, handler)

# create a local database; IF NOT EXISTS keeps this safe to run on every
# start and also adds the table to a radonEye.db file that exists without it
db = sqlite3.connect('radonEye.db')
try:
    db.execute('CREATE TABLE IF NOT EXISTS samples (time INTEGER, value INTEGER)')
    db.commit()
finally:
    db.close()

# connect, subscribe and poll; when wait_for_notifications() returns the
# whole sequence starts over with a fresh connection
while True:
    dev = connect()
    dev.on_value_change(BLUETOOTH_LBS_NOTIFY_READ_CHARACTERISTIC, notify_handler)
    # RadonEye RD200 does not notify on radon value change, instead poll 12 times an hour
    GLib.timeout_add_seconds(300, radonEye_poll, dev)
    dev.wait_for_notifications()
    print('restarting...')
|