# rd200.py
## RadonEye uses BLE LED Button Service (LBS) UUID 00001523-1212-EFDE-1523-785FEABCD123
## [!] BLE only supports a single connection; if the RadonEye app is running this will fail
import signal
import sqlite3
import struct
import subprocess
import sys
import time

import BLE_GATT
from gi.repository import GLib
  10. BLUETOOTH_LBS_SERVICE_CHARACTERISTIC = '00001523-1212-EFDE-1523-785FEABCD123'
  11. BLUETOOTH_LBS_NOTIFY_READ_CHARACTERISTIC = '00001525-1212-EFDE-1523-785FEABCD123'
  12. BLUETOOTH_LBS_WRITE_CHARACTERISTIC = '00001524-1212-EFDE-1523-785FEABCD123'
  13. def connect(radonEyeMac = 'CF:CD:27:79:55:6B', dev = None):
  14. """
  15. Use Central Role BlueZ D-Bus API (BLE-GATT) to connect to peripheral (RadonEye)
  16. """
  17. print("connecting to radon sensor: ", radonEyeMac)
  18. while True:
  19. try:
  20. dev = BLE_GATT.Central(radonEyeMac)
  21. dev.connect()
  22. break
  23. except:
  24. print("device not found, retrying in a second...\n")
  25. time.sleep(1)
  26. ble_ctl(radonEyeMac)
  27. continue
  28. return dev
  29. # [!] To get this to work bluetoothctl needs to connect to the device before using BLE_GATT.connect
  30. def ble_ctl(addr:str, retry = 50):
  31. """
  32. use bluetoothctl interactively: scan on, ... wait ..., scan off, connect CF:CD:27:79:55:6B, exit
  33. subprocess has a relevant bug in line-buffering, see issue 21471
  34. """
  35. ctl = subprocess.Popen(['bluetoothctl'], stdin = subprocess.PIPE)
  36. def writeHelper(proc, str, delay = 1):
  37. proc.stdin.write(str.encode('utf-8'))
  38. proc.stdin.flush()
  39. time.sleep(delay)
  40. # send bluetoothctl commands interactively
  41. writeHelper(ctl, "scan on\n", 4)
  42. writeHelper(ctl, "scan off\n")
  43. writeHelper(ctl, "connect " + addr + "\n", 3)
  44. writeHelper(ctl, "exit\n")
  45. # wait for child subprocess to terminate, or kill after five seconds
  46. while True:
  47. if None != ctl.poll():
  48. break
  49. time.sleep(.1)
  50. if 0 >= retry--:
  51. print("bluetoothctl did not close in time, terminating...")
  52. ctl.terminate()
  53. # convert RD200 LBS response to integer value
  54. def radonEye_hundredths(res):
  55. """
  56. radonEye can only display 0.20 - 99.00 distinct values, internally they store each value
  57. as a IEEE754 value (4-byte big-endian float) -- 32 bits is a bit overkill when 12 would
  58. suffice, but we are not going to save space, instead, to simplify communication, we
  59. are going to send not picocuries per liter, and instead hundredths of picocurries per liter
  60. since this is the best resolution the radonEye can support and conversion to picocuries
  61. is divide by one hundred
  62. """
  63. return round(100 * struct.unpack('<f', bytes(res[2:6]))[0])
  64. # write 0x50 to LBS, unless disconnected teardown
  65. def radonEye_poll(LBS_device):
  66. try:
  67. print("timeout, pressing button")
  68. LBS_device.char_write(BLUETOOTH_LBS_WRITE_CHARACTERISTIC, b'\x50')
  69. except:
  70. print("device is not connected - attempting to cleanup")
  71. try:
  72. # remove notifications, exit event loop, disconnect device
  73. LBS_device.cleanup()
  74. except Exception as e:
  75. print("failed to cleanup BLE_GATT objects because: ", e)
  76. return False
  77. return True
  78. def notify_handler(value):
  79. print(" ".join(hex(x) for x in value))
  80. value = radonEye_hundredths(value)
  81. print(value / 100, " picocuries per liter")
  82. db = sqlite3.connect('radonEye.db')
  83. cu = db.cursor()
  84. cu.execute('SELECT value FROM samples WHERE rowid = (SELECT MAX(rowid) FROM samples)')
  85. last = cu.fetchone()
  86. if None == last or value != last[0]:
  87. print("saving...")
  88. try:
  89. cu.execute('INSERT INTO samples VALUES (?, ?)', (int(time.time()), value))
  90. db.commit()
  91. except Exception as e:
  92. print("python3 SQLite3 wrapper raised an error: ", e)
  93. db.close()
  94. def handler(signum, frame):
  95. if signal.SIGINT == signum:
  96. print("goodbye")
  97. exit()
  98. # terminate on [ctrl] + [c]
  99. signal.signal(signal.SIGINT, handler)
  100. # create a local database
  101. try:
  102. fd = open('radonEye.db')
  103. fd.close()
  104. except:
  105. db = sqlite3.connect('radonEye.db')
  106. cu = db.cursor()
  107. cu.execute('CREATE TABLE samples (time INTEGER, value INTEGER)')
  108. db.commit()
  109. db.close()
  110. while True:
  111. dev = connect()
  112. dev.on_value_change(BLUETOOTH_LBS_NOTIFY_READ_CHARACTERISTIC, notify_handler)
  113. # RadonEye RD200 does not notify on radon value change, instead poll 12 times an hour
  114. GLib.timeout_add_seconds(300, radonEye_poll, dev)
  115. dev.wait_for_notifications()
  116. print('restarting...')