PySDM_examples.Morrison_and_Grabowski_2007.cumulus

  1import numpy as np
  2from PySDM_examples.Morrison_and_Grabowski_2007.common import Common
  3from PySDM_examples.Szumowski_et_al_1998 import sounding
  4from scipy.integrate import solve_ivp
  5from scipy.interpolate import interp1d
  6
  7from PySDM.physics import si
  8
  9
 10class Cumulus(Common):
 11    def __init__(self, formulae):
 12        super().__init__(formulae)
 13        self.size = (9 * si.km, 2.7 * si.km)
 14        self.hx = 1.8 * si.km
 15        self.x0 = 3.6 * si.km
 16        self.xc = self.size[0] / 2
 17        self.dz = 50 * si.m
 18        self.grid = tuple(s / self.dz for s in self.size)
 19        for g in self.grid:
 20            assert int(g) == g
 21        self.grid = tuple(int(g) for g in self.grid)
 22        self.dt = 1 * si.s
 23        self.simulation_time = 60 * si.min
 24
 25        self.__init_profiles()
 26
 27    def __init_profiles(self):
 28        z_of_p = self.__z_of_p()
 29        T_of_z = interp1d(z_of_p, sounding.temperature)
 30        p_of_z = interp1d(z_of_p, sounding.pressure)
 31        q_of_z = interp1d(z_of_p, sounding.mixing_ratio)
 32
 33        z_points_scalar = np.linspace(
 34            self.dz / 2, self.size[-1] - self.dz / 2, num=self.grid[-1], endpoint=True
 35        )
 36        z_points_both = np.linspace(
 37            0, self.size[-1], num=2 * self.grid[-1] + 1, endpoint=True
 38        )
 39        rhod_of_z = self.__rhod_of_z(T_of_z, p_of_z, q_of_z, z_of_p, z_points_both)
 40        self.rhod = interp1d(z_points_both / self.size[-1], rhod_of_z)
 41        self.initial_water_vapour_mixing_ratio = q_of_z(z_points_scalar)
 42        self.th_std0 = self.formulae.trivia.th_std(
 43            p_of_z(z_points_scalar), T_of_z(z_points_scalar)
 44        )
 45
 46    def rhod_of_zZ(self, zZ):
 47        return self.rhod(zZ)
 48
 49    def __rhod_of_z(self, T_of_z, p_of_z, q_of_z, z_of_p, z_points):
 50        def drhod_dz(z, _):
 51            lv = self.formulae.latent_heat.lv(T_of_z(z))
 52            return self.formulae.hydrostatics.drho_dz(
 53                self.formulae.constants.g_std, p_of_z(z), T_of_z(z), q_of_z(z), lv
 54            )
 55
 56        theta_std0 = self.formulae.trivia.th_std(
 57            sounding.pressure[0], sounding.temperature[0]
 58        )
 59        rhod0 = self.formulae.state_variable_triplet.rho_d(
 60            sounding.pressure[0], sounding.mixing_ratio[0], theta_std0
 61        )
 62        rhod_solution = solve_ivp(
 63            fun=drhod_dz,
 64            t_span=(0, np.amax(z_of_p)),
 65            y0=np.asarray((rhod0,)),
 66            t_eval=z_points,
 67        )
 68        assert rhod_solution.success
 69        return rhod_solution.y[0]
 70
 71    def __z_of_p(self):
 72        T_virt_of_p = interp1d(
 73            sounding.pressure,
 74            sounding.temperature
 75            * (1 + sounding.mixing_ratio / self.formulae.constants.eps)
 76            / (1 + sounding.mixing_ratio),
 77        )
 78
 79        def dz_dp(p, _):
 80            return (
 81                -self.formulae.constants.Rd
 82                * T_virt_of_p(p)
 83                / self.formulae.constants.g_std
 84                / p
 85            )
 86
 87        z_of_p = solve_ivp(
 88            fun=dz_dp,
 89            t_span=(max(sounding.pressure), min(sounding.pressure)),
 90            y0=np.asarray((0,)),
 91            t_eval=sounding.pressure,
 92        )
 93        assert z_of_p.success
 94        return z_of_p.y[0]
 95
 96    @staticmethod
 97    def z0(z):
 98        return np.where(z <= 1.7 * si.km, 0, 0.7 * si.km)
 99
100    @staticmethod
101    def hz(z):
102        return np.where(z <= 1.7 * si.km, 3.4 * si.km, 2.0 * si.km)
103
104    def alpha(self, x):
105        return np.where(abs(x - self.xc) <= 0.9 * si.km, 1, 0)
106
107    @staticmethod
108    def beta(x):
109        return np.where(x <= 5.4 * si.km, 1, -1)
110
111    @staticmethod
112    def A1(t):
113        if t <= 900 * si.s:
114            return 5.73e2
115        if t <= 1500 * si.s:
116            return 5.73e2 + 2.02e3 * (1 + np.cos(np.pi * ((t - 900) / 600 + 1)))
117        return 1.15e3 + 1.72e3 * (1 + np.cos(np.pi * ((min(t, 2400) - 1500) / 900 + 1)))
118
119    @staticmethod
120    def A2(t):
121        if t <= 300 * si.s:
122            return 0
123        if t <= 1500 * si.s:
124            return 6e2 * (1 + np.cos(np.pi * ((t - 300) / 600 - 1)))
125        return 5e2 * (1 + np.cos(np.pi * ((min(2400, t) - 1500) / 900 - 1)))
126
127    # see Appendix (page 2859)
128    def stream_function(self, xX, zZ, t):
129        x = xX * self.size[0]
130        z = zZ * self.size[-1]
131        return (
132            -self.A1(t)
133            * np.cos(self.alpha(x) * np.pi * (x - self.x0) / self.hx)
134            * np.sin(self.beta(x) * np.pi * (z - self.z0(z)) / self.hz(z))
135            + self.A2(t) / 2 * zZ**2
136        )
 11class Cumulus(Common):
 12    def __init__(self, formulae):
 13        super().__init__(formulae)
 14        self.size = (9 * si.km, 2.7 * si.km)
 15        self.hx = 1.8 * si.km
 16        self.x0 = 3.6 * si.km
 17        self.xc = self.size[0] / 2
 18        self.dz = 50 * si.m
 19        self.grid = tuple(s / self.dz for s in self.size)
 20        for g in self.grid:
 21            assert int(g) == g
 22        self.grid = tuple(int(g) for g in self.grid)
 23        self.dt = 1 * si.s
 24        self.simulation_time = 60 * si.min
 25
 26        self.__init_profiles()
 27
 28    def __init_profiles(self):
 29        z_of_p = self.__z_of_p()
 30        T_of_z = interp1d(z_of_p, sounding.temperature)
 31        p_of_z = interp1d(z_of_p, sounding.pressure)
 32        q_of_z = interp1d(z_of_p, sounding.mixing_ratio)
 33
 34        z_points_scalar = np.linspace(
 35            self.dz / 2, self.size[-1] - self.dz / 2, num=self.grid[-1], endpoint=True
 36        )
 37        z_points_both = np.linspace(
 38            0, self.size[-1], num=2 * self.grid[-1] + 1, endpoint=True
 39        )
 40        rhod_of_z = self.__rhod_of_z(T_of_z, p_of_z, q_of_z, z_of_p, z_points_both)
 41        self.rhod = interp1d(z_points_both / self.size[-1], rhod_of_z)
 42        self.initial_water_vapour_mixing_ratio = q_of_z(z_points_scalar)
 43        self.th_std0 = self.formulae.trivia.th_std(
 44            p_of_z(z_points_scalar), T_of_z(z_points_scalar)
 45        )
 46
 47    def rhod_of_zZ(self, zZ):
 48        return self.rhod(zZ)
 49
 50    def __rhod_of_z(self, T_of_z, p_of_z, q_of_z, z_of_p, z_points):
 51        def drhod_dz(z, _):
 52            lv = self.formulae.latent_heat.lv(T_of_z(z))
 53            return self.formulae.hydrostatics.drho_dz(
 54                self.formulae.constants.g_std, p_of_z(z), T_of_z(z), q_of_z(z), lv
 55            )
 56
 57        theta_std0 = self.formulae.trivia.th_std(
 58            sounding.pressure[0], sounding.temperature[0]
 59        )
 60        rhod0 = self.formulae.state_variable_triplet.rho_d(
 61            sounding.pressure[0], sounding.mixing_ratio[0], theta_std0
 62        )
 63        rhod_solution = solve_ivp(
 64            fun=drhod_dz,
 65            t_span=(0, np.amax(z_of_p)),
 66            y0=np.asarray((rhod0,)),
 67            t_eval=z_points,
 68        )
 69        assert rhod_solution.success
 70        return rhod_solution.y[0]
 71
 72    def __z_of_p(self):
 73        T_virt_of_p = interp1d(
 74            sounding.pressure,
 75            sounding.temperature
 76            * (1 + sounding.mixing_ratio / self.formulae.constants.eps)
 77            / (1 + sounding.mixing_ratio),
 78        )
 79
 80        def dz_dp(p, _):
 81            return (
 82                -self.formulae.constants.Rd
 83                * T_virt_of_p(p)
 84                / self.formulae.constants.g_std
 85                / p
 86            )
 87
 88        z_of_p = solve_ivp(
 89            fun=dz_dp,
 90            t_span=(max(sounding.pressure), min(sounding.pressure)),
 91            y0=np.asarray((0,)),
 92            t_eval=sounding.pressure,
 93        )
 94        assert z_of_p.success
 95        return z_of_p.y[0]
 96
 97    @staticmethod
 98    def z0(z):
 99        return np.where(z <= 1.7 * si.km, 0, 0.7 * si.km)
100
101    @staticmethod
102    def hz(z):
103        return np.where(z <= 1.7 * si.km, 3.4 * si.km, 2.0 * si.km)
104
105    def alpha(self, x):
106        return np.where(abs(x - self.xc) <= 0.9 * si.km, 1, 0)
107
108    @staticmethod
109    def beta(x):
110        return np.where(x <= 5.4 * si.km, 1, -1)
111
112    @staticmethod
113    def A1(t):
114        if t <= 900 * si.s:
115            return 5.73e2
116        if t <= 1500 * si.s:
117            return 5.73e2 + 2.02e3 * (1 + np.cos(np.pi * ((t - 900) / 600 + 1)))
118        return 1.15e3 + 1.72e3 * (1 + np.cos(np.pi * ((min(t, 2400) - 1500) / 900 + 1)))
119
120    @staticmethod
121    def A2(t):
122        if t <= 300 * si.s:
123            return 0
124        if t <= 1500 * si.s:
125            return 6e2 * (1 + np.cos(np.pi * ((t - 300) / 600 - 1)))
126        return 5e2 * (1 + np.cos(np.pi * ((min(2400, t) - 1500) / 900 - 1)))
127
128    # see Appendix (page 2859)
129    def stream_function(self, xX, zZ, t):
130        x = xX * self.size[0]
131        z = zZ * self.size[-1]
132        return (
133            -self.A1(t)
134            * np.cos(self.alpha(x) * np.pi * (x - self.x0) / self.hx)
135            * np.sin(self.beta(x) * np.pi * (z - self.z0(z)) / self.hz(z))
136            + self.A2(t) / 2 * zZ**2
137        )
Cumulus(formulae)
12    def __init__(self, formulae):
13        super().__init__(formulae)
14        self.size = (9 * si.km, 2.7 * si.km)
15        self.hx = 1.8 * si.km
16        self.x0 = 3.6 * si.km
17        self.xc = self.size[0] / 2
18        self.dz = 50 * si.m
19        self.grid = tuple(s / self.dz for s in self.size)
20        for g in self.grid:
21            assert int(g) == g
22        self.grid = tuple(int(g) for g in self.grid)
23        self.dt = 1 * si.s
24        self.simulation_time = 60 * si.min
25
26        self.__init_profiles()
size
hx
x0
xc
dz
grid
dt
simulation_time
def rhod_of_zZ(self, zZ):
47    def rhod_of_zZ(self, zZ):
48        return self.rhod(zZ)
@staticmethod
def z0(z):
97    @staticmethod
98    def z0(z):
99        return np.where(z <= 1.7 * si.km, 0, 0.7 * si.km)
@staticmethod
def hz(z):
101    @staticmethod
102    def hz(z):
103        return np.where(z <= 1.7 * si.km, 3.4 * si.km, 2.0 * si.km)
def alpha(self, x):
105    def alpha(self, x):
106        return np.where(abs(x - self.xc) <= 0.9 * si.km, 1, 0)
@staticmethod
def beta(x):
108    @staticmethod
109    def beta(x):
110        return np.where(x <= 5.4 * si.km, 1, -1)
@staticmethod
def A1(t):
112    @staticmethod
113    def A1(t):
114        if t <= 900 * si.s:
115            return 5.73e2
116        if t <= 1500 * si.s:
117            return 5.73e2 + 2.02e3 * (1 + np.cos(np.pi * ((t - 900) / 600 + 1)))
118        return 1.15e3 + 1.72e3 * (1 + np.cos(np.pi * ((min(t, 2400) - 1500) / 900 + 1)))
@staticmethod
def A2(t):
120    @staticmethod
121    def A2(t):
122        if t <= 300 * si.s:
123            return 0
124        if t <= 1500 * si.s:
125            return 6e2 * (1 + np.cos(np.pi * ((t - 300) / 600 - 1)))
126        return 5e2 * (1 + np.cos(np.pi * ((min(2400, t) - 1500) / 900 - 1)))
def stream_function(self, xX, zZ, t):
129    def stream_function(self, xX, zZ, t):
130        x = xX * self.size[0]
131        z = zZ * self.size[-1]
132        return (
133            -self.A1(t)
134            * np.cos(self.alpha(x) * np.pi * (x - self.x0) / self.hx)
135            * np.sin(self.beta(x) * np.pi * (z - self.z0(z)) / self.hz(z))
136            + self.A2(t) / 2 * zZ**2
137        )