Vary A

This notebook provides a structured framework for time series analysis using LSTM models. It includes functions for:

  • Data loading and preprocessing
  • Model configuration and training
  • Evaluation and visualization
In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mlflow
import random
import os, sys
from pathlib import Path
from typing import Dict, Tuple, List

# Setup paths
lstm_exp = Path("/home/ytli/research/lstm")
sys.path.append(str(lstm_exp))

# Import custom modules
from modules.dataset import create_subject_sliding_windows, TimeSeriesWindowDataset, time_series_train_val_splits
from modules.model import LSTMRegressor
from modules.plot import plot_all_subjects_combined, plot_time_series_by_subject

# PyTorch imports
import torch
from torch import nn
from torch.utils.data import DataLoader, ConcatDataset
from lightning.pytorch.callbacks import EarlyStopping
import lightning as L

# Set random seeds
def set_random_seeds(seed: int = 42):
    """Seed Python's ``random``, PyTorch and NumPy's global RNG with one value.

    Args:
        seed: Integer handed to each library's seeding function (default 42).
    """
    # Same order as before: random -> torch -> numpy.
    for seed_fn in (random.seed, torch.manual_seed, np.random.seed):
        seed_fn(seed)

# Configure GPU if available
def setup_device():
    """Pick the torch device, defaulting the visible GPU to index 0.

    ``CUDA_VISIBLE_DEVICES`` is only set to ``"0"`` when it is not already
    defined, so a GPU selection exported before the kernel was started is
    respected instead of being silently overwritten.

    Returns:
        torch.device: ``cuda`` when ``torch.cuda.is_available()`` is true,
        otherwise ``cpu``.
    """
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")

import logging
# Quieten Lightning's informational start-up messages and MLflow's warnings.
# This notebook imports the unified `lightning` package, whose loggers are
# presumably named "lightning.pytorch.*"; with only the legacy
# "pytorch_lightning.*" names configured, the "LOCAL_RANK ..." info line still
# showed up in the cell output. Both spellings are configured so the filter
# works whichever distribution is installed.
for _noisy_logger in (
    "pytorch_lightning.utilities.rank_zero",
    "pytorch_lightning.accelerators.cuda",
    "lightning.pytorch.utilities.rank_zero",
    "lightning.pytorch.accelerators.cuda",
):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
logging.getLogger("mlflow").setLevel(logging.ERROR)
In [2]:
class TimeSeriesAnalysis:
    """LSTM experiment pipeline for one (study, condition, difference) dataset.

    The instance resolves the data folder
    ``<base_dir>/<study_name>/<folder_name>/difference=<difference_value>``,
    points MLflow at the tracking server and then offers the individual
    pipeline stages as methods: CSV loading, sliding-window creation,
    DataLoader construction, training and per-subject evaluation.
    """

    # Mini-batch size used by every DataLoader this class builds.
    BATCH_SIZE = 32
    # Worker processes for the train/val/test loaders (the temporary
    # per-subject loaders in `evaluate_model` load in the main process).
    NUM_WORKERS = 4

    def __init__(self, experiment_name: str, study_name: str, folder_name: str, difference_value: float,
                 base_dir: Path = Path("/home/ytli/research/lstm"),
                 tracking_uri: str = "http://localhost:8093"):
        """Resolve paths, select the device, seed the RNGs and configure MLflow.

        Args:
            experiment_name: MLflow experiment that runs are logged under.
            study_name: Sub-folder of ``base_dir`` holding the study.
            folder_name: Condition folder inside the study (also used as the
                prefix of the MLflow run name).
            difference_value: Value used in the ``difference=<value>`` folder
                name and in the MLflow run name.
            base_dir: Root directory of the experiments; defaults to the
                location that was previously hard-coded.
            tracking_uri: MLflow tracking server URI; defaults to the address
                that was previously hard-coded.
        """
        self.lstm_exp = Path(base_dir)
        self.experiment_name = experiment_name
        self.study_name = study_name
        self.folder_name = folder_name
        self.difference_value = difference_value
        self.data_path = self.lstm_exp / study_name / folder_name / f'difference={self.difference_value}'

        self.device = setup_device()
        set_random_seeds()

        # MLflow setup
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)

    @staticmethod
    def _load_records_and_subjects(directory: Path) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Read ``records.csv`` and ``subject_info.csv`` from ``directory``.

        ``subject_id`` is cast to ``str`` in both frames so the ids compare
        equal regardless of how pandas parsed each file.
        """
        records_df = pd.read_csv(directory / 'records.csv')
        subject_info_df = pd.read_csv(directory / 'subject_info.csv')

        records_df['subject_id'] = records_df['subject_id'].astype(str)
        subject_info_df['subject_id'] = subject_info_df['subject_id'].astype(str)

        return records_df, subject_info_df

    @staticmethod
    def _split_tail(values, tail_size: int):
        """Split ``values`` into ``(head, tail)`` with the last ``tail_size`` items in ``tail``.

        An explicit cut index is used instead of ``values[-tail_size:]`` so
        that ``tail_size == 0`` yields an empty tail and a full head; the
        negative-index form would return the two parts swapped. A
        ``tail_size`` larger than ``len(values)`` puts everything in ``tail``.

        Raises:
            ValueError: If ``tail_size`` is negative.
        """
        if tail_size < 0:
            raise ValueError(f"tail_size must be non-negative, got {tail_size}")
        cut = max(len(values) - tail_size, 0)
        return values[:cut], values[cut:]

    def _make_loader(self, dataset, shuffle: bool) -> DataLoader:
        """Wrap ``dataset`` in a DataLoader using the class-wide batch settings."""
        return DataLoader(dataset, batch_size=self.BATCH_SIZE, shuffle=shuffle, num_workers=self.NUM_WORKERS)

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the time series and subject information data.

        Returns:
            ``(records_df, subject_info_df)`` read from ``self.data_path``,
            both with ``subject_id`` as strings.
        """
        return self._load_records_and_subjects(self.data_path)

    def load_test_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the test data.

        Returns:
            ``(records_df, subject_info_df)`` read from the ``test_subjects``
            sub-folder of ``self.data_path``, both with ``subject_id`` as
            strings.
        """
        return self._load_records_and_subjects(self.data_path / 'test_subjects')

    def create_sliding_windows(self, df: pd.DataFrame, window_size: int = 5) -> Dict:
        """Create sliding windows for each subject.

        Args:
            df: Records with at least ``subject_id`` and ``y`` columns.
            window_size: Window length passed to
                ``create_subject_sliding_windows`` (stride is fixed at 1 and
                ``y`` is both the only feature and the target).

        Returns:
            Dict mapping each subject id to ``{"X": windows, "y": targets}``.
        """
        sliding_windows_dict = {}
        for subject_id in df['subject_id'].unique():
            subject_df = df[df['subject_id'] == subject_id]
            sliding_windows, targets = create_subject_sliding_windows(
                subject_df,
                window_size=window_size,
                stride=1,
                feature_cols=['y'],
                target_col='y',
            )
            sliding_windows_dict[subject_id] = {"X": sliding_windows, "y": targets}
        return sliding_windows_dict

    def prepare_datasets(self, sliding_windows_dict: Dict, test_size: int = 100, val_size: int = 100) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """Prepare train, validation, and test datasets.

        For every subject the last ``test_size`` windows become that subject's
        test set. The remaining windows are handed to
        ``time_series_train_val_splits`` (5 splits), and only the last fold of
        each subject is used for the combined train and validation sets.

        Args:
            sliding_windows_dict: Output of ``create_sliding_windows``.
            test_size: Number of trailing windows held out per subject.
            val_size: Validation size forwarded to
                ``time_series_train_val_splits``.

        Returns:
            ``(train_loader, val_loader, test_loader)``; only the training
            loader shuffles.

        Raises:
            ValueError: If ``test_size`` is negative.
        """
        # Hold out the tail of every subject's windows as test data.
        subject_test_data = {}
        subject_dev_data = {}
        for subject_id, data in sliding_windows_dict.items():
            dev_X, test_X = self._split_tail(data["X"], test_size)
            dev_y, test_y = self._split_tail(data["y"], test_size)
            subject_test_data[subject_id] = {"X": test_X, "y": test_y}
            subject_dev_data[subject_id] = {"X": dev_X, "y": dev_y}

        # Create datasets
        subject_test_dataset = {
            subject_id: TimeSeriesWindowDataset(subject_id, data["X"], data["y"])
            for subject_id, data in subject_test_data.items()
        }

        subject_dev_dataset = time_series_train_val_splits(
            data=subject_dev_data,
            n_splits=5,
            val_size=val_size,
        )

        # Keep only the last fold of each subject and pool them across subjects.
        last_folds = [folds[-1] for folds in subject_dev_dataset.values()]
        train_dataset = ConcatDataset([fold['train'] for fold in last_folds])
        val_dataset = ConcatDataset([fold['val'] for fold in last_folds])
        test_dataset = ConcatDataset(list(subject_test_dataset.values()))

        # Create data loaders
        train_loader = self._make_loader(train_dataset, shuffle=True)
        val_loader = self._make_loader(val_dataset, shuffle=False)
        test_loader = self._make_loader(test_dataset, shuffle=False)

        return train_loader, val_loader, test_loader

    def prepare_test_datasets(self, sliding_windows_dict: Dict) -> DataLoader:
        """Prepare test datasets.

        Every window of every subject in ``sliding_windows_dict`` is used; the
        per-subject datasets are concatenated in dict order and served
        without shuffling.
        """
        subject_datasets = [
            TimeSeriesWindowDataset(subject_id, data["X"], data["y"])
            for subject_id, data in sliding_windows_dict.items()
        ]
        return self._make_loader(ConcatDataset(subject_datasets), shuffle=False)

    def train_model(self, train_loader: DataLoader, val_loader: DataLoader, test_loader: DataLoader) -> Tuple["LSTMRegressor", List[Dict]]:
        """Train the LSTM model and evaluate it.

        Training runs inside an MLflow run named
        ``<folder_name>_difference_<difference_value>`` for at most 50 epochs,
        with early stopping on ``val_loss`` (patience 3). Afterwards the
        checkpoint stored for that run is loaded back and scored on
        ``test_loader``; the resulting ``test_loss`` is logged to MLflow.

        Returns:
            ``(best_model, test_results)`` where ``test_results`` is the list
            returned by ``Trainer.test`` (its first entry holds ``test_loss``).
        """
        # The checkpoint loaded below is presumably the one autologging saves.
        mlflow.pytorch.autolog(log_every_n_epoch=1, log_every_n_step=10)

        model = LSTMRegressor(input_size=1, hidden_size=64, num_layers=2, lr=1e-3)
        early_stop_cb = EarlyStopping(
            monitor="val_loss",
            min_delta=0.00,
            patience=3,
            mode="min",
            verbose=False,
        )

        torch.set_float32_matmul_precision('high')

        # Follow the device chosen by setup_device(): requesting "gpu" on a
        # CPU-only host would make the Trainer fail before training starts.
        accelerator = "gpu" if self.device.type == "cuda" else "cpu"

        with mlflow.start_run(run_name=f"{self.folder_name}_difference_{self.difference_value}"):
            trainer = L.Trainer(
                max_epochs=50,
                accelerator=accelerator,
                devices=1,
                deterministic=True,
                check_val_every_n_epoch=1,
                logger=False,
                enable_checkpointing=False,
                enable_progress_bar=False,
                enable_model_summary=False,
                callbacks=[early_stop_cb],
            )

            trainer.fit(model, train_loader, val_loader)
            best_model = mlflow.pytorch.load_checkpoint(LSTMRegressor, run_id=mlflow.active_run().info.run_id)

            test_results = trainer.test(best_model, test_loader, verbose=False)
            mlflow.log_metrics({"test_loss": test_results[0]["test_loss"]})

        return best_model, test_results

    def evaluate_model(self, model: "LSTMRegressor", test_loader: DataLoader) -> Dict:
        """Evaluate the model on test data and prepare visualization data.

        ``test_loader.dataset`` must be a ``ConcatDataset`` whose members
        expose ``subject_id``; each member is iterated separately so the
        predictions stay grouped by subject.

        Returns:
            Dict mapping subject id to ``{'pred': [...], 'target': [...]}``,
            with one entry per window in the original order.
        """
        subject_datasets = test_loader.dataset.datasets
        test_eval_data = {dataset.subject_id: {'pred': [], 'target': []} for dataset in subject_datasets}

        with torch.no_grad():
            model = model.to(self.device)
            model.eval()
            for dataset in subject_datasets:
                collected = test_eval_data[dataset.subject_id]
                subject_loader = DataLoader(dataset, batch_size=self.BATCH_SIZE, shuffle=False)
                for X, y in subject_loader:
                    pred = model(X.to(self.device)).cpu().numpy()
                    collected['pred'].extend(pred)
                    # Targets are only collected, so they never need to visit the device.
                    collected['target'].extend(y.cpu().numpy())

        return test_eval_data
In [3]:
# Table of experimental conditions for study2 (one row per condition/difference pair).
conditions_path = lstm_exp.joinpath('study2', 'conditions.csv')
conditions_df = pd.read_csv(conditions_path)
conditions_df
Out[3]:
Condition Difference A (train) B (train) C (train) D (train) E (test) F (test) G (test) H (test)
0 vary_A 0.5 [1.0, 1.5, 2.0] 1 0 0 [0.5, 2.5] 1 0 0
1 vary_A 1.0 [2.0, 3.0, 4.0] 1 0 0 [1.0, 5.0] 1 0 0
2 vary_B 0.5 1 [1.0, 1.5, 2.0] 0 0 1 [0.5, 2.5] 0 0
3 vary_B 1.0 1 [1.5, 2.5, 3.5] 0 0 1 [0.5, 4.5] 0 0
4 vary_C 0.5 1 1 [-0.5, 0.0, 0.5] 0 1 1 [-1.0, 1.0] 0
5 vary_C 1.0 1 1 [-1.0, 0.0, 1.0] 0 1 1 [-2.0, 2.0] 0
6 vary_C 2.0 1 1 [-2.0, 0.0, 2.0] 0 1 1 [-4.0, 4.0] 0
7 vary_D 0.5 1 1 0 [-0.5, 0.0, 0.5] 1 1 0 [-1.0, 1.0]
8 vary_D 1.0 1 1 0 [-1.0, 0.0, 1.0] 1 1 0 [-2.0, 2.0]
9 vary_D 2.0 1 1 0 [-2.0, 0.0, 2.0] 1 1 0 [-4.0, 4.0]

Differences

In [4]:
# Train and evaluate one model per difference level of the selected condition.
study_name = "study2"
folder_name = "vary_A"
sub_condition_df = conditions_df[conditions_df['Condition'] == folder_name]

# condition -> (parameter letter shown in the banner, train column, test column)
condition_columns = {
    "vary_A": ("A", 'A (train)', 'E (test)'),
    "vary_B": ("B", 'B (train)', 'F (test)'),
    "vary_C": ("C", 'C (train)', 'G (test)'),
    "vary_D": ("D", 'D (train)', 'H (test)'),
}
heavy_rule = "=" * 80
light_rule = "-" * 80

for index, row in sub_condition_df.iterrows():
    difference_value = row['Difference']
    condition = row['Condition']

    print(heavy_rule)
    print(f"\033[1m\033[94m>>> CONDITION: {condition}, DIFFERENCE VALUE: {difference_value} <<<\033[0m")
    print(light_rule)

    # Report which parameter values the model is trained and tested on;
    # conditions missing from the table print nothing here.
    if condition in condition_columns:
        varied_param, train_col, test_col = condition_columns[condition]
        trained_on = row[train_col]
        test_on = row[test_col]
        print(f"\033[1m\033[92mTRAINING SET:\033[0m {varied_param} values in {trained_on}")
        print(f"\033[1m\033[93mTEST SET:\033[0m {varied_param} values in {test_on}")
    print(light_rule)

    # Set up the pipeline for this difference level.
    analysis = TimeSeriesAnalysis(
        experiment_name="test3",
        study_name=study_name,
        folder_name=folder_name,
        difference_value=difference_value,
    )

    # Read the training subjects and the held-out test subjects.
    df, subject_info_df = analysis.load_data()
    test_df, test_subject_info_df = analysis.load_test_data()

    # Plot every subject (train and test) together for a visual check.
    all_df = pd.concat([df, test_df], ignore_index=True)
    all_subject_info_df = pd.concat([subject_info_df, test_subject_info_df], ignore_index=True)
    plot_time_series_by_subject(all_df, all_subject_info_df).show()

    # Window both sets; the model is tested on the separate test subjects,
    # so the hold-out loader returned by prepare_datasets is not used.
    sliding_windows_dict = analysis.create_sliding_windows(df)
    sliding_windows_dict_test = analysis.create_sliding_windows(test_df)
    train_loader, val_loader, _not_used_test_loader = analysis.prepare_datasets(sliding_windows_dict)
    test_loader = analysis.prepare_test_datasets(sliding_windows_dict_test)

    # Fit the model and report the loss on the test subjects.
    print("\033[1m\033[95mTraining model...\033[0m")
    best_model, test_results = analysis.train_model(train_loader, val_loader, test_loader)
    print(f"\033[1m\033[96mTEST RESULTS:\033[0m loss = {test_results[0]['test_loss']:.5f}")

    # Collect per-subject predictions and plot them against the targets.
    print("\033[1m\033[95mEvaluating model on test data...\033[0m")
    test_eval_data = analysis.evaluate_model(best_model, test_loader)
    plot_all_subjects_combined(test_eval_data).show()
    print(heavy_rule)
    print("="*80)
================================================================================
>>> CONDITION: vary_A, DIFFERENCE VALUE: 0.5 <<<
--------------------------------------------------------------------------------
TRAINING SET: A values in [1.0, 1.5, 2.0]
TEST SET: A values in [0.5, 2.5]
--------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_A_difference_0.5 at: http://localhost:8093/#/experiments/91/runs/68bf871adb064c0c8c97eee52ec5124e
🧪 View experiment at: http://localhost:8093/#/experiments/91
TEST RESULTS: loss = 0.00077
Evaluating model on test data...
================================================================================
================================================================================
>>> CONDITION: vary_A, DIFFERENCE VALUE: 1.0 <<<
--------------------------------------------------------------------------------
TRAINING SET: A values in [2.0, 3.0, 4.0]
TEST SET: A values in [1.0, 5.0]
--------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_A_difference_1.0 at: http://localhost:8093/#/experiments/91/runs/ed5be6f603484cfabf224bdb31eeb152
🧪 View experiment at: http://localhost:8093/#/experiments/91
TEST RESULTS: loss = 0.00326
Evaluating model on test data...
================================================================================