PySDM.backends.numba

Multi-threaded CPU backend using LLVM-powered just-in-time compilation

  1"""
  2Multi-threaded CPU backend using LLVM-powered just-in-time compilation
  3"""
  4
  5import os
  6import platform
  7import warnings
  8
  9import numba
 10from numba import prange
 11import numpy as np
 12
 13from PySDM.backends.impl_numba import methods
 14from PySDM.backends.impl_numba.random import Random as ImportedRandom
 15from PySDM.backends.impl_numba.storage import Storage as ImportedStorage
 16from PySDM.formulae import Formulae
 17from PySDM.backends.impl_numba.conf import JIT_FLAGS
 18
 19
 20class Numba(  # pylint: disable=too-many-ancestors,duplicate-code
 21    methods.CollisionsMethods,
 22    methods.FragmentationMethods,
 23    methods.PairMethods,
 24    methods.IndexMethods,
 25    methods.PhysicsMethods,
 26    methods.CondensationMethods,
 27    methods.ChemistryMethods,
 28    methods.MomentsMethods,
 29    methods.FreezingMethods,
 30    methods.DisplacementMethods,
 31    methods.TerminalVelocityMethods,
 32    methods.IsotopeMethods,
 33    methods.SeedingMethods,
 34    methods.DepositionMethods,
 35):
 36    Storage = ImportedStorage
 37    Random = ImportedRandom
 38
 39    default_croupier = "local"
 40
 41    def __init__(
 42        self, formulae=None, *, double_precision=True, override_jit_flags=None
 43    ):
 44        if not double_precision:
 45            raise NotImplementedError()
 46        self.formulae = formulae or Formulae()
 47        self.formulae_flattened = self.formulae.flatten
 48
 49        parallel_default = True
 50
 51        if override_jit_flags is not None and "parallel" in override_jit_flags:
 52            parallel_default = override_jit_flags["parallel"]
 53
 54        if parallel_default:
 55            if platform.machine() == "arm64":
 56                if "CI" not in os.environ:
 57                    warnings.warn(
 58                        "Disabling Numba threading due to ARM64 CPU (atomics do not work yet)"
 59                    )
 60                parallel_default = False  # TODO #1183 - atomics don't work on ARM64!
 61
 62            try:
 63                numba.parfors.parfor.ensure_parallel_support()
 64            except numba.core.errors.UnsupportedParforsError:
 65                if "CI" not in os.environ:
 66                    warnings.warn(
 67                        "Numba version used does not support parallel for (32 bits?)"
 68                    )
 69                parallel_default = False
 70
 71            if not numba.config.DISABLE_JIT:  # pylint: disable=no-member
 72
 73                @numba.jit(parallel=True, nopython=True)
 74                def fill_array_with_thread_id(arr):
 75                    """writes thread id to corresponding array element"""
 76                    for i in prange(  # pylint: disable=not-an-iterable
 77                        numba.get_num_threads()
 78                    ):
 79                        arr[i] = numba.get_thread_id()
 80
 81                fill_array_with_thread_id(arr := np.full(numba.get_num_threads(), -1))
 82                if not max(arr) == arr[-1] == numba.get_num_threads() - 1:
 83                    raise ValueError(
 84                        "Numba threading enabled but does not work as expected"
 85                        " (try setting the NUMBA_THREADING_LAYER env var to 'workqueue')"
 86                    )
 87
 88        assert "fastmath" not in (override_jit_flags or {})
 89        self.default_jit_flags = {
 90            **JIT_FLAGS,  # here parallel=False (for out-of-backend code)
 91            **{"fastmath": self.formulae.fastmath, "parallel": parallel_default},
 92            **(override_jit_flags or {}),
 93        }
 94
 95        methods.CollisionsMethods.__init__(self)
 96        methods.FragmentationMethods.__init__(self)
 97        methods.PairMethods.__init__(self)
 98        methods.IndexMethods.__init__(self)
 99        methods.PhysicsMethods.__init__(self)
100        methods.CondensationMethods.__init__(self)
101        methods.ChemistryMethods.__init__(self)
102        methods.MomentsMethods.__init__(self)
103        methods.FreezingMethods.__init__(self)
104        methods.DisplacementMethods.__init__(self)
105        methods.TerminalVelocityMethods.__init__(self)
106        methods.IsotopeMethods.__init__(self)
107        methods.SeedingMethods.__init__(self)
108        methods.DepositionMethods.__init__(self)
 21class Numba(  # pylint: disable=too-many-ancestors,duplicate-code
 22    methods.CollisionsMethods,
 23    methods.FragmentationMethods,
 24    methods.PairMethods,
 25    methods.IndexMethods,
 26    methods.PhysicsMethods,
 27    methods.CondensationMethods,
 28    methods.ChemistryMethods,
 29    methods.MomentsMethods,
 30    methods.FreezingMethods,
 31    methods.DisplacementMethods,
 32    methods.TerminalVelocityMethods,
 33    methods.IsotopeMethods,
 34    methods.SeedingMethods,
 35    methods.DepositionMethods,
 36):
 37    Storage = ImportedStorage
 38    Random = ImportedRandom
 39
 40    default_croupier = "local"
 41
 42    def __init__(
 43        self, formulae=None, *, double_precision=True, override_jit_flags=None
 44    ):
 45        if not double_precision:
 46            raise NotImplementedError()
 47        self.formulae = formulae or Formulae()
 48        self.formulae_flattened = self.formulae.flatten
 49
 50        parallel_default = True
 51
 52        if override_jit_flags is not None and "parallel" in override_jit_flags:
 53            parallel_default = override_jit_flags["parallel"]
 54
 55        if parallel_default:
 56            if platform.machine() == "arm64":
 57                if "CI" not in os.environ:
 58                    warnings.warn(
 59                        "Disabling Numba threading due to ARM64 CPU (atomics do not work yet)"
 60                    )
 61                parallel_default = False  # TODO #1183 - atomics don't work on ARM64!
 62
 63            try:
 64                numba.parfors.parfor.ensure_parallel_support()
 65            except numba.core.errors.UnsupportedParforsError:
 66                if "CI" not in os.environ:
 67                    warnings.warn(
 68                        "Numba version used does not support parallel for (32 bits?)"
 69                    )
 70                parallel_default = False
 71
 72            if not numba.config.DISABLE_JIT:  # pylint: disable=no-member
 73
 74                @numba.jit(parallel=True, nopython=True)
 75                def fill_array_with_thread_id(arr):
 76                    """writes thread id to corresponding array element"""
 77                    for i in prange(  # pylint: disable=not-an-iterable
 78                        numba.get_num_threads()
 79                    ):
 80                        arr[i] = numba.get_thread_id()
 81
 82                fill_array_with_thread_id(arr := np.full(numba.get_num_threads(), -1))
 83                if not max(arr) == arr[-1] == numba.get_num_threads() - 1:
 84                    raise ValueError(
 85                        "Numba threading enabled but does not work as expected"
 86                        " (try setting the NUMBA_THREADING_LAYER env var to 'workqueue')"
 87                    )
 88
 89        assert "fastmath" not in (override_jit_flags or {})
 90        self.default_jit_flags = {
 91            **JIT_FLAGS,  # here parallel=False (for out-of-backend code)
 92            **{"fastmath": self.formulae.fastmath, "parallel": parallel_default},
 93            **(override_jit_flags or {}),
 94        }
 95
 96        methods.CollisionsMethods.__init__(self)
 97        methods.FragmentationMethods.__init__(self)
 98        methods.PairMethods.__init__(self)
 99        methods.IndexMethods.__init__(self)
100        methods.PhysicsMethods.__init__(self)
101        methods.CondensationMethods.__init__(self)
102        methods.ChemistryMethods.__init__(self)
103        methods.MomentsMethods.__init__(self)
104        methods.FreezingMethods.__init__(self)
105        methods.DisplacementMethods.__init__(self)
106        methods.TerminalVelocityMethods.__init__(self)
107        methods.IsotopeMethods.__init__(self)
108        methods.SeedingMethods.__init__(self)
109        methods.DepositionMethods.__init__(self)

CPU backend methods for droplet isotope processing.

Provides Numba-accelerated kernels for:

  • isotopic ratio to delta conversion,
  • Bolin number evaluation,
  • heavy-isotope fractionation during condensation/evaporation.
Numba(formulae=None, *, double_precision=True, override_jit_flags=None)
 42    def __init__(
 43        self, formulae=None, *, double_precision=True, override_jit_flags=None
 44    ):
 45        if not double_precision:
 46            raise NotImplementedError()
 47        self.formulae = formulae or Formulae()
 48        self.formulae_flattened = self.formulae.flatten
 49
 50        parallel_default = True
 51
 52        if override_jit_flags is not None and "parallel" in override_jit_flags:
 53            parallel_default = override_jit_flags["parallel"]
 54
 55        if parallel_default:
 56            if platform.machine() == "arm64":
 57                if "CI" not in os.environ:
 58                    warnings.warn(
 59                        "Disabling Numba threading due to ARM64 CPU (atomics do not work yet)"
 60                    )
 61                parallel_default = False  # TODO #1183 - atomics don't work on ARM64!
 62
 63            try:
 64                numba.parfors.parfor.ensure_parallel_support()
 65            except numba.core.errors.UnsupportedParforsError:
 66                if "CI" not in os.environ:
 67                    warnings.warn(
 68                        "Numba version used does not support parallel for (32 bits?)"
 69                    )
 70                parallel_default = False
 71
 72            if not numba.config.DISABLE_JIT:  # pylint: disable=no-member
 73
 74                @numba.jit(parallel=True, nopython=True)
 75                def fill_array_with_thread_id(arr):
 76                    """writes thread id to corresponding array element"""
 77                    for i in prange(  # pylint: disable=not-an-iterable
 78                        numba.get_num_threads()
 79                    ):
 80                        arr[i] = numba.get_thread_id()
 81
 82                fill_array_with_thread_id(arr := np.full(numba.get_num_threads(), -1))
 83                if not max(arr) == arr[-1] == numba.get_num_threads() - 1:
 84                    raise ValueError(
 85                        "Numba threading enabled but does not work as expected"
 86                        " (try setting the NUMBA_THREADING_LAYER env var to 'workqueue')"
 87                    )
 88
 89        assert "fastmath" not in (override_jit_flags or {})
 90        self.default_jit_flags = {
 91            **JIT_FLAGS,  # here parallel=False (for out-of-backend code)
 92            **{"fastmath": self.formulae.fastmath, "parallel": parallel_default},
 93            **(override_jit_flags or {}),
 94        }
 95
 96        methods.CollisionsMethods.__init__(self)
 97        methods.FragmentationMethods.__init__(self)
 98        methods.PairMethods.__init__(self)
 99        methods.IndexMethods.__init__(self)
100        methods.PhysicsMethods.__init__(self)
101        methods.CondensationMethods.__init__(self)
102        methods.ChemistryMethods.__init__(self)
103        methods.MomentsMethods.__init__(self)
104        methods.FreezingMethods.__init__(self)
105        methods.DisplacementMethods.__init__(self)
106        methods.TerminalVelocityMethods.__init__(self)
107        methods.IsotopeMethods.__init__(self)
108        methods.SeedingMethods.__init__(self)
109        methods.DepositionMethods.__init__(self)
default_croupier = 'local'
formulae
formulae_flattened
default_jit_flags
Inherited Members
PySDM.backends.impl_numba.methods.collisions_methods.CollisionsMethods
adaptive_sdm_end
scale_prob_for_adaptive_sdm_gamma
cell_id
collision_coalescence
collision_coalescence_breakup
compute_gamma
make_cell_caretaker
normalize
remove_zero_n_or_flagged
linear_collection_efficiency
PySDM.backends.impl_numba.methods.fragmentation_methods.FragmentationMethods
fragmentation_limiters
slams_fragmentation
exp_fragmentation
feingold1988_fragmentation
gauss_fragmentation
straub_fragmentation
ll82_fragmentation
ll82_coalescence_check
PySDM.backends.impl_numba.methods.pair_methods.PairMethods
distance_pair
find_pairs
max_pair
min_pair
sort_pair
sort_within_pair_by_attr
sum_pair
multiply_pair
PySDM.backends.impl_numba.methods.index_methods.IndexMethods
identity_index
shuffle_global
shuffle_local
sort_by_key
PySDM.backends.impl_numba.methods.physics_methods.PhysicsMethods
critical_volume
temperature_pressure_rh
a_w_ice
volume_of_water_mass
mass_of_water_volume
air_density
air_dynamic_viscosity
reynolds_number
explicit_euler
PySDM.backends.impl_numba.methods.condensation_methods.CondensationMethods
condensation
make_adapt_substeps
make_step_fake
make_step
make_step_impl
make_calculate_ml_old
make_calculate_ml_new
make_condensation_solver
make_condensation_solver_impl
PySDM.backends.impl_numba.methods.chemistry_methods.ChemistryMethods
HENRY_CONST
KINETIC_CONST
EQUILIBRIUM_CONST
specific_gravities
dissolution
dissolution_body
oxidation
oxidation_body
chem_recalculate_drop_data
chem_recalculate_cell_data
equilibrate_H
equilibrate_H_body
PySDM.backends.impl_numba.methods.moments_methods.MomentsMethods
moments
spectrum_moments
PySDM.backends.impl_numba.methods.freezing_methods.FreezingMethods
thaw_instantaneous
immersion_freezing_singular
immersion_freezing_time_dependent
homogeneous_freezing_threshold
homogeneous_freezing_time_dependent
record_freezing_temperatures
PySDM.backends.impl_numba.methods.displacement_methods.DisplacementMethods
calculate_displacement_body_1d
calculate_displacement_body_2d
calculate_displacement_body_3d
calculate_displacement
flag_precipitated
flag_out_of_column
PySDM.backends.impl_numba.methods.terminal_velocity_methods.TerminalVelocityMethods
gunn_and_kinzer_interpolation
rogers_and_yau_terminal_velocity
power_series
terminal_velocity_columnar_ice_crystals
terminal_velocity_ice_spheres
PySDM.backends.impl_numba.methods.isotope_methods.IsotopeMethods
isotopic_delta
isotopic_fractionation
bolin_number
PySDM.backends.impl_numba.methods.seeding_methods.SeedingMethods
seeding
PySDM.backends.impl_numba.methods.deposition_methods.DepositionMethods
deposition
class Numba.Storage(PySDM.backends.impl_common.storage_utils.StorageBase):
 18class Storage(StorageBase):
 19    FLOAT = np.float64
 20    INT = np.int64
 21    BOOL = np.bool_
 22
 23    def __getitem__(self, item):
 24        dim = len(self.shape)
 25        if isinstance(item, slice):
 26            step = item.step or 1
 27            if step != 1:
 28                raise NotImplementedError("step != 1")
 29            start = item.start or 0
 30            if dim == 1:
 31                stop = item.stop or len(self)
 32                result_data = self.data[item]
 33                result_shape = (stop - start,)
 34            elif dim == 2:
 35                stop = item.stop or self.shape[0]
 36                result_data = self.data[item]
 37                result_shape = (stop - start, self.shape[1])
 38            else:
 39                raise NotImplementedError(
 40                    "Only 2 or less dimensions array is supported."
 41                )
 42            if stop > self.data.shape[0]:
 43                raise IndexError(
 44                    f"requested a slice ({start}:{stop}) of Storage"
 45                    f" with first dim of length {self.data.shape[0]}"
 46                )
 47            result = Storage(StorageSignature(result_data, result_shape, self.dtype))
 48        elif isinstance(item, tuple) and dim == 2 and isinstance(item[1], slice):
 49            result = Storage(
 50                StorageSignature(self.data[item[0]], (*self.shape[1:],), self.dtype)
 51            )
 52        else:
 53            result = self.data[item]
 54        return result
 55
 56    def __setitem__(self, key, value):
 57        if hasattr(value, "data"):
 58            self.data[key] = value.data
 59        else:
 60            self.data[key] = value
 61        return self
 62
 63    def __iadd__(self, other):
 64        if isinstance(other, Storage):
 65            impl.add(self.data, other.data)
 66        elif (
 67            isinstance(other, tuple)
 68            and len(other) == 3
 69            and isinstance(other[0], float)
 70            and other[1] == "*"
 71            and isinstance(other[2], Storage)
 72        ):
 73            impl.add_with_multiplier(self.data, other[2].data, other[0])
 74        else:
 75            impl.add(self.data, other)
 76        return self
 77
 78    def __isub__(self, other):
 79        impl.subtract(self.data, other.data)
 80        return self
 81
 82    def __imul__(self, other):
 83        if hasattr(other, "data"):
 84            impl.multiply(self.data, other.data)
 85        else:
 86            impl.multiply(self.data, other)
 87        return self
 88
 89    def __itruediv__(self, other):
 90        if hasattr(other, "data"):
 91            self.data[:] /= other.data[:]
 92        else:
 93            self.data[:] /= other
 94        return self
 95
 96    def __imod__(self, other):
 97        impl.row_modulo(self.data, other.data)
 98        return self
 99
100    def __ipow__(self, other):
101        impl.power(self.data, other)
102        return self
103
104    def __bool__(self):
105        if len(self) == 1:
106            result = bool(self.data[0] != 0)
107        else:
108            raise NotImplementedError("Logic value of array is ambiguous.")
109        return result
110
111    def detach(self):
112        if self.data.base is not None:
113            self.data = np.array(self.data)
114
115    def download(self, target, reshape=False):
116        if reshape:
117            data = self.data.reshape(target.shape)
118        else:
119            data = self.data
120        np.copyto(target, data, casting="safe")
121
122    @staticmethod
123    def _get_empty_data(shape, dtype):
124        if dtype in (float, Storage.FLOAT):
125            data = np.full(shape, np.nan, dtype=Storage.FLOAT)
126            dtype = Storage.FLOAT
127        elif dtype in (int, Storage.INT):
128            data = np.full(shape, -1, dtype=Storage.INT)
129            dtype = Storage.INT
130        elif dtype in (bool, Storage.BOOL):
131            data = np.full(shape, -1, dtype=Storage.BOOL)
132            dtype = Storage.BOOL
133        else:
134            raise NotImplementedError()
135
136        return StorageSignature(data, shape, dtype)
137
138    @staticmethod
139    def empty(shape, dtype):
140        return empty(shape, dtype, Storage)
141
142    @staticmethod
143    def _get_data_from_ndarray(array):
144        return get_data_from_ndarray(
145            array=array,
146            storage_class=Storage,
147            copy_fun=lambda array_astype: array_astype.copy(),
148        )
149
150    def amin(self):
151        return impl.amin(self.data)
152
153    def amax(self):
154        return impl.amax(self.data)
155
156    def all(self):
157        return self.data.all()
158
159    @staticmethod
160    def from_ndarray(array):
161        result = Storage(Storage._get_data_from_ndarray(array))
162        return result
163
164    def floor(self, other=None):
165        if other is None:
166            impl.floor(self.data)
167        else:
168            impl.floor_out_of_place(self.data, other.data)
169        return self
170
171    def product(self, multiplicand, multiplier):
172        if hasattr(multiplier, "data"):
173            impl.multiply_out_of_place(self.data, multiplicand.data, multiplier.data)
174        else:
175            impl.multiply_out_of_place(self.data, multiplicand.data, multiplier)
176        return self
177
178    def ratio(self, dividend, divisor):
179        impl.divide_out_of_place(self.data, dividend.data, divisor.data)
180        return self
181
182    def divide_if_not_zero(self, divisor):
183        impl.divide_if_not_zero(self.data, divisor.data)
184        return self
185
186    def where(self, condition, true_value, false_value):
187        impl.where(self.data, condition.data, true_value.data, false_value.data)
188        return self
189
190    def isless(self, comparison, value):
191        impl.isless(self.data, comparison.data, value)
192
193    def sum(self, arg_a, arg_b):
194        impl.sum_out_of_place(self.data, arg_a.data, arg_b.data)
195        return self
196
197    def ravel(self, other):
198        if isinstance(other, Storage):
199            self.data[:] = other.data.ravel()
200        else:
201            self.data[:] = other.ravel()
202
203    def urand(self, generator):
204        generator(self)
205
206    def to_ndarray(self):
207        return self.data.copy()
208
209    def upload(self, data):
210        np.copyto(self.data, data, casting="safe")
211
212    def fill(self, other):
213        if isinstance(other, Storage):
214            self.data[:] = other.data
215        else:
216            self.data[:] = other
217
218    def exp(self):
219        self.data[:] = np.exp(self.data)
220
221    def abs(self):
222        self.data[:] = np.abs(self.data)
14class Random(RandomCommon):  # pylint: disable=too-few-public-methods
15    def __init__(self, size, seed):
16        super().__init__(size, seed)
17        self.generator = np.random.default_rng(seed)
18
19    def __call__(self, storage):
20        storage.data[:] = self.generator.uniform(0, 1, storage.shape)