PyMPDATA_examples.Arabas_and_Farhat_2020.simulation
1import numba 2import numpy as np 3from PyMPDATA_examples.Arabas_and_Farhat_2020.options import OPTIONS 4 5from PyMPDATA import Options, ScalarField, Solver, Stepper, VectorField 6from PyMPDATA.boundary_conditions import Extrapolated 7 8 9class Simulation: 10 @staticmethod 11 def _factory( 12 *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions 13 ): 14 stepper = Stepper( 15 options=options, n_dims=len(advectee.shape), non_unit_g_factor=False 16 ) 17 return Solver( 18 stepper=stepper, 19 advectee=ScalarField( 20 advectee.astype(dtype=options.dtype), 21 halo=options.n_halo, 22 boundary_conditions=boundary_conditions, 23 ), 24 advector=VectorField( 25 (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),), 26 halo=options.n_halo, 27 boundary_conditions=boundary_conditions, 28 ), 29 ) 30 31 def __init__(self, settings): 32 self.settings = settings 33 34 sigma2 = pow(settings.sigma, 2) 35 dx_opt = abs( 36 settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2 37 ) 38 dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt 39 40 # adjusting dt so that nt is integer 41 self.dt = settings.T 42 self.nt = 0 43 while self.dt > dt_opt: 44 self.nt += 1 45 self.dt = settings.T / self.nt 46 47 # adjusting dx to match requested l^2 48 dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma 49 50 # calculating actual u number and lambda 51 self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx 52 self.l2 = dx * dx / sigma2 / self.dt 53 54 # adjusting nx and setting S_beg, S_end 55 S_beg = settings.S_match 56 self.nx = 1 57 while S_beg > settings.S_min: 58 self.nx += 1 59 S_beg = np.exp(np.log(settings.S_match) - self.nx * dx) 60 61 self.ix_match = self.nx 62 63 S_end = settings.S_match 64 while S_end < settings.S_max: 65 self.nx += 1 66 S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx) 67 68 # asset price 69 self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx) 70 71 self.mu_coeff = (0.5 / self.l2,) 72 self.solvers = {} 73 self.solvers[1] = self._factory( 74 advectee=settings.payoff(self.S), 75 advector=self.C, 76 options=Options(n_iters=1, non_zero_mu_coeff=True), 77 boundary_conditions=(Extrapolated(),), 78 ) 79 self.solvers[2] = self._factory( 80 advectee=settings.payoff(self.S), 81 advector=self.C, 82 options=Options(**OPTIONS), 83 boundary_conditions=(Extrapolated(),), 84 ) 85 86 def run(self, n_iters: int): 87 if self.settings.amer: 88 psi = self.solvers[n_iters].advectee.data 89 f_T = np.empty_like(psi) 90 f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T) 91 T = self.settings.T 92 r = self.settings.r 93 dt = self.dt 94 95 @numba.experimental.jitclass([]) 96 class PostStep: 97 # pylint: disable=too-few-public-methods 98 def __init__(self): 99 pass 100 101 def call(self, psi, step): 102 t = T - (step + 1) * dt 103 psi += np.maximum(psi, f_T / np.exp(r * t)) - psi 104 105 self.solvers[n_iters].advance(self.nt, self.mu_coeff, PostStep()) 106 else: 107 self.solvers[n_iters].advance(self.nt, self.mu_coeff) 108 109 return self.solvers[n_iters].advectee.get() 110 111 def terminal_value(self): 112 return self.solvers[1].advectee.get()
class
Simulation:
10class Simulation: 11 @staticmethod 12 def _factory( 13 *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions 14 ): 15 stepper = Stepper( 16 options=options, n_dims=len(advectee.shape), non_unit_g_factor=False 17 ) 18 return Solver( 19 stepper=stepper, 20 advectee=ScalarField( 21 advectee.astype(dtype=options.dtype), 22 halo=options.n_halo, 23 boundary_conditions=boundary_conditions, 24 ), 25 advector=VectorField( 26 (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),), 27 halo=options.n_halo, 28 boundary_conditions=boundary_conditions, 29 ), 30 ) 31 32 def __init__(self, settings): 33 self.settings = settings 34 35 sigma2 = pow(settings.sigma, 2) 36 dx_opt = abs( 37 settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2 38 ) 39 dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt 40 41 # adjusting dt so that nt is integer 42 self.dt = settings.T 43 self.nt = 0 44 while self.dt > dt_opt: 45 self.nt += 1 46 self.dt = settings.T / self.nt 47 48 # adjusting dx to match requested l^2 49 dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma 50 51 # calculating actual u number and lambda 52 self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx 53 self.l2 = dx * dx / sigma2 / self.dt 54 55 # adjusting nx and setting S_beg, S_end 56 S_beg = settings.S_match 57 self.nx = 1 58 while S_beg > settings.S_min: 59 self.nx += 1 60 S_beg = np.exp(np.log(settings.S_match) - self.nx * dx) 61 62 self.ix_match = self.nx 63 64 S_end = settings.S_match 65 while S_end < settings.S_max: 66 self.nx += 1 67 S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx) 68 69 # asset price 70 self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx) 71 72 self.mu_coeff = (0.5 / self.l2,) 73 self.solvers = {} 74 self.solvers[1] = self._factory( 75 advectee=settings.payoff(self.S), 76 advector=self.C, 77 options=Options(n_iters=1, non_zero_mu_coeff=True), 78 boundary_conditions=(Extrapolated(),), 79 ) 80 self.solvers[2] = self._factory( 81 advectee=settings.payoff(self.S), 82 advector=self.C, 83 options=Options(**OPTIONS), 84 boundary_conditions=(Extrapolated(),), 85 ) 86 87 def run(self, n_iters: int): 88 if self.settings.amer: 89 psi = self.solvers[n_iters].advectee.data 90 f_T = np.empty_like(psi) 91 f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T) 92 T = self.settings.T 93 r = self.settings.r 94 dt = self.dt 95 96 @numba.experimental.jitclass([]) 97 class PostStep: 98 # pylint: disable=too-few-public-methods 99 def __init__(self): 100 pass 101 102 def call(self, psi, step): 103 t = T - (step + 1) * dt 104 psi += np.maximum(psi, f_T / np.exp(r * t)) - psi 105 106 self.solvers[n_iters].advance(self.nt, self.mu_coeff, PostStep()) 107 else: 108 self.solvers[n_iters].advance(self.nt, self.mu_coeff) 109 110 return self.solvers[n_iters].advectee.get() 111 112 def terminal_value(self): 113 return self.solvers[1].advectee.get()
Simulation(settings)
32 def __init__(self, settings): 33 self.settings = settings 34 35 sigma2 = pow(settings.sigma, 2) 36 dx_opt = abs( 37 settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2 38 ) 39 dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt 40 41 # adjusting dt so that nt is integer 42 self.dt = settings.T 43 self.nt = 0 44 while self.dt > dt_opt: 45 self.nt += 1 46 self.dt = settings.T / self.nt 47 48 # adjusting dx to match requested l^2 49 dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma 50 51 # calculating actual u number and lambda 52 self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx 53 self.l2 = dx * dx / sigma2 / self.dt 54 55 # adjusting nx and setting S_beg, S_end 56 S_beg = settings.S_match 57 self.nx = 1 58 while S_beg > settings.S_min: 59 self.nx += 1 60 S_beg = np.exp(np.log(settings.S_match) - self.nx * dx) 61 62 self.ix_match = self.nx 63 64 S_end = settings.S_match 65 while S_end < settings.S_max: 66 self.nx += 1 67 S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx) 68 69 # asset price 70 self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx) 71 72 self.mu_coeff = (0.5 / self.l2,) 73 self.solvers = {} 74 self.solvers[1] = self._factory( 75 advectee=settings.payoff(self.S), 76 advector=self.C, 77 options=Options(n_iters=1, non_zero_mu_coeff=True), 78 boundary_conditions=(Extrapolated(),), 79 ) 80 self.solvers[2] = self._factory( 81 advectee=settings.payoff(self.S), 82 advector=self.C, 83 options=Options(**OPTIONS), 84 boundary_conditions=(Extrapolated(),), 85 )
def
run(self, n_iters: int):
87 def run(self, n_iters: int): 88 if self.settings.amer: 89 psi = self.solvers[n_iters].advectee.data 90 f_T = np.empty_like(psi) 91 f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T) 92 T = self.settings.T 93 r = self.settings.r 94 dt = self.dt 95 96 @numba.experimental.jitclass([]) 97 class PostStep: 98 # pylint: disable=too-few-public-methods 99 def __init__(self): 100 pass 101 102 def call(self, psi, step): 103 t = T - (step + 1) * dt 104 psi += np.maximum(psi, f_T / np.exp(r * t)) - psi 105 106 self.solvers[n_iters].advance(self.nt, self.mu_coeff, PostStep()) 107 else: 108 self.solvers[n_iters].advance(self.nt, self.mu_coeff) 109 110 return self.solvers[n_iters].advectee.get()