我就廢話不多說了,大家還是直接看代碼吧!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import keras from keras.layers import Input ,Dense,Conv2D from keras.layers import MaxPooling2D,Flatten,Convolution2D from keras.models import Model import os import numpy as np from PIL import Image from keras.optimizers import SGD from scipy import misc root_path = os.getcwd() train_names = [ 'bear' , 'blackswan' , 'bus' , 'camel' , 'car' , 'cows' , 'dance' , 'dog' , 'hike' , 'hoc' , 'kite' , 'lucia' , 'mallerd' , 'pigs' , 'soapbox' , 'stro' , 'surf' , 'swing' , 'train' , 'walking' ] test_names = [ 'boat' , 'dance-jump' , 'drift-turn' , 'elephant' , 'libby' ] def load_data(seq_names,data_number,seq_len): #生成圖片對 print ( 'loading data.....' ) frame_num = 51 train_data1 = [] train_data2 = [] train_lab = [] count = 0 while count < data_number: count = count + 1 pos_neg = np.random.randint( 0 , 2 ) if pos_neg = = 0 : seed1 = np.random.randint( 0 ,seq_len) seed2 = np.random.randint( 0 ,seq_len) while seed1 = = seed2: seed1 = np.random.randint( 0 ,seq_len) seed2 = np.random.randint( 0 ,seq_len) frame1 = np.random.randint( 1 ,frame_num) frame2 = np.random.randint( 1 ,frame_num) path1 = os.path.join(root_path, 'data' , 'simility_data' ,seq_names[seed1], str (frame1) + '.jpg' ) path2 = os.path.join(root_path, 'data' , 'simility_data' , seq_names[seed2], str (frame2) + '.jpg' ) image1 = np.array(misc.imresize(Image. open (path1),[ 224 , 224 ])) image2 = np.array(misc.imresize(Image. open (path2),[ 224 , 224 ])) train_data1.append(image1) train_data2.append(image2) train_lab.append(np.array( 0 )) else : seed = np.random.randint( 0 ,seq_len) frame1 = np.random.randint( 1 , frame_num) frame2 = np.random.randint( 1 , frame_num) path1 = os.path.join(root_path, 'data' , 'simility_data' , seq_names[seed], str (frame1) + '.jpg' ) path2 = os.path.join(root_path, 'data' , 'simility_data' , seq_names[seed], str (frame2) + '.jpg' ) image1 = np.array(misc.imresize(Image. open (path1),[ 224 , 224 ])) image2 = np.array(misc.imresize(Image. open (path2),[ 224 , 224 ])) train_data1.append(image1) train_data2.append(image2) train_lab.append(np.array( 1 )) return np.array(train_data1),np.array(train_data2),np.array(train_lab) def vgg_16_base(input_tensor): net = Conv2D( 64 ( 3 , 3 ),activation = 'relu' ,padding = 'same' ,input_shape = ( 224 , 224 , 3 ))(input_tensor) net = Convolution2D( 64 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = MaxPooling2D(( 2 , 2 ),strides = ( 2 , 2 ))(net) net = Convolution2D( 128 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = Convolution2D( 128 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = MaxPooling2D(( 2 , 2 ),strides = ( 2 , 2 ))(net) net = Convolution2D( 256 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = Convolution2D( 256 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = Convolution2D( 256 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = MaxPooling2D(( 2 , 2 ),strides = ( 2 , 2 ))(net) net = Convolution2D( 512 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = Convolution2D( 512 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = Convolution2D( 512 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = MaxPooling2D(( 2 , 2 ),strides = ( 2 , 2 ))(net) net = Convolution2D( 512 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = Convolution2D( 512 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = Convolution2D( 512 ,( 3 , 3 ),activation = 'relu' ,padding = 'same' )(net) net = MaxPooling2D(( 2 , 2 ),strides = ( 2 , 2 ))(net) net = Flatten()(net) return net def siamese(vgg_path = None ,siamese_path = None ): input_tensor = Input (shape = ( 224 , 224 , 3 )) vgg_model = Model(input_tensor,vgg_16_base(input_tensor)) if vgg_path: vgg_model.load_weights(vgg_path) input_im1 = Input (shape = ( 224 , 224 , 3 )) input_im2 = Input (shape = ( 224 , 224 , 3 )) out_im1 = vgg_model(input_im1) out_im2 = vgg_model(input_im2) diff = keras.layers.substract([out_im1,out_im2]) out = Dense( 500 ,activation = 'relu' )(diff) out = Dense( 1 ,activation = 'sigmoid' )(out) model = Model([input_im1,input_im2],out) if siamese_path: model.load_weights(siamese_path) return model train = True if train: model = siamese(siamese_path = 'model/simility/vgg.h5' ) sgd = SGD(lr = 1e - 6 ,momentum = 0.9 ,decay = 1e - 6 ,nesterov = True ) model. compile (optimizer = sgd,loss = 'mse' ,metrics = [ 'accuracy' ]) tensorboard = keras.callbacks.TensorBoard(histogram_freq = 5 ,log_dir = 'log/simility' ,write_grads = True ,write_images = True ) ckpt = keras.callbacks.ModelCheckpoint(os.path.join(root_path, 'model' , 'simility' , 'vgg.h5' ), verbose = 1 ,period = 5 ) train_data1,train_data2,train_lab = load_data(train_names, 4000 , 20 ) model.fit([train_data1,train_data2],train_lab,callbacks = [tensorboard,ckpt],batch_size = 64 ,epochs = 50 ) else : model = siamese(siamese_path = 'model/simility/vgg.h5' ) test_im1,test_im2,test_labe = load_data(test_names, 1000 , 5 ) TP = 0 for i in range ( 1000 ): im1 = np.expand_dims(test_im1[i],axis = 0 ) im2 = np.expand_dims(test_im2[i],axis = 0 ) lab = test_labe[i] pre = model.predict([im1,im2]) if pre> 0.9 and lab = = 1 : TP = TP + 1 if pre< 0.9 and lab = = 0 : TP = TP + 1 print ( float (TP) / 1000 ) |
輸入兩張圖片,標記1為相似,0為不相似。
損失函數用的是簡單的均方誤差,有待改成Siamese的對比損失。
總結:
1.隨機生成了幾組1000對的圖片,測試精度0.7左右,效果一般。
2.問題 1)數據加載沒有用生成器,還得繼續認真看看文檔 2)訓練時劃分驗證集的時候,訓練就會報錯,什么輸入維度的問題,暫時沒找到原因 3)輸入的shape好像必須給出數字,本想用shape= input_tensor.get_shape(),能訓練,不能保存模型,會報(NOT JSON Serializable,Dimension(None))類型錯誤
補充知識: keras 問答匹配孿生網絡文本匹配 RNN 帶有數據
用途:
這篇博客解釋了如何搭建一個簡單的匹配網絡。并且使用了keras的lambda層。在建立網絡之前需要對數據進行預處理。處理過后,文本轉變為id字符序列。將一對question,answer分別編碼可以得到兩個向量,在匹配層中比較兩個向量,計算相似度。
網絡圖示:
數據準備:
數據基于網上的淘寶客服對話數據,我也會放在我的下載頁面中。原數據是對話,我篩選了其中label為1的對話。然后將對話拆解成QA對,q是用戶,a是客服。然后對于每個q,有一個a是匹配的,label為1.再選擇一個a,構成新的樣本,label為0.
超參數:
比較簡單,具體看代碼就可以了。
1
2
3
4
5
6
7
8
9
10
11
|
# dialogue max pair q,a max_pair = 30000 # top k frequent word ,k MAX_FEATURES = 450 # fixed q,a length MAX_SENTENCE_LENGTH = 30 embedding_size = 100 batch_size = 600 # learning rate lr = 0.01 HIDDEN_LAYER_SIZE = n_hidden_units = 256 # neurons in hidden layer |
細節:
導入一些庫
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# -*- coding: utf-8 -*- from keras.layers.core import Activation, Dense, Dropout, SpatialDropout1D from keras.layers.embeddings import Embedding from keras.layers.recurrent import LSTM from keras.preprocessing import sequence from sklearn.model_selection import train_test_split import collections import matplotlib.pyplot as plt import nltk import numpy as np import os import pandas as pd from alime_data import convert_dialogue_to_pair from parameter import MAX_SENTENCE_LENGTH,MAX_FEATURES,embedding_size,max_pair,batch_size,HIDDEN_LAYER_SIZE DATA_DIR = "../data" NUM_EPOCHS = 2 # Read training data and generate vocabulary maxlen = 0 num_recs = 0 |
數據準備,先統計詞頻,然后取出top N個常用詞,然后將句子轉換成 單詞id的序列。把句子中的有效id靠右邊放,將句子左邊補齊padding。然后分成訓練集和測試集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
word_freqs = collections.Counter() training_data = convert_dialogue_to_pair(max_pair) num_recs = len ([ 1 for r in training_data.iterrows()]) #for line in ftrain: for line in training_data.iterrows(): label ,sentence_q = line[ 1 ][ 'label' ],line[ 1 ][ 'sentence_q' ] label ,sentence_a = line[ 1 ][ 'label' ],line[ 1 ][ 'sentence_a' ] words = nltk.word_tokenize(sentence_q.lower()) #.decode("ascii", "ignore") if len (words) > maxlen: maxlen = len (words) for word in words: word_freqs[word] + = 1 words = nltk.word_tokenize(sentence_a.lower()) #.decode("ascii", "ignore") if len (words) > maxlen: maxlen = len (words) for word in words: word_freqs[word] + = 1 #num_recs += 1 ## Get some information about our corpus # 1 is UNK, 0 is PAD # We take MAX_FEATURES-1 featurs to accound for PAD vocab_size = min (MAX_FEATURES, len (word_freqs)) + 2 word2index = {x[ 0 ]: i + 2 for i, x in enumerate (word_freqs.most_common(MAX_FEATURES))} word2index[ "PAD" ] = 0 word2index[ "UNK" ] = 1 index2word = {v:k for k, v in word2index.items()} # convert sentences to sequences X_q = np.empty((num_recs, ), dtype = list ) X_a = np.empty((num_recs, ), dtype = list ) y = np.zeros((num_recs, )) i = 0 def chinese_split(x): return x.split( ' ' ) for line in training_data.iterrows(): label ,sentence_q,sentence_a = line[ 1 ][ 'label' ],line[ 1 ][ 'sentence_q' ],line[ 1 ][ 'sentence_a' ] #label, sentence = line.strip().split("\t") #print(label,sentence) #words = nltk.word_tokenize(sentence_q.lower()) words = chinese_split(sentence_q) seqs = [] for word in words: if word in word2index.keys(): seqs.append(word2index[word]) else : seqs.append(word2index[ "UNK" ]) X_q[i] = seqs #print('add_q') #words = nltk.word_tokenize(sentence_a.lower()) words = chinese_split(sentence_a) seqs = [] for word in words: if word in word2index.keys(): seqs.append(word2index[word]) else : seqs.append(word2index[ "UNK" ]) X_a[i] = seqs y[i] = int (label) i + = 1 # Pad the sequences (left padded with zeros) X_a = sequence.pad_sequences(X_a, maxlen = MAX_SENTENCE_LENGTH) X_q = sequence.pad_sequences(X_q, maxlen = MAX_SENTENCE_LENGTH) X = [] for i in range ( len (X_a)): concat = [X_q[i],X_a[i]] X.append(concat) # Split input into training and test Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, test_size = 0.2 , random_state = 42 ) #print(Xtrain.shape, Xtest.shape, ytrain.shape, ytest.shape) Xtrain_Q = [e[ 0 ] for e in Xtrain] Xtrain_A = [e[ 1 ] for e in Xtrain] Xtest_Q = [e[ 0 ] for e in Xtest] Xtest_A = [e[ 1 ] for e in Xtest] |
最后建立網絡。先定義兩個函數,一個是句子編碼器,另一個是lambda層,計算兩個向量的絕對差。將QA分別用encoder處理得到兩個向量,把兩個向量放入lambda層。最后有了2*hidden size的一層,將這一層接一個dense層,接activation,得到分類概率。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
from keras.layers.wrappers import Bidirectional from keras.layers import Input ,Lambda from keras.models import Model def encoder(inputs_seqs,rnn_hidden_size,dropout_rate): x_embed = Embedding(vocab_size, embedding_size, input_length = MAX_SENTENCE_LENGTH)(inputs_seqs) inputs_drop = SpatialDropout1D( 0.2 )(x_embed) encoded_Q = Bidirectional( LSTM(rnn_hidden_size, dropout = dropout_rate, recurrent_dropout = dropout_rate, name = 'RNN' ))(inputs_drop) return encoded_Q def absolute_difference(vecs): a,b = vecs #d = a-b return abs (a - b) inputs_Q = Input (shape = (MAX_SENTENCE_LENGTH,), name = "input" ) # x_embed = Embedding(vocab_size, embedding_size, input_length=MAX_SENTENCE_LENGTH)(inputs_Q) # inputs_drop = SpatialDropout1D(0.2)(x_embed) # encoded_Q = Bidirectional(LSTM(HIDDEN_LAYER_SIZE, dropout=0.2, recurrent_dropout=0.2,name= 'RNN'))(inputs_drop) inputs_A = Input (shape = (MAX_SENTENCE_LENGTH,), name = "input_a" ) # x_embed = Embedding(vocab_size, embedding_size, input_length=MAX_SENTENCE_LENGTH)(inputs_A) # inputs_drop = SpatialDropout1D(0.2)(x_embed) # encoded_A = Bidirectional(LSTM(HIDDEN_LAYER_SIZE, dropout=0.2, recurrent_dropout=0.2,name= 'RNN'))(inputs_drop) encoded_Q = encoder(inputs_Q,HIDDEN_LAYER_SIZE, 0.1 ) encoded_A = encoder(inputs_A,HIDDEN_LAYER_SIZE, 0.1 ) # import tensorflow as tf # difference = tf.subtract(encoded_Q, encoded_A) # difference = tf.abs(difference) similarity = Lambda(absolute_difference)([encoded_Q, encoded_A]) # x = concatenate([encoded_Q, encoded_A]) # # matching_x = Dense(128)(x) # matching_x = Activation("sigmoid")(matching_x) polar = Dense( 1 )(similarity) prop = Activation( "sigmoid" )(polar) model = Model(inputs = [inputs_Q,inputs_A], outputs = prop) model. compile (loss = "binary_crossentropy" , optimizer = "adam" , metrics = [ "accuracy" ]) training_history = model.fit([Xtrain_Q, Xtrain_A], ytrain, batch_size = batch_size, epochs = NUM_EPOCHS, validation_data = ([Xtest_Q,Xtest_A], ytest)) # plot loss and accuracy def plot(training_history): plt.subplot( 211 ) plt.title( "Accuracy" ) plt.plot(training_history.history[ "acc" ], color = "g" , label = "Train" ) plt.plot(training_history.history[ "val_acc" ], color = "b" , label = "Validation" ) plt.legend(loc = "best" ) plt.subplot( 212 ) plt.title( "Loss" ) plt.plot(training_history.history[ "loss" ], color = "g" , label = "Train" ) plt.plot(training_history.history[ "val_loss" ], color = "b" , label = "Validation" ) plt.legend(loc = "best" ) plt.tight_layout() plt.show() # evaluate score, acc = model.evaluate([Xtest_Q,Xtest_A], ytest, batch_size = batch_size) print ( "Test score: %.3f, accuracy: %.3f" % (score, acc)) for i in range ( 25 ): idx = np.random.randint( len (Xtest_Q)) #idx2 = np.random.randint(len(Xtest_A)) xtest_Q = Xtest_Q[idx].reshape( 1 ,MAX_SENTENCE_LENGTH) xtest_A = Xtest_A[idx].reshape( 1 ,MAX_SENTENCE_LENGTH) ylabel = ytest[idx] ypred = model.predict([xtest_Q,xtest_A])[ 0 ][ 0 ] sent_Q = " " .join([index2word[x] for x in xtest_Q[ 0 ].tolist() if x ! = 0 ]) sent_A = " " .join([index2word[x] for x in xtest_A[ 0 ].tolist() if x ! = 0 ]) print ( "%.0f\t%d\t%s\t%s" % (ypred, ylabel, sent_Q,sent_A)) |
最后是處理數據的函數,寫在另一個文件里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import nltk from parameter import MAX_FEATURES,MAX_SENTENCE_LENGTH import pandas as pd from collections import Counter def get_pair(number, dialogue): pairs = [] for conversation in dialogue: utterances = conversation[ 2 :].strip( '\n' ).split( '\t' ) # print(utterances) # break for i, utterance in enumerate (utterances): if i % 2 ! = 0 : continue pairs.append([utterances[i], utterances[i + 1 ]]) if len (pairs) > = number: return pairs return pairs def convert_dialogue_to_pair(k): dialogue = open ( 'dialogue_alibaba2.txt' , encoding = 'utf-8' , mode = 'r' ) dialogue = dialogue.readlines() dialogue = [p for p in dialogue if p.startswith( '1' )] print ( len (dialogue)) pairs = get_pair(k, dialogue) # break # print(pairs) data = [] for p in pairs: data.append([p[ 0 ], p[ 1 ], 1 ]) for i, p in enumerate (pairs): data.append([p[ 0 ], pairs[(i + 8 ) % len (pairs)][ 1 ], 0 ]) df = pd.DataFrame(data, columns = [ 'sentence_q' , 'sentence_a' , 'label' ]) print ( len (data)) return df |
以上這篇keras實現基于孿生網絡的圖片相似度計算方式就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/u013491950/article/details/83997234