import copy
from math import exp, pi, sqrt
from typing import Optional
import cf_xarray as cf
import dask
import matplotlib as mpl
import numpy as np
import xarray as xr
from cartopy import crs as ccrs
from cartopy.util import add_cyclic_point
from matplotlib import pyplot as plt
from scipy import stats as ss
from scipy.ndimage import gaussian_filter
from skimage.util import crop
from xrft import dft
xr.set_options(keep_attrs=True)
class Datasetcalcs:
"""
    This class contains calcs for each point of a dataset after aggregating across one or more dimensions, and a method to access these calcs. Expects an xarray DataArray.
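
    Example (a sketch; ``da`` is a hypothetical DataArray with a ``time``
    dimension and a ``cell_area`` coordinate for the area weighting):

        dm = Datasetcalcs(da, aggregate_dims=['time'])
        time_mean = dm.get_calc('mean')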
"""
def __init__(
self,
ds: xr.DataArray,
aggregate_dims: list,
time_dim_name: str = 'time',
        lat_dim_name: Optional[str] = None,
        lon_dim_name: Optional[str] = None,
        vert_dim_name: Optional[str] = None,
        lat_coord_name: Optional[str] = None,
        lon_coord_name: Optional[str] = None,
q: float = 0.5,
spre_tol: float = 1.0e-4,
weighted=True,
):
self._ds = ds if (ds.dtype == np.float64) else ds.astype(np.float64)
        # astype() does not preserve the attrs, so restore them from the original
self._ds.attrs = ds.attrs
if weighted:
if 'cell_measures' not in self._ds.attrs:
self._ds.attrs['cell_measures'] = 'area: cell_area'
# Let's just get all the lat/lon and time
# names from the file if they are None
# lon coord
if lon_coord_name is None:
lon_coord_name = ds.cf.coordinates['longitude'][0]
self._lon_coord_name = lon_coord_name
# lat coord
if lat_coord_name is None:
lat_coord_name = ds.cf.coordinates['latitude'][0]
self._lat_coord_name = lat_coord_name
dd = ds.cf[ds.cf.coordinates['latitude'][0]].dims
ll = len(dd)
if ll == 1:
if lat_dim_name is None:
lat_dim_name = dd[0]
if lon_dim_name is None:
lon_dim_name = ds.cf['longitude'].dims[0]
elif ll == 2:
if lat_dim_name is None:
lat_dim_name = dd[0]
if lon_dim_name is None:
lon_dim_name = dd[1]
self._latlon_dims = ll
self._lat_dim_name = lat_dim_name
self._lon_dim_name = lon_dim_name
# vertical dimension?
if vert_dim_name is None:
vert = 'vertical' in ds.cf
if vert:
vert_dim_name = ds.cf['vertical'].name
self._vert_dim_name = vert_dim_name
# time dimension TO DO: check this (after cf_xarray update)
self._time_dim_name = time_dim_name
self._quantile = q
self._spre_tol = spre_tol
self._agg_dims = aggregate_dims
self._frame_size = 1
# array calcs
self._weighted = weighted
self._ns_con_var = None
self._ew_con_var = None
self._mean = None
self._mean_abs = None
self._std = None
self._ddof = 1
self._num_positive = None
self._num_negative = None
self._num_zero = None
self._prob_positive = None
self._odds_positive = None
self._prob_negative = None
self._zscore = None
self._mae_day_max = None
self._lag1 = None
self._lag1_first_difference = None
self._quantile_value = None
self._mean_squared = None
self._root_mean_squared = None
self._sum = None
self._sum_squared = None
self._variance = None
self._pooled_variance = None
self._pooled_variance_ratio = None
self._standardized_mean = None
self._max_abs = None
self._min_abs = None
        self._dyn_range = None
self._min_val = None
self._max_val = None
self._grouping = None
self._annual_harmonic_relative_ratio = None
# single value calcs
self._zscore_cutoff = None
self._zscore_percent_significant = None
        # for the probability calcs, determine the number of data points per frame
if aggregate_dims is not None:
for dim in aggregate_dims:
self._frame_size *= int(self._ds.sizes[dim])
else:
dp = np.count_nonzero(~np.isnan(self._ds))
self._frame_size = dp
def _is_memoized(self, calc_name: str) -> bool:
return hasattr(self, calc_name) and (self.__getattribute__(calc_name) is not None)
    def _con_var(self, direction, dataset) -> xr.DataArray:
        # contrast variance: the squared first difference of the field, taken
        # along latitude ('ns') or along longitude ('ew', with a periodic wrap
        # in longitude)
        if direction == 'ns':
            tt = dataset.diff(self._lat_dim_name, 1)
        elif direction == 'ew':
ds_h = xr.concat(
[
dataset,
dataset.head({self._lon_dim_name: 1}),
],
dim=self._lon_dim_name,
)
tt = ds_h.diff(self._lon_dim_name, 1)
con_var = np.square(tt)
return con_var
@property
def pooled_variance(self) -> xr.DataArray:
"""
The overall variance of the dataset
"""
if not self._is_memoized('_pooled_variance_mean'):
self._pooled_variance = self._ds.var(self._agg_dims)
            if 'cell_measures' in self._ds.attrs:
                self._pooled_variance.attrs['cell_measures'] = self._ds.attrs['cell_measures']
if self._weighted:
self._pooled_variance_mean = self._pooled_variance.cf.weighted('area').mean()
else:
self._pooled_variance_mean = self._pooled_variance.mean()
self._pooled_variance_mean.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._pooled_variance_mean.attrs['units'] = f'{self._ds.units}$^2$'
return self._pooled_variance_mean
@property
def ns_con_var(self) -> xr.DataArray:
"""
The North-South Contrast Variance averaged along the aggregate dimensions
"""
if not self._is_memoized('_ns_con_var_mean'):
self._ns_con_var = self._con_var('ns', self._ds)
if self._weighted:
self._ns_con_var.attrs['cell_measures'] = self._ds.attrs['cell_measures']
adims = self._agg_dims
if adims is None:
self._ns_con_var_mean = self._ns_con_var.cf.weighted('area').mean(skipna=True)
else:
self._ns_con_var_mean = self._ns_con_var.cf.weighted('area').mean(
dim=adims, skipna=True
)
else:
self._ns_con_var_mean = self._ns_con_var.mean(self._agg_dims)
self._ns_con_var_mean.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._ns_con_var_mean.attrs['units'] = f'{self._ds.units}$^2$'
return self._ns_con_var_mean
@property
def ew_con_var(self) -> xr.DataArray:
"""
The East-West Contrast Variance averaged along the aggregate dimensions
"""
        if not self._is_memoized('_ew_con_var_mean'):
self._ew_con_var = self._con_var('ew', self._ds)
if self._weighted:
self._ew_con_var.attrs['cell_measures'] = self._ds.attrs['cell_measures']
adims = self._agg_dims
if adims is None:
self._ew_con_var_mean = self._ew_con_var.cf.weighted('area').mean(skipna=True)
else:
self._ew_con_var_mean = self._ew_con_var.cf.weighted('area').mean(
dim=adims, skipna=True
)
else:
self._ew_con_var_mean = self._ew_con_var.mean(self._agg_dims)
self._ew_con_var_mean.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._ew_con_var_mean.attrs['units'] = f'{self._ds.units}$^2$'
return self._ew_con_var_mean
@property
def mean(self) -> xr.DataArray:
"""
The mean along the aggregate dimensions
"""
if not self._is_memoized('_mean'):
if self._weighted:
adims = self._agg_dims
if adims is None:
self._mean = self._ds.cf.weighted('area').mean(skipna=True)
else:
self._mean = self._ds.cf.weighted('area').mean(dim=adims, skipna=True)
else:
self._mean = self._ds.mean(self._agg_dims, skipna=True)
self._mean.attrs = self._ds.attrs
return self._mean
@property
def mean_abs(self) -> xr.DataArray:
"""
        The mean of the absolute values along the aggregate dimensions
"""
if not self._is_memoized('_mean_abs'):
if self._weighted:
adims = self._agg_dims
if adims is None:
self._mean_abs = abs(self._ds).cf.weighted('area').mean(skipna=True)
else:
self._mean_abs = abs(self._ds).cf.weighted('area').mean(dim=adims, skipna=True)
else:
self._mean_abs = abs(self._ds).mean(self._agg_dims, skipna=True)
self._mean_abs.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._mean_abs.attrs['units'] = f'{self._ds.units}'
return self._mean_abs
@property
def mean_squared(self) -> xr.DataArray:
"""
        The square of the mean along the aggregate dimensions
"""
if not self._is_memoized('_mean_squared'):
self._mean_squared = np.square(self.mean)
            self._mean_squared.attrs = self._ds.attrs
            if hasattr(self._ds, 'units'):
                self._mean_squared.attrs['units'] = f'{self._ds.units}$^2$'
return self._mean_squared
@property
def root_mean_squared(self) -> xr.DataArray:
"""
The absolute value of the mean along the aggregate dimensions
"""
if not self._is_memoized('_root_mean_squared_mean'):
self._squared = np.square(self._ds)
if self._weighted:
adims = self._agg_dims
if adims is None:
self._root_mean_squared = np.sqrt(
self._squared.cf.weighted('area').mean(skipna=True)
)
else:
self._root_mean_squared = np.sqrt(
self._squared.cf.weighted('area').mean(dim=adims, skipna=True)
)
else:
self._root_mean_squared = np.sqrt(self._squared.mean(self._agg_dims, skipna=True))
self._root_mean_squared.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._root_mean_squared.attrs['units'] = f'{self._ds.units}'
return self._root_mean_squared
@property
def sum(self) -> xr.DataArray:
        if not self._is_memoized('_sum'):
            # weighting does not change a straight sum
            self._sum = self._ds.sum(dim=self._agg_dims, skipna=True)
self._sum.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._sum.attrs['units'] = f'{self._ds.units}'
return self._sum
@property
def sum_squared(self) -> xr.DataArray:
if not self._is_memoized('_sum_squared'):
            self._sum_squared = np.square(self.sum)
self._sum_squared.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._sum_squared.attrs['units'] = f'{self._ds.units}$^2$'
return self._sum_squared
@property
def std(self) -> xr.DataArray:
"""
The standard deviation along the aggregate dimensions
"""
if not self._is_memoized('_std'):
if self._weighted:
adims = self._agg_dims
if self._ddof == 0:
# biased std
if adims is None:
self._std = np.sqrt(
((self._ds - self.mean) ** 2).cf.weighted('area').mean()
)
else:
self._std = np.sqrt(
((self._ds - self.mean) ** 2)
.cf.weighted('area')
.mean(dim=self._agg_dims)
)
                elif adims is not None:
                    if self._lat_dim_name in adims:
                        # unbiased (reliability-weighted) std: correct the biased
                        # weighted variance by 1 / (1 - V2 / V1^2), where
                        # V1 = sum(w) and V2 = sum(w^2) over the weights w
                        V1 = self._ds.coords['cell_area'].sum(dim=self._lat_dim_name)[0]
                        V2 = np.square(self._ds.coords['cell_area']).sum(dim=self._lat_dim_name)[0]
                        _biased_var = (
                            ((self._ds - self.mean) ** 2).cf.weighted('area').mean(dim=adims)
                        )
                        self._std = np.sqrt(_biased_var * (1 / (1 - V2 / (V1 ** 2))))
else:
# same as unweighted
self._std = self._ds.std(adims, ddof=self._ddof, skipna=True)
else:
# same as unweighted
self._std = self._ds.std(adims, ddof=self._ddof, skipna=True)
else:
self._std = self._ds.std(self._agg_dims, ddof=self._ddof, skipna=True)
self._std.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._std.attrs['units'] = ''
return self._std
@property
def standardized_mean(self) -> xr.DataArray:
"""
The mean at each point along the aggregate dimensions divided by the standard deviation
NOTE: will always be 0 if aggregating over all dimensions
"""
if not self._is_memoized('_standardized_mean'):
if self._grouping is None:
if self._weighted:
adims = self._agg_dims
if adims is None:
self._standardized_mean = (
self.mean - self._ds.cf.weighted('area').mean(skipna=True)
) / self.std
else:
self._standardized_mean = (
self.mean - self._ds.cf.weighted('area').mean(dim=adims, skipna=True)
) / self.std
else:
self._standardized_mean = (self.mean - self._ds.mean()) / self._ds.std(ddof=1)
else:
grouped = self.mean.groupby(self._grouping)
grouped_mean = grouped.mean()
self._standardized_mean = (
grouped_mean - grouped_mean.mean(skipna=True)
) / grouped_mean.std(ddof=1)
if hasattr(self._ds, 'units'):
self._standardized_mean.attrs['units'] = ''
return self._standardized_mean
@property
def variance(self) -> xr.DataArray:
"""
The variance along the aggregate dimensions
"""
if not self._is_memoized('_variance'):
if self._weighted:
self._variance = np.square(self.std)
else:
self._variance = self._ds.var(self._agg_dims, skipna=True)
self._variance.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._variance.attrs['units'] = f'{self._ds.units}$^2$'
return self._variance
@property
def pooled_variance_ratio(self) -> xr.DataArray:
"""
        The ratio of the variance along the aggregate dimensions to the pooled variance
"""
if not self._is_memoized('_pooled_variance_ratio'):
self._pooled_variance_ratio = self.variance / self.pooled_variance
self._pooled_variance_ratio.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._pooled_variance_ratio.attrs['units'] = ''
return self._pooled_variance_ratio
@property
def num_positive(self) -> xr.DataArray:
"""
        The number of points that are positive
        """
        if not self._is_memoized('_num_positive'):
            # a count is not affected by weighting
            self._num_positive = (self._ds > 0).sum(self._agg_dims)
return self._num_positive
@property
def num_negative(self) -> xr.DataArray:
"""
        The number of points that are negative
        """
        if not self._is_memoized('_num_negative'):
            # a count is not affected by weighting
            self._num_negative = (self._ds < 0).sum(self._agg_dims)
return self._num_negative
@property
def num_zero(self) -> xr.DataArray:
"""
        The number of points that are zero
        """
        if not self._is_memoized('_num_zero'):
            # a count is not affected by weighting
            self._num_zero = (self._ds == 0).sum(self._agg_dims)
return self._num_zero
@property
def prob_positive(self) -> xr.DataArray:
"""
The probability that a point is positive
"""
if not self._is_memoized('_prob_positive'):
self._prob_positive = self.num_positive / self._frame_size
self._prob_positive.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._prob_positive.attrs['units'] = ''
return self._prob_positive
@property
def prob_negative(self) -> xr.DataArray:
"""
The probability that a point is negative
"""
if not self._is_memoized('_prob_negative'):
self._prob_negative = self.num_negative / self._frame_size
self._prob_negative.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._prob_negative.attrs['units'] = ''
return self._prob_negative
@property
def odds_positive(self) -> xr.DataArray:
"""
The odds that a point is positive = prob_positive/(1-prob_positive)
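        (e.g., a probability of 0.75 gives odds of 0.75 / 0.25 = 3)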
"""
if not self._is_memoized('_odds_positive'):
if self._grouping is not None:
grouped = self.prob_positive.groupby(self._grouping)
grouped_mean = grouped.mean()
self._odds_positive = grouped_mean / (1 - grouped_mean)
else:
self._odds_positive = self.prob_positive / (1 - self.prob_positive)
self._odds_positive.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._odds_positive.attrs['units'] = ''
return self._odds_positive
@property
def zscore(self) -> xr.DataArray:
"""
The z-score of a point averaged along the aggregate dimensions under the null hypothesis that the true mean is zero.
NOTE: currently assumes we are aggregating along the time dimension so is only suitable for a spatial plot.
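        Computed as mean / (std / sqrt(n)), where n is the size of the time dimension.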
"""
if not self._is_memoized('_zscore'):
self._zscore = np.divide(
self.mean, self.std / np.sqrt(self._ds.sizes[self._time_dim_name])
)
self._zscore.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._zscore.attrs['units'] = ''
return self._zscore
@property
def mae_day_max(self) -> xr.DataArray:
"""
The day of maximum mean absolute value at the point.
NOTE: only available in spatial and spatial comparison plots
"""
        if not self._is_memoized('_mae_day_max'):
            key = f'{self._time_dim_name}.dayofyear'
            self._test = abs(self._ds).groupby(key).mean()
            self._mae_day_max = self._test.idxmax(dim='dayofyear')
self._mae_day_max.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._mae_day_max.attrs['units'] = f'{self._ds.units}'
return self._mae_day_max
@property
def quantile(self):
return self._quantile
@quantile.setter
def quantile(self, q):
self._quantile = q
@property
def spre_tol(self):
return self._spre_tol
@spre_tol.setter
def spre_tol(self, t):
self._spre_tol = t
@property
def quantile_value(self) -> xr.DataArray:
self._quantile_value = self._ds.quantile(self.quantile, dim=self._agg_dims)
self._quantile_value.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._quantile_value.attrs['units'] = ''
return self._quantile_value
@property
def max_abs(self) -> xr.DataArray:
if not self._is_memoized('_max_abs'):
self._max_abs = abs(self._ds).max(dim=self._agg_dims)
self._max_abs.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._max_abs.attrs['units'] = f'{self._ds.units}'
return self._max_abs
@property
def min_abs(self) -> xr.DataArray:
if not self._is_memoized('_min_abs'):
self._min_abs = abs(self._ds).min(dim=self._agg_dims)
self._min_abs.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._min_abs.attrs['units'] = f'{self._ds.units}'
return self._min_abs
@property
def max_val(self) -> xr.DataArray:
if not self._is_memoized('_max_val'):
self._max_val = self._ds.max(dim=self._agg_dims)
self._max_val.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._max_val.attrs['units'] = f'{self._ds.units}'
return self._max_val
@property
def min_val(self) -> xr.DataArray:
if not self._is_memoized('_min_val'):
self._min_val = self._ds.min(dim=self._agg_dims)
self._min_val.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._min_val.attrs['units'] = f'{self._ds.units}'
return self._min_val
@property
def dyn_range(self) -> xr.DataArray:
        if not self._is_memoized('_dyn_range'):
self._dyn_range = abs((self._ds).max() - (self._ds).min())
self._dyn_range.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._dyn_range.attrs['units'] = f'{self._ds.units}'
return self._dyn_range
@property
def lag1(self) -> xr.DataArray:
"""
        The lag-1 autocorrelation of the deseasonalized data (day-of-year means removed)
NOTE: This calc returns an array of spatial values as the data set regardless of aggregate dimensions,
so can only be plotted in a spatial plot.
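        Computed on the deseasonalized residuals x as r1 = sum(x_t * x_{t+1}) / sum(x_t^2).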
"""
if not self._is_memoized('_lag1'):
key = f'{self._time_dim_name}.dayofyear'
grouped = self._ds.groupby(key)
self._deseas_resid = grouped - grouped.mean(dim=self._time_dim_name)
time_length = self._deseas_resid.sizes[self._time_dim_name]
            current = self._deseas_resid.head({self._time_dim_name: time_length - 1})
            next_step = self._deseas_resid.shift({self._time_dim_name: -1}).head(
                {self._time_dim_name: time_length - 1}
            )
            num = current.dot(next_step, dims=self._time_dim_name)
denom = current.dot(current, dims=self._time_dim_name)
self._lag1 = num / denom
self._lag1.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._lag1.attrs['units'] = ''
return self._lag1
@property
def lag1_first_difference(self) -> xr.DataArray:
"""
The deseasonalized lag-1 autocorrelation value of the first difference of the data by day of year
NOTE: This calc returns an array of spatial values as the data set regardless of aggregate dimensions,
so can only be plotted in a spatial plot.
"""
if not self._is_memoized('_lag1_first_difference'):
key = f'{self._time_dim_name}.dayofyear'
grouped = self._ds.groupby(key)
self._deseas_resid = grouped - grouped.mean(dim=self._time_dim_name)
time_length = self._deseas_resid.sizes[self._time_dim_name]
current = self._deseas_resid.head({self._time_dim_name: time_length - 1})
            next_step = self._deseas_resid.shift({self._time_dim_name: -1}).head(
                {self._time_dim_name: time_length - 1}
            )
            first_difference = next_step - current
first_difference_current = first_difference.head({self._time_dim_name: time_length - 1})
first_difference_next = first_difference.shift({self._time_dim_name: -1}).head(
{self._time_dim_name: time_length - 1}
)
num = (first_difference_current * first_difference_next).sum(
dim=[self._time_dim_name], skipna=True
)
denom = first_difference_current.dot(first_difference_current, dims=self._time_dim_name)
self._lag1_first_difference = num / denom
self._lag1_first_difference.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._lag1_first_difference.attrs['units'] = ''
return self._lag1_first_difference
@property
def annual_harmonic_relative_ratio(self) -> xr.DataArray:
"""
The annual harmonic relative to the average periodogram value
in a neighborhood of 50 frequencies around the annual frequency
NOTE: This assumes the values along the "time" dimension are equally spaced.
NOTE: This calc returns a lat-lon array regardless of aggregate dimensions, so can only be used in a spatial plot.
"""
if not self._is_memoized('_annual_harmonic_relative_ratio'):
# drop time coordinate labels or else it will try to parse them as numbers to check spacing and fail
ds_copy = self._ds
            new_index = list(range(self._ds[self._time_dim_name].size))
new_ds = ds_copy.assign_coords({self._time_dim_name: new_index})
lon_coord_name = self._lon_coord_name
lat_coord_name = self._lat_coord_name
DF = dft(new_ds, dim=[self._time_dim_name], detrend='constant')
# the above does not preserve the lat/lon attributes
DF[lon_coord_name].attrs = new_ds[lon_coord_name].attrs
DF[lat_coord_name].attrs = new_ds[lat_coord_name].attrs
DF.attrs = new_ds.attrs
            S = np.real(DF * np.conj(DF) / self._ds.sizes[self._time_dim_name])
            # index of the annual frequency in the (shifted) spectrum
            annual_index = int(self._ds.sizes[self._time_dim_name] / 2) + int(
                self._ds.sizes[self._time_dim_name] / 365
            )
            S_annual = S.isel(freq_time=annual_index)  # annual power
            # neighborhood of 50 frequencies around the annual frequency
            neighborhood = (annual_index - 25, annual_index + 25)
            S_mean = xr.concat(
                [
                    S.isel(freq_time=slice(max(0, neighborhood[0]), annual_index - 1)),
                    S.isel(freq_time=slice(annual_index + 1, neighborhood[1])),
                ],
                dim='freq_time',
            ).mean(dim='freq_time')
ratio = S_annual / S_mean
self._annual_harmonic_relative_ratio = ratio
if hasattr(self._ds, 'units'):
self._annual_harmonic_relative_ratio.attrs['units'] = ''
return self._annual_harmonic_relative_ratio
@property
def zscore_cutoff(self) -> np.ndarray:
"""
The Z-Score cutoff for a point to be considered significant
"""
if not self._is_memoized('_zscore_cutoff'):
pvals = 2 * (1 - ss.norm.cdf(np.abs(self.zscore)))
if isinstance(pvals, np.float64):
pvals_array = np.array(pvals)
sorted_pvals = pvals_array
else:
pvals_array = pvals
sorted_pvals = np.sort(pvals_array).flatten()
            fdr_zscore = 0.01
            # Benjamini-Hochberg: find the p-values under the FDR line
            p = np.argwhere(
                sorted_pvals <= fdr_zscore * np.arange(1, pvals_array.size + 1) / pvals_array.size
            )
pval_cutoff = np.empty(0)
            if len(p) != 0:
                pval_cutoff = sorted_pvals[p[len(p) - 1]]
            if pval_cutoff.size != 0:
zscore_cutoff = ss.norm.ppf(1 - pval_cutoff)
else:
zscore_cutoff = 'na'
self._zscore_cutoff = zscore_cutoff
return self._zscore_cutoff
@property
def annual_harmonic_relative_ratio_pct_sig(self) -> np.ndarray:
"""
The percentage of points past the significance cutoff (p value <= 0.01) for the
annual harmonic relative to the average periodogram value
in a neighborhood of 50 frequencies around the annual frequency
"""
pvals = 1 - ss.f.cdf(self.annual_harmonic_relative_ratio, 2, 100)
sorted_pvals = np.sort(pvals)
if len(sorted_pvals[sorted_pvals <= 0.01]) == 0:
return 0
sig_cutoff = ss.f.ppf(1 - max(sorted_pvals[sorted_pvals <= 0.01]), 2, 50)
pct_sig = 100 * np.mean(self.annual_harmonic_relative_ratio > sig_cutoff)
return pct_sig
@property
def zscore_percent_significant(self) -> np.ndarray:
"""
The percent of points where the zscore is considered significant
"""
if not self._is_memoized('_zscore_percent_significant'):
pvals = 2 * (1 - ss.norm.cdf(np.abs(self.zscore)))
if isinstance(pvals, np.float64):
pvals_array = np.array(pvals)
sorted_pvals = pvals_array
else:
pvals_array = pvals
sorted_pvals = np.sort(pvals_array).flatten()
fdr_zscore = 0.01
            p = np.argwhere(sorted_pvals <= fdr_zscore * np.arange(1, pvals.size + 1) / pvals.size)
            # guard against the case of no significant p-values
            if len(p) != 0:
                pval_cutoff = sorted_pvals[p[len(p) - 1]]
                sig_locs = np.argwhere(pvals <= pval_cutoff)
                percent_sig = 100 * np.size(sig_locs, 0) / pvals.size
            else:
                percent_sig = 0
self._zscore_percent_significant = percent_sig
return self._zscore_percent_significant
    def get_calc(self, name: str, q: Optional[float] = 0.5, grouping: Optional[str] = None, ddof=1):
"""
Gets a calc aggregated across one or more dimensions of the dataset
Parameters
==========
        name : str
            The name of the calc (must be identical to a property name)
        q : float, optional
            The quantile to use when name is 'quantile' (default 0.5)
        grouping : str, optional
            The grouping to use for 'odds_positive' and 'standardized_mean'
        ddof : int, optional
            The delta degrees of freedom to use for 'std' (default 1)
Returns
=======
out : xarray.DataArray
            A DataArray of the same size and dimensions as the original DataArray,
            minus those dimensions that were aggregated across.
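
        Example (a sketch; ``da`` is a hypothetical DataArray):

            dm = Datasetcalcs(da, ['time'])
            zscores = dm.get_calc('zscore')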
"""
if isinstance(name, str):
if name == 'ns_con_var':
return self.ns_con_var
if name == 'ew_con_var':
return self.ew_con_var
if name == 'mean':
return self.mean
if name == 'std':
self._ddof = ddof
return self.std
if name == 'standardized_mean':
self._grouping = grouping
return self.standardized_mean
if name == 'variance':
return self.variance
if name == 'pooled_var_ratio':
return self.pooled_variance_ratio
if name == 'prob_positive':
return self.prob_positive
if name == 'prob_negative':
return self.prob_negative
if name == 'num_positive':
return self.num_positive
if name == 'num_negative':
return self.num_negative
if name == 'num_zero':
return self.num_zero
if name == 'odds_positive':
self._grouping = grouping
return self.odds_positive
if name == 'zscore':
return self.zscore
if name == 'mae_day_max':
return self.mae_day_max
if name == 'mean_abs':
return self.mean_abs
if name == 'mean_squared':
return self.mean_squared
if name == 'rms':
return self.root_mean_squared
if name == 'sum':
return self.sum
if name == 'sum_squared':
return self.sum_squared
if name == 'ann_harmonic_ratio':
return self.annual_harmonic_relative_ratio
if name == 'quantile':
self.quantile = q
return self.quantile_value
if name == 'lag1':
return self.lag1
if name == 'lag1_first_difference':
return self.lag1_first_difference
if name == 'max_abs':
return self.max_abs
if name == 'min_abs':
return self.min_abs
if name == 'max_val':
return self.max_val
if name == 'min_val':
return self.min_val
if name == 'ds':
return self._ds
raise ValueError(f'there is no calc with the name: {name}.')
else:
raise TypeError('name must be a string.')
    def get_single_calc(self, name: str):
"""
Gets a calc consisting of a single float value
Parameters
==========
name : str
the name of the calc (must be identical to a property name)
Returns
=======
out : float
The calc value
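
        Example (a sketch; ``dm`` is a Datasetcalcs instance):

            cutoff = dm.get_single_calc('zscore_cutoff')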
"""
if isinstance(name, str):
if name == 'zscore_cutoff':
return self.zscore_cutoff
if name == 'zscore_percent_significant':
return self.zscore_percent_significant
if name == 'range':
return self.dyn_range
if name == 'spre_tol':
return self.spre_tol
if name == 'pooled_variance':
return self.pooled_variance
if name == 'annual_harmonic_relative_ratio_pct_sig':
return self.annual_harmonic_relative_ratio_pct_sig
            raise ValueError(f'there is no calc with the name: {name}.')
else:
raise TypeError('name must be a string.')
class Diffcalcs:
"""
This class contains calcs on the overall dataset that require more than one input dataset to compute
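
    Example (a sketch; ``da1`` and ``da2`` are hypothetical DataArrays of the
    same shape, e.g. original and reconstructed versions of one variable):

        dc = Diffcalcs(da1, da2, aggregate_dims=['time'])
        data_ssim = dc.get_diff_calc('ssim_fp')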
"""
def __init__(
self,
ds1: xr.DataArray,
ds2: xr.DataArray,
aggregate_dims: Optional[list] = None,
**calcs_kwargs,
) -> None:
if isinstance(ds1, xr.DataArray) and isinstance(ds2, xr.DataArray):
# Datasets
self._ds1 = ds1
self._ds2 = ds2
else:
raise TypeError(
f'ds must be of type xarray.DataArray. Type(s): {str(type(ds1))} {str(type(ds2))}'
)
self._calcs1 = Datasetcalcs(self._ds1, aggregate_dims, **calcs_kwargs)
self._calcs2 = Datasetcalcs(self._ds2, aggregate_dims, **calcs_kwargs)
self._aggregate_dims = aggregate_dims
self._pcc = None
self._covariance = None
self._ks_p_value = None
self._n_rms = None
self._n_emax = None
self._spatial_rel_error = None
self._ssim_value = None # uses images
self._ssim_value_fp_old = None # like the old zchecker one
self._ssim_value_fp_fast = None # faster use - use this for Data SSIM
self._ssim_value_fp_fast2 = None # orig faster version
self._ssim_value_fp_orig = None # slower version
self._max_spatial_rel_error = None
self._ssim_mat_fp = None
self._ssim_mat = None
self._ssim_mat_fp_orig = None
self._ssim_mat_fp_fast2 = None
def _is_memoized(self, calc_name: str) -> bool:
return hasattr(self, calc_name) and (self.__getattribute__(calc_name) is not None)
# n is the box width (1D)
    # sigma is the standard deviation of the gaussian
def _oned_gauss(self, n, sigma):
r = range(-int(n / 2), int(n / 2) + 1)
return [(1 / (sigma * sqrt(2 * pi))) * exp(-float(x) ** 2 / (2 * sigma ** 2)) for x in r]
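    # (A sketch of the intent: these 1D weights follow the Gaussian pdf and are
    # not re-normalized; ssim_value_fp_orig later forms the 2D window weights
    # as the outer product of this vector with itself.)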
# this adjusts the boundary area after filtering with astropy
# to account for the fact that we want to divide only by the weight in
# the domain at the edges (this is used in fast2)
def _filter_adjust_edges(self, mydata, kernel, k):
k_sum = kernel.array.sum()
X = mydata.shape[0]
Y = mydata.shape[1]
# R and L rectangles and T and B rectangles
for j in range(k):
kk = kernel.array[:, 0 : k + j + 1].sum()
scale = k_sum / kk
# L and R
mydata[k : X - k, j] = mydata[k : X - k, j] * scale
mydata[k : X - k, Y - j - 1] = mydata[k : X - k, Y - j - 1] * scale
# T and B
mydata[j, k : Y - k] = mydata[j, k : Y - k] * scale
mydata[X - j - 1, k : Y - k] = mydata[X - j - 1, k : Y - k] * scale
# corners
for i in range(k):
for j in range(k):
kk = kernel.array[0 : k + i + 1, 0 : k + j + 1].sum()
scale = k_sum / kk
# top left
mydata[i, j] = mydata[i, j] * scale
# top right
mydata[i, Y - j - 1] = mydata[i, Y - j - 1] * scale
# bottom left
mydata[X - i - 1, j] = mydata[X - i - 1, j] * scale
# bottom right
mydata[X - i - 1, Y - j - 1] = mydata[X - i - 1, Y - j - 1] * scale
def plot_ssim_mat(self, return_mat=False, ssim_type='ssim_fp'):
if ssim_type == 'orig':
if self._ssim_value is None:
self.ssim_value
mats = self._ssim_mat
elif ssim_type == 'ssim_fp_orig':
if self._ssim_value_fp_orig is None:
self.ssim_value_fp_orig
mats = self._ssim_mat_fp_orig
elif ssim_type == 'ssim_fp_fast2':
if self._ssim_value_fp_fast2 is None:
self.ssim_value_fp_fast2
mats = self._ssim_mat_fp_fast2
else:
if self._ssim_value_fp_fast is None:
self.ssim_value_fp_fast
mats = self._ssim_mat_fp
num = len(mats)
meana = np.zeros(num)
for i in range(num):
meana[i] = np.nanmean(mats[i])
        # for 3D data, find the level with the minimum mean SSIM
min_meana = meana.min()
min_lev = meana.argmin()
# smallest value at that level
min_lev_val = np.nanmin(mats[min_lev])
ind = np.unravel_index(np.nanargmin(mats[min_lev], axis=None), mats[min_lev].shape)
plt.imshow(mats[min_lev], interpolation='none', vmax=1.0, cmap='bone')
plt.colorbar(orientation='horizontal')
if num == 1:
mytitle = f'ssim val = {min_meana:.4f}, min = {min_lev_val:.4f} at {ind}'
else:
mytitle = (
f'ssim val = {min_meana:.4f}, min = {min_lev_val:.4f} at {ind} on lev={min_lev}'
)
plt.title(mytitle)
# plt.show()
if return_mat:
return mats
@property
def covariance(self) -> xr.DataArray:
"""
The covariance between the two datasets
"""
if not self._is_memoized('_covariance'):
# need to use unweighted means
c1_mean = self._calcs1.get_calc('ds').mean(skipna=True)
c2_mean = self._calcs2.get_calc('ds').mean(skipna=True)
self._covariance = (
(self._calcs2.get_calc('ds') - c2_mean) * (self._calcs1.get_calc('ds') - c1_mean)
).mean()
return self._covariance
@property
def ks_p_value(self):
"""
The Kolmogorov-Smirnov p-value
"""
# Note: ravel() forces a compute for dask, but ks test in scipy can't
# work with uncomputed dask arrays
if not self._is_memoized('_ks_p_value'):
d1_p = (np.ravel(self._ds1)).astype('float64')
d2_p = (np.ravel(self._ds2)).astype('float64')
self._ks_p_value = np.asanyarray(ss.ks_2samp(d2_p, d1_p))
return self._ks_p_value[1]
@property
def pearson_correlation_coefficient(self):
"""
returns the pearson correlation coefficient between the two datasets
"""
if not self._is_memoized('_pearson_correlation_coefficient'):
# we need to do this with unweighted data
c1_std = self._calcs1.get_calc('ds').std(skipna=True)
c2_std = self._calcs2.get_calc('ds').std(skipna=True)
cov = self.covariance
self._pcc = cov / c1_std / c2_std
return self._pcc
@property
def normalized_max_pointwise_error(self):
"""
The absolute value of the maximum pointwise difference, normalized
by the range of values for the first set
"""
        if not self._is_memoized('_n_emax'):
tt = abs((self._calcs1.get_calc('ds') - self._calcs2.get_calc('ds')).max())
self._n_emax = tt / self._calcs1.dyn_range
return self._n_emax
@property
def normalized_root_mean_squared(self):
"""
The absolute value of the mean along the aggregate dimensions, normalized
by the range of values for the first set
"""
if not self._is_memoized('_normalized_root_mean_squared'):
tt = np.sqrt(
np.square(self._calcs1.get_calc('ds') - self._calcs2.get_calc('ds')).mean(
dim=self._aggregate_dims
)
)
self._n_rms = tt / self._calcs1.dyn_range
return self._n_rms
@property
def spatial_rel_error(self):
"""
        At each grid point, we compute the relative error. Then we report the percentage of grid points whose
relative error is above the specified tolerance (1e-4 by default).
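        (The relative error at a point is (a - b) / a; where a == 0, the
        denominator is replaced by 1, which reduces to an absolute error there.)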
"""
if not self._is_memoized('_spatial_rel_error'):
sp_tol = self._calcs1.spre_tol
# unraveling converts the dask array to numpy, but then
# we can assign the 1.0 and avoid zero (couldn't figure another way)
t1 = np.ravel(self._calcs1.get_calc('ds'))
t2 = np.ravel(self._calcs2.get_calc('ds'))
# check for zeros in t1 (if zero then change to 1 - which
# does an absolute error at that point)
z = (np.where(abs(t1) == 0))[0]
if z.size > 0:
t1_denom = np.copy(t1)
t1_denom[z] = 1.0
else:
t1_denom = t1
            # we don't want to use nan
            # (occasionally in cam data - often in ocn)
m_t2 = np.ma.masked_invalid(t2).compressed()
m_t1 = np.ma.masked_invalid(t1).compressed()
if m_t2.shape != m_t1.shape:
                print('Warning: spatial error not calculated when the datasets have differing numbers of NaNs')
self._spatial_rel_error = 0
self._max_spatial_rel_error = 0
else:
if z.size > 0:
m_t1_denom = np.ma.masked_invalid(t1_denom).compressed()
else:
m_t1_denom = m_t1
m_tt = m_t1 - m_t2
m_tt = m_tt / m_t1_denom
                # also find the max spatial relative error if not yet computed
                if self._max_spatial_rel_error is None:
                    max_spre = np.max(abs(m_tt))
self._max_spatial_rel_error = max_spre
# percentage greater than the tolerance
a = len(m_tt[abs(m_tt) > sp_tol])
sz = m_tt.shape[0]
self._spatial_rel_error = (a / sz) * 100
return self._spatial_rel_error
@property
def max_spatial_rel_error(self):
"""
        We compute the relative error at each grid point and return the maximum.
"""
# this is also computed as part of the spatial rel error
if not self._is_memoized('_max_spatial_rel_error'):
t1 = np.ravel(self._calcs1.get_calc('ds'))
t2 = np.ravel(self._calcs2.get_calc('ds'))
# check for zeros in t1 (if zero then change to 1 - which
# does an absolute error at that point)
z = (np.where(abs(t1) == 0))[0]
if z.size > 0:
t1_denom = np.copy(t1)
t1_denom[z] = 1.0
else:
t1_denom = t1
# we don't want to use nan
            # (occasionally in cam data - often in ocn)
m_t2 = np.ma.masked_invalid(t2).compressed()
m_t1 = np.ma.masked_invalid(t1).compressed()
if z.size > 0:
m_t1_denom = np.ma.masked_invalid(t1_denom).compressed()
else:
m_t1_denom = m_t1
m_tt = m_t1 - m_t2
m_tt = m_tt / m_t1_denom
max_spre = np.max(abs(m_tt))
self._max_spatial_rel_error = max_spre
return self._max_spatial_rel_error
@property
def ssim_value(self):
"""
We compute the SSIM (structural similarity index) on the visualization of the spatial data.
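        Both fields are rendered with a shared color scale and saved as images,
        which are then compared with skimage's structural_similarity; for 3D
        variables the minimum SSIM over the vertical levels is returned.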
"""
import tempfile
import skimage.io
from skimage.metrics import structural_similarity as ssim
if not self._is_memoized('_ssim_value'):
            # use a non-interactive backend so figures are not displayed
backend_ = mpl.get_backend()
mpl.use('Agg')
d1 = self._calcs1.get_calc('ds')
d2 = self._calcs2.get_calc('ds')
lat1 = d1[self._calcs1._lat_coord_name]
lat2 = d2[self._calcs2._lat_coord_name]
lon1 = d1[self._calcs1._lon_coord_name]
lon2 = d2[self._calcs2._lon_coord_name]
            # dimensionality of the coordinate arrays (2 for curvilinear grids)
            coord_ndim = d1.cf[self._calcs1._lon_coord_name].ndim
            central = 0.0  # might make this a parameter later
            if coord_ndim == 2:  # probably pop
                central = 300.0
            # make periodic
            if coord_ndim == 2:
cy_lon1 = np.hstack((lon1, lon1[:, 0:1]))
cy_lon2 = np.hstack((lon2, lon2[:, 0:1]))
cy_lat1 = np.hstack((lat1, lat1[:, 0:1]))
cy_lat2 = np.hstack((lat2, lat2[:, 0:1]))
cy1 = add_cyclic_point(d1)
cy2 = add_cyclic_point(d2)
else: # 1d
cy1, cy_lon1 = add_cyclic_point(d1, coord=lon1)
cy2, cy_lon2 = add_cyclic_point(d2, coord=lon2)
cy_lat1 = lat1
cy_lat2 = lat2
            # convert +/- inf to large finite values but keep NaNs as NaNs
            no_inf_d1 = np.nan_to_num(cy1, nan=np.nan)
no_inf_d2 = np.nan_to_num(cy2, nan=np.nan)
# is it 3D? must do each level
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
color_min = min(
np.min(d1.where(d1 != -np.inf)).values.min(),
np.min(d2.where(d2 != -np.inf)).values.min(),
)
color_max = max(
np.max(d1.where(d1 != np.inf)).values.max(),
np.max(d2.where(d2 != np.inf)).values.max(),
)
mymap = copy.copy(plt.cm.get_cmap('coolwarm'))
mymap.set_under(color='black')
mymap.set_over(color='white')
mymap.set_bad(alpha=0)
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
for this_lev in range(nlevels):
with tempfile.TemporaryDirectory() as tmpdirname:
filename_1, filename_2 = (
f'{tmpdirname}/t_ssim1.png',
f'{tmpdirname}/t_ssim2.png',
)
if nlevels == 1:
this_no_inf_d1 = no_inf_d1
this_no_inf_d2 = no_inf_d2
else:
                        # this assumes the level dimension is first (TO DO: verify)
this_no_inf_d1 = no_inf_d1[this_lev, :, :]
this_no_inf_d2 = no_inf_d2[this_lev, :, :]
fig = plt.figure(dpi=300, figsize=(9, 2.5))
ax1 = plt.subplot(1, 2, 1, projection=ccrs.Robinson(central_longitude=central))
ax1.set_facecolor('#39ff14')
ax1.pcolormesh(
cy_lon1,
cy_lat1,
this_no_inf_d1,
transform=ccrs.PlateCarree(),
cmap=mymap,
vmin=color_min,
vmax=color_max,
)
ax1.set_global()
ax1.coastlines(linewidth=0.5)
ax1.axis('off')
plt.margins(0, 0)
extent1 = ax1.get_window_extent().transformed(fig.dpi_scale_trans.inverted())
plt.savefig(filename_1, bbox_inches=extent1, transparent=True, pad_inches=0)
ax1.axis('on')
ax2 = plt.subplot(1, 2, 2, projection=ccrs.Robinson(central_longitude=central))
ax2.set_facecolor('#39ff14')
ax2.pcolormesh(
cy_lon2,
cy_lat2,
this_no_inf_d2,
transform=ccrs.PlateCarree(),
cmap=mymap,
vmin=color_min,
vmax=color_max,
)
ax2.set_global()
ax2.coastlines(linewidth=0.5)
plt.margins(0, 0)
ax2.axis('off')
extent2 = ax2.get_window_extent().transformed(fig.dpi_scale_trans.inverted())
plt.savefig(filename_2, bbox_inches=extent2, transparent=True, pad_inches=0)
ax2.axis('on')
img1 = skimage.io.imread(filename_1)
img2 = skimage.io.imread(filename_2)
# scikit is adding an alpha channel for some reason - get rid of it
img1 = img1[:, :, :3]
img2 = img2[:, :, :3]
# s = ssim(img1, img2, multichannel=True)
# the following version closer to matlab version (and orig ssim paper)
s, ssim_mat = ssim(
img1,
img2,
multichannel=True,
gaussian_weights=True,
use_sample_covariance=False,
full=True,
)
ssim_levs[this_lev] = s
plt.close(fig)
ssim_mats_array.append(np.mean(ssim_mat, axis=2))
return_ssim = ssim_levs.min()
# Reset backend
mpl.use(backend_)
# save full matrix
self._ssim_mat = ssim_mats_array
self._ssim_value = return_ssim
return self._ssim_value
@property
def ssim_value_fp_orig(self):
"""
We compute the SSIM (structural similarity index) on the spatial data
- using the data itself (we do not create an image).
        Here we scale the data to [0,1], then quantize it to 256 bins
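        (per window: SSIM = ((2*mu1*mu2 + C1) * (2*cov12 + C2))
        / ((mu1^2 + mu2^2 + C1) * (var1 + var2 + C2)), with C1 = C2 = eps)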
"""
        if not self._is_memoized('_ssim_value_fp_orig'):
            # if this is a 3D variable, we will do each level separately
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
for this_lev in range(nlevels):
if nlevels == 1:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
else:
a1 = self._calcs1.get_calc('ds').isel({vname: this_lev}).data
a2 = self._calcs2.get_calc('ds').isel({vname: this_lev}).data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
# re-scale to [0,1] - if not constant
smin = min(np.nanmin(a1), np.nanmin(a2))
smax = max(np.nanmax(a1), np.nanmax(a2))
r = smax - smin
                if r == 0.0:  # scale by smax if field is a constant (and smax != 0)
if smax == 0.0:
sc_a1 = a1
sc_a2 = a2
else:
sc_a1 = a1 / smax
sc_a2 = a2 / smax
else:
sc_a1 = (a1 - smin) / r
sc_a2 = (a2 - smin) / r
# now quantize to 256 bins
sc_a1 = np.round(sc_a1 * 255) / 255
sc_a2 = np.round(sc_a2 * 255) / 255
# gaussian filter
                n = 11  # recommended window size
                k = 5  # window half-width (extent)
                sigma = 1.5
X = sc_a1.shape[0]
Y = sc_a1.shape[1]
g_w = np.array(self._oned_gauss(n, sigma))
# 2D gauss weights
gg_w = np.outer(g_w, g_w)
# init ssim matrix
ssim_mat = np.zeros_like(sc_a1)
my_eps = 1.0e-15
# DATA LOOP
# go through 2D arrays - each grid point x0, y0 has
# a 2D window [x0 - k, x0+k] [y0 - k, y0 + k]
for i in range(X):
# don't go over boundaries
imin = max(0, i - k)
imax = min(X - 1, i + k)
for j in range(Y):
if np.isnan(sc_a1[i, j]):
# SKIP IF gridpoint is nan
ssim_mat[i, j] = np.nan
continue
jmin = max(0, j - k)
jmax = min(Y - 1, j + k)
# WINDOW CALC
a1_win = sc_a1[imin : imax + 1, jmin : jmax + 1]
a2_win = sc_a2[imin : imax + 1, jmin : jmax + 1]
# if window is by boundary, then it is not 11x11 and we must adjust weights also
if min(a1_win.shape) < n:
Wt = gg_w[
imin + k - i : imax + k - i + 1, jmin + k - j : jmax + k - j + 1
]
else:
Wt = gg_w
# weighted means (TO DO: what if indices are not the same)
indices1 = ~np.isnan(a1_win)
indices2 = ~np.isnan(a2_win)
if not np.all(indices1 == indices2):
print('SSIM ERROR: indices are not the same!')
a1_mu = np.average(a1_win[indices1], weights=Wt[indices1])
a2_mu = np.average(a2_win[indices2], weights=Wt[indices2])
# weighted std squared (variance)
a1_std_sq = (
np.average((a1_win[indices1] * a1_win[indices1]), weights=Wt[indices1])
- a1_mu * a1_mu
)
a2_std_sq = (
np.average((a2_win[indices2] * a2_win[indices2]), weights=Wt[indices2])
- a2_mu * a2_mu
)
# cov of a1 and a2
a1a2_cov = (
np.average(
(a1_win[indices1] * a2_win[indices2]),
weights=Wt[indices1],
)
- a1_mu * a2_mu
)
# SSIM for this window
# first term
ssim_t1 = 2 * a1_mu * a2_mu
ssim_b1 = a1_mu * a1_mu + a2_mu * a2_mu
C1 = my_eps
ssim_t1 = ssim_t1 + C1
ssim_b1 = ssim_b1 + C1
# second term
ssim_t2 = 2 * a1a2_cov
ssim_b2 = a1_std_sq + a2_std_sq
                        C2 = my_eps
                        ssim_t2 = ssim_t2 + C2
                        ssim_b2 = ssim_b2 + C2
ssim_1 = ssim_t1 / ssim_b1
ssim_2 = ssim_t2 / ssim_b2
ssim_mat[i, j] = ssim_1 * ssim_2
mean_ssim = np.nanmean(ssim_mat)
ssim_levs[this_lev] = mean_ssim
ssim_mats_array.append(ssim_mat)
return_ssim = ssim_levs.min()
self._ssim_value_fp_orig = return_ssim
# save full matrix
self._ssim_mat_fp_orig = ssim_mats_array
return self._ssim_value_fp_orig
@property
def ssim_value_fp_fast2(self):
"""
        Faster implementation than ssim_value_fp_orig.
        Use the version below (ssim_value_fp_fast) instead of this one.
"""
        from astropy.convolution import Gaussian2DKernel, convolve
if not self._is_memoized('_ssim_value_fp_fast2'):
            # if this is a 3D variable, we will do each level separately
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
my_eps = 1.0e-15
for this_lev in range(nlevels):
if nlevels == 1:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
else:
a1 = self._calcs1.get_calc('ds').isel({vname: this_lev}).data
a2 = self._calcs2.get_calc('ds').isel({vname: this_lev}).data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
# re-scale to [0,1] - if not constant
smin = min(np.nanmin(a1), np.nanmin(a2))
smax = max(np.nanmax(a1), np.nanmax(a2))
r = smax - smin
if r == 0.0: # scale by smax if field is a constant (and smax != 0)
if smax == 0.0:
sc_a1 = a1
sc_a2 = a2
else:
sc_a1 = a1 / smax
sc_a2 = a2 / smax
else:
sc_a1 = (a1 - smin) / r
sc_a2 = (a2 - smin) / r
# now quantize to 256 bins
sc_a1 = np.round(sc_a1 * 255) / 255
sc_a2 = np.round(sc_a2 * 255) / 255
# gaussian filter
kernel = Gaussian2DKernel(x_stddev=1.5, x_size=11, y_size=11)
k = 5
filter_args = {'boundary': 'fill', 'preserve_nan': True}
a1_mu = convolve(sc_a1, kernel, **filter_args)
self._filter_adjust_edges(a1_mu, kernel, k)
a2_mu = convolve(sc_a2, kernel, **filter_args)
self._filter_adjust_edges(a2_mu, kernel, k)
a1a1 = convolve(sc_a1 * sc_a1, kernel, **filter_args)
self._filter_adjust_edges(a1a1, kernel, k)
a2a2 = convolve(sc_a2 * sc_a2, kernel, **filter_args)
self._filter_adjust_edges(a2a2, kernel, k)
a1a2 = convolve(sc_a1 * sc_a2, kernel, **filter_args)
self._filter_adjust_edges(a1a2, kernel, k)
###########
var_a1 = a1a1 - a1_mu * a1_mu
var_a2 = a2a2 - a2_mu * a2_mu
cov_a1a2 = a1a2 - a1_mu * a2_mu
# ssim constants
C1 = my_eps
C2 = my_eps
ssim_t1 = 2 * a1_mu * a2_mu + C1
ssim_t2 = 2 * cov_a1a2 + C2
ssim_b1 = a1_mu * a1_mu + a2_mu * a2_mu + C1
ssim_b2 = var_a1 + var_a2 + C2
ssim_1 = ssim_t1 / ssim_b1
ssim_2 = ssim_t2 / ssim_b2
ssim_mat = ssim_1 * ssim_2
# cropping (temp)
ssim_mat = crop(ssim_mat, k)
mean_ssim = np.nanmean(ssim_mat)
ssim_levs[this_lev] = mean_ssim
ssim_mats_array.append(ssim_mat)
# end of levels calculation
return_ssim = ssim_levs.min()
self._ssim_value_fp_fast2 = return_ssim
# save full matrix
self._ssim_mat_fp_fast2 = ssim_mats_array
return self._ssim_value_fp_fast2
@property
def ssim_value_fp_fast(self):
"""
        Faster implementation than ssim_value_fp_orig
"""
        from astropy.convolution import Gaussian2DKernel, convolve
if not self._is_memoized('_ssim_value_fp_fast'):
            # if this is a 3D variable, we will do each level separately
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
my_eps = 1.0e-8
for this_lev in range(nlevels):
if nlevels == 1:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
else:
a1 = self._calcs1.get_calc('ds').isel({vname: this_lev}).data
a2 = self._calcs2.get_calc('ds').isel({vname: this_lev}).data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
# re-scale to [0,1] - if not constant
smin = min(np.nanmin(a1), np.nanmin(a2))
smax = max(np.nanmax(a1), np.nanmax(a2))
r = smax - smin
if r == 0.0: # scale by smax if field is a constant (and smax != 0)
if smax == 0.0:
sc_a1 = a1
sc_a2 = a2
else:
sc_a1 = a1 / smax
sc_a2 = a2 / smax
else:
sc_a1 = (a1 - smin) / r
sc_a2 = (a2 - smin) / r
# now quantize to 256 bins
sc_a1 = np.round(sc_a1 * 255) / 255
sc_a2 = np.round(sc_a2 * 255) / 255
# gaussian filter
kernel = Gaussian2DKernel(x_stddev=1.5, x_size=11, y_size=11)
k = 5
filter_args = {'boundary': 'fill', 'preserve_nan': True}
a1_mu = convolve(sc_a1, kernel, **filter_args)
a2_mu = convolve(sc_a2, kernel, **filter_args)
a1a1 = convolve(sc_a1 * sc_a1, kernel, **filter_args)
a2a2 = convolve(sc_a2 * sc_a2, kernel, **filter_args)
a1a2 = convolve(sc_a1 * sc_a2, kernel, **filter_args)
###########
var_a1 = a1a1 - a1_mu * a1_mu
var_a2 = a2a2 - a2_mu * a2_mu
cov_a1a2 = a1a2 - a1_mu * a2_mu
# ssim constants
C1 = my_eps
C2 = my_eps
ssim_t1 = 2 * a1_mu * a2_mu + C1
ssim_t2 = 2 * cov_a1a2 + C2
ssim_b1 = a1_mu * a1_mu + a2_mu * a2_mu + C1
ssim_b2 = var_a1 + var_a2 + C2
ssim_1 = ssim_t1 / ssim_b1
ssim_2 = ssim_t2 / ssim_b2
ssim_mat = ssim_1 * ssim_2
# cropping (the border region)
ssim_mat = crop(ssim_mat, k)
mean_ssim = np.nanmean(ssim_mat)
ssim_levs[this_lev] = mean_ssim
ssim_mats_array.append(ssim_mat)
# end of levels calculation
return_ssim = ssim_levs.min()
self._ssim_value_fp_fast = return_ssim
# save full matrix
self._ssim_mat_fp = ssim_mats_array
return self._ssim_value_fp_fast
@property
def ssim_value_fp_old(self):
"""To mimic what zchecker does - the ssim on the fp data with
        original constants and no scaling. This will return NaN on POP data.
"""
from skimage.metrics import structural_similarity as ssim
if not self._is_memoized('_ssim_value_fp_old'):
            # if this is a 3D variable, we will just do level 0 for now...
            # (consider doing each level separately)
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
a1 = self._calcs1.get_calc('ds').isel({vname: 0}).data
a2 = self._calcs2.get_calc('ds').isel({vname: 0}).data
else:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
maxr = max(a1.max(), a2.max())
minr = min(a1.min(), a2.min())
myrange = maxr - minr
mean_ssim = ssim(
a1,
a2,
multichannel=False,
data_range=myrange,
gaussian_weights=True,
use_sample_covariance=False,
)
self._ssim_value_fp_old = mean_ssim
return self._ssim_value_fp_old
    def get_diff_calc(self, name: str):
"""
Gets a calc on the dataset that requires more than one input dataset
Parameters
==========
name : str
The name of the calc (must be identical to a property name)
Returns
=======
out : float
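
        Example (a sketch; ``dc`` is a Diffcalcs instance):

            pcc = dc.get_diff_calc('pearson_correlation_coefficient')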
"""
if isinstance(name, str):
if name == 'pearson_correlation_coefficient':
return self.pearson_correlation_coefficient
if name == 'covariance':
return self.covariance
if name == 'ks_p_value':
return self.ks_p_value
if name == 'n_rms':
return self.normalized_root_mean_squared
if name == 'n_emax':
return self.normalized_max_pointwise_error
if name == 'spatial_rel_error':
return self.spatial_rel_error
if name == 'max_spatial_rel_error':
return self.max_spatial_rel_error
if name == 'ssim':
return self.ssim_value
if name == 'ssim_fp_orig':
return self.ssim_value_fp_orig
if name == 'ssim_fp':
return self.ssim_value_fp_fast
if name == 'ssim_fp_fast2':
return self.ssim_value_fp_fast2
if name == 'ssim_fp_old':
return self.ssim_value_fp_old
raise ValueError(f'there is no calc with the name: {name}.')
else:
raise TypeError('name must be a string.')