#!/usr/bin/env python3 # ============================================================================== # Конвертер MSG (BUFR) в NPZ # Объединенный модуль для извлечения, парсинга и конвертации # ИСПОЛЬЗОВАНИЕ: # # Из командной строки: # python msg_converter.py input.msg [output.npz] # # Из Python: # from msg_converter import convert_msg_to_npz, load_npz_data # convert_msg_to_npz('input.msg') # data = load_npz_data('output.npz') # # Copyright (c) 2025 SFG # # Все права защищены. Распространяется под пользовательской лицензией. # Подробности см. в README.md. # ============================================================================== import re import numpy as np from pathlib import Path from bitarray import bitarray from multiprocessing import Pool, cpu_count # ============================================================================ # Таблицы дескрипторов и последовательностей (что-то вроде таблиц B и D из BUFR) # ============================================================================ DESCRIPTORS = { "0 01 018": {"bits": 40, "scale": 0, "offset": 0, "type": "IA5", "name": "SHORT STATION NAME"}, "0 01 015": {"bits": 160, "scale": 0, "offset": 0, "type": "IA5", "name": "STATION NAME"}, "0 05 002": {"bits": 15, "scale": -2, "offset": -9000, "type": "INT", "name": "LATITUDE"}, "0 06 002": {"bits": 16, "scale": -2, "offset": -18000,"type": "INT", "name": "LONGITUDE"}, "0 07 001": {"bits": 15, "scale": 0, "offset": -400, "type": "INT", "name": "HEIGHT OF STATION"}, "0 12 004": {"bits": 12, "scale": -1, "offset": 0, "type": "INT", "name": "DRY BULB TEMPERATURE AT 2M"}, "0 08 001": {"bits": 7, "scale": 0, "offset": 0, "type": "UINT","name": "VERTICAL SOUNDING SIGNIFICANCE"}, "0 07 002": {"bits": 16, "scale": 1, "offset": -40, "type": "INT", "name": "HEIGHT/ALTITUDE"}, "0 19 005": {"bits": 9, "scale": 0, "offset": 0, "type": "INT", "name": "DIRECTION OF MOTION"}, "0 19 006": {"bits": 14, "scale": -2, "offset": 0, "type": "INT", "name": "SPEED OF MOTION"}, "0 29 001": {"bits": 3, "scale": 0, "offset": 0, "type": "UINT","name": "PROJECTION TYPE"}, "0 30 031": {"bits": 4, "scale": 0, "offset": 0, "type": "UINT","name": "PICTURE TYPE"}, "0 30 021": {"bits": 12, "scale": 0, "offset": 0, "type": "INT", "name": "PIXELS PER ROW"}, "0 30 022": {"bits": 12, "scale": 0, "offset": 0, "type": "INT", "name": "PIXELS PER COLUMN"}, "0 05 033": {"bits": 16, "scale": 1, "offset": 0, "type": "INT", "name": "PIXEL SIZE HORIZ-1"}, "0 06 033": {"bits": 16, "scale": 1, "offset": 0, "type": "INT", "name": "PIXEL SIZE HORIZ-2"}, "0 10 040": {"bits": 10, "scale": 0, "offset": 0, "type": "INT", "name": "NUMBER OF RETRIEVED LAYERS"}, "0 07 006": {"bits": 15, "scale": 0, "offset": 0, "type": "INT", "name": "HEIGHT ABOVE STATION"}, "0 08 007": {"bits": 4, "scale": 0, "offset": 0, "type": "UINT","name": "DIMENSIONAL SIGNIFICANCE"}, "0 31 002": {"bits": 16, "scale": 0, "offset": 0, "type": "UINT","name": "EXTENDED DELAYED DESCRIPT.REPLIC.FACTOR"}, "0 31 012": {"bits": 16, "scale": 0, "offset": 0, "type": "UINT","name": "EXT.DEL.DESCRIPT.AND DATA REPETIT.FACTOR"}, "0 21 001": {"bits": 7, "scale": 0, "offset": -64, "type": "UINT", "name": "HORIZONTAL REFLECTIVITY"}, "0 21 003": {"bits": 7, "scale": -1, "offset": -5, "type": "UINT", "name": "DIFFERENTIAL REFLECTIVITY"}, "0 21 014": {"bits": 13, "scale": -1, "offset": -4096, "type": "UINT", "name": "DOPPLER MEAN VELOCITY RADIAL"}, "0 21 022": {"bits": 5, "scale": 0, "offset": 0, "type": "UINT", "name": "METEO [SFG NOTE: UNKNOWN]"}, } SEQUENCES = { "3 01 024": ["0 05 002", "0 06 002", "0 07 001"], # LAT/LON/HEIGHT } DEBUG = False # ============================================================================ # BUFR Reader # ============================================================================ class BitReader: """Побитовое чтение данных""" def __init__(self, data, start_bit=0, end_bit=None): if isinstance(data, str): with open(data, "rb") as f: data = f.read() if isinstance(data, bytes): bits = bitarray() bits.frombytes(data) else: bits = data self.bits = bits[start_bit:end_bit] if end_bit else bits[start_bit:] self.pos = 0 def __len__(self): return len(self.bits) def tell(self): return self.pos def seek(self, bit_pos): self.pos = bit_pos def read_bits(self, n, restore=False): if self.pos + n > len(self.bits): raise EOFError("конец битового потока") val = 0 for bit in self.bits[self.pos:self.pos + n]: val = (val << 1) | bit if not restore: self.pos += n return val def read_bytes(self, n, restore=False): return self.read_bits(n * 8, restore) def read_uint(self, n, restore=False): return self.read_bits(n, restore) def read_int(self, n, signed=True, restore=False): raw = self.read_bits(n, restore) if signed and n > 1 and (raw & (1 << (n - 1))): raw -= 1 << n return raw def read_ia5(self, n_bits, restore=False): n_bytes = (n_bits + 7) // 8 val = self.read_bits(n_bytes * 8, restore) raw = val.to_bytes(n_bytes, "big") return raw.decode("ascii", errors="ignore").rstrip("\x00").strip() def read_descriptor(self, desc_code): SEQ = SEQUENCES.get(desc_code) if SEQ: return [self.read_descriptor(s) for s in SEQ] desc = DESCRIPTORS.get(desc_code) if not desc: raise ValueError(f"Неизвестный дескриптор: {desc_code}") bits, scale, offset = desc["bits"], desc["scale"], desc["offset"] dtype = desc["type"] val = -9999999 raw_val = -9999999 if dtype == "IA5": val = self.read_ia5(bits) elif dtype == "INT": raw_val = self.read_int(bits) elif dtype == "UINT": raw_val = self.read_uint(bits) if dtype != "IA5": val = (raw_val + offset) * (10 ** scale) if "DIFFERENTIAL" in desc["name"]: val = val - int(val > 6) * 12 # hot fix from FK code if "DOPPLER" in desc["name"]: if val < -50: val = np.nan if DEBUG: print(f"desc_code: {desc_code}, val: {val}") return val def read_section1(reader): """Читает секцию 1 BUFR""" length1 = reader.read_uint(16) return { "length": length1, "master_table": reader.read_uint(8), "originating_center": reader.read_uint(8), "sub_center": reader.read_uint(8), "update_sequence": reader.read_uint(8), "flags": reader.read_uint(8), "data_category": reader.read_uint(8), "local_subcat": reader.read_uint(8), "int_subcat": reader.read_uint(8), "master_table_version": reader.read_uint(8), "local_table_version": reader.read_uint(8), "year": reader.read_uint(8), "month": reader.read_uint(8), "day": reader.read_uint(8), "hour": reader.read_uint(8), "minute": reader.read_uint(8), "second": reader.read_uint(8), # [в BUFR из MSG это поле используется некорректно, секунд тут нет] } def read_section3(reader): """Читает секцию 3 BUFR""" start = reader.tell() length = reader.read_uint(24) reserved = reader.read_uint(8) num_subsets = reader.read_uint(16) flags = reader.read_uint(8) descriptors = [] while reader.tell() < start + length * 8: remaining = start + length * 8 - reader.tell() if remaining < 16: break desc = reader.read_uint(16) f = desc >> 14 x = (desc >> 8) & 0x3F y = desc & 0xFF if f == 0 and x == 0 and y == 0: break descriptors.append((f, x, y)) # Пропускаем padding padding_bits = start + length * 8 - reader.tell() if padding_bits > 0: reader.read_uint(padding_bits) return { "length": length, "num_subsets": num_subsets, "descriptors": descriptors, } def parse_bufr_block(bufr_data, data_type='dBz'): """ Парсит BUFR и возвращает сетку 100x100 Args: bufr_data: данные BUFR, извлеченные из MSG data_type: тип данных ('dBz', 'dBzD', 'doppler', 'meteo') Returns: numpy array (100, 100) или None """ try: reader = BitReader(bufr_data) # BUFR по стандарту обычно в начале, # но для единообразия с старыми DAT-файлами # оставлено название IPRN***RUDN в начале данных. bufr_pos = bufr_data.find(b'BUFR') if bufr_pos == -1 or bufr_pos > 1000: return None reader.seek(bufr_pos * 8) # Заголовок reader.read_ia5(32) # BUFR reader.read_uint(24) # длина всего BUFR reader.read_uint(8) # версия BUFR # SFG: хотя указывается что версия BUFR 2, # но в BUFR из MSG используется частично формат версии 4, # частично формат версии 2, плюс нестандартные дескрипторы. # поэтому подавляющее большинство софта для работы с BUFR # не будет работать с BUFR из MSG. # Секция 1 read_section1(reader) # Секция 2 sec2_len = reader.read_bytes(3, restore=True) if sec2_len > 0: reader.read_bytes(sec2_len) # Секция 3 sec3_data = read_section3(reader) # Список дескрипторов descriptor_list = [f"{f} {x:02d} {y:03d}" for f, x, y in sec3_data["descriptors"]] # Секция 4 reader.read_bytes(3) # длина секции в байтах reader.read_bytes(1) # зарезервировано # Читаем метаданные всех дескрипторов for code in descriptor_list: reader.read_descriptor(code) if code == "0 08 007": break # SFG: почти всех, некоторые стандартные дескрипторы отсутствуют. # Дескриптор данных по типу descriptor_map = { 'dBz': "0 21 001", 'dBzD': "0 21 003", 'doppler': "0 21 014", 'meteo': "0 21 022" } descriptor_code = descriptor_map.get(data_type, "0 21 001") # Извлекаем пиксели напрямую в numpy массив grid = np.full(10000, np.nan, dtype=np.float32) position = 0 n_pixels_with_data = reader.read_descriptor("0 31 002") for _ in range(n_pixels_with_data): n_skip = reader.read_descriptor("0 31 012") marker = reader.read_descriptor(descriptor_code) # Заполняем пропущенные пиксели if n_skip > 0: # and position + n_skip <= 10000: grid[position:position + n_skip] = marker position += n_skip # Пиксели с данными n_values = reader.read_descriptor("0 31 002") for _ in range(n_values): if position >= 10000: break value = reader.read_descriptor(descriptor_code) grid[position] = value position += 1 grid = grid.reshape(100, 100) # Offset и scale уже применены в read_descriptor через таблицу DESCRIPTORS, # поэтому здесь, в отличие от старого кода для DAT-файлов, нет необходимости применять их еще раз. return grid except Exception as e: print(f"Error parsing BUFR block: {e}") return None # ============================================================================ # Извлечение блоков с BUFR из MSG # ============================================================================ class LayerConfig: """Конфигурация слоев""" DBZ_LAYERS = list(range(40, 51)) # 11 слоев DBZD_LAYERS = list(range(56, 66)) # 10 слоев DOPPLER_LAYERS = list(range(78, 88)) # 10 слоев METEO_LAYER = 70 @classmethod def get_all_layers(cls): """Возвращает все слои""" return cls.DBZ_LAYERS + cls.DBZD_LAYERS + cls.DOPPLER_LAYERS + [cls.METEO_LAYER] def extract_msg_blocks(msg_file_path, verbose=True): """ Извлекает BUFR блоки из MSG файла в память Returns: dict: {(layer, time): block_data} """ if verbose: print(f"Чтение MSG: {msg_file_path}") with open(msg_file_path, 'rb') as f: data = f.read() # Поиск окончаний блоков pattern = b'7777\r\r\n\r\r\n\x03' block_ends = [] start = 0 while True: pos = data.find(pattern, start) if pos == -1: break block_ends.append(pos + len(pattern)) start = pos + 1 if verbose: print(f"Найдено блоков: {len(block_ends)}") # Извлекаем блоки blocks = {} block_starts = [0] + block_ends[:-1] for start_pos, end_pos in zip(block_starts, block_ends): block_data = data[start_pos:end_pos] # Извлекаем метаданные iprn_match = re.search(b'IPRN(\d+)\s+RUDN\s+(\d{6})', block_data) if iprn_match: layer = int(iprn_match.group(1).decode()) time_str = iprn_match.group(2).decode() blocks[(layer, time_str)] = block_data if verbose: times = sorted(set(t for l, t in blocks.keys())) print(f"Временных меток: {len(times)}") print(f"Слоев: {len(set(l for l, t in blocks.keys()))}") return blocks # ============================================================================ # Конвертация в NPZ # ============================================================================ def _process_single_time(args): """ Обработка одной временной метки (для параллелизации) Args: args: (time_str, blocks_for_time, time_idx) Returns: tuple: (time_idx, dbz_data, dbzd_data, doppler_data, meteo_data) """ time_str, blocks_for_time, time_idx = args # Инициализация массивов для одной временной метки dbz = np.full((100, 100, 11), np.nan, dtype=np.float32) dbzd = np.full((100, 100, 10), np.nan, dtype=np.float32) doppler = np.full((100, 100, 10), np.nan, dtype=np.float32) meteo = np.full((100, 100), np.nan, dtype=np.float32) # dBz слои (40-50) for layer_idx, layer in enumerate(LayerConfig.DBZ_LAYERS): block_data = blocks_for_time.get(layer) if block_data: grid = parse_bufr_block(block_data, 'dBz') if grid is not None: dbz[:, :, layer_idx] = grid # dBzD слои (56-65) for layer_idx, layer in enumerate(LayerConfig.DBZD_LAYERS): block_data = blocks_for_time.get(layer) if block_data: grid = parse_bufr_block(block_data, 'dBzD') if grid is not None: dbzd[:, :, layer_idx] = grid # Doppler слои (78-87) for layer_idx, layer in enumerate(LayerConfig.DOPPLER_LAYERS): block_data = blocks_for_time.get(layer) if block_data: grid = parse_bufr_block(block_data, 'doppler') if grid is not None: doppler[:, :, layer_idx] = grid # Meteo слой (70) block_data = blocks_for_time.get(LayerConfig.METEO_LAYER) if block_data: grid = parse_bufr_block(block_data, 'meteo') if grid is not None: meteo[:, :] = grid return (time_idx, dbz, dbzd, doppler, meteo) def convert_msg_to_npz(msg_file_path, output_path=None, verbose=True, n_workers=None): """ Args: msg_file_path: путь к MSG файлу output_path: путь к выходному NPZ (опционально) verbose: выводить ли прогресс n_workers: количество процессов (по умолчанию cpu_count()) Returns: путь к сохраненному NPZ """ msg_file_path = Path(msg_file_path) if n_workers is None: n_workers = cpu_count() if verbose: print("=" * 70) print("Конвертация MSG в NPZ") print("=" * 70) print("\n[1/3] Извлечение BUFR блоков...") blocks = extract_msg_blocks(msg_file_path, verbose=verbose) if verbose: print(f"\n[2/3] Парсинг данных (workers: {n_workers})...") times = sorted(set(t for l, t in blocks.keys())) n_times = len(times) dbz = np.full((n_times, 100, 100, 11), np.nan, dtype=np.float32) dbzd = np.full((n_times, 100, 100, 10), np.nan, dtype=np.float32) doppler = np.full((n_times, 100, 100, 10), np.nan, dtype=np.float32) meteo = np.full((n_times, 100, 100), np.nan, dtype=np.float32) # Подготовка данных для параллельной обработки tasks = [] for time_idx, time_str in enumerate(times): # Собираем все блоки для данной временной метки blocks_for_time = {} for layer in LayerConfig.get_all_layers(): block_data = blocks.get((layer, time_str)) if block_data: blocks_for_time[layer] = block_data tasks.append((time_str, blocks_for_time, time_idx)) with Pool(processes=n_workers) as pool: results = pool.map(_process_single_time, tasks) if verbose: print(f" Сборка результатов...") for time_idx, dbz_t, dbzd_t, doppler_t, meteo_t in results: dbz[time_idx] = dbz_t dbzd[time_idx] = dbzd_t doppler[time_idx] = doppler_t meteo[time_idx] = meteo_t if verbose: print("\n[3/3] Сохранение в NPZ...") if output_path is None: date_match = re.search(r'(\d{8})', msg_file_path.stem) if date_match: date_str = date_match.group(1) formatted_date = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}" else: formatted_date = msg_file_path.stem output_dir = msg_file_path.parent output_dir.mkdir(exist_ok=True) output_path = output_dir / f"{formatted_date}.npz" output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) np.savez_compressed( output_path, dbz=dbz, dbzd=dbzd, doppler=doppler, meteo=meteo, times=np.array(times, dtype='S6') ) if verbose: file_size_mb = output_path.stat().st_size / (1024 * 1024) print(f"\n{'=' * 70}") print(f"Готово! Файл: {output_path}") print(f"Размер: {file_size_mb:.2f} МБ") print(f"{'=' * 70}\n") return output_path def load_npz_data(npz_path): """Загружает данные из NPZ""" data = np.load(npz_path) return { 'dbz': data['dbz'], 'dbzd': data['dbzd'], 'doppler': data['doppler'], 'meteo': data['meteo'] if 'meteo' in data else np.zeros((144, 100, 100)), 'times': data['times'] if 'times' in data else None } if __name__ == "__main__": import sys if len(sys.argv) < 2: print("Использование: python msg_converter.py [output.npz]") sys.exit(1) msg_file = sys.argv[1] output_file = sys.argv[2] if len(sys.argv) > 2 else None try: result_path = convert_msg_to_npz(msg_file, output_file, verbose=True) print(f"Успешно: {result_path}") except Exception as e: print(f"Ошибка: {e}") import traceback traceback.print_exc() sys.exit(1)