PySDM_examples.Ware_et_al_2025.example

  1from typing import Optional
  2from packaging import version
  3
  4import numpy as np
  5import matplotlib
  6from matplotlib import pyplot
  7from open_atmos_jupyter_utils import show_plot
  8
  9from PySDM.builder import Builder
 10from PySDM.dynamics import Coalescence
 11from PySDM.environments import Box
 12from PySDM.products import (
 13    ParticleVolumeVersusRadiusLogarithmSpectrum,
 14    WallTime,
 15    CollisionRateDeficitPerGridbox,
 16)
 17
 18from PySDM import Formulae
 19from PySDM.dynamics.collisions.collision_kernels import Golovin
 20from PySDM.initialisation import spectra
 21from PySDM.physics import si
 22
 23_matplotlib_version_3_3_3 = version.parse("3.3.0")
 24_matplotlib_version_actual = version.parse(matplotlib.__version__)
 25
 26
 27def error_measure(y, y_true, x):
 28    # The length of each bin on a logarithmic scale.
 29    dlnr = np.gradient(np.log(x))
 30
 31    F = np.cumsum(y * dlnr)
 32    CDF_Golovin = np.cumsum(y_true * dlnr)
 33
 34    return np.trapz(np.abs(CDF_Golovin - F), np.log(x))
 35
 36
 37class Settings:
 38    def __init__(self: int, steps: Optional[list] = None):
 39        steps = steps or [0, 1200, 2400, 3600]
 40        self.formulae = Formulae()
 41        self.n_sd = 2**13
 42        self.n_part = 2**23 / si.metre**3
 43        self.X0 = self.formulae.trivia.volume(radius=30.531 * si.micrometres)
 44        self.dv = 1e6 * si.metres**3
 45        self.norm_factor = self.n_part * self.dv
 46        self.rho = 1000 * si.kilogram / si.metre**3
 47        self.dt = 1 * si.seconds
 48        self.adaptive = False
 49        self.steps = steps
 50        self.kernel = Golovin(b=1.5e3 / si.second)
 51        self.spectrum = spectra.Exponential(norm_factor=self.norm_factor, scale=self.X0)
 52        self.radius_bins_edges = np.logspace(
 53            np.log10(10 * si.um), np.log10(5e3 * si.um), num=128, endpoint=True
 54        )
 55
 56    @property
 57    def output_steps(self):
 58        return [int(step / self.dt) for step in self.steps]
 59
 60
 61class SpectrumColors:
 62    def __init__(self, begining="#2cbdfe", end="#b317b1"):
 63        self.b = begining
 64        self.e = end
 65
 66    def __call__(self, value: float):
 67        bR, bG, bB = int(self.b[1:3], 16), int(self.b[3:5], 16), int(self.b[5:7], 16)
 68        eR, eG, eB = int(self.e[1:3], 16), int(self.e[3:5], 16), int(self.e[5:7], 16)
 69        R = bR + int((eR - bR) * value)
 70        G = bG + int((eG - bG) * value)
 71        B = bB + int((eB - bB) * value)
 72        result = f"#{hex(R)[2:4]}{hex(G)[2:4]}{hex(B)[2:4]}"
 73        return result
 74
 75
 76class SpectrumPlotter:
 77    def __init__(self, settings, title=None, grid=True, legend=True, log_base=10):
 78        self.settings = settings
 79        self.format = "pdf"
 80        self.colors = SpectrumColors()
 81        self.smooth = False
 82        self.smooth_scope = 2
 83        self.legend = legend
 84        self.grid = grid
 85        self.title = title
 86        self.xlabel = "particle radius [µm]"
 87        self.ylabel = "dm/dlnr [g/m^3/(unit dr/r)]"
 88        self.log_base = log_base
 89        self._ax = None
 90        self.finished = False
 91
 92    @property
 93    def ax(self):
 94        return self._ax or pyplot.gca()
 95
 96    @ax.setter
 97    def ax(self, value):
 98        self._ax = value
 99
100    def finish(self):
101        if self.finished:
102            return
103        self.finished = True
104        if self.grid:
105            self.ax.grid()
106
107        base_arg = {
108            "base"
109            + (
110                "x" if _matplotlib_version_actual < _matplotlib_version_3_3_3 else ""
111            ): self.log_base
112        }
113        if self.title is not None:
114            self.ax.set_title(self.title)
115        self.ax.set_xscale("log", **base_arg)
116        self.ax.set_xlabel(self.xlabel)
117        self.ax.set_ylabel(self.ylabel)
118        if self.legend:
119            self.ax.legend()
120
121    def show(self):
122        self.finish()
123        pyplot.tight_layout()
124        show_plot()
125
126    def save(self, file):
127        self.finish()
128        pyplot.savefig(file, format=self.format)
129
130    def plot(
131        self, spectrum, t, label=None, color=None, title=None, add_error_to_label=False
132    ):
133        error = self.plot_analytic_solution(self.settings, t, spectrum, title)
134        if label is not None and add_error_to_label:
135            label += f" error={error:.4g}"
136        self.plot_data(self.settings, t, spectrum, label, color)
137        return error
138
139    def plot_analytic_solution(self, settings, t, spectrum, title):
140        if t == 0:
141            analytic_solution = settings.spectrum.size_distribution
142        else:
143
144            def analytic_solution(x):
145                return settings.norm_factor * settings.kernel.analytic_solution(
146                    x=x, t=t, x_0=settings.X0, N_0=settings.n_part
147                )
148
149        volume_bins_edges = self.settings.formulae.trivia.volume(
150            settings.radius_bins_edges
151        )
152        dm = np.diff(volume_bins_edges)
153        dr = np.diff(settings.radius_bins_edges)
154
155        pdf_m_x = volume_bins_edges[:-1] + dm / 2
156        pdf_m_y = analytic_solution(pdf_m_x)
157
158        pdf_r_x = settings.radius_bins_edges[:-1] + dr / 2
159        pdf_r_y = pdf_m_y * dm / dr * pdf_r_x
160
161        x = pdf_r_x * si.metres / si.micrometres
162        y_true = (
163            pdf_r_y
164            * self.settings.formulae.trivia.volume(radius=pdf_r_x)
165            * settings.rho
166            / settings.dv
167            * si.kilograms
168            / si.grams
169        )
170
171        self.ax.plot(x, y_true, color="black")
172
173        if spectrum is not None:
174            y = spectrum * si.kilograms / si.grams
175            error = error_measure(y, y_true, x)
176            self.title = (
177                title or f"error measure: {error:.2f}"
178            )  # TODO #327 relative error
179            return error
180        return None
181
182    def plot_data(self, settings, t, spectrum, label, color):
183        if self.smooth:
184            scope = self.smooth_scope
185            if t != 0:
186                new = np.copy(spectrum)
187                for _ in range(2):
188                    for i in range(scope, len(spectrum) - scope):
189                        new[i] = np.mean(spectrum[i - scope : i + scope + 1])
190                    scope = 1
191                    for i in range(scope, len(spectrum) - scope):
192                        spectrum[i] = np.mean(new[i - scope : i + scope + 1])
193
194            x = settings.radius_bins_edges[:-scope]
195            dx = np.diff(x)
196            self.ax.plot(
197                (x[:-1] + dx / 2) * si.metres / si.micrometres,
198                spectrum[:-scope] * si.kilograms / si.grams,
199                label=label or f"t = {t}s",
200                color=color
201                or self.colors(t / (self.settings.output_steps[-1] * self.settings.dt)),
202            )
203        else:
204            self.ax.step(
205                settings.radius_bins_edges[:-1] * si.metres / si.micrometres,
206                spectrum * si.kilograms / si.grams,
207                where="post",
208                label=label or f"t = {t}s",
209                color=color
210                or self.colors(t / (self.settings.output_steps[-1] * self.settings.dt)),
211            )
212
213
214def run(settings, backend, observers=(), sampling_method="deterministic"):
215    builder = Builder(
216        n_sd=settings.n_sd,
217        backend=backend,
218        environment=Box(dv=settings.dv, dt=settings.dt),
219        dynamics=(
220            Coalescence(collision_kernel=settings.kernel, adaptive=settings.adaptive),
221        ),
222    )
223    builder.particulator.environment["rhod"] = 1.0
224    attributes = {}
225    sampling = settings.sampling
226    attributes["volume"], attributes["multiplicity"] = getattr(
227        sampling, f"sample_{sampling_method}"
228    )(settings.n_sd, backend=backend)
229    products = (
230        ParticleVolumeVersusRadiusLogarithmSpectrum(
231            settings.radius_bins_edges, name="dv/dlnr"
232        ),
233        WallTime(),
234        CollisionRateDeficitPerGridbox(name="deficit"),
235    )
236    particulator = builder.build(attributes, products)
237
238    for observer in observers:
239        particulator.observers.append(observer)
240
241    vals = {}
242    deficit = 0
243    particulator.products["wall time"].reset()
244    for step in settings.output_steps:
245        particulator.run(step - particulator.n_steps)
246        vals[step] = particulator.products["dv/dlnr"].get()[0]
247        vals[step][:] *= settings.rho
248        deficit += particulator.products["deficit"].get()
249    deficit = deficit / len(settings.output_steps)
250
251    exec_time = particulator.products["wall time"].get()
252    return vals, exec_time, deficit
def error_measure(y, y_true, x):
28def error_measure(y, y_true, x):
29    # The length of each bin on a logarithmic scale.
30    dlnr = np.gradient(np.log(x))
31
32    F = np.cumsum(y * dlnr)
33    CDF_Golovin = np.cumsum(y_true * dlnr)
34
35    return np.trapz(np.abs(CDF_Golovin - F), np.log(x))
class Settings:
38class Settings:
39    def __init__(self: int, steps: Optional[list] = None):
40        steps = steps or [0, 1200, 2400, 3600]
41        self.formulae = Formulae()
42        self.n_sd = 2**13
43        self.n_part = 2**23 / si.metre**3
44        self.X0 = self.formulae.trivia.volume(radius=30.531 * si.micrometres)
45        self.dv = 1e6 * si.metres**3
46        self.norm_factor = self.n_part * self.dv
47        self.rho = 1000 * si.kilogram / si.metre**3
48        self.dt = 1 * si.seconds
49        self.adaptive = False
50        self.steps = steps
51        self.kernel = Golovin(b=1.5e3 / si.second)
52        self.spectrum = spectra.Exponential(norm_factor=self.norm_factor, scale=self.X0)
53        self.radius_bins_edges = np.logspace(
54            np.log10(10 * si.um), np.log10(5e3 * si.um), num=128, endpoint=True
55        )
56
57    @property
58    def output_steps(self):
59        return [int(step / self.dt) for step in self.steps]
Settings(steps: Optional[list] = None)
39    def __init__(self: int, steps: Optional[list] = None):
40        steps = steps or [0, 1200, 2400, 3600]
41        self.formulae = Formulae()
42        self.n_sd = 2**13
43        self.n_part = 2**23 / si.metre**3
44        self.X0 = self.formulae.trivia.volume(radius=30.531 * si.micrometres)
45        self.dv = 1e6 * si.metres**3
46        self.norm_factor = self.n_part * self.dv
47        self.rho = 1000 * si.kilogram / si.metre**3
48        self.dt = 1 * si.seconds
49        self.adaptive = False
50        self.steps = steps
51        self.kernel = Golovin(b=1.5e3 / si.second)
52        self.spectrum = spectra.Exponential(norm_factor=self.norm_factor, scale=self.X0)
53        self.radius_bins_edges = np.logspace(
54            np.log10(10 * si.um), np.log10(5e3 * si.um), num=128, endpoint=True
55        )
formulae
n_sd
n_part
X0
dv
norm_factor
rho
dt
adaptive
steps
kernel
spectrum
radius_bins_edges
output_steps
57    @property
58    def output_steps(self):
59        return [int(step / self.dt) for step in self.steps]
class SpectrumColors:
62class SpectrumColors:
63    def __init__(self, begining="#2cbdfe", end="#b317b1"):
64        self.b = begining
65        self.e = end
66
67    def __call__(self, value: float):
68        bR, bG, bB = int(self.b[1:3], 16), int(self.b[3:5], 16), int(self.b[5:7], 16)
69        eR, eG, eB = int(self.e[1:3], 16), int(self.e[3:5], 16), int(self.e[5:7], 16)
70        R = bR + int((eR - bR) * value)
71        G = bG + int((eG - bG) * value)
72        B = bB + int((eB - bB) * value)
73        result = f"#{hex(R)[2:4]}{hex(G)[2:4]}{hex(B)[2:4]}"
74        return result
SpectrumColors(begining='#2cbdfe', end='#b317b1')
63    def __init__(self, begining="#2cbdfe", end="#b317b1"):
64        self.b = begining
65        self.e = end
b
e
class SpectrumPlotter:
 77class SpectrumPlotter:
 78    def __init__(self, settings, title=None, grid=True, legend=True, log_base=10):
 79        self.settings = settings
 80        self.format = "pdf"
 81        self.colors = SpectrumColors()
 82        self.smooth = False
 83        self.smooth_scope = 2
 84        self.legend = legend
 85        self.grid = grid
 86        self.title = title
 87        self.xlabel = "particle radius [µm]"
 88        self.ylabel = "dm/dlnr [g/m^3/(unit dr/r)]"
 89        self.log_base = log_base
 90        self._ax = None
 91        self.finished = False
 92
 93    @property
 94    def ax(self):
 95        return self._ax or pyplot.gca()
 96
 97    @ax.setter
 98    def ax(self, value):
 99        self._ax = value
100
101    def finish(self):
102        if self.finished:
103            return
104        self.finished = True
105        if self.grid:
106            self.ax.grid()
107
108        base_arg = {
109            "base"
110            + (
111                "x" if _matplotlib_version_actual < _matplotlib_version_3_3_3 else ""
112            ): self.log_base
113        }
114        if self.title is not None:
115            self.ax.set_title(self.title)
116        self.ax.set_xscale("log", **base_arg)
117        self.ax.set_xlabel(self.xlabel)
118        self.ax.set_ylabel(self.ylabel)
119        if self.legend:
120            self.ax.legend()
121
122    def show(self):
123        self.finish()
124        pyplot.tight_layout()
125        show_plot()
126
127    def save(self, file):
128        self.finish()
129        pyplot.savefig(file, format=self.format)
130
131    def plot(
132        self, spectrum, t, label=None, color=None, title=None, add_error_to_label=False
133    ):
134        error = self.plot_analytic_solution(self.settings, t, spectrum, title)
135        if label is not None and add_error_to_label:
136            label += f" error={error:.4g}"
137        self.plot_data(self.settings, t, spectrum, label, color)
138        return error
139
140    def plot_analytic_solution(self, settings, t, spectrum, title):
141        if t == 0:
142            analytic_solution = settings.spectrum.size_distribution
143        else:
144
145            def analytic_solution(x):
146                return settings.norm_factor * settings.kernel.analytic_solution(
147                    x=x, t=t, x_0=settings.X0, N_0=settings.n_part
148                )
149
150        volume_bins_edges = self.settings.formulae.trivia.volume(
151            settings.radius_bins_edges
152        )
153        dm = np.diff(volume_bins_edges)
154        dr = np.diff(settings.radius_bins_edges)
155
156        pdf_m_x = volume_bins_edges[:-1] + dm / 2
157        pdf_m_y = analytic_solution(pdf_m_x)
158
159        pdf_r_x = settings.radius_bins_edges[:-1] + dr / 2
160        pdf_r_y = pdf_m_y * dm / dr * pdf_r_x
161
162        x = pdf_r_x * si.metres / si.micrometres
163        y_true = (
164            pdf_r_y
165            * self.settings.formulae.trivia.volume(radius=pdf_r_x)
166            * settings.rho
167            / settings.dv
168            * si.kilograms
169            / si.grams
170        )
171
172        self.ax.plot(x, y_true, color="black")
173
174        if spectrum is not None:
175            y = spectrum * si.kilograms / si.grams
176            error = error_measure(y, y_true, x)
177            self.title = (
178                title or f"error measure: {error:.2f}"
179            )  # TODO #327 relative error
180            return error
181        return None
182
183    def plot_data(self, settings, t, spectrum, label, color):
184        if self.smooth:
185            scope = self.smooth_scope
186            if t != 0:
187                new = np.copy(spectrum)
188                for _ in range(2):
189                    for i in range(scope, len(spectrum) - scope):
190                        new[i] = np.mean(spectrum[i - scope : i + scope + 1])
191                    scope = 1
192                    for i in range(scope, len(spectrum) - scope):
193                        spectrum[i] = np.mean(new[i - scope : i + scope + 1])
194
195            x = settings.radius_bins_edges[:-scope]
196            dx = np.diff(x)
197            self.ax.plot(
198                (x[:-1] + dx / 2) * si.metres / si.micrometres,
199                spectrum[:-scope] * si.kilograms / si.grams,
200                label=label or f"t = {t}s",
201                color=color
202                or self.colors(t / (self.settings.output_steps[-1] * self.settings.dt)),
203            )
204        else:
205            self.ax.step(
206                settings.radius_bins_edges[:-1] * si.metres / si.micrometres,
207                spectrum * si.kilograms / si.grams,
208                where="post",
209                label=label or f"t = {t}s",
210                color=color
211                or self.colors(t / (self.settings.output_steps[-1] * self.settings.dt)),
212            )
SpectrumPlotter(settings, title=None, grid=True, legend=True, log_base=10)
78    def __init__(self, settings, title=None, grid=True, legend=True, log_base=10):
79        self.settings = settings
80        self.format = "pdf"
81        self.colors = SpectrumColors()
82        self.smooth = False
83        self.smooth_scope = 2
84        self.legend = legend
85        self.grid = grid
86        self.title = title
87        self.xlabel = "particle radius [µm]"
88        self.ylabel = "dm/dlnr [g/m^3/(unit dr/r)]"
89        self.log_base = log_base
90        self._ax = None
91        self.finished = False
settings
format
colors
smooth
smooth_scope
legend
grid
title
xlabel
ylabel
log_base
finished
ax
93    @property
94    def ax(self):
95        return self._ax or pyplot.gca()
def finish(self):
101    def finish(self):
102        if self.finished:
103            return
104        self.finished = True
105        if self.grid:
106            self.ax.grid()
107
108        base_arg = {
109            "base"
110            + (
111                "x" if _matplotlib_version_actual < _matplotlib_version_3_3_3 else ""
112            ): self.log_base
113        }
114        if self.title is not None:
115            self.ax.set_title(self.title)
116        self.ax.set_xscale("log", **base_arg)
117        self.ax.set_xlabel(self.xlabel)
118        self.ax.set_ylabel(self.ylabel)
119        if self.legend:
120            self.ax.legend()
def show(self):
122    def show(self):
123        self.finish()
124        pyplot.tight_layout()
125        show_plot()
def save(self, file):
127    def save(self, file):
128        self.finish()
129        pyplot.savefig(file, format=self.format)
def plot( self, spectrum, t, label=None, color=None, title=None, add_error_to_label=False):
131    def plot(
132        self, spectrum, t, label=None, color=None, title=None, add_error_to_label=False
133    ):
134        error = self.plot_analytic_solution(self.settings, t, spectrum, title)
135        if label is not None and add_error_to_label:
136            label += f" error={error:.4g}"
137        self.plot_data(self.settings, t, spectrum, label, color)
138        return error
def plot_analytic_solution(self, settings, t, spectrum, title):
140    def plot_analytic_solution(self, settings, t, spectrum, title):
141        if t == 0:
142            analytic_solution = settings.spectrum.size_distribution
143        else:
144
145            def analytic_solution(x):
146                return settings.norm_factor * settings.kernel.analytic_solution(
147                    x=x, t=t, x_0=settings.X0, N_0=settings.n_part
148                )
149
150        volume_bins_edges = self.settings.formulae.trivia.volume(
151            settings.radius_bins_edges
152        )
153        dm = np.diff(volume_bins_edges)
154        dr = np.diff(settings.radius_bins_edges)
155
156        pdf_m_x = volume_bins_edges[:-1] + dm / 2
157        pdf_m_y = analytic_solution(pdf_m_x)
158
159        pdf_r_x = settings.radius_bins_edges[:-1] + dr / 2
160        pdf_r_y = pdf_m_y * dm / dr * pdf_r_x
161
162        x = pdf_r_x * si.metres / si.micrometres
163        y_true = (
164            pdf_r_y
165            * self.settings.formulae.trivia.volume(radius=pdf_r_x)
166            * settings.rho
167            / settings.dv
168            * si.kilograms
169            / si.grams
170        )
171
172        self.ax.plot(x, y_true, color="black")
173
174        if spectrum is not None:
175            y = spectrum * si.kilograms / si.grams
176            error = error_measure(y, y_true, x)
177            self.title = (
178                title or f"error measure: {error:.2f}"
179            )  # TODO #327 relative error
180            return error
181        return None
def plot_data(self, settings, t, spectrum, label, color):
183    def plot_data(self, settings, t, spectrum, label, color):
184        if self.smooth:
185            scope = self.smooth_scope
186            if t != 0:
187                new = np.copy(spectrum)
188                for _ in range(2):
189                    for i in range(scope, len(spectrum) - scope):
190                        new[i] = np.mean(spectrum[i - scope : i + scope + 1])
191                    scope = 1
192                    for i in range(scope, len(spectrum) - scope):
193                        spectrum[i] = np.mean(new[i - scope : i + scope + 1])
194
195            x = settings.radius_bins_edges[:-scope]
196            dx = np.diff(x)
197            self.ax.plot(
198                (x[:-1] + dx / 2) * si.metres / si.micrometres,
199                spectrum[:-scope] * si.kilograms / si.grams,
200                label=label or f"t = {t}s",
201                color=color
202                or self.colors(t / (self.settings.output_steps[-1] * self.settings.dt)),
203            )
204        else:
205            self.ax.step(
206                settings.radius_bins_edges[:-1] * si.metres / si.micrometres,
207                spectrum * si.kilograms / si.grams,
208                where="post",
209                label=label or f"t = {t}s",
210                color=color
211                or self.colors(t / (self.settings.output_steps[-1] * self.settings.dt)),
212            )
def run(settings, backend, observers=(), sampling_method='deterministic'):
215def run(settings, backend, observers=(), sampling_method="deterministic"):
216    builder = Builder(
217        n_sd=settings.n_sd,
218        backend=backend,
219        environment=Box(dv=settings.dv, dt=settings.dt),
220        dynamics=(
221            Coalescence(collision_kernel=settings.kernel, adaptive=settings.adaptive),
222        ),
223    )
224    builder.particulator.environment["rhod"] = 1.0
225    attributes = {}
226    sampling = settings.sampling
227    attributes["volume"], attributes["multiplicity"] = getattr(
228        sampling, f"sample_{sampling_method}"
229    )(settings.n_sd, backend=backend)
230    products = (
231        ParticleVolumeVersusRadiusLogarithmSpectrum(
232            settings.radius_bins_edges, name="dv/dlnr"
233        ),
234        WallTime(),
235        CollisionRateDeficitPerGridbox(name="deficit"),
236    )
237    particulator = builder.build(attributes, products)
238
239    for observer in observers:
240        particulator.observers.append(observer)
241
242    vals = {}
243    deficit = 0
244    particulator.products["wall time"].reset()
245    for step in settings.output_steps:
246        particulator.run(step - particulator.n_steps)
247        vals[step] = particulator.products["dv/dlnr"].get()[0]
248        vals[step][:] *= settings.rho
249        deficit += particulator.products["deficit"].get()
250    deficit = deficit / len(settings.output_steps)
251
252    exec_time = particulator.products["wall time"].get()
253    return vals, exec_time, deficit