Новый код для чтения BUFR данных метеорологического радара из .msg-файлов и конвертации в .npz
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

143 lines
4.9 KiB

#!/usr/bin/env python3
# ==============================================================================
# Создание GIF анимации из NPZ данных
# 4 панели: maxz(dBz), maxz(dBzD), maxz(Doppler), meteo
#
# Copyright (c) 2025 SFG
#
# Все права защищены. Распространяется под пользовательской лицензией.
# Подробности см. в README.md.
# ==============================================================================
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
from pathlib import Path
from msg_converter import load_npz_data
def create_gif(npz_path, output_path=None):
"""
Args:
npz_path: путь к NPZ файлу
output_path: путь к GIF (если None, то npz_path.gif)
"""
npz_path = Path(npz_path)
if output_path is None:
output_path = npz_path.with_suffix('.gif')
else:
output_path = Path(output_path)
print(f"Загрузка данных: {npz_path}")
data = load_npz_data(npz_path)
n_frames = data['dbz'].shape[0]
print(f"Кадров: {n_frames}")
# Максимумы по z
dbz_max = np.nanmax(data['dbz'], axis=3) # (144, 100, 100)
dbzd_max = np.nanmax(data['dbzd'], axis=3) # (144, 100, 100)
meteo = data['meteo'] # (144, 100, 100)
# Срезы по высоте
doppler_5km = data['doppler'][:, :, :, 5] # (144, 100, 100)
# Диапазоны для colorbar
vmin_dbz = np.nanpercentile(dbz_max, 1)
vmax_dbz = np.nanpercentile(dbz_max, 99)
vmin_dbzd = np.nanpercentile(dbzd_max, 1)
vmax_dbzd = np.nanpercentile(dbzd_max, 99)
vmin_doppler = np.nanpercentile(doppler_5km, 1)
vmax_doppler = np.nanpercentile(doppler_5km, 99)
vmin_meteo = np.nanmin(meteo)
vmax_meteo = np.nanmax(meteo)
# Создаем фигуру
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
fig.suptitle('MSG Radar Data', fontsize=16, y=0.98)
# Инициализация imshow
im1 = axes[0, 0].imshow(dbz_max[0], cmap='viridis', vmin=vmin_dbz, vmax=vmax_dbz)
axes[0, 0].set_title('max(dBz)')
axes[0, 0].axis('off')
plt.colorbar(im1, ax=axes[0, 0], fraction=0.046)
im2 = axes[0, 1].imshow(dbzd_max[0], cmap='plasma', vmin=vmin_dbzd, vmax=vmax_dbzd)
axes[0, 1].set_title('max(dBzD)')
axes[0, 1].axis('off')
plt.colorbar(im2, ax=axes[0, 1], fraction=0.046)
mod_seismic = plt.cm.seismic.copy()
mod_seismic.set_bad(color='black')
im3 = axes[1, 0].imshow(doppler_5km[0], cmap=mod_seismic, vmin=vmin_doppler, vmax=vmax_doppler)
axes[1, 0].set_title('Doppler [5 km]')
axes[1, 0].axis('off')
plt.colorbar(im3, ax=axes[1, 0], fraction=0.046)
im4 = axes[1, 1].imshow(meteo[0], cmap='YlOrRd', vmin=vmin_meteo, vmax=vmax_meteo)
axes[1, 1].set_title('Meteo')
axes[1, 1].axis('off')
plt.colorbar(im4, ax=axes[1, 1], fraction=0.046)
time_text = fig.text(0.5, 0.01, '', ha='center', fontsize=12)
plt.tight_layout(rect=[0, 0.02, 1, 0.96])
def update(frame):
"""Обновление кадра"""
im1.set_array(dbz_max[frame])
im2.set_array(dbzd_max[frame])
im3.set_array(doppler_5km[frame])
im4.set_array(meteo[frame])
if data['times'] is not None:
time_str = data['times'][frame].decode()
time_text.set_text(f"Frame {frame+1}/{n_frames} | Time: {time_str}")
else:
time_text.set_text(f"Frame {frame+1}/{n_frames}")
if frame % 10 == 0 or frame == n_frames - 1:
percent = (frame + 1) / n_frames * 100
bar_len = 40
filled = int(bar_len * (frame + 1) / n_frames)
bar = '' * filled + '' * (bar_len - filled)
print(f" [{bar}] {percent:.0f}% ({frame+1}/{n_frames})", end='\r' if frame < n_frames - 1 else '\n')
return im1, im2, im3, im4, time_text
print("Создание анимации...")
anim = FuncAnimation(fig, update, frames=n_frames, interval=100, blit=True)
print(f"Сохранение GIF: {output_path}\n")
writer = PillowWriter(fps=10)
anim.save(output_path, writer=writer)
plt.close()
file_size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"Готово! Размер: {file_size_mb:.2f} МБ")
return output_path
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Использование: python make_gif.py <file.npz> [output.gif]")
sys.exit(1)
npz_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else None
try:
result = create_gif(npz_file, output_file)
print(f"{result}")
except Exception as e:
print(f"Ошибка: {e}")
import traceback
traceback.print_exc()
sys.exit(1)