Commit bec85447 authored by Abdalah Namwenje's avatar Abdalah Namwenje
Browse files

Upload New File

parent 895f0dbe
Loading
Loading
Loading
Loading
+548 −0
Original line number Diff line number Diff line
%% Cell type:markdown id:7419044a tags:

## Code for figures 3.1 and 3.2

%% Cell type:code id:6e8f1407 tags:

``` python
import numpy as np
import matplotlib.pyplot as plt

# Define functions
def arcsinh(x):
    """Elementwise inverse hyperbolic sine (thin plotting wrapper around numpy)."""
    transformed = np.arcsinh(x)
    return transformed

def g(x):
    """Logarithmic transform log(2x); only defined for x > 0."""
    doubled = 2 * x
    return np.log(doubled)

def h(x):
    """Mirrored log transform -log(-2x); only defined for x < 0."""
    inner = -2 * x
    return -np.log(inner)

# Sampling grids: arcsinh is defined on all of R, log(2x) only for x > 0,
# and -log(-2x) only for x < 0, so each curve gets its own grid.
grid_all = np.linspace(-10, 10, 400)
grid_pos = np.linspace(0.1, 10, 400)
grid_neg = np.linspace(-10, -0.1, 400)
grid_diff = np.linspace(0.1, 10, 400)

# Evaluate each curve on its grid
vals_arcsinh = arcsinh(grid_all)
vals_log_pos = g(grid_pos)
vals_log_neg = h(grid_neg)
vals_diff = arcsinh(grid_diff) - g(grid_diff)

# --------- Plot 1: arcsinh(x), log(2x), -log(-2x) ---------
plt.figure(figsize=(10, 6))
plt.plot(grid_all, vals_arcsinh, label=r'$\mathrm{arcsinh}(P_t)$', color='blue')
plt.plot(grid_pos, vals_log_pos, label=r'$\log(2P_t)$', color='green')
plt.plot(grid_neg, vals_log_neg, label=r'$-\log|2P_t|$', color='red')

# Draw the coordinate axes through the origin
plt.axhline(0, color='black', linewidth=1)
plt.axvline(0, color='black', linewidth=1)

plt.xlabel(r'$P_t$')
plt.ylabel('Value')
plt.title('Comparison of arcsinh and log transformations')
plt.legend(fontsize=12)
plt.xticks(np.arange(-10, 11, 1))
plt.xlim(-10, 10)
plt.grid(False)  # no grid squares
plt.savefig('arcsinh_log_comparison.png')
plt.show()

# --------- Plot 2: Difference arcsinh(x) - log(2x), positive x only ---------
plt.figure(figsize=(10, 6))
plt.plot(grid_diff, vals_diff, label=r'$\mathrm{arcsinh}(P_t) - \log(2P_t)$', color='purple')

# Draw the coordinate axes through the origin
plt.axhline(0, color='black', linewidth=1)
plt.axvline(0, color='black', linewidth=1)

plt.xlabel(r'$P_t$')
plt.ylabel('Difference')
plt.title('Difference between arcsinh and log transformations')
plt.legend(fontsize=12)
plt.xticks(np.arange(0, 11, 1))
plt.xlim(0, 10)
plt.ylim(min(vals_diff), max(vals_diff))
plt.grid(False)  # no grid squares
plt.savefig('difference_plot_positive.png')
plt.show()
```

%% Cell type:code id:32c78e6a tags:

``` python
```

%% Cell type:code id:91cf8c2e tags:

``` python
```

%% Cell type:markdown id:c36b4679 tags:

## Code for Section 4.1: Plotting distributions of returns (positive stocks) with their statistics
 Columns analyzed were TELEFONICA, FORD MOTOR, SAP, and SIEMENS; `df` was the name assigned to the dataset

%% Cell type:code id:e63efff8 tags:

``` python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import skew, kurtosis

# Prepare data: drop rows with a missing price in the analysed column.
# NOTE(review): assumes `df` (loaded elsewhere) has the stock columns — confirm.
price_col = 'SIEMENS'
df = df.dropna(subset=[price_col])

# Log returns: r_t = log(P_t / P_{t-1})
df['LogReturn'] = np.log(df[price_col] / df[price_col].shift(1))
log_returns = df['LogReturn'].dropna()

# Simple returns: r_t = (P_t - P_{t-1}) / P_{t-1}
df['SimpleReturn'] = df[price_col].pct_change()
simple_returns = df['SimpleReturn'].dropna()

# Arcsinh (inverse hyperbolic sine) returns: first difference of arcsinh(P)
df['AsinhReturn'] = np.arcsinh(df[price_col]) - np.arcsinh(df[price_col].shift(1))
asinh_returns = df['AsinhReturn'].dropna()

# New-model returns: first difference of sign(P) * log(1 + |P|/k),
# with scale k set to 10% of the median absolute price.
# (The redundant `df['NewReturn'] = np.nan` pre-initialisation was removed:
# the column is assigned unconditionally below.)
abs_prices = np.abs(df[price_col].dropna())
k = 0.1 * np.median(abs_prices)

# Transformed prices (sign-preserving log transform)
transformed_prices = np.log(1 + (abs_prices / k)) * np.sign(df[price_col].dropna())
df.loc[transformed_prices.index, 'Price_Transformed'] = transformed_prices

# Returns as first differences of the transformed price
df['NewReturn'] = df['Price_Transformed'] - df['Price_Transformed'].shift(1)
new_returns = df['NewReturn'].dropna()

# Plot helper: density histogram with a KDE overlay on fixed axes
def plot_returns_with_normal(returns, title, color):
    """Plot a density histogram of *returns* with a red KDE overlay.

    Axes are fixed to x in [-0.2, 0.2] and y in [0, 45] so the four return
    distributions are directly comparable across figures.
    """
    plt.figure(figsize=(5, 5))
    sns.histplot(returns, bins=50, stat="density", color=color, alpha=0.6)
    sns.kdeplot(returns, color="red", linewidth=2)
    axes = plt.gca()
    axes.set_title(title)
    axes.set_xlabel('Return Value')
    axes.set_ylabel('Density')
    axes.set_xlim(-0.2, 0.2)
    axes.set_ylim(0, 45)
    axes.grid(True, linestyle='--', alpha=0.3)
    plt.tight_layout()
    plt.show()

# --- Descriptive Statistics ---
def describe_returns(returns):
    """Summarise a return series: mean, population std dev, skewness, kurtosis."""
    data = np.asarray(returns)
    labels = ['Mean', 'Std Dev', 'Skewness', 'Kurtosis']
    values = [data.mean(), data.std(), skew(data), kurtosis(data)]
    return dict(zip(labels, values))

# Compute descriptive statistics for every return definition
stats_rows = {
    'Log Returns': describe_returns(log_returns),
    'Simple Returns': describe_returns(simple_returns),
    'Asinh Returns': describe_returns(asinh_returns),
    'New model Returns': describe_returns(new_returns),
}

# Combine into a single table, rounded for display
stats_df = pd.DataFrame(list(stats_rows.values()), index=list(stats_rows.keys())).round(6)

# Print the summary table
print("Descriptive Statistics for Return Types")
print(stats_df)

# One distribution figure per return definition
for series, label, shade in [
    (log_returns, 'Log-Returns', 'blue'),
    (simple_returns, 'Simple Returns', 'green'),
    (asinh_returns, 'Arcsinh Returns', 'purple'),
    (new_returns, 'New model Returns', 'skyblue'),
]:
    plot_returns_with_normal(series, label, shade)
```

%% Cell type:code id:cf48e81b tags:

``` python
```

%% Cell type:markdown id:bea1e9b9 tags:

## Code for section 4.2: Plotting distributions of returns (market with negative prices) with their statistics
The dataset was also named `df`; this code was run in a different notebook.

%% Cell type:code id:6d2cb807 tags:

``` python
import pandas as pd
import numpy as np
from scipy.stats import skew, kurtosis
import matplotlib.pyplot as plt
import seaborn as sns

# Preparing datetime and extract daily closings.
# NOTE(review): assumes df has a default 0..n-1 RangeIndex so integer-label
# .loc/.at accesses below hit the intended rows — confirm upstream.
df["Datetime (UTC)"] = pd.to_datetime(df["Datetime (UTC)"])
df["Daily Closing Price"] = np.nan
# Hourly data: rows 23, 47, 71, ... are the last hour of each day, so their
# prices are taken as the daily closing prices.
for i in range(23, len(df), 24):
    df.loc[i, "Daily Closing Price"] = df.loc[i, "Price (EUR/MWhe)"]

# Initializing return columns
df["New Return"] = np.nan
df["Asinh Return"] = np.nan
df["Simple Return"] = np.nan

# Calculating k = 0.1 * median of absolute daily closing prices
# (absolute value because electricity prices can be negative)
daily_closing_prices = df["Daily Closing Price"].dropna()
k = 0.1 * np.median(np.abs(daily_closing_prices))

# Step 3: Compute new returns, asinh, and simple returns.
# Start one day (24 rows) after the first close so a previous close exists.
for i in range(23 + 24, len(df), 24):
    p_curr = df["Daily Closing Price"].iloc[i]
    if pd.isna(p_curr):
        continue

    # Find previous non-NaN, non-zero price, stepping back one day (24 rows)
    # at a time; give up (NaN) once the start of the series is passed.
    j = i - 24
    p_prev = df["Daily Closing Price"].iloc[j]
    while j >= 0 and (pd.isna(p_prev) or p_prev == 0):
        j -= 24
        if j < 0:
            p_prev = np.nan
            break
        p_prev = df["Daily Closing Price"].iloc[j]

    if pd.isna(p_prev):
        continue

    # New Return (log(1 + |P|/k) * sign(P)): difference of the signed-log
    # transformed prices; well-defined for negative and zero prices.
    rt_prev = np.log(1 + (abs(p_prev) / k)) * np.sign(p_prev)
    rt_curr = np.log(1 + (abs(p_curr) / k)) * np.sign(p_curr)
    new_r = rt_curr - rt_prev
    df.at[i, "New Return"] = new_r

    # Asinh Return: difference of inverse hyperbolic sine of the prices
    asinh_r = np.arcsinh(p_curr) - np.arcsinh(p_prev)
    df.at[i, "Asinh Return"] = asinh_r

    # Simple Return: |p_prev| in the denominator keeps the return's sign
    # meaningful when the previous price is negative.
    simple_r = (p_curr - p_prev) / abs(p_prev)
    df.at[i, "Simple Return"] = simple_r

# Step 4: Drop NaNs to get clean return series
new_returns = df["New Return"].dropna()
asinh_returns = df["Asinh Return"].dropna()
simple_returns = df["Simple Return"].dropna()

# Step 5: Descriptive statistics
def describe_returns(returns):
    """Return mean, population std dev, skewness and excess kurtosis of a series."""
    summary = {}
    summary['Mean'] = np.mean(returns)
    summary['Std Dev'] = np.std(returns)
    summary['Skewness'] = skew(returns)
    summary['Kurtosis'] = kurtosis(returns)
    return summary

# Summary statistics for each return definition
stats_hybrid = describe_returns(new_returns)
stats_asinh = describe_returns(asinh_returns)
stats_simple = describe_returns(simple_returns)

# Tabulate (rows ordered: new, simple, asinh), rounded for display
stats_df = pd.DataFrame(
    [stats_hybrid, stats_simple, stats_asinh],
    index=["New Return", "Simple Return", "Asinh Return"]
).round(6)

# Step 6: one panel per return series. The first two panels share fixed
# axis limits; the simple-return panel is left auto-scaled.
panel_specs = [
    (new_returns, "skyblue", "blue", "New model returns", True),
    (asinh_returns, "purple", "purple", "Arcsinh Returns", True),
    (simple_returns, "green", "darkred", "Simple Returns", False),
]

plt.figure(figsize=(18, 5))
for position, (series, bar_color, kde_color, title, fixed_limits) in enumerate(panel_specs, start=1):
    plt.subplot(1, 3, position)
    sns.histplot(series, bins=50, stat="density", color=bar_color, alpha=0.6, label="Histogram")
    sns.kdeplot(series, color=kde_color, linewidth=2, label="KDE")
    plt.title(title)
    plt.xlabel("Return value")
    plt.ylabel("Density")
    if fixed_limits:
        plt.xlim(-8, 8)
        plt.ylim(0, 3)
    plt.grid(True, linestyle="--", alpha=0.3)

plt.tight_layout()
plt.show()

print("Descriptive Statistics:\n")
print(stats_df)
```

%% Cell type:code id:6c4c16ea tags:

``` python
```

%% Cell type:markdown id:6f129a2d tags:

## Code for Section 4.3.1: Estimating OU volatility parameter with daily closing prices

%% Cell type:code id:e448ecf6 tags:

``` python
import numpy as np
import pandas as pd
from scipy.optimize import minimize

def ou_neg_log_likelihood_euler(params, x, delta_t=1.0):
    """Negative log-likelihood of an Ornstein-Uhlenbeck process under an
    Euler-Maruyama discretisation.

    Parameters
    ----------
    params : sequence of three floats
        (theta, mu, sigma): mean-reversion speed, long-run mean, volatility.
    x : 1-D array of observations at equally spaced times.
    delta_t : float, time step between observations.

    Returns
    -------
    float
        The negative log-likelihood; np.inf for non-positive theta or sigma
        so the optimizer stays out of the invalid region.
    """
    theta, mu, sigma = params
    # Reject non-positive sigma AND theta — the original only guarded sigma;
    # this makes the function consistent with the hourly-price variant.
    if sigma <= 0 or theta <= 0:
        return np.inf
    # Euler step: x_{t+1} ~ N(x_t + theta*(mu - x_t)*dt, sigma^2 * dt)
    means = x[:-1] + theta * (mu - x[:-1]) * delta_t
    var = sigma**2 * delta_t
    residuals = x[1:] - means
    nll = 0.5 * np.sum(np.log(2 * np.pi * var) + (residuals**2) / var)
    return nll

# Preparation of daily closing prices.
# NOTE(review): assumes df has a default RangeIndex (row i is hour i) — confirm.
df["Datetime (UTC)"] = pd.to_datetime(df["Datetime (UTC)"])
df["Daily Closing Price"] = np.nan
# Rows 23, 47, ... are the last hour of each day; use them as daily closes.
for i in range(23, len(df), 24):
    df.loc[i, "Daily Closing Price"] = df.loc[i, "Price (EUR/MWhe)"]

daily_prices = df["Daily Closing Price"].dropna()
# Scale constant for the signed-log transform: 10% of the median |price|
k = 0.1 * np.median(np.abs(daily_prices))

def new_transform(p):
    # Signed-log transform sign(p) * log(1 + |p|/k); handles negative prices.
    return np.log(1 + np.abs(p) / k) * np.sign(p)

new_prices = daily_prices.apply(new_transform).values

# Estimating OU parameters via MLE (daily observations, so delta_t = 1)
delta_t = 1.0
# Start from a moderate mean-reversion speed and the sample mean/std
init_params = [0.5, np.mean(new_prices), np.std(new_prices)]

result = minimize(
    ou_neg_log_likelihood_euler,
    init_params,
    args=(new_prices, delta_t),
    # theta and sigma kept strictly positive; mu unconstrained
    bounds=[(1e-5, None), (None, None), (1e-5, None)]
)

if result.success:
    est_theta, est_mu, est_sigma = result.x
    print("Estimated OU parameters from new-transformed prices:")
    print(f" theta = {est_theta:.6f}")
    print(f" mu    = {est_mu:.6f}")
    print(f" sigma = {est_sigma:.6f}")
else:
    print("Optimization failed:", result.message)
```

%% Cell type:code id:d73b7d08 tags:

``` python
```

%% Cell type:markdown id:72e7ae8f tags:

## Code for Section 4.3.1: Estimating OU volatility parameter with hourly prices

%% Cell type:code id:793d88d4 tags:

``` python
import numpy as np
import pandas as pd
from scipy.optimize import minimize

# Preparing datetime and sort data chronologically (reset to a clean index)
df["Datetime (UTC)"] = pd.to_datetime(df["Datetime (UTC)"])
df = df.sort_values("Datetime (UTC)").reset_index(drop=True)

# New transformation function and constants
prices = df["Price (EUR/MWhe)"].dropna()
k = 0.1 * np.median(np.abs(prices))  # scaling constant: 10% of median |price|

def new_transform(p):
    # Signed-log transform sign(p) * log(1 + |p|/k); numpy ufuncs make this
    # work elementwise on the whole Series at once.
    return np.log(1 + np.abs(p) / k) * np.sign(p)

new_prices = new_transform(prices).values

# Defining OU negative log-likelihood (Euler-Maruyama discretisation)
def ou_neg_log_likelihood_euler(params, x, delta_t=1.0 / 24):
    """Negative log-likelihood of an OU process on hourly steps.

    Returns np.inf for non-positive theta or sigma so the optimizer
    avoids the invalid parameter region.
    """
    theta, mu, sigma = params
    if theta <= 0 or sigma <= 0:
        return np.inf
    # One Euler step: x_{t+1} ~ N(x_t + theta*(mu - x_t)*dt, sigma^2 * dt)
    conditional_mean = x[:-1] + theta * (mu - x[:-1]) * delta_t
    conditional_var = sigma**2 * delta_t
    innovations = x[1:] - conditional_mean
    return 0.5 * np.sum(np.log(2 * np.pi * conditional_var) + (innovations**2) / conditional_var)

# Fit the OU parameters by maximum likelihood on the transformed hourly prices
delta_t = 1.0 / 24  # hourly observations on a daily time scale
init_params = [0.5, np.mean(new_prices), np.std(new_prices)]  # theta, mu, sigma start

result = minimize(
    ou_neg_log_likelihood_euler,
    init_params,
    args=(new_prices, delta_t),
    # theta and sigma kept strictly positive; mu unconstrained
    bounds=[(1e-5, None), (None, None), (1e-5, None)]
)

if not result.success:
    print("Optimization failed:", result.message)
else:
    est_theta, est_mu, est_sigma = result.x
    print("Estimated OU parameters from new-transformed prices:")
    print(f" theta = {est_theta:.6f}")
    print(f" mu    = {est_mu:.6f}")
    print(f" sigma = {est_sigma:.6f}")
```

%% Cell type:code id:eaf8a3fa tags:

``` python
```

%% Cell type:markdown id:1e57b2c8 tags:

## Code for Sections 4.3.2 and 4.3.3: Andersen Volatility and Garman-Klass

%% Cell type:code id:7c428dad tags:

``` python
import numpy as np
import pandas as pd

# Convert datetime and extract date
df['Datetime'] = pd.to_datetime(df['Datetime (UTC)'])
df['Date'] = df['Datetime'].dt.date

# Daily scale factor k = 0.1 * median(abs(price)) per day
daily_k = df.groupby('Date')['Price (EUR/MWhe)'].apply(lambda x: 0.1 * np.nanmedian(np.abs(x)))
# merge returns a new frame with a fresh RangeIndex, so the positional
# .iat accesses below line up with row order
df = df.merge(daily_k.rename('k'), on='Date', how='left')

# New Returns using the new transformation: difference of signed-log
# transformed prices between consecutive hourly rows of the SAME day
df['NewReturn'] = np.nan
for i in range(1, len(df)):
    p_prev = df.iat[i - 1, df.columns.get_loc('Price (EUR/MWhe)')]
    p_curr = df.iat[i, df.columns.get_loc('Price (EUR/MWhe)')]
    k_curr = df.iat[i, df.columns.get_loc('k')]
    date_prev = df.iat[i - 1, df.columns.get_loc('Date')]
    date_curr = df.iat[i, df.columns.get_loc('Date')]

    # Skip if NaN or crossing a day boundary — k is a per-day constant, so
    # a return across midnight would mix two different scales
    if pd.isna(p_prev) or pd.isna(p_curr) or pd.isna(k_curr) or date_prev != date_curr:
        continue

    # Both prices are transformed with the current day's scale k_curr
    rt_prev = np.log(1 + abs(p_prev) / k_curr) * np.sign(p_prev)
    rt_curr = np.log(1 + abs(p_curr) / k_curr) * np.sign(p_curr)
    df.iat[i, df.columns.get_loc('NewReturn')] = rt_curr - rt_prev

# New Transformed Prices
def new_transform(p, k):
    """Signed-log transform sign(p) * log(1 + |p|/k); NaN for invalid p or k."""
    usable = not (pd.isna(p) or pd.isna(k)) and k > 0
    if not usable:
        return np.nan
    return np.log(1 + abs(p) / k) * np.sign(p)

# Row-wise transform: each price uses its own day's scale factor k
df['NewPrice'] = df.apply(lambda row: new_transform(row['Price (EUR/MWhe)'], row['k']), axis=1)

# Daily Garman-Klass volatility
def garman_klass_volatility(group):
    """Garman-Klass volatility for one day's transformed prices.

    Open/close are the first/last observations; high/low are the day's
    extremes. Returns NaN for days with fewer than two prices or a
    non-positive variance estimate.
    """
    series = group['NewPrice'].dropna()
    if len(series) < 2:
        return np.nan
    day_open, day_close = series.iloc[0], series.iloc[-1]
    day_high, day_low = series.max(), series.min()

    variance = 0.5 * (day_high - day_low)**2 - (2 * np.log(2) - 1) * (day_close - day_open)**2
    if variance > 0:
        return np.sqrt(variance)
    return np.nan

# One Garman-Klass estimate per day; reset_index(name=...) turns the
# resulting Series into a two-column frame (Date, GarmanKlassVolatility)
daily_gk_vol = df.groupby('Date').apply(garman_klass_volatility).reset_index(name='GarmanKlassVolatility')

# Daily Andersen (realized) volatility: sqrt of sum of squared returns per day
df['NewReturnSquared'] = df['NewReturn'] ** 2
daily_andersen_vol = df.groupby('Date')['NewReturnSquared'].sum().apply(np.sqrt).reset_index(name='NewVolatility')

# RMS-based average volatilities (root mean square across days, NaN-aware)
andersen2_vol = np.sqrt(np.nanmean(daily_andersen_vol['NewVolatility'] ** 2))
print(f"Average daily Andersen2 volatility (RMS, New Model): {andersen2_vol:.6f}")

garman_klass2_vol = np.sqrt(np.nanmean(daily_gk_vol['GarmanKlassVolatility'] ** 2))
print(f"Average daily Garman-Klass2 volatility (RMS, New Model): {garman_klass2_vol:.6f}")

# Output sample volatilities for a quick sanity check
print("\nSample of daily Andersen volatility:")
print(daily_andersen_vol.head())

print("\nSample of daily Garman-Klass volatility:")
print(daily_gk_vol.head())
```

%% Cell type:code id:e69fcecf tags:

``` python
```

%% Cell type:code id:5f8d6239 tags:

``` python
```

%% Cell type:code id:a1c25a7c tags:

``` python
```

%% Cell type:code id:4806bb89 tags:

``` python
```

%% Cell type:code id:353cb044 tags:

``` python
```

%% Cell type:code id:c252bbe7 tags:

``` python
```