PySDM.backends.numba
Multi-threaded CPU backend using LLVM-powered just-in-time compilation
1""" 2Multi-threaded CPU backend using LLVM-powered just-in-time compilation 3""" 4 5import os 6import platform 7import warnings 8 9import numba 10from numba import prange 11import numpy as np 12 13from PySDM.backends.impl_numba import methods 14from PySDM.backends.impl_numba.random import Random as ImportedRandom 15from PySDM.backends.impl_numba.storage import Storage as ImportedStorage 16from PySDM.formulae import Formulae 17from PySDM.backends.impl_numba.conf import JIT_FLAGS 18 19 20class Numba( # pylint: disable=too-many-ancestors,duplicate-code 21 methods.CollisionsMethods, 22 methods.FragmentationMethods, 23 methods.PairMethods, 24 methods.IndexMethods, 25 methods.PhysicsMethods, 26 methods.CondensationMethods, 27 methods.ChemistryMethods, 28 methods.MomentsMethods, 29 methods.FreezingMethods, 30 methods.DisplacementMethods, 31 methods.TerminalVelocityMethods, 32 methods.IsotopeMethods, 33 methods.SeedingMethods, 34 methods.DepositionMethods, 35): 36 Storage = ImportedStorage 37 Random = ImportedRandom 38 39 default_croupier = "local" 40 41 def __init__( 42 self, formulae=None, *, double_precision=True, override_jit_flags=None 43 ): 44 if not double_precision: 45 raise NotImplementedError() 46 self.formulae = formulae or Formulae() 47 self.formulae_flattened = self.formulae.flatten 48 49 parallel_default = True 50 51 if override_jit_flags is not None and "parallel" in override_jit_flags: 52 parallel_default = override_jit_flags["parallel"] 53 54 if parallel_default: 55 if platform.machine() == "arm64": 56 if "CI" not in os.environ: 57 warnings.warn( 58 "Disabling Numba threading due to ARM64 CPU (atomics do not work yet)" 59 ) 60 parallel_default = False # TODO #1183 - atomics don't work on ARM64! 61 62 try: 63 numba.parfors.parfor.ensure_parallel_support() 64 except numba.core.errors.UnsupportedParforsError: 65 if "CI" not in os.environ: 66 warnings.warn( 67 "Numba version used does not support parallel for (32 bits?)" 68 ) 69 parallel_default = False 70 71 if not numba.config.DISABLE_JIT: # pylint: disable=no-member 72 73 @numba.jit(parallel=True, nopython=True) 74 def fill_array_with_thread_id(arr): 75 """writes thread id to corresponding array element""" 76 for i in prange( # pylint: disable=not-an-iterable 77 numba.get_num_threads() 78 ): 79 arr[i] = numba.get_thread_id() 80 81 fill_array_with_thread_id(arr := np.full(numba.get_num_threads(), -1)) 82 if not max(arr) == arr[-1] == numba.get_num_threads() - 1: 83 raise ValueError( 84 "Numba threading enabled but does not work as expected" 85 " (try setting the NUMBA_THREADING_LAYER env var to 'workqueue')" 86 ) 87 88 assert "fastmath" not in (override_jit_flags or {}) 89 self.default_jit_flags = { 90 **JIT_FLAGS, # here parallel=False (for out-of-backend code) 91 **{"fastmath": self.formulae.fastmath, "parallel": parallel_default}, 92 **(override_jit_flags or {}), 93 } 94 95 methods.CollisionsMethods.__init__(self) 96 methods.FragmentationMethods.__init__(self) 97 methods.PairMethods.__init__(self) 98 methods.IndexMethods.__init__(self) 99 methods.PhysicsMethods.__init__(self) 100 methods.CondensationMethods.__init__(self) 101 methods.ChemistryMethods.__init__(self) 102 methods.MomentsMethods.__init__(self) 103 methods.FreezingMethods.__init__(self) 104 methods.DisplacementMethods.__init__(self) 105 methods.TerminalVelocityMethods.__init__(self) 106 methods.IsotopeMethods.__init__(self) 107 methods.SeedingMethods.__init__(self) 108 methods.DepositionMethods.__init__(self)
class
Numba(PySDM.backends.impl_numba.methods.collisions_methods.CollisionsMethods, PySDM.backends.impl_numba.methods.fragmentation_methods.FragmentationMethods, PySDM.backends.impl_numba.methods.pair_methods.PairMethods, PySDM.backends.impl_numba.methods.index_methods.IndexMethods, PySDM.backends.impl_numba.methods.physics_methods.PhysicsMethods, PySDM.backends.impl_numba.methods.condensation_methods.CondensationMethods, PySDM.backends.impl_numba.methods.chemistry_methods.ChemistryMethods, PySDM.backends.impl_numba.methods.moments_methods.MomentsMethods, PySDM.backends.impl_numba.methods.freezing_methods.FreezingMethods, PySDM.backends.impl_numba.methods.displacement_methods.DisplacementMethods, PySDM.backends.impl_numba.methods.terminal_velocity_methods.TerminalVelocityMethods, PySDM.backends.impl_numba.methods.isotope_methods.IsotopeMethods, PySDM.backends.impl_numba.methods.seeding_methods.SeedingMethods, PySDM.backends.impl_numba.methods.deposition_methods.DepositionMethods):
21class Numba( # pylint: disable=too-many-ancestors,duplicate-code 22 methods.CollisionsMethods, 23 methods.FragmentationMethods, 24 methods.PairMethods, 25 methods.IndexMethods, 26 methods.PhysicsMethods, 27 methods.CondensationMethods, 28 methods.ChemistryMethods, 29 methods.MomentsMethods, 30 methods.FreezingMethods, 31 methods.DisplacementMethods, 32 methods.TerminalVelocityMethods, 33 methods.IsotopeMethods, 34 methods.SeedingMethods, 35 methods.DepositionMethods, 36): 37 Storage = ImportedStorage 38 Random = ImportedRandom 39 40 default_croupier = "local" 41 42 def __init__( 43 self, formulae=None, *, double_precision=True, override_jit_flags=None 44 ): 45 if not double_precision: 46 raise NotImplementedError() 47 self.formulae = formulae or Formulae() 48 self.formulae_flattened = self.formulae.flatten 49 50 parallel_default = True 51 52 if override_jit_flags is not None and "parallel" in override_jit_flags: 53 parallel_default = override_jit_flags["parallel"] 54 55 if parallel_default: 56 if platform.machine() == "arm64": 57 if "CI" not in os.environ: 58 warnings.warn( 59 "Disabling Numba threading due to ARM64 CPU (atomics do not work yet)" 60 ) 61 parallel_default = False # TODO #1183 - atomics don't work on ARM64! 62 63 try: 64 numba.parfors.parfor.ensure_parallel_support() 65 except numba.core.errors.UnsupportedParforsError: 66 if "CI" not in os.environ: 67 warnings.warn( 68 "Numba version used does not support parallel for (32 bits?)" 69 ) 70 parallel_default = False 71 72 if not numba.config.DISABLE_JIT: # pylint: disable=no-member 73 74 @numba.jit(parallel=True, nopython=True) 75 def fill_array_with_thread_id(arr): 76 """writes thread id to corresponding array element""" 77 for i in prange( # pylint: disable=not-an-iterable 78 numba.get_num_threads() 79 ): 80 arr[i] = numba.get_thread_id() 81 82 fill_array_with_thread_id(arr := np.full(numba.get_num_threads(), -1)) 83 if not max(arr) == arr[-1] == numba.get_num_threads() - 1: 84 raise ValueError( 85 "Numba threading enabled but does not work as expected" 86 " (try setting the NUMBA_THREADING_LAYER env var to 'workqueue')" 87 ) 88 89 assert "fastmath" not in (override_jit_flags or {}) 90 self.default_jit_flags = { 91 **JIT_FLAGS, # here parallel=False (for out-of-backend code) 92 **{"fastmath": self.formulae.fastmath, "parallel": parallel_default}, 93 **(override_jit_flags or {}), 94 } 95 96 methods.CollisionsMethods.__init__(self) 97 methods.FragmentationMethods.__init__(self) 98 methods.PairMethods.__init__(self) 99 methods.IndexMethods.__init__(self) 100 methods.PhysicsMethods.__init__(self) 101 methods.CondensationMethods.__init__(self) 102 methods.ChemistryMethods.__init__(self) 103 methods.MomentsMethods.__init__(self) 104 methods.FreezingMethods.__init__(self) 105 methods.DisplacementMethods.__init__(self) 106 methods.TerminalVelocityMethods.__init__(self) 107 methods.IsotopeMethods.__init__(self) 108 methods.SeedingMethods.__init__(self) 109 methods.DepositionMethods.__init__(self)
CPU backend methods for droplet isotope processing.
Provides Numba-accelerated kernels for:
- isotopic ratio to delta conversion,
- Bolin number evaluation,
- heavy-isotope fractionation during condensation/evaporation.
Numba(formulae=None, *, double_precision=True, override_jit_flags=None)
42 def __init__( 43 self, formulae=None, *, double_precision=True, override_jit_flags=None 44 ): 45 if not double_precision: 46 raise NotImplementedError() 47 self.formulae = formulae or Formulae() 48 self.formulae_flattened = self.formulae.flatten 49 50 parallel_default = True 51 52 if override_jit_flags is not None and "parallel" in override_jit_flags: 53 parallel_default = override_jit_flags["parallel"] 54 55 if parallel_default: 56 if platform.machine() == "arm64": 57 if "CI" not in os.environ: 58 warnings.warn( 59 "Disabling Numba threading due to ARM64 CPU (atomics do not work yet)" 60 ) 61 parallel_default = False # TODO #1183 - atomics don't work on ARM64! 62 63 try: 64 numba.parfors.parfor.ensure_parallel_support() 65 except numba.core.errors.UnsupportedParforsError: 66 if "CI" not in os.environ: 67 warnings.warn( 68 "Numba version used does not support parallel for (32 bits?)" 69 ) 70 parallel_default = False 71 72 if not numba.config.DISABLE_JIT: # pylint: disable=no-member 73 74 @numba.jit(parallel=True, nopython=True) 75 def fill_array_with_thread_id(arr): 76 """writes thread id to corresponding array element""" 77 for i in prange( # pylint: disable=not-an-iterable 78 numba.get_num_threads() 79 ): 80 arr[i] = numba.get_thread_id() 81 82 fill_array_with_thread_id(arr := np.full(numba.get_num_threads(), -1)) 83 if not max(arr) == arr[-1] == numba.get_num_threads() - 1: 84 raise ValueError( 85 "Numba threading enabled but does not work as expected" 86 " (try setting the NUMBA_THREADING_LAYER env var to 'workqueue')" 87 ) 88 89 assert "fastmath" not in (override_jit_flags or {}) 90 self.default_jit_flags = { 91 **JIT_FLAGS, # here parallel=False (for out-of-backend code) 92 **{"fastmath": self.formulae.fastmath, "parallel": parallel_default}, 93 **(override_jit_flags or {}), 94 } 95 96 methods.CollisionsMethods.__init__(self) 97 methods.FragmentationMethods.__init__(self) 98 methods.PairMethods.__init__(self) 99 methods.IndexMethods.__init__(self) 100 methods.PhysicsMethods.__init__(self) 101 methods.CondensationMethods.__init__(self) 102 methods.ChemistryMethods.__init__(self) 103 methods.MomentsMethods.__init__(self) 104 methods.FreezingMethods.__init__(self) 105 methods.DisplacementMethods.__init__(self) 106 methods.TerminalVelocityMethods.__init__(self) 107 methods.IsotopeMethods.__init__(self) 108 methods.SeedingMethods.__init__(self) 109 methods.DepositionMethods.__init__(self)
Inherited Members
- PySDM.backends.impl_numba.methods.collisions_methods.CollisionsMethods
- adaptive_sdm_end
- scale_prob_for_adaptive_sdm_gamma
- cell_id
- collision_coalescence
- collision_coalescence_breakup
- compute_gamma
- make_cell_caretaker
- normalize
- remove_zero_n_or_flagged
- linear_collection_efficiency
- PySDM.backends.impl_numba.methods.fragmentation_methods.FragmentationMethods
- fragmentation_limiters
- slams_fragmentation
- exp_fragmentation
- feingold1988_fragmentation
- gauss_fragmentation
- straub_fragmentation
- ll82_fragmentation
- ll82_coalescence_check
- PySDM.backends.impl_numba.methods.pair_methods.PairMethods
- distance_pair
- find_pairs
- max_pair
- min_pair
- sort_pair
- sort_within_pair_by_attr
- sum_pair
- multiply_pair
- PySDM.backends.impl_numba.methods.index_methods.IndexMethods
- identity_index
- shuffle_global
- shuffle_local
- sort_by_key
- PySDM.backends.impl_numba.methods.physics_methods.PhysicsMethods
- critical_volume
- temperature_pressure_rh
- a_w_ice
- volume_of_water_mass
- mass_of_water_volume
- air_density
- air_dynamic_viscosity
- reynolds_number
- explicit_euler
- PySDM.backends.impl_numba.methods.condensation_methods.CondensationMethods
- condensation
- make_adapt_substeps
- make_step_fake
- make_step
- make_step_impl
- make_calculate_ml_old
- make_calculate_ml_new
- make_condensation_solver
- make_condensation_solver_impl
- PySDM.backends.impl_numba.methods.chemistry_methods.ChemistryMethods
- HENRY_CONST
- KINETIC_CONST
- EQUILIBRIUM_CONST
- specific_gravities
- dissolution
- dissolution_body
- oxidation
- oxidation_body
- chem_recalculate_drop_data
- chem_recalculate_cell_data
- equilibrate_H
- equilibrate_H_body
- PySDM.backends.impl_numba.methods.freezing_methods.FreezingMethods
- thaw_instantaneous
- immersion_freezing_singular
- immersion_freezing_time_dependent
- homogeneous_freezing_threshold
- homogeneous_freezing_time_dependent
- record_freezing_temperatures
- PySDM.backends.impl_numba.methods.displacement_methods.DisplacementMethods
- calculate_displacement_body_1d
- calculate_displacement_body_2d
- calculate_displacement_body_3d
- calculate_displacement
- flag_precipitated
- flag_out_of_column
- PySDM.backends.impl_numba.methods.terminal_velocity_methods.TerminalVelocityMethods
- gunn_and_kinzer_interpolation
- rogers_and_yau_terminal_velocity
- power_series
- terminal_velocity_columnar_ice_crystals
- terminal_velocity_ice_spheres
18class Storage(StorageBase): 19 FLOAT = np.float64 20 INT = np.int64 21 BOOL = np.bool_ 22 23 def __getitem__(self, item): 24 dim = len(self.shape) 25 if isinstance(item, slice): 26 step = item.step or 1 27 if step != 1: 28 raise NotImplementedError("step != 1") 29 start = item.start or 0 30 if dim == 1: 31 stop = item.stop or len(self) 32 result_data = self.data[item] 33 result_shape = (stop - start,) 34 elif dim == 2: 35 stop = item.stop or self.shape[0] 36 result_data = self.data[item] 37 result_shape = (stop - start, self.shape[1]) 38 else: 39 raise NotImplementedError( 40 "Only 2 or less dimensions array is supported." 41 ) 42 if stop > self.data.shape[0]: 43 raise IndexError( 44 f"requested a slice ({start}:{stop}) of Storage" 45 f" with first dim of length {self.data.shape[0]}" 46 ) 47 result = Storage(StorageSignature(result_data, result_shape, self.dtype)) 48 elif isinstance(item, tuple) and dim == 2 and isinstance(item[1], slice): 49 result = Storage( 50 StorageSignature(self.data[item[0]], (*self.shape[1:],), self.dtype) 51 ) 52 else: 53 result = self.data[item] 54 return result 55 56 def __setitem__(self, key, value): 57 if hasattr(value, "data"): 58 self.data[key] = value.data 59 else: 60 self.data[key] = value 61 return self 62 63 def __iadd__(self, other): 64 if isinstance(other, Storage): 65 impl.add(self.data, other.data) 66 elif ( 67 isinstance(other, tuple) 68 and len(other) == 3 69 and isinstance(other[0], float) 70 and other[1] == "*" 71 and isinstance(other[2], Storage) 72 ): 73 impl.add_with_multiplier(self.data, other[2].data, other[0]) 74 else: 75 impl.add(self.data, other) 76 return self 77 78 def __isub__(self, other): 79 impl.subtract(self.data, other.data) 80 return self 81 82 def __imul__(self, other): 83 if hasattr(other, "data"): 84 impl.multiply(self.data, other.data) 85 else: 86 impl.multiply(self.data, other) 87 return self 88 89 def __itruediv__(self, other): 90 if hasattr(other, "data"): 91 self.data[:] /= other.data[:] 92 else: 93 self.data[:] /= other 94 return self 95 96 def __imod__(self, other): 97 impl.row_modulo(self.data, other.data) 98 return self 99 100 def __ipow__(self, other): 101 impl.power(self.data, other) 102 return self 103 104 def __bool__(self): 105 if len(self) == 1: 106 result = bool(self.data[0] != 0) 107 else: 108 raise NotImplementedError("Logic value of array is ambiguous.") 109 return result 110 111 def detach(self): 112 if self.data.base is not None: 113 self.data = np.array(self.data) 114 115 def download(self, target, reshape=False): 116 if reshape: 117 data = self.data.reshape(target.shape) 118 else: 119 data = self.data 120 np.copyto(target, data, casting="safe") 121 122 @staticmethod 123 def _get_empty_data(shape, dtype): 124 if dtype in (float, Storage.FLOAT): 125 data = np.full(shape, np.nan, dtype=Storage.FLOAT) 126 dtype = Storage.FLOAT 127 elif dtype in (int, Storage.INT): 128 data = np.full(shape, -1, dtype=Storage.INT) 129 dtype = Storage.INT 130 elif dtype in (bool, Storage.BOOL): 131 data = np.full(shape, -1, dtype=Storage.BOOL) 132 dtype = Storage.BOOL 133 else: 134 raise NotImplementedError() 135 136 return StorageSignature(data, shape, dtype) 137 138 @staticmethod 139 def empty(shape, dtype): 140 return empty(shape, dtype, Storage) 141 142 @staticmethod 143 def _get_data_from_ndarray(array): 144 return get_data_from_ndarray( 145 array=array, 146 storage_class=Storage, 147 copy_fun=lambda array_astype: array_astype.copy(), 148 ) 149 150 def amin(self): 151 return impl.amin(self.data) 152 153 def amax(self): 154 return impl.amax(self.data) 155 156 def all(self): 157 return self.data.all() 158 159 @staticmethod 160 def from_ndarray(array): 161 result = Storage(Storage._get_data_from_ndarray(array)) 162 return result 163 164 def floor(self, other=None): 165 if other is None: 166 impl.floor(self.data) 167 else: 168 impl.floor_out_of_place(self.data, other.data) 169 return self 170 171 def product(self, multiplicand, multiplier): 172 if hasattr(multiplier, "data"): 173 impl.multiply_out_of_place(self.data, multiplicand.data, multiplier.data) 174 else: 175 impl.multiply_out_of_place(self.data, multiplicand.data, multiplier) 176 return self 177 178 def ratio(self, dividend, divisor): 179 impl.divide_out_of_place(self.data, dividend.data, divisor.data) 180 return self 181 182 def divide_if_not_zero(self, divisor): 183 impl.divide_if_not_zero(self.data, divisor.data) 184 return self 185 186 def where(self, condition, true_value, false_value): 187 impl.where(self.data, condition.data, true_value.data, false_value.data) 188 return self 189 190 def isless(self, comparison, value): 191 impl.isless(self.data, comparison.data, value) 192 193 def sum(self, arg_a, arg_b): 194 impl.sum_out_of_place(self.data, arg_a.data, arg_b.data) 195 return self 196 197 def ravel(self, other): 198 if isinstance(other, Storage): 199 self.data[:] = other.data.ravel() 200 else: 201 self.data[:] = other.ravel() 202 203 def urand(self, generator): 204 generator(self) 205 206 def to_ndarray(self): 207 return self.data.copy() 208 209 def upload(self, data): 210 np.copyto(self.data, data, casting="safe") 211 212 def fill(self, other): 213 if isinstance(other, Storage): 214 self.data[:] = other.data 215 else: 216 self.data[:] = other 217 218 def exp(self): 219 self.data[:] = np.exp(self.data) 220 221 def abs(self): 222 self.data[:] = np.abs(self.data)