這篇教程是翻譯Peter Roelants寫的循環(huán)神經網絡教程,作者已經授權翻譯,這是原文。
import itertools import numpy as np import matplotlib import matplotlib.pyplot as plt定義數據集
在這個教程中,我們訓練的數據集是2000個數據,在程序中用 create_dataset 函數產生。每個訓練樣本都是有兩部分 (Xi1, Xi2)組成,每一部分是一個7位的二進制表示,分別由6位的二進制和最右邊一位的0組成(最右邊的0是為了防止二進制相加溢出)。預測目標 ti 也是一個7位的二進制表示,即 ti = Xi1 + Xi2。我們之所以從左到右編碼二進制,是因為我們的RNN網絡是從左到右進行計算的。
輸入數據和預測結果都是被存儲在三維張量里,比如下圖表示了我們的訓練數據 (X_train, T_train)的維度表示。第一維度表示一共有多少組數據(我們第一維度的值是 2000),第二維度表示的是每個時間步長上面的取值,一共7個時間步長,第三維度表示RNN輸入單元神經元的個數(該教程設置的是2個神經元)。下圖就是輸入張量 X_train 的可視化展示:
# Create dataset nb_train = 2000 # Number of training samples # Addition of 2 n-bit numbers can result in a n+1 bit number sequence_len = 7 # Length of the binary sequence def create_dataset(nb_samples, sequence_len): """Create a dataset for binary addition and return as input, targets.""" max_int = 2**(sequence_len-1) # Maximum integer that can be added format_str = "{:0" + str(sequence_len) + "b}" # Transform integer in binary format nb_inputs = 2 # Add 2 binary numbers nb_outputs = 1 # Result is 1 binary number X = np.zeros((nb_samples, sequence_len, nb_inputs)) # Input samples T = np.zeros((nb_samples, sequence_len, nb_outputs)) # Target samples # Fill up the input and target matrix for i in xrange(nb_samples): # Generate random numbers to add nb1 = np.random.randint(0, max_int) nb2 = np.random.randint(0, max_int) # Fill current input and target row. # Note that binary numbers are added from right to left, but our RNN reads # from left to right, so reverse the sequence. X[i,:,0] = list(reversed([int(b) for b in format_str.format(nb1)])) X[i,:,1] = list(reversed([int(b) for b in format_str.format(nb2)])) T[i,:,0] = list(reversed([int(b) for b in format_str.format(nb1+nb2)])) return X, T # Create training samples X_train, T_train = create_dataset(nb_train, sequence_len) print("X_train shape: {0}".format(X_train.shape)) print("T_train shape: {0}".format(T_train.shape))
X_train shape: (2000, 7, 2)
T_train shape: (2000, 7, 1)
# Show an example input and target def printSample(x1, x2, t, y=None): """Print a sample in a more visual way.""" x1 = "".join([str(int(d)) for d in x1]) x2 = "".join([str(int(d)) for d in x2]) t = "".join([str(int(d[0])) for d in t]) if not y is None: y = "".join([str(int(d[0])) for d in y]) print("x1: {:s} {:2d}".format(x1, int("".join(reversed(x1)), 2))) print("x2: + {:s} {:2d} ".format(x2, int("".join(reversed(x2)), 2))) print(" ------- --") print("t: = {:s} {:2d}".format(t, int("".join(reversed(t)), 2))) if not y is None: print("y: = {:s}".format(t)) # Print the first sample printSample(X_train[0,:,0], X_train[0,:,1], T_train[0,:,:])
x1: 1010010 37
x2: + 1101010 43
t: = 0000101 80
在代碼中,每一個映射過程被抽象成了一個類。在每一個類中,都有一個 forward 函數用來計算BP算法中的前向傳播,backward 函數來計算BP算法中的反向傳播。
RNN的計算過程在神經網絡中,將輸入數據映射到下一層的常用方法是矩陣相乘并且加上偏差項,最后利用一個非線性函數進行激活操作。在這篇教程中,二維的輸入數據 (Xik1, Xik2) ,通過一個 2*3 的鏈接矩陣和長度是3的偏差向量映射到狀態(tài)層,即下一層。在狀態(tài)反饋之前,三維的狀態(tài)值,將會通過一個 3*1 的鏈接矩陣和長度是1的偏差向量映射到輸出層,即得到輸出概率。
因為我們想在一個計算步驟中,將訓練樣本中的每個樣本在每個時間點都進行狀態(tài)映射,所以我們將使用 Numpy 中的 tensordot 函數,去實現這個相乘的操作。這個函數需要輸入2個張量和指定需要累加的軸。比如,輸入數據為 shape(X) = (2000, 7, 2),狀態(tài)值為 shape(S) = (2000, 7, 3),鏈接矩陣為 shape(W) = (2, 3),那么我們可以得到公式 S = tensordot(X, W, axes = ((-1), (0)) 。這個公式會把 X 的最后一維 (-1) 和 W 的第零維度進行相乘累加,最后得到一個維度是 (2000, 7, 3) 的張量。
這個線性轉換可以用在輸入數據 X 到狀態(tài)層 S 的映射,也可以用在狀態(tài)層 S 到輸出層 Y 的映射。在代碼的 TensorLinear 類中,實現了這個線性轉換,還實現了它的梯度。根據Xavier Glorot的建議,權重初始值是一個均勻分布,數據范圍是:
Logistic分類函數將被在輸出層使用,用來得到輸出的概率值,在 LogisticClassifier 函數中實現了它的損失函數和梯度。
# Define the linear tensor transformation layer class TensorLinear(object): """The linear tensor layer applies a linear tensor dot product and a bias to its input.""" def __init__(self, n_in, n_out, tensor_order, W=None, b=None): """Initialse the weight W and bias b parameters.""" a = np.sqrt(6.0 / (n_in + n_out)) self.W = (np.random.uniform(-a, a, (n_in, n_out)) if W is None else W) self.b = (np.zeros((n_out)) if b is None else b) # Bias paramters self.bpAxes = tuple(range(tensor_order-1)) # Axes summed over in backprop def forward(self, X): """Perform forward step transformation with the help of a tensor product.""" # Same as: Y[i,j,:] = np.dot(X[i,j,:], self.W) + self.b (for i,j in X.shape[0:1]) # Same as: Y = np.einsum("ijk,kl->ijl", X, self.W) + self.b return np.tensordot(X, self.W, axes=((-1),(0))) + self.b def backward(self, X, gY): """Return the gradient of the parmeters and the inputs of this layer.""" # Same as: gW = np.einsum("ijk,ijl->kl", X, gY) # Same as: gW += np.dot(X[:,j,:].T, gY[:,j,:]) (for i,j in X.shape[0:1]) gW = np.tensordot(X, gY, axes=(self.bpAxes, self.bpAxes)) gB = np.sum(gY, axis=self.bpAxes) # Same as: gX = np.einsum("ijk,kl->ijl", gY, self.W.T) # Same as: gX[i,j,:] = np.dot(gY[i,j,:], self.W.T) (for i,j in gY.shape[0:1]) gX = np.tensordot(gY, self.W.T, axes=((-1),(0))) return gX, gW, gB
# Define the logistic classifier layer class LogisticClassifier(object): """The logistic layer applies the logistic function to its inputs.""" def forward(self, X): """Perform the forward step transformation.""" return 1 / (1 + np.exp(-X)) def backward(self, Y, T): """Return the gradient with respect to the cost function at the inputs of this layer.""" # Normalise of the number of samples and sequence length. return (Y - T) / (Y.shape[0] * Y.shape[1]) def cost(self, Y, T): """Compute the cost at the output.""" # Normalise of the number of samples and sequence length. # Add a small number (1e-99) because Y can become 0 if the network learns # to perfectly predict the output. log(0) is undefined. return - np.sum(np.multiply(T, np.log(Y+1e-99)) + np.multiply((1-T), np.log(1-Y+1e-99))) / (Y.shape[0] * Y.shape[1])
在上一部分教程中,我們知道隨著時間步長,我們需要把循環(huán)狀態(tài)進行展開處理。在代碼中,RecurrentStateUnfold 類實現了這個展開的BPTT算法。這個類包含了前一狀態(tài)層到當前狀態(tài)層的權重,偏差單元,當然也實現了權重初始化和優(yōu)化函數。
在 RecurrentStateUnfold 類中,forward 函數實現了隨著時間步長,狀態(tài)函數的迭代更新。backward 函數實現了每個輸出狀態(tài)值的梯度。在每個時間點 k 上,輸出層 Y 的梯度還需要加上上一狀態(tài)的梯度之和。權重項和偏差項的梯度需要將所有時間點上面的權重項和偏差項的梯度都進行累加,因為在每一個時間點它們的值都是共享的。在時間點 k = 0,最后狀態(tài)的梯度需要去優(yōu)化初始的狀態(tài) S0,因為初始狀體的梯度是 ?ξ/?S0 。
RecurrentStateUnfold 類需要使用 RecurrentStateUpdate 類。這個類中的 forward 方法實現了將 k-1 的狀態(tài)和 input 輸入進行聯(lián)合計算得到 k 時刻的狀態(tài)值。backward 方法實現了BPTT算法。在 RecurrentStateUpdate 類中實現的非線性激活函數是hyperbolic tangent (tanh)函數,這個函數的取值范圍是從 -1 到 +1。這個函數在 TanH 類中實現了。
# Define tanh layer class TanH(object): """TanH applies the tanh function to its inputs.""" def forward(self, X): """Perform the forward step transformation.""" return np.tanh(X) def backward(self, Y, output_grad): """Return the gradient at the inputs of this layer.""" gTanh = 1.0 - np.power(Y,2) return np.multiply(gTanh, output_grad)
# Define internal state update layer class RecurrentStateUpdate(object): """Update a given state.""" def __init__(self, nbStates, W, b): """Initialse the linear transformation and tanh transfer function.""" self.linear = TensorLinear(nbStates, nbStates, 2, W, b) self.tanh = TanH() def forward(self, Xk, Sk): """Return state k+1 from input and state k.""" return self.tanh.forward(Xk + self.linear.forward(Sk)) def backward(self, Sk0, Sk1, output_grad): """Return the gradient of the parmeters and the inputs of this layer.""" gZ = self.tanh.backward(Sk1, output_grad) gSk0, gW, gB = self.linear.backward(Sk0, gZ) return gZ, gSk0, gW, gB
# Define layer that unfolds the states over time class RecurrentStateUnfold(object): """Unfold the recurrent states.""" def __init__(self, nbStates, nbTimesteps): " Initialse the shared parameters, the inital state and state update function." a = np.sqrt(6.0 / (nbStates * 2)) self.W = np.random.uniform(-a, a, (nbStates, nbStates)) self.b = np.zeros((self.W.shape[0])) # Shared bias self.S0 = np.zeros(nbStates) # Initial state self.nbTimesteps = nbTimesteps # Timesteps to unfold self.stateUpdate = RecurrentStateUpdate(nbStates, self.W, self.b) # State update function def forward(self, X): """Iteratively apply forward step to all states.""" S = np.zeros((X.shape[0], X.shape[1]+1, self.W.shape[0])) # State tensor S[:,0,:] = self.S0 # Set initial state for k in range(self.nbTimesteps): # Update the states iteratively S[:,k+1,:] = self.stateUpdate.forward(X[:,k,:], S[:,k,:]) return S def backward(self, X, S, gY): """Return the gradient of the parmeters and the inputs of this layer.""" gSk = np.zeros_like(gY[:,self.nbTimesteps-1,:]) # Initialise gradient of state outputs gZ = np.zeros_like(X) # Initialse gradient tensor for state inputs gWSum = np.zeros_like(self.W) # Initialise weight gradients gBSum = np.zeros_like(self.b) # Initialse bias gradients # Propagate the gradients iteratively for k in range(self.nbTimesteps-1, -1, -1): # Gradient at state output is gradient from previous state plus gradient from output gSk += gY[:,k,:] # Propgate the gradient back through one state gZ[:,k,:], gSk, gW, gB = self.stateUpdate.backward(S[:,k,:], S[:,k+1,:], gSk) gWSum += gW # Update total weight gradient gBSum += gB # Update total bias gradient gS0 = np.sum(gSk, axis=0) # Get gradient of initial state over all samples return gZ, gWSum, gBSum, gS0
在 RnnBinaryAdder 類中,實現了整個二進制相加的網絡過程。它在創(chuàng)建的時候,同時初始化了所有的網絡參數。forward 方法實現了整個網絡的前向傳播過程,backward 方法實現了整個網絡的梯度更新和反向傳播過程。getParamGrads 方法計算了每一個參數的梯度,并且作為一個列表進行返回。get_params_iter 方法是將參數做一個索引排序,使得參數的梯度按照一定的順序返回。
# Define the full network class RnnBinaryAdder(object): """RNN to perform binary addition of 2 numbers.""" def __init__(self, nb_of_inputs, nb_of_outputs, nb_of_states, sequence_len): """Initialse the network layers.""" self.tensorInput = TensorLinear(nb_of_inputs, nb_of_states, 3) # Input layer self.rnnUnfold = RecurrentStateUnfold(nb_of_states, sequence_len) # Recurrent layer self.tensorOutput = TensorLinear(nb_of_states, nb_of_outputs, 3) # Linear output transform self.classifier = LogisticClassifier() # Classification output def forward(self, X): """Perform the forward propagation of input X through all layers.""" recIn = self.tensorInput.forward(X) # Linear input transformation # Forward propagate through time and return states S = self.rnnUnfold.forward(recIn) Z = self.tensorOutput.forward(S[:,1:sequence_len+1,:]) # Linear output transformation Y = self.classifier.forward(Z) # Get classification probabilities # Return: input to recurrent layer, states, input to classifier, output return recIn, S, Z, Y def backward(self, X, Y, recIn, S, T): """Perform the backward propagation through all layers. Input: input samples, network output, intput to recurrent layer, states, targets.""" gZ = self.classifier.backward(Y, T) # Get output gradient gRecOut, gWout, gBout = self.tensorOutput.backward(S[:,1:sequence_len+1,:], gZ) # Propagate gradient backwards through time gRnnIn, gWrec, gBrec, gS0 = self.rnnUnfold.backward(recIn, S, gRecOut) gX, gWin, gBin = self.tensorInput.backward(X, gRnnIn) # Return the parameter gradients of: linear output weights, linear output bias, # recursive weights, recursive bias, linear input weights, linear input bias, initial state. return gWout, gBout, gWrec, gBrec, gWin, gBin, gS0 def getOutput(self, X): """Get the output probabilities of input X.""" recIn, S, Z, Y = self.forward(X) return Y # Only return the output. def getBinaryOutput(self, X): """Get the binary output of input X.""" return np.around(self.getOutput(X)) def getParamGrads(self, X, T): """Return the gradients with respect to input X and target T as a list. The list has the same order as the get_params_iter iterator.""" recIn, S, Z, Y = self.forward(X) gWout, gBout, gWrec, gBrec, gWin, gBin, gS0 = self.backward(X, Y, recIn, S, T) return [g for g in itertools.chain( np.nditer(gS0), np.nditer(gWin), np.nditer(gBin), np.nditer(gWrec), np.nditer(gBrec), np.nditer(gWout), np.nditer(gBout))] def cost(self, Y, T): """Return the cost of input X w.r.t. targets T.""" return self.classifier.cost(Y, T) def get_params_iter(self): """Return an iterator over the parameters. The iterator has the same order as get_params_grad. The elements returned by the iterator are editable in-place.""" return itertools.chain( np.nditer(self.rnnUnfold.S0, op_flags=["readwrite"]), np.nditer(self.tensorInput.W, op_flags=["readwrite"]), np.nditer(self.tensorInput.b, op_flags=["readwrite"]), np.nditer(self.rnnUnfold.W, op_flags=["readwrite"]), np.nditer(self.rnnUnfold.b, op_flags=["readwrite"]), np.nditer(self.tensorOutput.W, op_flags=["readwrite"]), np.nditer(self.tensorOutput.b, op_flags=["readwrite"]))梯度檢查
# Do gradient checking # Define an RNN to test RNN = RnnBinaryAdder(2, 1, 3, sequence_len) # Get the gradients of the parameters from a subset of the data backprop_grads = RNN.getParamGrads(X_train[0:100,:,:], T_train[0:100,:,:]) eps = 1e-7 # Set the small change to compute the numerical gradient # Compute the numerical gradients of the parameters in all layers. for p_idx, param in enumerate(RNN.get_params_iter()): grad_backprop = backprop_grads[p_idx] # + eps param += eps plus_cost = RNN.cost(RNN.getOutput(X_train[0:100,:,:]), T_train[0:100,:,:]) # - eps param -= 2 * eps min_cost = RNN.cost(RNN.getOutput(X_train[0:100,:,:]), T_train[0:100,:,:]) # reset param value param += eps # calculate numerical gradient grad_num = (plus_cost - min_cost)/(2*eps) # Raise error if the numerical grade is not close to the backprop gradient if not np.isclose(grad_num, grad_backprop): raise ValueError("Numerical gradient of {:.6f} is not close to the backpropagation gradient of {:.6f}!".format(float(grad_num), float(grad_backprop))) print("No gradient errors found")
No gradient errors found
使用動量方法優(yōu)化Rmsprop在上一部分中,我們使用彈性反向傳播算法去優(yōu)化我們的網絡。在這個博客中,我們將使用動量方法來優(yōu)化Rmsprop。我們將原來的 Rprop 算法替換為 Rmsprop 算法,是因為 Rprop 算法在處理小批量數據上的效果并不是很好,可能會發(fā)生梯度翻轉的情況。
Rmsprop 算法是從 Rprop 算法中得到靈感的,它保留了對于每一個參數 θ 的平方梯度的平均移動,如下:
其中,λ 是一個平均移動參數。
注意,這個梯度不是直接被使用在參數的更新上面,而是用在每個參數的速度參數(Vs)上面的更新。這個參數和這篇博客中的動量部分中的速度參數很像,但是在使用的方法上面又有一點差異。Nesterov 的加速梯度和一般的動量方法是不同的,主要體現在更新迭代方面。常規(guī)的動量算法在每一次迭代的開始就計算梯度,并且更新速度參數。但是 Nesterov 的加速梯度算法是根據較少速度來計算梯度的值,然后再更新速度,最后再根據局部梯度進行移動。這種處理方法有一個優(yōu)點就是梯度在進行局部更新時將得到更多的信息,即使當前速度進行了一個錯誤的更新,該算法也能使梯度進行正確的計算。Nesterov 的更新可以如下計算:
其中,?(θ) 是一個在關于參數 θ 的局部梯度。比如,當前是第 i 次循環(huán),那么式子可以被表示為如下圖:
注意,我們不能保證一定會收斂到全局最小值,即 cost = 0。因為如果你在參數更新的開始取的位置不是很好,那么最后的優(yōu)化可能會取到局部最小值。而且訓練過程對參數 lmbd,learning_rate,mementum_term,eps 都很敏感。你可以嘗試一下以下代碼,看看運行多久可以達到收斂。
# Set hyper-parameters lmbd = 0.5 # Rmsprop lambda learning_rate = 0.05 # Learning rate momentum_term = 0.80 # Momentum term eps = 1e-6 # Numerical stability term to prevent division by zero mb_size = 100 # Size of the minibatches (number of samples) # Create the network nb_of_states = 3 # Number of states in the recurrent layer RNN = RnnBinaryAdder(2, 1, nb_of_states, sequence_len) # Set the initial parameters nbParameters = sum(1 for _ in RNN.get_params_iter()) # Number of parameters in the network maSquare = [0.0 for _ in range(nbParameters)] # Rmsprop moving average Vs = [0.0 for _ in range(nbParameters)] # Velocity # Create a list of minibatch costs to be plotted ls_of_costs = [RNN.cost(RNN.getOutput(X_train[0:100,:,:]), T_train[0:100,:,:])] # Iterate over some iterations for i in range(5): # Iterate over all the minibatches for mb in range(nb_train/mb_size): X_mb = X_train[mb:mb+mb_size,:,:] # Input minibatch T_mb = T_train[mb:mb+mb_size,:,:] # Target minibatch V_tmp = [v * momentum_term for v in Vs] # Update each parameters according to previous gradient for pIdx, P in enumerate(RNN.get_params_iter()): P += V_tmp[pIdx] # Get gradients after following old velocity backprop_grads = RNN.getParamGrads(X_mb, T_mb) # Get the parameter gradients # Update each parameter seperately for pIdx, P in enumerate(RNN.get_params_iter()): # Update the Rmsprop moving averages maSquare[pIdx] = lmbd * maSquare[pIdx] + (1-lmbd) * backprop_grads[pIdx]**2 # Calculate the Rmsprop normalised gradient pGradNorm = learning_rate * backprop_grads[pIdx] / np.sqrt(maSquare[pIdx] + eps) # Update the momentum velocity Vs[pIdx] = V_tmp[pIdx] - pGradNorm P -= pGradNorm # Update the parameter ls_of_costs.append(RNN.cost(RNN.getOutput(X_mb), T_mb)) # Add cost to list to plot
# Plot the cost over the iterations plt.plot(ls_of_costs, "b-") plt.xlabel("minibatch iteration") plt.ylabel("$xi$", fontsize=15) plt.title("Decrease of cost over backprop iteration") plt.grid() plt.show()測試網絡
# Create test samples nb_test = 5 Xtest, Ttest = create_dataset(nb_test, sequence_len) # Push test data through network Y = RNN.getBinaryOutput(Xtest) Yf = RNN.getOutput(Xtest) # Print out all test examples for i in range(Xtest.shape[0]): printSample(Xtest[i,:,0], Xtest[i,:,1], Ttest[i,:,:], Y[i,:,:]) print ""
x1: 0100010 34
x2: + 1100100 19
------- --
t: = 1010110 53
y: = 1010110
x1: 1010100 21
x2: + 1110100 23
------- --
t: = 0011010 44
y: = 0011010
x1: 1111010 47
x2: + 0000000 0
------- --
t: = 1111010 47
y: = 1111010
x1: 1000000 1
x2: + 1111110 63
------- --
t: = 0000001 64
y: = 0000001
x1: 1010100 21
x2: + 1010100 21
------- --
t: = 0101010 42
y: = 0101010
