Implementing an Attention Model in Keras

This post works through a piece of code from GitHub and analyzes how it is implemented. Code download link: https://github.com/Choco31415/Attention_Network_With_Keras

The goal of the code is to take a string describing a time and predict the corresponding numeric string, e.g. "ten before ten o'clock a.m" is predicted as 09:50.

The code below is meant to be run in a Jupyter notebook:

1. Import the modules. Not all of them are actually used here (e.g. Permute, Multiply, Reshape, LearningRateScheduler); those are presumably meant for later tuning.

    from keras.layers import Bidirectional, Concatenate, Permute, Dot, Input, LSTM, Multiply, Reshape
    from keras.layers import RepeatVector, Dense, Activation, Lambda
    from keras.optimizers import Adam
    #from keras.utils import to_categorical
    from keras.models import load_model, Model
    #from keras.callbacks import LearningRateScheduler
    import keras.backend as K

    import matplotlib.pyplot as plt
    %matplotlib inline

    import random
    #import math

    import json
    import numpy as np

2. Load the dataset, plus the source (human) and target (machine) vocabularies.

    with open('data/Time Dataset.json', 'r') as f:
        dataset = json.loads(f.read())
    with open('data/Time Vocabs.json', 'r') as f:
        human_vocab, machine_vocab = json.loads(f.read())

    human_vocab_size = len(human_vocab)
    machine_vocab_size = len(machine_vocab)

Here human_vocab maps every input character to an index, and machine_vocab does the same for the output characters; the translated time contains only the digits 0-9 and ':'.

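To get a feel for the data, it can help to print one sample pair and the vocabularies. This is my own illustrative snippet, not part of the original notebook; the exact contents depend on the JSON files.

    # Peek at the data (illustrative; exact ids depend on the JSON files)
    print(dataset[0])              # a (human-readable, machine-readable) pair, e.g. ['ten before ten o'clock a.m', '09:50']
    print(machine_vocab)           # should map '0'-'9' and ':' to ids, so machine_vocab_size == 11
    print(human_vocab_size, machine_vocab_size)
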
3. Define the data-processing helpers

tokenize maps characters to indices, and oh_2d applies one-hot encoding to each mapped index (a short usage example follows the code below).

    def preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty):
        """
        A method for tokenizing data.

        Inputs:
        dataset - A list of sentence data pairs.
        human_vocab - A dictionary of tokens (char) to id's.
        machine_vocab - A dictionary of tokens (char) to id's.
        Tx - X data size
        Ty - Y data size

        Outputs:
        X - Sparse tokens for X data
        Y - Sparse tokens for Y data
        Xoh - One hot tokens for X data
        Yoh - One hot tokens for Y data
        """

        # Metadata
        m = len(dataset)

        # Initialize
        X = np.zeros([m, Tx], dtype='int32')
        Y = np.zeros([m, Ty], dtype='int32')

        # Process data
        for i in range(m):
            data = dataset[i]
            X[i] = np.array(tokenize(data[0], human_vocab, Tx))
            Y[i] = np.array(tokenize(data[1], machine_vocab, Ty))

        # Expand one hots
        Xoh = oh_2d(X, len(human_vocab))
        Yoh = oh_2d(Y, len(machine_vocab))

        return (X, Y, Xoh, Yoh)

    def tokenize(sentence, vocab, length):
        """
        Returns a series of id's for a given input token sequence.

        It is advised that the vocab supports <pad> and <unk>.

        Inputs:
        sentence - Series of tokens
        vocab - A dictionary from token to id
        length - Max number of tokens to consider

        Outputs:
        tokens - A list of `length` token ids, padded with <pad> ids as needed
        """
        tokens = [0]*length
        for i in range(length):
            char = sentence[i] if i < len(sentence) else "<pad>"
            char = char if (char in vocab) else "<unk>"
            tokens[i] = vocab[char]

        return tokens

    def ids_to_keys(sentence, vocab):
        """
        Converts a series of id's into the keys of a dictionary.
        """
        reverse_vocab = {v: k for k, v in vocab.items()}

        return [reverse_vocab[id] for id in sentence]

    def oh_2d(dense, max_value):
        """
        Create a one hot array for the 2D input dense array.
        """
        # Initialize
        oh = np.zeros(np.append(dense.shape, [max_value]))
        # Equivalent and more explicit:
        # oh = np.zeros((dense.shape[0], dense.shape[1], max_value))

        # Set correct indices.
        # meshgrid (with its default 'xy' indexing) enumerates the (row, column)
        # index pairs column by column, so dense must be flattened in column-major
        # ('F') order for its values to line up with those index pairs.
        ids1, ids2 = np.meshgrid(np.arange(dense.shape[0]), np.arange(dense.shape[1]))

        oh[ids1.flatten(), ids2.flatten(), dense.flatten('F').astype(int)] = 1

        return oh

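A minimal sanity check of these helpers (my own snippet, run after the definitions above; 41 is the Tx value defined in the next step):

    # Illustrative only: tokenize one phrase and one-hot encode it
    ids = tokenize(dataset[0][0], human_vocab, 41)           # 41 == Tx, defined in the next step
    print(ids[:10])                                          # first ten character ids; the tail is <pad> ids
    print(oh_2d(np.array([ids]), len(human_vocab)).shape)    # (1, 41, human_vocab_size)
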
4. The longest input string has 41 characters and every output string has length 5. Training and testing use the one-hot encoded data, with 80% of the samples used for training.

    Tx = 41 # Max x sequence length
    Ty = 5  # y sequence length
    X, Y, Xoh, Yoh = preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty)

    # Split data 80-20 between training and test
    train_size = int(0.8*len(dataset))
    Xoh_train = Xoh[:train_size]
    Yoh_train = Yoh[:train_size]
    Xoh_test = Xoh[train_size:]
    Yoh_test = Yoh[train_size:]

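If you want to confirm the array shapes before training, a quick check (my addition; the (8000, 5, 11) shape mentioned in the training step implies 10,000 pairs in total):

    # Illustrative shape check
    print(Xoh_train.shape)   # (train_size, Tx, human_vocab_size)
    print(Yoh_train.shape)   # (train_size, Ty, machine_vocab_size), e.g. (8000, 5, 11)
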
5. Define how the attention weights are recomputed for every new prediction

My own reading: the attention mechanism produces a preference distribution over the input, telling the model which part of the input to focus on. Because of that, after predicting y_{i-1} we need a different attention distribution when predicting y_i, so the distribution is regenerated at every output step.

    # Define part of the attention layer globally so as to
    # share the same layers for each attention step.
    def softmax(x):
        return K.softmax(x, axis=1)

    # RepeatVector copies a vector Tx times so it can be paired with every input position
    at_repeat = RepeatVector(Tx)
    # Concatenate along the last axis
    at_concatenate = Concatenate(axis=-1)
    at_dense1 = Dense(8, activation="tanh")
    at_dense2 = Dense(1, activation="relu")
    at_softmax = Activation(softmax, name='attention_weights')
    # Note the parameter is called axes here, although it means the same thing as axis
    at_dot = Dot(axes=1)

    # The attention weights are recomputed for every new prediction
    def one_step_of_attention(h_prev, a):
        """
        Get the context.

        Input:
        h_prev - Previous hidden state of a RNN layer (m, n_h)
        a - Input data, possibly processed (m, Tx, n_a)

        Output:
        context - Current context (m, 1, n_a)
        """
        # Repeat vector to match a's dimensions
        h_repeat = at_repeat(h_prev)
        # Calculate attention weights
        i = at_concatenate([a, h_repeat])  # pair every input position with the previous decoder state
        i = at_dense1(i)                   # first Dense in the attention formula
        i = at_dense2(i)                   # second Dense
        attention = at_softmax(i)          # softmax yields the attention distribution
        # Calculate the context
        # The attention weights are applied to the input; this is the core idea of
        # attention: a preference distribution over the input
        context = at_dot([attention, a])   # weighted sum of the input under that distribution

        return context

The attention computation above corresponds to the following formula:

[Figure: attention computation formula]

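Since the figure is an image, here is the same computation written out from the code above; W_1, W_2, b_1, b_2 are my shorthand for the two Dense layers (at_dense1, at_dense2):

    e_{t'} = \mathrm{ReLU}\big(W_2 \tanh(W_1 [a_{t'};\, h_{\text{prev}}] + b_1) + b_2\big), \qquad
    \alpha_{t'} = \frac{\exp(e_{t'})}{\sum_{k=1}^{T_x} \exp(e_k)}, \qquad
    \text{context} = \sum_{t'=1}^{T_x} \alpha_{t'}\, a_{t'}
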
6. Define the attention layer

    def attention_layer(X, n_h, Ty):
        """
        Creates an attention layer.

        Input:
        X - Layer input (m, Tx, x_vocab_size)
        n_h - Size of LSTM hidden layer
        Ty - Timesteps in output sequence

        Output:
        output - A list of Ty attention-LSTM hidden states, each of shape (m, n_h)
        """
        # Define the default state for the LSTM layer
        # Lambda layers have no trainable parameters; they only build the zero initial states
        h = Lambda(lambda X: K.zeros(shape=(K.shape(X)[0], n_h)))(X)
        c = Lambda(lambda X: K.zeros(shape=(K.shape(X)[0], n_h)))(X)
        # Messy, but the alternative is using more Input()

        at_LSTM = LSTM(n_h, return_state=True)

        output = []

        # Run attention step and RNN for each output time step:
        # for every prediction, first recompute the context, then push that new
        # context through the LSTM to get the next hidden state h
        for _ in range(Ty):
            # The first step attends over X with the zero-initialized state; after that the
            # previous h attends over X, so each prediction gets its own preference over the input
            context = one_step_of_attention(h, X)
            # Get the new output
            h, _, c = at_LSTM(context, initial_state=[h, c])

            output.append(h)

        # Return all the outputs
        return output

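Putting the pieces together, one decoder step can be summarized as follows (my own notation; W_y, b_y belong to the output Dense layer defined in the next step):

    \text{context}_t = \sum_{t'=1}^{T_x} \alpha_{t,t'}\, a_{t'}, \qquad
    (h_t, c_t) = \mathrm{LSTM}\big(\text{context}_t;\; h_{t-1}, c_{t-1}\big), \qquad
    y_t = \mathrm{softmax}(W_y h_t + b_y)
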
7. Define the model

    layer3 = Dense(machine_vocab_size, activation=softmax)
    layer1_size = 32
    layer2_size = 64

    def get_model(Tx, Ty, layer1_size, layer2_size, x_vocab_size, y_vocab_size):
        """
        Creates a model.

        input:
        Tx - Number of x timesteps
        Ty - Number of y timesteps
        layer1_size - Number of neurons in BiLSTM
        layer2_size - Number of neurons in attention LSTM hidden layer
        x_vocab_size - Number of possible token types for x
        y_vocab_size - Number of possible token types for y

        Output:
        model - A Keras Model.
        """

        # Create layers one by one
        X = Input(shape=(Tx, x_vocab_size))
        # Bidirectional LSTM encoder
        a1 = Bidirectional(LSTM(layer1_size, return_sequences=True), merge_mode='concat')(X)

        # Attention layer
        a2 = attention_layer(a1, layer2_size, Ty)
        # Apply a Dense layer to each hidden state h to get the final outputs y
        a3 = [layer3(timestep) for timestep in a2]

        # Create Keras model
        model = Model(inputs=[X], outputs=a3)

        return model

8. Train the model

    model = get_model(Tx, Ty, layer1_size, layer2_size, human_vocab_size, machine_vocab_size)

    # Visualize the model; the graphviz package needs to be installed beforehand
    from keras.utils import plot_model
    # Writes a diagram of the model's layers to the current directory - worth opening to understand the model
    plot_model(model, show_shapes=True, show_layer_names=True)

    opt = Adam(lr=0.05, decay=0.04, clipnorm=1.0)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

    # (8000, 5, 11) -> (5, 8000, 11): the model has Ty separate outputs, so the targets
    # are passed as a list of Ty arrays, one per output time step
    outputs_train = list(Yoh_train.swapaxes(0, 1))
    model.fit([Xoh_train], outputs_train, epochs=30, batch_size=100, verbose=2)

The model structure diagram is shown below.

[Figure: model structure diagram generated by plot_model]

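After training, the trained model can be saved and reloaded. This is my own hedged sketch, not part of the original post: because the output layer uses the custom softmax defined above, load_model needs it passed via custom_objects (and the Lambda layers are assumed to deserialize in the same Python environment).

    # Illustrative save/reload sketch (the file name is arbitrary)
    model.save('attention_model.h5')
    restored = load_model('attention_model.h5', custom_objects={'softmax': softmax})
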
9. Evaluation

    outputs_test = list(Yoh_test.swapaxes(0, 1))
    score = model.evaluate(Xoh_test, outputs_test)
    print('Test loss: ', score[0])

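evaluate reports per-timestep losses and accuracies. A stricter, sequence-level metric is exact-match accuracy over all five output characters; a minimal sketch (my addition):

    # Exact-match accuracy: every one of the Ty characters must be correct
    preds = model.predict(Xoh_test)                                  # list of Ty arrays, each (m, machine_vocab_size)
    pred_ids = np.stack([p.argmax(axis=-1) for p in preds], axis=1)  # (m, Ty)
    true_ids = Yoh_test.argmax(axis=-1)                              # (m, Ty)
    print('Exact-match accuracy:', np.mean(np.all(pred_ids == true_ids, axis=1)))
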
10. Prediction

Here we simply pick one random sample from the dataset and predict it.

    i = random.randint(0, len(dataset) - 1)  # randint is inclusive on both ends

    def get_prediction(model, x):
        prediction = model.predict(x)
        max_prediction = [y.argmax() for y in prediction]
        str_prediction = "".join(ids_to_keys(max_prediction, machine_vocab))
        return (max_prediction, str_prediction)

    max_prediction, str_prediction = get_prediction(model, Xoh[i:i+1])

    print("Input: " + str(dataset[i][0]))
    print("Tokenized: " + str(X[i]))
    print("Prediction: " + str(max_prediction))
    print("Prediction text: " + str(str_prediction))

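The same helpers also work for a phrase that is not in the dataset; a small sketch (the example string is my own):

    # Illustrative: run the model on an arbitrary time phrase
    custom = "five after nine pm"
    custom_oh = oh_2d(np.array([tokenize(custom, human_vocab, Tx)]), len(human_vocab))
    print(get_prediction(model, custom_oh)[1])   # a well-trained model should print '21:05'
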
11. We can also plot the attention values.

    i = random.randint(0, len(dataset) - 1)

    def plot_attention_graph(model, x, Tx, Ty, human_vocab, layer=7):
        # Process input
        tokens = np.array([tokenize(x, human_vocab, Tx)])
        tokens_oh = oh_2d(tokens, len(human_vocab))

        # Monitor model layer
        layer = model.layers[layer]

        layer_over_time = K.function(model.inputs, [layer.get_output_at(t) for t in range(Ty)])
        layer_output = layer_over_time([tokens_oh])
        layer_output = [row.flatten().tolist() for row in layer_output]

        # Get model output
        prediction = get_prediction(model, tokens_oh)[1]

        # Graph the data
        fig = plt.figure()
        fig.set_figwidth(20)
        fig.set_figheight(1.8)
        ax = fig.add_subplot(111)

        plt.title("Attention Values per Timestep")

        plt.rc('figure')
        cax = plt.imshow(layer_output, vmin=0, vmax=1)
        fig.colorbar(cax)

        plt.xlabel("Input")
        ax.set_xticks(range(Tx))
        ax.set_xticklabels(x)

        plt.ylabel("Output")
        ax.set_yticks(range(Ty))
        ax.set_yticklabels(prediction)

        plt.show()

    # How to read the plot: the y-axis, read top to bottom, spells the predicted time (15:48 in the
    # original example); the attention sits on the word "four" while generating the hour digits and
    # shifts to "before" while generating the minutes - a nice illustration
    plot_attention_graph(model, dataset[i][0], Tx, Ty, human_vocab)

As the figure shows, the attention is on the word "four" when predicting 1 and 5, and on "before" when predicting 4 and 8, which makes intuitive sense.

[Figure: attention heatmap for the sampled example]

Reposted from: https://www.cnblogs.com/lunge-blog/p/11496287.html
