PyMPDATA_examples.Magnuszewski_et_al_2025.asian_option

  1from functools import cached_property
  2
  3import numpy as np
  4from PyMPDATA_examples.utils.discretisation import discretised_analytical_solution
  5
  6from PyMPDATA import ScalarField, Solver, Stepper, VectorField
  7from PyMPDATA.boundary_conditions import Extrapolated
  8from PyMPDATA.impl.enumerations import INNER, OUTER
  9
 10
 11# pylint: disable=too-few-public-methods
 12class Settings:
 13    def __init__(self, T, sgma, r, K, S_min, S_max):
 14        self.T = T
 15        self.sgma = sgma
 16        self.r = r
 17        self.K = K
 18        self.S_min = S_min
 19        self.S_max = S_max
 20        self.rh = None
 21
 22    def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"):
 23        def call(x):
 24            return np.maximum(0, x - self.K)
 25
 26        def put(x):
 27            return np.maximum(0, self.K - x)
 28
 29        if variant == "call":
 30            payoff_func = call
 31        else:
 32            payoff_func = put
 33
 34        self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1)
 35        output = discretised_analytical_solution(self.rh, payoff_func)
 36        return output
 37
 38
 39class Simulation:
 40    def __init__(self, settings, *, nx, ny, nt, options, variant="call"):
 41        self.nx = nx
 42        self.nt = nt
 43        self.settings = settings
 44        self.ny = ny
 45        self.dt = settings.T / self.nt
 46        log_s_min = np.log(settings.S_min)
 47        log_s_max = np.log(settings.S_max)
 48        self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx))
 49        self.A, self.dy = np.linspace(
 50            0, settings.S_max, self.ny, retstep=True, endpoint=True
 51        )
 52        self.dx = (log_s_max - log_s_min) / self.nx
 53        self.settings = settings
 54        sigma_squared = pow(settings.sgma, 2)
 55        courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx
 56        self.l2 = self.dx * self.dx / sigma_squared / self.dt
 57        self.mu_coeff = (0.5 / self.l2, 0)
 58        assert (
 59            self.l2 > 2
 60        ), f"Lambda squared should be more than 2 for stability {self.l2}"
 61        self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant)
 62        stepper = Stepper(options=options, n_dims=2)
 63        x_dim_advector = np.full(
 64            (self.nx + 1, self.ny),
 65            courant_number_x,
 66            dtype=options.dtype,
 67        )
 68        cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max(
 69            np.abs(x_dim_advector)
 70        )
 71        assert cfl_condition < 1, f"CFL condition not met {cfl_condition}"
 72        self.solver = Solver(
 73            stepper=stepper,
 74            advectee=ScalarField(
 75                self.payoff_2d.astype(dtype=options.dtype)
 76                * np.exp(-self.settings.r * self.settings.T),
 77                halo=options.n_halo,
 78                boundary_conditions=self.boundary_conditions,
 79            ),
 80            advector=VectorField(
 81                (x_dim_advector, self.a_dim_advector),
 82                halo=options.n_halo,
 83                boundary_conditions=self.boundary_conditions,
 84            ),
 85        )
 86        self.rhs = np.zeros((self.nx, self.ny))
 87
 88    @property
 89    def payoff_2d(self):
 90        raise NotImplementedError()
 91
 92    @property
 93    def a_dim_advector(self):
 94        raise NotImplementedError()
 95
 96    @property
 97    def boundary_conditions(self):
 98        return (
 99            Extrapolated(OUTER),
100            Extrapolated(INNER),
101        )
102
103    def step(self, nt=1):
104        self.solver.advance(n_steps=nt, mu_coeff=self.mu_coeff)
105
106
107class AsianArithmetic(Simulation):
108    @cached_property
109    def payoff_2d(self):
110        return np.repeat([self.payoff], self.nx, axis=0)
111
112    @property
113    def a_dim_advector(self):
114        a_dim_advector = np.zeros((self.nx, self.ny + 1))
115        for i in range(self.ny + 1):
116            a_dim_advector[:, i] = -self.dt / self.dy * self.S / self.settings.T
117        return a_dim_advector
class Settings:
13class Settings:
14    def __init__(self, T, sgma, r, K, S_min, S_max):
15        self.T = T
16        self.sgma = sgma
17        self.r = r
18        self.K = K
19        self.S_min = S_min
20        self.S_max = S_max
21        self.rh = None
22
23    def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"):
24        def call(x):
25            return np.maximum(0, x - self.K)
26
27        def put(x):
28            return np.maximum(0, self.K - x)
29
30        if variant == "call":
31            payoff_func = call
32        else:
33            payoff_func = put
34
35        self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1)
36        output = discretised_analytical_solution(self.rh, payoff_func)
37        return output
Settings(T, sgma, r, K, S_min, S_max)
14    def __init__(self, T, sgma, r, K, S_min, S_max):
15        self.T = T
16        self.sgma = sgma
17        self.r = r
18        self.K = K
19        self.S_min = S_min
20        self.S_max = S_max
21        self.rh = None
T
sgma
r
K
S_min
S_max
rh
def payoff(self, A: numpy.ndarray, da: numpy.float32 = 0, variant='call'):
23    def payoff(self, A: np.ndarray, da: np.float32 = 0, variant="call"):
24        def call(x):
25            return np.maximum(0, x - self.K)
26
27        def put(x):
28            return np.maximum(0, self.K - x)
29
30        if variant == "call":
31            payoff_func = call
32        else:
33            payoff_func = put
34
35        self.rh = np.linspace(A[0] - da / 2, A[-1] + da / 2, len(A) + 1)
36        output = discretised_analytical_solution(self.rh, payoff_func)
37        return output
class Simulation:
 40class Simulation:
 41    def __init__(self, settings, *, nx, ny, nt, options, variant="call"):
 42        self.nx = nx
 43        self.nt = nt
 44        self.settings = settings
 45        self.ny = ny
 46        self.dt = settings.T / self.nt
 47        log_s_min = np.log(settings.S_min)
 48        log_s_max = np.log(settings.S_max)
 49        self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx))
 50        self.A, self.dy = np.linspace(
 51            0, settings.S_max, self.ny, retstep=True, endpoint=True
 52        )
 53        self.dx = (log_s_max - log_s_min) / self.nx
 54        self.settings = settings
 55        sigma_squared = pow(settings.sgma, 2)
 56        courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx
 57        self.l2 = self.dx * self.dx / sigma_squared / self.dt
 58        self.mu_coeff = (0.5 / self.l2, 0)
 59        assert (
 60            self.l2 > 2
 61        ), f"Lambda squared should be more than 2 for stability {self.l2}"
 62        self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant)
 63        stepper = Stepper(options=options, n_dims=2)
 64        x_dim_advector = np.full(
 65            (self.nx + 1, self.ny),
 66            courant_number_x,
 67            dtype=options.dtype,
 68        )
 69        cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max(
 70            np.abs(x_dim_advector)
 71        )
 72        assert cfl_condition < 1, f"CFL condition not met {cfl_condition}"
 73        self.solver = Solver(
 74            stepper=stepper,
 75            advectee=ScalarField(
 76                self.payoff_2d.astype(dtype=options.dtype)
 77                * np.exp(-self.settings.r * self.settings.T),
 78                halo=options.n_halo,
 79                boundary_conditions=self.boundary_conditions,
 80            ),
 81            advector=VectorField(
 82                (x_dim_advector, self.a_dim_advector),
 83                halo=options.n_halo,
 84                boundary_conditions=self.boundary_conditions,
 85            ),
 86        )
 87        self.rhs = np.zeros((self.nx, self.ny))
 88
 89    @property
 90    def payoff_2d(self):
 91        raise NotImplementedError()
 92
 93    @property
 94    def a_dim_advector(self):
 95        raise NotImplementedError()
 96
 97    @property
 98    def boundary_conditions(self):
 99        return (
100            Extrapolated(OUTER),
101            Extrapolated(INNER),
102        )
103
104    def step(self, nt=1):
105        self.solver.advance(n_steps=nt, mu_coeff=self.mu_coeff)
Simulation(settings, *, nx, ny, nt, options, variant='call')
41    def __init__(self, settings, *, nx, ny, nt, options, variant="call"):
42        self.nx = nx
43        self.nt = nt
44        self.settings = settings
45        self.ny = ny
46        self.dt = settings.T / self.nt
47        log_s_min = np.log(settings.S_min)
48        log_s_max = np.log(settings.S_max)
49        self.S = np.exp(np.linspace(log_s_min, log_s_max, self.nx))
50        self.A, self.dy = np.linspace(
51            0, settings.S_max, self.ny, retstep=True, endpoint=True
52        )
53        self.dx = (log_s_max - log_s_min) / self.nx
54        self.settings = settings
55        sigma_squared = pow(settings.sgma, 2)
56        courant_number_x = -(0.5 * sigma_squared - settings.r) * (-self.dt) / self.dx
57        self.l2 = self.dx * self.dx / sigma_squared / self.dt
58        self.mu_coeff = (0.5 / self.l2, 0)
59        assert (
60            self.l2 > 2
61        ), f"Lambda squared should be more than 2 for stability {self.l2}"
62        self.payoff = settings.payoff(A=self.A, da=self.dy, variant=variant)
63        stepper = Stepper(options=options, n_dims=2)
64        x_dim_advector = np.full(
65            (self.nx + 1, self.ny),
66            courant_number_x,
67            dtype=options.dtype,
68        )
69        cfl_condition = np.max(np.abs(self.a_dim_advector)) + np.max(
70            np.abs(x_dim_advector)
71        )
72        assert cfl_condition < 1, f"CFL condition not met {cfl_condition}"
73        self.solver = Solver(
74            stepper=stepper,
75            advectee=ScalarField(
76                self.payoff_2d.astype(dtype=options.dtype)
77                * np.exp(-self.settings.r * self.settings.T),
78                halo=options.n_halo,
79                boundary_conditions=self.boundary_conditions,
80            ),
81            advector=VectorField(
82                (x_dim_advector, self.a_dim_advector),
83                halo=options.n_halo,
84                boundary_conditions=self.boundary_conditions,
85            ),
86        )
87        self.rhs = np.zeros((self.nx, self.ny))
nx
nt
settings
ny
dt
S
dx
l2
mu_coeff
payoff
solver
rhs
payoff_2d
89    @property
90    def payoff_2d(self):
91        raise NotImplementedError()
a_dim_advector
93    @property
94    def a_dim_advector(self):
95        raise NotImplementedError()
boundary_conditions
 97    @property
 98    def boundary_conditions(self):
 99        return (
100            Extrapolated(OUTER),
101            Extrapolated(INNER),
102        )
def step(self, nt=1):
104    def step(self, nt=1):
105        self.solver.advance(n_steps=nt, mu_coeff=self.mu_coeff)
class AsianArithmetic(Simulation):
108class AsianArithmetic(Simulation):
109    @cached_property
110    def payoff_2d(self):
111        return np.repeat([self.payoff], self.nx, axis=0)
112
113    @property
114    def a_dim_advector(self):
115        a_dim_advector = np.zeros((self.nx, self.ny + 1))
116        for i in range(self.ny + 1):
117            a_dim_advector[:, i] = -self.dt / self.dy * self.S / self.settings.T
118        return a_dim_advector
payoff_2d
109    @cached_property
110    def payoff_2d(self):
111        return np.repeat([self.payoff], self.nx, axis=0)
a_dim_advector
113    @property
114    def a_dim_advector(self):
115        a_dim_advector = np.zeros((self.nx, self.ny + 1))
116        for i in range(self.ny + 1):
117            a_dim_advector[:, i] = -self.dt / self.dy * self.S / self.settings.T
118        return a_dim_advector