A collection of diagnostic and interpolation routines for use with output from the Weather Research and Forecasting (WRF-ARW) Model.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1158 lines
44 KiB

import unittest as ut
import numpy.testing as nt
import numpy as np
import numpy.ma as ma
import os
import sys
import subprocess
import glob
from wrf import (getvar, interplevel, interpline, vertcross, vinterp,
disable_xarray, xarray_enabled, to_np,
xy_to_ll, ll_to_xy, xy_to_ll_proj, ll_to_xy_proj,
extract_global_attrs, viewitems, CoordPair, ll_points)
from wrf.util import is_multi_file
NCL_EXE = "/Users/ladwig/miniconda2/envs/ncl_build/bin/ncl"
NCARG_ROOT = "/Users/ladwig/miniconda2/envs/ncl_build"
DIRS = ["/Users/ladwig/Documents/wrf_files/wrf_vortex_multi/moving_nest",
"/Users/ladwig/Documents/wrf_files/wrf_vortex_multi/static_nest"]
PATTERN = "wrfout_d02_*"
REF_NC_FILES = ["/tmp/wrftest_moving.nc", "/tmp/wrftest_static.nc"]
NEST = ["moving", "static"]
# Python 3
if sys.version_info > (3,):
xrange = range
def setUpModule():
# ncarg_root = os.environ.get("NCARG_ROOT", None)
# if ncarg_root is None:
# raise RuntimeError("$NCARG_ROOT environment variable not set")
os.environ["NCARG_ROOT"] = NCARG_ROOT
os.environ["NCARG_NCARG"] = os.path.join(NCARG_ROOT, "lib", "ncarg")
os.environ["OMP_NUM_THREADS"] = "4"
this_path = os.path.realpath(__file__)
ncl_script = os.path.join(os.path.dirname(this_path), "ncl",
"ncl_get_var.ncl")
for dir, outfile in zip(DIRS, REF_NC_FILES):
cmd = "{} {} 'dir=\"{}\"' 'pattern=\"{}\"' 'out_file=\"{}\"'" .format(
NCL_EXE,
ncl_script,
dir,
PATTERN,
outfile)
print(cmd)
if not os.path.exists(outfile):
status = subprocess.call(cmd, shell=True)
if (status != 0):
raise RuntimeError("NCL script failed. Could not set up test.")
# Using helpful information at:
# http://eli.thegreenplace.net/2014/04/02/
# dynamically-generating-python-test-cases
def make_test(varname, dir, pattern, referent, multi=False, pynio=False):
def test(self):
try:
from netCDF4 import Dataset as NetCDF
except ImportError:
pass
try:
import Nio
except ImportError:
pass
timeidx = 0 if not multi else None
pat = os.path.join(dir, pattern)
wrf_files = glob.glob(pat)
wrf_files.sort()
wrfin = []
for fname in wrf_files:
if not pynio:
f = NetCDF(fname)
try:
f.set_always_mask(False)
except AttributeError:
pass
wrfin.append(f)
else:
if not fname.endswith(".nc"):
_fname = fname + ".nc"
else:
_fname = fname
f = Nio.open_file(_fname)
wrfin.append(f)
refnc = NetCDF(referent)
try:
refnc.set_auto_mask(False)
except AttributeError:
pass
# These have a left index that defines the product type
multiproduct = varname in ("uvmet", "uvmet10", "wspd_wdir",
"wspd_wdir10", "uvmet_wspd_wdir",
"uvmet10_wspd_wdir",
"cape_2d", "cape_3d", "cfrac")
multi2d = ("uvmet10", "wspd_wdir10", "uvmet10_wspd_wdir",
"cape_2d", "cfrac")
multi3d = ("uvmet", "wspd_wdir", "uvmet_wspd_wdir", "cape_3d")
# These varnames don't have NCL functions to test against
ignore_referent = ("zstag", "geopt_stag")
if varname not in ignore_referent:
if not multi:
if varname in multi2d:
ref_vals = refnc.variables[varname][..., 0, :, :]
elif varname in multi3d:
ref_vals = refnc.variables[varname][..., 0, :, :, :]
else:
ref_vals = refnc.variables[varname][0, :]
else:
ref_vals = refnc.variables[varname][:]
if (varname == "zstag" or varname == "geopt_stag"):
v = getvar(wrfin, varname, timeidx=timeidx)
# For now, only make sure it runs without crashing since no NCL
# to compare with yet.0
else:
if varname == "cape_2d" or varname == "cape_3d":
# Cape still has a small floating point issue that
# hasn't been completely resolved, so for now check
# that cape is within 50.
my_vals = getvar(wrfin, varname, timeidx=timeidx)
rtol = 0
atol = 50.
else:
# All other tests should be within .001 of each other
my_vals = getvar(wrfin, varname, timeidx=timeidx)
rtol = 0
atol = 1.0E-3
try:
nt.assert_allclose(to_np(my_vals), ref_vals, rtol, atol)
except AssertionError:
absdiff = np.abs(to_np(my_vals) - ref_vals)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
return test
def _get_refvals(referent, varname, multi):
try:
from netCDF4 import Dataset as NetCDF
except ImportError:
pass
refnc = NetCDF(referent)
try:
pass
except ImportError:
pass
multi2d = ("uvmet10", "wspd_wdir10", "uvmet10_wspd_wdir", "cape_2d",
"cfrac")
multi3d = ("uvmet", "wspd_wdir", "uvmet_wspd_wdir", "cape_3d")
interpline = ("t2_line", "t2_line2", "t2_line3")
if not multi:
if varname in multi2d:
ref_vals = refnc.variables[varname][..., 0, :, :]
elif varname in multi3d:
ref_vals = refnc.variables[varname][..., 0, :, :, :]
else:
v = refnc.variables[varname][:]
if v.ndim == 2:
if varname in interpline:
ref_vals = v[0, :]
else:
ref_vals = v
else:
ref_vals = v[0, :]
else:
ref_vals = refnc.variables[varname][:]
return ref_vals
def make_interp_test(varname, dir, pattern, referent, multi=False,
pynio=False):
def test(self):
try:
from netCDF4 import Dataset as NetCDF
except ImportError:
pass
try:
import Nio
except ImportError:
pass
timeidx = 0 if not multi else None
pat = os.path.join(dir, pattern)
wrf_files = glob.glob(pat)
wrf_files.sort()
wrfin = []
for fname in wrf_files:
if not pynio:
f = NetCDF(fname)
try:
f.set_always_mask(False)
except AttributeError:
pass
wrfin.append(f)
else:
if not fname.endswith(".nc"):
_fname = fname + ".nc"
else:
_fname = fname
f = Nio.open_file(_fname)
wrfin.append(f)
if (varname == "interplevel"):
ref_ht_500 = _get_refvals(referent, "z_500", multi)
ref_p_5000 = _get_refvals(referent, "p_5000", multi)
ref_ht_multi = _get_refvals(referent, "z_multi", multi)
ref_p_multi = _get_refvals(referent, "p_multi", multi)
ref_ht2_500 = _get_refvals(referent, "z2_500", multi)
ref_p2_5000 = _get_refvals(referent, "p2_5000", multi)
ref_ht2_multi = _get_refvals(referent, "z2_multi", multi)
ref_p2_multi = _get_refvals(referent, "p2_multi", multi)
ref_p_lev2d = _get_refvals(referent, "p_lev2d", multi)
hts = getvar(wrfin, "z", timeidx=timeidx)
p = getvar(wrfin, "pressure", timeidx=timeidx)
wspd_wdir = getvar(wrfin, "wspd_wdir", timeidx=timeidx)
# Make sure the numpy versions work first
hts_500 = interplevel(to_np(hts), to_np(p), 500)
hts_500 = interplevel(hts, p, 500)
# Note: the '*2*' versions in the reference are testing
# against the new version of interplevel in NCL
nt.assert_allclose(to_np(hts_500), ref_ht_500)
nt.assert_allclose(to_np(hts_500), ref_ht2_500)
# Make sure the numpy versions work first
p_5000 = interplevel(to_np(p), to_np(hts), 5000)
p_5000 = interplevel(p, hts, 5000)
nt.assert_allclose(to_np(p_5000), ref_p_5000)
nt.assert_allclose(to_np(p_5000), ref_p2_5000)
hts_multi = interplevel(to_np(hts), to_np(p),
[1000., 850., 500., 250.])
hts_multi = interplevel(hts, p, [1000., 850., 500., 250.])
nt.assert_allclose(to_np(hts_multi), ref_ht_multi)
nt.assert_allclose(to_np(hts_multi), ref_ht2_multi)
p_multi = interplevel(to_np(p), to_np(hts),
[500., 2500., 5000., 10000.])
p_multi = interplevel(p, hts, [500., 2500., 5000., 10000.])
nt.assert_allclose(to_np(p_multi), ref_p_multi)
nt.assert_allclose(to_np(p_multi), ref_p2_multi)
pblh = getvar(wrfin, "PBLH", timeidx=timeidx)
p_lev2d = interplevel(to_np(p), to_np(hts), to_np(pblh))
p_lev2d = interplevel(p, hts, pblh)
nt.assert_allclose(to_np(p_lev2d), ref_p_lev2d)
# Just make sure these run below
wspd_wdir_500 = interplevel(to_np(wspd_wdir), to_np(p), 500)
wspd_wdir_500 = interplevel(wspd_wdir, p, 500)
wspd_wdir_multi = interplevel(to_np(wspd_wdir), to_np(p),
[1000, 500, 250])
wdpd_wdir_multi = interplevel(wspd_wdir, p, [1000, 500, 250])
wspd_wdir_pblh = interplevel(to_np(wspd_wdir), to_np(hts), pblh)
wspd_wdir_pblh = interplevel(wspd_wdir, hts, pblh)
if multi:
wspd_wdir_pblh_2 = interplevel(to_np(wspd_wdir),
to_np(hts), pblh[0, :])
wspd_wdir_pblh_2 = interplevel(wspd_wdir, hts, pblh[0, :])
# Since PBLH doesn't change in this case, it should match
# the 0 time from previous computation. Note that this
# only works when the data has 1 time step that is repeated.
# If you use a different case with multiple times,
# this will probably fail.
nt.assert_allclose(to_np(wspd_wdir_pblh_2[:, 0, :]),
to_np(wspd_wdir_pblh[:, 0, :]))
nt.assert_allclose(to_np(wspd_wdir_pblh_2[:, -1, :]),
to_np(wspd_wdir_pblh[:, 0, :]))
elif (varname == "vertcross"):
ref_ht_cross = _get_refvals(referent, "ht_cross", multi)
ref_p_cross = _get_refvals(referent, "p_cross", multi)
ref_ht_vertcross1 = _get_refvals(referent, "ht_vertcross1",
multi)
ref_ht_vertcross2 = _get_refvals(referent, "ht_vertcross2",
multi)
ref_ht_vertcross3 = _get_refvals(referent, "ht_vertcross3",
multi)
hts = getvar(wrfin, "z", timeidx=timeidx)
p = getvar(wrfin, "pressure", timeidx=timeidx)
pivot_point = CoordPair(hts.shape[-1] / 2, hts.shape[-2] / 2)
# Beginning in wrf-python 1.3, users can select number of levels.
# Originally, for pressure, dz was 10, so let's back calculate
# the number of levels.
p_max = np.floor(np.amax(p)/10) * 10 # bottom value
p_min = np.ceil(np.amin(p)/10) * 10 # top value
p_autolevels = int((p_max - p_min) / 10)
# Make sure the numpy versions work first
ht_cross = vertcross(to_np(hts), to_np(p),
pivot_point=pivot_point, angle=90.,
autolevels=p_autolevels)
ht_cross = vertcross(hts, p, pivot_point=pivot_point, angle=90.,
autolevels=p_autolevels)
try:
nt.assert_allclose(to_np(ht_cross), ref_ht_cross, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(ht_cross) - ref_ht_cross)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
lats = hts.coords["XLAT"]
lons = hts.coords["XLONG"]
# Test the manual projection method with lat/lon
# Only do this for the non-multi case, since the domain
# might be moving
if not multi:
if lats.ndim > 2: # moving nest
lats = lats[0, :]
lons = lons[0, :]
ll_point = ll_points(lats, lons)
pivot = CoordPair(lat=lats[int(lats.shape[-2]/2),
int(lats.shape[-1]/2)],
lon=lons[int(lons.shape[-2]/2),
int(lons.shape[-1]/2)])
v1 = vertcross(hts, p, wrfin=wrfin, pivot_point=pivot_point,
angle=90.0)
v2 = vertcross(hts, p, projection=hts.attrs["projection"],
ll_point=ll_point, pivot_point=pivot_point,
angle=90.)
try:
nt.assert_allclose(to_np(v1), to_np(v2), atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(v1) - to_np(v2))
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Test opposite
p_cross1 = vertcross(p, hts, pivot_point=pivot_point, angle=90.0)
try:
nt.assert_allclose(to_np(p_cross1), ref_p_cross, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(p_cross1) - ref_p_cross)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Test point to point
start_point = CoordPair(0, hts.shape[-2]/2)
end_point = CoordPair(-1, hts.shape[-2]/2)
p_cross2 = vertcross(p, hts, start_point=start_point,
end_point=end_point)
try:
nt.assert_allclose(to_np(p_cross1), to_np(p_cross2),
atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(p_cross1) - to_np(p_cross2))
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Check the new vertcross routine
pivot_point = CoordPair(hts.shape[-1] / 2, hts.shape[-2] / 2)
ht_cross = vertcross(hts, p, pivot_point=pivot_point, angle=90.,
latlon=True)
try:
nt.assert_allclose(to_np(ht_cross),
to_np(ref_ht_vertcross1), atol=.005)
except AssertionError:
absdiff = np.abs(to_np(ht_cross) - to_np(ref_ht_vertcross1))
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
levels = [1000., 850., 700., 500., 250.]
ht_cross = vertcross(hts, p, pivot_point=pivot_point, angle=90.,
levels=levels, latlon=True)
try:
nt.assert_allclose(to_np(ht_cross), to_np(ref_ht_vertcross2),
atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(ht_cross) - to_np(ref_ht_vertcross2))
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
idxs = (0, slice(None)) if lats.ndim > 2 else (slice(None),)
start_lat = np.amin(lats[idxs]) + .25*(np.amax(lats[idxs])
- np.amin(lats[idxs]))
end_lat = np.amin(lats[idxs]) + .65*(np.amax(lats[idxs])
- np.amin(lats[idxs]))
start_lon = np.amin(lons[idxs]) + .25*(np.amax(lons[idxs])
- np.amin(lons[idxs]))
end_lon = np.amin(lons[idxs]) + .65*(np.amax(lons[idxs])
- np.amin(lons[idxs]))
start_point = CoordPair(lat=start_lat, lon=start_lon)
end_point = CoordPair(lat=end_lat, lon=end_lon)
ll_point = ll_points(lats[idxs], lons[idxs])
ht_cross = vertcross(hts, p,
start_point=start_point,
end_point=end_point,
projection=hts.attrs["projection"],
ll_point=ll_point,
latlon=True,
autolevels=1000)
try:
nt.assert_allclose(to_np(ht_cross), to_np(ref_ht_vertcross3),
atol=.01)
except AssertionError:
absdiff = np.abs(to_np(ht_cross) - to_np(ref_ht_vertcross3))
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
if multi:
ntimes = hts.shape[0]
for t in range(ntimes):
hts = getvar(wrfin, "z", timeidx=t)
p = getvar(wrfin, "pressure", timeidx=t)
ht_cross = vertcross(
hts, p, start_point=start_point, end_point=end_point,
wrfin=wrfin, timeidx=t, latlon=True, autolevels=1000)
refname = "ht_vertcross_t{}".format(t)
ref_ht_vertcross = _get_refvals(referent, refname, False)
try:
nt.assert_allclose(to_np(ht_cross),
to_np(ref_ht_vertcross), atol=.01)
except AssertionError:
absdiff = np.abs(to_np(ht_cross) -
to_np(ref_ht_vertcross))
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
elif (varname == "interpline"):
ref_t2_line = _get_refvals(referent, "t2_line", multi)
ref_t2_line2 = _get_refvals(referent, "t2_line2", multi)
ref_t2_line3 = _get_refvals(referent, "t2_line3", multi)
t2 = getvar(wrfin, "T2", timeidx=timeidx)
pivot_point = CoordPair(t2.shape[-1] / 2, t2.shape[-2] / 2)
# Make sure the numpy version works
t2_line1 = interpline(to_np(t2), pivot_point=pivot_point,
angle=90.0)
t2_line1 = interpline(t2, pivot_point=pivot_point, angle=90.0)
nt.assert_allclose(to_np(t2_line1), ref_t2_line)
# Test the new NCL wrf_user_interplevel result
try:
nt.assert_allclose(to_np(t2_line1), ref_t2_line2)
except AssertionError:
absdiff = np.abs(to_np(t2_line1) - ref_t2_line2)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Test the manual projection method with lat/lon
lats = t2.coords["XLAT"]
lons = t2.coords["XLONG"]
if multi:
if lats.ndim > 2: # moving nest
lats = lats[0, :]
lons = lons[0, :]
ll_point = ll_points(lats, lons)
pivot = CoordPair(lat=lats[int(lats.shape[-2]/2),
int(lats.shape[-1]/2)],
lon=lons[int(lons.shape[-2]/2),
int(lons.shape[-1]/2)])
l1 = interpline(t2, wrfin=wrfin, pivot_point=pivot_point,
angle=90.0)
l2 = interpline(t2, projection=t2.attrs["projection"],
ll_point=ll_point, pivot_point=pivot_point,
angle=90.)
try:
nt.assert_allclose(to_np(l1), to_np(l2))
except AssertionError:
absdiff = np.abs(to_np(l1) - to_np(l2))
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Test point to point
start_point = CoordPair(0, t2.shape[-2]/2)
end_point = CoordPair(-1, t2.shape[-2]/2)
t2_line2 = interpline(t2, start_point=start_point,
end_point=end_point)
try:
nt.assert_allclose(to_np(t2_line1), to_np(t2_line2))
except AssertionError:
absdiff = np.abs(to_np(t2_line1) - t2_line2)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Now test the start/end with lat/lon points
start_lat = float(np.amin(lats) + .25*(np.amax(lats)
- np.amin(lats)))
end_lat = float(np.amin(lats) + .65*(np.amax(lats)
- np.amin(lats)))
start_lon = float(np.amin(lons) + .25*(np.amax(lons)
- np.amin(lons)))
end_lon = float(np.amin(lons) + .65*(np.amax(lons)
- np.amin(lons)))
start_point = CoordPair(lat=start_lat, lon=start_lon)
end_point = CoordPair(lat=end_lat, lon=end_lon)
t2_line3 = interpline(t2, wrfin=wrfin, timeidx=0,
start_point=start_point,
end_point=end_point, latlon=True)
try:
nt.assert_allclose(to_np(t2_line3), ref_t2_line3, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(t2_line3) - ref_t2_line3)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Test all time steps
if multi:
refnc = NetCDF(referent)
ntimes = t2.shape[0]
for t in range(ntimes):
t2 = getvar(wrfin, "T2", timeidx=t)
line = interpline(
t2, wrfin=wrfin, timeidx=t, start_point=start_point,
end_point=end_point, latlon=True)
refname = "t2_line_t{}".format(t)
refline = refnc.variables[refname][:]
try:
nt.assert_allclose(to_np(line), to_np(refline),
atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(line) - refline)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
refnc.close()
# Test NCLs single time case
if not multi:
refnc = NetCDF(referent)
ref_t2_line4 = refnc.variables["t2_line4"][:]
t2 = getvar(wrfin, "T2", timeidx=0)
line = interpline(t2, wrfin=wrfin, timeidx=0,
start_point=start_point,
end_point=end_point, latlon=True)
try:
nt.assert_allclose(to_np(line), to_np(ref_t2_line4),
atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(line) - ref_t2_line4)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
finally:
refnc.close()
elif (varname == "vinterp"):
# Tk to theta
fld_tk_theta = _get_refvals(referent, "fld_tk_theta", multi)
fld_tk_theta = np.squeeze(fld_tk_theta)
tk = getvar(wrfin, "temp", timeidx=timeidx, units="k")
interp_levels = [200, 300, 500, 1000]
# Make sure the numpy version works
field = vinterp(wrfin,
field=to_np(tk),
vert_coord="theta",
interp_levels=interp_levels,
extrapolate=True,
field_type="tk",
timeidx=timeidx,
log_p=True)
field = vinterp(wrfin,
field=tk,
vert_coord="theta",
interp_levels=interp_levels,
extrapolate=True,
field_type="tk",
timeidx=timeidx,
log_p=True)
tol = 0/100.
atol = 0.0001
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_tk_theta)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_tk_theta)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Tk to theta-e
fld_tk_theta_e = _get_refvals(referent, "fld_tk_theta_e", multi)
fld_tk_theta_e = np.squeeze(fld_tk_theta_e)
interp_levels = [200, 300, 500, 1000]
field = vinterp(wrfin,
field=tk,
vert_coord="theta-e",
interp_levels=interp_levels,
extrapolate=True,
field_type="tk",
timeidx=timeidx,
log_p=True)
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_tk_theta_e, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_tk_theta_e)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Tk to pressure
fld_tk_pres = _get_refvals(referent, "fld_tk_pres", multi)
fld_tk_pres = np.squeeze(fld_tk_pres)
interp_levels = [850, 500]
field = vinterp(wrfin,
field=tk,
vert_coord="pressure",
interp_levels=interp_levels,
extrapolate=True,
field_type="tk",
timeidx=timeidx,
log_p=True)
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_tk_pres, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_tk_pres)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Tk to geoht_msl
fld_tk_ght_msl = _get_refvals(referent, "fld_tk_ght_msl", multi)
fld_tk_ght_msl = np.squeeze(fld_tk_ght_msl)
interp_levels = [1, 2]
field = vinterp(wrfin,
field=tk,
vert_coord="ght_msl",
interp_levels=interp_levels,
extrapolate=True,
field_type="tk",
timeidx=timeidx,
log_p=True)
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_tk_ght_msl, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_tk_ght_msl)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Tk to geoht_agl
fld_tk_ght_agl = _get_refvals(referent, "fld_tk_ght_agl", multi)
fld_tk_ght_agl = np.squeeze(fld_tk_ght_agl)
interp_levels = [1, 2]
field = vinterp(wrfin,
field=tk,
vert_coord="ght_agl",
interp_levels=interp_levels,
extrapolate=True,
field_type="tk",
timeidx=timeidx,
log_p=True)
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_tk_ght_agl, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_tk_ght_agl)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Hgt to pressure
fld_ht_pres = _get_refvals(referent, "fld_ht_pres", multi)
fld_ht_pres = np.squeeze(fld_ht_pres)
z = getvar(wrfin, "height", timeidx=timeidx, units="m")
interp_levels = [500, 50]
field = vinterp(wrfin,
field=z,
vert_coord="pressure",
interp_levels=interp_levels,
extrapolate=True,
field_type="ght",
timeidx=timeidx,
log_p=True)
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_ht_pres, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_ht_pres)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Pressure to theta
fld_pres_theta = _get_refvals(referent, "fld_pres_theta", multi)
fld_pres_theta = np.squeeze(fld_pres_theta)
p = getvar(wrfin, "pressure", timeidx=timeidx)
interp_levels = [200, 300, 500, 1000]
field = vinterp(wrfin,
field=p,
vert_coord="theta",
interp_levels=interp_levels,
extrapolate=True,
field_type="pressure",
timeidx=timeidx,
log_p=True)
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_pres_theta, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_pres_theta)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
# Theta-e to pres
fld_thetae_pres = _get_refvals(referent, "fld_thetae_pres", multi)
fld_thetae_pres = np.squeeze(fld_thetae_pres)
eth = getvar(wrfin, "eth", timeidx=timeidx)
interp_levels = [850, 500, 5]
field = vinterp(wrfin,
field=eth,
vert_coord="pressure",
interp_levels=interp_levels,
extrapolate=True,
field_type="theta-e",
timeidx=timeidx,
log_p=True)
field = np.squeeze(field)
try:
nt.assert_allclose(to_np(field), fld_thetae_pres, atol=.0001)
except AssertionError:
absdiff = np.abs(to_np(field) - fld_thetae_pres)
maxdiff = np.amax(absdiff)
print(maxdiff)
print(np.argwhere(absdiff == maxdiff))
raise
return test
def extract_proj_params(wrfnc, timeidx=0):
attrs = extract_global_attrs(wrfnc, ("MAP_PROJ", "TRUELAT1", "TRUELAT2",
"STAND_LON", "POLE_LAT", "POLE_LON",
"DX", "DY"))
result = {key.lower(): val for key, val in viewitems(attrs)}
_timeidx = timeidx
if is_multi_file(wrfnc):
wrfnc0 = wrfnc[0]
num_times_per_file = len(wrfnc0.dimensions["Time"])
file_idx = timeidx // num_times_per_file
_timeidx = timeidx % num_times_per_file
wrfnc = wrfnc[file_idx]
result["known_x"] = 0
result["known_y"] = 0
result["ref_lat"] = wrfnc.variables["XLAT"][_timeidx, 0, 0]
result["ref_lon"] = wrfnc.variables["XLONG"][_timeidx, 0, 0]
return result
def make_latlon_test(testid, dir, pattern, referent, single,
multi=False, pynio=False):
def test(self):
try:
from netCDF4 import Dataset as NetCDF
except ImportError:
pass
try:
import Nio
except ImportError:
pass
timeidx = 0 if not multi else None
pat = os.path.join(dir, pattern)
wrf_files = glob.glob(pat)
wrf_files.sort()
refnc = NetCDF(referent)
try:
refnc.set_always_mask(False)
except AttributeError:
pass
wrfin = []
for fname in wrf_files:
if not pynio:
f = NetCDF(fname)
try:
f.set_auto_mask(False)
except AttributeError:
pass
wrfin.append(f)
else:
if not fname.endswith(".nc"):
_fname = fname + ".nc"
else:
_fname = fname
f = Nio.open_file(_fname)
wrfin.append(f)
if testid == "xy":
# Lats/Lons taken from NCL script, just hard-coding for now
lats = [22.0, 25.0, 27.0]
lons = [-90.0, -87.5, -83.75]
# Just call with a single lat/lon
if single:
timeidx = 8
ref_vals = refnc.variables["xy2"][:]
xy = ll_to_xy(wrfin, lats[0], lons[0], timeidx=timeidx,
as_int=True)
ref = ref_vals[:, 0]
nt.assert_allclose(to_np(xy), ref)
# Next make sure the 'proj' version works
projparams = extract_proj_params(wrfin, timeidx=timeidx)
xy_proj = ll_to_xy_proj(lats[0], lons[0], as_int=True,
**projparams)
nt.assert_allclose(to_np(xy_proj), to_np(xy))
else:
ref_vals = refnc.variables["xy1"][:]
xy = ll_to_xy(wrfin, lats, lons, timeidx=None, as_int=False)
ref = ref_vals[:]
nt.assert_allclose(to_np(xy), ref)
if xy.ndim > 2:
# Moving nest
is_moving = True
numtimes = xy.shape[-2]
else:
is_moving = False
numtimes = 1
for tidx in range(9):
# Next make sure the 'proj' version works
projparams = extract_proj_params(wrfin, timeidx=tidx)
xy_proj = ll_to_xy_proj(lats, lons, as_int=False,
**projparams)
if is_moving:
idxs = (slice(None), tidx, slice(None))
else:
idxs = (slice(None),)
nt.assert_allclose(to_np(xy_proj), to_np(xy[idxs]))
else:
# i_s, j_s taken from NCL script, just hard-coding for now
# NCL uses 1-based indexing for this, so need to subtract 1
x_s = np.asarray([10, 50, 90], int)
y_s = np.asarray([10, 50, 90], int)
if single:
timeidx = 8
ref_vals = refnc.variables["ll2"][:]
ll = xy_to_ll(wrfin, x_s[0], y_s[0], timeidx=timeidx)
ref = ref_vals[::-1, 0]
nt.assert_allclose(to_np(ll), ref)
# Next make sure the 'proj' version works
projparams = extract_proj_params(wrfin, timeidx=8)
ll_proj = xy_to_ll_proj(x_s[0], y_s[0], **projparams)
nt.assert_allclose(to_np(ll_proj), to_np(ll))
else:
ref_vals = refnc.variables["ll1"][:]
ll = xy_to_ll(wrfin, x_s, y_s, timeidx=None)
ref = ref_vals[::-1, :]
nt.assert_allclose(to_np(ll), ref)
if ll.ndim > 2:
# Moving nest
is_moving = True
numtimes = ll.shape[-2]
else:
is_moving = False
numtimes = 1
for tidx in range(numtimes):
# Next make sure the 'proj' version works
projparams = extract_proj_params(wrfin, timeidx=tidx)
ll_proj = xy_to_ll_proj(x_s, y_s, **projparams)
if is_moving:
idxs = (slice(None), tidx, slice(None))
else:
idxs = (slice(None),)
nt.assert_allclose(to_np(ll_proj), to_np(ll[idxs]))
return test
class WRFVarsTest(ut.TestCase):
longMessage = True
class WRFInterpTest(ut.TestCase):
longMessage = True
class WRFLatLonTest(ut.TestCase):
longMessage = True
if __name__ == "__main__":
from wrf import (omp_set_num_threads, omp_set_schedule, omp_get_schedule,
omp_set_dynamic, omp_get_num_procs, OMP_SCHED_STATIC)
omp_set_num_threads(omp_get_num_procs()//2)
omp_set_schedule(OMP_SCHED_STATIC, 0)
omp_set_dynamic(False)
ignore_vars = [] # Not testable yet
wrf_vars = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz",
"geopt", "helicity", "lat", "lon", "omg", "p", "pressure",
"pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc",
"theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va",
"wa", "uvmet10", "uvmet", "z", "cfrac", "zstag", "geopt_stag",
"height_agl", "wspd_wdir", "wspd_wdir10", "uvmet_wspd_wdir",
"uvmet10_wspd_wdir"]
interp_methods = ["interplevel", "vertcross", "interpline", "vinterp"]
latlon_tests = ["xy", "ll"]
for dir, ref_nc_file, nest in zip(DIRS, REF_NC_FILES, NEST):
try:
import netCDF4
except ImportError:
pass
else:
for var in wrf_vars:
if var in ignore_vars:
continue
test_func1 = make_test(var, dir, PATTERN, ref_nc_file)
test_func2 = make_test(var, dir, PATTERN, ref_nc_file,
multi=True)
setattr(WRFVarsTest, 'test_{0}_{1}'.format(nest, var),
test_func1)
setattr(WRFVarsTest, 'test_{0}_multi_{1}'.format(nest, var),
test_func2)
for method in interp_methods:
test_interp_func1 = make_interp_test(method, dir, PATTERN,
ref_nc_file)
test_interp_func2 = make_interp_test(method, dir, PATTERN,
ref_nc_file, multi=True)
setattr(WRFInterpTest, 'test_{0}_{1}'.format(nest, method),
test_interp_func1)
setattr(WRFInterpTest,
'test_{0}_multi_{1}'.format(nest, method),
test_interp_func2)
for testid in latlon_tests:
for single in (True, False):
for multi in (True, False):
test_ll_func = make_latlon_test(testid, dir, PATTERN,
ref_nc_file,
single=single,
multi=multi,
pynio=False)
multistr = "" if not multi else "_multi"
singlestr = "_nosingle" if not single else "_single"
test_name = "test_{}_{}{}{}".format(nest, testid,
singlestr,
multistr)
setattr(WRFLatLonTest, test_name, test_ll_func)
try:
import PyNIO
except ImportError:
pass
else:
for var in wrf_vars:
if var in ignore_vars:
continue
test_func1 = make_test(var, dir, PATTERN, ref_nc_file,
pynio=True)
test_func2 = make_test(var, dir, PATTERN, ref_nc_file,
multi=True,
pynio=True)
setattr(WRFVarsTest, 'test_pynio_{0}_{1}'.format(
nest, var), test_func1)
setattr(WRFVarsTest, 'test_pynio_{0}_multi_{1}'.format(
nest, var), test_func2)
for method in interp_methods:
test_interp_func1 = make_interp_test(method, dir, PATTERN,
ref_nc_file)
test_interp_func2 = make_interp_test(method, dir, PATTERN,
ref_nc_file, multi=True)
setattr(WRFInterpTest, 'test_pynio_{0}_{1}'.format(nest,
method),
test_interp_func1)
setattr(WRFInterpTest, 'test_pynio_{0}_multi_{1}'.format(
nest, method), test_interp_func2)
for testid in latlon_tests:
for single in (True, False):
for multi in (True, False):
test_ll_func = make_latlon_test(testid, dir, PATTERN,
ref_nc_file,
single=single,
multi=multi,
pynio=False)
multistr = "" if not multi else "_multi"
singlestr = "_nosingle" if not single else "_single"
test_name = "test_pynio_{}_{}{}{}".format(
nest, testid, singlestr, multistr)
setattr(WRFLatLonTest, test_name, test_ll_func)
ut.main()