本文为看雪论坛优秀文章
看雪论坛作者ID:1900
一
前言
1、实验内容
2、实验环境
- Python版本:3.6.13
- PyTorch版本:1.8.1
- CUDA版本:11.4
二
数据处理
import os
from torch.utils.data import Dataset
import glob
import pandas as pd
import torch
from torchvision import transforms
import cv2
from torchvision.transforms import InterpolationMode
from PIL import Image
class MalwareDataset(Dataset):
    """Dataset of malware images stored as PNG files in a single directory.

    Every ``*.png`` directly under ``file_path`` is one sample. Each image is
    loaded with OpenCV, resized to 224x224, passed through the RAINBOW colour
    map and finally converted to a tensor by a torchvision pipeline.

    In training mode a sample is ``(image_tensor, label)``; otherwise it is
    ``(image_tensor, file_name)`` where ``file_name`` has no directory and no
    extension.
    """

    def __init__(self, file_path, is_train):
        """Collect the image paths and, for training, labels and augmentation.

        Args:
            file_path: Directory that contains the ``*.png`` images.
            is_train: When true, labels are read from ``trainLabels.csv`` in
                the parent directory of ``file_path`` and random augmentation
                is applied to every sample.
        """
        self.is_train = is_train
        # glob.glob returns files in arbitrary order; sorting makes the
        # index -> file mapping reproducible between runs and machines.
        self.file_path = sorted(glob.glob(os.path.join(file_path, "*.png")))
        self.len = len(self.file_path)
        # Default (non-training) pipeline: tensor conversion only.
        self.transforms_data = transforms.Compose([transforms.ToTensor()])
        if is_train:
            train_label_path = os.path.join(file_path, "..", "trainLabels.csv")
            df = pd.read_csv(train_label_path)
            # Labels are computed from the same (sorted) path list, so
            # y_data[i] always belongs to file_path[i].
            self.y_data = get_train_label(self.file_path, df)
            self.y_data = torch.Tensor(self.y_data)
            # Exactly one of the three augmentations is picked per sample.
            transforms_choice = transforms.RandomChoice([
                transforms.RandomRotation(degrees=45,
                                          interpolation=InterpolationMode.NEAREST,
                                          expand=True),
                transforms.RandomHorizontalFlip(p=0.4),
                transforms.RandomVerticalFlip(p=0.4),
            ])
            # After augmentation, scale to (224, 224) again and convert to a
            # tensor (the rotation is requested with expand=True).
            self.transforms_data = transforms.Compose([
                transforms_choice,
                transforms.Resize((224, 224), interpolation=InterpolationMode.NEAREST),
                transforms.ToTensor(),
            ])

    def __getitem__(self, index):
        """Load, colour-map and transform the sample at ``index``.

        Raises:
            OSError: If OpenCV cannot read the image file.
        """
        path = self.file_path[index]
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cv2.resize.
            raise OSError("cannot read image file: %s" % path)
        image = cv2.resize(image, (224, 224))
        image = cv2.applyColorMap(image, cv2.COLORMAP_RAINBOW)
        # NOTE(review): OpenCV arrays are normally BGR-ordered while PIL
        # assumes RGB. The channel order is deliberately left untouched so
        # that already-trained models keep seeing the same input - confirm
        # whether a conversion is wanted.
        image = Image.fromarray(image)
        image = self.transforms_data(image)
        if self.is_train:
            return image, self.y_data[index]
        return image, get_file_name(path)

    def __len__(self):
        """Return the number of PNG files found at construction time."""
        return self.len
# Derive the file name without directory and extension from a file path.
def get_file_name(file_path):
    """Return the base name of ``file_path`` with its last extension removed.

    Uses ``os.path`` so that the platform's path separators are honoured, a
    dot inside a directory name is not mistaken for an extension, and a name
    without any extension is returned unchanged.

    Args:
        file_path: Path of a file, e.g. ``"train/abc.png"``.

    Returns:
        The file name without directory and extension, e.g. ``"abc"``.
    """
    base_name = os.path.basename(file_path)
    return os.path.splitext(base_name)[0]
# Look up the class of every file in the contents of trainLabels.csv.
def get_train_label(file_path, df):
    """Return the label of every file in ``file_path``, in the same order.

    The file name without directory and extension is matched against the
    ``Id`` column of ``df``; the matching ``Class`` value is converted to
    ``int`` and shifted down by one.

    Args:
        file_path: Iterable of image file paths.
        df: DataFrame with the columns ``Id`` and ``Class``.

    Returns:
        A list with one integer label per entry of ``file_path``.

    Raises:
        IndexError: If a file name has no row in ``df``.
    """
    # Build the Id -> Class table once instead of scanning the whole frame
    # for every file. setdefault keeps the first row when an Id is repeated.
    class_by_id = {}
    for sample_id, sample_class in zip(df["Id"], df["Class"]):
        class_by_id.setdefault(sample_id, sample_class)

    train_label = []
    for fp in file_path:
        file_name = get_file_name(fp)
        if file_name not in class_by_id:
            raise IndexError("no label found for file id %r" % file_name)
        train_label.append(int(class_by_id[file_name]) - 1)
    return train_label
三
模型
1、卷积神经网络
2、VGG16
3、微调模型
import torch.nn as nn
from torch.hub import load_state_dict_from_url
# Names exported by a star-import of this module.
__all__ = ["VGG", "vgg16", "vgg16_bn"]
# Download URL of the pretrained state dict for each architecture name.
model_urls = dict(
    vgg16="https://download.pytorch.org/models/vgg16-397923af.pth",
    vgg16_bn="https://download.pytorch.org/models/vgg16_bn-6c64b313.pth",
)
class VGG(nn.Module):
    """Convolutional feature extractor followed by a three-layer linear head.

    The input runs through ``features``, is adaptively average-pooled to
    7x7, flattened per sample and classified by ``classifier1``.

    NOTE(review): the head is stored as ``classifier1``; presumably this is
    so that a non-strict state-dict load leaves the head freshly initialised
    - confirm against the checkpoint's key names.
    """

    def __init__(self, features, num_classes=1000, init_weights=True):
        """Assemble the network.

        Args:
            features: Module used as the convolutional part of the network.
            num_classes: Size of the final linear layer's output.
            init_weights: When true, ``_initialize_weights`` is run.
        """
        super().__init__()
        self.features = features
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        head_layers = [
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        ]
        self.classifier1 = nn.Sequential(*head_layers)
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        """Return the output of ``classifier1`` for the batch ``x``."""
        pooled = self.avgpool(self.features(x))
        # Keep the batch axis and collapse everything else.
        flattened = pooled.view(pooled.size(0), -1)
        return self.classifier1(flattened)

    def _initialize_weights(self):
        """Re-initialise all Conv2d, BatchNorm2d and Linear sub-modules."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
                continue
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
                continue
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.constant_(module.bias, 0)
def make_layers(cfg, batch_norm=False, in_channels=3):
    """Build the convolutional part of a VGG network from a layer list.

    Args:
        cfg: Sequence whose entries are either the string ``'M'`` (a 2x2
            max-pooling layer with stride 2) or an integer (a 3x3
            convolution with padding 1 and that many output channels,
            followed by ReLU).
        batch_norm: When true, a ``BatchNorm2d`` layer is inserted between
            every convolution and its ReLU.
        in_channels: Number of channels of the network input. Defaults to 3,
            the value that was previously hard-coded.

    Returns:
        An ``nn.Sequential`` containing the layers in ``cfg`` order.
    """
    layers = []
    for v in cfg:
        if v == 'M':
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            continue
        conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
        layers.append(conv2d)
        if batch_norm:
            layers.append(nn.BatchNorm2d(v))
        layers.append(nn.ReLU(inplace=True))
        # The next convolution consumes what this one produces.
        in_channels = v
    return nn.Sequential(*layers)
# Layer lists understood by make_layers: an integer is a convolution with
# that many output channels, 'M' is a max-pooling layer.
cfgs = {
    'D': [
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 256, 'M',
        512, 512, 512, 'M',
        512, 512, 512, 'M',
    ],
}
def _vgg(arch, cfg, batch_norm, pretrained, progress, **kwargs):
    """Create a VGG model and optionally load downloaded weights into it.

    Args:
        arch: Key into ``model_urls`` selecting the state dict to download.
        cfg: Key into ``cfgs`` selecting the layer list.
        batch_norm: Forwarded to ``make_layers``.
        pretrained: When true, random initialisation is switched off and the
            state dict for ``arch`` is loaded non-strictly, so entries that
            do not match the model are ignored.
        progress: Forwarded to ``load_state_dict_from_url``.
        **kwargs: Forwarded to the ``VGG`` constructor.

    Returns:
        The constructed ``VGG`` instance.
    """
    if pretrained:
        kwargs['init_weights'] = False
    feature_extractor = make_layers(cfgs[cfg], batch_norm=batch_norm)
    model = VGG(feature_extractor, **kwargs)
    if not pretrained:
        return model
    weights = load_state_dict_from_url(model_urls[arch], progress=progress)
    model.load_state_dict(weights, strict=False)
    return model
def vgg16(pretrained=False, progress=True, **kwargs):
    """Build the configuration-'D' VGG without batch normalisation.

    Args:
        pretrained: When true, downloaded weights are loaded into the model.
        progress: Forwarded to the weight download.
        **kwargs: Forwarded to the ``VGG`` constructor.
    """
    return _vgg(
        'vgg16',
        'D',
        batch_norm=False,
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )
def vgg16_bn(pretrained=False, progress=True, **kwargs):
    """Build the configuration-'D' VGG with batch normalisation.

    Args:
        pretrained: When true, downloaded weights are loaded into the model.
        progress: Forwarded to the weight download.
        **kwargs: Forwarded to the ``VGG`` constructor.
    """
    return _vgg(
        'vgg16_bn',
        'D',
        batch_norm=True,
        pretrained=pretrained,
        progress=progress,
        **kwargs
    )
# Freeze layers
def set_parameter_requires_grad(model):
    """Disable gradients for the leading parameters of ``model``.

    Parameters are visited in ``model.parameters()`` order and frozen one by
    one. Freezing stops right after the sixth parameter whose first dimension
    is 512 has been frozen; every later parameter is left untouched.

    Args:
        model: Module whose parameters are frozen in place.
    """
    frozen_512 = 0
    for param in model.parameters():
        param.requires_grad = False
        # Zero-dimensional parameters have no first axis to inspect, so
        # guard with dim() instead of indexing size() blindly.
        if param.dim() > 0 and param.size(0) == 512:
            frozen_512 += 1
            if frozen_512 == 6:
                break
# Report the number of parameters that will be trained
def train_param_number(model):
    """Print the element count of all parameters of ``model`` that require gradients."""
    train_num = 0
    for param in model.parameters():
        if param.requires_grad:
            train_num += param.numel()
    print("train_num:%d" % train_num)
# Build a 9-class VGG16 without pretrained weights and print how many
# parameter elements are trainable before anything is frozen.
model = vgg16(num_classes=9)
train_param_number(model)
# Freeze the leading layers, then print the trainable element count of the
# classifier head only.
set_parameter_requires_grad(model)
train_param_number(model.classifier1)
......
train_num:134297417
train_num:119582729
四
参数
import os
class Configure:
    """Settings shared by the training / testing script."""

    # Root directory of the data set; the paths below are joined onto it.
    base_path = ""
    train_gray_path = os.path.join(base_path, "train_gray_images")
    test_gray_path = os.path.join(base_path, "test_gray_images")
    submit_path = os.path.join(base_path, "submit.csv")

    # File the model is saved to and loaded from.
    model_path = "IMCFN.pth"
    num_classes = 9

    # Selects between training the model and only testing it.
    is_train = True

    # Data loading.
    batch_size = 8
    num_workers = 2

    # Optimisation.
    epochs = 40
    lr = 1e-3
    momentum = 0.9
    decay = 0.0005
五
分类结果
import os
import sys
from MalwareDataset import MalwareDataset
from torch.utils.data import DataLoader
import torch
import torch.nn.functional as F
import pandas as pd
from Configure import Configure
from VGG import vgg16
# Freeze layers
def set_parameter_requires_grad(model):
    """Disable gradients for the leading parameters of ``model``.

    Parameters are visited in ``model.parameters()`` order and frozen one by
    one. Freezing stops right after the sixth parameter whose first dimension
    is 512 has been frozen; every later parameter is left untouched.

    Args:
        model: Module whose parameters are frozen in place.
    """
    frozen_512 = 0
    for param in model.parameters():
        param.requires_grad = False
        # Zero-dimensional parameters have no first axis to inspect, so
        # guard with dim() instead of indexing size() blindly.
        if param.dim() > 0 and param.size(0) == 512:
            frozen_512 += 1
            if frozen_512 == 6:
                break
# Report the number of parameters that will be trained
def train_param_number(model):
    """Print the element count of all parameters of ``model`` that require gradients."""
    trainable_sizes = [
        param.numel() for param in model.parameters() if param.requires_grad
    ]
    train_num = sum(trainable_sizes)
    print("train_num:%d" % train_num)
def load_model(model_path, map_location=None):
    """Load an object previously written with ``torch.save``.

    If ``model_path`` does not exist, an error message is printed and the
    process exits with status 1 so that callers and shells can tell the
    failure apart from a successful run.

    Args:
        model_path: Path of the saved file.
        map_location: Optional device remapping forwarded to ``torch.load``,
            e.g. ``"cpu"`` to load a GPU-saved file on a CPU-only machine.
            ``None`` keeps ``torch.load``'s default behaviour.

    Returns:
        Whatever object was stored in the file.
    """
    if not os.path.exists(model_path):
        print("模型路径错误,模型加载失败")
        sys.exit(1)
    # NOTE(security): torch.load unpickles the file, which can execute
    # arbitrary code - only load files from a trusted source.
    return torch.load(model_path, map_location=map_location)
def save_model(target_model, model_path):
    """Write ``target_model`` to ``model_path``, replacing any existing file.

    The object is first written to a temporary sibling file and then moved
    over the destination, so an existing file at ``model_path`` survives if
    saving fails part-way.

    Args:
        target_model: Object to serialise with ``torch.save``.
        model_path: Destination path.
    """
    tmp_path = model_path + ".tmp"
    try:
        torch.save(target_model, tmp_path)
    except Exception:
        # Do not leave a partial file behind; the previous checkpoint at
        # model_path is still intact.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    os.replace(tmp_path, model_path)
def train(epoch):
    """Run one pass over ``train_loader`` and update ``modeler``.

    Relies on the module-level ``train_loader``, ``optimizer``, ``modeler``
    and ``device`` set up by the script entry point.

    Args:
        epoch: Epoch index, used only in the progress message.
    """
    for step, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()  # reset gradients
        inputs = inputs.to(device)
        labels = labels.to(device)
        y_pred = modeler(inputs)  # forward pass
        loss = F.cross_entropy(y_pred, labels.long())  # compute the loss
        # Report the loss of every 100th batch.
        if (step + 1) % 100 == 0:
            print("epoch=%d, loss=%f" % (epoch, loss.item()))
        loss.backward()  # backward pass
        optimizer.step()  # apply the update
def test():
    """Predict class probabilities for ``test_loader`` and append them to the submission CSV.

    Relies on the module-level ``conf``, ``test_loader``, ``modeler`` and
    ``device`` set up by the script entry point. The CSV at
    ``conf.submit_path`` is read, one row per test sample is appended
    (``Id`` plus ``Prediction1`` .. ``Prediction9``) and the file is written
    back without an index column.

    Side effect: ``modeler`` is left in evaluation mode.
    """
    num_predictions = 9
    df = pd.read_csv(conf.submit_path)
    rows = []
    # Evaluation mode disables Dropout so that predictions are deterministic.
    modeler.eval()
    with torch.no_grad():
        for inputs, file_name in test_loader:
            inputs = inputs.to(device)
            outputs = modeler(inputs)
            predicted = F.softmax(outputs, dim=1).cpu().tolist()
            for sample_id, sample_probs in zip(file_name, predicted):
                dict_res = {"Id": sample_id}
                for j in range(num_predictions):
                    dict_res["Prediction" + str(j + 1)] = sample_probs[j]
                rows.append(dict_res)
    # Collect all rows and concatenate once: DataFrame.append copied the
    # whole frame per row and no longer exists in pandas 2.x.
    if rows:
        df = pd.concat([df, pd.DataFrame(rows)], ignore_index=True, sort=False)
    df.to_csv(conf.submit_path, index=False)
if __name__ == '__main__':
    # Default GPU selection. setdefault keeps values that are already set in
    # the environment, so the script does not silently override the caller's
    # choice of device.
    os.environ.setdefault("CUDA_DEVICE_ORDER", "PCI_BUS_ID")
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "5")
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    conf = Configure()

    # The test set is needed in both modes: testing always runs at the end.
    test_dataset = MalwareDataset(conf.test_gray_path, False)
    test_loader = DataLoader(test_dataset, batch_size=conf.batch_size,
                             shuffle=False, num_workers=conf.num_workers)

    # Depending on the mode, either start from the pretrained network or
    # load the previously saved model.
    if conf.is_train:
        train_dataset = MalwareDataset(conf.train_gray_path, True)
        train_loader = DataLoader(train_dataset, batch_size=conf.batch_size,
                                  shuffle=True, num_workers=conf.num_workers)
        modeler = vgg16(pretrained=True, num_classes=conf.num_classes)
    else:
        print("=====================开始加载模型================")
        modeler = load_model(conf.model_path)
        print("=====================模型加载完成================")

    # Freeze the leading layers before the model is moved to the device.
    set_parameter_requires_grad(modeler)
    modeler.to(device)

    if conf.is_train:
        optimizer = torch.optim.SGD(modeler.parameters(), lr=conf.lr,
                                    weight_decay=conf.decay, momentum=conf.momentum)
        print("=====================开始训练模型================")
        for i in range(conf.epochs):
            train(i)
        print("=====================模型训练完成================")
        save_model(modeler, conf.model_path)

    print("=====================开始测试模型================")
    test()
    print("=====================模型测试完成================")
看雪ID:1900
https://bbs.pediy.com/user-home-835440.htm
# 往期推荐
球分享
球点赞
球在看
点击“阅读原文”,了解更多!
原文始发于微信公众号(看雪学苑):基于深度学习的恶意软件分类器