PyMPDATA_examples.Arabas_and_Farhat_2020.simulation

  1import numba
  2import numpy as np
  3from PyMPDATA_examples.Arabas_and_Farhat_2020.options import OPTIONS
  4
  5from PyMPDATA import Options, ScalarField, Solver, Stepper, VectorField
  6from PyMPDATA.boundary_conditions import Extrapolated
  7
  8
  9class Simulation:
 10    @staticmethod
 11    def _factory(
 12        *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions
 13    ):
 14        stepper = Stepper(
 15            options=options, n_dims=len(advectee.shape), non_unit_g_factor=False
 16        )
 17        return Solver(
 18            stepper=stepper,
 19            advectee=ScalarField(
 20                advectee.astype(dtype=options.dtype),
 21                halo=options.n_halo,
 22                boundary_conditions=boundary_conditions,
 23            ),
 24            advector=VectorField(
 25                (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),),
 26                halo=options.n_halo,
 27                boundary_conditions=boundary_conditions,
 28            ),
 29        )
 30
 31    def __init__(self, settings):
 32        self.settings = settings
 33
 34        sigma2 = pow(settings.sigma, 2)
 35        dx_opt = abs(
 36            settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2
 37        )
 38        dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt
 39
 40        # adjusting dt so that nt is integer
 41        self.dt = settings.T
 42        self.nt = 0
 43        while self.dt > dt_opt:
 44            self.nt += 1
 45            self.dt = settings.T / self.nt
 46
 47        # adjusting dx to match requested l^2
 48        dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma
 49
 50        # calculating actual u number and lambda
 51        self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx
 52        self.l2 = dx * dx / sigma2 / self.dt
 53
 54        # adjusting nx and setting S_beg, S_end
 55        S_beg = settings.S_match
 56        self.nx = 1
 57        while S_beg > settings.S_min:
 58            self.nx += 1
 59            S_beg = np.exp(np.log(settings.S_match) - self.nx * dx)
 60
 61        self.ix_match = self.nx
 62
 63        S_end = settings.S_match
 64        while S_end < settings.S_max:
 65            self.nx += 1
 66            S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx)
 67
 68        # asset price
 69        self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx)
 70
 71        self.mu_coeff = (0.5 / self.l2,)
 72        self.solvers = {}
 73        self.solvers[1] = self._factory(
 74            advectee=settings.payoff(self.S),
 75            advector=self.C,
 76            options=Options(n_iters=1, non_zero_mu_coeff=True),
 77            boundary_conditions=(Extrapolated(),),
 78        )
 79        self.solvers[2] = self._factory(
 80            advectee=settings.payoff(self.S),
 81            advector=self.C,
 82            options=Options(**OPTIONS),
 83            boundary_conditions=(Extrapolated(),),
 84        )
 85
 86    def run(self, n_iters: int):
 87        if self.settings.amer:
 88            psi = self.solvers[n_iters].advectee.data
 89            f_T = np.empty_like(psi)
 90            f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T)
 91            T = self.settings.T
 92            r = self.settings.r
 93            dt = self.dt
 94
 95            @numba.experimental.jitclass([])
 96            class PostStep:
 97                # pylint: disable=too-few-public-methods
 98                def __init__(self):
 99                    pass
100
101                def call(self, psi, step):
102                    t = T - (step + 1) * dt
103                    psi += np.maximum(psi, f_T / np.exp(r * t)) - psi
104
105            self.solvers[n_iters].advance(self.nt, self.mu_coeff, PostStep())
106        else:
107            self.solvers[n_iters].advance(self.nt, self.mu_coeff)
108
109        return self.solvers[n_iters].advectee.get()
110
111    def terminal_value(self):
112        return self.solvers[1].advectee.get()
class Simulation:
 10class Simulation:
 11    @staticmethod
 12    def _factory(
 13        *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions
 14    ):
 15        stepper = Stepper(
 16            options=options, n_dims=len(advectee.shape), non_unit_g_factor=False
 17        )
 18        return Solver(
 19            stepper=stepper,
 20            advectee=ScalarField(
 21                advectee.astype(dtype=options.dtype),
 22                halo=options.n_halo,
 23                boundary_conditions=boundary_conditions,
 24            ),
 25            advector=VectorField(
 26                (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),),
 27                halo=options.n_halo,
 28                boundary_conditions=boundary_conditions,
 29            ),
 30        )
 31
 32    def __init__(self, settings):
 33        self.settings = settings
 34
 35        sigma2 = pow(settings.sigma, 2)
 36        dx_opt = abs(
 37            settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2
 38        )
 39        dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt
 40
 41        # adjusting dt so that nt is integer
 42        self.dt = settings.T
 43        self.nt = 0
 44        while self.dt > dt_opt:
 45            self.nt += 1
 46            self.dt = settings.T / self.nt
 47
 48        # adjusting dx to match requested l^2
 49        dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma
 50
 51        # calculating actual u number and lambda
 52        self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx
 53        self.l2 = dx * dx / sigma2 / self.dt
 54
 55        # adjusting nx and setting S_beg, S_end
 56        S_beg = settings.S_match
 57        self.nx = 1
 58        while S_beg > settings.S_min:
 59            self.nx += 1
 60            S_beg = np.exp(np.log(settings.S_match) - self.nx * dx)
 61
 62        self.ix_match = self.nx
 63
 64        S_end = settings.S_match
 65        while S_end < settings.S_max:
 66            self.nx += 1
 67            S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx)
 68
 69        # asset price
 70        self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx)
 71
 72        self.mu_coeff = (0.5 / self.l2,)
 73        self.solvers = {}
 74        self.solvers[1] = self._factory(
 75            advectee=settings.payoff(self.S),
 76            advector=self.C,
 77            options=Options(n_iters=1, non_zero_mu_coeff=True),
 78            boundary_conditions=(Extrapolated(),),
 79        )
 80        self.solvers[2] = self._factory(
 81            advectee=settings.payoff(self.S),
 82            advector=self.C,
 83            options=Options(**OPTIONS),
 84            boundary_conditions=(Extrapolated(),),
 85        )
 86
 87    def run(self, n_iters: int):
 88        if self.settings.amer:
 89            psi = self.solvers[n_iters].advectee.data
 90            f_T = np.empty_like(psi)
 91            f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T)
 92            T = self.settings.T
 93            r = self.settings.r
 94            dt = self.dt
 95
 96            @numba.experimental.jitclass([])
 97            class PostStep:
 98                # pylint: disable=too-few-public-methods
 99                def __init__(self):
100                    pass
101
102                def call(self, psi, step):
103                    t = T - (step + 1) * dt
104                    psi += np.maximum(psi, f_T / np.exp(r * t)) - psi
105
106            self.solvers[n_iters].advance(self.nt, self.mu_coeff, PostStep())
107        else:
108            self.solvers[n_iters].advance(self.nt, self.mu_coeff)
109
110        return self.solvers[n_iters].advectee.get()
111
112    def terminal_value(self):
113        return self.solvers[1].advectee.get()
Simulation(settings)
32    def __init__(self, settings):
33        self.settings = settings
34
35        sigma2 = pow(settings.sigma, 2)
36        dx_opt = abs(
37            settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2
38        )
39        dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt
40
41        # adjusting dt so that nt is integer
42        self.dt = settings.T
43        self.nt = 0
44        while self.dt > dt_opt:
45            self.nt += 1
46            self.dt = settings.T / self.nt
47
48        # adjusting dx to match requested l^2
49        dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma
50
51        # calculating actual u number and lambda
52        self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx
53        self.l2 = dx * dx / sigma2 / self.dt
54
55        # adjusting nx and setting S_beg, S_end
56        S_beg = settings.S_match
57        self.nx = 1
58        while S_beg > settings.S_min:
59            self.nx += 1
60            S_beg = np.exp(np.log(settings.S_match) - self.nx * dx)
61
62        self.ix_match = self.nx
63
64        S_end = settings.S_match
65        while S_end < settings.S_max:
66            self.nx += 1
67            S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx)
68
69        # asset price
70        self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx)
71
72        self.mu_coeff = (0.5 / self.l2,)
73        self.solvers = {}
74        self.solvers[1] = self._factory(
75            advectee=settings.payoff(self.S),
76            advector=self.C,
77            options=Options(n_iters=1, non_zero_mu_coeff=True),
78            boundary_conditions=(Extrapolated(),),
79        )
80        self.solvers[2] = self._factory(
81            advectee=settings.payoff(self.S),
82            advector=self.C,
83            options=Options(**OPTIONS),
84            boundary_conditions=(Extrapolated(),),
85        )
settings
dt
nt
C
l2
nx
ix_match
S
mu_coeff
solvers
def run(self, n_iters: int):
 87    def run(self, n_iters: int):
 88        if self.settings.amer:
 89            psi = self.solvers[n_iters].advectee.data
 90            f_T = np.empty_like(psi)
 91            f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T)
 92            T = self.settings.T
 93            r = self.settings.r
 94            dt = self.dt
 95
 96            @numba.experimental.jitclass([])
 97            class PostStep:
 98                # pylint: disable=too-few-public-methods
 99                def __init__(self):
100                    pass
101
102                def call(self, psi, step):
103                    t = T - (step + 1) * dt
104                    psi += np.maximum(psi, f_T / np.exp(r * t)) - psi
105
106            self.solvers[n_iters].advance(self.nt, self.mu_coeff, PostStep())
107        else:
108            self.solvers[n_iters].advance(self.nt, self.mu_coeff)
109
110        return self.solvers[n_iters].advectee.get()
def terminal_value(self):
112    def terminal_value(self):
113        return self.solvers[1].advectee.get()