Bill Ladwig 6 years ago
parent
commit
4651ec4d1b
  1. 123
      test/ci_tests/make_test_file.py
  2. 208
      test/ci_tests/utests.py

123
test/ci_tests/make_test_file.py

@ -9,19 +9,19 @@ from netCDF4 import Dataset
from wrf import (getvar, interplevel, interpline, vertcross, vinterp, py2round, from wrf import (getvar, interplevel, interpline, vertcross, vinterp, py2round,
CoordPair, ll_to_xy, xy_to_ll, to_np) CoordPair, ll_to_xy, xy_to_ll, to_np)
VARS_TO_KEEP = ("Times", "XLAT", "XLONG", "XLAT_U", "XLAT_V", "XLONG_U", VARS_TO_KEEP = ("Times", "XLAT", "XLONG", "XLAT_U", "XLAT_V", "XLONG_U",
"XLONG_V", "U", "V", "W", "PH", "PHB", "T", "P", "PB", "Q2", "XLONG_V", "U", "V", "W", "PH", "PHB", "T", "P", "PB", "Q2",
"T2", "PSFC", "U10", "V10", "XTIME", "QVAPOR", "QCLOUD", "T2", "PSFC", "U10", "V10", "XTIME", "QVAPOR", "QCLOUD",
"QGRAUP", "QRAIN", "QSNOW", "QICE", "MAPFAC_M", "MAPFAC_U", "QGRAUP", "QRAIN", "QSNOW", "QICE", "MAPFAC_M", "MAPFAC_U",
"MAPFAC_V", "F", "HGT", "RAINC", "RAINSH", "RAINNC") "MAPFAC_V", "F", "HGT", "RAINC", "RAINSH", "RAINNC")
DIMS_TO_TRIM = ("west_east", "south_north", "bottom_top", "bottom_top_stag", DIMS_TO_TRIM = ("west_east", "south_north", "bottom_top", "bottom_top_stag",
"west_east_stag", "south_north_stag") "west_east_stag", "south_north_stag")
WRF_DIAGS = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz", WRF_DIAGS = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz",
"geopt", "helicity", "lat", "lon", "omg", "p", "pressure", "geopt", "helicity", "lat", "lon", "omg", "p", "pressure",
"pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc", "pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc",
"theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va", "theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va",
"wa", "uvmet10", "uvmet", "z", "cfrac", "zstag"] "wa", "uvmet10", "uvmet", "z", "cfrac", "zstag"]
INTERP_METHS = ["interplevel", "vertcross", "interpline", "vinterp"] INTERP_METHS = ["interplevel", "vertcross", "interpline", "vinterp"]
@ -33,56 +33,56 @@ def copy_and_reduce(opts):
infilename = opts.filename infilename = opts.filename
outfilename = os.path.expanduser( outfilename = os.path.expanduser(
os.path.join(opts.outdir, "ci_test_file.nc")) os.path.join(opts.outdir, "ci_test_file.nc"))
with Dataset(infilename) as infile, Dataset(outfilename, "w") as outfile: with Dataset(infilename) as infile, Dataset(outfilename, "w") as outfile:
# Copy the global attributes # Copy the global attributes
outfile.setncatts(infile.__dict__) outfile.setncatts(infile.__dict__)
# Copy Dimensions # Copy Dimensions
for name, dimension in infile.dimensions.iteritems(): for name, dimension in infile.dimensions.iteritems():
if name in DIMS_TO_TRIM: if name in DIMS_TO_TRIM:
if name.find("_stag") > 0: if name.find("_stag") > 0:
dimsize = (len(dimension) / 2 dimsize = (len(dimension) / 2
if len(dimension) % 2 == 0 if len(dimension) % 2 == 0
else (len(dimension) + 1) / 2) else (len(dimension) + 1) / 2)
else: else:
dimsize = (len(dimension) / 2 dimsize = (len(dimension) / 2
if len(dimension) % 2 == 0 if len(dimension) % 2 == 0
else (len(dimension) - 1) / 2) else (len(dimension) - 1) / 2)
else: else:
dimsize = len(dimension) dimsize = len(dimension)
outfile.createDimension(name, dimsize) outfile.createDimension(name, dimsize)
# Copy Variables # Copy Variables
for name, variable in infile.variables.iteritems(): for name, variable in infile.variables.iteritems():
if name not in VARS_TO_KEEP: if name not in VARS_TO_KEEP:
continue continue
outvar = outfile.createVariable(name, variable.datatype, outvar = outfile.createVariable(name, variable.datatype,
variable.dimensions, zlib=True) variable.dimensions, zlib=True)
in_slices = tuple(slice(0, dimsize) for dimsize in outvar.shape) in_slices = tuple(slice(0, dimsize) for dimsize in outvar.shape)
outvar[:] = variable[in_slices] outvar[:] = variable[in_slices]
outvar.setncatts(infile.variables[name].__dict__) outvar.setncatts(infile.variables[name].__dict__)
def add_to_ncfile(outfile, var, varname): def add_to_ncfile(outfile, var, varname):
dim_d = dict(zip(var.dims, var.shape)) dim_d = dict(zip(var.dims, var.shape))
for dimname, dimsize in dim_d.items(): for dimname, dimsize in dim_d.items():
if dimname not in outfile.dimensions: if dimname not in outfile.dimensions:
outfile.createDimension(dimname, dimsize) outfile.createDimension(dimname, dimsize)
fill_value = None fill_value = None
if "_FillValue" in var.attrs: if "_FillValue" in var.attrs:
fill_value = var.attrs["_FillValue"] fill_value = var.attrs["_FillValue"]
ncvar = outfile.createVariable(varname, var.dtype, var.dims, ncvar = outfile.createVariable(varname, var.dtype, var.dims,
zlib=True, fill_value=fill_value) zlib=True, fill_value=fill_value)
var_vals = to_np(var) var_vals = to_np(var)
ncvar[:] = var_vals[:] ncvar[:] = var_vals[:]
@ -92,78 +92,79 @@ def make_result_file(opts):
os.path.join(opts.outdir, "ci_test_file.nc")) os.path.join(opts.outdir, "ci_test_file.nc"))
outfilename = os.path.expanduser( outfilename = os.path.expanduser(
os.path.join(opts.outdir, "ci_result_file.nc")) os.path.join(opts.outdir, "ci_result_file.nc"))
with Dataset(infilename) as infile, Dataset(outfilename, "w") as outfile: with Dataset(infilename) as infile, Dataset(outfilename, "w") as outfile:
for varname in WRF_DIAGS: for varname in WRF_DIAGS:
var = getvar(infile, varname) var = getvar(infile, varname)
add_to_ncfile(outfile, var, varname) add_to_ncfile(outfile, var, varname)
for interptype in INTERP_METHS: for interptype in INTERP_METHS:
if interptype == "interpline": if interptype == "interpline":
hts = getvar(infile, "z") hts = getvar(infile, "z")
p = getvar(infile, "pressure") p = getvar(infile, "pressure")
hts_850 = interplevel(hts, p, 850.) hts_850 = interplevel(hts, p, 850.)
add_to_ncfile(outfile, hts_850, "interplevel") add_to_ncfile(outfile, hts_850, "interplevel")
if interptype == "vertcross": if interptype == "vertcross":
hts = getvar(infile, "z") hts = getvar(infile, "z")
p = getvar(infile, "pressure") p = getvar(infile, "pressure")
pivot_point = CoordPair(hts.shape[-1] // 2, hts.shape[-2] // 2) pivot_point = CoordPair(hts.shape[-1] // 2, hts.shape[-2] // 2)
ht_cross = vertcross(hts, p, pivot_point=pivot_point, ht_cross = vertcross(hts, p, pivot_point=pivot_point,
angle=90.) angle=90.)
add_to_ncfile(outfile, ht_cross, "vertcross") add_to_ncfile(outfile, ht_cross, "vertcross")
if interptype == "interpline": if interptype == "interpline":
t2 = getvar(infile, "T2") t2 = getvar(infile, "T2")
pivot_point = CoordPair(t2.shape[-1] // 2, t2.shape[-2] // 2) pivot_point = CoordPair(t2.shape[-1] // 2, t2.shape[-2] // 2)
t2_line = interpline(t2, pivot_point=pivot_point, angle=90.0) t2_line = interpline(t2, pivot_point=pivot_point, angle=90.0)
add_to_ncfile(outfile, t2_line, "interpline") add_to_ncfile(outfile, t2_line, "interpline")
if interptype == "vinterp": if interptype == "vinterp":
tk = getvar(infile, "temp", units="k") tk = getvar(infile, "temp", units="k")
interp_levels = [200,300,500,1000] interp_levels = [200, 300, 500, 1000]
field = vinterp(infile, field = vinterp(infile,
field=tk, field=tk,
vert_coord="theta", vert_coord="theta",
interp_levels=interp_levels, interp_levels=interp_levels,
extrapolate=True, extrapolate=True,
field_type="tk", field_type="tk",
log_p=True) log_p=True)
add_to_ncfile(outfile, field, "vinterp") add_to_ncfile(outfile, field, "vinterp")
for latlonmeth in LATLON_METHS: for latlonmeth in LATLON_METHS:
if latlonmeth == "xy": if latlonmeth == "xy":
# Hardcoded values from other unit tests # Hardcoded values from other unit tests
lats = [22.0, 25.0, 27.0] lats = [22.0, 25.0, 27.0]
lons = [-90.0, -87.5, -83.75] lons = [-90.0, -87.5, -83.75]
xy = ll_to_xy(infile, lats[0], lons[0]) xy = ll_to_xy(infile, lats[0], lons[0])
add_to_ncfile(outfile, xy, "xy") add_to_ncfile(outfile, xy, "xy")
else: else:
# Hardcoded from other unit tests # Hardcoded from other unit tests
x_s = np.asarray([10, 50, 90], int) x_s = np.asarray([10, 50, 90], int)
y_s = np.asarray([10, 50, 90], int) y_s = np.asarray([10, 50, 90], int)
ll = xy_to_ll(infile, x_s[0], y_s[0]) ll = xy_to_ll(infile, x_s[0], y_s[0])
add_to_ncfile(outfile, ll, "ll") add_to_ncfile(outfile, ll, "ll")
def main(opts): def main(opts):
copy_and_reduce(opts) copy_and_reduce(opts)
make_result_file(opts) make_result_file(opts)
if __name__ == "__main__": if __name__ == "__main__":
DEFAULT_FILE = ("/Users/ladwig/Documents/wrf_files/" DEFAULT_FILE = ("/Users/ladwig/Documents/wrf_files/"
"wrf_vortex_multi/moving_nest/" "wrf_vortex_multi/moving_nest/"
@ -171,10 +172,10 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Generate conda test files " parser = argparse.ArgumentParser(description="Generate conda test files "
"for unit testing.") "for unit testing.")
parser.add_argument("-f", "--filename", required=False, parser.add_argument("-f", "--filename", required=False,
default=DEFAULT_FILE, default=DEFAULT_FILE,
help="the WRF test file") help="the WRF test file")
parser.add_argument("-o", "--outdir", required=False, default="./", parser.add_argument("-o", "--outdir", required=False, default="./",
help="the location for the output files") help="the location for the output files")
opts = parser.parse_args() opts = parser.parse_args()
main(opts) main(opts)

208
test/ci_tests/utests.py

@ -1,14 +1,15 @@
import unittest as ut import unittest as ut
import numpy.testing as nt import numpy.testing as nt
import numpy as np import numpy as np
import numpy.ma as ma import numpy.ma as ma
import os, sys import os
import sys
import subprocess import subprocess
from wrf import (getvar, interplevel, interpline, vertcross, vinterp, from wrf import (getvar, interplevel, interpline, vertcross, vinterp,
disable_xarray, xarray_enabled, to_np, disable_xarray, xarray_enabled, to_np,
xy_to_ll, ll_to_xy, xy_to_ll_proj, ll_to_xy_proj, xy_to_ll, ll_to_xy, xy_to_ll_proj, ll_to_xy_proj,
extract_global_attrs, viewitems, CoordPair, extract_global_attrs, viewitems, CoordPair,
omp_get_num_procs, omp_set_num_threads) omp_get_num_procs, omp_set_num_threads)
from wrf.util import is_multi_file from wrf.util import is_multi_file
@ -20,82 +21,82 @@ REF_FILE = "ci_result_file.nc"
if sys.version_info > (3,): if sys.version_info > (3,):
xrange = range xrange = range
# Using helpful information at:
# Using helpful information at:
# http://eli.thegreenplace.net/2014/04/02/dynamically-generating-python-test-cases # http://eli.thegreenplace.net/2014/04/02/dynamically-generating-python-test-cases
def make_test(varname, wrf_in, referent, multi=False, repeat=3, pynio=False): def make_test(varname, wrf_in, referent, multi=False, repeat=3, pynio=False):
def test(self): def test(self):
from netCDF4 import Dataset as NetCDF from netCDF4 import Dataset as NetCDF
timeidx = 0 timeidx = 0
in_wrfnc = NetCDF(wrf_in) in_wrfnc = NetCDF(wrf_in)
refnc = NetCDF(referent) refnc = NetCDF(referent)
# These have a left index that defines the product type # These have a left index that defines the product type
multiproduct = varname in ("uvmet", "uvmet10", "cape_2d", "cape_3d", multiproduct = varname in ("uvmet", "uvmet10", "cape_2d", "cape_3d",
"cfrac") "cfrac")
ref_vals = refnc.variables[varname][:] ref_vals = refnc.variables[varname][:]
if (varname == "tc"): if (varname == "tc"):
my_vals = getvar(in_wrfnc, "temp", timeidx=timeidx, units="c") my_vals = getvar(in_wrfnc, "temp", timeidx=timeidx, units="c")
tol = 1/100. tol = 1/100.
atol = .1 # Note: NCL uses 273.16 as conversion for some reason atol = .1 # Note: NCL uses 273.16 as conversion for some reason
nt.assert_allclose(to_np(my_vals), ref_vals, tol, atol) nt.assert_allclose(to_np(my_vals), ref_vals, tol, atol)
elif (varname == "pw"): elif (varname == "pw"):
my_vals = getvar(in_wrfnc, "pw", timeidx=timeidx) my_vals = getvar(in_wrfnc, "pw", timeidx=timeidx)
tol = .5/100.0 tol = .5/100.0
atol = 0 # NCL uses different constants and doesn't use same atol = 0
# handrolled virtual temp in method
nt.assert_allclose(to_np(my_vals), ref_vals, tol, atol) nt.assert_allclose(to_np(my_vals), ref_vals, tol, atol)
elif (varname == "cape_2d"): elif (varname == "cape_2d"):
cape_2d = getvar(in_wrfnc, varname, timeidx=timeidx) cape_2d = getvar(in_wrfnc, varname, timeidx=timeidx)
tol = 0/100. tol = 0/100.
atol = 200.0 atol = 200.0
# Let's only compare CAPE values until the F90 changes are # Let's only compare CAPE values until the F90 changes are
# merged back in to NCL. The modifications to the R and CP # merged back in to NCL. The modifications to the R and CP
# changes TK enough that non-lifting parcels could lift, thus # changes TK enough that non-lifting parcels could lift, thus
# causing wildly different values in LCL # causing wildly different values in LCL
nt.assert_allclose(to_np(cape_2d[0,:]), ref_vals[0,:], tol, atol) nt.assert_allclose(to_np(cape_2d[0, :]), ref_vals[0, :], tol, atol)
elif (varname == "cape_3d"): elif (varname == "cape_3d"):
cape_3d = getvar(in_wrfnc, varname, timeidx=timeidx) cape_3d = getvar(in_wrfnc, varname, timeidx=timeidx)
# Changing the R and CP constants, while keeping TK within # Changing the R and CP constants, while keeping TK within
# 2%, can lead to some big changes in CAPE. Tolerances # 2%, can lead to some big changes in CAPE. Tolerances
# have been set wide when comparing the with the original # have been set wide when comparing the with the original
# NCL. Change back when the F90 code is merged back with # NCL. Change back when the F90 code is merged back with
# NCL # NCL
tol = 0/100. tol = 0/100.
atol = 200.0 atol = 200.0
#print np.amax(np.abs(to_np(cape_3d[0,:]) - ref_vals[0,:])) # print np.amax(np.abs(to_np(cape_3d[0,:]) - ref_vals[0,:]))
nt.assert_allclose(to_np(cape_3d), ref_vals, tol, atol) nt.assert_allclose(to_np(cape_3d), ref_vals, tol, atol)
else: else:
my_vals = getvar(in_wrfnc, varname, timeidx=timeidx) my_vals = getvar(in_wrfnc, varname, timeidx=timeidx)
tol = 2/100. tol = 2/100.
atol = 0.1 atol = 0.1
#print (np.amax(np.abs(to_np(my_vals) - ref_vals))) # print (np.amax(np.abs(to_np(my_vals) - ref_vals)))
nt.assert_allclose(to_np(my_vals), ref_vals, tol, atol) nt.assert_allclose(to_np(my_vals), ref_vals, tol, atol)
return test return test
def _get_refvals(referent, varname, repeat, multi): def _get_refvals(referent, varname, repeat, multi):
from netCDF4 import Dataset as NetCDF from netCDF4 import Dataset as NetCDF
refnc = NetCDF(referent) refnc = NetCDF(referent)
ref_vals = refnc.variables[varname][:] ref_vals = refnc.variables[varname][:]
return ref_vals return ref_vals
def make_interp_test(varname, wrf_in, referent, multi=False,
def make_interp_test(varname, wrf_in, referent, multi=False,
repeat=3, pynio=False): repeat=3, pynio=False):
def test(self): def test(self):
from netCDF4 import Dataset as NetCDF from netCDF4 import Dataset as NetCDF
timeidx = 0 timeidx = 0
in_wrfnc = NetCDF(wrf_in) in_wrfnc = NetCDF(wrf_in)
if (varname == "interplevel"): if (varname == "interplevel"):
ref_ht_850 = _get_refvals(referent, "interplevel", repeat, multi) ref_ht_850 = _get_refvals(referent, "interplevel", repeat, multi)
hts = getvar(in_wrfnc, "z", timeidx=timeidx) hts = getvar(in_wrfnc, "z", timeidx=timeidx)
@ -103,168 +104,165 @@ def make_interp_test(varname, wrf_in, referent, multi=False,
# Check that it works with numpy arrays # Check that it works with numpy arrays
hts_850 = interplevel(to_np(hts), p, 850) hts_850 = interplevel(to_np(hts), p, 850)
#print (hts_850) # print (hts_850)
hts_850 = interplevel(hts, p, 850) hts_850 = interplevel(hts, p, 850)
nt.assert_allclose(to_np(hts_850), ref_ht_850) nt.assert_allclose(to_np(hts_850), ref_ht_850)
elif (varname == "vertcross"): elif (varname == "vertcross"):
ref_ht_cross = _get_refvals(referent, "vertcross", repeat, multi) ref_ht_cross = _get_refvals(referent, "vertcross", repeat, multi)
hts = getvar(in_wrfnc, "z", timeidx=timeidx) hts = getvar(in_wrfnc, "z", timeidx=timeidx)
p = getvar(in_wrfnc, "pressure", timeidx=timeidx) p = getvar(in_wrfnc, "pressure", timeidx=timeidx)
pivot_point = CoordPair(hts.shape[-1] // 2, hts.shape[-2] // 2) pivot_point = CoordPair(hts.shape[-1] // 2, hts.shape[-2] // 2)
# Check that it works with numpy arrays # Check that it works with numpy arrays
ht_cross = vertcross(to_np(hts), to_np(p), ht_cross = vertcross(to_np(hts), to_np(p),
pivot_point=pivot_point, angle=90.) pivot_point=pivot_point, angle=90.)
#print (ht_cross) # print (ht_cross)
ht_cross = vertcross(hts, p, pivot_point=pivot_point, angle=90.) ht_cross = vertcross(hts, p, pivot_point=pivot_point, angle=90.)
nt.assert_allclose(to_np(ht_cross), ref_ht_cross, rtol=.01) nt.assert_allclose(to_np(ht_cross), ref_ht_cross, rtol=.01)
elif (varname == "interpline"): elif (varname == "interpline"):
ref_t2_line = _get_refvals(referent, "interpline", repeat, multi) ref_t2_line = _get_refvals(referent, "interpline", repeat, multi)
t2 = getvar(in_wrfnc, "T2", timeidx=timeidx) t2 = getvar(in_wrfnc, "T2", timeidx=timeidx)
pivot_point = CoordPair(t2.shape[-1] // 2, t2.shape[-2] // 2) pivot_point = CoordPair(t2.shape[-1] // 2, t2.shape[-2] // 2)
# Check that it works with numpy arrays # Check that it works with numpy arrays
t2_line1 = interpline(to_np(t2), pivot_point=pivot_point, t2_line1 = interpline(to_np(t2), pivot_point=pivot_point,
angle=90.0) angle=90.0)
#print (t2_line1) # print (t2_line1)
t2_line1 = interpline(t2, pivot_point=pivot_point, angle=90.0) t2_line1 = interpline(t2, pivot_point=pivot_point, angle=90.0)
nt.assert_allclose(to_np(t2_line1), ref_t2_line) nt.assert_allclose(to_np(t2_line1), ref_t2_line)
elif (varname == "vinterp"): elif (varname == "vinterp"):
# Tk to theta # Tk to theta
fld_tk_theta = _get_refvals(referent, "vinterp", repeat, multi) fld_tk_theta = _get_refvals(referent, "vinterp", repeat, multi)
fld_tk_theta = np.squeeze(fld_tk_theta) fld_tk_theta = np.squeeze(fld_tk_theta)
tk = getvar(in_wrfnc, "temp", timeidx=timeidx, units="k") tk = getvar(in_wrfnc, "temp", timeidx=timeidx, units="k")
interp_levels = [200,300,500,1000] interp_levels = [200, 300, 500, 1000]
# Check that it works with numpy arrays # Check that it works with numpy arrays
field = vinterp(in_wrfnc, field = vinterp(in_wrfnc,
field=to_np(tk), field=to_np(tk),
vert_coord="theta", vert_coord="theta",
interp_levels=interp_levels, interp_levels=interp_levels,
extrapolate=True, extrapolate=True,
field_type="tk", field_type="tk",
timeidx=timeidx, timeidx=timeidx,
log_p=True) log_p=True)
#print (field) # print (field)
field = vinterp(in_wrfnc, field = vinterp(in_wrfnc,
field=tk, field=tk,
vert_coord="theta", vert_coord="theta",
interp_levels=interp_levels, interp_levels=interp_levels,
extrapolate=True, extrapolate=True,
field_type="tk", field_type="tk",
timeidx=timeidx, timeidx=timeidx,
log_p=True) log_p=True)
tol = 5/100. tol = 5/100.
atol = 0.0001 atol = 0.0001
field = np.squeeze(field) field = np.squeeze(field)
nt.assert_allclose(to_np(field), fld_tk_theta, tol, atol) nt.assert_allclose(to_np(field), fld_tk_theta, tol, atol)
return test return test
def make_latlon_test(testid, wrf_in, referent, single, multi=False, repeat=3, def make_latlon_test(testid, wrf_in, referent, single, multi=False, repeat=3,
pynio=False): pynio=False):
def test(self): def test(self):
from netCDF4 import Dataset as NetCDF from netCDF4 import Dataset as NetCDF
timeidx = 0 timeidx = 0
in_wrfnc = NetCDF(wrf_in) in_wrfnc = NetCDF(wrf_in)
refnc = NetCDF(referent) refnc = NetCDF(referent)
if testid == "xy": if testid == "xy":
# Since this domain is not moving, the reference values are the # Since this domain is not moving, the reference values are the
# same whether there are multiple or single files # same whether there are multiple or single files
ref_vals = refnc.variables["xy"][:] ref_vals = refnc.variables["xy"][:]
# Lats/Lons taken from NCL script, just hard-coding for now # Lats/Lons taken from NCL script, just hard-coding for now
lats = [22.0, 25.0, 27.0] lats = [22.0, 25.0, 27.0]
lons = [-90.0, -87.5, -83.75] lons = [-90.0, -87.5, -83.75]
xy = ll_to_xy(in_wrfnc, lats[0], lons[0]) xy = ll_to_xy(in_wrfnc, lats[0], lons[0])
nt.assert_allclose(to_np(xy), ref_vals) nt.assert_allclose(to_np(xy), ref_vals)
else: else:
# Since this domain is not moving, the reference values are the # Since this domain is not moving, the reference values are the
# same whether there are multiple or single files # same whether there are multiple or single files
ref_vals = refnc.variables["ll"][:] ref_vals = refnc.variables["ll"][:]
# i_s, j_s taken from NCL script, just hard-coding for now # i_s, j_s taken from NCL script, just hard-coding for now
# NCL uses 1-based indexing for this, so need to subtract 1 # NCL uses 1-based indexing for this, so need to subtract 1
x_s = np.asarray([10, 50, 90], int) x_s = np.asarray([10, 50, 90], int)
y_s = np.asarray([10, 50, 90], int) y_s = np.asarray([10, 50, 90], int)
ll = xy_to_ll(in_wrfnc, x_s[0], y_s[0]) ll = xy_to_ll(in_wrfnc, x_s[0], y_s[0])
nt.assert_allclose(to_np(ll), ref_vals) nt.assert_allclose(to_np(ll), ref_vals)
return test return test
class WRFVarsTest(ut.TestCase): class WRFVarsTest(ut.TestCase):
longMessage = True longMessage = True
class WRFInterpTest(ut.TestCase): class WRFInterpTest(ut.TestCase):
longMessage = True longMessage = True
class WRFLatLonTest(ut.TestCase): class WRFLatLonTest(ut.TestCase):
longMessage = True longMessage = True
if __name__ == "__main__": if __name__ == "__main__":
ignore_vars = [] # Not testable yet ignore_vars = [] # Not testable yet
wrf_vars = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz", wrf_vars = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz",
"geopt", "helicity", "lat", "lon", "omg", "p", "pressure", "geopt", "helicity", "lat", "lon", "omg", "p", "pressure",
"pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc", "pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc",
"theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va", "theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va",
"wa", "uvmet10", "uvmet", "z", "cfrac", "zstag"] "wa", "uvmet10", "uvmet", "z", "cfrac", "zstag"]
interp_methods = ["interplevel", "vertcross", "interpline", "vinterp"] interp_methods = ["interplevel", "vertcross", "interpline", "vinterp"]
latlon_tests = ["xy", "ll"] latlon_tests = ["xy", "ll"]
import netCDF4 import netCDF4
for var in wrf_vars: for var in wrf_vars:
if var in ignore_vars: if var in ignore_vars:
continue continue
test_func1 = make_test(var, TEST_FILE, REF_FILE) test_func1 = make_test(var, TEST_FILE, REF_FILE)
setattr(WRFVarsTest, 'test_{0}'.format(var), test_func1) setattr(WRFVarsTest, 'test_{0}'.format(var), test_func1)
for method in interp_methods: for method in interp_methods:
test_interp_func1 = make_interp_test(method, TEST_FILE, test_interp_func1 = make_interp_test(method, TEST_FILE,
REF_FILE) REF_FILE)
setattr(WRFInterpTest, 'test_{0}'.format(method), setattr(WRFInterpTest, 'test_{0}'.format(method),
test_interp_func1) test_interp_func1)
for testid in latlon_tests: for testid in latlon_tests:
for single in (True,): for single in (True,):
for multi in (False,): for multi in (False,):
test_ll_func = make_latlon_test(testid, TEST_FILE, test_ll_func = make_latlon_test(testid, TEST_FILE,
REF_FILE, REF_FILE,
single=single, multi=multi, single=single, multi=multi,
repeat=3, pynio=False) repeat=3, pynio=False)
multistr = "" if not multi else "_multi" multistr = "" if not multi else "_multi"
singlestr = "_nosingle" if not single else "_single" singlestr = "_nosingle" if not single else "_single"
test_name = "test_{}{}{}".format(testid, singlestr, test_name = "test_{}{}{}".format(testid, singlestr, multistr)
multistr)
setattr(WRFLatLonTest, test_name, test_ll_func) setattr(WRFLatLonTest, test_name, test_ll_func)
ut.main() ut.main()
Loading…
Cancel
Save