# -*- coding: utf-8 -*-
import theano.tensor as T
import theano
import numpy as np
__author__ = 'Bingning Wang'
__mail__ = 'research@bingning.wang'
"""
this is our
"""
rng = np.random.RandomState(1991)
class RNN:
def __init__(self,
n_in=None,
weight_initiation='svd',
n_hidden=50,
only_return_final=False,
return_method='ave',
backwards=False,
ignore_zero=False,
learn_hidden_init=False,
dtype=theano.config.floatX,
name='RNN',
inner_activation=T.tanh,
**kwargs
):
        assert n_in is not None, 'n_in (the input dimensionality) must be given'
        self.name = name
self.return_method = return_method
self.inner_activation = inner_activation
self.learn_hidden_init = learn_hidden_init
self.ignore_zero = ignore_zero
self.backwards = backwards
self.only_return_final = only_return_final
self.n_hidden = n_hidden
self.W_initiation = weight_initiation
self.n_in = n_in
self.dtype = dtype
self.batch_mode = False
# parameter
self.b_h = theano.shared(np.zeros(n_hidden, dtype=self.dtype), name=self.name + 'b_h')
self.W_ih = self.init_one_parameter(shape=[n_in, n_hidden], name=self.name + 'W_ih')
self.W_hh = self.init_one_parameter(shape=[n_hidden, n_hidden], name=self.name + 'W_hh')
self._h0 = theano.shared(np.zeros(n_hidden, dtype=self.dtype), name=self.name + '_h0')
self.params = [self.W_ih, self.W_hh, self.b_h]
if learn_hidden_init:
self.params.append(self._h0)
self.h_vals = None
        # single leading underscore: a double underscore would be name-mangled
        # per class, so subclass scan calls could not update it consistently
        self._updates = None
    def set_h0(self, h0):
        # the initial hidden state can only be overridden when it is not a learned parameter
        assert not self.learn_hidden_init
        self._h0 = h0
def reset_h0(self):
self._h0 = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + '_h0')
@property
def h0(self):
return self._h0
    def init_one_parameter(self, shape=None, name='None'):
        if shape is None:
            shape = []
        if len(shape) == 2:
            return theano.shared(self.sample_weights(shape[0], shape[1]), name=name)
        # 1-D parameters: draw from the module-level seeded rng and cast to the
        # configured dtype for consistency with the other parameters
        return theano.shared(rng.randn(shape[0]).astype(self.dtype), name=name)
    def sample_weights(self, sizeX, sizeY):
        """
        initialise a weight matrix; with the default 'svd' scheme the matrix is
        rescaled so that its largest singular value is 1, which helps keep the
        recurrent dynamics from exploding
        :param sizeX: number of rows of the matrix
        :param sizeY: number of columns of the matrix
        :return: the initialised matrix
        """
        if self.W_initiation == 'random':
            return rng.normal(size=[sizeX, sizeY]).astype(self.dtype)
        else:
            values = np.ndarray([sizeX, sizeY], dtype=self.dtype)
            for dx in xrange(sizeX):
                # draw each row from the seeded rng for reproducibility
                values[dx, :] = rng.uniform(low=-1., high=1., size=(sizeY,))
            _, svs, _ = np.linalg.svd(values)
            # svs[0] is the largest singular value; dividing by it makes the
            # spectral norm of the matrix exactly 1
            values = values / svs[0]
            return values
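    # A minimal sanity-check sketch (illustrative only, not part of the
    # training code): after the 'svd' scaling above, the largest singular
    # value of the returned matrix should be 1 up to floating point error:
    #   W = RNN(n_in=10, n_hidden=10).sample_weights(10, 10)
    #   assert abs(np.linalg.svd(W, compute_uv=False)[0] - 1.0) < 1e-4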
@property
def updates(self):
        return self._updates
def __call__(self, inputs, batch_mode=False, **kwargs):
self.batch_mode = batch_mode
        if batch_mode:
            self.sample_size = inputs.shape[0]
            # scan iterates over the first axis: (batch, time, feat) -> (time, batch, feat)
            inputs = inputs.dimshuffle(1, 0, 2)
outputs_info = self.get_initiation()
non_sequence = self.get_sequences()
step_fun = self.one_step
self.h_vals = None
        self.h_vals, self._updates = theano.scan(fn=step_fun,
                                                 sequences=inputs,
                                                 outputs_info=outputs_info,
                                                 go_backwards=self.backwards,
                                                 non_sequences=non_sequence)
return self.hidden_states
    def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h):
        # vanilla recurrence: h_t = f(x_t W_ih + h_{t-1} W_hh + b_h), f = inner_activation
        h_t = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        return h_t
@property
def hidden_states(self):
        if self.only_return_final:
            if self.return_method == 'ave':
                return T.mean(self.h_vals, axis=0)
            elif self.return_method == 'max':
                return T.max(self.h_vals, axis=0)
            else:
                # any other return_method yields the last hidden state
                return self.h_vals[-1]
return self.h_vals
@property
def parameter(self):
return self.params
def get_initiation(self):
if self.batch_mode:
return [T.alloc(self.h0, self.sample_size, self.n_hidden)]
return [self.h0]
def get_sequences(self):
return [self.W_ih, self.W_hh, self.b_h]
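# A minimal usage sketch for the base RNN (illustrative; shapes and names are
# assumptions, see the __main__ demo at the bottom for a batched example):
#   x = T.matrix('x')  # (sentence_length, n_in)
#   rnn = RNN(n_in=300, n_hidden=50, only_return_final=True, return_method='last')
#   final_h = rnn(x)   # final hidden state, shape (n_hidden,)
#   f = theano.function([x], final_h)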
class RNN_DECODER(RNN):
def __init__(self, **kwargs):
RNN.__init__(self, **kwargs)
self.projection_matrix_hidden = self.init_one_parameter(shape=[self.n_hidden, self.n_in],
name=self.name + 'projection_matrix_hidden')
self.params.append(self.projection_matrix_hidden)
def decode(self, start_hidden, max_length):
self.set_h0(start_hidden)
outputs_info = self.get_initiation()
non_sequence = self.get_sequences()
step_fun = self.one_step
self.h_vals = None
        self.h_vals, self._updates = theano.scan(fn=step_fun,
                                                 n_steps=max_length,
                                                 outputs_info=outputs_info,
                                                 go_backwards=self.backwards,
                                                 non_sequences=non_sequence)
        # scan returns [word_projections, hidden_states]; return the projected
        # word sequence, consistent with GRU_DECODER.decode below
        return self.h_vals[0]
def get_initiation(self):
xt = theano.shared(np.zeros(self.n_in, dtype=self.dtype), name=self.name + '_x0_')
return [xt, self.h0]
def get_sequences(self):
return [self.W_ih, self.W_hh, self.b_h, self.projection_matrix_hidden]
def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, projection):
h_t = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
word_project = T.tanh(T.dot(h_t, projection))
        # prediction_word_probability = T.nnet.softmax(T.dot(EmbeddingMatrix, word_project))
# predicted_word = T.argmax(prediction_word_probability)
# output_word_embedding = EmbeddingMatrix[predicted_word]
return [word_project, h_t]
class GRU(RNN):
def __init__(self, gate_activation=T.nnet.sigmoid, **kwargs):
# init parent attributes
RNN.__init__(self, **kwargs)
self.gate_activation = gate_activation
self.W_iz = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_iz')
self.W_hz = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hz')
self.W_ir = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_ir')
self.W_hr = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hr')
self.b_z = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_z')
self.b_r = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_r')
self.params.extend([self.W_iz, self.W_hz, self.W_ir, self.W_hr, self.b_z, self.b_r])
def get_sequences(self):
return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
self.b_r, self.W_ih, self.W_hh, self.b_h]
def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h):
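        """
        standard GRU update, exactly as implemented below:
            z_t  = gate(x_t W_iz + h_{t-1} W_hz + b_z)
            r_t  = gate(x_t W_ir + h_{t-1} W_hr + b_r)
            h~_t = tanh(x_t W_ih + (r_t * h_{t-1}) W_hh + b_h)
            h_t  = (1 - z_t) * h_{t-1} + z_t * h~_t
        """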
zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
rtht_1 = rt * h_tm1
ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
h_t = (1 - zt) * h_tm1 + zt * ht_hat
return h_t
class GRU_DECODER(GRU):
def __init__(self, **kwargs):
GRU.__init__(self, **kwargs)
self.projection_matrix_hidden = self.init_one_parameter(shape=[self.n_hidden, self.n_in],
name=self.name + 'projection_matrix_hidden')
self.params.append(self.projection_matrix_hidden)
self.x0 = theano.shared(np.zeros(self.n_in, dtype=self.dtype), name=self.name + '_x0_')
self.Embedding = None
def decode(self, start_hidden, max_length, Embedding, x0=None):
if x0 is not None:
self.x0 = x0
self.Embedding = Embedding
self.set_h0(start_hidden)
outputs_info = self.get_initiation()
non_sequence = self.get_sequences()
step_fun = self.one_step
self.h_vals = None
        self.h_vals, self._updates = theano.scan(fn=step_fun,
                                                 n_steps=max_length,
                                                 outputs_info=outputs_info,
                                                 go_backwards=self.backwards,
                                                 non_sequences=non_sequence)
return self.h_vals[0]
def get_initiation(self):
return [self.x0, self.h0]
def get_sequences(self):
return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
self.b_r, self.W_ih, self.W_hh, self.b_h, self.projection_matrix_hidden]
def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h,
projection):
zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
rtht_1 = rt * h_tm1
ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
h_t = (1 - zt) * h_tm1 + zt * ht_hat
word_project = T.tanh(T.dot(h_t, projection))
return [word_project, h_t]
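# A minimal decoding sketch (illustrative; 'encoder_final' is a hypothetical
# (n_hidden,) vector produced by an encoder, 'embedding' a vocabulary matrix):
#   decoder = GRU_DECODER(n_in=300, n_hidden=200)
#   word_vectors = decoder.decode(encoder_final, max_length=20, Embedding=embedding)
#   # word_vectors has shape (max_length, n_in)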
class RNN_ATTENTION:
def __init__(self, n_attention=100, need_attention=True, **kwargs):
self.n_attention = n_attention
self.attention = T.zeros(shape=[self.n_attention])
self.need_attention = need_attention
def set_attention(self, attention):
self.attention = attention
def reset_attention(self):
self.attention = T.zeros(shape=[self.n_attention])
class IAGRU_GATE(GRU, RNN_ATTENTION):
def __init__(self,
use_separate_weight_for_GRU_inner_gate=False,
**kwargs):
# init parent attributes
GRU.__init__(self, **kwargs)
RNN_ATTENTION.__init__(self, **kwargs)
# the attention gate to the inner activation
self.M_az = self.init_one_parameter(shape=[self.n_attention, self.n_hidden], name=self.name + 'M_az')
self.params.append(self.M_az)
if use_separate_weight_for_GRU_inner_gate:
self.M_ar = self.init_one_parameter(shape=[self.n_attention, self.n_hidden], name=self.name + 'M_ar')
self.params.append(self.M_ar)
else:
self.M_ar = self.M_az
def get_sequences(self):
return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
self.b_r, self.W_ih, self.W_hh, self.b_h, self.M_ar, self.M_az, self.attention]
def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, M_ar, M_az, attention):
if self.need_attention:
attention_r = theano.dot(attention, M_ar)
attention_z = theano.dot(attention, M_az)
else:
attention_r = attention_z = T.zeros(shape=[self.n_hidden])
        zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z + attention_z)
        rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r + attention_r)
rtht_1 = rt * h_tm1
ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
h_t = (1 - zt) * h_tm1 + zt * ht_hat
return h_t
class HighWay(RNN, RNN_ATTENTION):
def __init__(self, **kwargs):
# init parent attributes
RNN.__init__(self, **kwargs)
RNN_ATTENTION.__init__(self, **kwargs)
self.W_highway_i = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_highway_i')
self.W_highway_a = self.init_one_parameter(shape=[self.n_attention, self.n_hidden],
name=self.name + 'W_highway_a')
self.M_highway_p = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
name=self.name + 'M_highway_p')
# self.params.append(self.M_highway_p)
self.params.append(self.W_highway_i)
if self.need_attention:
self.params.append(self.W_highway_a)
def get_sequences(self):
return [self.W_ih, self.W_hh, self.b_h, self.W_highway_i, self.W_highway_a, self.attention]
def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, W_h_i, W_h_a, attention):
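        """
        highway-style step: a transform gate computed from the input (and the
        attention vector when need_attention is set) interpolates between the
        recurrent candidate and a linear projection of the input:
            gate = sigmoid(x_t W_highway_i [+ attention W_highway_a])
            h_t  = (1 - gate) * h~_t + gate * (x_t M_highway_p)
        """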
h_t_hat = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
trans_gate_un_norm = T.dot(x_t, W_h_i)
if self.need_attention:
trans_gate_un_norm += T.dot(attention, W_h_a)
TransformGate = T.nnet.sigmoid(trans_gate_un_norm)
h_t = (1 - TransformGate) * h_t_hat + TransformGate * T.dot(x_t, self.M_highway_p)
return h_t
class HighWayGRU(GRU, RNN_ATTENTION):
def __init__(self, **kwargs):
# init parent attributes
GRU.__init__(self, **kwargs)
RNN_ATTENTION.__init__(self, **kwargs)
self.W_highway_i = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_highway_i')
self.W_highway_a = self.init_one_parameter(shape=[self.n_attention, self.n_hidden],
name=self.name + 'W_highway_a')
self.M_highway_p = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
name=self.name + 'M_highway_p')
self.params.append(self.M_highway_p)
self.params.append(self.W_highway_i)
if self.need_attention:
self.params.append(self.W_highway_a)
def get_sequences(self):
return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
self.b_r, self.W_ih, self.W_hh, self.b_h, self.W_highway_i, self.W_highway_a, self.attention]
def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, W_h_i, W_h_a, attention):
zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
rtht_1 = rt * h_tm1
ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
h_t_hat = (1 - zt) * h_tm1 + zt * ht_hat
trans_gate_un_norm = T.dot(x_t, W_h_i)
if self.need_attention:
trans_gate_un_norm += T.dot(attention, W_h_a)
TransformGate = T.nnet.sigmoid(trans_gate_un_norm)
h_t = (1 - TransformGate) * h_t_hat + TransformGate * T.dot(x_t, self.M_highway_p)
return h_t
class IAGRU_WORD(GRU, RNN_ATTENTION):
def __init__(self,
**kwargs):
# init parent attributes
GRU.__init__(self, **kwargs)
RNN_ATTENTION.__init__(self, **kwargs)
self.M_qi = self.init_one_parameter(shape=[self.n_attention, self.n_in], name=self.name + 'M_qi')
self.params.extend([self.M_qi])
def get_sequences(self):
return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
self.b_r, self.W_ih, self.W_hh, self.b_h, self.M_qi, self.attention]
def one_step(self, x_t_original, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, M_qi, attention):
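        """
        word-level input attention: scale the input word vector by the scalar
        gate a_t = sigmoid((attention M_qi) . x_t) before the GRU update
        """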
at = T.nnet.sigmoid(theano.dot(theano.dot(attention, M_qi), x_t_original))
x_t = at * x_t_original
zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
rtht_1 = rt * h_tm1
ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
h_t = (1 - zt) * h_tm1 + zt * ht_hat
return h_t
class IAGRU_CONTEXT(GRU, RNN_ATTENTION):
def __init__(self,
**kwargs):
# init parent attributes
GRU.__init__(self, **kwargs)
RNN_ATTENTION.__init__(self, **kwargs)
self.M_hc = self.init_one_parameter(shape=[self.n_hidden, self.n_in], name=self.name + 'M_hc')
self.M_qc = self.init_one_parameter(shape=[self.n_attention, self.n_in], name=self.name + 'M_qc')
self.params.extend([self.M_hc, self.M_qc])
def get_sequences(self):
return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
self.b_r, self.W_ih, self.W_hh, self.b_h, self.M_hc, self.M_qc, self.attention]
def one_step(self, x_t_original, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, M_hc, M_qc, attention):
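        """
        context-level input attention: the gate depends on both the previous
        hidden state and the attention vector,
            w_c = tanh(h_{t-1} M_hc + attention M_qc)
            a_t = sigmoid(w_c . x_t),  x_t <- a_t * x_t
        """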
w_c = T.tanh(theano.dot(h_tm1, M_hc) + theano.dot(attention, M_qc))
at = T.nnet.sigmoid(theano.dot(w_c, x_t_original))
x_t = at * x_t_original
zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
rtht_1 = rt * h_tm1
ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
h_t = (1 - zt) * h_tm1 + zt * ht_hat
return h_t
class OAGRU(GRU, RNN_ATTENTION):
def __init__(self,
**kwargs):
# init parent attributes
GRU.__init__(self, **kwargs)
RNN_ATTENTION.__init__(self, **kwargs)
self.W_hm = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hm')
self.W_qm = self.init_one_parameter(shape=[self.n_attention, self.n_hidden], name=self.name + 'W_qm')
        self.w_ms = theano.shared(rng.uniform(size=[self.n_hidden]).astype(self.dtype), name=self.name + 'w_ms')
self.attention = T.zeros(shape=[self.n_attention])
self.params.extend([self.W_hm, self.W_qm, self.w_ms])
def __call__(self, inputs, is_attention_resource=True):
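        """
        run the GRU over the inputs; if is_attention_resource is True the raw
        hidden states are returned (to serve as an attention resource for the
        other sequence), otherwise the hidden states are pooled with a softmax
        attention over time and the weighted sum Oa is returned
        """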
outputs_info = self.get_initiation()
non_sequence = self.get_sequences()
step_fun = self.one_step
        self.h_vals, self._updates = theano.scan(fn=step_fun,
                                                 sequences=inputs,
                                                 outputs_info=outputs_info,
                                                 go_backwards=self.backwards,
                                                 non_sequences=non_sequence)
if is_attention_resource:
return self.hidden_states
        # attention is (n_attention,) and W_qm is (n_attention, n_hidden),
        # so the vector must be on the left of the dot product
        WqmOq = T.dot(self.attention, self.W_qm)
Saq_before_softmax = T.tanh(T.dot(self.h_vals, self.W_hm) + WqmOq)
Saq = T.nnet.softmax(T.dot(Saq_before_softmax, self.w_ms))
Oa = T.dot(T.flatten(Saq), self.h_vals)
return Oa
class SequenceDecoder(IAGRU_GATE):
def __init__(self,
**kwargs):
# init parent attributes
IAGRU_GATE.__init__(self, **kwargs)
self.previous_h = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype))
self.x_t = theano.shared(np.zeros(self.n_in, dtype=self.dtype))
def set_previous_h(self, previous_h):
self.previous_h = previous_h
def reset_x_t(self):
self.x_t = theano.shared(np.zeros(self.n_in, dtype=self.dtype))
def set_x_t(self, x_t):
self.x_t = x_t
def get_previous_h(self):
return self.previous_h
def get_now(self):
attention_r = theano.dot(self.attention, self.M_ar)
attention_z = theano.dot(self.attention, self.M_az)
        zt = self.gate_activation(
            theano.dot(self.x_t, self.W_iz) + theano.dot(self.previous_h, self.W_hz) + self.b_z + attention_z)
        rt = self.gate_activation(
            theano.dot(self.x_t, self.W_ir) + theano.dot(self.previous_h, self.W_hr) + self.b_r + attention_r)
rtht_1 = rt * self.previous_h
ht_hat = T.tanh(theano.dot(self.x_t, self.W_ih) + theano.dot(rtht_1, self.W_hh) + self.b_h)
h_t = (1 - zt) * self.previous_h + zt * ht_hat
self.previous_h = h_t
return h_t
class SequenceDecoderGRU(GRU):
def __init__(self,
**kwargs):
# init parent attributes
GRU.__init__(self, **kwargs)
self.previous_h = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype))
self.x_t = theano.shared(np.zeros(self.n_in, dtype=self.dtype))
def set_previous_h(self, previous_h):
self.previous_h = previous_h
def reset_x_t(self):
self.x_t = theano.shared(np.zeros(self.n_in, dtype=self.dtype))
def set_x_t(self, x_t):
self.x_t = x_t
def get_previous_h(self):
return self.previous_h
@property
def hidden(self):
zt = self.gate_activation(theano.dot(self.x_t, self.W_iz) + theano.dot(self.previous_h, self.W_hz) + self.b_z)
rt = self.gate_activation(theano.dot(self.x_t, self.W_ir) + theano.dot(self.previous_h, self.W_hr) + self.b_r)
rtht_1 = rt * self.previous_h
ht_hat = T.tanh(theano.dot(self.x_t, self.W_ih) + theano.dot(rtht_1, self.W_hh) + self.b_h)
h_t = (1 - zt) * self.previous_h + zt * ht_hat
return h_t
class LSTM(RNN):
def __init__(self,
**kwargs):
# init parent attributes
RNN.__init__(self, **kwargs)
self.W_xi = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xi')
self.W_hi = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hi')
self.W_ci = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_ci')
self.b_i = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_i')
self.W_xf = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xf')
self.W_hf = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hf')
self.W_cf = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_cf')
self.b_f = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_f')
self.W_xc = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xc')
self.W_hc = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hc')
self.b_c = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_c')
self.W_xo = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xo')
self.W_ho = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_ho')
self.W_co = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_co')
self.b_o = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_o')
        self.c0 = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + '_c0')
        # replace the parent's parameter list: W_ih, W_hh and b_h from RNN are
        # not used by the LSTM step
        self.params = [self.W_xi, self.W_hi, self.W_ci, self.b_i,
                       self.W_xf, self.W_hf, self.W_cf, self.b_f,
                       self.W_xc, self.W_hc, self.b_c,
                       self.W_xo, self.W_ho, self.W_co, self.b_o]
        if self.learn_hidden_init:
            self.params.extend([self._h0, self.c0])
def get_sequences(self):
return [self.W_xi, self.W_hi, self.W_ci,
self.b_i,
self.W_xf,
self.W_hf, self.W_cf, self.b_f,
self.W_xc,
self.W_hc,
self.b_c, self.W_xo, self.W_ho, self.W_co, self.b_o]
    def get_initiation(self):
        # note: unlike RNN.get_initiation, no batch allocation is done here
        return [self._h0, self.c0]
def one_step(self, x_t, h_tm1, c_tm1, W_xi, W_hi, W_ci, b_i, W_xf, W_hf, W_cf, b_f, W_xc, W_hc, b_c, W_xo,
W_ho, W_co, b_o):
"""
this is the inner step for calc lstm
remember that we use sigma function to make sure the output normalized to 0-1
:return: the hidden and c_t state
"""
i_t = T.nnet.sigmoid(theano.dot(x_t, W_xi) + theano.dot(h_tm1, W_hi) + theano.dot(c_tm1, W_ci) + b_i)
f_t = T.nnet.sigmoid(theano.dot(x_t, W_xf) + theano.dot(h_tm1, W_hf) + theano.dot(c_tm1, W_cf) + b_f)
c_t = f_t * c_tm1 + i_t * self.inner_activation(theano.dot(x_t, W_xc) + theano.dot(h_tm1, W_hc) + b_c)
o_t = T.nnet.sigmoid(theano.dot(x_t, W_xo) + theano.dot(h_tm1, W_ho) + theano.dot(c_t, W_co) + b_o)
h_t = o_t * self.inner_activation(c_t)
return [h_t, c_t]
@property
def hidden_states(self):
[_h_vals, _] = self.h_vals
if self.only_return_final:
if self.return_method == 'ave':
return T.mean(_h_vals, axis=0)
elif self.return_method == 'max':
return T.max(_h_vals, axis=0)
else:
return _h_vals[-1]
return _h_vals
class RNNResidual(RNN):
def __init__(self, level=None,
**kwargs):
# init parent attributes
RNN.__init__(self, **kwargs)
if level is None:
level = []
self.level = level
if 'word' in self.level:
self.W_residual_is = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
name=self.name + 'W_residual_word')
            self.params.append(self.W_residual_is)
self.context = T.zeros(shape=[self.n_hidden])
def set_attention(self, context):
self.context = context
def reset_attention(self):
self.context = T.zeros(shape=[self.n_hidden])
def get_sequences(self):
return [self.W_ih, self.W_hh, self.b_h, self.context]
def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, context):
h_t = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
if 'word' in self.level:
h_t += theano.dot(x_t, self.W_residual_is)
if 'sentence' in self.level:
h_t += h_tm1
if 'context' in self.level:
h_t += context
return h_t
class GRUResidual(RNNResidual, GRU):
    def __init__(self, level=None, **kwargs):
        # calling both parent __init__s would run RNN.__init__ twice and reset
        # self.params, dropping the GRU gate parameters; run GRU.__init__ once
        # and replicate the residual setup from RNNResidual here
        GRU.__init__(self, **kwargs)
        self.level = level if level is not None else []
        if 'word' in self.level:
            self.W_residual_is = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
                                                         name=self.name + 'W_residual_word')
            self.params.append(self.W_residual_is)
        self.context = T.zeros(shape=[self.n_hidden])
def get_sequences(self):
return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
self.b_r, self.W_ih, self.W_hh, self.b_h, self.context]
def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, context):
zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
rtht_1 = rt * h_tm1
ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
h_t = (1 - zt) * h_tm1 + zt * ht_hat
if 'word' in self.level:
h_t += theano.dot(x_t, self.W_residual_is)
if 'sentence' in self.level:
h_t += h_tm1
if 'context' in self.level:
h_t += context
return h_t
if __name__ == '__main__':
sentence_number = 300
sentence_length = 25
embedding_size = 300
hidden_size = 200
ain = rng.normal(size=(sentence_number, sentence_length, embedding_size))
    in_vector = T.tensor3('inv')  # replace this with your own theano shared variable or symbolic input
rnn = GRU(n_hidden=hidden_size, n_in=embedding_size, only_return_final=True, return_method='max',
learn_hidden_init=False)
hidden = rnn(in_vector, batch_mode=True)
hidden_one = rnn(in_vector[0], batch_mode=False)
loss = T.maximum(0, 0.5 - hidden_one)
fun = theano.function([in_vector], outputs=[hidden, hidden_one, loss], on_unused_input='ignore',
allow_input_downcast=True)
print 'build done'
for i in range(1000):
        sentence_length = rng.randint(12, 39)  # random length in [12, 38]
ain = rng.normal(size=(sentence_number, sentence_length, embedding_size))
hidden_output = fun(ain)
print i