# -*- coding: utf-8 -*- """ Created on Wed Apr 1 08:55:40 2020 @author: sugis """ import glob import os.path as osp import random import torch import torch.nn as nn import torch.optim as optim from tqdm import tqdm from torch.optim import lr_scheduler from torchvision import models, transforms from PIL import Image import matplotlib.pyplot as plt import torch.utils.data as data import torchvision import numpy as np import cv2 from torch.autograd import Function Image.LOAD_TRUNCATED_IMAGES = True import math as math # In[ ]: class ImageTransform(): def __init__(self,resize,mean,std): self.data_transform={ 'train':transforms.Compose([ transforms.RandomResizedCrop( resize,scale=(0.5,1.0)), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize(mean,std) ]), 'val':transforms.Compose([ transforms.Resize(resize), transforms.CenterCrop(resize), # 画像中央をresize×resizeで切り取り transforms.ToTensor(), transforms.Normalize(mean,std) ]) } def __call__(self, img,phase='train'): return self.data_transform[phase](img) size = 224 mean = (0.485, 0.456, 0.406) std = (0.229, 0.224, 0.225) #transform_test=ImageTransform(size, mean, std) #img_transformed_test = transform_test(img, phase='train') # 乱数のシードを設定 torch.manual_seed(1234) np.random.seed(1234) random.seed(1234) torch.backends.cudnn.deterministic =True torch.backends.cudnn.benchmark=False # 入力画像の前処理をするクラス # 訓練時と推論時で処理が異なる # In[ ]: def make_datapath_list(phase='train'): rootpath="./bridgedata/" target_path=osp.join(rootpath+phase+'/**/*.jpg') path_list=[] for path in glob.glob(target_path): path_list.append(path) return path_list # 実行 train_list = make_datapath_list(phase="train") val_list = make_datapath_list(phase="val") print(len(train_list)) # In[ ]: class MyDatasets(data.Dataset): def __init__(self, file_list,transform = None,phase='train'): self.file_list=file_list self.transform = transform self.phase = phase def __len__(self): return len(self.file_list) def __getitem__(self, index): img_path=self.file_list[index] img=Image.open(img_path) img_transformed=self.transform( img,self.phase) fp=osp.basename(osp.dirname(img_path)) if self.phase=='train': label=fp elif self.phase=='val': label=fp if label=='girder': label=0 elif label=='nogirder': label=1 return img_transformed,label train_dataset = MyDatasets( file_list=train_list, transform=ImageTransform(size, mean, std), phase='train') val_dataset = MyDatasets( file_list=val_list, transform=ImageTransform(size, mean, std), phase='val') all_size=len(train_dataset) #train_dataset, val_dataset = data.random_split(train_dataset1, [math.floor(all_size*0.6),all_size-math.floor(all_size*0.6)]) ''' # 動作確認 index = 10 print(train_dataset.__getitem__(index)[0].size()) print(train_dataset.__getitem__(index)[1]) def show(img): npimg = img.numpy() plt.imshow(np.transpose(npimg, (1,2,0)), interpolation='nearest') show(torchvision.utils.make_grid(train_dataset.__getitem__(index)[0], padding=1)) ''' batch_size=12 # DataLoaderを作成 train_dataloader = torch.utils.data.DataLoader( train_dataset, batch_size=batch_size, shuffle=True) val_dataloader = torch.utils.data.DataLoader( val_dataset, batch_size=batch_size, shuffle=False) dataloaders_dict = {"train": train_dataloader, "val": val_dataloader} # 動作確認 batch_iterator = iter(dataloaders_dict["train"]) # イテレータに変換 inputs, labels = next( batch_iterator) # 1番目の要素を取り出す print(inputs.size()) print(labels) modelselect=1 if modelselect==0: # In[ ]:モデルを1から作る場合 #全結合の次元を計算 size_check = torch.FloatTensor(batch_size, 3, size, size) features = nn.Sequential( nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=2), nn.Conv2d(64, 192, kernel_size=5, padding=2), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=2), nn.Conv2d(192, 384, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(256, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=2), ) #バッチサイズ10, 6×6のフィルターが256枚 #10バッチは残して、6×6×256を1次元に落とす=>6×6×256=9216 #print(features(size_check).size()) #バッチ10の値を軸にして残りの次元を1次元へ落とした場合のTensorの形状をチェックすると9216。 #print(features(size_check).view(size_check.size(0), -1).size()) #fc_sizeを全結合の形状として保持しておく fc_size = features(size_check).view(size_check.size(0), -1).size()[1] print(fc_size) num_classes = 2#分類するクラスの数 class AlexNet(nn.Module): #fc_sizeを引き渡す def __init__(self, num_classes, fc_size): super(AlexNet, self).__init__() self.features = nn.Sequential( nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=2), nn.Conv2d(64, 192, kernel_size=5, padding=2), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=2), nn.Conv2d(192, 384, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.Conv2d(256, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2, stride=2), ) #fc_sizeで計算した形状を指定 self.classifier = nn.Sequential( nn.Dropout(p=0.5), nn.Linear(fc_size, 4096), nn.ReLU(inplace=True), nn.Dropout(p=0.5), nn.Linear(4096, 4096), nn.ReLU(inplace=True), nn.Linear(4096, num_classes) ) def forward(self, x): x = self.features(x) x = x.view(x.size(0), -1) x = self.classifier(x) return x device = 'cuda' if torch.cuda.is_available() else 'cpu' net = AlexNet(num_classes, fc_size).to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4) EPOCH=30 num_epochs = EPOCH train_loss_value=[] #trainingのlossを保持するlist train_acc_value=[] #trainingのaccuracyを保持するlist test_loss_value=[] #testのlossを保持するlist test_acc_value=[] #testのaccuracyを保持するlist #print(net) else: # In[ ]:VGG16モデルをファインチューニングする場合 # 学習済みのVGG-16モデルをロード # VGG-16モデルのインスタンスを生成 use_pretrained = True # 学習済みのパラメータを使用 net = models.vgg16(pretrained=use_pretrained) # VGG16の最後の出力層の出力ユニットをアリとハチの2つに付け替える net.classifier[6] = nn.Linear(in_features=4096, out_features=2)#分類するクラスの数 # 訓練モードに設定 net.train() #print('ネットワーク設定完了:学習済みの重みをロードし、訓練モードに設定しました') # 損失関数の設定 criterion = nn.CrossEntropyLoss() # ファインチューニングで学習させるパラメータを、変数params_to_updateの1~3に格納する params_to_update_1 = [] params_to_update_2 = [] params_to_update_3 = [] # 学習させる層のパラメータ名を指定 update_param_names_1 = ["features"] update_param_names_2 = ["classifier.0.weight","classifier.0.bias", "classifier.3.weight", "classifier.3.bias"] update_param_names_3 = ["classifier.6.weight", "classifier.6.bias"] # パラメータごとに各リストに格納する for name, param in net.named_parameters(): if update_param_names_1[0] in name: param.requires_grad = True params_to_update_1.append(param) print("params_to_update_1に格納:", name) elif name in update_param_names_2: param.requires_grad = True params_to_update_2.append(param) print("params_to_update_2に格納:", name) elif name in update_param_names_3: param.requires_grad = True params_to_update_3.append(param) print("params_to_update_3に格納:", name) else: param.requires_grad = False print("勾配計算なし。学習しない:", name) # 最適化手法の設定 optimizer = optim.SGD([ {'params': params_to_update_1, 'lr': 1e-4}, {'params': params_to_update_2, 'lr': 5e-4}, {'params': params_to_update_3, 'lr': 1e-3} ], momentum=0.9) # In[ ]: def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs): epochn=[] epoch_t_loss=[] epoch_v_loss=[] epoch_t_acc=[] epoch_v_acc=[] # 初期設定 # GPUが使えるかを確認 device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") print("使用デバイス:", device) # ネットワークをGPUへ net.to(device) # ネットワークがある程度固定であれば、高速化させる torch.backends.cudnn.benchmark = True # epochのループ for epoch in range(num_epochs): print('Epoch {}/{}'.format(epoch+1, num_epochs)) print('-------------') # epochごとの訓練と検証のループ for phase in ['train', 'val']: if phase == 'train': net.train() # モデルを訓練モードに else: net.eval() # モデルを検証モードに epoch_loss = 0.0 # epochの損失和 epoch_corrects = 0 # epochの正解数 # 未学習時の検証性能を確かめるため、epoch=0の訓練は省略 if (epoch == 0) and (phase == 'train'): continue # データローダーからミニバッチを取り出すループ for inputs, labels in tqdm(dataloaders_dict[phase]): # GPUが使えるならGPUにデータを送る inputs = inputs.to(device) labels = labels.to(device) # optimizerを初期化 optimizer.zero_grad() # 順伝搬(forward)計算 with torch.set_grad_enabled(phase == 'train'): outputs = net(inputs) loss = criterion(outputs, labels) # 損失を計算 _, preds = torch.max(outputs, 1) # ラベルを予測 #print(inputs) #print(labels) #print(torch.max(outputs, 1)) # 訓練時はバックプロパゲーション if phase == 'train': loss.backward() optimizer.step() # 結果の計算 epoch_loss += loss.item() * inputs.size(0) # lossの合計を更新 # 正解数の合計を更新 epoch_corrects += torch.sum(preds == labels.data) # epochごとのlossと正解率を表示 epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset) epoch_acc = epoch_corrects.double( ) / len(dataloaders_dict[phase].dataset) print('{} Loss: {:.4f} Acc: {:.4f}'.format( phase, epoch_loss, epoch_acc)) if phase == 'train': epoch_t_loss.append(epoch_loss) # モデルを訓練モードに epoch_t_acc.append(epoch_acc) # モデルを訓練モードに epochn.append(epoch) else: epoch_v_loss.append(epoch_loss) # モデルを検証モードに epoch_v_acc.append(epoch_acc) # モデルを検証モードに return epochn,epoch_t_loss,epoch_v_loss,epoch_t_acc,epoch_v_acc # 学習・検証を実行する num_epochs=20 te=train_model(net, dataloaders_dict, criterion, optimizer, num_epochs=num_epochs) plt.figure() plt.title("TrainLoss") plt.xlabel("epoc") plt.ylabel("Loss") # Traing score と Test score をプロット plt.plot(te[0], te[1], 'o-', color="r", label="Training loss") plt.plot(te[0], te[2][1:], 'o-', color="g", label="Val loss") plt.legend(loc="best") plt.figure() plt.title("TrainAcc") plt.xlabel("epoc") plt.ylabel("Acc") # Traing score と Test score をプロット plt.plot(te[0], te[3], 'o-', color="r", label="Training Acc") plt.plot(te[0], te[4][1:], 'o-', color="g", label="Val Acc") plt.legend(loc="best") # PyTorchのネットワークパラメータの保存 if modelselect==0: save_path = './weights_cnn.pth' torch.save(net.state_dict(), save_path) else: save_path = './weights_fine_tuning.pth' torch.save(net.state_dict(), save_path) # PyTorchのネットワークパラメータのロード load_path = './weights_fine_tuning.pth' load_weights = torch.load(load_path) net.load_state_dict(load_weights) # GPU上で保存された重みをCPU上でロードする場合 load_weights = torch.load(load_path, map_location={'cuda:0': 'cpu'}) net.load_state_dict(load_weights)