Софт для логгирования данных с BLE датчиков температуры и влажности LYWSD03MMC с прошивкой ATC
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

152 lines
6.1 KiB

import asyncio
import signal
import argparse
from pathlib import Path
from bleak import BleakScanner
from datetime import datetime
DEBUG_MODE = False
# Переопределение print для работы с флагом --debug
_original_print = print
def debug_print(*args, **kwargs):
"""Выводит сообщения только в режиме отладки"""
if DEBUG_MODE:
_original_print(*args, **kwargs)
# Переопределяем глобальный print
print = debug_print
def log_data(device_id, data_type, value):
now = datetime.now()
year_month = now.strftime("%Y_%m")
day = now.strftime("%d")
log_dir = Path(device_id) / data_type / year_month / day
log_dir.mkdir(parents=True, exist_ok=True)
filename = log_dir / f"{device_id}_{data_type}_{year_month}_{day}.log"
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
with open(filename, "a", encoding="utf-8") as f:
f.write(f"{timestamp} {value}\n")
async def scan_bthome():
def callback(device, adv_data):
if device.name and device.name.startswith("ATC_"):
device_id = device.name
srv_data = adv_data.service_data.get("0000fcd2-0000-1000-8000-00805f9b34fb")
if not srv_data:
return
data = bytes(srv_data)
if len(data) < 3:
return
# Парсинг формата BTHome v2
# Структура зависит от длины пакета:
# - Пакет 10 байт: только напряжение (байты 4-5)
# - Пакет 11 байт: батарея (байт 4), температура (байты 6-7), влажность (байты 9-10)
temp = None
hum = None
batt_percent = None
volt = None
# Проверяем формат BTHome (начинается с 0x40)
if len(data) > 0 and data[0] == 0x40:
if len(data) == 10:
# Пакет только с напряжением
# Байты 4-5: напряжение (little endian, /1000)
if len(data) >= 6:
volt_raw = int.from_bytes(data[4:6], "little", signed=False)
volt = volt_raw / 1000.0
elif len(data) >= 11:
# Пакет с температурой, влажностью и батареей
# Байт 4: батарея
if len(data) >= 5:
batt_percent = data[4]
# Байты 6-7: температура (little endian, /100)
if len(data) >= 8:
temp_raw = int.from_bytes(data[6:8], "little", signed=False)
temp = temp_raw / 100.0
# Байты 9-10: влажность (little endian, /100)
if len(data) >= 11:
hum_raw = int.from_bytes(data[9:11], "little", signed=False)
hum = hum_raw / 100.0
# Логирование
if temp is not None:
log_data(device_id, "temperature", f"{temp:.2f}")
if hum is not None:
log_data(device_id, "humidity", f"{hum:.2f}")
if batt_percent is not None:
log_data(device_id, "battery", f"{batt_percent}")
if volt is not None:
log_data(device_id, "voltage", f"{volt:.3f}")
# Выводим только те значения, которые удалось прочитать
values = []
if temp is not None:
values.append(f"Температура: {temp:.2f} °C")
if hum is not None:
values.append(f"Влажность: {hum:.2f} %")
if batt_percent is not None:
values.append(f"Батарея: {batt_percent} %")
if volt is not None:
values.append(f"Напряжение: {volt:.3f} В")
if values:
print(f"{device.name} ({device.address})")
print(", ".join(values))
print("-" * 50)
print("Сканирование BLE...")
scanner = BleakScanner(callback, active=False)
await scanner.start()
# Создаем событие для остановки
stop_event = asyncio.Event()
def signal_handler():
"""Обработчик сигналов для корректной остановки"""
stop_event.set()
# Регистрируем обработчики сигналов через asyncio
loop = asyncio.get_event_loop()
if hasattr(loop, 'add_signal_handler'):
# Linux/Unix
try:
loop.add_signal_handler(signal.SIGINT, signal_handler)
loop.add_signal_handler(signal.SIGTERM, signal_handler)
except NotImplementedError:
# Если add_signal_handler не поддерживается, используем обычный способ
signal.signal(signal.SIGINT, lambda s, f: signal_handler())
signal.signal(signal.SIGTERM, lambda s, f: signal_handler())
else:
# Windows
signal.signal(signal.SIGINT, lambda s, f: signal_handler())
signal.signal(signal.SIGTERM, lambda s, f: signal_handler())
# Ждем сигнала завершения
await stop_event.wait()
await scanner.stop()
print("Сканирование остановлено.")
def main():
"""Главная функция с парсингом аргументов командной строки"""
global DEBUG_MODE
parser = argparse.ArgumentParser(description="Сканер BLE устройств BTHome")
parser.add_argument("--debug", action="store_true", help="Включить отладочный вывод")
args = parser.parse_args()
DEBUG_MODE = args.debug
asyncio.run(scan_bthome())
if __name__ == "__main__":
main()