Vary A¶
This notebook provides a structured framework for time series analysis using LSTM models. It includes functions for:
- Data loading and preprocessing
- Model configuration and training
- Evaluation and visualization
In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mlflow
import random
import os, sys
from pathlib import Path
from typing import Dict, Tuple, List
# Setup paths
lstm_exp = Path("/home/ytli/research/lstm")
sys.path.append(str(lstm_exp))
# Import custom modules
from modules.dataset import create_subject_sliding_windows, TimeSeriesWindowDataset, time_series_train_val_splits
from modules.model import LSTMRegressor
from modules.plot import plot_all_subjects_combined, plot_time_series_by_subject
# PyTorch imports
import torch
from torch import nn
from torch.utils.data import DataLoader, ConcatDataset
from lightning.pytorch.callbacks import EarlyStopping
import lightning as L
# Set random seeds
def set_random_seeds(seed: int = 42):
    """Seed the Python, PyTorch and NumPy global random number generators.

    Args:
        seed: Value handed to ``random.seed``, ``torch.manual_seed`` and
            ``np.random.seed`` (in that order). Defaults to 42.
    """
    # Apply the same seed to every library-level RNG the notebook relies on.
    for seed_fn in (random.seed, torch.manual_seed, np.random.seed):
        seed_fn(seed)
# Configure GPU if available
def setup_device() -> torch.device:
    """Pick the torch device used for inference in this notebook.

    If ``CUDA_VISIBLE_DEVICES`` is not already set, it is set to ``"0"`` so
    that only the first GPU is exposed. A value chosen by the caller (shell,
    job scheduler, earlier cell) is left untouched instead of being
    overwritten.

    Returns:
        ``torch.device("cuda")`` when ``torch.cuda.is_available()`` is true,
        otherwise ``torch.device("cpu")``.
    """
    # setdefault keeps an explicit GPU selection made outside the notebook.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")
In [2]:
class TimeSeriesAnalysis:
    """LSTM workflow for one ``vary_A`` experiment setting.

    The instance bundles the steps used by the notebook cells: loading the
    CSV files for a given ``difference`` value, building per-subject sliding
    windows, creating the train/validation/test loaders, training an
    ``LSTMRegressor`` under MLflow tracking, and collecting per-subject
    predictions.

    Attributes:
        lstm_exp: Root directory of the LSTM research project.
        experiment_name: MLflow experiment the runs are logged to.
        difference_value: Value used to select the ``difference=<value>``
            data directory and to name the MLflow run.
        device: Device returned by ``setup_device()``.
    """

    def __init__(self, experiment_name: str, difference_value: float):
        """Store the settings, seed the RNGs and point MLflow at the server.

        Args:
            experiment_name: Name passed to ``mlflow.set_experiment``.
            difference_value: Selects the ``difference=<value>`` directory.
        """
        self.lstm_exp = Path("/home/ytli/research/lstm")
        self.experiment_name = experiment_name
        self.difference_value = difference_value
        self.device = setup_device()
        set_random_seeds()

        # MLflow setup
        mlflow.set_tracking_uri("http://localhost:8093")
        mlflow.set_experiment(experiment_name)

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load the time series and subject information data.

        Returns:
            ``(df, subject_info_df)`` read from ``records.csv`` and
            ``subject_info.csv``; ``subject_id`` is cast to ``str`` in both.
        """
        # Both files live in the same per-setting directory; build it once.
        experiment_dir = (
            self.lstm_exp / 'experiments' / 'vary_A' / f'difference={self.difference_value}'
        )
        df = pd.read_csv(experiment_dir / 'records.csv')
        subject_info_df = pd.read_csv(experiment_dir / 'subject_info.csv')

        # Convert subject_id to string
        df['subject_id'] = df['subject_id'].astype(str)
        subject_info_df['subject_id'] = subject_info_df['subject_id'].astype(str)
        return df, subject_info_df

    def create_sliding_windows(self, df: pd.DataFrame, window_size: int = 5) -> Dict:
        """Create sliding windows for each subject.

        Args:
            df: Frame with at least ``subject_id`` and ``y`` columns.
            window_size: Window length forwarded to
                ``create_subject_sliding_windows``.

        Returns:
            Mapping ``subject_id -> {"X": windows, "y": targets}``, in order
            of first appearance of each subject in ``df``.
        """
        sliding_windows_dict = {}
        for subject_id in df['subject_id'].unique():
            subject_df = df[df['subject_id'] == subject_id]
            sliding_windows, targets = create_subject_sliding_windows(
                subject_df,
                window_size=window_size,
                stride=1,
                feature_cols=['y'],
                target_col='y',
            )
            sliding_windows_dict[subject_id] = {"X": sliding_windows, "y": targets}
        return sliding_windows_dict

    @staticmethod
    def _split_tail(values, tail_size: int):
        """Split ``values`` into ``(head, tail)`` where ``tail`` holds the last ``tail_size`` items."""
        # An explicit cut index avoids the ``values[-0:]`` pitfall, which
        # would return everything as the tail when ``tail_size`` is 0.
        cut = max(len(values) - tail_size, 0)
        return values[:cut], values[cut:]

    def prepare_datasets(
        self,
        sliding_windows_dict: Dict,
        test_size: int = 100,
        val_size: int = 100,
        batch_size: int = 32,
        num_workers: int = 4,
    ) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """Prepare train, validation, and test data loaders.

        For every subject the last ``test_size`` windows are held out for
        testing. The remaining windows are passed to
        ``time_series_train_val_splits`` and the last fold of each subject is
        used for training and validation.

        Args:
            sliding_windows_dict: Output of ``create_sliding_windows``.
            test_size: Number of trailing windows per subject kept for test.
            val_size: Validation size forwarded to
                ``time_series_train_val_splits``.
            batch_size: Batch size for all three loaders.
            num_workers: Worker processes for all three loaders.

        Returns:
            ``(train_loader, val_loader, test_loader)``; only the training
            loader shuffles.

        Raises:
            ValueError: If ``test_size`` is negative.
        """
        if test_size < 0:
            raise ValueError(f"test_size must be non-negative, got {test_size}")

        # Split test data
        subject_test_dataset = {}
        subject_dev_data = {}
        for subject_id, data in sliding_windows_dict.items():
            X_dev, X_test = self._split_tail(data["X"], test_size)
            y_dev, y_test = self._split_tail(data["y"], test_size)
            subject_test_dataset[subject_id] = TimeSeriesWindowDataset(subject_id, X_test, y_test)
            subject_dev_data[subject_id] = {"X": X_dev, "y": y_dev}

        subject_dev_dataset = time_series_train_val_splits(
            data=subject_dev_data,
            n_splits=5,
            val_size=val_size,
        )

        # Combine datasets: only the last fold of every subject is used.
        last_folds = [folds[-1] for folds in subject_dev_dataset.values()]
        train_dataset = ConcatDataset([fold['train'] for fold in last_folds])
        val_dataset = ConcatDataset([fold['val'] for fold in last_folds])
        test_dataset = ConcatDataset(list(subject_test_dataset.values()))

        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
        return train_loader, val_loader, test_loader

    def train_model(self, train_loader: DataLoader, val_loader: DataLoader, test_loader: DataLoader) -> Tuple[LSTMRegressor, List[Dict]]:
        """Train the LSTM model and evaluate it.

        Training runs inside an MLflow run named
        ``Vary_A_difference_<difference_value>`` with PyTorch autologging and
        early stopping on ``val_loss``. The model returned is the one loaded
        back through ``mlflow.pytorch.load_checkpoint`` for the active run.

        Args:
            train_loader: Loader used for fitting.
            val_loader: Loader used for validation / early stopping.
            test_loader: Loader passed to ``trainer.test``.

        Returns:
            ``(best_model, test_results)`` where ``test_results`` is the list
            returned by ``trainer.test``; its first entry's ``test_loss`` is
            also logged to MLflow.
        """
        mlflow.pytorch.autolog(log_every_n_epoch=1, log_every_n_step=10)
        model = LSTMRegressor(input_size=1, hidden_size=64, num_layers=2, lr=1e-3)
        early_stop_cb = EarlyStopping(
            monitor="val_loss",
            min_delta=0.00,
            patience=3,
            mode="min",
            verbose=False,
        )
        torch.set_float32_matmul_precision('high')

        # Follow the device chosen in __init__ so that a CPU-only host trains
        # on CPU instead of failing on a hard-coded GPU accelerator.
        accelerator = "gpu" if self.device.type == "cuda" else "cpu"

        with mlflow.start_run(run_name=f"Vary_A_difference_{self.difference_value}"):
            trainer = L.Trainer(
                max_epochs=50,
                accelerator=accelerator,
                devices=1,
                deterministic=True,
                check_val_every_n_epoch=1,
                logger=False,
                enable_checkpointing=False,
                enable_progress_bar=False,
                enable_model_summary=False,
                callbacks=[early_stop_cb],
            )
            trainer.fit(model, train_loader, val_loader)
            best_model = mlflow.pytorch.load_checkpoint(LSTMRegressor, run_id=mlflow.active_run().info.run_id)
            test_results = trainer.test(best_model, test_loader, verbose=False)
            mlflow.log_metrics({"test_loss": test_results[0]["test_loss"]})
        return best_model, test_results

    def evaluate_model(self, model: LSTMRegressor, test_loader: DataLoader) -> Dict:
        """Evaluate the model on test data and prepare visualization data.

        Args:
            model: Trained model; it is moved to ``self.device`` and put in
                eval mode.
            test_loader: Loader whose ``dataset`` is a ``ConcatDataset`` of
                per-subject datasets exposing a ``subject_id`` attribute.

        Returns:
            Mapping ``subject_id -> {"pred": [...], "target": [...]}`` with
            predictions and targets accumulated batch by batch.
        """
        subject_datasets = test_loader.dataset.datasets
        test_eval_data = {
            dataset.subject_id: {'pred': [], 'target': []} for dataset in subject_datasets
        }
        with torch.no_grad():
            model = model.to(self.device)
            model.eval()
            for dataset in subject_datasets:
                subject_results = test_eval_data[dataset.subject_id]
                # One unshuffled loader per subject keeps predictions in time order.
                subject_loader = DataLoader(dataset, batch_size=32, shuffle=False)
                for X, y in subject_loader:
                    pred = model(X.to(self.device)).cpu().numpy()
                    subject_results['pred'].extend(pred)
                    # Targets are only recorded, so they never need to visit the device.
                    subject_results['target'].extend(y.cpu().numpy())
        return test_eval_data
Difference = 0.5¶
In [3]:
# Experiment setup for difference = 0.5
analysis = TimeSeriesAnalysis(
    experiment_name="test3",
    difference_value=0.5,
)

# Read the records and subject metadata, then show the raw series
df, subject_info_df = analysis.load_data()
plot_time_series_by_subject(df, subject_info_df).show()

# Window each subject's series and build the three data loaders
sliding_windows_dict = analysis.create_sliding_windows(df=df)
train_loader, val_loader, test_loader = analysis.prepare_datasets(
    sliding_windows_dict=sliding_windows_dict,
)

# Fit the model and report the test metrics
best_model, test_results = analysis.train_model(
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(f"Test Results: {test_results}")

# Collect per-subject predictions and plot them together
test_eval_data = analysis.evaluate_model(
    model=best_model,
    test_loader=test_loader,
)
plot_all_subjects_combined(test_eval_data).show()
2025/05/07 13:57:46 WARNING mlflow.utils.autologging_utils: MLflow pytorch autologging is known to be compatible with 1.9.0 <= torch <= 2.6.0, but the installed version is 2.6.0+cu124. If you encounter errors during autologging, try upgrading / downgrading torch to a compatible version, or try upgrading MLflow.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
2025/05/07 13:57:54 WARNING mlflow.models.model: Model logged without a signature and input example. Please set `input_example` parameter when logging the model to auto infer the model signature.
/home/ytli/miniconda3/envs/research/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning:
IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
Downloading artifacts: 100%|██████████| 1/1 [00:00<00:00, 4744.69it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run Vary_A_difference_0.5 at: http://localhost:8093/#/experiments/91/runs/40b3e8f19d2c40b99a3107cee851d0bd
🧪 View experiment at: http://localhost:8093/#/experiments/91
Test Results: [{'test_loss': 0.0001478465273976326}]
Difference = 1¶
In [4]:
# Experiment setup for difference = 1
analysis = TimeSeriesAnalysis(
    experiment_name="test3",
    difference_value=1,
)

# Read the records and subject metadata, then show the raw series
df, subject_info_df = analysis.load_data()
plot_time_series_by_subject(df, subject_info_df).show()

# Window each subject's series and build the three data loaders
sliding_windows_dict = analysis.create_sliding_windows(df=df)
train_loader, val_loader, test_loader = analysis.prepare_datasets(
    sliding_windows_dict=sliding_windows_dict,
)

# Fit the model and report the test metrics
best_model, test_results = analysis.train_model(
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
)
print(f"Test Results: {test_results}")

# Collect per-subject predictions and plot them together
test_eval_data = analysis.evaluate_model(
    model=best_model,
    test_loader=test_loader,
)
plot_all_subjects_combined(test_eval_data).show()
2025/05/07 13:57:55 WARNING mlflow.utils.autologging_utils: MLflow pytorch autologging is known to be compatible with 1.9.0 <= torch <= 2.6.0, but the installed version is 2.6.0+cu124. If you encounter errors during autologging, try upgrading / downgrading torch to a compatible version, or try upgrading MLflow. INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
2025/05/07 13:58:05 WARNING mlflow.models.model: Model logged without a signature and input example. Please set `input_example` parameter when logging the model to auto infer the model signature.
Downloading artifacts: 100%|██████████| 1/1 [00:00<00:00, 3483.64it/s]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
🏃 View run Vary_A_difference_1 at: http://localhost:8093/#/experiments/91/runs/72755af34243468fbec5fbaa85415f39
🧪 View experiment at: http://localhost:8093/#/experiments/91
Test Results: [{'test_loss': 5.3055639000376686e-05}]