简介

该项目是一个基于深度学习的文本分类任务,旨在通过分析SQL查询语句和网页内容识别出潜在的SQL注入(SQLInjection)和跨站脚本攻击(XSS)。通过对原始数据进行预处理、特征提取以及构建混合模型,该模型能够利用字符级和符号级输入信息,实现对恶意攻击类别的精准预测

实验步骤

**1. 数据加载与预处理:**读取CSV格式的数据集,移除文本中的注释,并将文本转化为字符索引序列和符号索引序列,同时对数据进行划分,得到训练集、验证集和测试集。
**2. 特征工程:**使用Keras库的tf.keras.preprocessing.sequence.pad_sequences方法对序列进行填充或截断,使得所有样本长度一致,便于模型处理。
**3. 模型设计:**创建一个混合深度学习模型结构,分别处理字符和符号输入。模型包含嵌入层、卷积神经网络层、双向GRU层、多头注意力机制和全连接层,最终输出类别概率。
**4. 模型编译与训练:**配置模型优化器、损失函数及评估指标,然后编译模型并进行训练,同时利用验证集进行模型性能监控和超参数调整。
**5. 模型评估与预测:**在测试集上运行模型以获取预测结果,并计算准确率、精确率和召回率等评价指标,以衡量模型的实际表现。

实现代码

导导导

# 导入用于交互式显示的模块
from IPython.display import clear_output
# 导入科学计算库numpy
import numpy as np
# 导入数据处理库pandas
import pandas as pd
# 导入深度学习库tensorflow
import tensorflow as tf
# 导入keras中的神经网络层
from tensorflow.keras import layers
# 导入Add层,用于层间相加操作
from tensorflow.keras.layers import Add
# 导入scikit-learn中的数据集划分方法
from sklearn.model_selection import train_test_split
# 导入scikit-learn中的评价指标
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score
# 导入tensorflow的keras接口
from tensorflow import keras
# 导入IPython.display中的SVG和Image类,用于显示图像
from IPython.display import SVG, Image
# 导入matplotlib.pyplot库,用于数据可视化
import matplotlib.pyplot as plt
# 导入正则表达式模块
import re

数据加载与预处理

移除文本中的注释。

df = pd.read_csv('/kaggle/input/sqli-xss-dataset/SQLInjection_XSS_MixDataset.1.0.0.csv')
def remove_comment(text):
    # 使用正则表达式移除单行和多行注释
    text = re.sub('//.*?\n|/\*.*?\*/', '', text, flags=re.S)
    # 移除以 "--" 开头的注释
    text = text.split('--')[0]+"--"
    # 如果文本中含有单引号,移除单引号前的部分
    if '\'' in text :
      removeTarget = text.split('\'')[0]
      text = text.replace(removeTarget, "")
    print("After process: ",text)
    return text

将数据集X中的字符串转换为字符索引的序列,以便于后续的处理和模型输入。

def data2char_index(X, max_len, is_remove_comment = False):
    alphabet = " abcdefghijklmnopqrstuvwxyz0123456789-,;.!?:'\"/\\|_@#$%^&*~`+-=<>()[]{}"  # 定义字符集
    result = [] 
    for data in X:
        mat = []
        if is_remove_comment == True : 
          data = remove_comment(data)  # 移除字符串中的注释
        for ch in data:
            ch = ch.lower()  # 将字符转换为小写
            if ch not in alphabet:
                continue  # 如果字符不在字符集中,则跳过
            mat.append(alphabet.index(ch))  # 将字符转换为在字符集中的索引
        result.append(mat)
    # 使用Keras的预处理工具对序列进行填充或截断,统一长度
    X_char = tf.keras.preprocessing.sequence.pad_sequences(np.array(result, dtype=object), padding='post',truncating='post', maxlen=max_len)
    return X_char

特征工程

将数据集中的字符串转换为符号标记序列。

def data_to_symbol_tag(X, max_len ,is_remove_comment = False):
    symbol = " -,;.!?:'\"/\\|_@#$%^&*~`+-=<>()[]{}"  # 定义考虑的符号集合
    result = [] 
    for data in X:
        mat = []
        if is_remove_comment == True : 
          data = remove_comment(data)  # 移除字符串中的注释
        for ch in data:
            ch = ch.lower()  # 将字符转换为小写
            if ch not in symbol:
                mat.append(0)  # 如果字符不在符号集合中,则添加0
            else :
                mat.append(symbol.index(ch))  # 如果字符在符号集合中,则添加其索引
        result.append(mat)
    # 使用tf.keras.preprocessing.sequence.pad_sequences对结果进行填充或截断,以确保所有序列长度相同
    X_char = tf.keras.preprocessing.sequence.pad_sequences(np.array(result, dtype=object), padding='post',truncating='post', maxlen=max_len)
    return X_char
# 准备数据和标签
data = df['Sentence'].values  # 提取句子列的值
SQLInjection_label = df['SQLInjection'].values  # 提取SQL注入标签列的值
XSS = df['XSS'].values  # 提取XSS标签列的值
Normal = df['Normal'].values  # 提取正常标签列的值
# 将各类标签组合成一个二维数组,方便后续处理
label = np.array([SQLInjection_label,XSS,Normal])
label = label.T
# 划分数据集为训练集、验证集和测试集
trainX, testX, y_train, y_test = train_test_split(data, label, test_size=0.2)
# 从训练集中划分出验证集
trainX, x_val, y_train, y_val = train_test_split(trainX, y_train, test_size=0.2)

# 将训练集数据转换为字符索引形式,限定最大长度为1000
trainX_text = data2char_index(trainX,max_len=1000)
# 将测试集数据转换为字符索引形式,限定最大长度为1000
testX_text = data2char_index(testX,max_len=1000)
# 将训练集数据转换为符号标签形式,限定最大长度为1000
trainX_symbol = data_to_symbol_tag(trainX,max_len=1000)
# 将测试集数据转换为符号标签形式,限定最大长度为1000
testX_symbol = data_to_symbol_tag(testX,max_len=1000)
# 将验证集数据转换为字符索引形式,限定最大长度为1000
x_val_text = data2char_index(x_val,max_len=1000)
# 将验证集数据转换为符号标签形式,限定最大长度为1000
x_val_symbol = data_to_symbol_tag(x_val,max_len=1000)

# 打印训练集和测试集字符索引形式的数据形状
print(trainX_text.shape)
print(testX_text.shape)

模型设计

构建一个结合了文字和符号输入的深度学习模型结构。

def model_struct(max_len):
    # 初始化一些基本的模型参数
    pool_siz = 10
    num_heads = 3
    embed_dim = 100

    # 文字输入层和对应的嵌入层、卷积层、GRU层以及注意力机制
    input_text = tf.keras.layers.Input(shape=(max_len,))
    embed1 = tf.keras.layers.Embedding(input_dim=70,  output_dim=105, input_length=max_len, trainable=False)(input_text)
    cnn1 = tf.keras.layers.Conv1D(32, 3, padding='same', strides=1, activation='relu')(embed1)
    cnn1 = tf.keras.layers.MaxPooling1D(pool_size=pool_siz)(cnn1)
    GRU0 = layers.Bidirectional(tf.keras.layers.GRU(32, return_sequences=True, go_backwards=True))(cnn1)
    MHA0 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=1)(GRU0,GRU0)
    LayerNormalization0 = layers.LayerNormalization(epsilon=1e-6)(GRU0 + MHA0)
    x = tf.keras.Model(inputs=input_text, outputs=LayerNormalization0)
    
    # 符号输入层和对应的嵌入层、卷积层、GRU层以及注意力机制
    input_symbol = tf.keras.layers.Input(shape=(max_len,))
    embed2 = tf.keras.layers.Embedding(input_dim=34,  output_dim=51, input_length=max_len, trainable=False)(input_symbol)
    cnn1s = tf.keras.layers.Conv1D(32, 3, padding='same', strides=1, activation='relu')(embed2)
    cnn1s = tf.keras.layers.MaxPooling1D(pool_size=pool_siz)(cnn1s)
    GRU0s = layers.Bidirectional(tf.keras.layers.GRU(32, return_sequences=True, go_backwards=True))(cnn1s)
    MHA0s = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=1)(GRU0s,GRU0s)
    LayerNormalization0s = layers.LayerNormalization(epsilon=1e-6)(GRU0s + MHA0s)
    y = tf.keras.Model(inputs=input_symbol, outputs=LayerNormalization0s)
    
    # 将文字和符号的输出进行合并
    combined = tf.keras.layers.concatenate([x.output, y.output])
        
    # 对合并后的特征进行展平和全连接处理,得到最终的输出
    flat = tf.keras.layers.Flatten()(combined)
    dnn1 = tf.keras.layers.Dense(3, activation="softmax")(flat)

    # 构建最终的输入输出模型
    model = tf.keras.Model(inputs=[x.input, y.input], outputs=dnn1)
    return model

模型编译

#初始化模型结构
model = model_struct(max_len=1000)

#编译模型
model.compile("adam", "categorical_crossentropy", metrics=["accuracy"])

#打印模型总结信息
model.summary()

在这里插入图片描述

训练模型

history_log= model.fit([trainX_text,trainX_symbol], y_train, batch_size=64, epochs=20 ,validation_data=([x_val_text, x_val_symbol], y_val))

在这里插入图片描述

模型评估与预测

pred = model.predict([testX_text,testX_symbol])  # 使用模型对测试数据进行预测
y_pred = np.int64(pred>0.5)  # 将预测概率转化为二进制预测标签(大于0.51,否则为0)
accuracy = accuracy_score(y_test, y_pred)  # 计算预测的准确率
precision = precision_score(y_test, y_pred,average='micro')  # 计算预测的精确率
recall = recall_score(y_test, y_pred,average='micro')  # 计算预测的召回率
# 打印计算得到的准确率、精确率和召回率
print(" Accuracy : {0} \n Precision : {1} \n Recall : {2}".format(accuracy, precision, recall))

979/979 [==============================] - 55s 54ms/step
Accuracy : 0.9943501021450459
Precision : 0.9943501021450459
Recall : 0.9943501021450459

优化建议

  • 数据增强:考虑引入数据增强技术,如随机删除、替换或插入字符,以增加模型泛化能力。
  • 特征选择:进一步研究特征重要性,尝试剔除无关紧要的字符或符号,或者添加新的有意义特征。
  • 模型改进:实验不同类型的深度学习架构,比如Transformer模型,或者尝试更复杂的注意力机制来捕获长距离依赖关系。
  • 调参优化:通过网格搜索或随机搜索方法寻找最优模型超参数组合,包括学习率、批次大小、隐藏层大小等。
  • 集成学习:探索多个模型集成的方法,如投票、平均或堆叠集成,以提高整体预测性能。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐