Новый код для чтения BUFR данных метеорологического радара из .msg-файлов и конвертации в .npz
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

579 lines
21 KiB

#!/usr/bin/env python3
# ==============================================================================
# Конвертер MSG (BUFR) в NPZ
# Объединенный модуль для извлечения, парсинга и конвертации
# ИСПОЛЬЗОВАНИЕ:
# # Из командной строки:
# python msg_converter.py input.msg [output.npz]
# # Из Python:
# from msg_converter import convert_msg_to_npz, load_npz_data
# convert_msg_to_npz('input.msg')
# data = load_npz_data('output.npz')
#
# Copyright (c) 2025 SFG
#
# Все права защищены. Распространяется под пользовательской лицензией.
# Подробности см. в README.md.
# ==============================================================================
import re
import numpy as np
from pathlib import Path
from bitarray import bitarray
from multiprocessing import Pool, cpu_count
# ============================================================================
# Таблицы дескрипторов и последовательностей (что-то вроде таблиц B и D из BUFR)
# ============================================================================
DESCRIPTORS = {
"0 01 018": {"bits": 40, "scale": 0, "offset": 0, "type": "IA5", "name": "SHORT STATION NAME"},
"0 01 015": {"bits": 160, "scale": 0, "offset": 0, "type": "IA5", "name": "STATION NAME"},
"0 05 002": {"bits": 15, "scale": -2, "offset": -9000, "type": "INT", "name": "LATITUDE"},
"0 06 002": {"bits": 16, "scale": -2, "offset": -18000,"type": "INT", "name": "LONGITUDE"},
"0 07 001": {"bits": 15, "scale": 0, "offset": -400, "type": "INT", "name": "HEIGHT OF STATION"},
"0 12 004": {"bits": 12, "scale": -1, "offset": 0, "type": "INT", "name": "DRY BULB TEMPERATURE AT 2M"},
"0 08 001": {"bits": 7, "scale": 0, "offset": 0, "type": "UINT","name": "VERTICAL SOUNDING SIGNIFICANCE"},
"0 07 002": {"bits": 16, "scale": 1, "offset": -40, "type": "INT", "name": "HEIGHT/ALTITUDE"},
"0 19 005": {"bits": 9, "scale": 0, "offset": 0, "type": "INT", "name": "DIRECTION OF MOTION"},
"0 19 006": {"bits": 14, "scale": -2, "offset": 0, "type": "INT", "name": "SPEED OF MOTION"},
"0 29 001": {"bits": 3, "scale": 0, "offset": 0, "type": "UINT","name": "PROJECTION TYPE"},
"0 30 031": {"bits": 4, "scale": 0, "offset": 0, "type": "UINT","name": "PICTURE TYPE"},
"0 30 021": {"bits": 12, "scale": 0, "offset": 0, "type": "INT", "name": "PIXELS PER ROW"},
"0 30 022": {"bits": 12, "scale": 0, "offset": 0, "type": "INT", "name": "PIXELS PER COLUMN"},
"0 05 033": {"bits": 16, "scale": 1, "offset": 0, "type": "INT", "name": "PIXEL SIZE HORIZ-1"},
"0 06 033": {"bits": 16, "scale": 1, "offset": 0, "type": "INT", "name": "PIXEL SIZE HORIZ-2"},
"0 10 040": {"bits": 10, "scale": 0, "offset": 0, "type": "INT", "name": "NUMBER OF RETRIEVED LAYERS"},
"0 07 006": {"bits": 15, "scale": 0, "offset": 0, "type": "INT", "name": "HEIGHT ABOVE STATION"},
"0 08 007": {"bits": 4, "scale": 0, "offset": 0, "type": "UINT","name": "DIMENSIONAL SIGNIFICANCE"},
"0 31 002": {"bits": 16, "scale": 0, "offset": 0, "type": "UINT","name": "EXTENDED DELAYED DESCRIPT.REPLIC.FACTOR"},
"0 31 012": {"bits": 16, "scale": 0, "offset": 0, "type": "UINT","name": "EXT.DEL.DESCRIPT.AND DATA REPETIT.FACTOR"},
"0 21 001": {"bits": 7, "scale": 0, "offset": -64, "type": "UINT", "name": "HORIZONTAL REFLECTIVITY"},
"0 21 003": {"bits": 7, "scale": -1, "offset": -5, "type": "UINT", "name": "DIFFERENTIAL REFLECTIVITY"},
"0 21 014": {"bits": 13, "scale": -1, "offset": -4096, "type": "UINT", "name": "DOPPLER MEAN VELOCITY RADIAL"},
"0 21 022": {"bits": 5, "scale": 0, "offset": 0, "type": "UINT", "name": "METEO [SFG NOTE: UNKNOWN]"},
}
SEQUENCES = {
"3 01 024": ["0 05 002", "0 06 002", "0 07 001"], # LAT/LON/HEIGHT
}
DEBUG = False
# ============================================================================
# BUFR Reader
# ============================================================================
class BitReader:
"""Побитовое чтение данных"""
def __init__(self, data, start_bit=0, end_bit=None):
if isinstance(data, str):
with open(data, "rb") as f:
data = f.read()
if isinstance(data, bytes):
bits = bitarray()
bits.frombytes(data)
else:
bits = data
self.bits = bits[start_bit:end_bit] if end_bit else bits[start_bit:]
self.pos = 0
def __len__(self):
return len(self.bits)
def tell(self):
return self.pos
def seek(self, bit_pos):
self.pos = bit_pos
def read_bits(self, n, restore=False):
if self.pos + n > len(self.bits):
raise EOFError("конец битового потока")
val = 0
for bit in self.bits[self.pos:self.pos + n]:
val = (val << 1) | bit
if not restore:
self.pos += n
return val
def read_bytes(self, n, restore=False):
return self.read_bits(n * 8, restore)
def read_uint(self, n, restore=False):
return self.read_bits(n, restore)
def read_int(self, n, signed=True, restore=False):
raw = self.read_bits(n, restore)
if signed and n > 1 and (raw & (1 << (n - 1))):
raw -= 1 << n
return raw
def read_ia5(self, n_bits, restore=False):
n_bytes = (n_bits + 7) // 8
val = self.read_bits(n_bytes * 8, restore)
raw = val.to_bytes(n_bytes, "big")
return raw.decode("ascii", errors="ignore").rstrip("\x00").strip()
def read_descriptor(self, desc_code):
SEQ = SEQUENCES.get(desc_code)
if SEQ:
return [self.read_descriptor(s) for s in SEQ]
desc = DESCRIPTORS.get(desc_code)
if not desc:
raise ValueError(f"Неизвестный дескриптор: {desc_code}")
bits, scale, offset = desc["bits"], desc["scale"], desc["offset"]
dtype = desc["type"]
val = -9999999
raw_val = -9999999
if dtype == "IA5":
val = self.read_ia5(bits)
elif dtype == "INT":
raw_val = self.read_int(bits)
elif dtype == "UINT":
raw_val = self.read_uint(bits)
if dtype != "IA5":
val = (raw_val + offset) * (10 ** scale)
# Hot fixes
if "DIFFERENTIAL" in desc["name"]:
val = val - int(val > 6) * 12
if "DOPPLER" in desc["name"]:
if val < -50:
val = np.nan
if DEBUG:
print(f"desc_code: {desc_code}, val: {val}")
return val
def read_section1(reader):
"""Читает секцию 1 BUFR"""
length1 = reader.read_uint(16)
return {
"length": length1,
"master_table": reader.read_uint(8),
"originating_center": reader.read_uint(8),
"sub_center": reader.read_uint(8),
"update_sequence": reader.read_uint(8),
"flags": reader.read_uint(8),
"data_category": reader.read_uint(8),
"local_subcat": reader.read_uint(8),
"int_subcat": reader.read_uint(8),
"master_table_version": reader.read_uint(8),
"local_table_version": reader.read_uint(8),
"year": reader.read_uint(8),
"month": reader.read_uint(8),
"day": reader.read_uint(8),
"hour": reader.read_uint(8),
"minute": reader.read_uint(8),
"second": reader.read_uint(8), # [в BUFR из MSG это поле используется некорректно, секунд тут нет]
}
def read_section3(reader):
"""Читает секцию 3 BUFR"""
start = reader.tell()
length = reader.read_uint(24)
reserved = reader.read_uint(8)
num_subsets = reader.read_uint(16)
flags = reader.read_uint(8)
descriptors = []
while reader.tell() < start + length * 8:
remaining = start + length * 8 - reader.tell()
if remaining < 16:
break
desc = reader.read_uint(16)
f = desc >> 14
x = (desc >> 8) & 0x3F
y = desc & 0xFF
if f == 0 and x == 0 and y == 0:
break
descriptors.append((f, x, y))
# Пропускаем padding
padding_bits = start + length * 8 - reader.tell()
if padding_bits > 0:
reader.read_uint(padding_bits)
return {
"length": length,
"num_subsets": num_subsets,
"descriptors": descriptors,
}
def parse_bufr_block(bufr_data, data_type='dBz'):
"""
Парсит BUFR и возвращает сетку 100x100
Args:
bufr_data: данные BUFR, извлеченные из MSG
data_type: тип данных ('dBz', 'dBzD', 'doppler', 'meteo')
Returns:
numpy array (100, 100) или None
"""
try:
reader = BitReader(bufr_data)
# BUFR по стандарту обычно в начале,
# но для единообразия с старыми DAT-файлами
# оставлено название IPRN***RUDN в начале данных.
bufr_pos = bufr_data.find(b'BUFR')
if bufr_pos == -1 or bufr_pos > 1000:
return None
reader.seek(bufr_pos * 8)
# Заголовок
reader.read_ia5(32) # BUFR
reader.read_uint(24) # длина всего BUFR
reader.read_uint(8) # версия BUFR
# SFG: хотя указывается что версия BUFR 2,
# но в BUFR из MSG используется частично формат версии 4,
# частично формат версии 2, плюс нестандартные дескрипторы.
# поэтому подавляющее большинство софта для работы с BUFR
# не будет работать с BUFR из MSG.
# Секция 1
read_section1(reader)
# Секция 2
sec2_len = reader.read_bytes(3, restore=True)
if sec2_len > 0:
reader.read_bytes(sec2_len)
# Секция 3
sec3_data = read_section3(reader)
# Список дескрипторов
descriptor_list = [f"{f} {x:02d} {y:03d}" for f, x, y in sec3_data["descriptors"]]
# Секция 4
reader.read_bytes(3) # длина секции в байтах
reader.read_bytes(1) # зарезервировано
# Читаем метаданные всех дескрипторов
for code in descriptor_list:
reader.read_descriptor(code)
if code == "0 08 007":
break
# SFG: почти всех, некоторые стандартные дескрипторы отсутствуют.
# Дескриптор данных по типу
descriptor_map = {
'dBz': "0 21 001",
'dBzD': "0 21 003",
'doppler': "0 21 014",
'meteo': "0 21 022"
}
descriptor_code = descriptor_map.get(data_type, "0 21 001")
# Извлекаем пиксели напрямую в numpy массив
grid = np.full(10000, np.nan, dtype=np.float32)
position = 0
n_pixels_with_data = reader.read_descriptor("0 31 002")
for _ in range(n_pixels_with_data):
n_skip = reader.read_descriptor("0 31 012")
marker = reader.read_descriptor(descriptor_code)
# Заполняем пропущенные пиксели
if n_skip > 0: # and position + n_skip <= 10000:
grid[position:position + n_skip] = marker
position += n_skip
# Пиксели с данными
n_values = reader.read_descriptor("0 31 002")
for _ in range(n_values):
if position >= 10000:
break
value = reader.read_descriptor(descriptor_code)
grid[position] = value
position += 1
grid = grid.reshape(100, 100)
# Offset и scale уже применены в read_descriptor через таблицу DESCRIPTORS,
# поэтому здесь, в отличие от старого кода для DAT-файлов, нет необходимости применять их еще раз.
return grid
except Exception as e:
print(f"Error parsing BUFR block: {e}")
return None
# ============================================================================
# Извлечение блоков с BUFR из MSG
# ============================================================================
class LayerConfig:
"""Конфигурация слоев"""
DBZ_LAYERS = list(range(40, 51)) # 11 слоев
DBZD_LAYERS = list(range(56, 66)) # 10 слоев
DOPPLER_LAYERS = list(range(78, 88)) # 10 слоев
METEO_LAYER = 70
@classmethod
def get_all_layers(cls):
"""Возвращает все слои"""
return cls.DBZ_LAYERS + cls.DBZD_LAYERS + cls.DOPPLER_LAYERS + [cls.METEO_LAYER]
def extract_msg_blocks(msg_file_path, verbose=True):
"""
Извлекает BUFR блоки из MSG файла в память
Returns:
dict: {(layer, time): block_data}
"""
if verbose:
print(f"Чтение MSG: {msg_file_path}")
with open(msg_file_path, 'rb') as f:
data = f.read()
# Поиск окончаний блоков
pattern = b'7777\r\r\n\r\r\n\x03'
block_ends = []
start = 0
while True:
pos = data.find(pattern, start)
if pos == -1:
break
block_ends.append(pos + len(pattern))
start = pos + 1
if verbose:
print(f"Найдено блоков: {len(block_ends)}")
# Извлекаем блоки
blocks = {}
block_starts = [0] + block_ends[:-1]
for start_pos, end_pos in zip(block_starts, block_ends):
block_data = data[start_pos:end_pos]
# Извлекаем метаданные
iprn_match = re.search(b'IPRN(\d+)\s+RUDN\s+(\d{6})', block_data)
if iprn_match:
layer = int(iprn_match.group(1).decode())
time_str = iprn_match.group(2).decode()
blocks[(layer, time_str)] = block_data
if verbose:
times = sorted(set(t for l, t in blocks.keys()))
print(f"Временных меток: {len(times)}")
print(f"Слоев: {len(set(l for l, t in blocks.keys()))}")
return blocks
# ============================================================================
# Конвертация в NPZ
# ============================================================================
def _process_single_time(args):
"""
Обработка одной временной метки (для параллелизации)
Args:
args: (time_str, blocks_for_time, time_idx)
Returns:
tuple: (time_idx, dbz_data, dbzd_data, doppler_data, meteo_data)
"""
time_str, blocks_for_time, time_idx = args
# Инициализация массивов для одной временной метки
dbz = np.full((100, 100, 11), np.nan, dtype=np.float32)
dbzd = np.full((100, 100, 10), np.nan, dtype=np.float32)
doppler = np.full((100, 100, 10), np.nan, dtype=np.float32)
meteo = np.full((100, 100), np.nan, dtype=np.float32)
# dBz слои (40-50)
for layer_idx, layer in enumerate(LayerConfig.DBZ_LAYERS):
block_data = blocks_for_time.get(layer)
if block_data:
grid = parse_bufr_block(block_data, 'dBz')
if grid is not None:
dbz[:, :, layer_idx] = grid
# dBzD слои (56-65)
for layer_idx, layer in enumerate(LayerConfig.DBZD_LAYERS):
block_data = blocks_for_time.get(layer)
if block_data:
grid = parse_bufr_block(block_data, 'dBzD')
if grid is not None:
dbzd[:, :, layer_idx] = grid
# Doppler слои (78-87)
for layer_idx, layer in enumerate(LayerConfig.DOPPLER_LAYERS):
block_data = blocks_for_time.get(layer)
if block_data:
grid = parse_bufr_block(block_data, 'doppler')
if grid is not None:
doppler[:, :, layer_idx] = grid
# Meteo слой (70)
block_data = blocks_for_time.get(LayerConfig.METEO_LAYER)
if block_data:
grid = parse_bufr_block(block_data, 'meteo')
if grid is not None:
meteo[:, :] = grid
return (time_idx, dbz, dbzd, doppler, meteo)
def convert_msg_to_npz(msg_file_path, output_path=None, verbose=True, n_workers=None):
"""
Args:
msg_file_path: путь к MSG файлу
output_path: путь к выходному NPZ (опционально)
verbose: выводить ли прогресс
n_workers: количество процессов (по умолчанию cpu_count())
Returns:
путь к сохраненному NPZ
"""
msg_file_path = Path(msg_file_path)
if n_workers is None:
n_workers = cpu_count()
if verbose:
print("=" * 70)
print("Конвертация MSG в NPZ")
print("=" * 70)
print("\n[1/3] Извлечение BUFR блоков...")
blocks = extract_msg_blocks(msg_file_path, verbose=verbose)
if verbose:
print(f"\n[2/3] Парсинг данных (workers: {n_workers})...")
times = sorted(set(t for l, t in blocks.keys()))
n_times = len(times)
dbz = np.full((n_times, 100, 100, 11), np.nan, dtype=np.float32)
dbzd = np.full((n_times, 100, 100, 10), np.nan, dtype=np.float32)
doppler = np.full((n_times, 100, 100, 10), np.nan, dtype=np.float32)
meteo = np.full((n_times, 100, 100), np.nan, dtype=np.float32)
# Подготовка данных для параллельной обработки
tasks = []
for time_idx, time_str in enumerate(times):
# Собираем все блоки для данной временной метки
blocks_for_time = {}
for layer in LayerConfig.get_all_layers():
block_data = blocks.get((layer, time_str))
if block_data:
blocks_for_time[layer] = block_data
tasks.append((time_str, blocks_for_time, time_idx))
with Pool(processes=n_workers) as pool:
results = pool.map(_process_single_time, tasks)
if verbose:
print(f" Сборка результатов...")
for time_idx, dbz_t, dbzd_t, doppler_t, meteo_t in results:
dbz[time_idx] = dbz_t
dbzd[time_idx] = dbzd_t
doppler[time_idx] = doppler_t
meteo[time_idx] = meteo_t
if verbose:
print("\n[3/3] Сохранение в NPZ...")
if output_path is None:
date_match = re.search(r'(\d{8})', msg_file_path.stem)
if date_match:
date_str = date_match.group(1)
formatted_date = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}"
else:
formatted_date = msg_file_path.stem
output_dir = msg_file_path.parent
output_dir.mkdir(exist_ok=True)
output_path = output_dir / f"{formatted_date}.npz"
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(
output_path,
dbz=dbz,
dbzd=dbzd,
doppler=doppler,
meteo=meteo,
times=np.array(times, dtype='S6')
)
if verbose:
file_size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"\n{'=' * 70}")
print(f"Готово! Файл: {output_path}")
print(f"Размер: {file_size_mb:.2f} МБ")
print(f"{'=' * 70}\n")
return output_path
def load_npz_data(npz_path):
"""Загружает данные из NPZ"""
data = np.load(npz_path)
return {
'dbz': data['dbz'],
'dbzd': data['dbzd'],
'doppler': data['doppler'],
'meteo': data['meteo'] if 'meteo' in data else np.zeros((144, 100, 100)),
'times': data['times'] if 'times' in data else None
}
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Использование: python msg_converter.py <msg_file> [output.npz]")
sys.exit(1)
msg_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else None
try:
result_path = convert_msg_to_npz(msg_file, output_file, verbose=True)
print(f"Успешно: {result_path}")
except Exception as e:
print(f"Ошибка: {e}")
import traceback
traceback.print_exc()
sys.exit(1)