forked from 3rdparty/wrf-python
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
7.4 KiB
216 lines
7.4 KiB
import unittest as ut |
|
import numpy.testing as nt |
|
import numpy as n |
|
import numpy.ma as ma |
|
import os, sys |
|
import subprocess |
|
|
|
from wrf.var import getvar |
|
|
|
# Machine-specific locations used by the whole test module.

# NCL interpreter that setUpModule launches to produce the reference values.
NCL_EXE = "/Users/ladwig/nclbuild/6.3.0/bin/ncl"

# WRF output file under test; the NCL script is handed this path plus ".nc".
TEST_FILE = "/Users/ladwig/Documents/wrf_files/wrfout_d01_2010-06-13_21:00:00"

# netCDF file the NCL script writes and the tests read back as the reference.
OUT_NC_FILE = "/tmp/wrftest.nc"
|
|
|
def setUpModule():
    """Create the NCL reference file that the generated tests compare against.

    Points ``$NCARG_NCARG`` at ``$NCARG_ROOT/lib/ncarg`` and, unless
    ``OUT_NC_FILE`` already exists, runs the ``ncl_get_var.ncl`` script
    located next to this file with ``NCL_EXE``.  The script receives
    ``TEST_FILE + ".nc"`` as ``in_file`` and ``OUT_NC_FILE`` as ``out_file``.

    Raises:
        RuntimeError: If ``$NCARG_ROOT`` is not set, or if the NCL process
            exits with a non-zero status.
    """
    ncarg_root = os.environ.get("NCARG_ROOT")
    if ncarg_root is None:
        raise RuntimeError("$NCARG_ROOT environment variable not set")

    this_path = os.path.realpath(__file__)
    ncl_script = os.path.join(os.path.dirname(this_path), "ncl_get_var.ncl")
    ncfile = TEST_FILE + ".nc"  # NCL requires extension

    # This needs to be set when PyNIO is installed, since PyNIO's data does
    # not contain the dat file for the CAPE calculations.
    os.environ["NCARG_NCARG"] = os.path.join(ncarg_root, "lib", "ncarg")

    # A reference file left by an earlier run is reused as-is.
    if os.path.exists(OUT_NC_FILE):
        return

    # Pass an argument list rather than a shell string so that paths with
    # spaces or quote characters reach NCL intact.  Each assignment argument
    # is exactly what the shell produced from 'in_file="..."' before.
    cmd = [
        NCL_EXE,
        ncl_script,
        'in_file="%s"' % ncfile,
        'out_file="%s"' % OUT_NC_FILE,
    ]
    status = subprocess.call(cmd)
    if status != 0:
        raise RuntimeError("NCL script failed. Could not set up test.")
|
|
|
# Using helpful information at: |
|
# http://eli.thegreenplace.net/2014/04/02/dynamically-generating-python-test-cases |
|
def make_test(varname, wrf_in, referent, multi=False, repeat=3, pynio=False):
    """Build a test method that checks ``getvar`` against the NCL reference.

    Args:
        varname: Name of the variable to read from the reference file.  It is
            also the product requested from ``getvar`` unless
            ``_VAR_CHECKS`` maps it to a different product or units.
        wrf_in: Path of the WRF output file handed to ``getvar``.
        referent: Path of the netCDF file holding the reference values.
        multi: When true, ``getvar`` receives a list holding the same open
            file ``repeat`` times and the reference array is repeated along
            a new leading axis to match.
        repeat: Number of repetitions used when ``multi`` is true.
        pynio: Open ``wrf_in`` with PyNIO instead of netCDF4.

    Returns:
        A function taking ``self`` that can be attached to a
        ``unittest.TestCase`` subclass.  When run, it raises
        ``AssertionError`` if the values differ by more than the tolerances
        for ``varname``, and ``ImportError`` if a required reader package is
        not installed.
    """
    def test(self):
        nc = _open_wrf_input(wrf_in, pynio)
        try:
            # The multi-file form reuses one open handle ``repeat`` times.
            in_wrfnc = [nc] * repeat if multi else nc
            ref_vals = _load_reference(referent, varname, multi, repeat)

            product, getvar_kwargs, rtol, atol = _VAR_CHECKS.get(
                varname, (varname, {}, 0.01, 0))
            my_vals = getvar(in_wrfnc, product, **getvar_kwargs)
            nt.assert_allclose(my_vals, ref_vals, rtol, atol)
        finally:
            # Release the handle even when the comparison fails.
            nc.close()

    return test


# Per-variable overrides: (getvar product, getvar keyword arguments,
# relative tolerance, absolute tolerance).  Variables not listed here are
# requested under their own name with rtol=0.01 and atol=0.
_VAR_CHECKS = {
    "tc": ("temp", {"units": "c"}, 0, .1),
    "tk": ("temp", {"units": "k"}, 0, .1),
    "td": ("td", {"units": "c"}, 0, .1),
    "pressure": ("pressure", {"units": "hpa"}, 0.01, 0),
    "p": ("p", {"units": "pa"}, 0.01, 0),
    "slp": ("slp", {"units": "hpa"}, 0.02, 0),
    "uvmet": ("uvmet", {}, 0.01, .5),
    "uvmet10": ("uvmet10", {}, 0.01, .5),
    "omg": ("omg", {}, 0.02, 0),
}


def _open_wrf_input(wrf_in, pynio):
    """Open the WRF input with netCDF4, or with PyNIO when *pynio* is true."""
    if not pynio:
        from netCDF4 import Dataset as NetCDF
        return NetCDF(wrf_in)

    from PyNIO import Nio
    # The PyNIO path is always opened under a name that ends in ".nc".
    wrf_file = wrf_in if wrf_in.endswith(".nc") else wrf_in + ".nc"
    return Nio.open_file(wrf_file)


def _load_reference(referent, varname, multi, repeat):
    """Read *varname* from the reference file, repeated when *multi* is true."""
    from netCDF4 import Dataset as NetCDF

    refnc = NetCDF(referent)
    try:
        data = refnc.variables[varname][:]
    finally:
        refnc.close()

    if not multi:
        return data

    # ``repeat`` along a new leading axis keeps the dtype and, for masked
    # arrays, carries the mask along -- including the case where the mask is
    # the scalar ``nomask``, which cannot be indexed element-wise.
    return data[n.newaxis, ...].repeat(repeat, axis=0)
|
|
|
class WRFVarsTest(ut.TestCase):
    """Holder for the per-variable test methods.

    The class body defines no tests; the ``__main__`` section of this file
    attaches the generated ones with ``setattr`` before running unittest.
    """

    # Keep unittest's standard failure text when a custom message is given.
    longMessage = True
|
|
|
|
|
if __name__ == "__main__":
    # Variables named here are skipped ("not testable yet"); currently none.
    ignore_vars = []
    # A few names occur twice; the later registration simply replaces the
    # earlier one under the same attribute name.
    wrf_vars = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz",
                "geopt", "helicity", "lat", "lon", "omg", "p", "pressure",
                "pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc",
                "theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va",
                "wa", "uvmet10", "uvmet", "z", "ctt", "cape_2d", "cape_3d"]

    # One row per reader backend: the module that must be importable, the
    # ``pynio`` flag given to make_test, and the name templates for the
    # single-file and repeated-file test methods.
    backends = (
        ("netCDF4", False, 'test_{0}', 'test_multi_{0}'),
        ("PyNIO", True, 'test_pynio_{0}', 'test_pynio_multi_{0}'),
    )

    for required_module, use_pynio, single_name, multi_name in backends:
        try:
            __import__(required_module)
        except ImportError:
            # Backend not installed: no tests are generated for it.
            continue

        for var in wrf_vars:
            if var in ignore_vars:
                continue

            single_test = make_test(var, TEST_FILE, OUT_NC_FILE,
                                    pynio=use_pynio)
            multi_test = make_test(var, TEST_FILE, OUT_NC_FILE, multi=True,
                                   pynio=use_pynio)
            setattr(WRFVarsTest, single_name.format(var), single_test)
            setattr(WRFVarsTest, multi_name.format(var), multi_test)

    ut.main()
|
|