# Vary D
This notebook trains and evaluates LSTM regressors on the `study2` time series for the `vary_D` condition, running once for each difference value listed in `conditions.csv`. It includes functions for:
- Data loading and preprocessing
- Model configuration and training
- Evaluation and visualization
In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mlflow
import random
import os, sys
from pathlib import Path
from typing import Dict, Tuple, List
# Setup paths
lstm_exp = Path("/home/ytli/research/lstm")
sys.path.append(str(lstm_exp))
# Import custom modules
from modules.dataset import create_subject_sliding_windows, TimeSeriesWindowDataset, time_series_train_val_splits
from modules.model import LSTMRegressor
from modules.plot import plot_all_subjects_combined, plot_time_series_by_subject
# PyTorch imports
import torch
from torch import nn
from torch.utils.data import DataLoader, ConcatDataset
from lightning.pytorch.callbacks import EarlyStopping
import lightning as L
# Set random seeds
def set_random_seeds(seed: int = 42):
    """Seed every RNG the notebook relies on: Python ``random``, PyTorch and NumPy's global state.

    Args:
        seed: value passed unchanged to each library's seeding function.
    """
    seeders = (random.seed, torch.manual_seed, np.random.seed)
    for seed_fn in seeders:
        seed_fn(seed)
# Configure GPU if available
def setup_device(visible_devices: str = "0") -> torch.device:
    """Choose the torch device for this process.

    Restricts CUDA to ``visible_devices`` (default: GPU ``"0"``) unless
    ``CUDA_VISIBLE_DEVICES`` is already set. Returns ``cuda`` when CUDA is available,
    otherwise ``cpu``.

    Note: ``CUDA_VISIBLE_DEVICES`` only takes effect if it is set before CUDA is
    initialised in this process; calling this later cannot change which GPUs torch sees.

    Args:
        visible_devices: value for ``CUDA_VISIBLE_DEVICES`` when the variable is unset.

    Returns:
        ``torch.device("cuda")`` or ``torch.device("cpu")``.
    """
    # setdefault: don't clobber a GPU selection the user made when launching the kernel.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", visible_devices)
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")
import logging
# Quiet Lightning's per-run banners (e.g. "LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]") and MLflow chatter.
# This notebook imports the unified `lightning` package, whose loggers live under
# `lightning.pytorch.*`; silencing only `pytorch_lightning.*` leaves those messages on.
# The legacy names are kept in case the standalone `pytorch_lightning` package is also used.
for _logger_name in (
    "lightning.pytorch.utilities.rank_zero",
    "lightning.pytorch.accelerators.cuda",
    "pytorch_lightning.utilities.rank_zero",
    "pytorch_lightning.accelerators.cuda",
):
    logging.getLogger(_logger_name).setLevel(logging.WARNING)
del _logger_name
logging.getLogger("mlflow").setLevel(logging.ERROR)
In [2]:
class TimeSeriesAnalysis:
    """End-to-end LSTM experiment for one ``(study, condition, difference)`` data folder.

    Reads CSVs from ``<lstm_exp>/<study_name>/<folder_name>/difference=<difference_value>``
    (training subjects) and its ``test_subjects`` sub-folder (held-out subjects). It builds
    per-subject sliding windows over column ``y`` and trains an ``LSTMRegressor`` with
    Lightning. Each run is logged to the MLflow server at ``http://localhost:8093``.
    """

    def __init__(self, experiment_name: str, study_name: str, folder_name: str, difference_value: float):
        self.lstm_exp = Path("/home/ytli/research/lstm")
        self.experiment_name = experiment_name
        self.study_name = study_name
        self.folder_name = folder_name
        self.difference_value = difference_value
        # e.g. <lstm_exp>/study2/vary_D/difference=0.5
        self.data_path = self.lstm_exp / study_name / folder_name / f'difference={self.difference_value}'
        self.device = setup_device()
        # Re-seed on every construction so each condition in a sweep starts from the same RNG state.
        set_random_seeds()

        # MLflow setup (process-global: affects every later mlflow call in this kernel).
        mlflow.set_tracking_uri("http://localhost:8093")
        mlflow.set_experiment(experiment_name)

    @staticmethod
    def _read_records(folder: Path) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Read ``records.csv`` and ``subject_info.csv`` from ``folder``, with ``subject_id`` cast to str."""
        df = pd.read_csv(folder / 'records.csv')
        subject_info_df = pd.read_csv(folder / 'subject_info.csv')

        # Cast both frames to str so subject ids match regardless of how pandas parsed them.
        df['subject_id'] = df['subject_id'].astype(str)
        subject_info_df['subject_id'] = subject_info_df['subject_id'].astype(str)

        return df, subject_info_df

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the training-subject time series and subject information.

        Returns:
            ``(records_df, subject_info_df)`` read from ``self.data_path``.
        """
        return self._read_records(self.data_path)

    def load_test_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the held-out test-subject time series and subject information.

        Returns:
            ``(records_df, subject_info_df)`` read from ``self.data_path / 'test_subjects'``.
        """
        return self._read_records(self.data_path / 'test_subjects')

    def create_sliding_windows(self, df: pd.DataFrame, window_size: int = 5) -> Dict:
        """Create stride-1 sliding windows over column ``y`` for each subject.

        Args:
            df: records with at least ``subject_id`` and ``y`` columns.
            window_size: number of time steps per input window.

        Returns:
            ``{subject_id: {"X": windows, "y": targets}}`` in order of first appearance of each subject.
        """
        sliding_windows_dict = {}
        # groupby(sort=False) yields subjects in order of first appearance (same as .unique())
        # and avoids re-scanning the whole frame once per subject.
        for subject_id, subject_df in df.groupby('subject_id', sort=False):
            sliding_windows, targets = create_subject_sliding_windows(
                subject_df,
                window_size=window_size,
                stride=1,
                feature_cols=['y'],
                target_col='y',
            )
            sliding_windows_dict[subject_id] = {"X": sliding_windows, "y": targets}
        return sliding_windows_dict

    @staticmethod
    def _subject_concat_dataset(subject_data: Dict) -> ConcatDataset:
        """Wrap each subject's windows in a ``TimeSeriesWindowDataset`` and concatenate them in dict order."""
        return ConcatDataset([
            TimeSeriesWindowDataset(subject_id, data["X"], data["y"])
            for subject_id, data in subject_data.items()
        ])

    def prepare_datasets(
        self,
        sliding_windows_dict: Dict,
        test_size: int = 100,
        val_size: int = 100,
        batch_size: int = 32,
        num_workers: int = 4,
    ) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """Split each subject's windows chronologically into train / validation / test loaders.

        The last ``test_size`` windows of every subject form the test set. The remainder
        ("dev") is split by ``time_series_train_val_splits`` and only its last fold is used.

        Args:
            sliding_windows_dict: output of :meth:`create_sliding_windows`.
            test_size: windows per subject held out for testing (``0`` means no test windows).
            val_size: validation windows per subject, forwarded to ``time_series_train_val_splits``.
            batch_size: batch size for all three loaders.
            num_workers: DataLoader worker processes for all three loaders.

        Returns:
            ``(train_loader, val_loader, test_loader)``; only the train loader is shuffled.

        Raises:
            ValueError: if ``test_size`` is negative or leaves a subject with no dev windows.
        """
        if test_size < 0:
            raise ValueError(f"test_size must be >= 0, got {test_size}")

        subject_test_data = {}
        subject_dev_data = {}
        for subject_id, data in sliding_windows_dict.items():
            X, y = data["X"], data["y"]
            # Use an explicit split index: with test_size == 0, `X[-0:]` returns everything
            # and `X[:-0]` returns nothing, which would invert the split.
            split = len(X) - test_size
            if split <= 0:
                raise ValueError(
                    f"subject {subject_id!r} has {len(X)} windows; test_size={test_size} leaves none for training"
                )
            subject_test_data[subject_id] = {"X": X[split:], "y": y[split:]}
            subject_dev_data[subject_id] = {"X": X[:split], "y": y[:split]}

        test_dataset = self._subject_concat_dataset(subject_test_data)

        subject_dev_dataset = time_series_train_val_splits(
            data=subject_dev_data,
            n_splits=5,
            val_size=val_size,
        )

        # Only the last fold of each subject is used; presumably it is the latest in time — verify in modules.dataset.
        train_dataset = ConcatDataset([folds[-1]['train'] for folds in subject_dev_dataset.values()])
        val_dataset = ConcatDataset([folds[-1]['val'] for folds in subject_dev_dataset.values()])

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

        return train_loader, val_loader, test_loader

    def prepare_test_datasets(self, sliding_windows_dict: Dict, batch_size: int = 32, num_workers: int = 4) -> DataLoader:
        """Build an unshuffled loader over *all* windows of every subject in ``sliding_windows_dict``.

        Args:
            sliding_windows_dict: output of :meth:`create_sliding_windows` (typically for the test subjects).
            batch_size: loader batch size.
            num_workers: DataLoader worker processes.
        """
        test_dataset = self._subject_concat_dataset(sliding_windows_dict)
        return DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    def train_model(
        self, train_loader: DataLoader, val_loader: DataLoader, test_loader: DataLoader
    ) -> Tuple[LSTMRegressor, List[Dict[str, float]]]:
        """Train an LSTM with early stopping on ``val_loss``, reload the logged checkpoint and test it.

        Returns:
            ``(best_model, test_results)``, where ``test_results`` is the list returned by
            ``trainer.test``. ``test_results[0]["test_loss"]`` is also logged to MLflow.
        """
        mlflow.pytorch.autolog(log_every_n_epoch=1, log_every_n_step=10)

        model = LSTMRegressor(input_size=1, hidden_size=64, num_layers=2, lr=1e-3)
        early_stop_cb = EarlyStopping(
            monitor="val_loss",
            min_delta=0.00,
            patience=3,
            mode="min",
            verbose=False,
        )

        torch.set_float32_matmul_precision('high')

        # setup_device() falls back to CPU when CUDA is unavailable; a hard-coded "gpu"
        # accelerator would make Lightning fail on such machines.
        accelerator = "gpu" if self.device.type == "cuda" else "cpu"

        with mlflow.start_run(run_name=f"{self.folder_name}_difference_{self.difference_value}") as run:
            trainer = L.Trainer(
                max_epochs=50,
                accelerator=accelerator,
                devices=1,
                deterministic=True,
                check_val_every_n_epoch=1,
                logger=False,
                enable_checkpointing=False,
                enable_progress_bar=False,
                enable_model_summary=False,
                callbacks=[early_stop_cb],
            )

            trainer.fit(model, train_loader, val_loader)
            # Reload the checkpoint MLflow autologging saved for this run. Presumably this is the
            # best-val_loss checkpoint under autolog's default checkpoint settings — confirm for your MLflow version.
            best_model = mlflow.pytorch.load_checkpoint(LSTMRegressor, run_id=run.info.run_id)

            test_results = trainer.test(best_model, test_loader, verbose=False)
            mlflow.log_metrics({"test_loss": test_results[0]["test_loss"]})

        return best_model, test_results

    def evaluate_model(self, model: LSTMRegressor, test_loader: DataLoader) -> Dict:
        """Run ``model`` over each subject's test windows and collect predictions and targets.

        Assumes ``test_loader.dataset`` is a ``ConcatDataset`` of per-subject datasets exposing
        ``subject_id``, as built by :meth:`prepare_datasets` / :meth:`prepare_test_datasets`.

        Returns:
            ``{subject_id: {'pred': [...], 'target': [...]}}`` with one numpy entry per window.
        """
        subject_datasets = test_loader.dataset.datasets
        test_eval_data = {dataset.subject_id: {'pred': [], 'target': []} for dataset in subject_datasets}

        model = model.to(self.device)
        model.eval()
        with torch.no_grad():
            for dataset in subject_datasets:
                subject_id = dataset.subject_id
                subject_loader = DataLoader(dataset, batch_size=32, shuffle=False)
                for X, y in subject_loader:
                    # Only the inputs need to be on the model's device; targets stay on CPU.
                    pred = model(X.to(self.device)).cpu().numpy()
                    test_eval_data[subject_id]['pred'].extend(pred)
                    test_eval_data[subject_id]['target'].extend(y.cpu().numpy())

        return test_eval_data
In [3]:
# Experimental design table: for each condition, which parameter (A–D) is varied, the
# difference value, and the parameter values used for training vs. test subjects.
conditions_path = lstm_exp.joinpath('study2', 'conditions.csv')
conditions_df = pd.read_csv(conditions_path)
conditions_df
Out[3]:
| | Condition | Difference | A (train) | B (train) | C (train) | D (train) | E (test) | F (test) | G (test) | H (test) |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | vary_A | 0.5 | [1.0, 1.5, 2.0] | 1 | 0 | 0 | [0.5, 2.5] | 1 | 0 | 0 | 
| 1 | vary_A | 1.0 | [2.0, 3.0, 4.0] | 1 | 0 | 0 | [1.0, 5.0] | 1 | 0 | 0 | 
| 2 | vary_B | 0.5 | 1 | [1.0, 1.5, 2.0] | 0 | 0 | 1 | [0.5, 2.5] | 0 | 0 | 
| 3 | vary_B | 1.0 | 1 | [1.5, 2.5, 3.5] | 0 | 0 | 1 | [0.5, 4.5] | 0 | 0 | 
| 4 | vary_C | 0.5 | 1 | 1 | [-0.5, 0.0, 0.5] | 0 | 1 | 1 | [-1.0, 1.0] | 0 | 
| 5 | vary_C | 1.0 | 1 | 1 | [-1.0, 0.0, 1.0] | 0 | 1 | 1 | [-2.0, 2.0] | 0 | 
| 6 | vary_C | 2.0 | 1 | 1 | [-2.0, 0.0, 2.0] | 0 | 1 | 1 | [-4.0, 4.0] | 0 | 
| 7 | vary_D | 0.5 | 1 | 1 | 0 | [-0.5, 0.0, 0.5] | 1 | 1 | 0 | [-1.0, 1.0] | 
| 8 | vary_D | 1.0 | 1 | 1 | 0 | [-1.0, 0.0, 1.0] | 1 | 1 | 0 | [-2.0, 2.0] | 
| 9 | vary_D | 2.0 | 1 | 1 | 0 | [-2.0, 0.0, 2.0] | 1 | 1 | 0 | [-4.0, 4.0] | 
## Differences
In [4]:
study_name = "study2"
folder_name = "vary_D"
experiment_name = "test3"  # MLflow experiment that receives one run per difference value

# For each condition: (varied parameter, its training-set column, its test-set column) in conditions_df.
VARIED_PARAMETER_COLUMNS = {
    "vary_A": ("A", "A (train)", "E (test)"),
    "vary_B": ("B", "B (train)", "F (test)"),
    "vary_C": ("C", "C (train)", "G (test)"),
    "vary_D": ("D", "D (train)", "H (test)"),
}

sub_condition_df = conditions_df[conditions_df['Condition'] == folder_name]
# Without this check, a typo in folder_name would make the loop below run zero times silently.
if sub_condition_df.empty:
    raise ValueError(f"No rows with Condition == {folder_name!r} in conditions_df")

for index, row in sub_condition_df.iterrows():
    difference_value = row['Difference']
    condition = row['Condition']
    print("="*80)
    print(f"\033[1m\033[94m>>> CONDITION: {condition}, DIFFERENCE VALUE: {difference_value} <<<\033[0m")
    print("-"*80)

    # Report which parameter is varied and the values used for training vs. testing.
    if condition not in VARIED_PARAMETER_COLUMNS:
        raise ValueError(f"Unknown condition {condition!r}; expected one of {sorted(VARIED_PARAMETER_COLUMNS)}")
    param, train_col, test_col = VARIED_PARAMETER_COLUMNS[condition]
    trained_on = row[train_col]
    test_on = row[test_col]
    print(f"\033[1m\033[92mTRAINING SET:\033[0m {param} values in {trained_on}")
    print(f"\033[1m\033[93mTEST SET:\033[0m {param} values in {test_on}")
    print("-"*80)

    # Initialize the analysis (re-seeds RNGs and points MLflow at the experiment).
    analysis = TimeSeriesAnalysis(experiment_name=experiment_name, study_name=study_name, folder_name=folder_name, difference_value=difference_value)

    # Load and prepare data
    df, subject_info_df = analysis.load_data()
    test_df, test_subject_info_df = analysis.load_test_data()
    all_df = pd.concat([df, test_df], ignore_index=True)
    all_subject_info_df = pd.concat([subject_info_df, test_subject_info_df], ignore_index=True)
    plot_time_series_by_subject(all_df, all_subject_info_df).show()
    sliding_windows_dict = analysis.create_sliding_windows(df)
    sliding_windows_dict_test = analysis.create_sliding_windows(test_df)
    # The in-subject test split is discarded: evaluation uses the held-out test subjects instead.
    train_loader, val_loader, _not_used_test_loader = analysis.prepare_datasets(sliding_windows_dict)
    test_loader = analysis.prepare_test_datasets(sliding_windows_dict_test)

    # Train model
    print("\033[1m\033[95mTraining model...\033[0m")
    best_model, test_results = analysis.train_model(train_loader, val_loader, test_loader)
    print(f"\033[1m\033[96mTEST RESULTS:\033[0m loss = {test_results[0]['test_loss']:.5f}")

    # Evaluate and visualize
    print("\033[1m\033[95mEvaluating model on test data...\033[0m")
    test_eval_data = analysis.evaluate_model(best_model, test_loader)
    plot_all_subjects_combined(test_eval_data).show()
    print("="*80)
================================================================================ >>> CONDITION: vary_D, DIFFERENCE VALUE: 0.5 <<< -------------------------------------------------------------------------------- TRAINING SET: D values in [-0.5, 0.0, 0.5] TEST SET: D values in [-1.0, 1.0] --------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
Downloading artifacts: 0%| | 0/1 [00:00<?, ?it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_D_difference_0.5 at: http://localhost:8093/#/experiments/91/runs/f5bfaf27752c486199c150f4768aeb99 🧪 View experiment at: http://localhost:8093/#/experiments/91 TEST RESULTS: loss = 0.00035 Evaluating model on test data...
================================================================================ ================================================================================ >>> CONDITION: vary_D, DIFFERENCE VALUE: 1.0 <<< -------------------------------------------------------------------------------- TRAINING SET: D values in [-1.0, 0.0, 1.0] TEST SET: D values in [-2.0, 2.0] --------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
Downloading artifacts: 0%| | 0/1 [00:00<?, ?it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_D_difference_1.0 at: http://localhost:8093/#/experiments/91/runs/9fd817572aa042038cbf228d300f59f2 🧪 View experiment at: http://localhost:8093/#/experiments/91 TEST RESULTS: loss = 0.00149 Evaluating model on test data...
================================================================================ ================================================================================ >>> CONDITION: vary_D, DIFFERENCE VALUE: 2.0 <<< -------------------------------------------------------------------------------- TRAINING SET: D values in [-2.0, 0.0, 2.0] TEST SET: D values in [-4.0, 4.0] --------------------------------------------------------------------------------
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Training model...
Downloading artifacts: 0%| | 0/1 [00:00<?, ?it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run vary_D_difference_2.0 at: http://localhost:8093/#/experiments/91/runs/e663660fa065401b8405fc881b196afd 🧪 View experiment at: http://localhost:8093/#/experiments/91 TEST RESULTS: loss = 0.04083 Evaluating model on test data...
================================================================================