## RaydonEye uses BLE LED Button Service (LBS) UUID 00001523-1212-EFDE-1523-785FEABCD123 ## [!] ble only supports a single connection, if RadonEye App is running this will fail import BLE_GATT from gi.repository import GLib import subprocess import signal import time import struct import sqlite3 import requests import sys import os import errno BLUETOOTH_LBS_SERVICE_CHARACTERISTIC = '00001523-1212-EFDE-1523-785FEABCD123' BLUETOOTH_LBS_NOTIFY_READ_CHARACTERISTIC = '00001525-1212-EFDE-1523-785FEABCD123' BLUETOOTH_LBS_WRITE_CHARACTERISTIC = '00001524-1212-EFDE-1523-785FEABCD123' # second is 'S' C2:58:00:9A:26:29 # first purchased 'R' CF:CD:27:79:55:6B def getName(dbName = False): sensor = 'CF:CD:27:79:55:6B' if (2 > len(sys.argv)) else sys.argv[1] return 'radonEye' + sensor.replace(':', '-') + '.db' if dbName else sensor def connect(radonEyeMac, dev = None): """ Use Central Role BlueZ D-Bus API (BLE-GATT) to connect to peripheral (RadonEye) """ print("connecting to radon sensor: ", radonEyeMac) while True: try: dev = BLE_GATT.Central(radonEyeMac) # 2022 Aug 18th trying to fix a bug where we get stuck in an # infinite loop, I think it is due to BLE_GATT.Central.connect() # having a while condition sleep .5 - I find rd200.py strace stuck # in a select(tv_usec=500000) loop #dev.connect() ## replacement BLE_GATT.Central.connect() <------------------- dev.device.Connect() max = 10 while not dev.device.ServicesResolved: sleep(0.5) max -= 1 if 1 > max: raise Exception("waiting for GATT services to resolve") dev._get_device_chars() ## end replacement ------------------> break except: print("device not found, retrying in a second...\n") time.sleep(1) if(False == ble_ctl(radonEyeMac)): # try again with a power cycle print("try to recover on org.bluez.error...\n") time.sleep(1) weakAttempt(radonEyeMac) continue return dev def weakAttempt(radonEyeMac): """ Tries to handle org.bluez.Error.InProgress errors, this is not a very strong but does help in this specific situation without needing to do additional stuff - there is still a problem that another program can sever the BLE interface so be careful about using this """ lockFile = ".rd200-lockfile" try: # use a file as a lock (exclusive create) f = open(lockFile, mode='x') ble_ctl(radonEyeMac, powerCycle = True) # delete the file f.close() os.remove(lockFile) except Exception as e: if(errno.EEXIST == e.errno): # make sure the file lock is not broken try: old = os.stat(lockFile) if(60 < (int(time.time()) - int(old.st_mtime))): print("lock was borked, cleaning up...\n") os.remove(lockFile) except Exception as e2: print("failed to cleanup borked lock", e2) else: print("weakAttempt did not fix org.bluez.Error.InProgress", e) # [!] To get this to work bluetoothctl needs to connect to the device before using BLE_GATT.connect def ble_ctl(addr:str, retry = 50, powerCycle = False): """ use bluetoothctl interactively: scan on, ... wait ..., scan off, connect CF:CD:27:79:55:6B, exit [!] subprocess has a relevant bug in line-buffering, see issue 21471 """ def writeHelper(proc, str, delay = 1): proc.stdin.write(str.encode('utf-8')) proc.stdin.flush() time.sleep(delay) # get a PTY-like interface to control the bluetoothctl program process = subprocess.Popen(['bluetoothctl'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) # optional, handle "org.bluez.Error.InProgress" using bluetoothctl if(True == powerCycle): writeHelper(process, "power off\n", 2) writeHelper(process, "power on\n", 2) # send bluetoothctl commands interactively writeHelper(process, "scan on\n", 12) """ [!] technically we should wait here until we get the BLE peripheral advertisement, some random internet document says: 4.4.2.2.1 Advertising Interval is integer multiple of 0.625ms in the range of 20 milliseconds to 10.24 seconds """ writeHelper(process, "scan off\n") writeHelper(process, "connect " + addr + "\n", 3) writeHelper(process, "exit\n") # wait for child subprocess to terminate, or kill after five seconds while True: if None != process.poll(): break time.sleep(.1) if 0 < retry: retry -= 1 else: print("bluetoothctl did not close in time, terminating...") process.terminate() try: out, err = process.communicate() except Exception as e: print("tried to close subprocess stdin but got error: ", e) # compress logging when bluetoothctl reports: "No default controller available" if(0 < out.decode("utf-8").count("No default controller available")): print("bluetoothctl reports reports no default controller") # in this case bluetoothctl power off / on does nothing... # if this is logged then bt-snuggler did not prepare environment # check for possible bluetoothctl error: # "Failed to start discovery: org.bluez.Error.InProgress" if(0 < out.decode("utf-8").count("org.bluez.Error")): return False # convert RD200 LBS response to integer value def radonEye_hundredths(res): """ radonEye can only display 0.20 - 99.00 distinct values, internally they store each value as a IEEE754 value (4-byte big-endian float) -- 32 bits is a bit overkill when 12 would suffice, but we are not going to save space, instead, to simplify communication, we are going to send not picocuries per liter, and instead hundredths of picocurries per liter since this is the best resolution the radonEye can support and conversion to picocuries is divide by one hundred """ return round(100 * struct.unpack('