Новый код для чтения BUFR данных метеорологического радара из .msg-файлов и конвертации в .npz
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

178 lines
6.6 KiB

#!/usr/bin/env python3
# ==============================================================================
# Создание GIF анимации из NPZ данных
# 4 панели: maxz(dBz), maxz(dBzD), maxz(Doppler), meteo
#
# Copyright (c) 2025 SFG
#
# Все права защищены. Распространяется под пользовательской лицензией.
# Подробности см. в README.md.
# ==============================================================================
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
from matplotlib.colors import ListedColormap, BoundaryNorm
from pathlib import Path
from msg_converter import load_npz_data
def create_reflectivity_colormap():
"""
Создает цветовую схему для отражательности (dBz)
на основе предоставленных цветов и диапазонов
"""
# Цвета для отражательности
colors = [
'#E1F2FC', '#C2E2F7', '#A8D0E9', '#CFFF99', '#54FE3A',
'#00AAFF', '#034CFF', '#0000D0', '#00007F', '#FFFE02',
'#FF7F00', '#FE3D36', '#FE0000', '#49DE00', '#00AA01',
'#F500F5', '#AA01FE', '#720000'
]
# Диапазоны от -10 до 70 с шагом 5
boundaries = [-20] + list(range(-10, 75, 5)) + [80]
# Проверяем соответствие количества цветов и границ
assert len(colors) == len(boundaries) - 1, "Количество цветов должно быть на 1 меньше количества границ!"
# Создаем цветовую карту и нормализацию
cmap = ListedColormap(colors, name='reflectivity_segments')
norm = BoundaryNorm(boundaries, len(colors))
return cmap, norm
def create_gif(npz_path, output_path=None):
"""
Args:
npz_path: путь к NPZ файлу
output_path: путь к GIF (если None, то npz_path.gif)
"""
npz_path = Path(npz_path)
if output_path is None:
output_path = npz_path.with_suffix('.gif')
else:
output_path = Path(output_path)
print(f"Загрузка данных: {npz_path}")
data = load_npz_data(npz_path)
n_frames = data['dbz'].shape[0]
print(f"Кадров: {n_frames}")
## Подготовка массивов c shape = (144, 100, 100)
# Максимумы по z
dbz_max = np.nanmax(data['dbz'], axis=3)
dbzd_max = np.nanmax(data['dbzd'], axis=3)
meteo = data['meteo']
# Срезы по высоте
doppler_5km = data['doppler'][:, :, :, 5]
# Создаем цветовую схему для отражательности
reflectivity_cmap, reflectivity_norm = create_reflectivity_colormap()
# Диапазоны для colorbar
vmin_dbz = -10 # Фиксированный диапазон для отражательности
vmax_dbz = 70
vmin_dbzd = -10 # Фиксированный диапазон для отражательности
vmax_dbzd = 70
vmin_doppler = np.nanpercentile(doppler_5km, 1)
vmax_doppler = np.nanpercentile(doppler_5km, 99)
vmin_meteo = np.nanmin(meteo)
vmax_meteo = np.nanmax(meteo)
# Создаем фигуру
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
fig.suptitle('MSG Radar Data', fontsize=16, y=0.98)
# Инициализация imshow
im1 = axes[0, 0].imshow(dbz_max[0], cmap=reflectivity_cmap, norm=reflectivity_norm, origin='lower')
axes[0, 0].set_title('max(dBz)')
axes[0, 0].axis('off')
cb1 = plt.colorbar(im1, ax=axes[0, 0], fraction=0.046)
cb1.set_ticks(range(-10, 75, 5))
cb1.ax.set_yticklabels([f'{i}' for i in range(-10, 75, 5)])
im2 = axes[0, 1].imshow(dbzd_max[0], cmap=reflectivity_cmap, norm=reflectivity_norm, origin='lower')
axes[0, 1].set_title('max(dBzD)')
axes[0, 1].axis('off')
cb2 = plt.colorbar(im2, ax=axes[0, 1], fraction=0.046)
cb2.set_ticks(range(-10, 75, 5))
cb2.ax.set_yticklabels([f'{i}' for i in range(-10, 75, 5)])
mod_seismic = plt.cm.seismic.copy()
mod_seismic.set_bad(color='black')
im3 = axes[1, 0].imshow(doppler_5km[0], cmap=mod_seismic, vmin=vmin_doppler, vmax=vmax_doppler, origin='lower')
axes[1, 0].set_title('Doppler [5 km]')
axes[1, 0].axis('off')
plt.colorbar(im3, ax=axes[1, 0], fraction=0.046)
im4 = axes[1, 1].imshow(meteo[0], cmap='YlOrRd', vmin=vmin_meteo, vmax=vmax_meteo, origin='lower')
axes[1, 1].set_title('Meteo')
axes[1, 1].axis('off')
plt.colorbar(im4, ax=axes[1, 1], fraction=0.046)
time_text = fig.text(0.5, 0.01, '', ha='center', fontsize=12)
plt.tight_layout(rect=[0, 0.02, 1, 0.96])
def update(frame):
"""Обновление кадра"""
im1.set_array(dbz_max[frame])
im2.set_array(dbzd_max[frame])
im3.set_array(doppler_5km[frame])
im4.set_array(meteo[frame])
if data['times'] is not None:
time_str = data['times'][frame].decode()
time_text.set_text(f"Frame {frame+1}/{n_frames} | Time: {time_str}")
else:
time_text.set_text(f"Frame {frame+1}/{n_frames}")
if frame % 10 == 0 or frame == n_frames - 1:
percent = (frame + 1) / n_frames * 100
bar_len = 40
filled = int(bar_len * (frame + 1) / n_frames)
bar = '' * filled + '' * (bar_len - filled)
print(f" [{bar}] {percent:.0f}% ({frame+1}/{n_frames})", end='\r' if frame < n_frames - 1 else '\n')
return im1, im2, im3, im4, time_text
print("Создание анимации...")
anim = FuncAnimation(fig, update, frames=n_frames, interval=100, blit=True)
print(f"Сохранение GIF: {output_path}\n")
writer = PillowWriter(fps=10)
anim.save(output_path, writer=writer)
plt.close()
file_size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"Готово! Размер: {file_size_mb:.2f} МБ")
return output_path
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Использование: python make_gif.py <file.npz> [output.gif]")
sys.exit(1)
npz_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else None
try:
result = create_gif(npz_file, output_file)
print(f"OK: {result}")
except Exception as e:
print(f"Ошибка: {e}")
import traceback
traceback.print_exc()
sys.exit(1)