import asyncio import signal import argparse from pathlib import Path from bleak import BleakScanner from datetime import datetime DEBUG_MODE = False # Переопределение print для работы с флагом --debug _original_print = print def debug_print(*args, **kwargs): """Выводит сообщения только в режиме отладки""" if DEBUG_MODE: _original_print(*args, **kwargs) # Переопределяем глобальный print print = debug_print def log_data(device_id, data_type, value): now = datetime.now() year_month = now.strftime("%Y_%m") day = now.strftime("%d") log_dir = Path(device_id) / data_type / year_month / day log_dir.mkdir(parents=True, exist_ok=True) filename = log_dir / f"{device_id}_{data_type}_{year_month}_{day}.log" timestamp = now.strftime("%Y-%m-%d %H:%M:%S") with open(filename, "a", encoding="utf-8") as f: f.write(f"{timestamp} {value}\n") async def scan_bthome(): def callback(device, adv_data): if device.name and device.name.startswith("ATC_"): device_id = device.name srv_data = adv_data.service_data.get("0000fcd2-0000-1000-8000-00805f9b34fb") if not srv_data: return data = bytes(srv_data) if len(data) < 3: return # Парсинг формата BTHome v2 # Структура зависит от длины пакета: # - Пакет 10 байт: только напряжение (байты 4-5) # - Пакет 11 байт: батарея (байт 4), температура (байты 6-7), влажность (байты 9-10) temp = None hum = None batt_percent = None volt = None # Проверяем формат BTHome (начинается с 0x40) if len(data) > 0 and data[0] == 0x40: if len(data) == 10: # Пакет только с напряжением # Байты 4-5: напряжение (little endian, /1000) if len(data) >= 6: volt_raw = int.from_bytes(data[4:6], "little", signed=False) volt = volt_raw / 1000.0 elif len(data) >= 11: # Пакет с температурой, влажностью и батареей # Байт 4: батарея if len(data) >= 5: batt_percent = data[4] # Байты 6-7: температура (little endian, /100) if len(data) >= 8: temp_raw = int.from_bytes(data[6:8], "little", signed=False) temp = temp_raw / 100.0 # Байты 9-10: влажность (little endian, /100) if len(data) >= 11: hum_raw = int.from_bytes(data[9:11], "little", signed=False) hum = hum_raw / 100.0 # Логирование if temp is not None: log_data(device_id, "temperature", f"{temp:.2f}") if hum is not None: log_data(device_id, "humidity", f"{hum:.2f}") if batt_percent is not None: log_data(device_id, "battery", f"{batt_percent}") if volt is not None: log_data(device_id, "voltage", f"{volt:.3f}") # Выводим только те значения, которые удалось прочитать values = [] if temp is not None: values.append(f"Температура: {temp:.2f} °C") if hum is not None: values.append(f"Влажность: {hum:.2f} %") if batt_percent is not None: values.append(f"Батарея: {batt_percent} %") if volt is not None: values.append(f"Напряжение: {volt:.3f} В") if values: print(f"{device.name} ({device.address})") print(", ".join(values)) print("-" * 50) print("Сканирование BLE...") scanner = BleakScanner(callback, active=False) await scanner.start() # Создаем событие для остановки stop_event = asyncio.Event() def signal_handler(): """Обработчик сигналов для корректной остановки""" stop_event.set() # Регистрируем обработчики сигналов через asyncio loop = asyncio.get_event_loop() if hasattr(loop, 'add_signal_handler'): # Linux/Unix try: loop.add_signal_handler(signal.SIGINT, signal_handler) loop.add_signal_handler(signal.SIGTERM, signal_handler) except NotImplementedError: # Если add_signal_handler не поддерживается, используем обычный способ signal.signal(signal.SIGINT, lambda s, f: signal_handler()) signal.signal(signal.SIGTERM, lambda s, f: signal_handler()) else: # Windows signal.signal(signal.SIGINT, lambda s, f: signal_handler()) signal.signal(signal.SIGTERM, lambda s, f: signal_handler()) # Ждем сигнала завершения await stop_event.wait() await scanner.stop() print("Сканирование остановлено.") def main(): """Главная функция с парсингом аргументов командной строки""" global DEBUG_MODE parser = argparse.ArgumentParser(description="Сканер BLE устройств BTHome") parser.add_argument("--debug", action="store_true", help="Включить отладочный вывод") args = parser.parse_args() DEBUG_MODE = args.debug asyncio.run(scan_bthome()) if __name__ == "__main__": main()