Vary B¶

This notebook trains and evaluates an LSTM regressor on the "vary B" time series experiments, with one run per `difference` setting. It defines a reusable `TimeSeriesAnalysis` class with methods for:

  • Data loading and preprocessing
  • Model configuration and training
  • Evaluation and visualization
In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mlflow
import random
import os, sys
from pathlib import Path
from typing import Dict, Tuple, List

# Setup paths
lstm_exp = Path("/home/ytli/research/lstm")
sys.path.append(str(lstm_exp))

# Import custom modules
from modules.dataset import create_subject_sliding_windows, TimeSeriesWindowDataset, time_series_train_val_splits
from modules.model import LSTMRegressor
from modules.plot import plot_all_subjects_combined, plot_time_series_by_subject

# PyTorch imports
import torch
from torch import nn
from torch.utils.data import DataLoader, ConcatDataset
from lightning.pytorch.callbacks import EarlyStopping
import lightning as L

# Set random seeds
def set_random_seeds(seed: int = 42):
    """Seed Python's ``random``, PyTorch and NumPy with the same value.

    Args:
        seed: Integer passed unchanged to each library's seeding function.
    """
    # Same order as before: stdlib RNG, then torch, then numpy.
    for apply_seed in (random.seed, torch.manual_seed, np.random.seed):
        apply_seed(seed)

# Configure GPU if available
def setup_device():
    """Restrict CUDA visibility to device index 0 and pick the torch device.

    Sets the ``CUDA_VISIBLE_DEVICES`` environment variable to ``"0"``
    (overwriting any existing value) and then returns ``torch.device("cuda")``
    when ``torch.cuda.is_available()`` is true, otherwise
    ``torch.device("cpu")``.
    """
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")
In [2]:
class TimeSeriesAnalysis:
    """End-to-end LSTM workflow for one ``vary_B`` experiment setting.

    An instance is tied to a single ``difference`` value. It reads that
    setting's CSV files, builds per-subject sliding windows, splits them into
    train/validation/test loaders, trains an ``LSTMRegressor`` inside an
    MLflow run, and collects per-subject predictions for plotting.
    """

    def __init__(self, experiment_name: str, difference_value: float):
        """Store the run settings, pick the device, seed the RNGs and configure MLflow.

        Args:
            experiment_name: MLflow experiment under which runs are logged.
            difference_value: Value used to locate the ``difference=<value>``
                data directory and to name the MLflow run.
        """
        self.lstm_exp = Path("/home/ytli/research/lstm")
        self.experiment_name = experiment_name
        self.difference_value = difference_value
        self.device = setup_device()
        set_random_seeds()

        # MLflow setup
        mlflow.set_tracking_uri("http://localhost:8093")
        mlflow.set_experiment(experiment_name)

    @staticmethod
    def _split_tail(seq, tail_size: int):
        """Split ``seq`` into ``(head, tail)``; ``tail`` holds at most the last ``tail_size`` items."""
        # An explicit cut index is used instead of negative slicing because
        # ``seq[-0:]`` is the whole sequence and ``seq[:-0]`` is empty, which
        # would swap head and tail when ``tail_size`` is 0.
        cut = max(len(seq) - tail_size, 0)
        return seq[:cut], seq[cut:]

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the time series and subject information data.

        Reads ``records.csv`` and ``subject_info.csv`` from
        ``<lstm_exp>/experiments/vary_B/difference=<difference_value>/`` and
        casts the ``subject_id`` column of both frames to ``str``.

        Returns:
            ``(df, subject_info_df)``: the records frame and the subject
            information frame.
        """
        experiment_dir = (
            self.lstm_exp / 'experiments' / 'vary_B' / f'difference={self.difference_value}'
        )

        df = pd.read_csv(experiment_dir / 'records.csv')
        subject_info_df = pd.read_csv(experiment_dir / 'subject_info.csv')

        # Convert subject_id to string in both frames so the ids share a dtype
        df['subject_id'] = df['subject_id'].astype(str)
        subject_info_df['subject_id'] = subject_info_df['subject_id'].astype(str)

        return df, subject_info_df

    def create_sliding_windows(self, df: pd.DataFrame, window_size: int = 5) -> Dict:
        """Create sliding windows for each subject.

        Args:
            df: Records frame with a ``subject_id`` column and a ``y`` column
                (``y`` is used both as the only feature and as the target).
            window_size: Window length forwarded to
                ``create_subject_sliding_windows``; stride is fixed at 1.

        Returns:
            Dict mapping each subject id (in order of first appearance) to
            ``{"X": windows, "y": targets}`` as returned by
            ``create_subject_sliding_windows``.
        """
        sliding_windows_dict = {}
        for subject_id in df['subject_id'].unique():
            subject_df = df[df['subject_id'] == subject_id]
            sliding_windows, targets = create_subject_sliding_windows(
                subject_df,
                window_size=window_size,
                stride=1,
                feature_cols=['y'],
                target_col='y',
            )
            sliding_windows_dict[subject_id] = {"X": sliding_windows, "y": targets}
        return sliding_windows_dict

    def prepare_datasets(self, sliding_windows_dict: Dict, test_size: int = 100, val_size: int = 100) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """Prepare train, validation, and test data loaders.

        For every subject the last ``test_size`` windows become the test set
        and the remainder is passed to ``time_series_train_val_splits``
        (5 splits); only the last fold of each subject is used for training
        and validation. Per-subject datasets are concatenated across subjects.

        Args:
            sliding_windows_dict: Output of :meth:`create_sliding_windows`.
            test_size: Number of trailing windows per subject held out for
                testing. ``0`` keeps every window in the development set.
            val_size: Validation size forwarded to
                ``time_series_train_val_splits``.

        Returns:
            ``(train_loader, val_loader, test_loader)``. All use batch size 32
            and 4 workers; only the training loader shuffles.

        Raises:
            ValueError: If ``test_size`` is negative.
        """
        if test_size < 0:
            raise ValueError(f"test_size must be non-negative, got {test_size}")

        # Hold out the tail of every subject's windows for testing
        subject_test_dataset = {}
        subject_dev_data = {}
        for subject_id, data in sliding_windows_dict.items():
            X_dev, X_test = self._split_tail(data["X"], test_size)
            y_dev, y_test = self._split_tail(data["y"], test_size)
            subject_test_dataset[subject_id] = TimeSeriesWindowDataset(subject_id, X_test, y_test)
            subject_dev_data[subject_id] = {"X": X_dev, "y": y_dev}

        subject_dev_dataset = time_series_train_val_splits(
            data=subject_dev_data,
            n_splits=5,
            val_size=val_size,
        )

        # Keep only the last fold of each subject and pool across subjects
        last_train_fold_combined = []
        last_val_fold_combined = []
        for folds in subject_dev_dataset.values():
            last_dev_fold = folds[-1]
            last_train_fold_combined.append(last_dev_fold['train'])
            last_val_fold_combined.append(last_dev_fold['val'])

        train_dataset = ConcatDataset(last_train_fold_combined)
        val_dataset = ConcatDataset(last_val_fold_combined)
        test_dataset = ConcatDataset(list(subject_test_dataset.values()))

        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

        return train_loader, val_loader, test_loader

    def train_model(self, train_loader: DataLoader, val_loader: DataLoader, test_loader: DataLoader) -> Tuple[LSTMRegressor, List[Dict]]:
        """Train the LSTM model and evaluate it.

        Training runs inside an MLflow run named
        ``Vary_B_difference_<difference_value>`` with PyTorch autologging
        enabled, for at most 50 epochs, with early stopping on ``val_loss``
        (patience 3). After fitting, the checkpoint stored for the run is
        reloaded through ``mlflow.pytorch.load_checkpoint``, evaluated on the
        test loader, and its ``test_loss`` is logged to MLflow.

        Args:
            train_loader: Loader used for fitting.
            val_loader: Loader used for validation / early stopping.
            test_loader: Loader used for the final test pass.

        Returns:
            ``(best_model, test_results)`` where ``test_results`` is the list
            of metric dicts returned by ``Trainer.test``.
        """
        mlflow.pytorch.autolog(log_every_n_epoch=1, log_every_n_step=10)

        model = LSTMRegressor(input_size=1, hidden_size=64, num_layers=2, lr=1e-3)
        early_stop_cb = EarlyStopping(
            monitor="val_loss",
            min_delta=0.00,
            patience=3,
            mode="min",
            verbose=False,
        )

        torch.set_float32_matmul_precision('high')

        # Follow the device chosen in __init__ so that a CPU-only machine
        # trains on CPU instead of failing on a hard-coded "gpu" accelerator.
        accelerator = "gpu" if self.device.type == "cuda" else "cpu"

        with mlflow.start_run(run_name=f"Vary_B_difference_{self.difference_value}") as active_run:
            trainer = L.Trainer(
                max_epochs=50,
                accelerator=accelerator,
                devices=1,
                deterministic=True,
                check_val_every_n_epoch=1,
                logger=False,
                enable_checkpointing=False,
                enable_progress_bar=False,
                enable_model_summary=False,
                callbacks=[early_stop_cb],
            )

            trainer.fit(model, train_loader, val_loader)
            best_model = mlflow.pytorch.load_checkpoint(LSTMRegressor, run_id=active_run.info.run_id)

            test_results = trainer.test(best_model, test_loader, verbose=False)
            mlflow.log_metrics({"test_loss": test_results[0]["test_loss"]})

        return best_model, test_results

    def evaluate_model(self, model: LSTMRegressor, test_loader: DataLoader) -> Dict:
        """Evaluate the model on test data and prepare visualization data.

        Iterates over the per-subject datasets inside ``test_loader.dataset``
        (expected to expose ``.datasets``, as a ``ConcatDataset`` does, with a
        ``subject_id`` attribute on each member) and runs the model on each
        subject in batches of 32 without gradient tracking.

        Args:
            model: Trained model; it is moved to ``self.device`` and switched
                to eval mode.
            test_loader: Test loader built by :meth:`prepare_datasets`.

        Returns:
            Dict mapping subject id to ``{'pred': [...], 'target': [...]}``,
            each list holding the per-sample NumPy values in dataset order.
        """
        subject_datasets = test_loader.dataset.datasets
        test_eval_data = {
            dataset.subject_id: {'pred': [], 'target': []} for dataset in subject_datasets
        }

        with torch.no_grad():
            model = model.to(self.device)
            model.eval()
            for dataset in subject_datasets:
                subject_records = test_eval_data[dataset.subject_id]
                tmp_loader = DataLoader(dataset, batch_size=32, shuffle=False)
                for X, y in tmp_loader:
                    pred = model(X.to(self.device)).cpu().numpy()
                    subject_records['pred'].extend(pred)
                    # Targets are only recorded, never fed to the model, so
                    # they do not need a round trip through the device.
                    subject_records['target'].extend(y.cpu().numpy())

        return test_eval_data

Difference = 0.5¶

In [3]:
# Configure the analysis for the difference = 0.5 dataset
analysis = TimeSeriesAnalysis(
    experiment_name="test3",
    difference_value=0.5,
)

# Read the records, show the raw series, then window and split them
df, subject_info_df = analysis.load_data()
raw_series_fig = plot_time_series_by_subject(df, subject_info_df)
raw_series_fig.show()
sliding_windows_dict = analysis.create_sliding_windows(df)
(train_loader, val_loader, test_loader) = analysis.prepare_datasets(sliding_windows_dict)

# Fit the model and report the test metrics
best_model, test_results = analysis.train_model(train_loader, val_loader, test_loader)
print(f"Test Results: {test_results}")

# Collect per-subject predictions and plot them against the targets
test_eval_data = analysis.evaluate_model(best_model, test_loader)
prediction_fig = plot_all_subjects_combined(test_eval_data)
prediction_fig.show()
2025/05/07 14:11:51 WARNING mlflow.utils.autologging_utils: MLflow pytorch autologging is known to be compatible with 1.9.0 <= torch <= 2.6.0, but the installed version is 2.6.0+cu124. If you encounter errors during autologging, try upgrading / downgrading torch to a compatible version, or try upgrading MLflow.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
2025/05/07 14:12:01 WARNING mlflow.models.model: Model logged without a signature and input example. Please set `input_example` parameter when logging the model to auto infer the model signature.
/home/ytli/miniconda3/envs/research/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning:

IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html

Downloading artifacts: 100%|██████████| 1/1 [00:00<00:00, 2032.12it/s] 
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run Vary_B_difference_0.5 at: http://localhost:8093/#/experiments/91/runs/7a009bb46d3d43aa82715ec74c8cfc72
🧪 View experiment at: http://localhost:8093/#/experiments/91
Test Results: [{'test_loss': 0.00011697951413225383}]

Difference = 1¶

In [4]:
# Configure the analysis for the difference = 1 dataset
analysis = TimeSeriesAnalysis(
    experiment_name="test3",
    difference_value=1,
)

# Read the records, show the raw series, then window and split them
df, subject_info_df = analysis.load_data()
raw_series_fig = plot_time_series_by_subject(df, subject_info_df)
raw_series_fig.show()
sliding_windows_dict = analysis.create_sliding_windows(df)
(train_loader, val_loader, test_loader) = analysis.prepare_datasets(sliding_windows_dict)

# Fit the model and report the test metrics
best_model, test_results = analysis.train_model(train_loader, val_loader, test_loader)
print(f"Test Results: {test_results}")

# Collect per-subject predictions and plot them against the targets
test_eval_data = analysis.evaluate_model(best_model, test_loader)
prediction_fig = plot_all_subjects_combined(test_eval_data)
prediction_fig.show()
2025/05/07 14:12:01 WARNING mlflow.utils.autologging_utils: MLflow pytorch autologging is known to be compatible with 1.9.0 <= torch <= 2.6.0, but the installed version is 2.6.0+cu124. If you encounter errors during autologging, try upgrading / downgrading torch to a compatible version, or try upgrading MLflow.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
2025/05/07 14:12:09 WARNING mlflow.models.model: Model logged without a signature and input example. Please set `input_example` parameter when logging the model to auto infer the model signature.
Downloading artifacts: 100%|██████████| 1/1 [00:00<00:00, 1418.91it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run Vary_B_difference_1 at: http://localhost:8093/#/experiments/91/runs/a94f7c3690f143d0970d86647e8e1c95
🧪 View experiment at: http://localhost:8093/#/experiments/91
Test Results: [{'test_loss': 0.0006105868378654122}]