本文實例講述了Python使用循環神經網絡解決文本分類問題的方法。分享給大家供大家參考,具體如下:
1、概念
1.1、循環神經網絡
循環神經網絡(Recurrent Neural Network, RNN)是一類以序列數據為輸入,在序列的演進方向進行遞歸且所有節點(循環單元)按鏈式連接的遞歸神經網絡。
卷積網絡的輸入只有輸入數據X,而循環神經網絡除了輸入數據X之外,每一步的輸出會作為下一步的輸入,如此循環,并且每一次采用相同的激活函數和參數。在每次循環中,x0乘以系數U得到s0,再經過系數W輸入到下一次,以此循環構成循環神經網絡的正向傳播。
在反向傳播中要求損失函數E對參數W的導數,通過鏈式求導法則可以得到右下的公式
循環神經網絡與卷積神經網絡作比較,卷積神經網絡是一個輸出經過網絡產生一個輸出。而循環神經網絡可以實現一個輸入多個輸出(生成圖片描述)、多個輸入一個輸出(文本分類)、多輸入多輸出(機器翻譯、視頻解說)。
RNN使用的是tan激活函數,輸出在-1到1之間,容易梯度消失。距離輸出較遠的步驟對于梯度貢獻很小。
將底層的輸出作為高層的輸入就構成了多層的RNN網絡,而且高層之間也可以進行傳遞,并且可以采用殘差連接防止過擬合。
1.2、長短期記憶網絡
RNN的每次傳播之間只有一個參數W,用這一個參數很難描述大量的、復雜的信息需求,為了解決這個問題引入了長短期記憶網絡(Long Short Term Memory,LSTM)。這個網絡可以進行選擇性機制,選擇性的輸入、輸出需要使用的信息以及選擇性地遺忘不需要的信息。選擇性機制的實現是通過Sigmoid門實現的,sigmoid函數的輸出介于0到1之間,0代表遺忘,1代表記憶,0.5代表記憶50%
LSTM網絡結構如下圖所示,
如上右圖所示為本輪運算的隱含狀態state,當前狀態由上一狀態和遺忘門結果作點積,再加上傳入們結果得到
如下左圖所示為遺忘門結構,上一輪的輸出ht-1和數據xt在經過遺忘門選擇是否遺忘之后,產生遺忘結果ft
如下中圖所示為傳入門結構,ht-1和xt在經過遺忘門的結果it和tanh的結果Ct作點積運算得到本次運算的輸入
如下右圖所示為輸出門結構,ht-1和xt經過遺忘門的結果ot與當狀態作點積產生本次的輸出
如下實現LSTM網絡,首先定義_generate_params函數用于生成每個門所需的參數,調用該函數定義輸入門、輸出門、遺忘門、和中間狀態tanh的參數。每個門的參數都是三個,輸入x、h的權重和偏置值。
接著開始進行LSTM的每輪循環計算,輸入門計算就是將輸入embedded_input矩陣乘以輸入門參數x_in,再加上h和對應參數相乘的結果,最后再加上偏置值b_in經過sigmoid便得到輸入門結果。
同理進行矩陣相乘加偏置操作得到遺忘門、輸出門的結果。中間態tanh與三個門的操作類似,只不過最后經過tanh函數。
將上一個隱含態state乘以遺忘門加上輸入門乘以中間態的結果就得到當前的隱含態state
將當前的state經過tanh函數再加上輸出門就得到本輪的輸出h
經過多輪輸入循環得到的就是LSTM網絡的最后輸出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# 實現LSTM網絡 # 生成Cell網格所需參數 def _generate_paramas(x_size, h_size, b_size): x_w = tf.get_variable( 'x_weight' , x_size) h_w = tf.get_variable( 'h_weight' , h_size) bias = tf.get_variable( 'bias' , b_size, initializer = tf.constant_initializer( 0.0 )) return x_w, h_w, bias scale = 1.0 / math.sqrt(embedding_size + lstm_nodes[ - 1 ]) / 3.0 lstm_init = tf.random_uniform_initializer( - scale, scale) with tf.variable_scope( 'lstm_nn' , initializer = lstm_init): # 輸入門參數 with tf.variable_scope( 'input' ): x_in, h_in, b_in = _generate_paramas( x_size = [embedding_size, lstm_nodes[ 0 ]], h_size = [lstm_nodes[ 0 ], lstm_nodes[ 0 ]], b_size = [ 1 , lstm_nodes[ 0 ]] ) # 輸出門參數 with tf.variable_scope( 'output' ): x_out, h_out, b_out = _generate_paramas( x_size = [embedding_size, lstm_nodes[ 0 ]], h_size = [lstm_nodes[ 0 ], lstm_nodes[ 0 ]], b_size = [ 1 , lstm_nodes[ 0 ]] ) # 遺忘門參數 with tf.variable_scope( 'forget' ): x_f, h_f, b_f = _generate_paramas( x_size = [embedding_size, lstm_nodes[ 0 ]], h_size = [lstm_nodes[ 0 ], lstm_nodes[ 0 ]], b_size = [ 1 , lstm_nodes[ 0 ]] ) # 中間狀態參數 with tf.variable_scope( 'mid_state' ): x_m, h_m, b_m = _generate_paramas( x_size = [embedding_size, lstm_nodes[ 0 ]], h_size = [lstm_nodes[ 0 ], lstm_nodes[ 0 ]], b_size = [ 1 , lstm_nodes[ 0 ]] ) # 兩個初始化狀態,隱含狀態state和初始輸入h state = tf.Variable(tf.zeros([batch_size, lstm_nodes[ 0 ]]), trainable = False ) h = tf.Variable(tf.zeros([batch_size, lstm_nodes[ 0 ]]), trainable = False ) # 遍歷LSTM每輪循環,即每個詞的輸入過程 for i in range (max_words): # 取出每輪輸入,三維數組embedd_inputs的第二維代表訓練的輪數 embedded_input = embedded_inputs[:, i, :] # 將取出的結果reshape為二維 embedded_input = tf.reshape(embedded_input, [batch_size, embedding_size]) # 遺忘門計算 forget_gate = tf.sigmoid(tf.matmul(embedded_input, x_f) + tf.matmul(h, h_f) + b_f) # 輸入門計算 input_gate = tf.sigmoid(tf.matmul(embedded_input, x_in) + tf.matmul(h, h_in) + b_in) # 輸出門 output_gate = tf.sigmoid(tf.matmul(embedded_input, x_out) + tf.matmul(h, h_out) + b_out) # 中間狀態 mid_state = tf.tanh(tf.matmul(embedded_input, x_m) + tf.matmul(h, h_m) + b_m) # 計算隱含狀態state和輸入h state = state * forget_gate + input_gate * mid_state h = output_gate + tf.tanh(state) # 最后遍歷的結果就是LSTM的輸出 last_output = h |
1.3、文本分類
文本分類問題就是對輸入的文本字符串進行分析判斷,之后再輸出結果。字符串無法直接輸入到RNN網絡,因此在輸入之前需要先對文本拆分成單個詞組,將詞組進行embedding編碼成一個向量,每輪輸入一個詞組,當最后一個詞組輸入完畢時得到輸出結果也是一個向量。embedding將一個詞對應為一個向量,向量的每一個維度對應一個浮點值,動態調整這些浮點值使得embedding編碼和詞的意思相關。這樣網絡的輸入輸出都是向量,再最后進行全連接操作對應到不同的分類即可。
RNN網絡不可避免地帶來問題就是最后的輸出結果受最近的輸入較大,而之前較遠的輸入可能無法影響結果,這就是信息瓶頸問題,為了解決這個問題引入了雙向LSTM。雙向LSTM不僅增加了反向信息傳播,而且每一輪的都會有一個輸出,將這些輸出進行組合之后再傳給全連接層。
另一個文本分類模型就是HAN(Hierarchy Attention Network),首先將文本分為句子、詞語級別,將輸入的詞語進行編碼然后相加得到句子的編碼,然后再將句子編碼相加得到最后的文本編碼。而attention是指在每一個級別的編碼進行累加前,加入一個加權值,根據不同的權值對編碼進行累加。
由于輸入的文本長度不統一,所以無法直接使用神經網絡進行學習,為了解決這個問題,可以將輸入文本的長度統一為一個最大值,勉強采用卷積神經網絡進行學習,即TextCNN。文本卷積網絡的卷積過程采用的是多通道一維卷積,與二維卷積相比一維卷積就是卷積核只在一個方向上移動。例如下左圖所示,1×1+5×2+2×2+4×3+3×3+3×4=48,之后卷積核向下移動一格得到45,以此類推。如下右圖所示,輸入長短不一的多個詞匯。首先將其全部填充為六通道的embedding數組,然后采用六通道的一維卷積核從上到下進行卷積,得到一維的數組,然后再經過池化層和全連接層后輸出。
可以看到CNN網絡不能完美處理輸入長短不一的序列式問題,但是它可以并行處理多個詞組,效率更高,而RNN可以更好地處理序列式的輸入,將兩者的優勢結合起來就構成了R-CNN模型。首先通過雙向RNN網絡對輸入進行特征提取,再使用CNN進一步提取,之后通過池化層將每一步的特征融合在一起,最后經過全連接層進行分類。
無論什么模型都需要使用embedding將輸入轉化為一個向量,當輸入過大時,轉化的embedding層參數就會過大,不僅不利于存儲,還會造成過擬合,因此需要對embedding層進行壓縮。原來的embedding編碼是一個參數對應一個輸入,例如wait對應參數x1,for對應x2,the對應x3。如果輸入過多,編碼參數就會很大,可以采用兩個參數對組合的方式來編碼輸入,例如wait對應(x1,x2),for對應(x1,x3)...,這樣就可以極大的節省參數的數量,這就是共享壓縮。
2、通過Text RNN進行文本分類
2.1、數據預處理
在網上下載的文本分類數據集文件如下,分為測試集和訓練集數據,每個訓練集下有四個文件夾,每個文件夾是一個分類,每個分類有1000個txt文件,每個文件中有一條該分類的文本
通過os.walk遍歷所有訓練集文件,將分類文本通過jieba庫拆分成單個詞組,用空格分隔。然后將分類文本添加到開頭,并用制表符分隔,最后將結果輸出到train_segment.txt,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# 將文件中的句子通過jieba庫拆分為單個詞 def segment_word(input_file, output_file): # 循環遍歷訓練數據集的每一個文件 for root, folders, files in os.walk(input_file): print ( 'root:' , root) for folder in folders: print ( 'dir:' , folder) for file in files: file_dir = os.path.join(root, file ) with open (file_dir, 'rb' ) as in_file: # 讀取文件中的文本 sentence = in_file.read() # 通過jieba函數庫將句子拆分為單個詞組 words = jieba.cut(sentence) # 文件夾路徑最后兩個字即為分類名 content = root[ - 2 :] + '\t' # 去除詞組中的空格,排除為空的詞組 for word in words: word = word.strip( ' ' ) if word ! = '': content + = word + ' ' # 換行并將文本寫入輸出文件 content + = '\n' with open (output_file, 'a' ) as outfile: outfile.write(content.strip( ' ' )) |
結果如下:
由于一些詞組出現次數很少,不具有統計意義,所以需要排除,通過get_list()方法統計每個詞組出現的頻率。利用python自帶的dictionary數據類型可以輕易實現詞組數據統計,格式為{"keyword":frequency},frequency記錄keyword出現的次數。如果一個詞組是新出現的則作為新詞條加入詞典,否則將frequency值+1。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# 統計每個詞出現的頻率 def get_list(segment_file, out_file): # 通過詞典保存每個詞組出現的頻率 word_dict = {} with open (segment_file, 'r' ) as seg_file: lines = seg_file.readlines() # 遍歷文件的每一行 for line in lines: line = line.strip( '\r\n' ) # 將一行按空格拆分為每個詞,統計詞典 for word in line.split( ' ' ): # 如果這個詞組沒有在word_dict詞典中出現過,則新建詞典項并設為0 word_dict.setdefault(word, 0 ) # 將詞典word_dict中詞組word對應的項計數加一 word_dict[word] + = 1 # 將詞典中的列表排序,關鍵字為列表下標為1的項,且逆序 sorted_list = sorted (word_dict.items(), key = lambda d: d[ 1 ], reverse = True ) with open (out_file, 'w' ) as outfile: # 將排序后的每條詞典項寫入文件 for item in sorted_list: outfile.write( '%s\t%d\n' % (item[ 0 ], item[ 1 ])) |
統計結果如下:
2.2、數據讀入
直接使用詞組無法進行編碼學習,需要將詞組轉化為embedding編碼,根據剛才生成的train_list列表,按照從前往后的順序為每個詞組編號,如果詞組頻率小于閾值則排除掉。通過Word_list類來構建訓練數據、測試數據的詞組對象,在類的構造函數__init__()實現詞組的編碼。并定義類方法sentence2id將拆分好的句子詞組轉化為對應的id數組,如果詞組列表中沒有該詞,則將該值置為-1。
在定義類之前首先規定一些超參數供后續使用:
1
2
3
4
5
6
7
8
9
10
11
|
# 定義超參數 embedding_size = 32 # 每個詞組向量的長度 max_words = 10 # 一個句子最大詞組長度 lstm_layers = 2 # lstm網絡層數 lstm_nodes = [ 64 , 64 ] # lstm每層結點數 fc_nodes = 64 # 全連接層結點數 batch_size = 100 # 每個批次樣本數據 lstm_grads = 1.0 # lstm網絡梯度 learning_rate = 0.001 # 學習率 word_threshold = 10 # 詞表頻率門限,低于該值的詞語不統計 num_classes = 4 # 最后的分類結果有4類 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
class Word_list: def __init__( self , filename): # 用詞典類型來保存需要統計的詞組及其頻率 self ._word_dic = {} with open (filename, 'r' ,encoding = 'GB2312' ,errors = 'ignore' ) as f: lines = f.readlines() for line in lines: word, freq = line.strip( '\r\n' ).split( '\t' ) freq = int (freq) # 如果詞組的頻率小于閾值,跳過不統計 if freq < word_threshold: continue # 詞組列表中每個詞組都是不重復的,按序添加到word_dic中即可,下一個詞組id就是當前word_dic的長度 word_id = len ( self ._word_dic) self ._word_dic[word] = word_id def sentence2id( self , sentence): # 將以空格分割的句子返回word_dic中對應詞組的id,若不存在返回-1 sentence_id = [ self ._word_dic.get(word, - 1 ) for word in sentence.split()] return sentence_id train_list = Word_list(train_list_dir) |
定義TextData類來完成數據的讀入和管理,在__init__()函數中讀取剛才處理好的train_segment.txt文件,根據制表符分割類別標記和句子詞組,將類別和句子分別轉化為數字id。如果句子的詞組超過了最大閾值,則截去后面多余的,如果不夠則用-1填充。定義類函數_shuffle_data()用于清洗數據,next_batch()用于按批次返回數據和標簽,get_size()用于返回詞組總條數。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
class TextData: def __init__( self , segment_file, word_list): self .inputs = [] self .labels = [] # 通過詞典管理文本類別 self .label_dic = { '體育' : 0 , '校園' : 1 , '女性' : 2 , '出版' : 3 } self .index = 0 with open (segment_file, 'r' ) as f: lines = f.readlines() for line in lines: # 文本按制表符分割,前面為類別,后面為句子 label, content = line.strip( '\r\n' ).split( '\t' )[ 0 : 2 ] self .content_size = len (content) # 將類別轉換為數字id label_id = self .label_dic.get(label) # 將句子轉化為embedding數組 content_id = word_list.sentence2id(content) # 如果句子的詞組長超過最大值,截取max_words長度以內的id值 content_id = content_id[ 0 :max_words] # 如果不夠則填充-1,直到max_words長度 padding_num = max_words - len (content_id) content_id = content_id + [ - 1 for i in range (padding_num)] self .inputs.append(content_id) self .labels.append(label_id) self .inputs = np.asarray( self .inputs, dtype = np.int32) self .labels = np.asarray( self .labels, dtype = np.int32) self ._shuffle_data() # 對數據按照(input,label)對來打亂順序 def _shuffle_data( self ): r_index = np.random.permutation( len ( self .inputs)) self .inputs = self .inputs[r_index] self .labels = self .labels[r_index] # 返回一個批次的數據 def next_batch( self , batch_size): # 當前索引+批次大小得到批次的結尾索引 end_index = self .index + batch_size # 如果結尾索引大于樣本總數,則打亂所有樣本從頭開始 if end_index > len ( self .inputs): self ._shuffle_data() self .index = 0 end_index = batch_size # 按索引返回一個批次的數據 batch_inputs = self .inputs[ self .index:end_index] batch_labels = self .labels[ self .index:end_index] self .index = end_index return batch_inputs, batch_labels # 獲取詞表數目 def get_size( self ): return self .content_size # 訓練數據集對象 train_set = TextData(train_segment_dir, train_list) # print(data_set.next_batch(10)) # 訓練數據集詞組條數 train_list_size = train_set.get_size() |
2.3、構建計算圖模型
定義函數create_model來實現計算圖模型的構建。首先定義模型輸入的占位符,分別為輸入文本inputs、輸出標簽outputs、Dropout的比率keep_prob。
首先構建embedding層,將輸入的inputs編碼抽取出來拼接成一個矩陣,例如輸入[1,8,3]則抽取embeding[1]、embedding[8]和embedding[3]拼接成一個矩陣
接下來構建LSTM網絡,這里構建了兩層網絡,每層的結點數在之前的參數lstm_node[]數組中定義。每個cell的構建通過函數tf.contrib.rnn.BasicLSTMCell實現,之后經過Dropout操作。再將兩個cell合并為一個LSTM網絡,通過函數tf.nn.dynamic_rnn將輸入embedded_inputs輸入到LSTM網絡中進行訓練得到輸出rnn_output。這是一個三維數組,第二維表示訓練的步數,我們只取最后一維的結果,即下標值為-1.
接下來構建全連接層,通過tf.layers.dense函數定義全連接層,再經過一個dropout操作后將輸出映射到類別上,類別的種類的參數num_classes,得到估計值logits
接下來就可以求損失、精確率等評估值了。計算算預測值logits與標簽值outputs之間的交叉熵損失值,接下來通過arg_max計算預測值,進而求準確率
接下來定義訓練方法,通過梯度裁剪應用到變量上以防止梯度消失。
最后將輸入占位符、損失等評估值、其他訓練參數返回到調用函數的外部。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# 創建計算圖模型 def create_model(list_size, num_classes): # 定義輸入輸出占位符 inputs = tf.placeholder(tf.int32, (batch_size, max_words)) outputs = tf.placeholder(tf.int32, (batch_size,)) # 定義是否dropout的比率 keep_prob = tf.placeholder(tf.float32, name = 'keep_rate' ) # 記錄訓練的總次數 global_steps = tf.Variable(tf.zeros([], tf.float32), name = 'global_steps' , trainable = False ) # 將輸入轉化為embedding編碼 with tf.variable_scope( 'embedding' , initializer = tf.random_normal_initializer( - 1.0 , 1.0 )): embeddings = tf.get_variable( 'embedding' , [list_size, embedding_size], tf.float32) # 將指定行的embedding數值抽取出來 embedded_inputs = tf.nn.embedding_lookup(embeddings, inputs) # 實現LSTM網絡 scale = 1.0 / math.sqrt(embedding_size + lstm_nodes[ - 1 ]) / 3.0 lstm_init = tf.random_uniform_initializer( - scale, scale) with tf.variable_scope( 'lstm_nn' , initializer = lstm_init): # 構建兩層的lstm,每層結點數為lstm_nodes[i] cells = [] for i in range (lstm_layers): cell = tf.contrib.rnn.BasicLSTMCell(lstm_nodes[i], state_is_tuple = True ) # 實現Dropout操作 cell = tf.contrib.rnn.DropoutWrapper(cell, output_keep_prob = keep_prob) cells.append(cell) # 合并兩個lstm的cell cell = tf.contrib.rnn.MultiRNNCell(cells) # 將embedded_inputs輸入到RNN中進行訓練 initial_state = cell.zero_state(batch_size, tf.float32) # runn_output:[batch_size,num_timestep,lstm_outputs[-1] rnn_output, _ = tf.nn.dynamic_rnn(cell, embedded_inputs, initial_state = initial_state) last_output = rnn_output[:, - 1 , :] # 構建全連接層 fc_init = tf.uniform_unit_scaling_initializer(factor = 1.0 ) with tf.variable_scope( 'fc' , initializer = fc_init): fc1 = tf.layers.dense(last_output, fc_nodes, activation = tf.nn.relu, name = 'fc1' ) fc1_drop = tf.contrib.layers.dropout(fc1, keep_prob) logits = tf.layers.dense(fc1_drop, num_classes, name = 'fc2' ) # 定義評估指標 with tf.variable_scope( 'matrics' ): # 計算損失值 softmax_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits = logits, labels = outputs) loss = tf.reduce_mean(softmax_loss) # 計算預測值,求第1維中最大值的下標,例如[1,1,5,3,2] argmax=> 2 y_pred = tf.argmax(tf.nn.softmax(logits), 1 , output_type = tf.int32) # 求準確率 correct_prediction = tf.equal(outputs, y_pred) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) # 定義訓練方法 with tf.variable_scope( 'train_op' ): train_var = tf.trainable_variables() # for var in train_var: # print(var) # 對梯度進行裁剪防止梯度消失或者梯度爆炸 grads, _ = tf.clip_by_global_norm(tf.gradients(loss, train_var), clip_norm = lstm_grads) # 將梯度應用到變量上去 optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate) train_op = optimizer.apply_gradients( zip (grads, train_var), global_steps) # 以元組的方式將結果返回 return ((inputs, outputs, keep_prob), (loss, accuracy), (train_op, global_steps)) # 調用構建函數,接收解析返回的參數 placeholders, matrics, others = create_model(train_list_size, num_classes) inputs, outputs, keep_prob = placeholders loss, accuracy = matrics train_op, global_steps = others |
2.4、進行訓練
通過Session運行計算圖模型,從train_set中按批次獲取訓練集數據并填充占位符,運行sess.run,獲取損失值、準確率等中間值打印
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# 進行訓練 init_op = tf.global_variables_initializer() train_keep_prob = 0.8 # 訓練集的dropout比率 train_steps = 10000 with tf.Session() as sess: sess.run(init_op) for i in range (train_steps): # 按批次獲取訓練集數據 batch_inputs, batch_labels = train_set.next_batch(batch_size) # 運行計算圖 res = sess.run([loss, accuracy, train_op, global_steps], feed_dict = {inputs: batch_inputs, outputs: batch_labels, keep_prob: train_keep_prob}) loss_val, acc_val, _, g_step_val = res if g_step_val % 20 = = 0 : print ( '第%d輪訓練,損失:%3.3f,準確率:%3.5f' % (g_step_val, loss_val, acc_val)) |
在我的數據集進行一萬輪訓練后,訓練集的準確率在90%左右徘徊
源代碼及相關數據文件:https://github.com/SuperTory/MachineLearning/tree/master/TextRNN
希望本文所述對大家Python程序設計有所幫助。
原文鏈接:https://blog.csdn.net/theVicTory/article/details/101017006