Browse Source

First commit

master
FedorSarafanov 2 days ago
commit
52b2d59458
  1. 152
      .gitignore
  2. 67
      README.md
  3. 138
      make_gif.py
  4. 568
      msg_converter.py
  5. 4
      requirements.txt

152
.gitignore vendored

@ -0,0 +1,152 @@ @@ -0,0 +1,152 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Project specific
*.npz
*.gif
*.msg
temp/
logs/

67
README.md

@ -0,0 +1,67 @@ @@ -0,0 +1,67 @@
# Конвертер данных метеорологического радара в формате MSG
## Использование
### Конвертация MSG в NPZ
```bash
python msg_converter.py input.msg [output.npz]
```
### Создание GIF анимации
```bash
python make_gif.py file.npz [output.gif]
```
## Установка зависимостей
```bash
pip install -r requirements.txt
```
## Описание
Проект состоит из двух основных модулей:
- `msg_converter.py` - конвертер данных MSG (BUFR) в бинарный формат NPZ
- `make_gif.py` - тестовый файл для создания анимированного GIF из NPZ с четырьмя панелямиЖ
- max(dBz) - максимальная отражаемость
- max(dBzD) - дифференциальная отражаемость
- max(Doppler) - доплеровская скорость
- Meteo - метеорологические данные
# Лицензионное соглашение
## РАЗРЕШЕНИЯ
Настоящим предоставляется бесплатное право на использование данного программного
обеспечения (ПО) в неизменном виде для личных и коммерческих целей без каких-либо
ограничений.
## ЗАПРЕТЫ
Запрещается:
1. **Распространение без копирайта** - распространение ПО в любой форме
(включая, но не ограничиваясь: продажа, передача, публикация, размещение
в интернете) без сохранения оригинального копирайта и данной лицензии.
2. **Создание производных работ без упоминания автора** - разработка новых программ
на основе данного кода или использование его в составе других программ без
упоминания автора данной версии кода.
3. **Удаление или изменение копирайта** - удаление, изменение или сокрытие
информации об авторских правах в любом виде.
## УСЛОВИЯ
- Данная лицензия применяется ко всем файлам проекта
- Любое использование ПО означает принятие условий данной лицензии
- Нарушение условий лицензии влечет ответственность в соответствии
с действующим законодательством
## КОНТАКТНАЯ ИНФОРМАЦИЯ
Для получения разрешения на модификацию или коммерческое использование:
fg.sarafanov@ipfran.ru

138
make_gif.py

@ -0,0 +1,138 @@ @@ -0,0 +1,138 @@
#!/usr/bin/env python3
# ==============================================================================
# Создание GIF анимации из NPZ данных
# 4 панели: maxz(dBz), maxz(dBzD), maxz(Doppler), meteo
#
# Copyright (c) 2025 SFG
#
# Все права защищены. Распространяется под пользовательской лицензией.
# Подробности см. в README.md.
# ==============================================================================
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
from pathlib import Path
from msg_converter import load_npz_data
def create_gif(npz_path, output_path=None):
"""
Args:
npz_path: путь к NPZ файлу
output_path: путь к GIF (если None, то npz_path.gif)
"""
npz_path = Path(npz_path)
if output_path is None:
output_path = npz_path.with_suffix('.gif')
else:
output_path = Path(output_path)
print(f"Загрузка данных: {npz_path}")
data = load_npz_data(npz_path)
n_frames = data['dbz'].shape[0]
print(f"Кадров: {n_frames}")
# Максимумы по z
dbz_max = np.nanmax(data['dbz'], axis=3) # (144, 100, 100)
dbzd_max = np.nanmax(data['dbzd'], axis=3) # (144, 100, 100)
doppler_max = np.nanmax(data['doppler'], axis=3) # (144, 100, 100)
meteo = data['meteo'] # (144, 100, 100)
# Диапазоны для colorbar
vmin_dbz = np.nanpercentile(dbz_max, 1)
vmax_dbz = np.nanpercentile(dbz_max, 99)
vmin_dbzd = np.nanpercentile(dbzd_max, 1)
vmax_dbzd = np.nanpercentile(dbzd_max, 99)
vmin_doppler = np.nanpercentile(doppler_max, 1)
vmax_doppler = np.nanpercentile(doppler_max, 99)
vmin_meteo = np.nanmin(meteo)
vmax_meteo = np.nanmax(meteo)
# Создаем фигуру
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
fig.suptitle('MSG Radar Data', fontsize=16, y=0.98)
# Инициализация imshow
im1 = axes[0, 0].imshow(dbz_max[0], cmap='viridis', vmin=vmin_dbz, vmax=vmax_dbz)
axes[0, 0].set_title('max(dBz)')
axes[0, 0].axis('off')
plt.colorbar(im1, ax=axes[0, 0], fraction=0.046)
im2 = axes[0, 1].imshow(dbzd_max[0], cmap='plasma', vmin=vmin_dbzd, vmax=vmax_dbzd)
axes[0, 1].set_title('max(dBzD)')
axes[0, 1].axis('off')
plt.colorbar(im2, ax=axes[0, 1], fraction=0.046)
im3 = axes[1, 0].imshow(doppler_max[0], cmap='seismic', vmin=vmin_doppler, vmax=vmax_doppler)
axes[1, 0].set_title('max(Doppler)')
axes[1, 0].axis('off')
plt.colorbar(im3, ax=axes[1, 0], fraction=0.046)
im4 = axes[1, 1].imshow(meteo[0], cmap='YlOrRd', vmin=vmin_meteo, vmax=vmax_meteo)
axes[1, 1].set_title('Meteo')
axes[1, 1].axis('off')
plt.colorbar(im4, ax=axes[1, 1], fraction=0.046)
time_text = fig.text(0.5, 0.01, '', ha='center', fontsize=12)
plt.tight_layout(rect=[0, 0.02, 1, 0.96])
def update(frame):
"""Обновление кадра"""
im1.set_array(dbz_max[frame])
im2.set_array(dbzd_max[frame])
im3.set_array(doppler_max[frame])
im4.set_array(meteo[frame])
if data['times'] is not None:
time_str = data['times'][frame].decode()
time_text.set_text(f"Frame {frame+1}/{n_frames} | Time: {time_str}")
else:
time_text.set_text(f"Frame {frame+1}/{n_frames}")
if frame % 10 == 0 or frame == n_frames - 1:
percent = (frame + 1) / n_frames * 100
bar_len = 40
filled = int(bar_len * (frame + 1) / n_frames)
bar = '' * filled + '' * (bar_len - filled)
print(f" [{bar}] {percent:.0f}% ({frame+1}/{n_frames})", end='\r' if frame < n_frames - 1 else '\n')
return im1, im2, im3, im4, time_text
print("Создание анимации...")
anim = FuncAnimation(fig, update, frames=n_frames, interval=100, blit=True)
print(f"Сохранение GIF: {output_path}")
writer = PillowWriter(fps=10)
anim.save(output_path, writer=writer)
plt.close()
file_size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"Готово! Размер: {file_size_mb:.2f} МБ")
return output_path
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Использование: python make_gif.py <file.npz> [output.gif]")
sys.exit(1)
npz_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else None
try:
result = create_gif(npz_file, output_file)
print(f"{result}")
except Exception as e:
print(f"Ошибка: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

568
msg_converter.py

@ -0,0 +1,568 @@ @@ -0,0 +1,568 @@
#!/usr/bin/env python3
# ==============================================================================
# Конвертер MSG (BUFR) в NPZ
# Объединенный модуль для извлечения, парсинга и конвертации
# ИСПОЛЬЗОВАНИЕ:
# # Из командной строки:
# python msg_converter.py input.msg [output.npz]
# # Из Python:
# from msg_converter import convert_msg_to_npz, load_npz_data
# convert_msg_to_npz('input.msg')
# data = load_npz_data('output.npz')
#
# Copyright (c) 2025 SFG
#
# Все права защищены. Распространяется под пользовательской лицензией.
# Подробности см. в README.md.
# ==============================================================================
import re
import numpy as np
from pathlib import Path
from bitarray import bitarray
from multiprocessing import Pool, cpu_count
# ============================================================================
# Таблицы дескрипторов и последовательностей (что-то вроде таблиц B и D из BUFR)
# ============================================================================
DESCRIPTORS = {
"0 01 018": {"bits": 40, "scale": 0, "offset": 0, "type": "IA5", "name": "SHORT STATION NAME"},
"0 01 015": {"bits": 160, "scale": 0, "offset": 0, "type": "IA5", "name": "STATION NAME"},
"0 05 002": {"bits": 15, "scale": -2, "offset": -9000, "type": "INT", "name": "LATITUDE"},
"0 06 002": {"bits": 16, "scale": -2, "offset": -18000,"type": "INT", "name": "LONGITUDE"},
"0 07 001": {"bits": 15, "scale": 0, "offset": -400, "type": "INT", "name": "HEIGHT OF STATION"},
"0 12 004": {"bits": 12, "scale": -1, "offset": 0, "type": "INT", "name": "DRY BULB TEMPERATURE AT 2M"},
"0 08 001": {"bits": 7, "scale": 0, "offset": 0, "type": "UINT","name": "VERTICAL SOUNDING SIGNIFICANCE"},
"0 07 002": {"bits": 16, "scale": 1, "offset": -40, "type": "INT", "name": "HEIGHT/ALTITUDE"},
"0 19 005": {"bits": 9, "scale": 0, "offset": 0, "type": "INT", "name": "DIRECTION OF MOTION"},
"0 19 006": {"bits": 14, "scale": -2, "offset": 0, "type": "INT", "name": "SPEED OF MOTION"},
"0 29 001": {"bits": 3, "scale": 0, "offset": 0, "type": "UINT","name": "PROJECTION TYPE"},
"0 30 031": {"bits": 4, "scale": 0, "offset": 0, "type": "UINT","name": "PICTURE TYPE"},
"0 30 021": {"bits": 12, "scale": 0, "offset": 0, "type": "INT", "name": "PIXELS PER ROW"},
"0 30 022": {"bits": 12, "scale": 0, "offset": 0, "type": "INT", "name": "PIXELS PER COLUMN"},
"0 05 033": {"bits": 16, "scale": 1, "offset": 0, "type": "INT", "name": "PIXEL SIZE HORIZ-1"},
"0 06 033": {"bits": 16, "scale": 1, "offset": 0, "type": "INT", "name": "PIXEL SIZE HORIZ-2"},
"0 10 040": {"bits": 10, "scale": 0, "offset": 0, "type": "INT", "name": "NUMBER OF RETRIEVED LAYERS"},
"0 07 006": {"bits": 15, "scale": 0, "offset": 0, "type": "INT", "name": "HEIGHT ABOVE STATION"},
"0 08 007": {"bits": 4, "scale": 0, "offset": 0, "type": "UINT","name": "DIMENSIONAL SIGNIFICANCE"},
"0 31 002": {"bits": 16, "scale": 0, "offset": 0, "type": "UINT","name": "EXTENDED DELAYED DESCRIPT.REPLIC.FACTOR"},
"0 31 012": {"bits": 16, "scale": 0, "offset": 0, "type": "UINT","name": "EXT.DEL.DESCRIPT.AND DATA REPETIT.FACTOR"},
"0 21 001": {"bits": 7, "scale": 0, "offset": -64, "type": "UINT", "name": "HORIZONTAL REFLECTIVITY"},
"0 21 003": {"bits": 7, "scale": 1, "offset": -5, "type": "UINT", "name": "DIFFERENTIAL REFLECTIVITY"},
"0 21 014": {"bits": 13, "scale": 1, "offset": -4096, "type": "UINT", "name": "DOPPLER MEAN VELOCITY RADIAL"},
"0 21 022": {"bits": 5, "scale": 0, "offset": 0, "type": "UINT", "name": "METEO [SFG NOTE: UNKNOWN]"},
}
SEQUENCES = {
"3 01 024": ["0 05 002", "0 06 002", "0 07 001"], # LAT/LON/HEIGHT
}
# ============================================================================
# BUFR Reader
# ============================================================================
class BitReader:
"""Побитовое чтение данных"""
def __init__(self, data, start_bit=0, end_bit=None):
if isinstance(data, str):
with open(data, "rb") as f:
data = f.read()
if isinstance(data, bytes):
bits = bitarray()
bits.frombytes(data)
else:
bits = data
self.bits = bits[start_bit:end_bit] if end_bit else bits[start_bit:]
self.pos = 0
def __len__(self):
return len(self.bits)
def tell(self):
return self.pos
def seek(self, bit_pos):
self.pos = bit_pos
def read_bits(self, n, restore=False):
if self.pos + n > len(self.bits):
raise EOFError("конец битового потока")
val = 0
for bit in self.bits[self.pos:self.pos + n]:
val = (val << 1) | bit
if not restore:
self.pos += n
return val
def read_bytes(self, n, restore=False):
return self.read_bits(n * 8, restore)
def read_uint(self, n, restore=False):
return self.read_bits(n, restore)
def read_int(self, n, signed=True, restore=False):
raw = self.read_bits(n, restore)
if signed and n > 1 and (raw & (1 << (n - 1))):
raw -= 1 << n
return raw
def read_ia5(self, n_bits, restore=False):
n_bytes = (n_bits + 7) // 8
val = self.read_bits(n_bytes * 8, restore)
raw = val.to_bytes(n_bytes, "big")
return raw.decode("ascii", errors="ignore").rstrip("\x00").strip()
def read_descriptor(self, desc_code):
SEQ = SEQUENCES.get(desc_code)
if SEQ:
return [self.read_descriptor(s) for s in SEQ]
desc = DESCRIPTORS.get(desc_code)
if not desc:
raise ValueError(f"Неизвестный дескриптор: {desc_code}")
bits, scale, offset = desc["bits"], desc["scale"], desc["offset"]
dtype = desc["type"]
if dtype == "IA5":
val = self.read_ia5(bits)
elif dtype == "INT":
val = (self.read_int(bits) + offset) * (10 ** scale)
elif dtype == "UINT":
val = (self.read_uint(bits) + offset) * (10 ** scale)
else:
val = None
return val
def read_section1(reader):
"""Читает секцию 1 BUFR"""
length1 = reader.read_uint(16)
return {
"length": length1,
"master_table": reader.read_uint(8),
"originating_center": reader.read_uint(8),
"sub_center": reader.read_uint(8),
"update_sequence": reader.read_uint(8),
"flags": reader.read_uint(8),
"data_category": reader.read_uint(8),
"local_subcat": reader.read_uint(8),
"int_subcat": reader.read_uint(8),
"master_table_version": reader.read_uint(8),
"local_table_version": reader.read_uint(8),
"year": reader.read_uint(8),
"month": reader.read_uint(8),
"day": reader.read_uint(8),
"hour": reader.read_uint(8),
"minute": reader.read_uint(8),
"second": reader.read_uint(8), # [в BUFR из MSG это поле используется некорректно, секунд тут нет]
}
def read_section3(reader):
"""Читает секцию 3 BUFR"""
start = reader.tell()
length = reader.read_uint(24)
reserved = reader.read_uint(8)
num_subsets = reader.read_uint(16)
flags = reader.read_uint(8)
descriptors = []
while reader.tell() < start + length * 8:
remaining = start + length * 8 - reader.tell()
if remaining < 16:
break
desc = reader.read_uint(16)
f = desc >> 14
x = (desc >> 8) & 0x3F
y = desc & 0xFF
if f == 0 and x == 0 and y == 0:
break
descriptors.append((f, x, y))
# Пропускаем padding
padding_bits = start + length * 8 - reader.tell()
if padding_bits > 0:
reader.read_uint(padding_bits)
return {
"length": length,
"num_subsets": num_subsets,
"descriptors": descriptors,
}
def parse_bufr_block(bufr_data, data_type='dBz'):
"""
Парсит BUFR и возвращает сетку 100x100
Args:
bufr_data: данные BUFR, извлеченные из MSG
data_type: тип данных ('dBz', 'dBzD', 'doppler', 'meteo')
Returns:
numpy array (100, 100) или None
"""
try:
reader = BitReader(bufr_data)
# BUFR по стандарту обычно в начале,
# но для единообразия с старыми DAT-файлами
# оставлено название IPRN***RUDN в начале данных.
bufr_pos = bufr_data.find(b'BUFR')
if bufr_pos == -1 or bufr_pos > 1000:
return None
reader.seek(bufr_pos * 8)
# Заголовок
reader.read_ia5(32) # BUFR
reader.read_uint(24) # длина всего BUFR
reader.read_uint(8) # версия BUFR
# SFG: хотя указывается что версия BUFR 2,
# но в BUFR из MSG используется частично формат версии 4,
# частично формат версии 2, плюс нестандартные дескрипторы.
# поэтому подавляющее большинство софта для работы с BUFR
# не будет работать с BUFR из MSG.
# Секция 1
read_section1(reader)
# Секция 2
sec2_len = reader.read_bytes(3, restore=True)
if sec2_len > 0:
reader.read_bytes(sec2_len)
# Секция 3
sec3_data = read_section3(reader)
# Список дескрипторов
descriptor_list = [f"{f} {x:02d} {y:03d}" for f, x, y in sec3_data["descriptors"]]
# Секция 4
reader.read_bytes(3) # длина секции в байтах
reader.read_bytes(1) # зарезервировано
# Читаем метаданные всех дескрипторов
for code in descriptor_list:
reader.read_descriptor(code)
if code == "0 08 007":
break
# SFG: почти всех, некоторые стандартные дескрипторы отсутствуют.
# Дескриптор данных по типу
descriptor_map = {
'dBz': "0 21 001",
'dBzD': "0 21 003",
'doppler': "0 21 014",
'meteo': "0 21 022"
}
descriptor_code = descriptor_map.get(data_type, "0 21 001")
# Извлекаем пиксели напрямую в numpy массив
grid = np.full(10000, np.nan, dtype=np.float32)
position = 0
n_pixels_with_data = reader.read_descriptor("0 31 002")
for _ in range(n_pixels_with_data):
n_skip = reader.read_descriptor("0 31 012")
marker = reader.read_descriptor(descriptor_code)
# Заполняем пропущенные пиксели
if n_skip > 0 and position + n_skip <= 10000:
grid[position:position + n_skip] = marker
position += n_skip
# Пиксели с данными
n_values = reader.read_descriptor("0 31 002")
for _ in range(n_values):
if position >= 10000:
break
value = reader.read_descriptor(descriptor_code)
grid[position] = value
position += 1
grid = grid.reshape(100, 100)
# Offset и scale уже применены в read_descriptor через таблицу DESCRIPTORS,
# поэтому здесь, в отличие от старого кода для DAT-файлов, нет необходимости применять их еще раз.
return grid
except Exception:
return None
# ============================================================================
# Извлечение блоков с BUFR из MSG
# ============================================================================
class LayerConfig:
"""Конфигурация слоев"""
DBZ_LAYERS = list(range(40, 51)) # 11 слоев
DBZD_LAYERS = list(range(56, 66)) # 10 слоев
DOPPLER_LAYERS = list(range(78, 88)) # 10 слоев
METEO_LAYER = 70
@classmethod
def get_all_layers(cls):
"""Возвращает все слои"""
return cls.DBZ_LAYERS + cls.DBZD_LAYERS + cls.DOPPLER_LAYERS + [cls.METEO_LAYER]
def extract_msg_blocks(msg_file_path, verbose=True):
"""
Извлекает BUFR блоки из MSG файла в память
Returns:
dict: {(layer, time): block_data}
"""
if verbose:
print(f"Чтение MSG: {msg_file_path}")
with open(msg_file_path, 'rb') as f:
data = f.read()
# Поиск окончаний блоков
pattern = b'7777\r\r\n\r\r\n\x03'
block_ends = []
start = 0
while True:
pos = data.find(pattern, start)
if pos == -1:
break
block_ends.append(pos + len(pattern))
start = pos + 1
if verbose:
print(f"Найдено блоков: {len(block_ends)}")
# Извлекаем блоки
blocks = {}
block_starts = [0] + block_ends[:-1]
for start_pos, end_pos in zip(block_starts, block_ends):
block_data = data[start_pos:end_pos]
# Извлекаем метаданные
iprn_match = re.search(b'IPRN(\d+)\s+RUDN\s+(\d{6})', block_data)
if iprn_match:
layer = int(iprn_match.group(1).decode())
time_str = iprn_match.group(2).decode()
blocks[(layer, time_str)] = block_data
if verbose:
times = sorted(set(t for l, t in blocks.keys()))
print(f"Временных меток: {len(times)}")
print(f"Слоев: {len(set(l for l, t in blocks.keys()))}")
return blocks
# ============================================================================
# Конвертация в NPZ
# ============================================================================
def _process_single_time(args):
"""
Обработка одной временной метки (для параллелизации)
Args:
args: (time_str, blocks_for_time, time_idx)
Returns:
tuple: (time_idx, dbz_data, dbzd_data, doppler_data, meteo_data)
"""
time_str, blocks_for_time, time_idx = args
# Инициализация массивов для одной временной метки
dbz = np.full((100, 100, 11), np.nan, dtype=np.float32)
dbzd = np.full((100, 100, 10), np.nan, dtype=np.float32)
doppler = np.full((100, 100, 10), np.nan, dtype=np.float32)
meteo = np.full((100, 100), np.nan, dtype=np.float32)
# dBz слои (40-50)
for layer_idx, layer in enumerate(LayerConfig.DBZ_LAYERS):
block_data = blocks_for_time.get(layer)
if block_data:
grid = parse_bufr_block(block_data, 'dBz')
if grid is not None:
dbz[:, :, layer_idx] = grid
# dBzD слои (56-65)
for layer_idx, layer in enumerate(LayerConfig.DBZD_LAYERS):
block_data = blocks_for_time.get(layer)
if block_data:
grid = parse_bufr_block(block_data, 'dBzD')
if grid is not None:
dbzd[:, :, layer_idx] = grid
# Doppler слои (78-87)
for layer_idx, layer in enumerate(LayerConfig.DOPPLER_LAYERS):
block_data = blocks_for_time.get(layer)
if block_data:
grid = parse_bufr_block(block_data, 'doppler')
if grid is not None:
doppler[:, :, layer_idx] = grid
# Meteo слой (70)
block_data = blocks_for_time.get(LayerConfig.METEO_LAYER)
if block_data:
grid = parse_bufr_block(block_data, 'meteo')
if grid is not None:
meteo[:, :] = grid
return (time_idx, dbz, dbzd, doppler, meteo)
def convert_msg_to_npz(msg_file_path, output_path=None, verbose=True, n_workers=None):
"""
Args:
msg_file_path: путь к MSG файлу
output_path: путь к выходному NPZ (опционально)
verbose: выводить ли прогресс
n_workers: количество процессов (по умолчанию cpu_count())
Returns:
путь к сохраненному NPZ
"""
msg_file_path = Path(msg_file_path)
if n_workers is None:
n_workers = cpu_count()
if verbose:
print("=" * 70)
print("Конвертация MSG в NPZ")
print("=" * 70)
# Извлечение блоков
if verbose:
print("\n[1/3] Извлечение BUFR блоков...")
blocks = extract_msg_blocks(msg_file_path, verbose=verbose)
# Парсинг данных (параллельно)
if verbose:
print(f"\n[2/3] Парсинг данных (workers: {n_workers})...")
times = sorted(set(t for l, t in blocks.keys()))
n_times = len(times)
# Инициализация массивов
dbz = np.full((n_times, 100, 100, 11), np.nan, dtype=np.float32)
dbzd = np.full((n_times, 100, 100, 10), np.nan, dtype=np.float32)
doppler = np.full((n_times, 100, 100, 10), np.nan, dtype=np.float32)
meteo = np.full((n_times, 100, 100), np.nan, dtype=np.float32)
# Подготовка данных для параллельной обработки
tasks = []
for time_idx, time_str in enumerate(times):
# Собираем все блоки для данной временной метки
blocks_for_time = {}
for layer in LayerConfig.get_all_layers():
block_data = blocks.get((layer, time_str))
if block_data:
blocks_for_time[layer] = block_data
tasks.append((time_str, blocks_for_time, time_idx))
# Параллельная обработка
with Pool(processes=n_workers) as pool:
results = pool.map(_process_single_time, tasks)
# Сборка результатов
if verbose:
print(f" Сборка результатов...")
for time_idx, dbz_t, dbzd_t, doppler_t, meteo_t in results:
dbz[time_idx] = dbz_t
dbzd[time_idx] = dbzd_t
doppler[time_idx] = doppler_t
meteo[time_idx] = meteo_t
# Сохранение
if verbose:
print("\n[3/3] Сохранение в NPZ...")
if output_path is None:
date_match = re.search(r'(\d{8})', msg_file_path.stem)
if date_match:
date_str = date_match.group(1)
formatted_date = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}"
else:
formatted_date = msg_file_path.stem
output_dir = msg_file_path.parent
output_dir.mkdir(exist_ok=True)
output_path = output_dir / f"{formatted_date}.npz"
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(
output_path,
dbz=dbz,
dbzd=dbzd,
doppler=doppler,
meteo=meteo,
times=np.array(times, dtype='S6')
)
if verbose:
file_size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"\n{'=' * 70}")
print(f"Готово! Файл: {output_path}")
print(f"Размер: {file_size_mb:.2f} МБ")
print(f"{'=' * 70}\n")
return output_path
def load_npz_data(npz_path):
"""Загружает данные из NPZ"""
data = np.load(npz_path)
return {
'dbz': data['dbz'],
'dbzd': data['dbzd'],
'doppler': data['doppler'],
'meteo': data['meteo'],
'times': data['times'] if 'times' in data else None
}
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Использование: python msg_converter.py <msg_file> [output.npz]")
sys.exit(1)
msg_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else None
try:
result_path = convert_msg_to_npz(msg_file, output_file, verbose=True)
print(f"Успешно: {result_path}")
except Exception as e:
print(f"Ошибка: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

4
requirements.txt

@ -0,0 +1,4 @@ @@ -0,0 +1,4 @@
numpy>=1.20.0
bitarray>=2.3.0
matplotlib>=3.3.0
Pillow>=8.0.0
Loading…
Cancel
Save