This is the Theano building block I use to construct RNN layers. All parameters are Theano shared variables, and the inputs and outputs are symbolic expressions, so you can plug a layer straight into your own graph before compiling with theano.function. A runnable usage example is at the bottom. Have fun~~~~
# -*- coding: utf-8 -*-
__author__ = 'benywon'
import numpy as np
import theano
import theano.tensor as T
dtype = theano.config.floatX
sigma = lambda x: 1 / (1 + T.exp(-x))
rng = np.random.RandomState(1991)
theano.config.exception_verbosity = 'high'
"""
this is the main function to utilize theano to build a RNN layer
the input should be a theano shared variable
and the output is either a theano shared variable or update parameter
you can use it directly in your model building process
"""
"""
after long period experience
I noticed that the best inner active function is sigmoid
however the hidden to output is acv
"""
class RNN:
"""
Base class for all recurrent models.
"""
def __init__(self,
N_out=2,
N_in=None,
W_initiation='svd',
N_hidden=50,
only_return_final=False,
backwards=False,
contain_output=False):
self.contain_output = contain_output
self.backwards = backwards
self.only_return_final = only_return_final
self.N_in = N_in
self.N_out = N_out
self.N_hidden = N_hidden
self.W_initiation = W_initiation
# standard rnn parameter
self.b_h = theano.shared(np.zeros(N_hidden, dtype=dtype))
self.h0 = theano.shared(np.zeros(N_hidden, dtype=dtype))
self.W_ih = theano.shared(self.sample_weights(N_in, N_hidden))
self.W_hh = theano.shared(self.sample_weights(N_hidden, N_hidden))
self.W_ho = theano.shared(self.sample_weights(N_hidden, N_out))
self.b_o = theano.shared(np.zeros(N_out, dtype=dtype))
self.params = [self.W_ih, self.W_hh, self.b_h, self.h0]
self.h_vals = None
if self.contain_output:
self.params.extend([self.W_ho, self.b_o])
self.y_vals = None
    def build(self, matrix_in):
        # run the step function over the whole sequence with scan
        if self.backwards:
            matrix_in = matrix_in[::-1]
        step_fun = self.one_step if self.contain_output else self.one_step_no_output
        [self.h_vals, self.y_vals], _ = theano.scan(fn=step_fun,
                                                    sequences=dict(input=matrix_in, taps=[0]),
                                                    outputs_info=[self.h0, None],  # matches the return list of the step function
                                                    non_sequences=[self.W_ih, self.W_hh, self.b_h, self.W_ho, self.b_o])
    def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, W_ho, b_o):
        # vanilla RNN step: tanh hidden update, sigmoid-squashed output
        h_t = T.tanh(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        y_t = sigma(theano.dot(h_t, W_ho) + b_o)
        return [h_t, y_t]

    def one_step_no_output(self, x_t, h_tm1, W_ih, W_hh, b_h, W_ho, b_o):
        """
        Step function that skips the output computation; W_ho and b_o are
        still accepted so the non_sequences list can stay the same, and the
        hidden state is returned twice to match outputs_info.
        """
        h_t = T.tanh(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        return [h_t, h_t]
    def sample_weights(self, sizeX, sizeY):
        """
        Sample a weight matrix and rescale it so that its largest singular
        value is 1; keeping the spectral norm of the recurrent matrix at 1
        helps guard against exploding activations and gradients in the RNN.
        :param sizeX: number of rows of the matrix
        :param sizeY: number of columns of the matrix
        :return: the matrix divided by its largest singular value
        """
        if self.W_initiation == 'random':
            return np.asarray(rng.normal(size=(sizeX, sizeY)), dtype=dtype)
        else:
            values = np.ndarray([sizeX, sizeY], dtype=dtype)
            for dx in range(sizeX):
                vals = np.random.uniform(low=-1., high=1., size=(sizeY,))
                # vals_norm = np.sqrt((vals**2).sum())
                # vals = vals / vals_norm
                values[dx, :] = vals
            _, svs, _ = np.linalg.svd(values)
            # svs[0] is the largest singular value
            values = values / svs[0]
            return values
def get_parameter(self):
return self.params
def get_hidden(self):
if self.only_return_final:
return self.h_vals[-1]
else:
return self.h_vals
def get_output(self):
if self.only_return_final:
return self.y_vals[-1]
else:
return self.y_vals
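
# A quick sanity check of the SVD initialization above (a sketch I added for
# illustration; the helper name is made up, not part of the original code):
# after the rescaling in sample_weights, the largest singular value of the
# returned matrix should be 1.
def _check_spectral_norm(sizeX=8, sizeY=8):
    w = RNN(N_in=sizeX, N_hidden=sizeY).sample_weights(sizeX, sizeY)
    # with compute_uv=False, np.linalg.svd returns only the singular values,
    # sorted in descending order, so the first entry is the spectral norm
    return np.linalg.svd(w, compute_uv=False)[0]  # ~= 1.0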
class LSTM(RNN):
"""
this is my implementation of lstm
borrow heavily from this blog
http://christianherta.de/lehre/dataScience/machineLearning/neuralNetworks/LSTM.php
you can get your own parameters
"""
def __init__(self,
b_i_init=(-0.5, 0.5),
b_o_init=(-0.5, 0.5),
b_f_init=(0., 1.),
act=T.tanh,
**kwargs):
# init parent attributes
RNN.__init__(self, **kwargs)
self.act = act
self.W_xi = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
self.W_hi = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
self.W_ci = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
self.b_i = theano.shared(np.cast[dtype](np.random.uniform(b_i_init[0], b_i_init[1], size=self.N_hidden)))
self.W_xf = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
self.W_hf = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
self.W_cf = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
self.b_f = theano.shared(np.cast[dtype](np.random.uniform(b_f_init[0], b_f_init[1], size=self.N_hidden)))
self.W_xc = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
self.W_hc = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
self.b_c = theano.shared(np.zeros(self.N_hidden, dtype=dtype))
self.W_xo = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
self.W_ho = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
self.W_co = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
self.b_o = theano.shared(np.cast[dtype](np.random.uniform(b_o_init[0], b_o_init[1], size=self.N_hidden)))
self.W_hy = theano.shared(self.sample_weights(self.N_hidden, self.N_out))
self.b_y = theano.shared(np.zeros(self.N_out, dtype=dtype))
self.c0 = theano.shared(np.zeros(self.N_hidden, dtype=dtype))
self.h0 = T.tanh(self.c0)
self.c_vals = None
self.params = [self.W_xi, self.W_hi, self.W_ci, self.b_i, self.W_xf, self.W_hf, self.W_cf, self.b_f, self.W_xc,
self.W_hc, self.b_c, self.W_xo, self.W_ho, self.W_co, self.b_o, self.c0]
if self.contain_output:
self.params.extend([self.W_hy, self.b_y])
    def build(self, matrix_in):
        if self.backwards:
            matrix_in = matrix_in[::-1]
        lstm_step_fun = self.one_lstm_step if self.contain_output else self.one_lstm_step_no_output
        [self.h_vals, self.c_vals, self.y_vals], _ = theano.scan(
            fn=lstm_step_fun,
            sequences=dict(input=matrix_in, taps=[0]),
            outputs_info=[self.h0, self.c0, None],
            non_sequences=[self.W_xi, self.W_hi, self.W_ci, self.b_i,
                           self.W_xf, self.W_hf, self.W_cf, self.b_f,
                           self.W_xc, self.W_hc, self.b_c,
                           self.W_xo, self.W_ho, self.W_co, self.b_o,
                           self.W_hy, self.b_y])
def get_cell(self):
if not self.only_return_final:
return self.c_vals
else:
return self.c_vals[-1]
    def one_lstm_step(self, x_t, h_tm1, c_tm1, W_xi, W_hi, W_ci, b_i, W_xf, W_hf, W_cf, b_f, W_xc, W_hc, b_c,
                      W_xo, W_ho, W_co, b_o, W_hy, b_y):
        """
        One LSTM step with peephole connections:
            i_t = sigma(x_t W_xi + h_tm1 W_hi + c_tm1 W_ci + b_i)   input gate
            f_t = sigma(x_t W_xf + h_tm1 W_hf + c_tm1 W_cf + b_f)   forget gate
            c_t = f_t * c_tm1 + i_t * act(x_t W_xc + h_tm1 W_hc + b_c)
            o_t = sigma(x_t W_xo + h_tm1 W_ho + c_t W_co + b_o)     output gate
            h_t = o_t * act(c_t)
        The final sigma keeps the output y_t in (0, 1).
        :return: the hidden state h_t, cell state c_t and output y_t
        """
        i_t = sigma(theano.dot(x_t, W_xi) + theano.dot(h_tm1, W_hi) + theano.dot(c_tm1, W_ci) + b_i)
        f_t = sigma(theano.dot(x_t, W_xf) + theano.dot(h_tm1, W_hf) + theano.dot(c_tm1, W_cf) + b_f)
        c_t = f_t * c_tm1 + i_t * self.act(theano.dot(x_t, W_xc) + theano.dot(h_tm1, W_hc) + b_c)
        o_t = sigma(theano.dot(x_t, W_xo) + theano.dot(h_tm1, W_ho) + theano.dot(c_t, W_co) + b_o)
        h_t = o_t * self.act(c_t)
        y_t = sigma(theano.dot(h_t, W_hy) + b_y)
        return [h_t, c_t, y_t]
    def one_lstm_step_no_output(self, x_t, h_tm1, c_tm1, W_xi, W_hi, W_ci, b_i, W_xf, W_hf, W_cf, b_f, W_xc, W_hc,
                                b_c, W_xo, W_ho, W_co, b_o, W_hy, b_y):
        """
        Same step as above but without computing y_t; W_hy and b_y are still
        accepted so the non_sequences list can stay the same.
        """
        i_t = sigma(theano.dot(x_t, W_xi) + theano.dot(h_tm1, W_hi) + theano.dot(c_tm1, W_ci) + b_i)
        f_t = sigma(theano.dot(x_t, W_xf) + theano.dot(h_tm1, W_hf) + theano.dot(c_tm1, W_cf) + b_f)
        c_t = f_t * c_tm1 + i_t * self.act(theano.dot(x_t, W_xc) + theano.dot(h_tm1, W_hc) + b_c)
        o_t = sigma(theano.dot(x_t, W_xo) + theano.dot(h_tm1, W_ho) + theano.dot(c_t, W_co) + b_o)
        h_t = o_t * self.act(c_t)
        # the output-gate values fill the third slot so scan still sees three outputs
        return [h_t, c_t, o_t]
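
# Since the layers take a symbolic matrix, they compose freely. A minimal
# bidirectional sketch (my own illustration, the helper name is made up):
# run one LSTM forward and one with backwards=True over the same input, then
# concatenate the time-aligned hidden sequences along the feature axis.
def bidirectional_lstm(matrix_in, N_in, N_hidden):
    fwd = LSTM(N_in=N_in, N_hidden=N_hidden)
    bwd = LSTM(N_in=N_in, N_hidden=N_hidden, backwards=True)
    fwd.build(matrix_in)
    bwd.build(matrix_in)
    # the backward pass produces hidden states in reversed time order,
    # so flip them back before concatenating step by step
    return T.concatenate([fwd.get_hidden(), bwd.get_hidden()[::-1]], axis=1)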
class GRU:
    """
    GRU layer, following the same interface as the RNN/LSTM classes above.
    NOTE: only the constructor is written so far; build() and the scan
    step are still to be implemented.
    """
    def __init__(self,
                 N_hidden=15,
                 N_in=None,
                 N_out=2,
                 act=T.tanh,
                 w_initiation='svd',
                 only_return_final=False,
                 backwards=False,
                 contain_output=False):
        self.N_hidden = N_hidden
        self.N_in = N_in
        self.N_out = N_out
        self.act = act
        self.w_initiation = w_initiation
        self.only_return_final = only_return_final
        self.backwards = backwards
        self.contain_output = contain_output
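
# A minimal sketch of what the missing GRU step could look like, assuming the
# standard update/reset-gate formulation (this is my guess at the intended
# implementation, written in the same style as the LSTM step above):
#     z_t = sigma(x_t W_xz + h_tm1 W_hz + b_z)           update gate
#     r_t = sigma(x_t W_xr + h_tm1 W_hr + b_r)           reset gate
#     h~  = act(x_t W_xh + (r_t * h_tm1) W_hh + b_h)     candidate state
#     h_t = (1 - z_t) * h_tm1 + z_t * h~
def one_gru_step(x_t, h_tm1, W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, act=T.tanh):
    z_t = sigma(theano.dot(x_t, W_xz) + theano.dot(h_tm1, W_hz) + b_z)
    r_t = sigma(theano.dot(x_t, W_xr) + theano.dot(h_tm1, W_hr) + b_r)
    h_hat = act(theano.dot(x_t, W_xh) + theano.dot(r_t * h_tm1, W_hh) + b_h)
    return (1. - z_t) * h_tm1 + z_t * h_hat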
if __name__ == '__main__':
    # quick smoke test: a 6-step sequence of 10-dim inputs through an LSTM
    a = LSTM(N_in=10, N_hidden=50, only_return_final=False, backwards=False)
    ain = np.ones((6, 10), dtype=dtype)
    ins = T.matrix('ins')
    a.build(ins)
    hid = a.get_hidden()
    params = a.get_parameter()
    fun = theano.function([ins], hid)
    oo = fun(ain)
    print(oo)
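
    # A sketch of how the parameter list could feed a training step (my own
    # illustration; the target values and learning rate are made up): build a
    # toy squared-error loss on the final hidden state and do plain SGD.
    target = T.vector('target')
    loss = T.mean((hid[-1] - target) ** 2)
    lr = np.cast[dtype](0.01)
    updates = [(p, p - lr * T.grad(loss, p)) for p in params]
    train = theano.function([ins, target], loss, updates=updates)
    print(train(ain, np.zeros(50, dtype=dtype)))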