Vary C
This notebook provides a structured framework for time series analysis using LSTM models. It includes functions for:
- Data loading and preprocessing (sliding-window construction; sketched below)
- Model configuration and training
- Evaluation and visualization
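The windowing step pairs each fixed-length window of a subject's series with the value that immediately follows it. The real implementation lives in `modules.dataset.create_subject_sliding_windows` (not shown here); the following is a minimal sketch of the assumed behavior for a single feature column and a one-step-ahead target:

```python
import numpy as np

def sliding_windows(series: np.ndarray, window_size: int, stride: int = 1):
    """Pair each length-`window_size` window with the value right after it."""
    X, y = [], []
    for start in range(0, len(series) - window_size, stride):
        X.append(series[start:start + window_size])
        y.append(series[start + window_size])
    return np.stack(X), np.array(y)

# A series of length 8 with window_size=5 yields 3 (window, target) pairs
X, y = sliding_windows(np.arange(8, dtype=float), window_size=5)
print(X.shape, y.shape)  # (3, 5) (3,)
```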
In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mlflow
import random
import os, sys
from pathlib import Path
from typing import Dict, Tuple, List
# Setup paths
lstm_exp = Path("/home/ytli/research/lstm")
sys.path.append(str(lstm_exp))
# Import custom modules
from modules.dataset import create_subject_sliding_windows, TimeSeriesWindowDataset, time_series_train_val_splits
from modules.model import LSTMRegressor
from modules.plot import plot_all_subjects_combined, plot_time_series_by_subject
# PyTorch imports
import torch
from torch import nn
from torch.utils.data import DataLoader, ConcatDataset
from lightning.pytorch.callbacks import EarlyStopping
import lightning as L
# Set random seeds
def set_random_seeds(seed: int = 42):
    random.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)
# Configure GPU if available
def setup_device():
    # Expose only the first GPU; fall back to CPU when CUDA is unavailable
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    return device
import logging
# Quiet Lightning's info-level messages and MLflow chatter. With the unified
# `lightning` package imported above, loggers live under the
# "lightning.pytorch" namespace, not "pytorch_lightning".
logging.getLogger("lightning.pytorch.utilities.rank_zero").setLevel(logging.WARNING)
logging.getLogger("lightning.pytorch.accelerators.cuda").setLevel(logging.WARNING)
logging.getLogger("mlflow").setLevel(logging.ERROR)
In [2]:
class TimeSeriesAnalysis:
    def __init__(self, experiment_name: str, study_name: str, folder_name: str, difference_value: float):
        self.lstm_exp = Path("/home/ytli/research/lstm")
        self.experiment_name = experiment_name
        self.study_name = study_name
        self.folder_name = folder_name
        self.difference_value = difference_value
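        # Data layout: <study_name>/<folder_name>/difference=<value>/{records.csv, subject_info.csv}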
        self.data_path = self.lstm_exp / study_name / folder_name / f'difference={self.difference_value}'
        self.device = setup_device()
        set_random_seeds()
        # MLflow setup
        mlflow.set_tracking_uri("http://localhost:8093")
        mlflow.set_experiment(experiment_name)

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the time series and subject information data."""
        record_path = self.data_path / 'records.csv'
        subject_info_path = self.data_path / 'subject_info.csv'
        df = pd.read_csv(record_path)
        subject_info_df = pd.read_csv(subject_info_path)
        # Convert subject_id to string
        df['subject_id'] = df['subject_id'].astype(str)
        subject_info_df['subject_id'] = subject_info_df['subject_id'].astype(str)
        return df, subject_info_df

    def load_test_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the test data."""
        test_record_path = self.data_path / 'test_subjects' / 'records.csv'
        test_subject_info_path = self.data_path / 'test_subjects' / 'subject_info.csv'
        df = pd.read_csv(test_record_path)
        subject_info_df = pd.read_csv(test_subject_info_path)
        # Convert subject_id to string
        df['subject_id'] = df['subject_id'].astype(str)
        subject_info_df['subject_id'] = subject_info_df['subject_id'].astype(str)
        return df, subject_info_df

    def create_sliding_windows(self, df: pd.DataFrame, window_size: int = 5) -> Dict:
        """Create sliding windows for each subject."""
        sliding_windows_dict = {}
        for subject_id in df['subject_id'].unique():
            subject_df = df[df['subject_id'] == subject_id]
            sliding_windows, targets = create_subject_sliding_windows(
                subject_df,
                window_size=window_size,
                stride=1,
                feature_cols=['y'],
                target_col='y',
            )
            sliding_windows_dict[subject_id] = {"X": sliding_windows, "y": targets}
        return sliding_windows_dict

    def prepare_datasets(self, sliding_windows_dict: Dict, test_size: int = 100, val_size: int = 100) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """Prepare train, validation, and test datasets."""
        # Split test data
        subject_test_data = {}
        subject_dev_data = {}
        for subject_id, data in sliding_windows_dict.items():
            X, y = data["X"], data["y"]
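            # Chronological split: hold out each subject's most recent test_size
            # windows so evaluation data never precedes the training data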
            subject_test_data[subject_id] = {"X": X[-test_size:], "y": y[-test_size:]}
            subject_dev_data[subject_id] = {"X": X[:-test_size], "y": y[:-test_size]}
        # Create datasets
        subject_test_dataset = {}
        for subject_id, data in subject_test_data.items():
            subject_test_dataset[subject_id] = TimeSeriesWindowDataset(subject_id, data["X"], data["y"])
        subject_dev_dataset = time_series_train_val_splits(
            data=subject_dev_data,
            n_splits=5,
            val_size=val_size,
        )
        # Combine datasets
        last_train_fold_combined = []
        last_val_fold_combined = []
        for subject_id, data in subject_dev_dataset.items():
            last_dev_fold = data[-1]
            last_train_fold_combined.append(last_dev_fold['train'])
            last_val_fold_combined.append(last_dev_fold['val'])
        train_dataset = ConcatDataset(last_train_fold_combined)
        val_dataset = ConcatDataset(last_val_fold_combined)
        test_dataset = ConcatDataset(list(subject_test_dataset.values()))
        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)
        return train_loader, val_loader, test_loader

    def prepare_test_datasets(self, sliding_windows_dict: Dict) -> DataLoader:
        """Prepare test datasets."""
        subject_test_dataset = {}
        for subject_id, data in sliding_windows_dict.items():
            subject_test_dataset[subject_id] = TimeSeriesWindowDataset(subject_id, data["X"], data["y"])
        test_dataset = ConcatDataset(list(subject_test_dataset.values()))
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)
        return test_loader

    def train_model(self, train_loader: DataLoader, val_loader: DataLoader, test_loader: DataLoader) -> Tuple[LSTMRegressor, Dict]:
        """Train the LSTM model and evaluate it."""
        mlflow.pytorch.autolog(log_every_n_epoch=1, log_every_n_step=10)
        model = LSTMRegressor(input_size=1, hidden_size=64, num_layers=2, lr=1e-3)
        early_stop_cb = EarlyStopping(
            monitor="val_loss",
            min_delta=0.00,
            patience=3,
            mode="min",
            verbose=False,
        )
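        # Trade a little float32 matmul precision for speed (enables TF32 on recent NVIDIA GPUs)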
        torch.set_float32_matmul_precision('high')
        with mlflow.start_run(run_name=f"{self.folder_name}_difference_{self.difference_value}"):
            trainer = L.Trainer(
                max_epochs=50,
                accelerator="gpu",
                devices=1,
                deterministic=True,
                check_val_every_n_epoch=1,
                logger=False,
                enable_checkpointing=False,
                enable_progress_bar=False,
                enable_model_summary=False,
                callbacks=[early_stop_cb],
            )
            trainer.fit(model, train_loader, val_loader)
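            # Reload the best checkpoint autologged to this run rather than
            # testing the final (early-stopped) weights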
            best_model = mlflow.pytorch.load_checkpoint(LSTMRegressor, run_id=mlflow.active_run().info.run_id)
            test_results = trainer.test(best_model, test_loader, verbose=False)
            mlflow.log_metrics({"test_loss": test_results[0]["test_loss"]})
        return best_model, test_results

    def evaluate_model(self, model: LSTMRegressor, test_loader: DataLoader) -> Dict:
        """Evaluate the model on test data and prepare visualization data."""
        subject_ids = [dataset.subject_id for dataset in test_loader.dataset.datasets]
        test_eval_data = {subject_id: {'pred': [], 'target': []} for subject_id in subject_ids}
        with torch.no_grad():
            model = model.to(self.device)
            model.eval()
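            # Iterate over the per-subject datasets inside the ConcatDataset so
            # predictions stay grouped by subject for plotting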
            for dataset in test_loader.dataset.datasets:
                subject_id = dataset.subject_id
                tmp_loader = DataLoader(dataset, batch_size=32, shuffle=False)
                for batch in tmp_loader:
                    X, y = batch
                    X = X.to(self.device)
                    y = y.to(self.device)
                    pred = model(X).cpu().numpy()
                    test_eval_data[subject_id]['pred'].extend(pred)
                    test_eval_data[subject_id]['target'].extend(y.cpu().numpy())
        return test_eval_data
In [3]:
conditions_path = lstm_exp / 'study2' / 'conditions.csv'
conditions_df = pd.read_csv(conditions_path)
conditions_df
Out[3]:
| | Condition | Difference | A (train) | B (train) | C (train) | D (train) | E (test) | F (test) | G (test) | H (test) |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | vary_A | 0.5 | [1.0, 1.5, 2.0] | 1 | 0 | 0 | [0.5, 2.5] | 1 | 0 | 0 |
| 1 | vary_A | 1.0 | [2.0, 3.0, 4.0] | 1 | 0 | 0 | [1.0, 5.0] | 1 | 0 | 0 |
| 2 | vary_B | 0.5 | 1 | [1.0, 1.5, 2.0] | 0 | 0 | 1 | [0.5, 2.5] | 0 | 0 |
| 3 | vary_B | 1.0 | 1 | [1.5, 2.5, 3.5] | 0 | 0 | 1 | [0.5, 4.5] | 0 | 0 |
| 4 | vary_C | 0.5 | 1 | 1 | [-0.5, 0.0, 0.5] | 0 | 1 | 1 | [-1.0, 1.0] | 0 |
| 5 | vary_C | 1.0 | 1 | 1 | [-1.0, 0.0, 1.0] | 0 | 1 | 1 | [-2.0, 2.0] | 0 |
| 6 | vary_C | 2.0 | 1 | 1 | [-2.0, 0.0, 2.0] | 0 | 1 | 1 | [-4.0, 4.0] | 0 |
| 7 | vary_D | 0.5 | 1 | 1 | 0 | [-0.5, 0.0, 0.5] | 1 | 1 | 0 | [-1.0, 1.0] |
| 8 | vary_D | 1.0 | 1 | 1 | 0 | [-1.0, 0.0, 1.0] | 1 | 1 | 0 | [-2.0, 2.0] |
| 9 | vary_D | 2.0 | 1 | 1 | 0 | [-2.0, 0.0, 2.0] | 1 | 1 | 0 | [-4.0, 4.0] |
Differences

For each difference value in the vary_C rows above, the model trains on the listed C values and is evaluated on test subjects whose C values fall outside the training range, so test performance reflects extrapolation.
In [4]:
study_name = "study2"
folder_name = "vary_C"
sub_condition_df = conditions_df[conditions_df['Condition'] == folder_name]
for _, row in sub_condition_df.iterrows():
    difference_value = row['Difference']
    print("=" * 80)
    print(f"\033[1m\033[94m>>> CONDITION: {row['Condition']}, DIFFERENCE VALUE: {difference_value} <<<\033[0m")
    print("-" * 80)
    # Determine which parameter is varied and which columns hold its train/test values
    condition = row['Condition']
    condition_cols = {
        "vary_A": ("A (train)", "E (test)", "A"),
        "vary_B": ("B (train)", "F (test)", "B"),
        "vary_C": ("C (train)", "G (test)", "C"),
        "vary_D": ("D (train)", "H (test)", "D"),
    }
    train_col, test_col, param = condition_cols[condition]
    trained_on = row[train_col]
    test_on = row[test_col]
    print(f"\033[1m\033[92mTRAINING SET:\033[0m {param} values in {trained_on}")
    print(f"\033[1m\033[93mTEST SET:\033[0m {param} values in {test_on}")
    print("-" * 80)
    # Initialize the analysis
    analysis = TimeSeriesAnalysis(
        experiment_name="test3",
        study_name=study_name,
        folder_name=folder_name,
        difference_value=difference_value,
    )
    # Load and prepare data
    df, subject_info_df = analysis.load_data()
    test_df, test_subject_info_df = analysis.load_test_data()
    all_df = pd.concat([df, test_df], ignore_index=True)
    all_subject_info_df = pd.concat([subject_info_df, test_subject_info_df], ignore_index=True)
    plot_time_series_by_subject(all_df, all_subject_info_df).show()
    sliding_windows_dict = analysis.create_sliding_windows(df)
    sliding_windows_dict_test = analysis.create_sliding_windows(test_df)
    # The test loader returned by prepare_datasets is unused here;
    # evaluation uses the held-out test subjects instead
    train_loader, val_loader, _ = analysis.prepare_datasets(sliding_windows_dict)
    test_loader = analysis.prepare_test_datasets(sliding_windows_dict_test)
    # Train model
    print("\033[1m\033[95mTraining model...\033[0m")
    best_model, test_results = analysis.train_model(train_loader, val_loader, test_loader)
    print(f"\033[1m\033[96mTEST RESULTS:\033[0m loss = {test_results[0]['test_loss']:.5f}")
    # Evaluate and visualize
    print("\033[1m\033[95mEvaluating model on test data...\033[0m")
    test_eval_data = analysis.evaluate_model(best_model, test_loader)
    plot_all_subjects_combined(test_eval_data).show()
    print("=" * 80)
================================================================================
>>> CONDITION: vary_C, DIFFERENCE VALUE: 0.5 <<<
--------------------------------------------------------------------------------
TRAINING SET: C values in [-0.5, 0.0, 0.5]
TEST SET: C values in [-1.0, 1.0]
--------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_C_difference_0.5 at: http://localhost:8093/#/experiments/91/runs/51d433e003b7493ca32ae3f53f604159
🧪 View experiment at: http://localhost:8093/#/experiments/91
TEST RESULTS: loss = 0.00005
Evaluating model on test data...
================================================================================
================================================================================
>>> CONDITION: vary_C, DIFFERENCE VALUE: 1.0 <<<
--------------------------------------------------------------------------------
TRAINING SET: C values in [-1.0, 0.0, 1.0]
TEST SET: C values in [-2.0, 2.0]
--------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_C_difference_1.0 at: http://localhost:8093/#/experiments/91/runs/92bb15d5c72147eea45ddca51708c019
🧪 View experiment at: http://localhost:8093/#/experiments/91
TEST RESULTS: loss = 0.00002
Evaluating model on test data...
================================================================================
================================================================================
>>> CONDITION: vary_C, DIFFERENCE VALUE: 2.0 <<<
--------------------------------------------------------------------------------
TRAINING SET: C values in [-2.0, 0.0, 2.0]
TEST SET: C values in [-4.0, 4.0]
--------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_C_difference_2.0 at: http://localhost:8093/#/experiments/91/runs/71047cc346dc4ad29af681c34c18fc4c
🧪 View experiment at: http://localhost:8093/#/experiments/91
TEST RESULTS: loss = 0.00004
Evaluating model on test data...
================================================================================