You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
152 lines
6.1 KiB
152 lines
6.1 KiB
import asyncio |
|
import signal |
|
import argparse |
|
from pathlib import Path |
|
from bleak import BleakScanner |
|
from datetime import datetime |
|
|
|
DEBUG_MODE = False |
|
|
|
# Переопределение print для работы с флагом --debug |
|
_original_print = print |
|
|
|
def debug_print(*args, **kwargs): |
|
"""Выводит сообщения только в режиме отладки""" |
|
if DEBUG_MODE: |
|
_original_print(*args, **kwargs) |
|
|
|
# Переопределяем глобальный print |
|
print = debug_print |
|
|
|
def log_data(device_id, data_type, value): |
|
|
|
now = datetime.now() |
|
year_month = now.strftime("%Y_%m") |
|
day = now.strftime("%d") |
|
|
|
log_dir = Path(device_id) / data_type / year_month / day |
|
log_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
filename = log_dir / f"{device_id}_{data_type}_{year_month}_{day}.log" |
|
|
|
timestamp = now.strftime("%Y-%m-%d %H:%M:%S") |
|
with open(filename, "a", encoding="utf-8") as f: |
|
f.write(f"{timestamp} {value}\n") |
|
|
|
async def scan_bthome(): |
|
def callback(device, adv_data): |
|
if device.name and device.name.startswith("ATC_"): |
|
device_id = device.name |
|
|
|
srv_data = adv_data.service_data.get("0000fcd2-0000-1000-8000-00805f9b34fb") |
|
if not srv_data: |
|
return |
|
data = bytes(srv_data) |
|
if len(data) < 3: |
|
return |
|
|
|
# Парсинг формата BTHome v2 |
|
# Структура зависит от длины пакета: |
|
# - Пакет 10 байт: только напряжение (байты 4-5) |
|
# - Пакет 11 байт: батарея (байт 4), температура (байты 6-7), влажность (байты 9-10) |
|
|
|
temp = None |
|
hum = None |
|
batt_percent = None |
|
volt = None |
|
|
|
# Проверяем формат BTHome (начинается с 0x40) |
|
if len(data) > 0 and data[0] == 0x40: |
|
if len(data) == 10: |
|
# Пакет только с напряжением |
|
# Байты 4-5: напряжение (little endian, /1000) |
|
if len(data) >= 6: |
|
volt_raw = int.from_bytes(data[4:6], "little", signed=False) |
|
volt = volt_raw / 1000.0 |
|
elif len(data) >= 11: |
|
# Пакет с температурой, влажностью и батареей |
|
# Байт 4: батарея |
|
if len(data) >= 5: |
|
batt_percent = data[4] |
|
# Байты 6-7: температура (little endian, /100) |
|
if len(data) >= 8: |
|
temp_raw = int.from_bytes(data[6:8], "little", signed=False) |
|
temp = temp_raw / 100.0 |
|
# Байты 9-10: влажность (little endian, /100) |
|
if len(data) >= 11: |
|
hum_raw = int.from_bytes(data[9:11], "little", signed=False) |
|
hum = hum_raw / 100.0 |
|
|
|
# Логирование |
|
if temp is not None: |
|
log_data(device_id, "temperature", f"{temp:.2f}") |
|
if hum is not None: |
|
log_data(device_id, "humidity", f"{hum:.2f}") |
|
if batt_percent is not None: |
|
log_data(device_id, "battery", f"{batt_percent}") |
|
if volt is not None: |
|
log_data(device_id, "voltage", f"{volt:.3f}") |
|
|
|
# Выводим только те значения, которые удалось прочитать |
|
values = [] |
|
if temp is not None: |
|
values.append(f"Температура: {temp:.2f} °C") |
|
if hum is not None: |
|
values.append(f"Влажность: {hum:.2f} %") |
|
if batt_percent is not None: |
|
values.append(f"Батарея: {batt_percent} %") |
|
if volt is not None: |
|
values.append(f"Напряжение: {volt:.3f} В") |
|
|
|
if values: |
|
print(f"{device.name} ({device.address})") |
|
print(", ".join(values)) |
|
print("-" * 50) |
|
|
|
print("Сканирование BLE...") |
|
scanner = BleakScanner(callback) |
|
await scanner.start() |
|
|
|
# Создаем событие для остановки |
|
stop_event = asyncio.Event() |
|
|
|
def signal_handler(): |
|
"""Обработчик сигналов для корректной остановки""" |
|
stop_event.set() |
|
|
|
# Регистрируем обработчики сигналов через asyncio |
|
loop = asyncio.get_event_loop() |
|
if hasattr(loop, 'add_signal_handler'): |
|
# Linux/Unix |
|
try: |
|
loop.add_signal_handler(signal.SIGINT, signal_handler) |
|
loop.add_signal_handler(signal.SIGTERM, signal_handler) |
|
except NotImplementedError: |
|
# Если add_signal_handler не поддерживается, используем обычный способ |
|
signal.signal(signal.SIGINT, lambda s, f: signal_handler()) |
|
signal.signal(signal.SIGTERM, lambda s, f: signal_handler()) |
|
else: |
|
# Windows |
|
signal.signal(signal.SIGINT, lambda s, f: signal_handler()) |
|
signal.signal(signal.SIGTERM, lambda s, f: signal_handler()) |
|
|
|
# Ждем сигнала завершения |
|
await stop_event.wait() |
|
|
|
await scanner.stop() |
|
print("Сканирование остановлено.") |
|
|
|
def main(): |
|
"""Главная функция с парсингом аргументов командной строки""" |
|
global DEBUG_MODE |
|
|
|
parser = argparse.ArgumentParser(description="Сканер BLE устройств BTHome") |
|
parser.add_argument("--debug", action="store_true", help="Включить отладочный вывод") |
|
args = parser.parse_args() |
|
|
|
DEBUG_MODE = args.debug |
|
|
|
asyncio.run(scan_bthome()) |
|
|
|
if __name__ == "__main__": |
|
main() |