Skip to content
17 changes: 9 additions & 8 deletions pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,14 +1538,12 @@ def worker(j):

# If probmatching_method is not None, resample the distribution from
# both the extrapolation cascade and the model (NWP) cascade and use
# that for the probability matching
# that for the probability matching.
if probmatching_method is not None and resample_distribution:
# deal with missing values
arr1 = R_pm_ep[t_index]
arr2 = precip_models_pm_temp[j]
arr2 = np.where(np.isnan(arr2), np.nanmin(arr2), arr2)
arr1 = np.where(np.isnan(arr1), arr2, arr1)
# resample weights based on cascade level 2
# resample weights based on cascade level 2.
# Areas where one of the fields is nan are not included.
R_pm_resampled = probmatching.resample_distributions(
first_array=arr1,
second_array=arr2,
Expand All @@ -1555,11 +1553,14 @@ def worker(j):
R_pm_resampled = R_pm_blended.copy()

if probmatching_method == "cdf":
# Adjust the CDF of the forecast to match the most recent
# benchmark rainfall field (R_pm_blended). If the forecast
# nan indices in the extrapolation nowcast
nan_indices = np.isnan(R_pm_ep[t_index])
# Adjust the CDF of the forecast to match the resampled distribution combined from
# extrapolation and model fields.
# Rainfall outside the pure extrapolation domain is not taken into account.
if np.any(np.isfinite(R_f_new)):
R_f_new = probmatching.nonparam_match_empirical_cdf(
R_f_new, R_pm_resampled
R_f_new, R_pm_resampled, nan_indices
)
R_pm_resampled = None
elif probmatching_method == "mean":
Expand Down
76 changes: 49 additions & 27 deletions pysteps/postprocessing/probmatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def compute_empirical_cdf(bin_edges, hist):
return cdf


def nonparam_match_empirical_cdf(initial_array, target_array):
def nonparam_match_empirical_cdf(initial_array, target_array, ignore_indices=None):
"""
Matches the empirical CDF of the initial array with the empirical CDF
of a target array. Initial ranks are conserved, but empirical distribution
Expand All @@ -64,55 +64,66 @@ def nonparam_match_empirical_cdf(initial_array, target_array):
initial_array: array_like
The initial array whose CDF is to be matched with the target.
target_array: array_like
The target array.
The target array
ignore_indices: array_like, optional
Indices of pixels in the initial_array which are to be ignored (not
rescaled) or an array of booleans with True at the pixel locations to
be ignored in initial_array and False elsewhere.


Returns
-------
output_array: ndarray
The matched array of the same shape as the initial array.
"""

if np.any(~np.isfinite(initial_array)):
raise ValueError("initial array contains non-finite values")
if np.any(~np.isfinite(target_array)):
raise ValueError("target array contains non-finite values")
if np.all(np.isnan(initial_array)):
raise ValueError("Initial array contains only nans.")
if initial_array.size != target_array.size:
raise ValueError(
"dimension mismatch between initial_array and target_array: "
f"initial_array.shape={initial_array.shape}, target_array.shape={target_array.shape}"
)

initial_array = np.array(initial_array)
target_array = np.array(target_array)
initial_array_copy = np.array(initial_array, dtype=float)
target_array = np.array(target_array, dtype=float)

# zeros in initial array
zvalue = initial_array.min()
idxzeros = initial_array == zvalue
# Determine zero in initial array and set nans to zero
zvalue = np.nanmin(initial_array_copy)
if ignore_indices is not None:
initial_array_copy[ignore_indices] = zvalue
# Check if there are still nans left after setting the values at ignore_indices to zero.
if np.any(~np.isfinite(initial_array_copy)):
raise ValueError(
"Initial array contains non-finite values outside ignore_indices mask."
)

# zeros in the target array
zvalue_trg = target_array.min()
idxzeros = initial_array_copy == zvalue

# Determine zero of target_array and set nans to zero.
zvalue_trg = np.nanmin(target_array)
target_array = np.where(np.isnan(target_array), zvalue_trg, target_array)

# adjust the fraction of rain in target distribution if the number of
# nonzeros is greater than in the initial array
if np.sum(target_array > zvalue_trg) > np.sum(initial_array > zvalue):
war = np.sum(initial_array > zvalue) / initial_array.size
# nonzeros is greater than in the initial array (the lowest values will be set to zero)
if np.sum(target_array > zvalue_trg) > np.sum(initial_array_copy > zvalue):
war = np.sum(initial_array_copy > zvalue) / initial_array_copy.size
p = np.percentile(target_array, 100 * (1 - war))
target_array = target_array.copy()
target_array[target_array < p] = zvalue_trg

# flatten the arrays
arrayshape = initial_array.shape
target_array = target_array.flatten()
initial_array = initial_array.flatten()
# flatten the arrays without copying them
arrayshape = initial_array_copy.shape
target_array.reshape(-1)
initial_array_copy.reshape(-1)

# rank target values
order = target_array.argsort()
ranked = target_array[order]

# rank initial values order
orderin = initial_array.argsort()
ranks = np.empty(len(initial_array), int)
ranks[orderin] = np.arange(len(initial_array))
orderin = initial_array_copy.argsort()
ranks = np.empty(len(initial_array_copy), int)
ranks[orderin] = np.arange(len(initial_array_copy))

# get ranked values from target and rearrange with the initial order
output_array = ranked[ranks]
Expand All @@ -123,6 +134,9 @@ def nonparam_match_empirical_cdf(initial_array, target_array):
# read original zeros
output_array[idxzeros] = zvalue_trg

# Put back the original values outside the nan-mask of the target array.
if ignore_indices is not None:
output_array[ignore_indices] = initial_array[ignore_indices]
return output_array


Expand Down Expand Up @@ -264,6 +278,7 @@ def resample_distributions(first_array, second_array, probability_first_array):
"""
Merges two distributions (e.g., from the extrapolation nowcast and NWP in the blending module)
to effectively combine two distributions for probability matching without losing extremes.
Entries for which one array has a nan will not be included from the other array either.

Parameters
----------
Expand All @@ -287,16 +302,22 @@ def resample_distributions(first_array, second_array, probability_first_array):
Raises
------
ValueError
If `first_array` and `second_array` do not have the same shape or if inputs contain NaNs.
If `first_array` and `second_array` do not have the same shape.
"""

# Valide inputs
if first_array.shape != second_array.shape:
raise ValueError("first_array and second_array must have the same shape")
if np.isnan(first_array).any() or np.isnan(second_array).any():
raise ValueError("Input arrays must not contain NaNs")
probability_first_array = np.clip(probability_first_array, 0.0, 1.0)

# Propagate the NaN values of the arrays to each other if there are any; convert to float to make sure this works.
nanmask = np.isnan(first_array) | np.isnan(second_array)
if np.any(nanmask):
first_array = first_array.astype(float)
first_array[nanmask] = np.nan
second_array = second_array.astype(float)
second_array[nanmask] = np.nan

# Flatten and sort the arrays
asort = np.sort(first_array, axis=None)[::-1]
bsort = np.sort(second_array, axis=None)[::-1]
Expand All @@ -307,6 +328,7 @@ def resample_distributions(first_array, second_array, probability_first_array):
csort = np.where(idxsamples, asort, bsort)
csort = np.sort(csort)[::-1]

# Return the resampled array in descending order (starting with the nan values)
return csort


Expand Down
164 changes: 150 additions & 14 deletions pysteps/tests/test_postprocessing_probmatching.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import numpy as np
import pytest
from pysteps.postprocessing.probmatching import resample_distributions
from pysteps.postprocessing.probmatching import nonparam_match_empirical_cdf


class TestResampleDistributions:
Expand Down Expand Up @@ -39,19 +40,154 @@ def test_probability_one(self):
)
assert np.array_equal(result, np.sort(first_array)[::-1])

def test_nan_in_inputs(self):
def test_nan_in_arr1_prob_1(self):
array_with_nan = np.array([1, 3, np.nan, 7, 9])
array_without_nan = np.array([2.0, 4, 6, 8, 10])
probability_first_array = 1.0
result = resample_distributions(
array_with_nan, array_without_nan, probability_first_array
)
expected_result = np.array([np.nan, 9, 7, 3, 1], dtype=float)
assert np.allclose(result, expected_result, equal_nan=True)

def test_nan_in_arr1_prob_0(self):
array_with_nan = np.array([1, 3, np.nan, 7, 9])
array_without_nan = np.array([2, 4, 6, 8, 10])
probability_first_array = 0.6
with pytest.raises(ValueError, match="Input arrays must not contain NaNs"):
resample_distributions(
array_with_nan, array_without_nan, probability_first_array
)
with pytest.raises(ValueError, match="Input arrays must not contain NaNs"):
resample_distributions(
array_without_nan, array_with_nan, probability_first_array
)
with pytest.raises(ValueError, match="Input arrays must not contain NaNs"):
resample_distributions(
array_with_nan, array_with_nan, probability_first_array
)
probability_first_array = 0.0
result = resample_distributions(
array_with_nan, array_without_nan, probability_first_array
)
expected_result = np.array([np.nan, 10, 8, 4, 2], dtype=float)
assert np.allclose(result, expected_result, equal_nan=True)

def test_nan_in_arr2_prob_1(self):
array_without_nan = np.array([1, 3, 5, 7, 9])
array_with_nan = np.array([2.0, 4, 6, np.nan, 10])
probability_first_array = 1.0
result = resample_distributions(
array_without_nan, array_with_nan, probability_first_array
)
expected_result = np.array([np.nan, 9, 5, 3, 1], dtype=float)
assert np.allclose(result, expected_result, equal_nan=True)

def test_nan_in_arr2_prob_0(self):
array_without_nan = np.array([1, 3, 5, 7, 9])
array_with_nan = np.array([2, 4, 6, np.nan, 10])
probability_first_array = 0.0
result = resample_distributions(
array_without_nan, array_with_nan, probability_first_array
)
expected_result = np.array([np.nan, 10, 6, 4, 2], dtype=float)
assert np.allclose(result, expected_result, equal_nan=True)

def test_nan_in_both_prob_1(self):
array1_with_nan = np.array([1, np.nan, np.nan, 7, 9])
array2_with_nan = np.array([2.0, 4, np.nan, np.nan, 10])
probability_first_array = 1.0
result = resample_distributions(
array1_with_nan, array2_with_nan, probability_first_array
)
expected_result = np.array([np.nan, np.nan, np.nan, 9, 1], dtype=float)
assert np.allclose(result, expected_result, equal_nan=True)

def test_nan_in_both_prob_0(self):
array1_with_nan = np.array([1, np.nan, np.nan, 7, 9])
array2_with_nan = np.array([2.0, 4, np.nan, np.nan, 10])
probability_first_array = 0.0
result = resample_distributions(
array1_with_nan, array2_with_nan, probability_first_array
)
expected_result = np.array([np.nan, np.nan, np.nan, 10, 2], dtype=float)
assert np.allclose(result, expected_result, equal_nan=True)


class TestNonparamMatchEmpiricalCDF:
@pytest.fixture(autouse=True)
def setup(self):
# Set the seed for reproducibility
np.random.seed(42)

def test_ignore_indices_with_nans_both(self):
initial_array = np.array([np.nan, np.nan, 6, 2, 0, 0, 0, 0, 0, 0])
target_array = np.array([np.nan, np.nan, 9, 5, 4, 0, 0, 0, 0, 0])
result = nonparam_match_empirical_cdf(
initial_array, target_array, ignore_indices=np.isnan(initial_array)
)
expected_result = np.array([np.nan, np.nan, 9, 5, 0, 0, 0, 0, 0, 0])
assert np.allclose(result, expected_result, equal_nan=True)

def test_zeroes_initial(self):
initial_array = np.zeros(10)
target_array = np.array([0, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = nonparam_match_empirical_cdf(initial_array, target_array)
expected_result = np.zeros(10)
assert np.allclose(result, expected_result)

def test_nans_initial(self):
initial_array = np.array(
[0, 1, 2, 3, 4, np.nan, np.nan, np.nan, np.nan, np.nan]
)
target_array = np.array([0, 2, 3, 4, 5, 6, 7, 8, 9, 10])
with pytest.raises(
ValueError,
match="Initial array contains non-finite values outside ignore_indices mask.",
):
nonparam_match_empirical_cdf(initial_array, target_array)

def test_all_nans_initial(self):
initial_array = np.full(10, np.nan)
target_array = np.array([0, 2, 3, 4, 5, 6, 7, 8, 9, 10])
with pytest.raises(ValueError, match="Initial array contains only nans."):
nonparam_match_empirical_cdf(initial_array, target_array)

def test_ignore_indices_nans_initial(self):
initial_array = np.array(
[0, 1, 2, 3, 4, np.nan, np.nan, np.nan, np.nan, np.nan]
)
target_array = np.array([0, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = nonparam_match_empirical_cdf(
initial_array, target_array, ignore_indices=np.isnan(initial_array)
)
expected_result = np.array(
[0, 7, 8, 9, 10, np.nan, np.nan, np.nan, np.nan, np.nan]
)
assert np.allclose(result, expected_result, equal_nan=True)

def test_ignore_indices_nans_target(self):
# We expect the initial_array values for which ignore_indices is true to be conserved as-is.
initial_array = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
target_array = np.array(
[0, 2, 3, 4, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan]
)
result = nonparam_match_empirical_cdf(
initial_array, target_array, ignore_indices=np.isnan(target_array)
)
expected_result = np.array([0, 2, 3, 4, 4, 5, 6, 7, 8, 9])
assert np.allclose(result, expected_result, equal_nan=True)

def test_more_zeroes_in_initial(self):
initial_array = np.array([1, 4, 0, 0, 0, 0, 0, 0, 0, 0])
target_array = np.array([10, 8, 6, 4, 2, 0, 0, 0, 0, 0])
result = nonparam_match_empirical_cdf(
initial_array, target_array, ignore_indices=np.isnan(initial_array)
)
expected_result = np.array([8, 10, 0, 0, 0, 0, 0, 0, 0, 0])
assert np.allclose(result, expected_result, equal_nan=True)

def test_more_zeroes_in_initial_unsrt(self):
initial_array = np.array([1, 4, 0, 0, 0, 0, 0, 0, 0, 0])
target_array = np.array([6, 4, 2, 0, 0, 0, 0, 0, 10, 8])
result = nonparam_match_empirical_cdf(
initial_array, target_array, ignore_indices=np.isnan(initial_array)
)
expected_result = np.array([8, 10, 0, 0, 0, 0, 0, 0, 0, 0])
assert np.allclose(result, expected_result, equal_nan=True)

def test_more_zeroes_in_target(self):
initial_array = np.array([1, 3, 7, 5, 0, 0, 0, 0, 0, 0])
target_array = np.array([10, 8, 0, 0, 0, 0, 0, 0, 0, 0])
result = nonparam_match_empirical_cdf(
initial_array, target_array, ignore_indices=np.isnan(initial_array)
)
expected_result = np.array([0, 0, 10, 8, 0, 0, 0, 0, 0, 0])
assert np.allclose(result, expected_result, equal_nan=True)