PyMPDATA_examples.Arabas_and_Farhat_2020.simulation
1import numba 2import numpy as np 3from PyMPDATA_examples.Arabas_and_Farhat_2020.options import OPTIONS 4 5from PyMPDATA import Options, ScalarField, Solver, Stepper, VectorField 6from PyMPDATA.boundary_conditions import Extrapolated 7from PyMPDATA.impl.enumerations import ARG_DATA 8 9 10class Simulation: 11 @staticmethod 12 def _factory( 13 *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions 14 ): 15 stepper = Stepper( 16 options=options, n_dims=len(advectee.shape), non_unit_g_factor=False 17 ) 18 return Solver( 19 stepper=stepper, 20 advectee=ScalarField( 21 advectee.astype(dtype=options.dtype), 22 halo=options.n_halo, 23 boundary_conditions=boundary_conditions, 24 ), 25 advector=VectorField( 26 (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),), 27 halo=options.n_halo, 28 boundary_conditions=boundary_conditions, 29 ), 30 ) 31 32 def __init__(self, settings): 33 self.settings = settings 34 35 sigma2 = pow(settings.sigma, 2) 36 dx_opt = abs( 37 settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2 38 ) 39 dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt 40 41 # adjusting dt so that nt is integer 42 self.dt = settings.T 43 self.nt = 0 44 while self.dt > dt_opt: 45 self.nt += 1 46 self.dt = settings.T / self.nt 47 48 # adjusting dx to match requested l^2 49 dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma 50 51 # calculating actual u number and lambda 52 self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx 53 self.l2 = dx * dx / sigma2 / self.dt 54 55 # adjusting nx and setting S_beg, S_end 56 S_beg = settings.S_match 57 self.nx = 1 58 while S_beg > settings.S_min: 59 self.nx += 1 60 S_beg = np.exp(np.log(settings.S_match) - self.nx * dx) 61 62 self.ix_match = self.nx 63 64 S_end = settings.S_match 65 while S_end < settings.S_max: 66 self.nx += 1 67 S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx) 68 69 # asset price 70 self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx) 71 72 self.mu_coeff = (0.5 / self.l2,) 73 self.solvers = {} 74 self.solvers[1] = self._factory( 75 advectee=settings.payoff(self.S), 76 advector=self.C, 77 options=Options(n_iters=1, non_zero_mu_coeff=True), 78 boundary_conditions=(Extrapolated(),), 79 ) 80 self.solvers[2] = self._factory( 81 advectee=settings.payoff(self.S), 82 advector=self.C, 83 options=Options(**OPTIONS), 84 boundary_conditions=(Extrapolated(),), 85 ) 86 87 def run(self, n_iters: int): 88 if self.settings.amer: 89 psi = self.solvers[n_iters].advectee.data 90 f_T = np.empty_like(psi) 91 f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T) 92 T = self.settings.T 93 r = self.settings.r 94 dt = self.dt 95 96 @numba.experimental.jitclass([]) 97 class PostStep: 98 # pylint: disable=too-few-public-methods 99 def __init__(self): 100 pass 101 102 def call(self, _, psi, step, index): 103 t = T - (step + 1) * dt 104 psi[index].field[ARG_DATA][:] += ( 105 np.maximum(psi[index].field[ARG_DATA], f_T / np.exp(r * t)) 106 - psi[index].field[ARG_DATA] 107 ) 108 109 self.solvers[n_iters].advance( 110 self.nt, mu_coeff=self.mu_coeff, post_step=PostStep() 111 ) 112 else: 113 self.solvers[n_iters].advance(self.nt, mu_coeff=self.mu_coeff) 114 115 return self.solvers[n_iters].advectee.get() 116 117 def terminal_value(self): 118 return self.solvers[1].advectee.get()
class
Simulation:
11class Simulation: 12 @staticmethod 13 def _factory( 14 *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions 15 ): 16 stepper = Stepper( 17 options=options, n_dims=len(advectee.shape), non_unit_g_factor=False 18 ) 19 return Solver( 20 stepper=stepper, 21 advectee=ScalarField( 22 advectee.astype(dtype=options.dtype), 23 halo=options.n_halo, 24 boundary_conditions=boundary_conditions, 25 ), 26 advector=VectorField( 27 (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),), 28 halo=options.n_halo, 29 boundary_conditions=boundary_conditions, 30 ), 31 ) 32 33 def __init__(self, settings): 34 self.settings = settings 35 36 sigma2 = pow(settings.sigma, 2) 37 dx_opt = abs( 38 settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2 39 ) 40 dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt 41 42 # adjusting dt so that nt is integer 43 self.dt = settings.T 44 self.nt = 0 45 while self.dt > dt_opt: 46 self.nt += 1 47 self.dt = settings.T / self.nt 48 49 # adjusting dx to match requested l^2 50 dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma 51 52 # calculating actual u number and lambda 53 self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx 54 self.l2 = dx * dx / sigma2 / self.dt 55 56 # adjusting nx and setting S_beg, S_end 57 S_beg = settings.S_match 58 self.nx = 1 59 while S_beg > settings.S_min: 60 self.nx += 1 61 S_beg = np.exp(np.log(settings.S_match) - self.nx * dx) 62 63 self.ix_match = self.nx 64 65 S_end = settings.S_match 66 while S_end < settings.S_max: 67 self.nx += 1 68 S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx) 69 70 # asset price 71 self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx) 72 73 self.mu_coeff = (0.5 / self.l2,) 74 self.solvers = {} 75 self.solvers[1] = self._factory( 76 advectee=settings.payoff(self.S), 77 advector=self.C, 78 options=Options(n_iters=1, non_zero_mu_coeff=True), 79 boundary_conditions=(Extrapolated(),), 80 ) 81 self.solvers[2] = self._factory( 82 advectee=settings.payoff(self.S), 83 advector=self.C, 84 options=Options(**OPTIONS), 85 boundary_conditions=(Extrapolated(),), 86 ) 87 88 def run(self, n_iters: int): 89 if self.settings.amer: 90 psi = self.solvers[n_iters].advectee.data 91 f_T = np.empty_like(psi) 92 f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T) 93 T = self.settings.T 94 r = self.settings.r 95 dt = self.dt 96 97 @numba.experimental.jitclass([]) 98 class PostStep: 99 # pylint: disable=too-few-public-methods 100 def __init__(self): 101 pass 102 103 def call(self, _, psi, step, index): 104 t = T - (step + 1) * dt 105 psi[index].field[ARG_DATA][:] += ( 106 np.maximum(psi[index].field[ARG_DATA], f_T / np.exp(r * t)) 107 - psi[index].field[ARG_DATA] 108 ) 109 110 self.solvers[n_iters].advance( 111 self.nt, mu_coeff=self.mu_coeff, post_step=PostStep() 112 ) 113 else: 114 self.solvers[n_iters].advance(self.nt, mu_coeff=self.mu_coeff) 115 116 return self.solvers[n_iters].advectee.get() 117 118 def terminal_value(self): 119 return self.solvers[1].advectee.get()
Simulation(settings)
33 def __init__(self, settings): 34 self.settings = settings 35 36 sigma2 = pow(settings.sigma, 2) 37 dx_opt = abs( 38 settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2 39 ) 40 dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt 41 42 # adjusting dt so that nt is integer 43 self.dt = settings.T 44 self.nt = 0 45 while self.dt > dt_opt: 46 self.nt += 1 47 self.dt = settings.T / self.nt 48 49 # adjusting dx to match requested l^2 50 dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma 51 52 # calculating actual u number and lambda 53 self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx 54 self.l2 = dx * dx / sigma2 / self.dt 55 56 # adjusting nx and setting S_beg, S_end 57 S_beg = settings.S_match 58 self.nx = 1 59 while S_beg > settings.S_min: 60 self.nx += 1 61 S_beg = np.exp(np.log(settings.S_match) - self.nx * dx) 62 63 self.ix_match = self.nx 64 65 S_end = settings.S_match 66 while S_end < settings.S_max: 67 self.nx += 1 68 S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx) 69 70 # asset price 71 self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx) 72 73 self.mu_coeff = (0.5 / self.l2,) 74 self.solvers = {} 75 self.solvers[1] = self._factory( 76 advectee=settings.payoff(self.S), 77 advector=self.C, 78 options=Options(n_iters=1, non_zero_mu_coeff=True), 79 boundary_conditions=(Extrapolated(),), 80 ) 81 self.solvers[2] = self._factory( 82 advectee=settings.payoff(self.S), 83 advector=self.C, 84 options=Options(**OPTIONS), 85 boundary_conditions=(Extrapolated(),), 86 )
def
run(self, n_iters: int):
88 def run(self, n_iters: int): 89 if self.settings.amer: 90 psi = self.solvers[n_iters].advectee.data 91 f_T = np.empty_like(psi) 92 f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T) 93 T = self.settings.T 94 r = self.settings.r 95 dt = self.dt 96 97 @numba.experimental.jitclass([]) 98 class PostStep: 99 # pylint: disable=too-few-public-methods 100 def __init__(self): 101 pass 102 103 def call(self, _, psi, step, index): 104 t = T - (step + 1) * dt 105 psi[index].field[ARG_DATA][:] += ( 106 np.maximum(psi[index].field[ARG_DATA], f_T / np.exp(r * t)) 107 - psi[index].field[ARG_DATA] 108 ) 109 110 self.solvers[n_iters].advance( 111 self.nt, mu_coeff=self.mu_coeff, post_step=PostStep() 112 ) 113 else: 114 self.solvers[n_iters].advance(self.nt, mu_coeff=self.mu_coeff) 115 116 return self.solvers[n_iters].advectee.get()