PyMPDATA_examples.Arabas_and_Farhat_2020.simulation

  1import numba
  2import numpy as np
  3from PyMPDATA_examples.Arabas_and_Farhat_2020.options import OPTIONS
  4
  5from PyMPDATA import Options, ScalarField, Solver, Stepper, VectorField
  6from PyMPDATA.boundary_conditions import Extrapolated
  7from PyMPDATA.impl.enumerations import ARG_DATA
  8
  9
 10class Simulation:
 11    @staticmethod
 12    def _factory(
 13        *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions
 14    ):
 15        stepper = Stepper(
 16            options=options, n_dims=len(advectee.shape), non_unit_g_factor=False
 17        )
 18        return Solver(
 19            stepper=stepper,
 20            advectee=ScalarField(
 21                advectee.astype(dtype=options.dtype),
 22                halo=options.n_halo,
 23                boundary_conditions=boundary_conditions,
 24            ),
 25            advector=VectorField(
 26                (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),),
 27                halo=options.n_halo,
 28                boundary_conditions=boundary_conditions,
 29            ),
 30        )
 31
 32    def __init__(self, settings):
 33        self.settings = settings
 34
 35        sigma2 = pow(settings.sigma, 2)
 36        dx_opt = abs(
 37            settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2
 38        )
 39        dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt
 40
 41        # adjusting dt so that nt is integer
 42        self.dt = settings.T
 43        self.nt = 0
 44        while self.dt > dt_opt:
 45            self.nt += 1
 46            self.dt = settings.T / self.nt
 47
 48        # adjusting dx to match requested l^2
 49        dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma
 50
 51        # calculating actual u number and lambda
 52        self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx
 53        self.l2 = dx * dx / sigma2 / self.dt
 54
 55        # adjusting nx and setting S_beg, S_end
 56        S_beg = settings.S_match
 57        self.nx = 1
 58        while S_beg > settings.S_min:
 59            self.nx += 1
 60            S_beg = np.exp(np.log(settings.S_match) - self.nx * dx)
 61
 62        self.ix_match = self.nx
 63
 64        S_end = settings.S_match
 65        while S_end < settings.S_max:
 66            self.nx += 1
 67            S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx)
 68
 69        # asset price
 70        self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx)
 71
 72        self.mu_coeff = (0.5 / self.l2,)
 73        self.solvers = {}
 74        self.solvers[1] = self._factory(
 75            advectee=settings.payoff(self.S),
 76            advector=self.C,
 77            options=Options(n_iters=1, non_zero_mu_coeff=True),
 78            boundary_conditions=(Extrapolated(),),
 79        )
 80        self.solvers[2] = self._factory(
 81            advectee=settings.payoff(self.S),
 82            advector=self.C,
 83            options=Options(**OPTIONS),
 84            boundary_conditions=(Extrapolated(),),
 85        )
 86
 87    def run(self, n_iters: int):
 88        if self.settings.amer:
 89            psi = self.solvers[n_iters].advectee.data
 90            f_T = np.empty_like(psi)
 91            f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T)
 92            T = self.settings.T
 93            r = self.settings.r
 94            dt = self.dt
 95
 96            @numba.experimental.jitclass([])
 97            class PostStep:
 98                # pylint: disable=too-few-public-methods
 99                def __init__(self):
100                    pass
101
102                def call(self, _, psi, step, index):
103                    t = T - (step + 1) * dt
104                    psi[index].field[ARG_DATA][:] += (
105                        np.maximum(psi[index].field[ARG_DATA], f_T / np.exp(r * t))
106                        - psi[index].field[ARG_DATA]
107                    )
108
109            self.solvers[n_iters].advance(
110                self.nt, mu_coeff=self.mu_coeff, post_step=PostStep()
111            )
112        else:
113            self.solvers[n_iters].advance(self.nt, mu_coeff=self.mu_coeff)
114
115        return self.solvers[n_iters].advectee.get()
116
117    def terminal_value(self):
118        return self.solvers[1].advectee.get()
class Simulation:
 11class Simulation:
 12    @staticmethod
 13    def _factory(
 14        *, options: Options, advectee: np.ndarray, advector: float, boundary_conditions
 15    ):
 16        stepper = Stepper(
 17            options=options, n_dims=len(advectee.shape), non_unit_g_factor=False
 18        )
 19        return Solver(
 20            stepper=stepper,
 21            advectee=ScalarField(
 22                advectee.astype(dtype=options.dtype),
 23                halo=options.n_halo,
 24                boundary_conditions=boundary_conditions,
 25            ),
 26            advector=VectorField(
 27                (np.full(advectee.shape[0] + 1, advector, dtype=options.dtype),),
 28                halo=options.n_halo,
 29                boundary_conditions=boundary_conditions,
 30            ),
 31        )
 32
 33    def __init__(self, settings):
 34        self.settings = settings
 35
 36        sigma2 = pow(settings.sigma, 2)
 37        dx_opt = abs(
 38            settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2
 39        )
 40        dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt
 41
 42        # adjusting dt so that nt is integer
 43        self.dt = settings.T
 44        self.nt = 0
 45        while self.dt > dt_opt:
 46            self.nt += 1
 47            self.dt = settings.T / self.nt
 48
 49        # adjusting dx to match requested l^2
 50        dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma
 51
 52        # calculating actual u number and lambda
 53        self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx
 54        self.l2 = dx * dx / sigma2 / self.dt
 55
 56        # adjusting nx and setting S_beg, S_end
 57        S_beg = settings.S_match
 58        self.nx = 1
 59        while S_beg > settings.S_min:
 60            self.nx += 1
 61            S_beg = np.exp(np.log(settings.S_match) - self.nx * dx)
 62
 63        self.ix_match = self.nx
 64
 65        S_end = settings.S_match
 66        while S_end < settings.S_max:
 67            self.nx += 1
 68            S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx)
 69
 70        # asset price
 71        self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx)
 72
 73        self.mu_coeff = (0.5 / self.l2,)
 74        self.solvers = {}
 75        self.solvers[1] = self._factory(
 76            advectee=settings.payoff(self.S),
 77            advector=self.C,
 78            options=Options(n_iters=1, non_zero_mu_coeff=True),
 79            boundary_conditions=(Extrapolated(),),
 80        )
 81        self.solvers[2] = self._factory(
 82            advectee=settings.payoff(self.S),
 83            advector=self.C,
 84            options=Options(**OPTIONS),
 85            boundary_conditions=(Extrapolated(),),
 86        )
 87
 88    def run(self, n_iters: int):
 89        if self.settings.amer:
 90            psi = self.solvers[n_iters].advectee.data
 91            f_T = np.empty_like(psi)
 92            f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T)
 93            T = self.settings.T
 94            r = self.settings.r
 95            dt = self.dt
 96
 97            @numba.experimental.jitclass([])
 98            class PostStep:
 99                # pylint: disable=too-few-public-methods
100                def __init__(self):
101                    pass
102
103                def call(self, _, psi, step, index):
104                    t = T - (step + 1) * dt
105                    psi[index].field[ARG_DATA][:] += (
106                        np.maximum(psi[index].field[ARG_DATA], f_T / np.exp(r * t))
107                        - psi[index].field[ARG_DATA]
108                    )
109
110            self.solvers[n_iters].advance(
111                self.nt, mu_coeff=self.mu_coeff, post_step=PostStep()
112            )
113        else:
114            self.solvers[n_iters].advance(self.nt, mu_coeff=self.mu_coeff)
115
116        return self.solvers[n_iters].advectee.get()
117
118    def terminal_value(self):
119        return self.solvers[1].advectee.get()
Simulation(settings)
33    def __init__(self, settings):
34        self.settings = settings
35
36        sigma2 = pow(settings.sigma, 2)
37        dx_opt = abs(
38            settings.C_opt / (0.5 * sigma2 - settings.r) * settings.l2_opt * sigma2
39        )
40        dt_opt = pow(dx_opt, 2) / sigma2 / settings.l2_opt
41
42        # adjusting dt so that nt is integer
43        self.dt = settings.T
44        self.nt = 0
45        while self.dt > dt_opt:
46            self.nt += 1
47            self.dt = settings.T / self.nt
48
49        # adjusting dx to match requested l^2
50        dx = np.sqrt(settings.l2_opt * self.dt) * settings.sigma
51
52        # calculating actual u number and lambda
53        self.C = -(0.5 * sigma2 - settings.r) * (-self.dt) / dx
54        self.l2 = dx * dx / sigma2 / self.dt
55
56        # adjusting nx and setting S_beg, S_end
57        S_beg = settings.S_match
58        self.nx = 1
59        while S_beg > settings.S_min:
60            self.nx += 1
61            S_beg = np.exp(np.log(settings.S_match) - self.nx * dx)
62
63        self.ix_match = self.nx
64
65        S_end = settings.S_match
66        while S_end < settings.S_max:
67            self.nx += 1
68            S_end = np.exp(np.log(S_beg) + (self.nx - 1) * dx)
69
70        # asset price
71        self.S = np.exp(np.log(S_beg) + np.arange(self.nx) * dx)
72
73        self.mu_coeff = (0.5 / self.l2,)
74        self.solvers = {}
75        self.solvers[1] = self._factory(
76            advectee=settings.payoff(self.S),
77            advector=self.C,
78            options=Options(n_iters=1, non_zero_mu_coeff=True),
79            boundary_conditions=(Extrapolated(),),
80        )
81        self.solvers[2] = self._factory(
82            advectee=settings.payoff(self.S),
83            advector=self.C,
84            options=Options(**OPTIONS),
85            boundary_conditions=(Extrapolated(),),
86        )
settings
dt
nt
C
l2
nx
ix_match
S
mu_coeff
solvers
def run(self, n_iters: int):
 88    def run(self, n_iters: int):
 89        if self.settings.amer:
 90            psi = self.solvers[n_iters].advectee.data
 91            f_T = np.empty_like(psi)
 92            f_T[:] = psi[:] / np.exp(-self.settings.r * self.settings.T)
 93            T = self.settings.T
 94            r = self.settings.r
 95            dt = self.dt
 96
 97            @numba.experimental.jitclass([])
 98            class PostStep:
 99                # pylint: disable=too-few-public-methods
100                def __init__(self):
101                    pass
102
103                def call(self, _, psi, step, index):
104                    t = T - (step + 1) * dt
105                    psi[index].field[ARG_DATA][:] += (
106                        np.maximum(psi[index].field[ARG_DATA], f_T / np.exp(r * t))
107                        - psi[index].field[ARG_DATA]
108                    )
109
110            self.solvers[n_iters].advance(
111                self.nt, mu_coeff=self.mu_coeff, post_step=PostStep()
112            )
113        else:
114            self.solvers[n_iters].advance(self.nt, mu_coeff=self.mu_coeff)
115
116        return self.solvers[n_iters].advectee.get()
def terminal_value(self):
118    def terminal_value(self):
119        return self.solvers[1].advectee.get()