본 포스팅은 이전 포스팅과 같은 내용을 TensorFlow가 아닌 PyTorch로 구현한 내용입니다.
모델의 구성은 다음과 같습니다. 이전 포스팅의 TensorFloe 모델과 같은 구조입니다.
따라서, TensorFlow와 PyTorch 코드를 비교해서 보시면 더욱 좋을 것 같습니다.
아래는 전체 코드입니다.
[train.py]
# Import library
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import click
import pickle
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import torchsummary
def generate_data_frame(data_dir):
dir = Path(data_dir)
filepaths = list(dir.glob(r'**/*.jpg'))
labels = [str(filepaths[i]).split("\\")[-2] \
for i in range(len(filepaths))]
filepath = pd.Series(filepaths, name='Filepath').astype(str)
labels = pd.Series(labels, name='Label')
df = pd.concat([filepath, labels], axis=1)
df = df.sample(frac=1, random_state=0).reset_index(drop=True)
return df
class ImageDataGenerator(Dataset):
def __init__(self, image_paths, labels):
self.image_list = []
self.label_list = []
for i in range(len(image_paths)):
image = cv2.imread(image_paths[i], cv2.IMREAD_COLOR)
image = cv2.resize(image, (227, 227))
label = labels[i]
self.image_list.append(image)
self.label_list.append(label)
def __len__(self):
return len(self.image_list)
def __getitem__(self, index):
images = torch.FloatTensor(self.image_list[index]).permute(2, 0, 1)
labels = torch.LongTensor([self.label_list[index]])
return images, labels
class MyModel(nn.Module):
def __init__(self, class_num):
super(MyModel, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(in_channels=3, out_channels=96, kernel_size=(11, 11), stride=4, padding=0),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(in_channels=96, out_channels=256, kernel_size=(5, 5), stride=1, padding=2),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(in_channels=256, out_channels=384, kernel_size=(3, 3), stride=1, padding=1),
nn.ReLU(),
nn.Conv2d(in_channels=384, out_channels=384, kernel_size=(3, 3), stride=1, padding=1),
nn.ReLU(),
nn.Conv2d(in_channels=384, out_channels=256, kernel_size=(3, 3), stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2)
)
self.classifier = nn.Sequential(
nn.Dropout(p=0.5),
nn.Linear(in_features=(256 * 6 * 6), out_features=4096),
nn.ReLU(),
nn.Dropout(p=0.5),
nn.Linear(4096, 4096),
nn.ReLU(),
nn.Linear(4096, class_num)
)
def forward(self, x):
x = self.features(x)
x = x.view(-1, 256 * 6 * 6)
x = self.classifier(x)
return x
@click.command()
@click.option('--data_dir', default='data/train', help='Data path')
@click.option('--batch_size', default=128, help='Batch size')
@click.option('--epochs', default=20, help='Epochs')
@click.option('--model_name', default='pytorch_model', help='Model name')
def run(data_dir, batch_size, epochs, model_name):
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'{device} is available')
# Load data
df = generate_data_frame(data_dir)
print('========== Data shape ==========')
print(df.shape)
print('========== Data ================')
print(df)
print()
# Label encoding
class_label = LabelEncoder()
df['Label'] = class_label.fit_transform(df['Label'].values)
print('========== Class ================')
print(np.sort(df['Label'].unique()))
print('========== Class number =========')
class_num = len(df['Label'].unique())
print(class_num)
# Separate dataset
train_df, valid_df = train_test_split(df, test_size=0.2, random_state=0)
train_dataset = ImageDataGenerator(train_df['Filepath'].tolist(), train_df['Label'].tolist())
train_loader = DataLoader(train_dataset, batch_size=batch_size)
valid_dataset = ImageDataGenerator(valid_df['Filepath'].tolist(), valid_df['Label'].tolist())
valid_loader = DataLoader(valid_dataset, batch_size=batch_size)
# Define model
model = MyModel(class_num=class_num).to(device)
torchsummary.summary(model, input_size=(3, 227, 227))
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_function = nn.CrossEntropyLoss().to(device)
# Train model
train_losses = []
valid_losses = []
avg_train_losses = []
avg_valid_losses = []
train_accs = []
valid_accs = []
avg_train_accs = []
avg_valid_accs = []
print('Start training..')
for epoch in range(1, epochs + 1):
# train
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
target = target.squeeze(dim=-1)
loss = loss_function(output, target)
loss.backward()
optimizer.step()
train_losses.append(loss.item())
pred = torch.max(output, 1)[1]
train_accs.append(((target == pred).sum() / len(target)).cpu())
# validation
model.eval()
with torch.no_grad():
for data, target in valid_loader:
data, target = data.to(device), target.to(device)
output = model(data)
target = target.squeeze(dim=-1)
loss = loss_function(output, target)
valid_losses.append(loss.item())
pred = torch.max(output, 1)[1]
valid_accs.append(((target == pred).sum() / len(target)).cpu())
train_loss = np.average(train_losses)
valid_loss = np.average(valid_losses)
avg_train_losses.append(train_loss)
avg_valid_losses.append(valid_loss)
train_acc = np.average(train_accs)
valid_acc = np.average(valid_accs)
avg_train_accs.append(train_acc)
avg_valid_accs.append(valid_acc)
train_losses = []
valid_losses = []
train_accs = []
valid_accs = []
epoch_len = len(str(epochs))
print_msg = (f'Epoch {epoch:>{epoch_len}}/{epochs:>{epoch_len}} ' +
f'loss: {train_loss:.5f} ' +
f'accuracy: {train_acc:.5f} ' +
f'val_loss: {valid_loss:.5f} ' +
f'val_accuracy: {valid_acc:.5f}')
print(print_msg)
history = {'loss': avg_train_losses, 'val_loss': avg_valid_losses,
'accuracy': avg_train_accs, 'val_accuracy': avg_valid_accs}
pd.DataFrame(history)[['accuracy', 'val_accuracy']].plot()
plt.title("Accuracy")
plt.show()
pd.DataFrame(history)[['loss', 'val_loss']].plot()
plt.title("Loss")
plt.show()
torch.save(model.state_dict(), model_name + '.pt')
# Map the label
mapping = dict(zip(class_label.classes_, range(len(class_label.classes_))))
labels = (mapping)
labels = dict((v, k) for k, v in labels.items())
with open('labels.pkl', 'wb') as f:
pickle.dump(labels, f)
if __name__ == '__main__':
run()
[test.py]
import click
import numpy as np
import pandas as pd
import pickle
import os
import cv2
import torch
import torch.nn as nn
def load_data(images_dir):
name_list = []
image_list = []
files = os.listdir(images_dir)
for file in files:
try:
path = images_dir + '/' + file
image = cv2.imread(path, cv2.IMREAD_COLOR)
image = cv2.resize(image, (227, 227))
name_list.append(file)
image_list.append(image)
except FileNotFoundError as e:
print('ERROR : ', e)
print('torch.FloatTensor(image_list).shape', torch.FloatTensor(image_list).shape)
images = torch.FloatTensor(image_list).permute(0, 3, 1, 2)
names = np.array(name_list)
return names, images
class MyModel(nn.Module):
def __init__(self, class_num):
super(MyModel, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(in_channels=3, out_channels=96, kernel_size=(11, 11), stride=4, padding=0),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(in_channels=96, out_channels=256, kernel_size=(5, 5), stride=1, padding=2),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(in_channels=256, out_channels=384, kernel_size=(3, 3), stride=1, padding=1),
nn.ReLU(),
nn.Conv2d(in_channels=384, out_channels=384, kernel_size=(3, 3), stride=1, padding=1),
nn.ReLU(),
nn.Conv2d(in_channels=384, out_channels=256, kernel_size=(3, 3), stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2)
)
self.classifier = nn.Sequential(
nn.Dropout(p=0.5),
nn.Linear(in_features=(256 * 6 * 6), out_features=4096),
nn.ReLU(),
nn.Dropout(p=0.5),
nn.Linear(4096, 4096),
nn.ReLU(),
nn.Linear(4096, class_num)
)
def forward(self, x):
x = self.features(x)
x = x.contiguous().view(-1, 256 * 6 * 6)
x = self.classifier(x)
return x
@click.command()
@click.option('--data_dir', default='data/test', help='Data path')
@click.option('--model_name', default='pytorch_model', help='Model name')
def run(data_dir, model_name):
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'{device} is available')
image_names, X_test = load_data(data_dir)
loaded_model = MyModel(8).to(device)
loaded_model.load_state_dict(torch.load(model_name + '.pt'))
loaded_model.eval()
with torch.no_grad():
data = X_test.to(device)
pred = loaded_model(data)
pred = np.argmax(pred.cpu(), axis=1).detach().numpy()
with open('labels.pkl', 'rb') as f:
labels = pickle.load(f)
pred = [labels[k] for k in pred]
results_df = pd.DataFrame({'image_names': image_names, 'class': pred})
results_df.to_csv('pytorch_results.csv', index=False)
if __name__ == '__main__':
run()
'AI > TensorFlow & PyTorch' 카테고리의 다른 글
[PyTorch] Faster R-CNN을 이용한 객체 탐지 (0) | 2023.07.09 |
---|---|
[TensorFlow] InceptionV3을 이용한 이미지 검색 (0) | 2023.06.19 |
[TensorFlow] CNNs을 이용한 이미지 분류 (0) | 2023.06.15 |
[TensorFlow] GAN 예제 코드 (1) | 2023.06.10 |
[TensorFlow] 오토 인코더를 이용한 이미지 노이즈 제거 (0) | 2023.06.08 |