PyMPDATA_examples.Magnuszewski_et_al_2025.asian_option
1from functools import cached_property 2 3import numpy as np 4from PyMPDATA_examples.utils.discretisation import discretised_analytical_solution 5 6from PyMPDATA import ScalarField, Solver, Stepper, VectorField 7from PyMPDATA.boundary_conditions import Extrapolated 8from PyMPDATA.impl.enumerations import INNER, OUTER 9 10 11# pylint: disable=too-few-public-methods 12class Settings: 13 def __init__(self, T, sgma, r, K, S_min, S_max): 14 self.T = T 15 self.sgma = sgma 16 self.r = r 17 self.K = K 18 self.S_min = S_min 19 self.S_max = S_max 20 self.rh = None 21 22 def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"): 23 def call(x): 24 return np.maximum(0, x - self.K) 25 26 def put(x): 27 return np.maximum(0, self.K - x) 28 29 if variant == "call": 30 payoff_func = call 31 else: 32 payoff_func = put 33 34 self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1) 35 output = discretised_analytical_solution(self.rh, payoff_func) 36 return output 37 38 39class Simulation: 40 def __init__(self, settings, *, nx, ny, nt, options, variant="call", eps=1e-10): 41 self.nx = nx 42 self.nt = nt 43 self.eps = eps 44 self.settings = settings 45 self.ny = ny 46 self.dt = settings.T / self.nt 47 log_s_min = np.log(settings.S_min) 48 log_s_max = np.log(settings.S_max) 49 self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx)) 50 self.A, self.dy = np.linspace( 51 0, settings.S_max, self.ny, retstep=True, endpoint=True 52 ) 53 self.dx = (log_s_max - log_s_min) / self.nx 54 self.settings = settings 55 sigma_squared = pow(settings.sgma, 2) 56 courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx 57 self.l2 = self.dx * self.dx / sigma_squared / self.dt 58 self.mu_coeff = (0.5 / self.l2, 0) 59 assert ( 60 self.l2 > 2 61 ), f"Lambda squared should be more than 2 for stability {self.l2}" 62 self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant) 63 stepper = Stepper(options=options, n_dims=2, n_threads=1) # TODO #570 64 x_dim_advector = np.full( 65 (self.nx + 1, self.ny), 66 courant_number_x, 67 dtype=options.dtype, 68 ) 69 cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max( 70 np.abs(x_dim_advector) 71 ) 72 assert cfl_condition < 1, f"CFL condition not met {cfl_condition}" 73 self.solver = Solver( 74 stepper=stepper, 75 advectee=ScalarField( 76 self.payoff_2d.astype(dtype=options.dtype) 77 * np.exp(-self.settings.r * self.settings.T), 78 halo=options.n_halo, 79 boundary_conditions=self.boundary_conditions, 80 ), 81 advector=VectorField( 82 (x_dim_advector, self.a_dim_advector), 83 halo=options.n_halo, 84 boundary_conditions=self.boundary_conditions, 85 ), 86 ) 87 self.rhs = np.zeros((self.nx, self.ny)) 88 89 @property 90 def payoff_2d(self): 91 raise NotImplementedError() 92 93 @property 94 def a_dim_advector(self): 95 raise NotImplementedError() 96 97 @property 98 def boundary_conditions(self): 99 return ( 100 Extrapolated(OUTER, eps=self.eps), 101 Extrapolated(INNER, eps=self.eps), 102 ) 103 104 def step(self, nt=1): 105 self.solver.advance(n_steps=nt, mu_coeff=self.mu_coeff) 106 107 108class AsianArithmetic(Simulation): 109 @cached_property 110 def payoff_2d(self): 111 return np.repeat([self.payoff], self.nx, axis=0) 112 113 @property 114 def a_dim_advector(self): 115 a_dim_advector = np.zeros((self.nx, self.ny + 1)) 116 for i in range(self.ny + 1): 117 a_dim_advector[:, i] = -self.dt / self.dy * self.S / self.settings.T 118 return a_dim_advector
class
Settings:
13class Settings: 14 def __init__(self, T, sgma, r, K, S_min, S_max): 15 self.T = T 16 self.sgma = sgma 17 self.r = r 18 self.K = K 19 self.S_min = S_min 20 self.S_max = S_max 21 self.rh = None 22 23 def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"): 24 def call(x): 25 return np.maximum(0, x - self.K) 26 27 def put(x): 28 return np.maximum(0, self.K - x) 29 30 if variant == "call": 31 payoff_func = call 32 else: 33 payoff_func = put 34 35 self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1) 36 output = discretised_analytical_solution(self.rh, payoff_func) 37 return output
def
payoff(self, A: numpy.ndarray, da: numpy.float32 = 0, variant='call'):
23 def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"): 24 def call(x): 25 return np.maximum(0, x - self.K) 26 27 def put(x): 28 return np.maximum(0, self.K - x) 29 30 if variant == "call": 31 payoff_func = call 32 else: 33 payoff_func = put 34 35 self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1) 36 output = discretised_analytical_solution(self.rh, payoff_func) 37 return output
class
Simulation:
40class Simulation: 41 def __init__(self, settings, *, nx, ny, nt, options, variant="call", eps=1e-10): 42 self.nx = nx 43 self.nt = nt 44 self.eps = eps 45 self.settings = settings 46 self.ny = ny 47 self.dt = settings.T / self.nt 48 log_s_min = np.log(settings.S_min) 49 log_s_max = np.log(settings.S_max) 50 self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx)) 51 self.A, self.dy = np.linspace( 52 0, settings.S_max, self.ny, retstep=True, endpoint=True 53 ) 54 self.dx = (log_s_max - log_s_min) / self.nx 55 self.settings = settings 56 sigma_squared = pow(settings.sgma, 2) 57 courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx 58 self.l2 = self.dx * self.dx / sigma_squared / self.dt 59 self.mu_coeff = (0.5 / self.l2, 0) 60 assert ( 61 self.l2 > 2 62 ), f"Lambda squared should be more than 2 for stability {self.l2}" 63 self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant) 64 stepper = Stepper(options=options, n_dims=2, n_threads=1) # TODO #570 65 x_dim_advector = np.full( 66 (self.nx + 1, self.ny), 67 courant_number_x, 68 dtype=options.dtype, 69 ) 70 cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max( 71 np.abs(x_dim_advector) 72 ) 73 assert cfl_condition < 1, f"CFL condition not met {cfl_condition}" 74 self.solver = Solver( 75 stepper=stepper, 76 advectee=ScalarField( 77 self.payoff_2d.astype(dtype=options.dtype) 78 * np.exp(-self.settings.r * self.settings.T), 79 halo=options.n_halo, 80 boundary_conditions=self.boundary_conditions, 81 ), 82 advector=VectorField( 83 (x_dim_advector, self.a_dim_advector), 84 halo=options.n_halo, 85 boundary_conditions=self.boundary_conditions, 86 ), 87 ) 88 self.rhs = np.zeros((self.nx, self.ny)) 89 90 @property 91 def payoff_2d(self): 92 raise NotImplementedError() 93 94 @property 95 def a_dim_advector(self): 96 raise NotImplementedError() 97 98 @property 99 def boundary_conditions(self): 100 return ( 101 Extrapolated(OUTER, eps=self.eps), 102 Extrapolated(INNER, eps=self.eps), 103 ) 104 105 def step(self, nt=1): 106 self.solver.advance(n_steps=nt, mu_coeff=self.mu_coeff)
Simulation(settings, *, nx, ny, nt, options, variant='call', eps=1e-10)
41 def __init__(self, settings, *, nx, ny, nt, options, variant="call", eps=1e-10): 42 self.nx = nx 43 self.nt = nt 44 self.eps = eps 45 self.settings = settings 46 self.ny = ny 47 self.dt = settings.T / self.nt 48 log_s_min = np.log(settings.S_min) 49 log_s_max = np.log(settings.S_max) 50 self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx)) 51 self.A, self.dy = np.linspace( 52 0, settings.S_max, self.ny, retstep=True, endpoint=True 53 ) 54 self.dx = (log_s_max - log_s_min) / self.nx 55 self.settings = settings 56 sigma_squared = pow(settings.sgma, 2) 57 courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx 58 self.l2 = self.dx * self.dx / sigma_squared / self.dt 59 self.mu_coeff = (0.5 / self.l2, 0) 60 assert ( 61 self.l2 > 2 62 ), f"Lambda squared should be more than 2 for stability {self.l2}" 63 self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant) 64 stepper = Stepper(options=options, n_dims=2, n_threads=1) # TODO #570 65 x_dim_advector = np.full( 66 (self.nx + 1, self.ny), 67 courant_number_x, 68 dtype=options.dtype, 69 ) 70 cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max( 71 np.abs(x_dim_advector) 72 ) 73 assert cfl_condition < 1, f"CFL condition not met {cfl_condition}" 74 self.solver = Solver( 75 stepper=stepper, 76 advectee=ScalarField( 77 self.payoff_2d.astype(dtype=options.dtype) 78 * np.exp(-self.settings.r * self.settings.T), 79 halo=options.n_halo, 80 boundary_conditions=self.boundary_conditions, 81 ), 82 advector=VectorField( 83 (x_dim_advector, self.a_dim_advector), 84 halo=options.n_halo, 85 boundary_conditions=self.boundary_conditions, 86 ), 87 ) 88 self.rhs = np.zeros((self.nx, self.ny))
109class AsianArithmetic(Simulation): 110 @cached_property 111 def payoff_2d(self): 112 return np.repeat([self.payoff], self.nx, axis=0) 113 114 @property 115 def a_dim_advector(self): 116 a_dim_advector = np.zeros((self.nx, self.ny + 1)) 117 for i in range(self.ny + 1): 118 a_dim_advector[:, i] = -self.dt / self.dy * self.S / self.settings.T 119 return a_dim_advector