Bill Ladwig 6 years ago
parent
commit
3c03cd2916
  1. 32
      test/cachetest.py
  2. 528
      test/comp_utest.py

32
test/cachetest.py

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
from __future__ import (absolute_import, division, print_function,
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from threading import Thread
@ -9,7 +9,7 @@ except ImportError: @@ -9,7 +9,7 @@ except ImportError:
from collections import OrderedDict
import unittest as ut
import numpy.testing as nt
import numpy.testing as nt
from wrf.cache import cache_item, get_cached_item, _get_cache
from wrf.config import get_cache_size
@ -20,49 +20,49 @@ class TestThread(Thread): @@ -20,49 +20,49 @@ class TestThread(Thread):
self.num = num
self.q = q
super(TestThread, self).__init__()
def run(self):
for i in range(get_cache_size() + 10):
key = "A" + str(i)
cache_item(key, "test", i * self.num)
item = get_cached_item(key, "test")
if item != i * self.num:
raise RuntimeError("cache is bogus")
cache = OrderedDict(_get_cache())
self.q.put(cache)
class CacheTest(ut.TestCase):
longMessage = True
def test_thread_local(self):
q1 = Queue()
q2 = Queue()
thread1 = TestThread(2, q1)
thread2 = TestThread(40, q2)
thread1.start()
thread2.start()
result1 = q1.get(True, 1)
result2 = q2.get(True, 1)
thread1.join()
thread2.join()
print(result1)
print(result2)
# Result 1 and 2 shoudl be different
self.assertNotEqual(result1, result2)
# This thread should have no cache
self.assertIsNone(_get_cache())
if __name__ == "__main__":
ut.main()

528
test/comp_utest.py

@ -1,10 +1,11 @@ @@ -1,10 +1,11 @@
from math import fabs, log, tan, sin, cos
import unittest as ut
import numpy.testing as nt
import numpy.testing as nt
import numpy as np
import numpy.ma as ma
import os, sys
import os
import sys
import subprocess
from netCDF4 import Dataset as nc
@ -21,37 +22,37 @@ NCGROUP = [NCFILE, NCFILE, NCFILE] @@ -21,37 +22,37 @@ NCGROUP = [NCFILE, NCFILE, NCFILE]
if sys.version_info > (3,):
xrange = range
ROUTINE_MAP = {"avo": avo,
"eth": eth,
"cape_2d": cape_2d,
"cape_3d": cape_3d,
"ctt": ctt,
"dbz": dbz,
"helicity": srhel,
"omg": omega,
"pvo": pvo,
"pw": pw,
"rh": rh,
"slp": slp,
"td": td,
"tk": tk,
"tv": tvirtual,
"twb": wetbulb,
"updraft_helicity": udhel,
"uvmet": uvmet,
"cloudfrac": cloudfrac}
ROUTINE_MAP = {"avo" : avo,
"eth" : eth,
"cape_2d" : cape_2d,
"cape_3d" : cape_3d,
"ctt" : ctt,
"dbz" : dbz,
"helicity" : srhel,
"omg" : omega,
"pvo" : pvo,
"pw" : pw,
"rh" : rh,
"slp" : slp,
"td" : td,
"tk" : tk,
"tv" : tvirtual,
"twb" : wetbulb,
"updraft_helicity" : udhel,
"uvmet" : uvmet,
"cloudfrac" : cloudfrac}
class ProjectionError(RuntimeError):
pass
def get_args(varname, wrfnc, timeidx, method, squeeze):
if varname == "avo":
ncvars = extract_vars(wrfnc, timeidx, ("U", "V", "MAPFAC_U",
"MAPFAC_V", "MAPFAC_M",
"F"),
method, squeeze, cache=None, meta=True)
varnames = ("U", "V", "MAPFAC_U", "MAPFAC_V", "MAPFAC_M", "F")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
attrs = extract_global_attrs(wrfnc, attrs=("DX", "DY"))
u = ncvars["U"]
v = ncvars["V"]
@ -59,20 +60,20 @@ def get_args(varname, wrfnc, timeidx, method, squeeze): @@ -59,20 +60,20 @@ def get_args(varname, wrfnc, timeidx, method, squeeze):
msfv = ncvars["MAPFAC_V"]
msfm = ncvars["MAPFAC_M"]
cor = ncvars["F"]
dx = attrs["DX"]
dy = attrs["DY"]
return (u, v, msfu, msfv, msfm, cor, dx, dy)
if varname == "pvo":
ncvars = extract_vars(wrfnc, timeidx, ("U", "V", "T", "P",
"PB", "MAPFAC_U",
"MAPFAC_V", "MAPFAC_M",
"F"),
method, squeeze, cache=None, meta=True)
varnames = ("U", "V", "T", "P", "PB", "MAPFAC_U", "MAPFAC_V",
"MAPFAC_M", "F")
ncvars = extract_vars(wrfnc, timeidx,
varnames,
method, squeeze, cache=None, meta=True)
attrs = extract_global_attrs(wrfnc, attrs=("DX", "DY"))
u = ncvars["U"]
v = ncvars["V"]
t = ncvars["T"]
@ -82,35 +83,35 @@ def get_args(varname, wrfnc, timeidx, method, squeeze): @@ -82,35 +83,35 @@ def get_args(varname, wrfnc, timeidx, method, squeeze):
msfv = ncvars["MAPFAC_V"]
msfm = ncvars["MAPFAC_M"]
cor = ncvars["F"]
dx = attrs["DX"]
dy = attrs["DY"]
full_t = t + 300
full_p = p + pb
return (u, v, full_t, full_p, msfu, msfv, msfm, cor, dx, dy)
if varname == "eth":
varnames=("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
qv = ncvars["QVAPOR"]
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t, meta=False)
return (qv, tkel, full_p)
if varname == "cape_2d":
varnames = ("T", "P", "PB", "QVAPOR", "PH","PHB", "HGT", "PSFC")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB", "QVAPOR", "PH", "PHB", "HGT", "PSFC")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
@ -119,27 +120,27 @@ def get_args(varname, wrfnc, timeidx, method, squeeze): @@ -119,27 +120,27 @@ def get_args(varname, wrfnc, timeidx, method, squeeze):
phb = ncvars["PHB"]
ter = ncvars["HGT"]
psfc = ncvars["PSFC"]
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t, meta=False)
geopt = ph + phb
geopt_unstag = destagger(geopt, -3)
z = geopt_unstag/Constants.G
# Convert pressure to hPa
p_hpa = ConversionFactors.PA_TO_HPA * full_p
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
i3dflag = 0
ter_follow = 1
return (p_hpa, tkel, qv, z, ter, psfc_hpa, ter_follow)
if varname == "cape_3d":
varnames = ("T", "P", "PB", "QVAPOR", "PH", "PHB", "HGT", "PSFC")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
@ -149,27 +150,27 @@ def get_args(varname, wrfnc, timeidx, method, squeeze): @@ -149,27 +150,27 @@ def get_args(varname, wrfnc, timeidx, method, squeeze):
phb = ncvars["PHB"]
ter = ncvars["HGT"]
psfc = ncvars["PSFC"]
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t, meta=False)
geopt = ph + phb
geopt_unstag = destagger(geopt, -3)
z = geopt_unstag/Constants.G
# Convert pressure to hPa
p_hpa = ConversionFactors.PA_TO_HPA * full_p
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
i3dflag = 1
ter_follow = 1
return (p_hpa, tkel, qv, z, ter, psfc_hpa, ter_follow)
if varname == "ctt":
varnames = ("T", "P", "PB", "PH", "PHB", "HGT", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
@ -177,382 +178,389 @@ def get_args(varname, wrfnc, timeidx, method, squeeze): @@ -177,382 +178,389 @@ def get_args(varname, wrfnc, timeidx, method, squeeze):
ph = ncvars["PH"]
phb = ncvars["PHB"]
ter = ncvars["HGT"]
qv = ncvars["QVAPOR"] * 1000.0 # g/kg
qv = ncvars["QVAPOR"] * 1000.0 # g/kg
haveqci = 1
try:
icevars = extract_vars(wrfnc, timeidx, "QICE",
icevars = extract_vars(wrfnc, timeidx, "QICE",
method, squeeze, cache=None, meta=False)
except KeyError:
qice = np.zeros(qv.shape, qv.dtype)
haveqci = 0
else:
qice = icevars["QICE"] * 1000.0 #g/kg
qice = icevars["QICE"] * 1000.0 # g/kg
try:
cldvars = extract_vars(wrfnc, timeidx, "QCLOUD",
cldvars = extract_vars(wrfnc, timeidx, "QCLOUD",
method, squeeze, cache=None, meta=False)
except KeyError:
raise RuntimeError("'QCLOUD' not found in NetCDF file")
else:
qcld = cldvars["QCLOUD"] * 1000.0 #g/kg
qcld = cldvars["QCLOUD"] * 1000.0 # g/kg
full_p = p + pb
p_hpa = full_p * ConversionFactors.PA_TO_HPA
full_t = t + Constants.T_BASE
tkel = tk(full_p, full_t, meta=False)
geopt = ph + phb
geopt_unstag = destagger(geopt, -3)
ght = geopt_unstag / Constants.G
return (p_hpa, tkel, qv, qcld, ght, ter, qice)
if varname == "dbz":
varnames = ("T", "P", "PB", "QVAPOR", "QRAIN")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
qv = ncvars["QVAPOR"]
qr = ncvars["QRAIN"]
try:
snowvars = extract_vars(wrfnc, timeidx, "QSNOW",
snowvars = extract_vars(wrfnc, timeidx, "QSNOW",
method, squeeze, cache=None, meta=False)
except KeyError:
qs = np.zeros(qv.shape, qv.dtype)
else:
qs = snowvars["QSNOW"]
try:
graupvars = extract_vars(wrfnc, timeidx, "QGRAUP",
graupvars = extract_vars(wrfnc, timeidx, "QGRAUP",
method, squeeze, cache=None, meta=False)
except KeyError:
qg = np.zeros(qv.shape, qv.dtype)
else:
qg = graupvars["QGRAUP"]
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t, meta=False)
return (full_p, tkel, qv, qr, qs, qg)
if varname == "helicity":
# Top can either be 3000 or 1000 (for 0-1 srh or 0-3 srh)
ncvars = extract_vars(wrfnc, timeidx, ("HGT", "PH", "PHB"),
method, squeeze, cache=None, meta=True)
ter = ncvars["HGT"]
ph = ncvars["PH"]
phb = ncvars["PHB"]
# As coded in NCL, but not sure this is possible
varname = "U"
u_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
u_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
cache=None, meta=False)
u = destagger(u_vars[varname], -1)
u = destagger(u_vars[varname], -1)
varname = "V"
v_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
v_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
cache=None, meta=False)
v = destagger(v_vars[varname], -2)
geopt = ph + phb
geopt_unstag = destagger(geopt, -3)
z = geopt_unstag / Constants.G
return (u, v, z, ter)
if varname == "updraft_helicity":
ncvars = extract_vars(wrfnc, timeidx, ("W", "PH", "PHB", "MAPFAC_M"),
method, squeeze, cache=None, meta=True)
varnames = ("W", "PH", "PHB", "MAPFAC_M")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
wstag = ncvars["W"]
ph = ncvars["PH"]
phb = ncvars["PHB"]
mapfct = ncvars["MAPFAC_M"]
attrs = extract_global_attrs(wrfnc, attrs=("DX", "DY"))
attrs = extract_global_attrs(wrfnc, attrs=("DX", "DY"))
dx = attrs["DX"]
dy = attrs["DY"]
# As coded in NCL, but not sure this is possible
varname = "U"
u_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
u_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
cache=None, meta=True)
u = destagger(u_vars[varname], -1, meta=True)
u = destagger(u_vars[varname], -1, meta=True)
varname = "V"
v_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
v_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
cache=None, meta=True)
v = destagger(v_vars[varname], -2, meta=True)
v = destagger(v_vars[varname], -2, meta=True)
zstag = ph + phb
return (zstag, mapfct, u, v, wstag, dx, dy)
if varname == "omg":
varnames=("T", "P", "W", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "W", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
w = ncvars["W"]
pb = ncvars["PB"]
qv = ncvars["QVAPOR"]
wa = destagger(w, -3)
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t, meta=False)
return (qv, tkel, wa, full_p)
if varname == "pw":
varnames=("T", "P", "PB", "PH", "PHB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB", "PH", "PHB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
ph = ncvars["PH"]
phb = ncvars["PHB"]
qv = ncvars["QVAPOR"]
# Change this to use real virtual temperature!
full_p = p + pb
ht = (ph + phb)/Constants.G
full_t = t + Constants.T_BASE
full_t = t + Constants.T_BASE
tkel = tk(full_p, full_t, meta=False)
return (full_p, tkel, qv, ht)
if varname == "rh":
varnames=("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
qvapor = to_np(ncvars["QVAPOR"])
full_t = t + Constants.T_BASE
full_p = p + pb
qvapor[qvapor < 0] = 0
tkel = tk(full_p, full_t, meta=False)
return (qvapor, full_p, tkel)
if varname == "slp":
varnames=("T", "P", "PB", "QVAPOR", "PH", "PHB")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB", "QVAPOR", "PH", "PHB")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
qvapor = to_np(ncvars["QVAPOR"])
ph = ncvars["PH"]
phb = ncvars["PHB"]
full_t = t + Constants.T_BASE
full_p = p + pb
qvapor[qvapor < 0] = 0.
full_ph = (ph + phb) / Constants.G
destag_ph = destagger(full_ph, -3)
tkel = tk(full_p, full_t, meta=False)
return (destag_ph, tkel, full_p, qvapor)
if varname == "td":
varnames=("P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
p = ncvars["P"]
pb = ncvars["PB"]
qvapor = to_np(ncvars["QVAPOR"])
# Algorithm requires hPa
full_p = .01*(p + pb)
qvapor[qvapor < 0] = 0
return (full_p, qvapor)
if varname == "tk":
varnames=("T", "P", "PB")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
full_t = t + Constants.T_BASE
full_p = p + pb
return (full_p, full_t)
if varname == "tv":
varnames=("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
qv = ncvars["QVAPOR"]
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t)
return (tkel, qv)
if varname == "twb":
varnames=("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
varnames = ("T", "P", "PB", "QVAPOR")
ncvars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
qv = ncvars["QVAPOR"]
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t)
return (full_p, tkel, qv)
if varname == "uvmet":
varname = "U"
u_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
u_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
cache=None, meta=True)
u = destagger(u_vars[varname], -1, meta=True)
varname = "V"
v_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
v_vars = extract_vars(wrfnc, timeidx, varname, method, squeeze,
cache=None, meta=True)
v = destagger(v_vars[varname], -2, meta=True)
map_proj_attrs = extract_global_attrs(wrfnc, attrs="MAP_PROJ")
map_proj = map_proj_attrs["MAP_PROJ"]
if map_proj in (0,3,6):
if map_proj in (0, 3, 6):
raise ProjectionError("Map projection does not need rotation")
elif map_proj in (1,2):
elif map_proj in (1, 2):
lat_attrs = extract_global_attrs(wrfnc, attrs=("TRUELAT1",
"TRUELAT2"))
radians_per_degree = Constants.PI/180.0
# Rotation needed for Lambert and Polar Stereographic
true_lat1 = lat_attrs["TRUELAT1"]
true_lat2 = lat_attrs["TRUELAT2"]
try:
lon_attrs = extract_global_attrs(wrfnc, attrs="STAND_LON")
except AttributeError:
try:
cen_lon_attrs = extract_global_attrs(wrfnc, attrs="CEN_LON")
cen_lon_attrs = extract_global_attrs(wrfnc,
attrs="CEN_LON")
except AttributeError:
raise RuntimeError("longitude attributes not found in NetCDF")
raise RuntimeError("longitude attributes not found in "
"NetCDF")
else:
cen_lon = cen_lon_attrs["CEN_LON"]
else:
cen_lon = lon_attrs["STAND_LON"]
varname = "XLAT"
xlat_var = extract_vars(wrfnc, timeidx, varname,
xlat_var = extract_vars(wrfnc, timeidx, varname,
method, squeeze, cache=None, meta=True)
lat = xlat_var[varname]
varname = "XLONG"
xlon_var = extract_vars(wrfnc, timeidx, varname,
xlon_var = extract_vars(wrfnc, timeidx, varname,
method, squeeze, cache=None, meta=True)
lon = xlon_var[varname]
if map_proj == 1:
if((fabs(true_lat1 - true_lat2) > 0.1) and
(fabs(true_lat2 - 90.) > 0.1)):
cone = (log(cos(true_lat1*radians_per_degree))
- log(cos(true_lat2*radians_per_degree)))
cone = (cone /
(log(tan((45.-fabs(true_lat1/2.))*radians_per_degree))
- log(tan((45.-fabs(true_lat2/2.))*radians_per_degree))))
(fabs(true_lat2 - 90.) > 0.1)):
cone = (log(cos(true_lat1 * radians_per_degree))
- log(cos(true_lat2 * radians_per_degree)))
cone = (cone /
(log(tan((45.-fabs(true_lat1/2.)) *
radians_per_degree))
- log(tan((45.-fabs(true_lat2/2.)) *
radians_per_degree))))
else:
cone = sin(fabs(true_lat1)*radians_per_degree)
cone = sin(fabs(true_lat1) * radians_per_degree)
else:
cone = 1
return (u, v, lat, lon, cen_lon, cone)
if varname == "cloudfrac":
from wrf.g_geoht import get_height
vars = extract_vars(wrfnc, timeidx, ("P", "PB", "QVAPOR", "T"),
method, squeeze, cache=None, meta=True)
varnames = ("P", "PB", "QVAPOR", "T")
vars = extract_vars(wrfnc, timeidx, varnames, method, squeeze,
cache=None, meta=True)
p = vars["P"]
pb = vars["PB"]
qv = vars["QVAPOR"]
t = vars["T"]
geoht_agl = get_height(wrfnc, timeidx, method, squeeze,
geoht_agl = get_height(wrfnc, timeidx, method, squeeze,
cache=None, meta=True, msl=False)
full_p = p + pb
full_t = t + Constants.T_BASE
tkel = tk(full_p, full_t)
relh = rh(qv, full_p, tkel)
return (geoht_agl, relh, 1, 300., 2000., 6000.)
class WRFVarsTest(ut.TestCase):
longMessage = True
def make_func(varname, wrfnc, timeidx, method, squeeze, meta):
def func(self):
try:
args = get_args(varname, wrfnc, timeidx, method, squeeze)
except ProjectionError: # Don't fail for this
except ProjectionError: # Don't fail for this
return
routine = ROUTINE_MAP[varname]
kwargs = {"meta" : meta}
kwargs = {"meta": meta}
result = routine(*args, **kwargs)
ref = getvar(wrfnc, varname, timeidx, method, squeeze, cache=None,
ref = getvar(wrfnc, varname, timeidx, method, squeeze, cache=None,
meta=meta)
nt.assert_allclose(to_np(result), to_np(ref))
if meta:
self.assertEqual(result.dims, ref.dims)
return func
def test_cape3d_1d(wrfnc):
def func(self):
varnames = ("T", "P", "PB", "QVAPOR", "PH", "PHB", "HGT", "PSFC")
ncvars = extract_vars(wrfnc, 0, varnames, method="cat", squeeze=True,
ncvars = extract_vars(wrfnc, 0, varnames, method="cat", squeeze=True,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
@ -561,9 +569,9 @@ def test_cape3d_1d(wrfnc): @@ -561,9 +569,9 @@ def test_cape3d_1d(wrfnc):
phb = ncvars["PHB"]
ter = ncvars["HGT"]
psfc = ncvars["PSFC"]
col_idxs = (slice(None), t.shape[-2]//2, t.shape[-1]//2)
t = t[col_idxs]
p = p[col_idxs]
pb = pb[col_idxs]
@ -572,40 +580,40 @@ def test_cape3d_1d(wrfnc): @@ -572,40 +580,40 @@ def test_cape3d_1d(wrfnc):
phb = phb[col_idxs]
ter = float(ter[col_idxs[1:]])
psfc = float(psfc[col_idxs[1:]])
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t, meta=False)
geopt = ph + phb
geopt_unstag = destagger(geopt, -1)
z = geopt_unstag/Constants.G
# Convert pressure to hPa
p_hpa = ConversionFactors.PA_TO_HPA * full_p
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
i3dflag = 1
ter_follow = 1
result = cape_3d(p_hpa, tkel, qv, z, ter, psfc_hpa, ter_follow)
ref = getvar(wrfnc, "cape_3d")
ref = ref[(slice(None),) + col_idxs]
nt.assert_allclose(to_np(result), to_np(ref))
return func
def test_cape2d_1d(wrfnc):
def func(self):
varnames = ("T", "P", "PB", "QVAPOR", "PH", "PHB", "HGT", "PSFC")
ncvars = extract_vars(wrfnc, 0, varnames, method="cat", squeeze=True,
ncvars = extract_vars(wrfnc, 0, varnames, method="cat", squeeze=True,
cache=None, meta=True)
t = ncvars["T"]
p = ncvars["P"]
pb = ncvars["PB"]
@ -614,9 +622,9 @@ def test_cape2d_1d(wrfnc): @@ -614,9 +622,9 @@ def test_cape2d_1d(wrfnc):
phb = ncvars["PHB"]
ter = ncvars["HGT"]
psfc = ncvars["PSFC"]
col_idxs = (slice(None), t.shape[-2]//2, t.shape[-1]//2)
t = t[col_idxs]
p = p[col_idxs]
pb = pb[col_idxs]
@ -625,82 +633,76 @@ def test_cape2d_1d(wrfnc): @@ -625,82 +633,76 @@ def test_cape2d_1d(wrfnc):
phb = phb[col_idxs]
ter = float(ter[col_idxs[1:]])
psfc = float(psfc[col_idxs[1:]])
full_t = t + Constants.T_BASE
full_p = p + pb
tkel = tk(full_p, full_t, meta=False)
geopt = ph + phb
geopt_unstag = destagger(geopt, -1)
z = geopt_unstag/Constants.G
# Convert pressure to hPa
p_hpa = ConversionFactors.PA_TO_HPA * full_p
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
psfc_hpa = ConversionFactors.PA_TO_HPA * psfc
i3dflag = 0
ter_follow = 1
result = cape_2d(p_hpa, tkel, qv, z, ter, psfc_hpa, ter_follow)
ref = getvar(wrfnc, "cape_2d")
ref = ref[(slice(None),) + col_idxs[1:]]
nt.assert_allclose(to_np(result), to_np(ref))
return func
if __name__ == "__main__":
from wrf import (omp_set_num_threads, omp_set_schedule, omp_get_schedule,
from wrf import (omp_set_num_threads, omp_set_schedule, omp_get_schedule,
omp_set_dynamic, omp_get_num_procs, OMP_SCHED_STATIC)
omp_set_num_threads(omp_get_num_procs()//2)
omp_set_schedule(OMP_SCHED_STATIC, 0)
omp_set_dynamic(False)
varnames = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz",
"geopt", "helicity", "lat", "lon", "omg", "p", "pressure",
"pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc",
"theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va",
"wa", "uvmet10", "uvmet", "z", "cloudfrac"]
#varnames = ["helicity"]
varnames=["avo", "pvo", "eth", "dbz", "helicity", "updraft_helicity",
"omg", "pw", "rh", "slp", "td", "tk", "tv", "twb", "uvmet",
"cloudfrac", "ctt"]
varnames = ["avo", "eth", "cape_2d", "cape_3d", "ctt", "dbz", "mdbz",
"geopt", "helicity", "lat", "lon", "omg", "p", "pressure",
"pvo", "pw", "rh2", "rh", "slp", "ter", "td2", "td", "tc",
"theta", "tk", "tv", "twb", "updraft_helicity", "ua", "va",
"wa", "uvmet10", "uvmet", "z", "cloudfrac"]
omp_set_num_threads(omp_get_num_procs()-1)
omp_set_schedule(OMP_SCHED_STATIC, 0)
omp_set_dynamic(False)
# Turn this one off when not needed, since it's slow
varnames += ["cape_2d", "cape_3d"]
for varname in varnames:
for i,wrfnc in enumerate((NCFILE, NCGROUP)):
for j,timeidx in enumerate((0, ALL_TIMES)):
for i, wrfnc in enumerate((NCFILE, NCGROUP)):
for j, timeidx in enumerate((0, ALL_TIMES)):
for method in ("cat", "join"):
for squeeze in (True, False):
for meta in (True, False):
func = make_func(varname, wrfnc, timeidx, method,
squeeze, meta)
func = make_func(varname, wrfnc, timeidx, method,
squeeze, meta)
ncname = "single" if i == 0 else "multi"
timename = "t0" if j == 0 else "all"
squeeze_name = "squeeze" if squeeze else "nosqueeze"
squeeze_name = ("squeeze" if squeeze
else "nosqueeze")
meta_name = "meta" if meta else "nometa"
test_name = "test_{}_{}_{}_{}_{}_{}".format(varname,
ncname, timename, method,
squeeze_name, meta_name)
test_name = "test_{}_{}_{}_{}_{}_{}".format(
varname, ncname, timename, method,
squeeze_name, meta_name)
setattr(WRFVarsTest, test_name, func)
func = test_cape3d_1d(wrfnc)
setattr(WRFVarsTest, "test_cape3d_1d", func)
func = test_cape2d_1d(wrfnc)
setattr(WRFVarsTest, "test_cape2d_1d", func)
ut.main()
Loading…
Cancel
Save