RNN

# -*- coding: utf-8 -*-


import theano.tensor as T

import theano

import numpy as np


__author__ = 'Bingning Wang'

__mail__ = 'research@bingning.wang'

"""

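This module implements recurrent layers in Theano: a plain RNN, GRU and LSTM, plus decoder, attention, highway and residual variants.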

"""


# Module-level generator with a fixed seed. It is used by
# RNN.sample_weights for the 'random' initialisation and by the demo under
# the __main__ guard to generate input data.
rng = np.random.RandomState(1991)



class RNN:
    """Simple recurrent layer unrolled with ``theano.scan``.

    Each step computes
    ``h_t = inner_activation(x_t . W_ih + h_tm1 . W_hh + b_h)``.
    The trainable shared variables are collected in ``params``; calling the
    instance on a symbolic input builds the scan and returns
    ``hidden_states``.
    """

    def __init__(self,
                 n_in=None,
                 weight_initiation='svd',
                 n_hidden=50,
                 only_return_final=False,
                 return_method='ave',
                 backwards=False,
                 ignore_zero=False,
                 learn_hidden_init=False,
                 dtype=theano.config.floatX,
                 name='RNN',
                 inner_activation=T.tanh,
                 **kwargs
                 ):
        """Create the shared parameters of the layer.

        :param n_in: size of one input vector (rows of ``W_ih``); required
        :param weight_initiation: ``'random'`` selects seeded normal weights,
            any other value the singular-value-normalised scheme of
            ``sample_weights``
        :param n_hidden: size of the hidden state
        :param only_return_final: when true, ``hidden_states`` reduces the
            per-step states to a single state (see ``return_method``)
        :param return_method: ``'ave'`` (mean over steps) or ``'max'``
            (max over steps); any other value selects the last step
        :param backwards: forwarded to ``theano.scan`` as ``go_backwards``
        :param ignore_zero: stored on the instance; not read in this class
        :param learn_hidden_init: when true the initial hidden state is
            appended to ``params``
        :param dtype: dtype of the shared variables created here
        :param name: prefix for the names of the shared variables
        :param inner_activation: activation applied in ``one_step``
        :param kwargs: ignored here; lets subclasses hand one keyword dict
            to several base initialisers
        :raises ValueError: if ``n_in`` is not given
        """
        if n_in is None:
            # Fail early with a readable message instead of an opaque numpy
            # error when the weight matrices are sampled.
            raise ValueError('n_in must be given to build the weights of %s' % name)
        self.name = name
        self.return_method = return_method
        self.inner_activation = inner_activation
        self.learn_hidden_init = learn_hidden_init
        self.ignore_zero = ignore_zero
        self.backwards = backwards
        self.only_return_final = only_return_final
        self.n_hidden = n_hidden
        self.W_initiation = weight_initiation
        self.n_in = n_in
        self.dtype = dtype
        self.batch_mode = False
        # Symbolic batch size; only set by __call__ in batch mode.
        self.sample_size = None
        # parameter
        self.b_h = theano.shared(np.zeros(n_hidden, dtype=self.dtype), name=self.name + 'b_h')
        self.W_ih = self.init_one_parameter(shape=[n_in, n_hidden], name=self.name + 'W_ih')
        self.W_hh = self.init_one_parameter(shape=[n_hidden, n_hidden], name=self.name + 'W_hh')
        self._h0 = theano.shared(np.zeros(n_hidden, dtype=self.dtype), name=self.name + '_h0')
        self.params = [self.W_ih, self.W_hh, self.b_h]
        if learn_hidden_init:
            self.params.append(self._h0)
        self.h_vals = None
        self.__updates = None

    def set_h0(self, h0):
        """Replace the initial hidden state.

        Only allowed when the initial state is not a learned parameter,
        because the replaced variable would stay in ``params``.
        """
        assert not self.learn_hidden_init
        self._h0 = h0

    def reset_h0(self):
        """Reset the initial hidden state to a fresh zero shared vector."""
        self._h0 = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + '_h0')

    @property
    def h0(self):
        """Current initial hidden state."""
        return self._h0

    def init_one_parameter(self, shape=None, name='None'):
        """Create one randomly initialised shared variable.

        A two-element ``shape`` yields a matrix from ``sample_weights``; any
        other non-empty shape yields a standard-normal vector of length
        ``shape[0]``.  Both are cast to ``self.dtype`` so the parameters
        match the zero-initialised biases.

        :param shape: list of dimensions
        :param name: name given to the shared variable
        :raises ValueError: if ``shape`` is missing or empty
        """
        if not shape:
            raise ValueError('shape must contain at least one dimension')
        if len(shape) == 2:
            return theano.shared(self.sample_weights(shape[0], shape[1]), name=name)
        return theano.shared(np.random.randn(shape[0]).astype(self.dtype), name=name)

    def sample_weights(self, sizeX, sizeY):
        """Draw an initial ``(sizeX, sizeY)`` weight matrix.

        With ``weight_initiation == 'random'`` the entries are standard
        normal draws from the module-level seeded ``rng``.  Otherwise the
        entries are drawn uniformly from [-1, 1) and the matrix is divided
        by its largest singular value: keeping the largest singular value
        from exceeding 1 guards against exploding activations in an RNN.

        :param sizeX: number of rows of the matrix
        :param sizeY: number of columns of the matrix
        :return: numpy array of dtype ``self.dtype``
        """
        if self.W_initiation == 'random':
            return rng.normal(size=[sizeX, sizeY]).astype(self.dtype)
        # One bulk draw consumes the generator in the same row-major order
        # as filling the matrix row by row.
        values = np.random.uniform(low=-1., high=1., size=(sizeX, sizeY)).astype(self.dtype)
        _, svs, _ = np.linalg.svd(values)
        # svs[0] is the largest singular value
        return values / svs[0]

    @property
    def updates(self):
        """Updates dictionary returned by the most recent scan, if any."""
        return self.__updates

    def __call__(self, inputs, batch_mode=False, **kwargs):
        """Build the recurrence over ``inputs`` and return ``hidden_states``.

        :param inputs: symbolic input iterated by scan along its first axis;
            in batch mode a 3-d tensor whose first axis is the sample axis
        :param batch_mode: when true the first two axes are swapped before
            scanning and the initial state is tiled per sample
        :param kwargs: ignored
        """
        self.batch_mode = batch_mode
        if batch_mode:
            # Remember the batch size for get_initiation, then move the step
            # axis to the front because scan iterates over axis 0.
            self.sample_size = inputs.shape[0]
            inputs = inputs.dimshuffle(1, 0, 2)
        self.h_vals = None
        self.h_vals, self.__updates = theano.scan(fn=self.one_step,
                                                  sequences=inputs,
                                                  outputs_info=self.get_initiation(),
                                                  go_backwards=self.backwards,
                                                  non_sequences=self.get_sequences())
        return self.hidden_states

    def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h):
        """Single recurrence step: new hidden state from input and old state."""
        h_t = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        return h_t

    @property
    def hidden_states(self):
        """Scan output, optionally reduced over the step axis.

        Without ``only_return_final`` all per-step states are returned.
        Otherwise ``return_method`` picks the mean (``'ave'``), the maximum
        (``'max'``) or, for any other value, the last step.
        """
        if not self.only_return_final:
            return self.h_vals
        if self.return_method == 'ave':
            return T.mean(self.h_vals, axis=0)
        if self.return_method == 'max':
            return T.max(self.h_vals, axis=0)
        return self.h_vals[-1]

    @property
    def parameter(self):
        """The list of trainable shared variables (same object as ``params``)."""
        return self.params

    def get_initiation(self):
        """Initial recurrent state(s) passed to scan as ``outputs_info``."""
        if self.batch_mode:
            # One copy of h0 per sample in the batch.
            return [T.alloc(self.h0, self.sample_size, self.n_hidden)]
        return [self.h0]

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        return [self.W_ih, self.W_hh, self.b_h]



class RNN_DECODER(RNN):
    """RNN that generates a sequence by feeding its own projected output back.

    On top of the RNN parameters it owns ``projection_matrix_hidden``
    (``n_hidden x n_in``), which maps a hidden state back to input space.
    """

    def __init__(self, **kwargs):
        """Build the RNN parameters plus the hidden-to-input projection."""
        RNN.__init__(self, **kwargs)
        self.projection_matrix_hidden = self.init_one_parameter(shape=[self.n_hidden, self.n_in],
                                                                name=self.name + 'projection_matrix_hidden')
        self.params.append(self.projection_matrix_hidden)

    def decode(self, start_hidden, max_length):
        """Run ``max_length`` free-running steps starting from ``start_hidden``.

        :param start_hidden: initial hidden state (installed through ``set_h0``)
        :param max_length: number of scan steps
        :return: the scan outputs, i.e. the projected outputs and the hidden
            states of every step, in the order ``one_step`` returns them
        """
        self.set_h0(start_hidden)
        self.h_vals = None
        h_vals, updates = theano.scan(fn=self.one_step,
                                      n_steps=max_length,
                                      outputs_info=self.get_initiation(),
                                      go_backwards=self.backwards,
                                      non_sequences=self.get_sequences())
        self.h_vals = h_vals
        # ``self.__updates`` written here would be mangled with this class's
        # name and never seen by the ``updates`` property defined in RNN, so
        # store the value under the name that property reads.
        self._RNN__updates = updates
        return self.h_vals

    def get_initiation(self):
        """Initial ``[x, h]`` pair: a fresh zero input vector and ``h0``."""
        xt = theano.shared(np.zeros(self.n_in, dtype=self.dtype), name=self.name + '_x0_')
        return [xt, self.h0]

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        return [self.W_ih, self.W_hh, self.b_h, self.projection_matrix_hidden]

    def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, projection):
        """One decoding step.

        :return: ``[word_project, h_t]`` -- the tanh-squashed projection of
            the new hidden state (fed back as the next ``x_t``) and the new
            hidden state
        """
        h_t = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        word_project = T.tanh(T.dot(h_t, projection))
        # prediction_word_probability = T.nnet.softmax_graph(T.dot(EmbeddingMatrix, word_project))
        # predicted_word = T.argmax(prediction_word_probability)
        # output_word_embedding = EmbeddingMatrix[predicted_word]
        return [word_project, h_t]



class GRU(RNN):
    """Gated recurrent unit layer.

    Reuses ``W_ih``, ``W_hh`` and ``b_h`` of the base class for the candidate
    state and adds input/hidden weights and a bias for the update gate (z)
    and the reset gate (r).
    """

    def __init__(self, gate_activation=T.nnet.sigmoid, **kwargs):
        """Create the gate parameters.

        :param gate_activation: activation used for both gates
        :param kwargs: forwarded to ``RNN.__init__``
        """
        # init parent attributes
        RNN.__init__(self, **kwargs)
        self.gate_activation = gate_activation
        prefix = self.name
        n_in, n_hid = self.n_in, self.n_hidden
        # The weight matrices are sampled in this fixed order.
        self.W_iz = self.init_one_parameter(shape=[n_in, n_hid], name=prefix + 'W_iz')
        self.W_hz = self.init_one_parameter(shape=[n_hid, n_hid], name=prefix + 'W_hz')
        self.W_ir = self.init_one_parameter(shape=[n_in, n_hid], name=prefix + 'W_ir')
        self.W_hr = self.init_one_parameter(shape=[n_hid, n_hid], name=prefix + 'W_hr')
        self.b_z = theano.shared(np.zeros(n_hid, dtype=self.dtype), name=prefix + 'b_z')
        self.b_r = theano.shared(np.zeros(n_hid, dtype=self.dtype), name=prefix + 'b_r')
        for gate_param in (self.W_iz, self.W_hz, self.W_ir, self.W_hr, self.b_z, self.b_r):
            self.params.append(gate_param)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        update_gate_params = [self.W_iz, self.W_hz, self.b_z]
        reset_gate_params = [self.W_ir, self.W_hr, self.b_r]
        candidate_params = [self.W_ih, self.W_hh, self.b_h]
        return update_gate_params + reset_gate_params + candidate_params

    def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h):
        """One GRU step; returns the new hidden state.

        NOTE: the candidate state always uses ``T.tanh``; ``inner_activation``
        is not consulted here.
        """
        update_gate = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
        reset_gate = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
        gated_previous = reset_gate * h_tm1
        candidate = T.tanh(theano.dot(x_t, W_ih) + theano.dot(gated_previous, W_hh) + b_h)
        # Interpolate between the old state and the candidate.
        return (1 - update_gate) * h_tm1 + update_gate * candidate



class GRU_DECODER(GRU):
    """GRU that generates a sequence by feeding its own projected output back.

    Adds ``projection_matrix_hidden`` (``n_hidden x n_in``) to the GRU
    parameters and keeps a start input ``x0`` (zeros by default).
    """

    def __init__(self, **kwargs):
        """Build the GRU parameters, the projection and the default ``x0``."""
        GRU.__init__(self, **kwargs)
        self.projection_matrix_hidden = self.init_one_parameter(shape=[self.n_hidden, self.n_in],
                                                                name=self.name + 'projection_matrix_hidden')
        self.params.append(self.projection_matrix_hidden)
        self.x0 = theano.shared(np.zeros(self.n_in, dtype=self.dtype), name=self.name + '_x0_')
        self.Embedding = None

    def decode(self, start_hidden, max_length, Embedding, x0=None):
        """Run ``max_length`` free-running steps starting from ``start_hidden``.

        :param start_hidden: initial hidden state (installed through ``set_h0``)
        :param max_length: number of scan steps
        :param Embedding: stored on the instance; not used by the step function
        :param x0: optional replacement for the start input
        :return: the projected outputs of every step (first scan output)
        """
        if x0 is not None:
            self.x0 = x0
        self.Embedding = Embedding
        self.set_h0(start_hidden)
        self.h_vals = None
        h_vals, updates = theano.scan(fn=self.one_step,
                                      n_steps=max_length,
                                      outputs_info=self.get_initiation(),
                                      go_backwards=self.backwards,
                                      non_sequences=self.get_sequences())
        self.h_vals = h_vals
        # ``self.__updates`` written here would be mangled with this class's
        # name and never seen by the ``updates`` property defined in RNN, so
        # store the value under the name that property reads.
        self._RNN__updates = updates
        return self.h_vals[0]

    def get_initiation(self):
        """Initial ``[x, h]`` pair passed to scan as ``outputs_info``."""
        return [self.x0, self.h0]

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
                self.b_r, self.W_ih, self.W_hh, self.b_h, self.projection_matrix_hidden]

    def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h,
                 projection):
        """One decoding step.

        :return: ``[word_project, h_t]`` -- the tanh-squashed projection of
            the new hidden state (fed back as the next ``x_t``) and the new
            hidden state
        """
        zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
        rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
        rtht_1 = rt * h_tm1
        ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
        h_t = (1 - zt) * h_tm1 + zt * ht_hat
        word_project = T.tanh(T.dot(h_t, projection))
        return [word_project, h_t]



class RNN_ATTENTION:
    """Mixin that holds a symbolic attention vector for attention-aware layers."""

    def __init__(self, n_attention=100, need_attention=True, **kwargs):
        """Store the attention size and flag and start from a zero vector.

        :param n_attention: length of the attention vector
        :param need_attention: flag read by the layers that mix this class in
        :param kwargs: ignored
        """
        self.need_attention = need_attention
        self.n_attention = n_attention
        self.attention = T.zeros(shape=[n_attention])

    def set_attention(self, attention):
        """Install ``attention`` as the current attention vector."""
        self.attention = attention

    def reset_attention(self):
        """Go back to an all-zero attention vector of length ``n_attention``."""
        zero_attention = T.zeros(shape=[self.n_attention])
        self.attention = zero_attention



class IAGRU_GATE(GRU, RNN_ATTENTION):
    """GRU whose update and reset gates receive an extra attention term."""

    def __init__(self,
                 use_separate_weight_for_GRU_inner_gate=False,
                 **kwargs):
        """Create the attention-to-gate matrices.

        :param use_separate_weight_for_GRU_inner_gate: when true ``M_ar`` is
            its own parameter, otherwise it is the same variable as ``M_az``
        :param kwargs: forwarded to both ``GRU.__init__`` and
            ``RNN_ATTENTION.__init__``
        """
        # init parent attributes
        GRU.__init__(self, **kwargs)
        RNN_ATTENTION.__init__(self, **kwargs)
        attention_shape = [self.n_attention, self.n_hidden]
        # the attention gate to the inner activation
        self.M_az = self.init_one_parameter(shape=attention_shape, name=self.name + 'M_az')
        self.params.append(self.M_az)
        if not use_separate_weight_for_GRU_inner_gate:
            # Share one matrix between both gates.
            self.M_ar = self.M_az
        else:
            self.M_ar = self.init_one_parameter(shape=attention_shape, name=self.name + 'M_ar')
            self.params.append(self.M_ar)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        gru_params = [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
                      self.b_r, self.W_ih, self.W_hh, self.b_h]
        return gru_params + [self.M_ar, self.M_az, self.attention]

    def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, M_ar, M_az, attention):
        """One GRU step with attention added to the gate pre-activations.

        NOTE: the term built from ``M_ar`` is added to the update gate and
        the term built from ``M_az`` to the reset gate; this pairing is kept
        exactly as is (``SequenceDecoder.get_now`` uses the same pairing).
        """
        if not self.need_attention:
            attention_r = attention_z = T.zeros(shape=[self.n_hidden])
        else:
            attention_r = theano.dot(attention, M_ar)
            attention_z = theano.dot(attention, M_az)
        update_gate = self.gate_activation(
            theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z + attention_r)
        reset_gate = self.gate_activation(
            theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r + attention_z)
        gated_previous = reset_gate * h_tm1
        candidate = T.tanh(theano.dot(x_t, W_ih) + theano.dot(gated_previous, W_hh) + b_h)
        return (1 - update_gate) * h_tm1 + update_gate * candidate



class HighWay(RNN, RNN_ATTENTION):
    """RNN with a highway connection from the input to the hidden state.

    A sigmoid transform gate, driven by the input and optionally by the
    attention vector, mixes the recurrent state with a linear projection of
    the current input.
    """

    def __init__(self, **kwargs):
        """Create the highway parameters; ``kwargs`` go to both base classes."""
        # init parent attributes
        RNN.__init__(self, **kwargs)
        RNN_ATTENTION.__init__(self, **kwargs)
        prefix = self.name
        self.W_highway_i = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
                                                   name=prefix + 'W_highway_i')
        self.W_highway_a = self.init_one_parameter(shape=[self.n_attention, self.n_hidden],
                                                   name=prefix + 'W_highway_a')
        self.M_highway_p = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
                                                   name=prefix + 'M_highway_p')
        # NOTE: M_highway_p is not registered in ``params`` in this class.
        # self.params.append(self.M_highway_p)
        self.params.append(self.W_highway_i)
        if self.need_attention:
            # The attention weights are only trainable when attention is used.
            self.params.append(self.W_highway_a)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        rnn_params = [self.W_ih, self.W_hh, self.b_h]
        return rnn_params + [self.W_highway_i, self.W_highway_a, self.attention]

    def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, W_h_i, W_h_a, attention):
        """One step: recurrent state blended with the projected input."""
        recurrent_state = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        gate_input = T.dot(x_t, W_h_i)
        if self.need_attention:
            gate_input = gate_input + T.dot(attention, W_h_a)
        transform_gate = T.nnet.sigmoid(gate_input)
        # M_highway_p is read from the instance rather than passed through scan.
        carried_input = T.dot(x_t, self.M_highway_p)
        return (1 - transform_gate) * recurrent_state + transform_gate * carried_input



class HighWayGRU(GRU, RNN_ATTENTION):
    """GRU with a highway connection from the input to the hidden state.

    The GRU state is blended with a linear projection of the current input
    through a sigmoid transform gate driven by the input and, optionally,
    the attention vector.
    """

    def __init__(self, **kwargs):
        """Create the highway parameters; ``kwargs`` go to both base classes."""
        # init parent attributes
        GRU.__init__(self, **kwargs)
        RNN_ATTENTION.__init__(self, **kwargs)
        prefix = self.name
        self.W_highway_i = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
                                                   name=prefix + 'W_highway_i')
        self.W_highway_a = self.init_one_parameter(shape=[self.n_attention, self.n_hidden],
                                                   name=prefix + 'W_highway_a')
        self.M_highway_p = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
                                                   name=prefix + 'M_highway_p')
        trainable = [self.M_highway_p, self.W_highway_i]
        if self.need_attention:
            # The attention weights are only trainable when attention is used.
            trainable.append(self.W_highway_a)
        self.params.extend(trainable)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        gru_params = [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
                      self.b_r, self.W_ih, self.W_hh, self.b_h]
        return gru_params + [self.W_highway_i, self.W_highway_a, self.attention]

    def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, W_h_i, W_h_a, attention):
        """One step: GRU state blended with the projected input."""
        update_gate = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
        reset_gate = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
        gated_previous = reset_gate * h_tm1
        candidate = T.tanh(theano.dot(x_t, W_ih) + theano.dot(gated_previous, W_hh) + b_h)
        gru_state = (1 - update_gate) * h_tm1 + update_gate * candidate
        gate_input = T.dot(x_t, W_h_i)
        if self.need_attention:
            gate_input = gate_input + T.dot(attention, W_h_a)
        transform_gate = T.nnet.sigmoid(gate_input)
        # M_highway_p is read from the instance rather than passed through scan.
        carried_input = T.dot(x_t, self.M_highway_p)
        return (1 - transform_gate) * gru_state + transform_gate * carried_input



class IAGRU_WORD(GRU, RNN_ATTENTION):
    """GRU whose input is rescaled by an attention score before each step."""

    def __init__(self,
                 **kwargs):
        """Create ``M_qi`` (``n_attention x n_in``); ``kwargs`` go to both bases."""
        # init parent attributes
        GRU.__init__(self, **kwargs)
        RNN_ATTENTION.__init__(self, **kwargs)
        self.M_qi = self.init_one_parameter(shape=[self.n_attention, self.n_in], name=self.name + 'M_qi')
        self.params.append(self.M_qi)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        gru_params = [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
                      self.b_r, self.W_ih, self.W_hh, self.b_h]
        return gru_params + [self.M_qi, self.attention]

    def one_step(self, x_t_original, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, M_qi, attention):
        """One GRU step on the attention-weighted input."""
        # Score the raw input against the attention projected into input space.
        projected_attention = theano.dot(attention, M_qi)
        input_weight = T.nnet.sigmoid(theano.dot(projected_attention, x_t_original))
        x_t = input_weight * x_t_original
        update_gate = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
        reset_gate = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
        gated_previous = reset_gate * h_tm1
        candidate = T.tanh(theano.dot(x_t, W_ih) + theano.dot(gated_previous, W_hh) + b_h)
        return (1 - update_gate) * h_tm1 + update_gate * candidate



class IAGRU_CONTEXT(GRU, RNN_ATTENTION):
    """GRU whose input is rescaled using the previous state and the attention."""

    def __init__(self,
                 **kwargs):
        """Create ``M_hc`` and ``M_qc``; ``kwargs`` go to both base classes."""
        # init parent attributes
        GRU.__init__(self, **kwargs)
        RNN_ATTENTION.__init__(self, **kwargs)
        prefix = self.name
        self.M_hc = self.init_one_parameter(shape=[self.n_hidden, self.n_in], name=prefix + 'M_hc')
        self.M_qc = self.init_one_parameter(shape=[self.n_attention, self.n_in], name=prefix + 'M_qc')
        self.params.append(self.M_hc)
        self.params.append(self.M_qc)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        gru_params = [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
                      self.b_r, self.W_ih, self.W_hh, self.b_h]
        return gru_params + [self.M_hc, self.M_qc, self.attention]

    def one_step(self, x_t_original, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, M_hc, M_qc, attention):
        """One GRU step on the context-weighted input."""
        # Context vector in input space, built from the previous state and
        # the attention; its dot product with the input gives the weight.
        context_vector = T.tanh(theano.dot(h_tm1, M_hc) + theano.dot(attention, M_qc))
        input_weight = T.nnet.sigmoid(theano.dot(context_vector, x_t_original))
        x_t = input_weight * x_t_original
        update_gate = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
        reset_gate = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
        gated_previous = reset_gate * h_tm1
        candidate = T.tanh(theano.dot(x_t, W_ih) + theano.dot(gated_previous, W_hh) + b_h)
        return (1 - update_gate) * h_tm1 + update_gate * candidate



class OAGRU(GRU, RNN_ATTENTION):
    """GRU followed by attention pooling over its hidden states.

    The recurrence is the plain GRU step.  When used as the attended side,
    the per-step states are scored against the attention vector and summed
    with softmax weights.
    """

    def __init__(self,
                 **kwargs):
        """Create the scoring parameters; ``kwargs`` go to both base classes."""
        # init parent attributes
        GRU.__init__(self, **kwargs)
        RNN_ATTENTION.__init__(self, **kwargs)
        self.W_hm = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hm')
        self.W_qm = self.init_one_parameter(shape=[self.n_attention, self.n_hidden], name=self.name + 'W_qm')
        # Cast to the layer dtype so w_ms matches the other parameters.
        self.w_ms = theano.shared(np.random.uniform(size=[self.n_hidden]).astype(self.dtype),
                                  name=self.name + 'w_ms')
        self.params.extend([self.W_hm, self.W_qm, self.w_ms])

    def __call__(self, inputs, is_attention_resource=True):
        """Run the GRU and optionally pool the states with attention.

        :param inputs: symbolic input sequence iterated by scan
        :param is_attention_resource: when true, return ``hidden_states``
            unchanged; when false, return the attention-weighted sum of the
            per-step hidden states
        """
        h_vals, updates = theano.scan(fn=self.one_step,
                                      sequences=inputs,
                                      outputs_info=self.get_initiation(),
                                      go_backwards=self.backwards,
                                      non_sequences=self.get_sequences())
        self.h_vals = h_vals
        # Keep the ``updates`` property of RNN in sync with this scan.
        self._RNN__updates = updates
        if is_attention_resource:
            return self.hidden_states
        # W_qm is (n_attention, n_hidden), so the attention vector goes on
        # the left; the product is an n_hidden vector that is added to every
        # projected hidden state.
        WqmOq = T.dot(self.attention, self.W_qm)
        Saq_before_softmax = T.tanh(T.dot(self.h_vals, self.W_hm) + WqmOq)
        # One score per step, normalised over the steps.
        Saq = T.nnet.softmax(T.dot(Saq_before_softmax, self.w_ms))
        Oa = T.dot(T.flatten(Saq), self.h_vals)
        return Oa



class SequenceDecoder(IAGRU_GATE):
    """Attention-gated GRU that is advanced one step at a time without scan.

    The caller sets the current input and attention, then ``get_now``
    computes the next hidden state and stores it as the new previous state.
    """

    def __init__(self,
                 **kwargs):
        """Start from zero previous state and zero input."""
        # init parent attributes
        IAGRU_GATE.__init__(self, **kwargs)
        zero_hidden = np.zeros(self.n_hidden, dtype=self.dtype)
        zero_input = np.zeros(self.n_in, dtype=self.dtype)
        self.previous_h = theano.shared(zero_hidden)
        self.x_t = theano.shared(zero_input)

    def set_previous_h(self, previous_h):
        """Install ``previous_h`` as the state the next step starts from."""
        self.previous_h = previous_h

    def reset_x_t(self):
        """Replace the current input with a fresh zero shared vector."""
        zero_input = np.zeros(self.n_in, dtype=self.dtype)
        self.x_t = theano.shared(zero_input)

    def set_x_t(self, x_t):
        """Install ``x_t`` as the input of the next step."""
        self.x_t = x_t

    def get_previous_h(self):
        """Return the state the next step starts from."""
        return self.previous_h

    def get_now(self):
        """Compute the next hidden state and remember it as ``previous_h``.

        The attention terms are paired with the gates exactly as in
        ``IAGRU_GATE.one_step``: the ``M_ar`` term goes into the update gate
        and the ``M_az`` term into the reset gate.
        """
        x_now, h_prev = self.x_t, self.previous_h
        attention_r = theano.dot(self.attention, self.M_ar)
        attention_z = theano.dot(self.attention, self.M_az)
        update_gate = self.gate_activation(
            theano.dot(x_now, self.W_iz) + theano.dot(h_prev, self.W_hz) + self.b_z + attention_r)
        reset_gate = self.gate_activation(
            theano.dot(x_now, self.W_ir) + theano.dot(h_prev, self.W_hr) + self.b_r + attention_z)
        gated_previous = reset_gate * h_prev
        candidate = T.tanh(theano.dot(x_now, self.W_ih) + theano.dot(gated_previous, self.W_hh) + self.b_h)
        h_t = (1 - update_gate) * h_prev + update_gate * candidate
        # The new state becomes the starting point of the following call.
        self.previous_h = h_t
        return h_t



class SequenceDecoderGRU(GRU):
    """GRU that exposes a single step computed from stored input and state.

    Unlike ``SequenceDecoder.get_now``, reading ``hidden`` does not update
    ``previous_h``; the caller advances the state with ``set_previous_h``.
    """

    def __init__(self,
                 **kwargs):
        """Start from zero previous state and zero input."""
        # init parent attributes
        GRU.__init__(self, **kwargs)
        zero_hidden = np.zeros(self.n_hidden, dtype=self.dtype)
        zero_input = np.zeros(self.n_in, dtype=self.dtype)
        self.previous_h = theano.shared(zero_hidden)
        self.x_t = theano.shared(zero_input)

    def set_previous_h(self, previous_h):
        """Install ``previous_h`` as the state the step starts from."""
        self.previous_h = previous_h

    def reset_x_t(self):
        """Replace the current input with a fresh zero shared vector."""
        zero_input = np.zeros(self.n_in, dtype=self.dtype)
        self.x_t = theano.shared(zero_input)

    def set_x_t(self, x_t):
        """Install ``x_t`` as the input of the step."""
        self.x_t = x_t

    def get_previous_h(self):
        """Return the state the step starts from."""
        return self.previous_h

    @property
    def hidden(self):
        """Hidden state after one GRU step from ``previous_h`` on ``x_t``."""
        x_now, h_prev = self.x_t, self.previous_h
        update_gate = self.gate_activation(
            theano.dot(x_now, self.W_iz) + theano.dot(h_prev, self.W_hz) + self.b_z)
        reset_gate = self.gate_activation(
            theano.dot(x_now, self.W_ir) + theano.dot(h_prev, self.W_hr) + self.b_r)
        gated_previous = reset_gate * h_prev
        candidate = T.tanh(theano.dot(x_now, self.W_ih) + theano.dot(gated_previous, self.W_hh) + self.b_h)
        return (1 - update_gate) * h_prev + update_gate * candidate



class LSTM(RNN):
    """LSTM layer with input, forget and output gates fed by the cell state.

    The cell state enters the gates through the full matrices ``W_ci``,
    ``W_cf`` and ``W_co``.  The base-class weights ``W_ih``, ``W_hh`` and
    ``b_h`` are not used by the LSTM step and are left out of ``params``.
    """

    def __init__(self,
                 **kwargs):
        """Create the LSTM parameters; ``kwargs`` are forwarded to ``RNN.__init__``."""
        # init parent attributes
        RNN.__init__(self, **kwargs)
        # The weight matrices are sampled in this fixed order.
        self.W_xi = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xi')
        self.W_hi = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hi')
        self.W_ci = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_ci')
        self.b_i = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_i')
        self.W_xf = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xf')
        self.W_hf = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hf')
        self.W_cf = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_cf')
        self.b_f = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_f')
        self.W_xc = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xc')
        self.W_hc = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_hc')
        self.b_c = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_c')
        self.W_xo = self.init_one_parameter(shape=[self.n_in, self.n_hidden], name=self.name + 'W_xo')
        self.W_ho = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_ho')
        self.W_co = self.init_one_parameter(shape=[self.n_hidden, self.n_hidden], name=self.name + 'W_co')
        self.b_o = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype), name=self.name + 'b_o')
        self.c0 = theano.shared(np.zeros(self.n_hidden, dtype=self.dtype))
        # Replace the parameter list built by RNN.__init__ with the LSTM
        # parameters only.
        self.params = [self.W_xi, self.W_hi, self.W_ci, self.b_i,
                       self.W_xf, self.W_hf, self.W_cf, self.b_f,
                       self.W_xc, self.W_hc, self.b_c,
                       self.W_xo, self.W_ho, self.W_co, self.b_o]
        if self.learn_hidden_init:
            # Rebuilding the list above would otherwise drop the learned
            # initial hidden state that RNN.__init__ registered.
            self.params.append(self._h0)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        return [self.W_xi, self.W_hi, self.W_ci, self.b_i,
                self.W_xf, self.W_hf, self.W_cf, self.b_f,
                self.W_xc, self.W_hc, self.b_c,
                self.W_xo, self.W_ho, self.W_co, self.b_o]

    def get_initiation(self):
        """Initial ``[h, c]`` pair passed to scan as ``outputs_info``.

        In batch mode both states are tiled to one row per sample, matching
        what ``RNN.get_initiation`` does for the hidden state.
        """
        if self.batch_mode:
            return [T.alloc(self._h0, self.sample_size, self.n_hidden),
                    T.alloc(self.c0, self.sample_size, self.n_hidden)]
        return [self._h0, self.c0]

    def one_step(self, x_t, h_tm1, c_tm1, W_xi, W_hi, W_ci, b_i, W_xf, W_hf, W_cf, b_f, W_xc, W_hc, b_c, W_xo,
                 W_ho, W_co, b_o):
        """Inner step of the LSTM recurrence.

        The three gates use the sigmoid so their values lie between 0 and 1;
        the input and forget gates see the previous cell state, the output
        gate sees the new one.

        :return: ``[h_t, c_t]``, the new hidden state and cell state
        """
        i_t = T.nnet.sigmoid(theano.dot(x_t, W_xi) + theano.dot(h_tm1, W_hi) + theano.dot(c_tm1, W_ci) + b_i)
        f_t = T.nnet.sigmoid(theano.dot(x_t, W_xf) + theano.dot(h_tm1, W_hf) + theano.dot(c_tm1, W_cf) + b_f)
        c_t = f_t * c_tm1 + i_t * self.inner_activation(theano.dot(x_t, W_xc) + theano.dot(h_tm1, W_hc) + b_c)
        o_t = T.nnet.sigmoid(theano.dot(x_t, W_xo) + theano.dot(h_tm1, W_ho) + theano.dot(c_t, W_co) + b_o)
        h_t = o_t * self.inner_activation(c_t)
        return [h_t, c_t]

    @property
    def hidden_states(self):
        """Hidden states of the scan, optionally reduced over the step axis.

        The cell states (second scan output) are discarded.  The reduction
        follows ``only_return_final`` / ``return_method`` as in ``RNN``.
        """
        [_h_vals, _] = self.h_vals
        if not self.only_return_final:
            return _h_vals
        if self.return_method == 'ave':
            return T.mean(_h_vals, axis=0)
        if self.return_method == 'max':
            return T.max(_h_vals, axis=0)
        return _h_vals[-1]



class RNNResidual(RNN):
    """RNN with optional residual terms added to every new hidden state.

    ``level`` selects the terms: ``'word'`` adds a linear projection of the
    input, ``'sentence'`` adds the previous hidden state and ``'context'``
    adds the externally supplied context vector.
    """

    def __init__(self, level=None,
                 **kwargs):
        """Create the optional word-level projection and a zero context.

        :param level: collection of residual names to enable; ``None`` means
            no residual term
        :param kwargs: forwarded to ``RNN.__init__``
        """
        # init parent attributes
        RNN.__init__(self, **kwargs)
        self.level = [] if level is None else level
        if 'word' in self.level:
            self.W_residual_is = self.init_one_parameter(shape=[self.n_in, self.n_hidden],
                                                         name=self.name + 'W_residual_word')
            # ``parameter`` returns this very list, so appending here is the
            # same as appending through the property.
            self.params.append(self.W_residual_is)
        self.context = T.zeros(shape=[self.n_hidden])

    def set_attention(self, context):
        """Install ``context`` as the vector used by the 'context' residual."""
        self.context = context

    def reset_attention(self):
        """Go back to an all-zero context vector of length ``n_hidden``."""
        self.context = T.zeros(shape=[self.n_hidden])

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        return [self.W_ih, self.W_hh, self.b_h] + [self.context]

    def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, context):
        """One RNN step followed by the enabled residual additions."""
        new_state = self.inner_activation(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        enabled = self.level
        if 'word' in enabled:
            # W_residual_is is read from the instance rather than passed
            # through scan.
            new_state = new_state + theano.dot(x_t, self.W_residual_is)
        if 'sentence' in enabled:
            new_state = new_state + h_tm1
        if 'context' in enabled:
            new_state = new_state + context
        return new_state



class GRUResidual(RNNResidual, GRU):
    """GRU with the optional residual terms of ``RNNResidual``."""

    def __init__(self,
                 **kwargs):
        """Initialise both bases and keep the residual weight trainable.

        :param kwargs: forwarded to ``RNNResidual.__init__`` and
            ``GRU.__init__``
        """
        # init parent attributes
        RNNResidual.__init__(self, **kwargs)
        GRU.__init__(self, **kwargs)
        # GRU.__init__ runs RNN.__init__ again, which builds a new ``params``
        # list and thereby drops the word-level residual matrix registered
        # by RNNResidual.__init__.  one_step still uses that matrix, so put
        # it back into the trainable list.
        if 'word' in self.level and not any(p is self.W_residual_is for p in self.params):
            self.params.append(self.W_residual_is)

    def get_sequences(self):
        """Non-sequence arguments for scan, in the order ``one_step`` expects."""
        return [self.W_iz, self.W_hz, self.b_z, self.W_ir, self.W_hr,
                self.b_r, self.W_ih, self.W_hh, self.b_h, self.context]

    def one_step(self, x_t, h_tm1, W_iz, W_hz, b_z, W_ir, W_hr, b_r, W_ih, W_hh, b_h, context):
        """One GRU step followed by the enabled residual additions."""
        zt = self.gate_activation(theano.dot(x_t, W_iz) + theano.dot(h_tm1, W_hz) + b_z)
        rt = self.gate_activation(theano.dot(x_t, W_ir) + theano.dot(h_tm1, W_hr) + b_r)
        rtht_1 = rt * h_tm1
        ht_hat = T.tanh(theano.dot(x_t, W_ih) + theano.dot(rtht_1, W_hh) + b_h)
        h_t = (1 - zt) * h_tm1 + zt * ht_hat
        if 'word' in self.level:
            # Linear projection of the current input.
            h_t += theano.dot(x_t, self.W_residual_is)
        if 'sentence' in self.level:
            h_t += h_tm1
        if 'context' in self.level:
            h_t += context
        return h_t



if __name__ == '__main__':
    # Smoke test: build a GRU once in batch mode and once on a single
    # sentence, compile both graphs into one function and run it on random
    # batches whose sentence length changes from call to call.
    sentence_number = 300
    sentence_length = 25
    embedding_size = 300
    hidden_size = 200
    ain = rng.normal(size=(sentence_number, sentence_length, embedding_size))
    in_vector = T.tensor3('inv')  # this should be replaced by your theano shared variable or T input
    rnn = GRU(n_hidden=hidden_size, n_in=embedding_size, only_return_final=True, return_method='max',
              learn_hidden_init=False)
    hidden = rnn(in_vector, batch_mode=True)
    hidden_one = rnn(in_vector[0], batch_mode=False)

    loss = T.maximum(0, 0.5 - hidden_one)

    fun = theano.function([in_vector], outputs=[hidden, hidden_one, loss], on_unused_input='ignore',
                          allow_input_downcast=True)
    # Parenthesised single-argument print works as a statement on Python 2
    # and as a function call on Python 3.
    print('build done')
    for i in range(1000):
        # randint excludes the upper bound, so 39 keeps the original
        # inclusive range 12..38 of the deprecated random_integers call.
        sentence_length = np.random.randint(12, 39)
        ain = rng.normal(size=(sentence_number, sentence_length, embedding_size))
        hidden_output = fun(ain)
        print(i)


留下您的评论

回复列表:

By王炳宁 on March 24, 2016 | 类别 Python

关于本站