Skip to content
This repository was archived by the owner on Sep 21, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

# Ignore the virtual environment
.venv/
venv/

WEAP_CV/
# Ignore Python cache files
__pycache__/
*.pyc
Expand Down
169 changes: 165 additions & 4 deletions datasets/data_loader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,129 @@
"""
Encapsulates DataLoader-related logic, including splitting and parallel loading.
Encapsulates DataLoader-related logic, including splitting and parallel loading and transformations.
"""
import torch
from torch.utils.data import DataLoader, random_split
import cv2
import numpy as np
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader, random_split, Dataset

import os
import requests
import zipfile
from tqdm import tqdm # used for progress bars in downloading the dataset

# this function will download the unaugmented train and val datasets from roboflow
def download_roboflow_dataset(dataset_url, output_dir='stop_sign_dataset'):
    """
    Download and extract a dataset archive from Roboflow.

    The zip is streamed to disk next to this module, extracted into
    ``output_dir``, and the temporary archive is deleted afterwards.

    Args:
        dataset_url: URL to download the dataset zip from.
        output_dir: Directory (relative to this file) where the dataset
            will be extracted.
    """
    # Resolve paths relative to this module so the download location does
    # not depend on the current working directory.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    output_path = os.path.join(base_dir, output_dir)
    zip_path = os.path.join(output_path, "temp_dataset.zip")

    # Create directory if it doesn't exist
    os.makedirs(output_path, exist_ok=True)

    print(f'Downloading dataset to {zip_path}...')
    try:
        # Stream the response so large archives are never held in memory;
        # timeout prevents hanging forever on a dead connection.
        response = requests.get(dataset_url, stream=True, timeout=30)
        response.raise_for_status()  # raise early on a bad HTTP status

        dataset_size = int(response.headers.get('content-length', 0))

        # BUG FIX: the original advanced tqdm by 1 per 1024-byte chunk while
        # `total` was in bytes, so the bar stalled near 0%. Advance by the
        # actual number of bytes written instead.
        with open(zip_path, 'wb') as file, \
                tqdm(total=dataset_size, unit='B', unit_scale=True) as progress:
            for data in response.iter_content(chunk_size=1024):
                file.write(data)
                progress.update(len(data))

        print("\nDownload complete. Extracting files...")

        # Extract the archive next to the zip, then delete the zip.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(output_path)

        os.remove(zip_path)
        print(f'Extraction complete. Files extracted to {output_path}')
    except requests.exceptions.RequestException as e:
        # Best-effort download: report the failure rather than crash callers.
        print(f"Error downloading the dataset: {e}")


def apply_cv2_transforms(img):
    """
    Apply OpenCV-based photometric augmentations to a PIL image.

    Randomly jitters brightness/contrast and HSV saturation/hue, and with
    small probabilities adds Gaussian blur, motion blur, JPEG compression
    artifacts, and Gaussian noise.

    :param img: PIL Image (assumed RGB — TODO confirm callers always convert)
    :return: Augmented PIL Image (RGB)
    """
    img = np.array(img)  # PIL -> NumPy uint8 array

    # Mild brightness/contrast jitter; convertScaleAbs saturates to [0, 255].
    img = cv2.convertScaleAbs(img, alpha=np.random.uniform(0.9, 1.2),
                              beta=np.random.randint(-15, 15))

    # Saturation/hue jitter. BUG FIX: the original multiplied/added directly
    # on the uint8 HSV view, which silently wraps on overflow (e.g.
    # 250 * 1.15 -> 31), corrupting colors. Work in a wider dtype, clip the
    # saturation, and wrap hue modulo OpenCV's 8-bit hue range [0, 180).
    img_hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV).astype(np.int16)
    img_hsv[..., 1] = np.clip(img_hsv[..., 1] * np.random.uniform(0.85, 1.15), 0, 255)
    img_hsv[..., 0] = (img_hsv[..., 0] + np.random.randint(-5, 5)) % 180
    img = cv2.cvtColor(img_hsv.astype(np.uint8), cv2.COLOR_HSV2RGB)

    if np.random.rand() < 0.2:
        img = cv2.GaussianBlur(img, (3, 3), 0)

    if np.random.rand() < 0.15:
        # Horizontal motion blur: a normalized single-row line kernel.
        kernel_size = 3
        kernel_motion_blur = np.zeros((kernel_size, kernel_size))
        kernel_motion_blur[(kernel_size - 1) // 2, :] = 1.0 / kernel_size
        img = cv2.filter2D(img, -1, kernel_motion_blur)

    if np.random.rand() < 0.2:
        # Simulate JPEG compression artifacts at a random quality level.
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), np.random.randint(50, 90)]
        _, enc_img = cv2.imencode('.jpg', img, encode_param)
        img = cv2.imdecode(enc_img, cv2.IMREAD_UNCHANGED)

    if np.random.rand() < 0.2:
        # BUG FIX: casting Gaussian noise straight to uint8 wrapped negative
        # samples to large positives; add in int16 and clip instead.
        noise = np.random.normal(0, 5, img.shape)
        img = np.clip(img.astype(np.int16) + noise, 0, 255).astype(np.uint8)

    return Image.fromarray(img)

def get_stop_sign_transforms():
    """
    Build the dynamic augmentation pipeline used during training.

    Chains the OpenCV augmentations with torchvision color/affine jitter
    and a random horizontal flip, then converts to a tensor normalized
    to roughly [-1, 1].
    """
    pipeline = [
        transforms.Lambda(apply_cv2_transforms),  # OpenCV-side augmentations
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.3, hue=0.05),
        transforms.RandomAffine(degrees=7, translate=(0.05, 0.05), scale=(0.95, 1.05)),
        transforms.RandomHorizontalFlip(p=0.3),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
    return transforms.Compose(pipeline)

class StopSignDataset(Dataset):
    """
    Dataset of stop-sign images given as explicit path/label lists.

    Applies ``transform`` to each loaded image when provided; otherwise the
    image is simply converted to a tensor.

    NOTE(review): a second class with the same name is defined further down
    in this module and shadows this one at import time — confirm which
    definition is intended to survive.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths  # one file path per sample
        self.labels = labels            # labels aligned with image_paths
        self.transform = transform      # optional callable applied to each PIL image

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Load lazily so only the requested sample touches the disk.
        image = Image.open(self.image_paths[idx]).convert("RGB")
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)
        else:
            image = transforms.ToTensor()(image)

        return image, label

def create_dataloader(dataset, batch_size=8, shuffle=True, num_workers=4):
"""
Creates and returns a DataLoader
Expand Down Expand Up @@ -39,13 +159,54 @@ def split_dataset(dataset, train_ratio=0.7, test_ratio=0.15, seed=42):
torch.manual_seed(seed)
return random_split(dataset, [train_size, test_size, val_size])

def create_train_val_test_loaders(dataset, batch_size=8, train_ratio=0.7, test_ratio=0.15, num_workers=4):
def create_train_val_test_loaders(image_paths, labels, batch_size=8, train_ratio=0.7, test_ratio=0.15, num_workers=4):
    """
    Combine dataset construction, splitting, and DataLoader creation.

    Wraps the raw paths/labels in a StopSignDataset with the training
    augmentation pipeline, splits it, and builds one DataLoader per split.

    NOTE(review): the validation and test subsets share the *training*
    augmentations because all three views wrap the same dataset — confirm
    whether eval splits should use a deterministic transform instead.

    :return: (train_loader, test_loader, val_loader)
    """
    dataset = StopSignDataset(image_paths=image_paths, labels=labels,
                              transform=get_stop_sign_transforms())
    train_dataset, test_dataset, val_dataset = split_dataset(dataset, train_ratio, test_ratio)
    train_loader = create_dataloader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    test_loader = create_dataloader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    val_loader = create_dataloader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    # BUG FIX: the original ended with the same return statement duplicated;
    # the second occurrence was unreachable dead code.
    return train_loader, test_loader, val_loader


class StopSignDataset(Dataset):
    """
    Dataset of unlabeled stop-sign images discovered in a directory.

    NOTE(review): this definition shadows the path/label StopSignDataset
    defined earlier in this module, which silently breaks
    create_train_val_test_loaders (it passes ``image_paths``/``labels``
    keywords this ``__init__`` does not accept). One of the two classes
    should be renamed or removed.
    """

    def __init__(self, img_dir, transform=None):
        """
        :param img_dir: Directory scanned (non-recursively) for ``.jpg`` files.
        :param transform: Optional callable applied to each loaded PIL image.
        """
        self.img_dir = img_dir
        self.transform = transform
        # BUG FIX: os.listdir order is filesystem-dependent; sort so sample
        # indices are deterministic across runs and machines.
        self.img_paths = sorted(
            os.path.join(img_dir, f) for f in os.listdir(img_dir) if f.endswith(".jpg")
        )

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img = Image.open(self.img_paths[idx]).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img

def connect_dataset_with_transforms(dataset_path, batch_size=64, num_workers=8, train_ratio=0.8):
    """
    Wire the downloaded dataset up with the augmentation pipeline.

    Builds a StopSignDataset over ``dataset_path``, splits it into train/val
    subsets, and returns a DataLoader for each.

    :param dataset_path: Directory containing the extracted .jpg images.
    :param batch_size: Batch size for both loaders (was hard-coded to 64).
    :param num_workers: Worker processes per loader (was hard-coded to 8).
    :param train_ratio: Fraction of samples assigned to the training split.
    :return: (train_loader, val_loader)
    """
    print("Applying transformations and preparing dataloaders...")

    transform_pipeline = get_stop_sign_transforms()
    dataset = StopSignDataset(dataset_path, transform=transform_pipeline)

    train_size = int(train_ratio * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    # persistent_workers=True raises a ValueError when num_workers == 0,
    # so only enable it when workers are actually used.
    persistent = num_workers > 0
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=num_workers, pin_memory=True,
                              persistent_workers=persistent)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False,
                            num_workers=num_workers, pin_memory=True,
                            persistent_workers=persistent)

    print("Transformations applied successfully! Training and validation dataloaders ready.")
    return train_loader, val_loader
7 changes: 7 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from datasets.data_loader import create_train_val_test_loaders
from torchvision import transforms
from train import train_model
import datasets.data_loader as data_loader

def main():
'''
data_dir = "./coco"

transform = transforms.Compose([
Expand All @@ -28,6 +30,7 @@ def main():
num_workers=2
)


model = nn.Sequential(
nn.Flatten(),
nn.Linear(3*224*224, 100),
Expand All @@ -44,6 +47,10 @@ def main():
lr=1e-3,
device=device
)
'''

# testing the dataloader
data_loader.download_roboflow_dataset('https://app.roboflow.com/ds/LXl5gthuky?key=9cEzxHzAiX')

if __name__ == '__main__':
main()
6 changes: 6 additions & 0 deletions src/3D/cam_to_lidar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import numpy as np

# draw center line,
# NOTE(review): placeholder module — center_coord presumably marks an image
# center for a camera-to-LiDAR projection; confirm the intended coordinate
# frame (pixel vs. world) before building on it.
center_coord = [0,0]

# get some test pics from KITTI
146 changes: 146 additions & 0 deletions src/models/detection_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np

'''
Define a small CNN.
We want 8 layers - conv1, batch + poo1, relu1, conv2, batch + pool2, relu2, fc1, fc2
Split up the two parts 1.Model 2.Classifier
'''
class SmallCNN(nn.Module):
    """
    Small CNN for stop-sign classification.

    Two conv/batch-norm/pool/ReLU stages feed a two-layer fully connected
    classifier; dropout is applied in both parts for regularization.
    """

    def __init__(self, num_classes=2, input_size=128) -> None:
        super().__init__()

        # Each stage: 3x3 valid conv (shrinks by 2 px) then 2x2 max pool
        # (halves the spatial size).
        after_stage_one = (input_size - 2) // 2
        after_stage_two = (after_stage_one - 2) // 2
        # Flattened feature count entering the classifier: 64 channels out
        # of the second conv block times the remaining spatial area.
        self.flat_features = 64 * after_stage_two * after_stage_two

        # Convolutional feature extractor.
        self.stop_sign_cnn = nn.Sequential(
            # First conv block: 3 RGB channels -> 32 feature maps.
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(),
            # Second conv block: 32 -> 64 feature maps.
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(),
            # Spatial dropout for regularization to avoid overfitting.
            nn.Dropout2d(0.20),
        )

        # Fully connected classification head.
        self.stop_sign_classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.flat_features, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        # Extract features, then classify.
        features = self.stop_sign_cnn(x)
        return self.stop_sign_classifier(features)

# Create the model.
# NOTE(review): building the model, optimizer and scheduler at import time is
# a module-level side effect — importing this file allocates the network and
# (if available) touches the GPU. Consider moving this behind a factory or
# an `if __name__ == '__main__'` guard.
detection_model = SmallCNN()

# Check if GPU is available and move the model there (in place).
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
detection_model.to(device)

# Define the loss function and optimizer; weight_decay adds L2-style
# regularization. These globals are consumed by train_model below.
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(detection_model.parameters(), lr=0.001, betas=(0.5, 0.999), weight_decay=1e-4) # weigh decay for regularization

# Learning-rate scheduler: shrink the LR by 10x when the monitored
# (validation) loss plateaus for 3 epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.1)

def train_model(model, train_loader, val_loader, num_epochs=25):
    """
    Train ``model`` and checkpoint the best weights by validation accuracy.

    Uses the module-level ``loss_fcn``, ``optimizer``, ``scheduler`` and
    ``device``. After each epoch the model is evaluated on ``val_loader``;
    whenever validation accuracy improves, the state dict is saved to
    'best_model.pth'.

    :param model: nn.Module to train (modified in place).
    :param train_loader: DataLoader yielding (inputs, labels) batches.
    :param val_loader: DataLoader yielding (inputs, labels) batches.
    :param num_epochs: Number of full passes over train_loader.
    :return: The trained model (last-epoch weights, not necessarily best).
    """
    # BUG FIX: the original initialized `best = 0.0` but compared against the
    # undefined name `best_acc`, raising NameError at the end of epoch 1.
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # ---- Training phase ----
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in train_loader:
            print('.', end='', flush=True)  # lightweight per-batch progress
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Zero the parameter gradients before each backward pass.
            optimizer.zero_grad()

            # Forward pass; outputs has shape [batch_size, num_classes] and
            # torch.max over dim 1 yields the predicted class indices.
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = loss_fcn(outputs, labels)

            # Backward pass and optimizer step.
            loss.backward()
            optimizer.step()

            # Accumulate loss weighted by batch size, plus correct counts.
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)
        print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

        # ---- Validation phase ----
        model.eval()  # disables dropout / uses running batch-norm stats
        val_loss = 0.0
        val_corrects = 0

        # No gradient tracking needed for evaluation.
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = loss_fcn(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                val_corrects += torch.sum(preds == labels.data)

        val_loss = val_loss / len(val_loader.dataset)
        val_acc = val_corrects.double() / len(val_loader.dataset)
        print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Adapt the learning rate based on validation loss.
        scheduler.step(val_loss)

        # Checkpoint whenever validation accuracy improves.
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'New best model saved with accuracy: {val_acc:.4f}')

    print(f'Best validation accuracy: {best_acc:.4f}')
    return model





4 changes: 2 additions & 2 deletions src/models/lane_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
# Canny edge detection, black and white representation of edges in a frame
canny = cv2.Canny(grayFrame, 100, 200)
cv2.imshow("Canny edge detection", canny)

# Display the resulting frame
#cv2.imshow('F1tenth Onboard Video', frame)
# cv2.imshow('F1tenth Onboard Video', frame)

# define q as the exit button
if cv2.waitKey(25) & 0xFF == ord('q'):
Expand Down
Loading