import copy
import ctypes
import gzip
import struct
import tempfile
import time
from math import exp, pi, sqrt
from typing import Optional
import cf_xarray as cf
import dask
import matplotlib as mpl
import numpy as np
import pandas as pd
import skimage.io
import skimage.metrics
import statsmodels.api as sm
import xarray as xr
from astropy.convolution import Gaussian2DKernel, convolve, interpolate_replace_nans
from cartopy import crs as ccrs
from cartopy.util import add_cyclic_point
from matplotlib import pyplot as plt
from scipy import stats as ss
from scipy.ndimage import gaussian_filter
from scipy.signal import stft
from skimage.metrics import structural_similarity as ssim
from skimage.util import crop
from xrft import dft
# from .collect_datasets import collect_datasets
xr.set_options(keep_attrs=True)
[docs]
class Datasetcalcs:
"""
This class contains calcs for each point of a dataset after aggregating across one or more dimensions, and a method to access these calcs. Expects a DataArray.
"""
def __init__(
self,
ds: xr.DataArray,
data_type: str,
aggregate_dims: list,
time_dim_name: str = None,
lat_dim_name: str = None,
lon_dim_name: str = None,
vert_dim_name: str = None,
lat_coord_name: str = None,
lon_coord_name: str = None,
q: float = 0.5,
weighted=True,
):
self._ds = ds if (ds.dtype == np.float64) else ds.astype(np.float64)
# For some reason, casting to float64 removes all attrs from the dataset
self._ds.attrs = ds.attrs
if data_type == 'wrf':
weighted = False
if weighted:
if 'cell_measures' not in self._ds.attrs:
self._ds.attrs['cell_measures'] = 'area: cell_area'
# Let's just get all the lat/lon and time
# names from the file if they are None
# lon coord
if lon_coord_name is None:
lon_coord_name = ds.cf.coordinates['longitude'][0]
self._lon_coord_name = lon_coord_name
# lat coord
if lat_coord_name is None:
lat_coord_name = ds.cf.coordinates['latitude'][0]
self._lat_coord_name = lat_coord_name
# WRF ALSO HAS XLAT_U and XLONG_U, XLAT_v and XLONG_V
dd = ds.cf[ds.cf.coordinates['latitude'][0]].dims
ll = len(dd)
if data_type == 'cam-fv': # ll == 1:
if lat_dim_name is None:
lat_dim_name = dd[0]
if lon_dim_name is None:
lon_dim_name = ds.cf['longitude'].dims[0]
elif data_type == 'pop': # ll == 2:
if lat_dim_name is None:
lat_dim_name = dd[0]
if lon_dim_name is None:
lon_dim_name = dd[1]
elif data_type == 'wrf':
if lat_dim_name is None:
lat_dim_name = dd[0]
if lon_dim_name is None:
lon_dim_name = dd[1]
else:
print('Warning: unknown data_type: ', data_type)
self._latlon_dims = ll
self._lat_dim_name = lat_dim_name
self._lon_dim_name = lon_dim_name
# vertical dimension?
if vert_dim_name is None:
if data_type == 'wrf':
vert = 'z' in ds.dims
if vert:
vert_dim_name = 'z'
else:
vert = 'vertical' in ds.cf
if vert:
vert_dim_name = ds.cf['vertical'].name
self._vert_dim_name = vert_dim_name
# time dimension
if time_dim_name is None:
if 'time' in ds.cf.coordinates.keys():
time_dim_name = ds.cf.coordinates['time'][0]
else:
time_dim_name = None
self._time_dim_name = time_dim_name
self._quantile = q
self._agg_dims = aggregate_dims
self._frame_size = 1
self._data_type = data_type
# array calcs
self._weighted = weighted
self._ns_con_var = None
self._ew_con_var = None
self._mean = None
self._mean_abs = None
self._std = None
self._ddof = 1
self._num_positive = None
self._num_negative = None
self._num_zero = None
self._prob_positive = None
self._odds_positive = None
self._prob_negative = None
self._zscore = None
self._mae_day_max = None
self._lag1 = None
self._lag1_first_difference = None
self._quantile_value = None
self._mean_squared = None
self._root_mean_squared = None
self._sum = None
self._sum_squared = None
self._variance = None
self._pooled_variance = None
self._pooled_variance_ratio = None
self._standardized_mean = None
self._max_abs = None
self._min_abs = None
self._min_abs_nonzero = None
self._min_val_nonzero = None
self._d_range = None
self._min_val = None
self._max_val = None
self._grouping = None
self._annual_harmonic_relative_ratio = None
self._lon_autocorr = None
self._lat_autocorr = None
self._lev_autocorr = None
self._entropy = None
self._w_e_first_differences = None
self._w_e_first_differences_max = None
self._n_s_first_differences = None
self._n_s_first_differences_max = None
self._w_e_derivative = None
self._fft2 = None
self._percent_unique = None
self._most_repeated = None
self._most_repeated_pct = None
self._cdf = None
self._dtype = ds.dtype
self._stft = None
self._fftratio = None
self._fftmax = None
self._vfftratio = None
self._vfftmax = None
self._magnitude_range = None
self._magnitude_diff_ew = None
self._magnitude_diff_ns = None
self._real_information = None
self._real_information_cutoff = None
# single value calcs
self._zscore_cutoff = None
self._zscore_percent_significant = None
adims = self._agg_dims
self._not_agg_dims = []
if adims is None:
adims = self._ds.dims
else:
for d in self._ds.dims:
if d not in adims:
self._not_agg_dims.append(d)
# for probability functions, what is the size
if aggregate_dims is not None:
for dim in aggregate_dims:
self._frame_size *= int(self._ds.sizes[dim])
else:
dp = np.count_nonzero(~np.isnan(self._ds))
self._frame_size = dp
def _is_memoized(self, calc_name: str) -> bool:
return hasattr(self, calc_name) and (self.__getattribute__(calc_name) is not None)
def _con_var(self, dir, dataset) -> xr.DataArray:
if dir == 'ns':
tt = dataset.diff(self._lat_dim_name, 1)
elif dir == 'ew':
ds_h = xr.concat(
[
dataset,
dataset.head({self._lon_dim_name: 1}),
],
dim=self._lon_dim_name,
)
tt = ds_h.diff(self._lon_dim_name, 1)
con_var = np.square(tt)
return con_var
@property
def dtype(self) -> xr.DataArray:
return self._dtype
@property
def pooled_variance(self) -> xr.DataArray:
"""
The overall variance of the dataset
"""
if not self._is_memoized('_pooled_variance_mean'):
self._pooled_variance = self._ds.var(self._agg_dims)
self._pooled_variance.attrs['cell_measures'] = self._ds.attrs['cell_measures']
if self._weighted:
self._pooled_variance_mean = self._pooled_variance.cf.weighted('area').mean()
else:
self._pooled_variance_mean = self._pooled_variance.mean()
self._pooled_variance_mean.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._pooled_variance_mean.attrs['units'] = f'{self._ds.units}$^2$'
return self._pooled_variance_mean
@property
def w_e_first_differences(self) -> xr.DataArray:
"""
First differences along the west-east direction
"""
if not self._is_memoized('_w_e_first_differences'):
# self._first_differences = self._ds.diff('lat').mean(self._agg_dims)
self._w_e_first_differences = self._ds.roll(
{self._lat_dim_name: -1}, roll_coords=False
) - self._ds.roll({self._lat_dim_name: 1}, roll_coords=False)
self._w_e_first_differences.attrs = self._ds.attrs
return self._w_e_first_differences.mean(self._agg_dims)
@property
def w_e_first_differences_max(self) -> xr.DataArray:
"""
First differences along the west-east direction
"""
if not self._is_memoized('_w_e_first_differences'):
# self._first_differences = self._ds.diff('lat').mean(self._agg_dims)
self._w_e_first_differences = self._ds.roll(
{self._lat_dim_name: -1}, roll_coords=False
) - self._ds.roll({self._lat_dim_name: 1}, roll_coords=False)
self._w_e_first_differences.attrs = self._ds.attrs
return self._w_e_first_differences.max(self._agg_dims)
@property
def n_s_first_differences(self) -> xr.DataArray:
"""
First differences along the north-south direction
"""
if not self._is_memoized('_n_s_first_differences'):
self._n_s_first_differences = self._ds.diff(self._lon_dim_name).mean(self._agg_dims)
# self._first_differences = self._ds.roll({"lon": -1}, roll_coords=False) - self._ds.roll({"lat": 1}, roll_coords=False)
self._n_s_first_differences.attrs = self._ds.attrs
return self._n_s_first_differences
@property
def n_s_first_differences_max(self) -> xr.DataArray:
"""
First differences along the n-s direction
"""
if not self._is_memoized('_n_s_first_differences'):
self._n_s_first_differences = self._ds.diff(self._lon_dim_name).max(self._agg_dims)
# self._first_differences = self._ds.roll({"lon": -1}, roll_coords=False) - self._ds.roll({"lat": 1}, roll_coords=False)
self._n_s_first_differences.attrs = self._ds.attrs
return self._n_s_first_differences
@property
def percent_unique(self) -> xr.DataArray:
"""
Percentage of unique values in the dataset
"""
if not self._is_memoized('_percent_unique'):
count_unique = len(pd.unique(self._ds.values.flatten()))
count_all = len(self._ds.values.flatten())
self._percent_unique = count_unique / count_all
return self._percent_unique
@property
def most_repeated(self) -> xr.DataArray:
"""
Most repeated value in dataset
"""
if not self._is_memoized('_most_repeated'):
self._most_repeated = ss.mode(self._ds.values.flatten())[0]
return self._most_repeated
@property
def most_repeated_percent(self) -> xr.DataArray:
"""
Most repeated value in dataset
"""
if not self._is_memoized('_most_repeated_pct'):
self._most_repeated_pct = ss.mode(self._ds.values.flatten())[1][0] / len(
self._ds.values.flatten()
)
return self._most_repeated_pct
# def log_max(ds):
# a_d = abs(ds)
# return np.log10(a_d, where=a_d.data > 0 ).max(skipna=True)
# return np.log10(abs(ds)).where(np.log10(abs(ds)) != -np.inf).max(skipna=True)
@property
def magnitude_range(self) -> xr.DataArray:
"""
The range of the dataset
"""
if not self._is_memoized('_magnitude_range'):
# Get the range in exponent space
def max_agg(ds):
return ds.max(skipna=True)
def min_agg(ds):
return ds.min(skipna=True)
# avoid divde by zero warning
# log_ds = np.log10(abs(self._ds)).where(np.log10(abs(self._ds)) != -np.inf)
a_d = abs(self._ds.copy())
log_ds = np.log10(a_d, where=a_d.data > 0)
if len(self._not_agg_dims) == 0:
my_max = max_agg(log_ds)
my_min = min_agg(log_ds)
else:
stack = log_ds.stack(multi_index=tuple(self._not_agg_dims))
my_max = stack.groupby('multi_index').map(max_agg)
my_min = stack.groupby('multi_index').map(min_agg)
if (
np.isinf(my_max).any()
or np.isinf(my_min).any()
or np.isnan(my_max).any()
or np.isnan(my_min).any()
):
self._magnitude_range = -1
return self._magnitude_range
else:
self._magnitude_range = my_max - my_min
return self._magnitude_range
@property
def magnitude_diff_ew(self) -> xr.DataArray:
"""
Maximum magnitude differences along ew direction
"""
if not self._is_memoized('_magnitude_diff_ew'):
# self._first_differences = self._ds.diff('lat').mean(self._agg_dims)
here = np.log10(abs(self._ds.roll({'lon': 0}, roll_coords=False)))
there = np.log10(abs(self._ds.roll({'lon': 1}, roll_coords=False)))
if (
np.isinf(here).any()
or np.isinf(there).any()
or np.isnan(here).any()
or np.isnan(there).any()
):
self._magnitude_diff_ew_int = -1
return xr.DataArray(self._magnitude_diff_ew_int)
else:
self._magnitude_diff_ew_int = here - there
self._magnitude_diff_ew = xr.DataArray(self._magnitude_diff_ew_int)
self._magnitude_diff_ew.attrs = self._ds.attrs
return self._magnitude_diff_ew.max(self._agg_dims, skipna=True)
@property
def magnitude_diff_ns(self) -> xr.DataArray:
"""
Maximum magnitude irst difference along the n-s direction
"""
if not self._is_memoized('_magnitude_diff_ns'):
max = np.log10(abs(self._ds.diff('lat').max(self._agg_dims)))
min = np.log10(abs(self._ds.diff('lat').min(self._agg_dims)))
if (
np.isinf(max).any()
or np.isinf(min).any()
or np.isnan(max).any()
or np.isnan(min).any()
):
self._magnitude_diff_ns_int = -1
return xr.DataArray(self._magnitude_diff_ns_int)
else:
self._magnitude_diff_ns_int = max - min
# self._first_differences = self._ds.roll({"lon": -1}, roll_coords=False) - self._ds.roll({"lat": 1}, roll_coords=False)
self._magnitude_diff_ns = xr.DataArray(self._magnitude_diff_ns_int)
self._magnitude_diff_ns.attrs = self._ds.attrs
return self._magnitude_diff_ns.max(self._agg_dims, skipna=True)
@property
def range(self) -> xr.DataArray:
"""
The range of the dataset
"""
return self.max_val - self.min_val
@property
def w_e_derivative(self) -> xr.DataArray:
"""
Derivative of dataset from west-east
"""
if not self._is_memoized('_derivative'):
self._derivative = self._ds.differentiate('lon').mean(self._agg_dims)
self._derivative.attrs = self._ds.attrs
return self._derivative
@property
def ns_con_var(self) -> xr.DataArray:
"""
The North-South Contrast Variance averaged along the aggregate dimensions
"""
if not self._is_memoized('_ns_con_var_mean'):
self._ns_con_var = self._con_var('ns', self._ds)
if self._weighted:
self._ns_con_var.attrs['cell_measures'] = self._ds.attrs['cell_measures']
adims = self._agg_dims
if adims is None:
self._ns_con_var_mean = self._ns_con_var.cf.weighted('area').mean(skipna=True)
else:
self._ns_con_var_mean = self._ns_con_var.cf.weighted('area').mean(
dim=adims, skipna=True
)
else:
self._ns_con_var_mean = self._ns_con_var.mean(self._agg_dims)
self._ns_con_var_mean.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._ns_con_var_mean.attrs['units'] = f'{self._ds.units}$^2$'
return self._ns_con_var_mean
@property
def ew_con_var(self) -> xr.DataArray:
"""
The East-West Contrast Variance averaged along the aggregate dimensions
"""
if not self._is_memoized('_ew_con_var'):
self._ew_con_var = self._con_var('ew', self._ds)
if self._weighted:
self._ew_con_var.attrs['cell_measures'] = self._ds.attrs['cell_measures']
adims = self._agg_dims
if adims is None:
self._ew_con_var_mean = self._ew_con_var.cf.weighted('area').mean(skipna=True)
else:
self._ew_con_var_mean = self._ew_con_var.cf.weighted('area').mean(
dim=adims, skipna=True
)
else:
self._ew_con_var_mean = self._ew_con_var.mean(self._agg_dims)
self._ew_con_var_mean.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._ew_con_var_mean.attrs['units'] = f'{self._ds.units}$^2$'
return self._ew_con_var_mean
@property
def lat_autocorr(self) -> xr.DataArray:
"""
Autocorrelation: the correlation of a variable with itself shifted in the latitude dimension
"""
if not self._is_memoized('_lat_autocorr'):
dims = self._ds.dims
lat_dim_name = self._lat_dim_name
if lat_dim_name not in dims:
print('error: latitude not found for autocorrelation')
self._lat_autocorr = 0
else:
idx = dims.index(lat_dim_name)
sz = self._ds.sizes[lat_dim_name]
y_lat = np.roll(self._ds, -1, axis=idx)
x_lat = self._ds
# this one (longitude) should not wrap
yd_lat = np.delete(y_lat, sz - 1, axis=idx)
xd_lat = np.delete(x_lat, sz - 1, axis=idx)
y_lat_r = np.ravel(yd_lat)
x_lat_r = np.ravel(xd_lat)
aa = np.corrcoef(x_lat_r, y_lat_r)[0, 1]
self._lat_autocorr = aa
return self._lat_autocorr
@property
def lon_autocorr(self) -> xr.DataArray:
"""
Autocorrelation: the correlation of a variable with itself shifted in the longitude dimension
"""
if not self._is_memoized('_lon_autocorr'):
dims = self._ds.dims
lon_dim_name = self._lon_dim_name
if lon_dim_name not in dims:
print('error: longitude not found for autocorrelation')
self._lon_autocorr = 0
else:
idx = dims.index(lon_dim_name)
y_lon = np.roll(self._ds, -1, axis=idx)
x_lon = self._ds
y_lon_r = np.ravel(y_lon)
x_lon_r = np.ravel(x_lon)
aa = np.corrcoef(x_lon_r, y_lon_r)[0, 1]
self._lon_autocorr = aa
return self._lon_autocorr
@property
def lev_autocorr(self) -> xr.DataArray:
"""
Autocorrelation: the correlation of a variable with itself shifted in the vertical dimension
"""
if not self._is_memoized('_lev_autocorr'):
dims = self._ds.dims
vert_dim_name = self._vert_dim_name
if vert_dim_name not in dims:
print('error: vertical (lev) not found for autocorrelation')
self._lon_autocorr = 0
else:
idx = dims.index(vert_dim_name)
sz = self._ds.sizes[vert_dim_name]
# print(idx)
y = np.roll(self._ds, -1, axis=idx)
x = self._ds
# this one (vert level) should not wrap
yd = np.delete(y, sz - 1, axis=idx)
xd = np.delete(x, sz - 1, axis=idx)
yd_r = np.ravel(yd)
xd_r = np.ravel(xd)
aa = np.corrcoef(xd_r, yd_r)[0, 1]
self._lev_autocorr = aa
return self._lev_autocorr
@property
def entropy(self) -> xr.DataArray:
"""
An estimate for the entropy of the data (using gzip)
# lower is better (1.0 means random - no compression possible)
"""
if not self._is_memoized('_entropy'):
es = []
a1 = self._ds.data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if len(self._not_agg_dims) == 0:
# don't stack at all, just make a single multi_index group
cc = gzip.compress(a1, mtime=0)
dd = gzip.decompress(cc)
cl = len(cc)
dl = len(dd)
if dl > 0:
e = cl / dl
else:
e = 0.0
es.append(e)
else:
stack = a1.stack(multi_index=tuple(self._not_agg_dims))
for d, slice in stack.groupby('multi_index'):
cc = gzip.compress(slice.data)
dd = gzip.decompress(cc)
cl = len(cc)
dl = len(dd)
if dl > 0:
e = cl / dl
else:
e = 0.0
es.append(e)
if len(self._not_agg_dims) == 0:
self._entropy = xr.DataArray(es)
else:
self._entropy = xr.DataArray(es, dims=self._not_agg_dims)
self._entropy.attrs = self._ds.attrs
self._entropy.attrs['units'] = ''
return self._entropy
@property
def mean(self) -> xr.DataArray:
"""
The mean along the aggregate dimensions
"""
if not self._is_memoized('_mean'):
if self._weighted:
adims = self._agg_dims
if adims is None:
self._mean = self._ds.cf.weighted('area').mean(skipna=True)
else:
self._mean = self._ds.cf.weighted('area').mean(dim=adims, skipna=True)
else:
self._mean = self._ds.mean(self._agg_dims, skipna=True)
self._mean.attrs = self._ds.attrs
return self._mean
@property
def mean_abs(self) -> xr.DataArray:
"""
The mean of the absolute errors along the aggregate dimensions
"""
if not self._is_memoized('_mean_abs'):
if self._weighted:
adims = self._agg_dims
if adims is None:
self._mean_abs = abs(self._ds).cf.weighted('area').mean(skipna=True)
else:
self._mean_abs = abs(self._ds).cf.weighted('area').mean(dim=adims, skipna=True)
else:
self._mean_abs = abs(self._ds).mean(self._agg_dims, skipna=True)
self._mean_abs.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._mean_abs.attrs['units'] = f'{self._ds.units}'
return self._mean_abs
@property
def mean_squared(self) -> xr.DataArray:
"""
The absolute value of the mean along the aggregate dimensions
"""
if not self._is_memoized('_mean_squared'):
self._mean_squared = np.square(self.mean)
self.mean_squared.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self.mean_squared.attrs['units'] = f'{self._ds.units}$^2$'
return self._mean_squared
@property
def root_mean_squared(self) -> xr.DataArray:
"""
The absolute value of the mean along the aggregate dimensions
"""
if not self._is_memoized('_root_mean_squared_mean'):
self._squared = np.square(self._ds)
if self._weighted:
adims = self._agg_dims
if adims is None:
self._root_mean_squared = np.sqrt(
self._squared.cf.weighted('area').mean(skipna=True)
)
else:
self._root_mean_squared = np.sqrt(
self._squared.cf.weighted('area').mean(dim=adims, skipna=True)
)
else:
self._root_mean_squared = np.sqrt(self._squared.mean(self._agg_dims, skipna=True))
self._root_mean_squared.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._root_mean_squared.attrs['units'] = f'{self._ds.units}'
return self._root_mean_squared
@property
def sum(self) -> xr.DataArray:
if not self._is_memoized('_sum'):
if self._weighted:
self._sum = self._ds.sum(dim=self._agg_dims, skipna=True)
else:
self._sum = self._ds.sum(dim=self._agg_dims, skipna=True)
self._sum.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._sum.attrs['units'] = f'{self._ds.units}'
return self._sum
@property
def sum_squared(self) -> xr.DataArray:
if not self._is_memoized('_sum_squared'):
self._sum_squared = np.square(self._sum_squared)
self._sum_squared.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._sum_squared.attrs['units'] = f'{self._ds.units}$^2$'
return self._sum_squared
@property
def std(self) -> xr.DataArray:
"""
The standard deviation along the aggregate dimensions
"""
if not self._is_memoized('_std'):
if self._weighted:
adims = self._agg_dims
if self._ddof == 0:
# biased std
if adims is None:
self._std = np.sqrt(
((self._ds - self.mean) ** 2).cf.weighted('area').mean()
)
else:
self._std = np.sqrt(
((self._ds - self.mean) ** 2)
.cf.weighted('area')
.mean(dim=self._agg_dims)
)
elif adims is not None:
if 'lat' in adims:
# assume unbiased std (reliability weighted)
V1 = self._ds.coords['cell_area'].sum(dim='lat')[0]
V2 = np.square(self._ds.coords['cell_area']).sum(dim='lat')[0]
_biased_var = (
((self._ds - self.mean) ** 2).cf.weighted('area').mean(dim=adims)
)
self._std = np.sqrt(_biased_var * (1 / (1 - V2 / (V1**2))))
else:
# same as unweighted
self._std = self._ds.std(adims, ddof=self._ddof, skipna=True)
else:
# same as unweighted
self._std = self._ds.std(adims, ddof=self._ddof, skipna=True)
else:
self._std = self._ds.std(self._agg_dims, ddof=self._ddof, skipna=True)
self._std.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._std.attrs['units'] = ''
return self._std
@property
def standardized_mean(self) -> xr.DataArray:
"""
The mean at each point along the aggregate dimensions divided by the standard deviation
NOTE: will always be 0 if aggregating over all dimensions
"""
if not self._is_memoized('_standardized_mean'):
if self._grouping is None:
if self._weighted:
adims = self._agg_dims
if adims is None:
self._standardized_mean = (
self.mean - self._ds.cf.weighted('area').mean(skipna=True)
) / self.std
else:
self._standardized_mean = (
self.mean - self._ds.cf.weighted('area').mean(dim=adims, skipna=True)
) / self.std
else:
self._standardized_mean = (self.mean - self._ds.mean()) / self._ds.std(ddof=1)
else:
grouped = self.mean.groupby(self._grouping)
grouped_mean = grouped.mean()
self._standardized_mean = (
grouped_mean - grouped_mean.mean(skipna=True)
) / grouped_mean.std(ddof=1)
if hasattr(self._ds, 'units'):
self._standardized_mean.attrs['units'] = ''
return self._standardized_mean
@property
def variance(self) -> xr.DataArray:
"""
The variance along the aggregate dimensions
"""
if not self._is_memoized('_variance'):
if self._weighted:
self._variance = np.square(self.std)
else:
self._variance = self._ds.var(self._agg_dims, skipna=True)
self._variance.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._variance.attrs['units'] = f'{self._ds.units}$^2$'
return self._variance
@property
def pooled_variance_ratio(self) -> xr.DataArray:
"""
The pooled variance along the aggregate dimensions
"""
if not self._is_memoized('_pooled_variance_ratio'):
denom = self.pooled_variance.copy().fillna(0)
denom = np.where(denom == 0, 1e-12, denom)
self._pooled_variance_ratio = self.variance / denom
# self._pooled_variance_ratio = self.variance / self.pooled_variance
self._pooled_variance_ratio.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._pooled_variance_ratio.attrs['units'] = ''
return self._pooled_variance_ratio
@property
def num_positive(self) -> xr.DataArray:
"""
The probability that a point is positive
"""
if not self._is_memoized('_num_positive'):
if self._weighted:
self._num_positive = (self._ds > 0).sum(self._agg_dims)
else:
self._num_positive = (self._ds > 0).sum(self._agg_dims)
return self._num_positive
@property
def num_negative(self) -> xr.DataArray:
"""
The probability that a point is negative
"""
if not self._is_memoized('_num_negative'):
if self._weighted:
self._num_negative = (self._ds < 0).sum(self._agg_dims)
else:
self._num_negative = (self._ds < 0).sum(self._agg_dims)
return self._num_negative
@property
def num_zero(self) -> xr.DataArray:
"""
The probability that a point is zero
"""
if not self._is_memoized('_num_zero'):
if self._weighted:
self._num_zero = (self._ds == 0).sum(self._agg_dims)
else:
self._num_zero = (self._ds == 0).sum(self._agg_dims)
return self._num_zero
@property
def prob_positive(self) -> xr.DataArray:
"""
The probability that a point is positive
"""
if not self._is_memoized('_prob_positive'):
self._prob_positive = self.num_positive / self._frame_size
# print(self._frame_size)
self._prob_positive.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._prob_positive.attrs['units'] = ''
return self._prob_positive
@property
def prob_negative(self) -> xr.DataArray:
"""
The probability that a point is negative
"""
if not self._is_memoized('_prob_negative'):
self._prob_negative = self.num_negative / self._frame_size
self._prob_negative.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._prob_negative.attrs['units'] = ''
return self._prob_negative
@property
def odds_positive(self) -> xr.DataArray:
"""
The odds that a point is positive = prob_positive/(1-prob_positive)
"""
if not self._is_memoized('_odds_positive'):
if self._grouping is not None:
grouped = self.prob_positive.groupby(self._grouping)
grouped_mean = grouped.mean()
self._odds_positive = grouped_mean / (1 - grouped_mean)
else:
self._odds_positive = self.prob_positive / (1 - self.prob_positive)
self._odds_positive.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._odds_positive.attrs['units'] = ''
return self._odds_positive
@property
def zscore(self) -> xr.DataArray:
"""
The z-score of a point averaged along the aggregate dimensions under the null hypothesis that the true mean is zero.
NOTE: currently assumes we are aggregating along the time dimension so is only suitable for a spatial plot.
"""
if not self._is_memoized('_zscore'):
if self._time_dim_name is not None:
self._zscore = np.divide(
self.mean, self.std / np.sqrt(self._ds.sizes[self._time_dim_name])
)
self._zscore.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._zscore.attrs['units'] = ''
else:
self._zscore = 0
print('Warning: Zscore requires a time dimension')
return self._zscore
@property
def mae_day_max(self) -> xr.DataArray:
"""
The day of maximum mean absolute value at the point.
NOTE: only available in spatial and spatial comparison plots
"""
if not self._is_memoized('_mae_day_max'):
key = f'{self._time_dim_name}.dayofyear'
self._mae_day_max = 0
self._test = abs(self._ds).groupby(key).mean()
self._mae_day_max = self._test.idxmax(dim='dayofyear')
self._mae_day_max.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._mae_day_max.attrs['units'] = f'{self._ds.units}'
return self._mae_day_max
@property
def cdf(self) -> xr.DataArray:
"""
The empirical CDF of the dataset.
"""
if not self._is_memoized('_cdf'):
# ecfd = sm.distributions.ECDF(self._ds)
x = np.linspace(min(self._ds), max(self._ds))
self._cdf = sm.distributions.ECDF(self._ds)(x)
return self._cdf
@property
def quantile(self):
return self._quantile
@quantile.setter
def quantile(self, q):
self._quantile = q
@property
def quantile_value(self) -> xr.DataArray:
self._quantile_value = self._ds.quantile(self.quantile, dim=self._agg_dims)
self._quantile_value.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._quantile_value.attrs['units'] = ''
return self._quantile_value
@property
def max_abs(self) -> xr.DataArray:
if not self._is_memoized('_max_abs'):
self._max_abs = abs(self._ds).max(dim=self._agg_dims)
self._max_abs.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._max_abs.attrs['units'] = f'{self._ds.units}'
return self._max_abs
@property
def min_abs(self) -> xr.DataArray:
if not self._is_memoized('_min_abs'):
self._min_abs = abs(self._ds).min(dim=self._agg_dims)
self._min_abs.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._min_abs.attrs['units'] = f'{self._ds.units}'
return self._min_abs
@property
def min_abs_nonzero(self) -> xr.DataArray:
if not self._is_memoized('_min_abs_nonzero'):
mx = abs(self._ds).where(self._ds != 0)
self._min_abs_nonzero = mx.min(dim=self._agg_dims)
self._min_abs_nonzero.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._min_abs_nonzero.attrs['units'] = f'{self._ds.units}'
return self._min_abs_nonzero
@property
def max_val(self) -> xr.DataArray:
if not self._is_memoized('_max_val'):
self._max_val = self._ds.max(dim=self._agg_dims)
self._max_val.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._max_val.attrs['units'] = f'{self._ds.units}'
return self._max_val
@property
def min_val(self) -> xr.DataArray:
if not self._is_memoized('_min_val'):
self._min_val = self._ds.min(dim=self._agg_dims)
self._min_val.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._min_val.attrs['units'] = f'{self._ds.units}'
return self._min_val
@property
def min_val_nonzero(self) -> xr.DataArray:
if not self._is_memoized('_min_val_nonzero'):
mx = self._ds.where(self._ds != 0)
self._min_val_nonzero = mx.min(dim=self._agg_dims)
self._min_val_nonzero.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._min_val_nonzero.attrs['units'] = f'{self._ds.units}'
return self._min_val_nonzero
@property
def dyn_range(self) -> xr.DataArray:
if not self._is_memoized('_range'):
self._dyn_range = abs((self._ds).max() - (self._ds).min())
self._dyn_range.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._dyn_range.attrs['units'] = f'{self._ds.units}'
return self._dyn_range
def dec_to_binary(self, num):
b = bin(ctypes.c_uint32.from_buffer(ctypes.c_float(num)).value)
strip = b.lstrip('0b')
return strip.zfill(32)
def dec_to_binary_3(self, num):
a = time.perf_counter()
int32bits = np.asarray(num, dtype=np.float32).view(np.int32).item()
b = time.perf_counter() - a
print(b)
return '{:032b}'.format(int32bits)
# def get_adj_bit(self, bit_pos):
# return [bit_pos[0] - 1, bit_pos[1]]
def get_dict_list(self, data_array):
dict_list_H = []
# To do: should make this work for 64 also
N_BITS = 32
# sz = data_array.size
# Intialize
for i in range(N_BITS - 1):
new_dict = {'00': 0, '01': 0, '10': 0, '11': 0}
dict_list_H.append(new_dict)
# convert data array values to bits (so len(b) = sz)
b = np.empty(data_array.size, dtype=object)
i = 0
for y in np.nditer(np.asarray(data_array, dtype=np.float32).view(np.int32)):
b[i] = '{:032b}'.format(y)
i += 1
# go through each bit position (i) (except the last)
for i in range(N_BITS - 1):
count = 0
for y in range(1, len(b)): # go through each data point
if b[y - 1][i] in ('0', '1') and b[y][i] in ('0', '1'):
count += 1
dict_list_H[i][b[y - 1][i] + b[y][i]] += 1
# dict_list_H[i]['00']
# dict_list_H[i]['01']
# dict_list_H[i]['10']
# dict_list_H[i]['11']
# turn to fractions (probabilities)
if count == 0:
dict_list_H[i]['00'] = 0
dict_list_H[i]['01'] = 0
dict_list_H[i]['10'] = 0
dict_list_H[i]['11'] = 0
else:
dict_list_H[i]['00'] /= count
dict_list_H[i]['01'] /= count
dict_list_H[i]['10'] /= count
dict_list_H[i]['11'] /= count
return dict_list_H
def get_mutual_info(self, p00, p01, p10, p11):
p0 = p00 + p10 # current bit is 0
p1 = p11 + p01 # current bit is 1
p0_prev = p00 + p01 # prev bit was 0
p1_prev = p11 + p10 # prev bit was 1
# From (4) in paper
mutual_info = 0
if p00 > 0:
mutual_info += p00 * np.log2(p00 / p0_prev / p0)
if p11 > 0:
mutual_info += p11 * np.log2(p11 / p1_prev / p1)
if p01 > 0:
mutual_info += p01 * np.log2(p01 / p0_prev / p1)
if p10 > 0:
mutual_info += p10 * np.log2(p10 / p1_prev / p0)
return mutual_info
def get_real_info(self):
# first, flatten the data array by stacking all the dimensions and removing the coordinates
# like this: flattened_ds = self._ds.stack(flattened=['lat', 'lon', 'time']).reset_index('flattened')
# but over any number of dimensions
flattened_ds = self._ds.stack(flattened=self._ds.dims).reset_index('flattened')
dict_list_H = self.get_dict_list(flattened_ds)
# print(dict_list_H)
mutual_info_array = []
# here we go thru each bit position
for bit_pos_dict in dict_list_H:
p00 = bit_pos_dict['00']
p01 = bit_pos_dict['01']
p10 = bit_pos_dict['10']
p11 = bit_pos_dict['11']
mutual_info = self.get_mutual_info(p00, p01, p10, p11)
mutual_info_array.append(mutual_info)
mutual_info_array = np.array(mutual_info_array)
# print("MIA =", mutual_info_array)
return xr.DataArray(mutual_info_array)
@property
def real_information(self) -> xr.DataArray:
if not self._is_memoized('_real_information'):
self._real_information = self.get_real_info()
if hasattr(self._ds, 'units'):
self._real_information.attrs['units'] = f'{self._ds.units}'
return self._real_information
@property
def real_information_cutoff(self) -> xr.DataArray:
if not self._is_memoized('_real_information_cutoff'):
# get the first binary digit
self._real_information_cutoff = 0
self._captured_information = 0
# print(self.real_information)
normalized_information = self.real_information / self.real_information.sum()
while self._captured_information < 0.99:
self._captured_information += normalized_information[self._real_information_cutoff]
self._real_information_cutoff += 1
return self._real_information_cutoff
# else:
# return 0
@property
def lag1(self) -> xr.DataArray:
"""
The deseasonalized lag-1 autocorrelation value by day of year
NOTE: This calc returns an array of spatial values as the data set regardless of aggregate dimensions,
so can only be plotted in a spatial plot.
"""
if not self._is_memoized('_lag1'):
if 'dayofyear' in self._ds.attrs.keys():
key = f'{self._time_dim_name}.dayofyear'
else:
key = f'{self._time_dim_name}'
grouped = self._ds.groupby(key, squeeze=False)
if self._time_dim_name in self._ds.attrs.keys():
self._deseas_resid = grouped - grouped.mean(dim=self._time_dim_name)
else:
# note: not actually deseasonalized
self._deseas_resid = grouped.mean(dim=self._time_dim_name) - self._ds.mean()
time_length = self._deseas_resid.sizes[self._time_dim_name]
current = self._deseas_resid.head({self._time_dim_name: time_length - 1})
next_one = self._deseas_resid.shift({self._time_dim_name: -1}).head(
{self._time_dim_name: time_length - 1}
)
num = current.fillna(0).dot(next_one.fillna(0), dim=self._time_dim_name)
denom = current.fillna(0).dot(current.fillna(0), dim=self._time_dim_name)
# don't divide by zero :)
denom = np.where(denom == 0, 1e-12, denom)
self._lag1 = num / denom
self._lag1.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._lag1.attrs['units'] = ''
return self._lag1
@property
def lag1_first_difference(self) -> xr.DataArray:
"""
The deseasonalized lag-1 autocorrelation value of the first difference of the data by day of year
NOTE: This calc returns an array of spatial values as the data set regardless of aggregate dimensions,
so can only be plotted in a spatial plot.
"""
if not self._is_memoized('_lag1_first_difference'):
if 'dayofyear' in self._ds.attrs.keys():
key = f'{self._time_dim_name}.dayofyear'
else:
key = f'{self._time_dim_name}'
grouped = self._ds.groupby(key, squeeze=False)
if self._time_dim_name in self._ds.attrs.keys():
self._deseas_resid = grouped - grouped.mean(dim=self._time_dim_name)
else:
# note: not actually deseasonalized
self._deseas_resid = grouped.mean(dim=self._time_dim_name) - self._ds.mean()
time_length = self._deseas_resid.sizes[self._time_dim_name]
current = self._deseas_resid.head({self._time_dim_name: time_length - 1})
next_one = self._deseas_resid.shift({self._time_dim_name: -1}).head(
{self._time_dim_name: time_length - 1}
)
first_difference = next_one - current
first_difference_current = first_difference.head({self._time_dim_name: time_length - 1})
first_difference_next = first_difference.shift({self._time_dim_name: -1}).head(
{self._time_dim_name: time_length - 1}
)
num = (first_difference_current * first_difference_next).sum(
dim=[self._time_dim_name], skipna=True
)
denom = first_difference_current.dot(first_difference_current, dim=self._time_dim_name)
# don't divide by zero
denom = np.where(denom == 0, 1e-12, denom)
self._lag1_first_difference = num / denom
self._lag1_first_difference.attrs = self._ds.attrs
if hasattr(self._ds, 'units'):
self._lag1_first_difference.attrs['units'] = ''
return self._lag1_first_difference
@property
def fft2(self) -> xr.DataArray:
if not self._is_memoized('_fft2'):
self._fft2 = xr.DataArray(np.abs(np.fft.fft2(self._ds)))
self._fft2.attrs = self._ds.attrs
# if hasattr(self._ds, 'units'):
# self._fft2.attrs['units'] = f'{self._ds.units}'
self._fft2 = self._fft2.rename(
{
'dim_0': self._time_dim_name,
'dim_1': self._lat_dim_name,
'dim_2': self._lon_dim_name,
}
)
self._fft2 = self._fft2.assign_coords(
{
self._lat_dim_name: self._ds.coords[self._lat_dim_name],
self._lon_dim_name: self._ds.coords[self._lon_dim_name],
}
)
return self._fft2 / np.mean(self._fft2)
@property
def fftratio(self) -> xr.DataArray:
if not self._is_memoized('_fftratio'):
# get longitude length: self.fft2.shape[1]
# select second latitude coordinate: self.fft2.isel({"lat":2})
# select range: self.fft2.where(self.fft2.lat > 1, drop=True)
# multirange: self.fft2.where(self.fft2.lat > 1, drop=True).where(self.fft2.lon == 3, drop=True)
top_val = (
self.fft2.where(
self.fft2.latitude > self.fft2.latitude[int((self.fft2.shape[2] - 1) * 3 / 4)],
drop=True,
)
.max(dim=self._lon_dim_name)
.mean(dim=self._lat_dim_name)
)
bottom_val = (
self.fft2.where(
np.logical_and(
self.fft2.latitude > self.fft2.latitude[int((self.fft2.shape[2] - 1) / 2)],
self.fft2.latitude
<= self.fft2.latitude[int((self.fft2.shape[2] - 1) * 3 / 4)],
),
drop=True,
)
.max(dim=self._lon_dim_name)
.mean(dim=self._lat_dim_name)
)
# greater than 1 = more higher frequencies than lower frequencies
self._fftratio = top_val / bottom_val
return self._fftratio
@property
def fftmax(self) -> xr.DataArray:
if not self._is_memoized('_fftmax'):
# get longitude length: self.fft2.shape[1]
# select second latitude coordinate: self.fft2.isel({"lat":2})
# select range: self.fft2.where(self.fft2.lat > 1, drop=True)
# multirange: self.fft2.where(self.fft2.lat > 1, drop=True).where(self.fft2.lon == 3, drop=True)
top_val = (
self.fft2.where(
self.fft2.lat > self.fft2.lat[int((self.fft2.shape[1] - 1) * 15 / 16)],
drop=True,
)
.max(dim='lon')
.max()
)
bottom_val = (
self.fft2.where(
np.logical_and(
self.fft2.lat > self.fft2.lat[int((self.fft2.shape[1] - 1) / 2)],
self.fft2.lat <= self.fft2.lat[int((self.fft2.shape[1] - 1) * 9 / 16)],
),
drop=True,
)
.max(dim='lon')
.max()
)
# greater than 1 = more higher frequencies than lower frequencies
self._fftmax = top_val / bottom_val
return self._fftmax
@property
def vfftratio(self) -> xr.DataArray:
if not self._is_memoized('_fftratio'):
# get longitude length: self.fft2.shape[1]
# select second latitude coordinate: self.fft2.isel({"lat":2})
# select range: self.fft2.where(self.fft2.lat > 1, drop=True)
# multirange: self.fft2.where(self.fft2.lat > 1, drop=True).where(self.fft2.lon == 3, drop=True)
top_val = (
self.fft2.where(
self.fft2.lon > self.fft2.lon[int((self.fft2.shape[2] - 1) * 3 / 4)], drop=True
)
.max(dim='lat')
.mean()
)
bottom_val = (
self.fft2.where(
np.logical_and(
self.fft2.lon > self.fft2.lon[int((self.fft2.shape[2] - 1) / 2)],
self.fft2.lon <= self.fft2.lon[int((self.fft2.shape[2] - 1) * 3 / 4)],
),
drop=True,
)
.max(dim='lat')
.mean()
)
# greater than 1 = more higher frequencies than lower frequencies
self._fftratio = top_val / bottom_val
return self._fftratio
@property
def vfftmax(self) -> xr.DataArray:
if not self._is_memoized('_fftmax'):
# get longitude length: self.fft2.shape[1]
# select second latitude coordinate: self.fft2.isel({"lat":2})
# select range: self.fft2.where(self.fft2.lat > 1, drop=True)
# multirange: self.fft2.where(self.fft2.lat > 1, drop=True).where(self.fft2.lon == 3, drop=True)
top_val = (
self.fft2.where(
self.fft2.lon > self.fft2.lon[int((self.fft2.shape[2] - 1) * 15 / 16)],
drop=True,
)
.max(dim='lat')
.max()
)
bottom_val = (
self.fft2.where(
np.logical_and(
self.fft2.lon > self.fft2.lon[int((self.fft2.shape[2] - 1) / 2)],
self.fft2.lon <= self.fft2.lon[int((self.fft2.shape[2] - 1) * 9 / 16)],
),
drop=True,
)
.max(dim='lat')
.max()
)
# greater than 1 = more higher frequencies than lower frequencies
self._fftmax = top_val / bottom_val
return self._fftmax
@property
def stft(self) -> xr.DataArray:
if not self._is_memoized('_stft'):
f, t, self._stft = np.abs(
stft(
self.mean.isel({'lat': 100}).squeeze(), nperseg=1, nfft=382, detrend='constant'
)
)
self._stft = xr.DataArray(self._stft)
self._stft.attrs = self._ds.attrs
# if hasattr(self._ds, 'units'):
# self._stft.attrs['units'] = f'{self._ds.units}'
self._stft = self._stft.rename({'dim_0': 'lat', 'dim_1': 'lon'})
self._stft = self._stft.assign_coords(
{'lat': self._ds.coords['lat'], 'lon': self._ds.coords['lon']}
)
return self._stft
@property
def annual_harmonic_relative_ratio(self) -> xr.DataArray:
"""
The annual harmonic relative to the average periodogram value
in a neighborhood of 50 frequencies around the annual frequency
NOTE: This assumes the values along the "time" dimension are equally spaced.
NOTE: This calc returns a lat-lon array regardless of aggregate dimensions, so can only be used in a spatial plot.
"""
if not self._is_memoized('_annual_harmonic_relative_ratio'):
# drop time coordinate labels or else it will try to parse them as numbers to check spacing and fail
new_index = [i for i in range(0, self._ds[self._time_dim_name].size)]
new_ds = self._ds.assign_coords({self._time_dim_name: new_index})
lon_coord_name = self._lon_coord_name
lat_coord_name = self._lat_coord_name
# rechunk to a single time chunk
new_ds2 = new_ds.chunk({self._time_dim_name: new_ds[self._time_dim_name].size})
DF = dft(new_ds2, dim=[self._time_dim_name], detrend='constant')
# the above does not preserve the lat/lon attributes
DF[lon_coord_name].attrs = new_ds[lon_coord_name].attrs
DF[lat_coord_name].attrs = new_ds[lat_coord_name].attrs
DF.attrs = new_ds.attrs
S = np.real(DF * np.conj(DF) / self._ds.sizes[self._time_dim_name])
S_annual = S.isel(
freq_time=int(self._ds.sizes[self._time_dim_name] / 2)
+ int(self._ds.sizes[self._time_dim_name] / 365)
) # annual power
neighborhood = (
int(self._ds.sizes[self._time_dim_name] / 2)
+ int(self._ds.sizes[self._time_dim_name] / 365)
- 25,
int(self._ds.sizes[self._time_dim_name] / 2)
+ int(self._ds.sizes[self._time_dim_name] / 365)
+ 25,
)
S_mean = xr.concat(
[
S.isel(
freq_time=slice(
max(0, neighborhood[0]),
int(self._ds.sizes[self._time_dim_name] / 2)
+ int(self._ds.sizes[self._time_dim_name] / 365)
- 1,
)
),
S.isel(
freq_time=slice(
int(self._ds.sizes[self._time_dim_name] / 2)
+ int(self._ds.sizes[self._time_dim_name] / 365)
+ 1,
neighborhood[1],
)
),
],
dim='freq_time',
).mean(dim='freq_time')
ratio = S_annual / S_mean
# ratio.cf.describe()
self._annual_harmonic_relative_ratio = ratio
if hasattr(self._ds, 'units'):
self._annual_harmonic_relative_ratio.attrs['units'] = ''
return self._annual_harmonic_relative_ratio
@property
def zscore_cutoff(self) -> np.ndarray:
"""
The Z-Score cutoff for a point to be considered significant
"""
if not self._is_memoized('_zscore_cutoff'):
pvals = 2 * (1 - ss.norm.cdf(np.abs(self.zscore)))
if isinstance(pvals, np.float64):
pvals_array = np.array(pvals)
sorted_pvals = pvals_array
else:
pvals_array = pvals
sorted_pvals = np.sort(pvals_array).flatten()
fdr_zscore = 0.01
p = np.argwhere(
sorted_pvals <= fdr_zscore * np.arange(1, pvals_array.size + 1) / pvals_array.size
)
pval_cutoff = np.empty(0)
if not len(p) == 0:
pval_cutoff = sorted_pvals[p[len(p) - 1]]
if not (pval_cutoff.size == 0):
zscore_cutoff = ss.norm.ppf(1 - pval_cutoff)
else:
zscore_cutoff = 'na'
self._zscore_cutoff = zscore_cutoff
return self._zscore_cutoff
@property
def annual_harmonic_relative_ratio_pct_sig(self) -> np.ndarray:
"""
The percentage of points past the significance cutoff (p value <= 0.01) for the
annual harmonic relative to the average periodogram value
in a neighborhood of 50 frequencies around the annual frequency
"""
pvals = 1 - ss.f.cdf(self.annual_harmonic_relative_ratio, 2, 100)
sorted_pvals = np.sort(pvals)
if len(sorted_pvals[sorted_pvals <= 0.01]) == 0:
return 0
sig_cutoff = ss.f.ppf(1 - max(sorted_pvals[sorted_pvals <= 0.01]), 2, 50)
pct_sig = 100 * np.mean(self.annual_harmonic_relative_ratio > sig_cutoff)
return pct_sig
@property
def zscore_percent_significant(self) -> np.ndarray:
"""
The percent of points where the zscore is considered significant
"""
if not self._is_memoized('_zscore_percent_significant'):
pvals = 2 * (1 - ss.norm.cdf(np.abs(self.zscore)))
if isinstance(pvals, np.float64):
pvals_array = np.array(pvals)
sorted_pvals = pvals_array
else:
pvals_array = pvals
sorted_pvals = np.sort(pvals_array).flatten()
fdr_zscore = 0.01
p = np.argwhere(sorted_pvals <= fdr_zscore * np.arange(1, pvals.size + 1) / pvals.size)
if p.size > 0:
pval_cutoff = sorted_pvals[p[len(p) - 1]]
if not (pval_cutoff.size == 0):
sig_locs = np.argwhere(pvals <= pval_cutoff)
percent_sig = 100 * np.size(sig_locs, 0) / pvals.size
else:
percent_sig = 0
else:
percent_sig = 0
self._zscore_percent_significant = percent_sig
return self._zscore_percent_significant
def get_calc_ds(self, calc_name: str, var_name: str) -> xr.Dataset:
da = self.get_calc(calc_name)
ds = da.squeeze().to_dataset(name=var_name, promote_attrs=True)
ds.attrs['data_type'] = da.data_type
return ds
[docs]
def get_calc(self, name: str, q: Optional[int] = 0.5, grouping: Optional[str] = None, ddof=1):
"""
Gets a calc aggregated across one or more dimensions of the dataset
Parameters
==========
name : str
The name of the calc (must be identical to a property name)
q: float, optional
(default 0.5)
Returns
=======
out : xarray.DataArray
A DataArray of the same size and dimensions the original dataarray,
minus those dimensions that were aggregated across.
"""
if isinstance(name, str):
if name == 'ns_con_var':
return self.ns_con_var
if name == 'ew_con_var':
return self.ew_con_var
if name == 'fft2':
return self.fft2
if name == 'stft':
return self.stft
if name == 'w_e_first_differences':
return self.w_e_first_differences
if name == 'n_s_first_differences':
return self.n_s_first_differences
if name == 'w_e_first_differences_max':
return self.w_e_first_differences_max
if name == 'n_s_first_differences_max':
return self.n_s_first_differences_max
if name == 'w_e_derivative':
return self.w_e_derivative
if name == 'mean':
return self.mean
if name == 'std':
self._ddof = ddof
return self.std
if name == 'standardized_mean':
self._grouping = grouping
return self.standardized_mean
if name == 'variance':
return self.variance
if name == 'pooled_var_ratio':
return self.pooled_variance_ratio
if name == 'prob_positive':
return self.prob_positive
if name == 'prob_negative':
return self.prob_negative
if name == 'num_positive':
return self.num_positive
if name == 'num_negative':
return self.num_negative
if name == 'num_zero':
return self.num_zero
if name == 'odds_positive':
self._grouping = grouping
return self.odds_positive
if name == 'zscore':
return self.zscore
if name == 'mae_day_max':
return self.mae_day_max
if name == 'mean_abs':
return self.mean_abs
if name == 'mean_squared':
return self.mean_squared
if name == 'range':
return self.range
if name == 'rms':
return self.root_mean_squared
if name == 'sum':
return self.sum
if name == 'sum_squared':
return self.sum_squared
if name == 'ann_harmonic_ratio':
return self.annual_harmonic_relative_ratio
if name == 'quantile':
self.quantile = q
return self.quantile_value
if name == 'lag1':
return self.lag1
if name == 'lag1_first_difference':
return self.lag1_first_difference
if name == 'max_abs':
return self.max_abs
if name == 'min_abs_nonzero':
return self.min_abs_nonzero
if name == 'min_abs':
return self.min_abs
if name == 'max_val':
return self.max_val
if name == 'min_val':
return self.min_val
if name == 'min_val_nonzero':
return self.min_val_nonzero
if name == 'cdf':
return self.cdf
if name == 'fftratio':
return self.fftratio
if name == 'fftmax':
return self.fftmax
if name == 'vfftratio':
return self.vfftratio
if name == 'vfftmax':
return self.vfftmax
if name == 'ds':
return self._ds
if name == 'magnitude_diff_ew':
return self.magnitude_diff_ew
if name == 'magnitude_diff_ns':
return self.magnitude_diff_ns
if name == 'real_information':
return self.real_information
if name == 'magnitude_range':
return self.magnitude_range
raise ValueError(f'there is no calc with the name: {name}.')
else:
raise TypeError('name must be a string.')
[docs]
def get_single_calc(self, name: str):
"""
Gets a calc consisting of a single float value
Parameters
==========
name : str
the name of the calc (must be identical to a property name)
Returns
=======
out : float
The calc value
"""
if isinstance(name, str):
if name == 'real_information_cutoff':
return self.real_information_cutoff
if name == 'zscore_cutoff':
return self.zscore_cutoff
if name == 'zscore_percent_significant':
return self.zscore_percent_significant
if name == 'lat_autocorr':
return self.lat_autocorr
if name == 'lon_autocorr':
return self.lon_autocorr
if name == 'lev_autocorr':
return self.lev_autocorr
if name == 'entropy':
return self.entropy
if name == 'range':
return self.dyn_range
if name == 'pooled_variance':
return self.pooled_variance
if name == 'annual_harmonic_relative_ratio_pct_sig':
return self.annual_harmonic_relative_ratio_pct_sig
if name == 'percent_unique':
return self.percent_unique
if name == 'most_repeated':
return self.most_repeated
if name == 'most_repeated_percent':
return self.most_repeated_percent
if name == 'fftratio':
return self.fftratio
if name == 'fftmax':
return self.fftmax
if name == 'vfftratio':
return self.vfftratio
if name == 'vfftmax':
return self.vfftmax
raise ValueError(f'there is no calcs with the name: {name}.')
else:
raise TypeError('name must be a string.')
[docs]
class Diffcalcs:
"""
This class contains calcs on the overall dataset that require more than one input dataset to compute
"""
def __init__(
self,
ds1: xr.DataArray,
ds2: xr.DataArray,
data_type: str,
aggregate_dims: Optional[list] = None,
spre_tol: float = 1.0e-4,
k1: float = 0.01,
k2: float = 0.03,
**calcs_kwargs,
) -> None:
if isinstance(ds1, xr.DataArray) and isinstance(ds2, xr.DataArray):
# Datasets
self._ds1 = ds1
self._ds2 = ds2
else:
raise TypeError(
f'ds must be of type xarray.DataArray. Type(s): {str(type(ds1))} {str(type(ds2))}'
)
self._calcs1 = Datasetcalcs(self._ds1, data_type, aggregate_dims, **calcs_kwargs)
self._calcs2 = Datasetcalcs(self._ds2, data_type, aggregate_dims, **calcs_kwargs)
self._data_type = data_type
self._aggregate_dims = aggregate_dims
self._pcc = None
self._covariance = None
self._ks_p_value = None
self._n_rms = None
self._n_emax = None
self._spatial_rel_error = None
self._ssim_value = None # uses images
self._ssim_value_fp_orig = (
None # "straightforward" version for floating points - not recommended
)
self._ssim_value_fp_fast = None # faster Data SSIM - the default
self._ssim_value_fp_fast_exp = None # experimenting
self._ssim_value_fp_slow = None # slower non-matrix version of DSSIM - for experimenting
self._max_spatial_rel_error = None
self._ssim_mat_fp = None
self._ssim_mat = None
self._ssim_mat_fp_orig = None
self._spre_tol = spre_tol
self._ssim_levs = None
self._k1 = k1
self._k2 = k2
self._quant = 256
@property
def spre_tol(self):
return self._spre_tol
@spre_tol.setter
def spre_tol(self, t):
self._spre_tol = t
@property
def k1(self):
return self._k1
@k1.setter
def k1(self, t):
self._k1 = t
@property
def k2(self):
return self._k2
@k2.setter
def k2(self, t):
self._k2 = t
@property
def quant(self):
return self._quant
@quant.setter
def quant(self, t):
self._quant = t
def _is_memoized(self, calc_name: str) -> bool:
return hasattr(self, calc_name) and (self.__getattribute__(calc_name) is not None)
# n is the box width (1D)
# sigma is the radius for the gaussian
def _oned_gauss(self, n, sigma):
r = range(-int(n / 2), int(n / 2) + 1)
return [(1 / (sigma * sqrt(2 * pi))) * exp(-float(x) ** 2 / (2 * sigma**2)) for x in r]
# this adjusts the boundary area after filtering with astropy
# to account for the fact that we want to divide only by the weight in
# the domain at the edges (this is used in fast2)
def _filter_adjust_edges(self, mydata, kernel, k):
k_sum = kernel.array.sum()
X = mydata.shape[0]
Y = mydata.shape[1]
# R and L rectangles and T and B rectangles
for j in range(k):
kk = kernel.array[:, 0 : k + j + 1].sum()
scale = k_sum / kk
# L and R
mydata[k : X - k, j] = mydata[k : X - k, j] * scale
mydata[k : X - k, Y - j - 1] = mydata[k : X - k, Y - j - 1] * scale
# T and B
mydata[j, k : Y - k] = mydata[j, k : Y - k] * scale
mydata[X - j - 1, k : Y - k] = mydata[X - j - 1, k : Y - k] * scale
# corners
for i in range(k):
for j in range(k):
kk = kernel.array[0 : k + i + 1, 0 : k + j + 1].sum()
scale = k_sum / kk
# top left
mydata[i, j] = mydata[i, j] * scale
# top right
mydata[i, Y - j - 1] = mydata[i, Y - j - 1] * scale
# bottom left
mydata[X - i - 1, j] = mydata[X - i - 1, j] * scale
# bottom right
mydata[X - i - 1, Y - j - 1] = mydata[X - i - 1, Y - j - 1] * scale
def plot_ssim_mat(self, return_mat=False, ssim_type='ssim_fp'):
if ssim_type == 'orig':
if self._ssim_value is None:
self.ssim_value
mats = self._ssim_mat
elif ssim_type == 'ssim_fp_orig':
if self._ssim_value_fp_orig is None:
self.ssim_value_fp_orig
mats = self._ssim_mat_fp_orig
else:
if self._ssim_value_fp_fast is None:
self.ssim_value_fp_fast
mats = self._ssim_mat_fp
num = len(mats)
meana = np.zeros(num)
for i in range(num):
meana[i] = np.nanmean(mats[i])
# for 3D which is the min level
min_meana = meana.min()
min_lev = meana.argmin()
# smallest value at that level
min_lev_val = np.nanmin(mats[min_lev])
ind = np.unravel_index(np.nanargmin(mats[min_lev], axis=None), mats[min_lev].shape)
plt.imshow(mats[min_lev], interpolation='none', vmax=1.0, cmap='bone')
plt.colorbar(orientation='horizontal')
if num == 1:
mytitle = f'ssim val = {min_meana:.4f}, min = {min_lev_val:.4f} at {ind}'
else:
mytitle = (
f'ssim val = {min_meana:.4f}, min = {min_lev_val:.4f} at {ind} on lev={min_lev}'
)
plt.title(mytitle)
# plt.show()
if return_mat:
return mats
@property
def covariance(self) -> xr.DataArray:
"""
The covariance between the two datasets
"""
if not self._is_memoized('_covariance'):
# need to use unweighted means
c1_mean = self._calcs1.get_calc('ds').mean(dim=self._aggregate_dims, skipna=True)
c2_mean = self._calcs2.get_calc('ds').mean(dim=self._aggregate_dims, skipna=True)
self._covariance = (
(self._calcs2.get_calc('ds') - c2_mean) * (self._calcs1.get_calc('ds') - c1_mean)
).mean(dim=self._aggregate_dims)
return self._covariance
# @property
# def ks_p_value(self):
# """
# The Kolmogorov-Smirnov p-value
# """
# # Note: ravel() forces a compute for dask, but ks test in scipy can't
# # work with uncomputed dask arrays
# if not self._is_memoized('_ks_p_value'):
# d1_p = (np.ravel(self._ds1)).astype('float64')
# d2_p = (np.ravel(self._ds2)).astype('float64')
# self._ks_p_value = np.asanyarray(ss.ks_2samp(d2_p, d1_p))
# return self._ks_p_value[1]
@property
def ks_p_value(self):
"""
The Kolmogorov-Smirnov p-value, calculated for slices of the data defined by specified dimensions.
:param dim: Dimensions over which to apply the KS test. Can be a string or a list of strings.
"""
if not self._is_memoized('_ks_p_value'):
# Apply the KS test across the specified dimensions
# This will create a DataArray of p-values
if self._aggregate_dims is None:
lat_n = self._ds1.cf['latitude'].name
lon_n = self._ds2.cf['longitude'].name
core_d = [[lat_n, lon_n], [lat_n, lon_n]]
else:
core_d = [self._aggregate_dims, self._aggregate_dims]
# print(core_d)
self._ks_p_value = xr.apply_ufunc(
lambda x, y: ss.ks_2samp(x.ravel(), y.ravel())[1],
self._ds1,
self._ds2,
input_core_dims=core_d,
dask='parallelized',
dask_gufunc_kwargs={'allow_rechunk': True},
vectorize=True,
)
return self._ks_p_value
@property
def pearson_correlation_coefficient(self):
"""
returns the pearson correlation coefficient between the two datasets
"""
if not self._is_memoized('_pearson_correlation_coefficient'):
# Calculate standard deviation over the specified dimensions
c1_std = self._calcs1.get_calc('ds').std(dim=self._aggregate_dims, skipna=True)
c2_std = self._calcs2.get_calc('ds').std(dim=self._aggregate_dims, skipna=True)
# Handle cases where standard deviations are zero
zero_std = (c1_std == 0) | (c2_std == 0)
self._pcc = xr.where(zero_std, -1.0, self.covariance / c1_std / c2_std)
return self._pcc
@property
def normalized_max_pointwise_error(self):
"""
The absolute value of the maximum pointwise difference, normalized
by the range of values for the first set
"""
if not self._is_memoized('_normalized_max_pointwise_error'):
tt = abs((self._calcs1.get_calc('ds') - self._calcs2.get_calc('ds')).max())
self._n_emax = tt / self._calcs1.dyn_range
return float(self._n_emax)
@property
def normalized_root_mean_squared(self):
"""
The absolute value of the mean along the aggregate dimensions, normalized
by the range of values for the first set
"""
if not self._is_memoized('_normalized_root_mean_squared'):
tt = np.sqrt(
np.square(self._calcs1.get_calc('ds') - self._calcs2.get_calc('ds')).mean(
dim=self._aggregate_dims
)
)
self._n_rms = tt / self._calcs1.dyn_range
return float(self._n_rms)
# @property
# def spatial_rel_error(self):
# """
# At each grid point, we compute the relative error. Then we report the percentage of grid point whose
# relative error is above the specified tolerance (1e-4 by default).
# """
#
# # We don't check for memoization here in case the spre_tol has changed
# # since the last time it has been called
# sp_tol = self._spre_tol
# # unraveling converts the dask array to numpy, but then
# # we can assign the 1.0 and avoid zero (couldn't figure another way)
# t1 = np.ravel(self._calcs1.get_calc('ds'))
# t2 = np.ravel(self._calcs2.get_calc('ds'))
#
# # check for zeros in t1 (if zero then change to 1 - which
# # does an absolute error at that point)
# z = (np.where(abs(t1) == 0))[0]
# if z.size > 0:
# t1_denom = np.copy(t1)
# t1_denom[z] = 1.0
# else:
# t1_denom = t1
#
# # we don't want to use nan
# # (ocassionally in cam data - often in ocn)
# m_t2 = np.ma.masked_invalid(t2).compressed()
# m_t1 = np.ma.masked_invalid(t1).compressed()
#
# if m_t2.shape != m_t1.shape:
# print('Warning: Spatial error not calculated with differing numbers of Nans')
# self._spatial_rel_error = 0
# self._max_spatial_rel_error = 0
# else:
#
# if z.size > 0:
# m_t1_denom = np.ma.masked_invalid(t1_denom).compressed()
# else:
# m_t1_denom = m_t1
#
# m_tt = m_t1 - m_t2
# m_tt = m_tt / m_t1_denom
#
# # find the max spatial error also if None
# if self._max_spatial_rel_error is None:
# max_spre = np.max(m_tt)
# self._max_spatial_rel_error = max_spre
#
# # percentage greater than the tolerance
# a = len(m_tt[abs(m_tt) > sp_tol])
# sz = m_tt.shape[0]
#
# self._spatial_rel_error = (a / sz) * 100
#
# return float(self._spatial_rel_error)
@property
def spatial_rel_error(self):
"""
At each grid point, we compute the relative error. Then we report the percentage of grid points whose
relative error is above the specified tolerance (1e-4 by default), optionally calculated over specified dimensions.
:param dim: Dimensions over which to calculate the spatial relative error. Can be a string or a list of strings.
"""
sp_tol = self._spre_tol
t1 = self._calcs1.get_calc('ds')
t2 = self._calcs2.get_calc('ds')
# Replace zeros in t1 with 1.0 (to avoid division by zero)
t1_denom = xr.where(t1 == 0, 1.0, t1)
# Compute the relative error
rel_error = xr.where(t1 != 0, abs(t1 - t2) / abs(t1_denom), abs(t1 - t2))
# Aggregate the relative error over the specified dimensions
# Here we calculate the fraction of data points with error above tolerance in each slice
if self._aggregate_dims is not None:
# Mask invalid values
rel_error = rel_error.where(rel_error.notnull(), 0)
# Calculate the fraction of points above the tolerance for each slice
error_fraction = (rel_error > sp_tol).mean(dim=self._aggregate_dims) * 100
else:
# Global calculation (as in the original function)
valid_points = rel_error.count()
error_points = rel_error.where(rel_error > sp_tol).count()
error_fraction = (error_points / valid_points) * 100
return error_fraction
@property
def max_spatial_rel_error(self):
"""
We compute the relative error at each grid point and return the maximun.
"""
# this is also computed as part of the spatial rel error
if not self._is_memoized('_max_spatial_rel_error'):
t1 = np.ravel(self._calcs1.get_calc('ds'))
t2 = np.ravel(self._calcs2.get_calc('ds'))
# check for zeros in t1 (if zero then change to 1 - which
# does an absolute error at that point)
z = (np.where(abs(t1) == 0))[0]
if z.size > 0:
t1_denom = np.copy(t1)
t1_denom[z] = 1.0
else:
t1_denom = t1
# we don't want to use nan
# (occassionally in cam data - often in ocn)
m_t2 = np.ma.masked_invalid(t2).compressed()
m_t1 = np.ma.masked_invalid(t1).compressed()
if z.size > 0:
m_t1_denom = np.ma.masked_invalid(t1_denom).compressed()
else:
m_t1_denom = m_t1
m_tt = m_t1 - m_t2
m_tt = m_tt / m_t1_denom
max_spre = np.max(abs(m_tt))
self._max_spatial_rel_error = max_spre
return float(self._max_spatial_rel_error)
@property
def ssim_value(self):
"""
We compute the SSIM (structural similarity index) on the visualization of the spatial data.
This creates two plots and uses the standard SSIM.
"""
k1 = self._k1
k2 = self._k2
if True:
# Prevent showing stuff
backend_ = mpl.get_backend()
mpl.use('Agg')
d1 = self._calcs1.get_calc('ds')
d2 = self._calcs2.get_calc('ds')
lat1 = d1[self._calcs1._lat_coord_name]
lat2 = d2[self._calcs2._lat_coord_name]
lon1 = d1[self._calcs1._lon_coord_name]
lon2 = d2[self._calcs2._lon_coord_name]
# latdim = d1.cf[self._calcs1._lon_coord_name].ndim
central = 0.0 # might make this a parameter later
if self._data_type == 'pop':
central = 300.0
# make periodic for pop or cam-fv
if self._data_type == 'pop':
cy_lon1 = np.hstack((lon1, lon1[:, 0:1]))
cy_lon2 = np.hstack((lon2, lon2[:, 0:1]))
cy_lat1 = np.hstack((lat1, lat1[:, 0:1]))
cy_lat2 = np.hstack((lat2, lat2[:, 0:1]))
cy1 = add_cyclic_point(d1)
cy2 = add_cyclic_point(d2)
no_inf_d1 = np.nan_to_num(cy1, nan=np.nan)
no_inf_d2 = np.nan_to_num(cy2, nan=np.nan)
elif self._data_type == 'cam-fv': # cam-fv
cy1, cy_lon1 = add_cyclic_point(d1, coord=lon1)
cy2, cy_lon2 = add_cyclic_point(d2, coord=lon2)
cy_lat1 = lat1
cy_lat2 = lat2
no_inf_d1 = np.nan_to_num(cy1, nan=np.nan)
no_inf_d2 = np.nan_to_num(cy2, nan=np.nan)
elif self._data_type == 'wrf':
no_inf_d1 = np.nan_to_num(d1, nan=np.nan)
no_inf_d2 = np.nan_to_num(d2, nan=np.nan)
# is it 3D? must do each level
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
color_min = min(
np.min(d1.where(d1 != -np.inf)).values.min(),
np.min(d2.where(d2 != -np.inf)).values.min(),
)
color_max = max(
np.max(d1.where(d1 != np.inf)).values.max(),
np.max(d2.where(d2 != np.inf)).values.max(),
)
mymap = copy.copy(plt.cm.get_cmap(self.color))
mymap.set_under(color='black')
mymap.set_over(color='white')
mymap.set_bad(alpha=0)
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
for this_lev in range(nlevels):
with tempfile.TemporaryDirectory() as tmpdirname:
filename_1, filename_2 = (
f'{tmpdirname}/t_ssim1.png',
f'{tmpdirname}/t_ssim2.png',
)
if nlevels == 1:
this_no_inf_d1 = no_inf_d1
this_no_inf_d2 = no_inf_d2
else:
# this assumes 3D level is first (TO DO: verify)
this_no_inf_d1 = no_inf_d1[this_lev, :, :]
this_no_inf_d2 = no_inf_d2[this_lev, :, :]
fig = plt.figure(dpi=300, figsize=(9, 2.5))
ax1 = plt.subplot(1, 2, 1, projection=ccrs.Robinson(central_longitude=central))
ax1.set_facecolor('#39ff14')
ax1.pcolormesh(
cy_lon1,
cy_lat1,
this_no_inf_d1,
transform=ccrs.PlateCarree(),
cmap=mymap,
vmin=color_min,
vmax=color_max,
)
ax1.set_global()
ax1.coastlines(linewidth=0.5)
ax1.axis('off')
plt.margins(0, 0)
extent1 = ax1.get_window_extent().transformed(fig.dpi_scale_trans.inverted())
ax1.imshow
plt.savefig(filename_1, bbox_inches=extent1, transparent=True, pad_inches=0)
ax1.axis('on')
ax2 = plt.subplot(1, 2, 2, projection=ccrs.Robinson(central_longitude=central))
ax2.set_facecolor('#39ff14')
ax2.pcolormesh(
cy_lon2,
cy_lat2,
this_no_inf_d2,
transform=ccrs.PlateCarree(),
cmap=mymap,
vmin=color_min,
vmax=color_max,
)
ax2.set_global()
ax2.coastlines(linewidth=0.5)
plt.margins(0, 0)
ax2.imshow
ax2.axis('off')
extent2 = ax2.get_window_extent().transformed(fig.dpi_scale_trans.inverted())
plt.savefig(filename_2, bbox_inches=extent2, transparent=True, pad_inches=0)
ax2.axis('on')
img1 = skimage.io.imread(filename_1)
img2 = skimage.io.imread(filename_2)
# scikit is adding an alpha channel of all zeros - get rid of it or it arificially
# inflates the ssim
img1 = img1[:, :, :3]
img2 = img2[:, :, :3]
# s = ssim(img1, img2, multichannel=True)
# channel_axis =
# the following version closer to matlab version (and orig ssim paper)
s, ssim_mat = ssim(
img1,
img2,
# multichannel=True,
channel_axis=2,
gaussian_weights=True,
use_sample_covariance=False,
full=True,
K1=k1,
K2=k2,
)
ssim_levs[this_lev] = s
plt.close(fig)
ssim_mats_array.append(np.mean(ssim_mat, axis=2))
return_ssim = ssim_levs.min()
# Reset backend
mpl.use(backend_)
# save full matrix
self._ssim_mat = ssim_mats_array
self._ssim_levs = ssim_levs
self._ssim_value = return_ssim
return float(self._ssim_value)
@property
def ssim_value_fp_slow(self):
"""
We compute the SSIM (structural similarity index) on the spatial data
- using the data itself (we do not create an image) - this is the slower
non-matrix implementation that is good for experiementing (not in practice).
"""
if not self._is_memoized('_ssim_value_fp_slow'):
# if this is a 3D variable, we will do each level seperately
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
for this_lev in range(nlevels):
if nlevels == 1:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
else:
a1 = self._calcs1.get_calc('ds').isel({vname: this_lev}).data
a2 = self._calcs2.get_calc('ds').isel({vname: this_lev}).data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
# re-scale to [0,1] - if not constant
smin = min(np.nanmin(a1), np.nanmin(a2))
smax = max(np.nanmax(a1), np.nanmax(a2))
r = smax - smin
if r == 0.0: # scale by smax if field is a constant (and smax != 0)
if smax == 0.0:
sc_a1 = a1
sc_a2 = a2
else:
sc_a1 = a1 / smax
sc_a2 = a2 / smax
else:
sc_a1 = (a1 - smin) / r
sc_a2 = (a2 - smin) / r
# TEMP - don't quantize
# now quantize to 256 bins
sc_a1 = np.round(sc_a1 * 255) / 255
sc_a2 = np.round(sc_a2 * 255) / 255
# gaussian filter
n = 11 # recommended window size
k = 5
# extent
sigma = 1.5
X = sc_a1.shape[0]
Y = sc_a1.shape[1]
g_w = np.array(self._oned_gauss(n, sigma))
# 2D gauss weights
gg_w = np.outer(g_w, g_w)
# init ssim matrix
ssim_mat = np.zeros_like(sc_a1)
my_eps = 1.0e-8
# DATA LOOP
# go through 2D arrays - each grid point x0, y0 has
# a 2D window [x0 - k, x0+k] [y0 - k, y0 + k]
for i in range(X):
# don't go over boundaries
imin = max(0, i - k)
imax = min(X - 1, i + k)
for j in range(Y):
if np.isnan(sc_a1[i, j]):
# SKIP IF gridpoint is nan
ssim_mat[i, j] = np.nan
continue
jmin = max(0, j - k)
jmax = min(Y - 1, j + k)
# WINDOW CALC
a1_win = sc_a1[imin : imax + 1, jmin : jmax + 1]
a2_win = sc_a2[imin : imax + 1, jmin : jmax + 1]
# if window is by boundary, then it is not 11x11 and we must adjust weights also
if min(a1_win.shape) < n:
Wt = gg_w[
imin + k - i : imax + k - i + 1, jmin + k - j : jmax + k - j + 1
]
else:
Wt = gg_w
# weighted means (TO DO: what if indices are not the same)
indices1 = ~np.isnan(a1_win)
indices2 = ~np.isnan(a2_win)
if not np.all(indices1 == indices2):
print('SSIM ERROR: indices are not the same!')
a1_mu = np.average(a1_win[indices1], weights=Wt[indices1])
a2_mu = np.average(a2_win[indices2], weights=Wt[indices2])
# weighted std squared (variance)
a1_std_sq = (
np.average((a1_win[indices1] * a1_win[indices1]), weights=Wt[indices1])
- a1_mu * a1_mu
)
a2_std_sq = (
np.average((a2_win[indices2] * a2_win[indices2]), weights=Wt[indices2])
- a2_mu * a2_mu
)
# cov of a1 and a2
a1a2_cov = (
np.average(
(a1_win[indices1] * a2_win[indices2]),
weights=Wt[indices1],
)
- a1_mu * a2_mu
)
# SSIM for this window
# first term
ssim_t1 = 2 * a1_mu * a2_mu
ssim_b1 = a1_mu * a1_mu + a2_mu * a2_mu
C1 = my_eps
ssim_t1 = ssim_t1 + C1
ssim_b1 = ssim_b1 + C1
# second term
ssim_t2 = 2 * a1a2_cov
ssim_b2 = a1_std_sq + a2_std_sq
C2 = C3 = my_eps
ssim_t2 = ssim_t2 + C3
ssim_b2 = ssim_b2 + C2
ssim_1 = ssim_t1 / ssim_b1
ssim_2 = ssim_t2 / ssim_b2
ssim_mat[i, j] = ssim_1 * ssim_2
# add cropping
ssim_mat = crop(ssim_mat, k)
mean_ssim = np.nanmean(ssim_mat)
ssim_levs[this_lev] = mean_ssim
ssim_mats_array.append(ssim_mat)
return_ssim = ssim_levs.min()
self._ssim_value_fp_slow = return_ssim
# save full matrix
self._ssim_mat_fp_slow = ssim_mats_array
self._ssim_levs = ssim_levs
return float(self._ssim_value_fp_slow)
@property
def xsize(self):
return self._xsize
@xsize.setter
def xsize(self, xsize):
self._xsize = xsize
@property
def ysize(self):
return self._ysize
@ysize.setter
def ysize(self, ysize):
self._ysize = ysize
@property
def ssim_value_fp_fast(self):
"""
Faster implementation then ssim_value_fp_slow (this is the default DSSIM option).
"""
# from astropy.convolution import Gaussian2DKernel, convolve, interpolate_replace_nans
if not self._is_memoized('_ssim_value_fp_fast'):
# if this is a 3D variable, we will do each level separately
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
my_eps = 1.0e-8
for this_lev in range(nlevels):
if nlevels == 1:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
else:
a1 = self._calcs1.get_calc('ds').isel({vname: this_lev}).data
a2 = self._calcs2.get_calc('ds').isel({vname: this_lev}).data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
# re-scale to [0,1] - if not constant
smin = min(np.nanmin(a1), np.nanmin(a2))
smax = max(np.nanmax(a1), np.nanmax(a2))
r = smax - smin
if r == 0.0: # scale by smax if field is a constant (and smax != 0)
if smax == 0.0:
sc_a1 = a1
sc_a2 = a2
else:
sc_a1 = a1 / smax
sc_a2 = a2 / smax
else:
sc_a1 = (a1 - smin) / r
sc_a2 = (a2 - smin) / r
# now quantize to 256 bins
sc_a1 = np.round(sc_a1 * 255) / 255
sc_a2 = np.round(sc_a2 * 255) / 255
# gaussian filter
kernel = Gaussian2DKernel(x_stddev=1.5, x_size=self.xsize, y_size=self.ysize)
k = 5
filter_args = {'boundary': 'fill', 'preserve_nan': True}
a1_mu = convolve(sc_a1, kernel, **filter_args)
a2_mu = convolve(sc_a2, kernel, **filter_args)
a1a1 = convolve(sc_a1 * sc_a1, kernel, **filter_args)
a2a2 = convolve(sc_a2 * sc_a2, kernel, **filter_args)
a1a2 = convolve(sc_a1 * sc_a2, kernel, **filter_args)
###########
var_a1 = a1a1 - a1_mu * a1_mu
var_a2 = a2a2 - a2_mu * a2_mu
cov_a1a2 = a1a2 - a1_mu * a2_mu
# ssim constants
C1 = my_eps
C2 = my_eps
ssim_t1 = 2 * a1_mu * a2_mu + C1
ssim_t2 = 2 * cov_a1a2 + C2
ssim_b1 = a1_mu * a1_mu + a2_mu * a2_mu + C1
ssim_b2 = var_a1 + var_a2 + C2
ssim_1 = ssim_t1 / ssim_b1
ssim_2 = ssim_t2 / ssim_b2
ssim_mat = ssim_1 * ssim_2
# cropping (the border region)
ssim_mat = crop(ssim_mat, k)
mean_ssim = np.nanmean(ssim_mat)
ssim_levs[this_lev] = mean_ssim
ssim_mats_array.append(ssim_mat)
# end of levels calculation
return_ssim = ssim_levs.min()
self._ssim_value_fp_fast = return_ssim
# save ssim on each level
self._ssim_levs = ssim_levs
# save full matrix
self._ssim_mat_fp = ssim_mats_array
return float(self._ssim_value_fp_fast)
@property
def ssim_value_fp_fast_exp(self):
"""
EXP:
"""
# from astropy.convolution import Gaussian2DKernel, convolve, interpolate_replace_nans
if not self._is_memoized('_ssim_value_fp_fast_exp'):
# if this is a 3D variable, we will do each level separately
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
ssim_levs = np.zeros(nlevels)
ssim_mats_array = []
my_eps = 1.0e-8
for this_lev in range(nlevels):
if nlevels == 1:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
else:
a1 = self._calcs1.get_calc('ds').isel({vname: this_lev}).data
a2 = self._calcs2.get_calc('ds').isel({vname: this_lev}).data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
# re-scale to [0,1] - if not constant
smin = min(np.nanmin(a1), np.nanmin(a2))
smax = max(np.nanmax(a1), np.nanmax(a2))
r = smax - smin
if r == 0.0: # scale by smax if field is a constant (and smax != 0)
if smax == 0.0:
sc_a1 = a1
sc_a2 = a2
else:
sc_a1 = a1 / smax
sc_a2 = a2 / smax
else:
sc_a1 = (a1 - smin) / r
sc_a2 = (a2 - smin) / r
# now quantize to 256 bins
# sc_a1 = np.round(sc_a1 * 255) / 255
# sc_a2 = np.round(sc_a2 * 255) / 255
# gaussian filter
kernel = Gaussian2DKernel(x_stddev=1.5, x_size=11, y_size=11)
k = 5
filter_args = {'boundary': 'fill', 'preserve_nan': True}
a1_mu = convolve(sc_a1, kernel, **filter_args)
a2_mu = convolve(sc_a2, kernel, **filter_args)
a1a1 = convolve(sc_a1 * sc_a1, kernel, **filter_args)
a2a2 = convolve(sc_a2 * sc_a2, kernel, **filter_args)
a1a2 = convolve(sc_a1 * sc_a2, kernel, **filter_args)
###########
var_a1 = a1a1 - a1_mu * a1_mu
var_a2 = a2a2 - a2_mu * a2_mu
cov_a1a2 = a1a2 - a1_mu * a2_mu
# ssim constants
C1 = my_eps
C2 = my_eps
ssim_t1 = 2 * a1_mu * a2_mu + C1
ssim_t2 = 2 * cov_a1a2 + C2
ssim_b1 = a1_mu * a1_mu + a2_mu * a2_mu + C1
ssim_b2 = var_a1 + var_a2 + C2
ssim_1 = ssim_t1 / ssim_b1
ssim_2 = ssim_t2 / ssim_b2
ssim_mat = ssim_1 * ssim_2
# cropping (the border region)
ssim_mat = crop(ssim_mat, k)
mean_ssim = np.nanmean(ssim_mat)
ssim_levs[this_lev] = mean_ssim
ssim_mats_array.append(ssim_mat)
# end of levels calculation
return_ssim = ssim_levs.min()
self._ssim_value_fp_fast_exp = return_ssim
# save ssim on each level
self._ssim_levs = ssim_levs
# save full matrix
self._ssim_mat_fp = ssim_mats_array
return float(self._ssim_value_fp_fast_exp)
@property
def ssim_value_fp_orig(self):
"""the ssim on the fp data with
original constants and no scaling (so-called "straightforward" approach.
This will return Nan on POP data or CAM data with NaNs because scikit
SSIM fuction does not handle NaNs.
"""
import numpy as np
from skimage.metrics import structural_similarity as ssim
if not self._is_memoized('_ssim_value_fp_orig'):
# if this is a 3D variable, we will do each level seperately
if self._calcs1._vert_dim_name is not None:
vname = self._calcs1._vert_dim_name
if vname not in self._calcs1.get_calc('ds').sizes:
nlevels = 1
else:
nlevels = self._calcs1.get_calc('ds').sizes[vname]
else:
nlevels = 1
ssim_levs = np.zeros(nlevels)
for this_lev in range(nlevels):
if nlevels == 1:
a1 = self._calcs1.get_calc('ds').data
a2 = self._calcs2.get_calc('ds').data
else:
a1 = self._calcs1.get_calc('ds').isel({vname: this_lev}).data
a2 = self._calcs2.get_calc('ds').isel({vname: this_lev}).data
if dask.is_dask_collection(a1):
a1 = a1.compute()
if dask.is_dask_collection(a2):
a2 = a2.compute()
maxr = max(a1.max(), a2.max())
minr = min(a1.min(), a2.min())
myrange = maxr - minr
mean_ssim = ssim(
a1,
a2,
multichannel=False,
data_range=myrange,
gaussian_weights=True,
use_sample_covariance=False,
)
ssim_levs[this_lev] = mean_ssim
# end of levels calculation
return_ssim = ssim_levs.min()
self._ssim_value_fp_orig = return_ssim
return self._ssim_value_fp_orig
[docs]
def get_diff_calc(self, name: str, color: Optional[str] = 'coolwarm', xsize=11, ysize=11):
"""
Gets a calc on the dataset that requires more than one input dataset
Parameters
==========
name : str
The name of the calc (must be identical to a property name)
Returns
=======
out : float
"""
if isinstance(name, str):
if name == 'pearson_correlation_coefficient':
return self.pearson_correlation_coefficient
if name == 'covariance':
return self.covariance
if name == 'ks_p_value':
return self.ks_p_value
if name == 'n_rms':
return self.normalized_root_mean_squared
if name == 'n_emax':
return self.normalized_max_pointwise_error
if name == 'spatial_rel_error':
return self.spatial_rel_error
if name == 'max_spatial_rel_error':
return self.max_spatial_rel_error
if name == 'spre_tol':
return self.spre_tol
if name == 'ssim':
self.color = color
return self.ssim_value
if name == 'ssim_fp':
self.xsize = xsize
self.ysize = ysize
return self.ssim_value_fp_fast
if name == 'ssim_fp_exp':
return self.ssim_value_fp_fast_exp
if name == 'ssim_fp_orig': # this is using standard SSIM with floats
# ("straightforward approach")
# not recommended
return self.ssim_value_fp_orig
if name == 'ssim_fp_slow':
# the non-matrix DSSIM implementation - good for experimenting
# not recommended in practice
return self.ssim_value_fp_slow
raise ValueError(f'there is no calc with the name: {name}.')
else:
raise TypeError('name must be a string.')