Contents

1. Using the LSTM and GRU modules in PyTorch

1.1 Introduction to the LSTM

1.2 Example usage of the LSTM API

1.3 The model's loss function

1.4 Bidirectional LSTM

1.5 Notes on using LSTM and GRU

2. Text sentiment classification with an LSTM

2.1 Modifying the model


1. Using the LSTM and GRU modules in PyTorch

1.1 Introduction to the LSTM

1.2 Example usage of the LSTM API

 

"""
lstm的使用实例
"""
import torch.nn as nn
import torch


batch_size=10
seq_len=20#句子的长度
vocab_size=100#词典的数量
embedding_dim=30#用长度为30的向量表示一个词语

hidden_size=18
num_layer=2

#构造一个batch的数据
input=torch.randint(low=0,high=100,size=[batch_size,seq_len])#[10,20]

#数据经过embedding处理
embedding=nn.Embedding(vocab_size,embedding_dim=embedding_dim)
input_embeded=embedding(input)#[10,20,30]

#把embedding之后的数据传入lstm
lstm=nn.LSTM(input_size=embedding_dim,hidden_size=hidden_size,num_layers=num_layer,batch_first=True,bidirectional=True)#bidirectional:双向LSTM
output,(h_n,c_n)=lstm(input_embeded)
print(output.size())#[10,20,18]  ==>num_layer=2,则[10,20,18]   ==>若bidirectional=True,则[10,20,18*2]
print('*'*100)
print(h_n.size())#[1*1,10,18]  ==>num_layer=2,则[2*1,10,18]  ==>若bidirectional=True,则[2*2,10,18]
print('*'*100)
print(c_n.size())#[1,10,18]  ==>num_layer=2,则[2,10,18]  ==>若bidirectional=True,则[4,10,18]

#获取最后一个时间步上的输出
last_output=output[:,-1,:]
#获取最后一次的hidden_state
last_hidden_state=h_n[-1,:,:]
print(last_output==last_hidden_state)#全是1,说明是一样的

#获取双向LSTM中正向的最后一个时间步的output
#last_output=output[:,-1,:18]
#反向
last_output=output[:,0,18:]
#获取双向LSTM中正向的最后一个hidden_state
last_hidden_state=h_n[-2,:,:]

#反向
last_hidden_state=h_n[-1,:,:]
#1:第一层的正向
#-1:第一层的反向
#1:第二层的正向
#-1:第二层的反向
print(last_hidden_state.eq(last_output))
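
The section title also covers nn.GRU, which follows the same pattern. Below is a minimal sketch reusing the shapes from the example above; note that the GRU has no cell state, so it returns only output and h_n:

#GRU sketch reusing the variables defined above
gru=nn.GRU(input_size=embedding_dim,hidden_size=hidden_size,num_layers=num_layer,batch_first=True,bidirectional=True)
gru_output,gru_h_n=gru(input_embeded)
print(gru_output.size())#[10,20,18*2]
print(gru_h_n.size())#[2*2,10,18]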

1.3 The model's loss function
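
The model in section 2 treats sentiment classification as a 2-class problem: model.py below returns F.log_softmax over two classes and trains with F.nll_loss. As a minimal sketch (with hypothetical tensors), this pairing is equivalent to cross-entropy on the raw logits:

import torch
import torch.nn.functional as F

logits=torch.randn(4,2)#[batch_size,num_classes], hypothetical scores
target=torch.tensor([0,1,1,0])#one class index per sample

loss_a=F.nll_loss(F.log_softmax(logits,dim=-1),target)#the combination used in model.py
loss_b=F.cross_entropy(logits,target)#the equivalent single call
print(torch.allclose(loss_a,loss_b))#True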

1.4 Bidirectional LSTM
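
The code block in section 1.2 already slices the two directions out of the bidirectional LSTM's output with output[:,-1,:18] and output[:,0,18:]. As an equivalent sketch (reusing the tensors and shapes from that example), the last dimension can also be separated with a view:

#output: [batch_size,seq_len,2*hidden_size] from the bidirectional LSTM above
directions=output.view(batch_size,seq_len,2,hidden_size)
forward_out=directions[:,:,0,:]#equals output[:,:,:hidden_size]
backward_out=directions[:,:,1,:]#equals output[:,:,hidden_size:]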

1.5 Notes on using LSTM and GRU
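
One point that applies to both nn.LSTM and nn.GRU: if no initial state is passed, h_0 (and c_0 for the LSTM) defaults to zeros. A minimal sketch of passing explicit initial states, again reusing the shapes from section 1.2:

num_directions=2#because bidirectional=True
h_0=torch.zeros(num_layer*num_directions,batch_size,hidden_size)
c_0=torch.zeros(num_layer*num_directions,batch_size,hidden_size)
output,(h_n,c_n)=lstm(input_embeded,(h_0,c_0))#same result as calling lstm(input_embeded)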

2. Text sentiment classification with an LSTM

2.1 Modifying the model

 

The contents of lib.py are as follows:

import pickle

ws=pickle.load(open('./model/ws.pkl','rb'))#load the Word2Sequence vocabulary built by main.py

max_len=200
batch_size=512
test_batch_size=1000

hidden_size=128
num_layers=2
bidirectional=True
dropout=0.4
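
Note that ws.pkl is produced by main.py below, so main.py has to be run once before dataset.py or model.py can load the vocabulary here.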

The contents of word_sequence.py are as follows:

"""
实现的是:构建字典,实现方法把句子转化为数字序列和其翻转
"""


class Word2Sequence:
    UNK_TAG = 'UNK'  # tag for uncommon / unknown words (e.g. new words in the test set)
    PAD_TAG = "PAD"  # tag used to pad short sentences up to max_len
    UNK = 0
    PAD = 1

    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}  # word frequency counts

    def fit(self, sentence):
        """
        Accumulate the word counts of a single sentence.
        :param sentence:[word1,word2,word3,...]
        :return:
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min=5, max=None, max_features=None):
        """
        Build the vocabulary.
        :param min: minimum number of occurrences for a word to be kept
        :param max: maximum number of occurrences for a word to be kept
        :param max_features: total number of words to keep
        :return:
        """
        # Remove words from count whose frequency is below min
        if min is not None:
            self.count = {word: value for word, value in self.count.items() if value >= min}
        # Remove words from count whose frequency is above max
        if max is not None:
            self.count = {word: value for word, value in self.count.items() if value <= max}
        # Limit the number of words kept
        if max_features is not None:
            temp = sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features]  # sort by count, descending
            self.count = dict(temp)  # convert back to a dict
        for word in self.count:
            self.dict[word] = len(self.dict)
        # Build the inverse dictionary (index -> word)
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """
        把句子转化为数字序列
        :param sentence:[word1,word2,...]
        :param max_len:int,对句子进行填充或裁剪裁剪
        :return:
        """
        if max_len is not None:
            if max_len > len(sentence):  # 填充
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))
            elif max_len < len(sentence):  # 裁剪
                sentence = sentence[:max_len]
        return [self.dict.get(word, self.UNK) for word in sentence]

    def inverse_transform(self, indices):
        """
        Convert a sequence of numbers back into a sentence.
        :param indices:[1,2,3,4,...]
        :return:
        """
        return [self.inverse_dict.get(idx) for idx in indices]

    def __len__(self):
        return len(self.dict)

if __name__ == '__main__':
    # A small usage demo
    ws = Word2Sequence()
    ws.fit(['我', '是', '谁'])
    ws.fit(['我', '是', '我'])
    ws.build_vocab(min=0)
    print(ws.dict)
    ret = ws.transform(['我', '爱', '北京'], max_len=10)
    print(ret)
    ret = ws.inverse_transform(ret)
    print(ret)


The contents of dataset.py are as follows:

import torch,os,re
from lib import *#import the shared config and the ws vocabulary
from torch.utils.data import DataLoader,Dataset



def tokenize(content):
    content=re.sub('<.*?>',' ',content)#replace HTML tags with spaces (the result must be assigned back)
    filters=["\.",":",'\t','\n','\x97','\x96','#','$','%','&']#characters to strip
    content=re.sub('|'.join(filters),' ',content)
    tokens=[i.strip() for i in content.split()]
    return tokens
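
#A quick sanity check of the tokenizer (an added sketch; the review string is
#a hypothetical example, not taken from the dataset):
#>>> tokenize('This movie is <br /> GREAT: 100% worth it.')
#['This', 'movie', 'is', 'GREAT', '100', 'worth', 'it']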

class ImdbDataset(Dataset):
    def __init__(self,train=True):
        self.train_data_path=r'D:\各种编译器的代码\pythonProject12\机器学习\NLP自然语言处理\datas\IMDB文本情感分类数据集\aclImdb\train'
        self.test_data_path=r'D:\各种编译器的代码\pythonProject12\机器学习\NLP自然语言处理\datas\IMDB文本情感分类数据集\aclImdb\test'
        data_path=self.train_data_path if train else self.test_data_path

        #1. Put all the file paths into one list
        temp_data_path=[os.path.join(data_path,'pos'),os.path.join(data_path,'neg')]#the two class folders
        self.total_file_path=[]#paths of all review files
        for path in temp_data_path:
            file_name_list=os.listdir(path)
            file_path_list=[os.path.join(path,i) for i in file_name_list if i.endswith('.txt')]#all file names in the current folder
            self.total_file_path.extend(file_path_list)#positive and negative examples together


    def __getitem__(self, index):
        file_path=self.total_file_path[index]
        #Get the label from the parent folder name
        label_str=file_path.split("\\")[-2]
        label=0 if label_str=='neg' else 1#encode the label as a number
        #Get the content (use a context manager so the file is closed)
        with open(file_path,'r',encoding='utf8') as f:
            tokens=tokenize(f.read())
        return tokens,label


    def __len__(self):
        return len(self.total_file_path)

def collate_fn(batch):
    """
    :param batch:([tokens,label],[tokens,label]...)
    :return:
    """
    content,label=zip(*batch)
    content=[ws.transform(i,max_len=max_len) for i in content]
    content=torch.LongTensor(content)
    label=torch.LongTensor(label)
    return content,label

def get_dataloader(train=True,batch_size=batch_size):
    imdb_dataset=ImdbDataset(train)
    data_loader=DataLoader(imdb_dataset,batch_size=batch_size,shuffle=True,collate_fn=collate_fn)#use the batch_size argument instead of a hard-coded value
    return data_loader



if __name__ == '__main__':
    for idx,(input,target) in enumerate(get_dataloader()):
        print(idx)
        print(input)
        print(target)
        break

The contents of main.py are as follows:

from word_sequence import Word2Sequence
import pickle,os
from dataset import tokenize
from tqdm import tqdm#progress bar for iterables

if __name__ == '__main__':
    ws = Word2Sequence()
    path = r'D:\各种编译器的代码\pythonProject12\机器学习\NLP自然语言处理\datas\IMDB文本情感分类数据集\aclImdb\train'
    temp_data_path = [os.path.join(path, 'pos'), os.path.join(path, 'neg')]  # the two class folders
    for data_path in temp_data_path:
        file_paths=[os.path.join(data_path,file_name) for file_name in os.listdir(data_path) if file_name.endswith('.txt')]
        for file_path in tqdm(file_paths):
            with open(file_path,'r',encoding='utf8') as f:
                sentence=tokenize(f.read())
            ws.fit(sentence)
    ws.build_vocab(min=10,max_features=10000)
    os.makedirs('./model',exist_ok=True)#make sure the output folder exists
    pickle.dump(ws,open(r'./model/ws.pkl','wb'))#save ws so lib.py can load it
    print(len(ws))

The contents of model.py are as follows:

'''
Define the model.

Possible ways to improve the model:
    add another fully connected layer as the output layer, with an activation function
    feed the bidirectional LSTM's output into a unidirectional LSTM for further processing
'''
import torch,os
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from dataset import get_dataloader
from lib import *
import numpy as np
from tqdm import tqdm#progress bar

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel,self).__init__()
        self.embedding=nn.Embedding(len(ws),100)
        #add the LSTM layer
        self.lstm=nn.LSTM(input_size=100,hidden_size=hidden_size,num_layers=num_layers,
                    batch_first=True,bidirectional=bidirectional,dropout=dropout)
        self.fc=nn.Linear(hidden_size*2,2)

    def forward(self,input):
        """
        :param input:[batch_size,max_len]
        :return:
        """
        x=self.embedding(input) #embedding lookup, shape: [batch_size,max_len,100]
        #x:[batch_size,max_len,2*hidden_size], h_n:[2*2,batch_size,hidden_size]
        x,(h_n, c_n)=self.lstm(x)
        #Take the final output of each direction and concatenate them
        output_fw=h_n[-2,:,:]#final output of the forward direction
        output_bw=h_n[-1,:,:]#final output of the backward direction
        output=torch.cat([output_fw,output_bw],dim=-1)#[batch_size,hidden_size*2]

        out=self.fc(output)
        return F.log_softmax(out,dim=-1)

model=MyModel()
optimizer=Adam(model.parameters(),lr=0.001)

if os.path.exists('./model/model.pkl'):
    model.load_state_dict(torch.load('./model/model.pkl'))
    optimizer.load_state_dict(torch.load('./model/optimizer.pkl'))

def train(epoch):
    for idx,(input,target) in enumerate(get_dataloader(train=True)):
        optimizer.zero_grad()#reset the gradients to 0
        output=model(input)
        loss=F.nll_loss(output,target)
        loss.backward()
        optimizer.step()
        print(epoch,idx,loss.item())
        if idx%100==0:
            torch.save(model.state_dict(),'./model/model.pkl')
            torch.save(optimizer.state_dict(),'./model/optimizer.pkl')

def eval():
    loss_list=[]#losses
    acc_list=[]#accuracies
    for idx,(input,target) in enumerate(get_dataloader(train=False,batch_size=test_batch_size)):
        with torch.no_grad():
            output=model(input)
            cur_loss=F.nll_loss(output,target)
            loss_list.append(cur_loss.item())#store a plain float, not a tensor
            #compute the accuracy
            pred=output.max(dim=-1)[-1]
            cur_acc=pred.eq(target).float().mean()
            acc_list.append(cur_acc.item())
    print('total loss,acc:',np.mean(loss_list),np.mean(acc_list))

if __name__ == '__main__':
    for i in range(1):
        train(i)
    eval()#evaluate on the test set after training

Run the model.py file. Training may take a fairly long time, so be patient.
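
After training, a minimal inference sketch might look like the following; it reuses tokenize, ws, and max_len from the files above, and the review string is a hypothetical example:

import torch
from dataset import tokenize
from lib import ws,max_len
from model import model#importing model also loads ./model/model.pkl if it exists

model.eval()#switch to evaluation mode
tokens=tokenize('This movie is great')#hypothetical example review
input=torch.LongTensor([ws.transform(tokens,max_len=max_len)])#[1,max_len]
with torch.no_grad():
    output=model(input)#[1,2] log-probabilities
    pred=output.max(dim=-1)[-1].item()#0: negative, 1: positive
print(pred)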
