PyMPDATA_examples.Magnuszewski_et_al_2025.asian_option
1from functools import cached_property 2 3import numpy as np 4from PyMPDATA_examples.utils.discretisation import discretised_analytical_solution 5 6from PyMPDATA import ScalarField, Solver, Stepper, VectorField 7from PyMPDATA.boundary_conditions import Extrapolated 8from PyMPDATA.impl.enumerations import INNER, OUTER 9 10 11# pylint: disable=too-few-public-methods 12class Settings: 13 def __init__(self, T, sgma, r, K, S_min, S_max): 14 self.T = T 15 self.sgma = sgma 16 self.r = r 17 self.K = K 18 self.S_min = S_min 19 self.S_max = S_max 20 self.rh = None 21 22 def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"): 23 def call(x): 24 return np.maximum(0, x - self.K) 25 26 def put(x): 27 return np.maximum(0, self.K - x) 28 29 if variant == "call": 30 payoff_func = call 31 else: 32 payoff_func = put 33 34 self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1) 35 output = discretised_analytical_solution(self.rh, payoff_func) 36 return output 37 38 39class Simulation: 40 def __init__(self, settings, *, nx, ny, nt, options, variant="call"): 41 self.nx = nx 42 self.nt = nt 43 self.settings = settings 44 self.ny = ny 45 self.dt = settings.T / self.nt 46 log_s_min = np.log(settings.S_min) 47 log_s_max = np.log(settings.S_max) 48 self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx)) 49 self.A, self.dy = np.linspace( 50 0, settings.S_max, self.ny, retstep=True, endpoint=True 51 ) 52 self.dx = (log_s_max - log_s_min) / self.nx 53 self.settings = settings 54 sigma_squared = pow(settings.sgma, 2) 55 courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx 56 self.l2 = self.dx * self.dx / sigma_squared / self.dt 57 self.mu_coeff = (0.5 / self.l2, 0) 58 assert ( 59 self.l2 > 2 60 ), f"Lambda squared should be more than 2 for stability {self.l2}" 61 self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant) 62 stepper = Stepper(options=options, n_dims=2) 63 x_dim_advector = np.full( 64 (self.nx + 1, self.ny), 65 courant_number_x, 66 dtype=options.dtype, 67 ) 68 cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max( 69 np.abs(x_dim_advector) 70 ) 71 assert cfl_condition < 1, f"CFL condition not met {cfl_condition}" 72 self.solver = Solver( 73 stepper=stepper, 74 advectee=ScalarField( 75 self.payoff_2d.astype(dtype=options.dtype) 76 * np.exp(-self.settings.r * self.settings.T), 77 halo=options.n_halo, 78 boundary_conditions=self.boundary_conditions, 79 ), 80 advector=VectorField( 81 (x_dim_advector, self.a_dim_advector), 82 halo=options.n_halo, 83 boundary_conditions=self.boundary_conditions, 84 ), 85 ) 86 self.rhs = np.zeros((self.nx, self.ny)) 87 88 @property 89 def payoff_2d(self): 90 raise NotImplementedError() 91 92 @property 93 def a_dim_advector(self): 94 raise NotImplementedError() 95 96 @property 97 def boundary_conditions(self): 98 return ( 99 Extrapolated(OUTER), 100 Extrapolated(INNER), 101 ) 102 103 def step(self, nt=1): 104 self.solver.advance(n_steps=nt, mu_coeff=self.mu_coeff) 105 106 107class AsianArithmetic(Simulation): 108 @cached_property 109 def payoff_2d(self): 110 return np.repeat([self.payoff], self.nx, axis=0) 111 112 @property 113 def a_dim_advector(self): 114 a_dim_advector = np.zeros((self.nx, self.ny + 1)) 115 for i in range(self.ny + 1): 116 a_dim_advector[:, i] = -self.dt / self.dy * self.S / self.settings.T 117 return a_dim_advector
class
Settings:
13class Settings: 14 def __init__(self, T, sgma, r, K, S_min, S_max): 15 self.T = T 16 self.sgma = sgma 17 self.r = r 18 self.K = K 19 self.S_min = S_min 20 self.S_max = S_max 21 self.rh = None 22 23 def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"): 24 def call(x): 25 return np.maximum(0, x - self.K) 26 27 def put(x): 28 return np.maximum(0, self.K - x) 29 30 if variant == "call": 31 payoff_func = call 32 else: 33 payoff_func = put 34 35 self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1) 36 output = discretised_analytical_solution(self.rh, payoff_func) 37 return output
def
payoff(self, A: numpy.ndarray, da: numpy.float32 = 0, variant='call'):
23 def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"): 24 def call(x): 25 return np.maximum(0, x - self.K) 26 27 def put(x): 28 return np.maximum(0, self.K - x) 29 30 if variant == "call": 31 payoff_func = call 32 else: 33 payoff_func = put 34 35 self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1) 36 output = discretised_analytical_solution(self.rh, payoff_func) 37 return output
class
Simulation:
40class Simulation: 41 def __init__(self, settings, *, nx, ny, nt, options, variant="call"): 42 self.nx = nx 43 self.nt = nt 44 self.settings = settings 45 self.ny = ny 46 self.dt = settings.T / self.nt 47 log_s_min = np.log(settings.S_min) 48 log_s_max = np.log(settings.S_max) 49 self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx)) 50 self.A, self.dy = np.linspace( 51 0, settings.S_max, self.ny, retstep=True, endpoint=True 52 ) 53 self.dx = (log_s_max - log_s_min) / self.nx 54 self.settings = settings 55 sigma_squared = pow(settings.sgma, 2) 56 courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx 57 self.l2 = self.dx * self.dx / sigma_squared / self.dt 58 self.mu_coeff = (0.5 / self.l2, 0) 59 assert ( 60 self.l2 > 2 61 ), f"Lambda squared should be more than 2 for stability {self.l2}" 62 self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant) 63 stepper = Stepper(options=options, n_dims=2) 64 x_dim_advector = np.full( 65 (self.nx + 1, self.ny), 66 courant_number_x, 67 dtype=options.dtype, 68 ) 69 cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max( 70 np.abs(x_dim_advector) 71 ) 72 assert cfl_condition < 1, f"CFL condition not met {cfl_condition}" 73 self.solver = Solver( 74 stepper=stepper, 75 advectee=ScalarField( 76 self.payoff_2d.astype(dtype=options.dtype) 77 * np.exp(-self.settings.r * self.settings.T), 78 halo=options.n_halo, 79 boundary_conditions=self.boundary_conditions, 80 ), 81 advector=VectorField( 82 (x_dim_advector, self.a_dim_advector), 83 halo=options.n_halo, 84 boundary_conditions=self.boundary_conditions, 85 ), 86 ) 87 self.rhs = np.zeros((self.nx, self.ny)) 88 89 @property 90 def payoff_2d(self): 91 raise NotImplementedError() 92 93 @property 94 def a_dim_advector(self): 95 raise NotImplementedError() 96 97 @property 98 def boundary_conditions(self): 99 return ( 100 Extrapolated(OUTER), 101 Extrapolated(INNER), 102 ) 103 104 def step(self, nt=1): 105 self.solver.advance(n_steps=nt, mu_coeff=self.mu_coeff)
Simulation(settings, *, nx, ny, nt, options, variant='call')
41 def __init__(self, settings, *, nx, ny, nt, options, variant="call"): 42 self.nx = nx 43 self.nt = nt 44 self.settings = settings 45 self.ny = ny 46 self.dt = settings.T / self.nt 47 log_s_min = np.log(settings.S_min) 48 log_s_max = np.log(settings.S_max) 49 self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx)) 50 self.A, self.dy = np.linspace( 51 0, settings.S_max, self.ny, retstep=True, endpoint=True 52 ) 53 self.dx = (log_s_max - log_s_min) / self.nx 54 self.settings = settings 55 sigma_squared = pow(settings.sgma, 2) 56 courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx 57 self.l2 = self.dx * self.dx / sigma_squared / self.dt 58 self.mu_coeff = (0.5 / self.l2, 0) 59 assert ( 60 self.l2 > 2 61 ), f"Lambda squared should be more than 2 for stability {self.l2}" 62 self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant) 63 stepper = Stepper(options=options, n_dims=2) 64 x_dim_advector = np.full( 65 (self.nx + 1, self.ny), 66 courant_number_x, 67 dtype=options.dtype, 68 ) 69 cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max( 70 np.abs(x_dim_advector) 71 ) 72 assert cfl_condition < 1, f"CFL condition not met {cfl_condition}" 73 self.solver = Solver( 74 stepper=stepper, 75 advectee=ScalarField( 76 self.payoff_2d.astype(dtype=options.dtype) 77 * np.exp(-self.settings.r * self.settings.T), 78 halo=options.n_halo, 79 boundary_conditions=self.boundary_conditions, 80 ), 81 advector=VectorField( 82 (x_dim_advector, self.a_dim_advector), 83 halo=options.n_halo, 84 boundary_conditions=self.boundary_conditions, 85 ), 86 ) 87 self.rhs = np.zeros((self.nx, self.ny))
108class AsianArithmetic(Simulation): 109 @cached_property 110 def payoff_2d(self): 111 return np.repeat([self.payoff], self.nx, axis=0) 112 113 @property 114 def a_dim_advector(self): 115 a_dim_advector = np.zeros((self.nx, self.ny + 1)) 116 for i in range(self.ny + 1): 117 a_dim_advector[:, i] = -self.dt / self.dy * self.S / self.settings.T 118 return a_dim_advector