Theano RNN built-in functions

This is the Theano-based code that I use to build RNNs. All variables and input/output arguments are Theano shared variables.

You can use it in your own Theano graph before compiling.

Have fun~


# -*- coding: utf-8 -*-
__author__ = 'benywon'

import numpy as np
import theano
import theano.tensor as T

dtype = theano.config.floatX

# element-wise logistic sigmoid (equivalent to T.nnet.sigmoid)
sigma = lambda x: 1 / (1 + T.exp(-x))

rng = np.random.RandomState(1991)

theano.config.exception_verbosity = 'high'


"""

this is the main function to utilize theano to build a RNN layer


the input should be a theano shared variable


and the output is either a theano shared variable or update parameter


you can use it directly in your model building process


"""


"""

after long period experience

I noticed that the best inner active function is sigmoid

however the hidden to output is acv

"""



class RNN:
    """
    base class for all recurrent models
    """


    def __init__(self,
                 N_out=2,
                 N_in=None,
                 W_initiation='svd',
                 N_hidden=50,
                 only_return_final=False,
                 backwards=False,
                 contain_output=False):
        self.contain_output = contain_output
        self.backwards = backwards
        self.only_return_final = only_return_final
        self.N_in = N_in
        self.N_out = N_out
        self.N_hidden = N_hidden
        self.W_initiation = W_initiation


        # standard RNN parameters:
        #   W_ih: input->hidden weights,  shape (N_in, N_hidden)
        #   W_hh: hidden->hidden weights, shape (N_hidden, N_hidden)
        #   W_ho: hidden->output weights, shape (N_hidden, N_out)
        #   b_h, b_o: biases; h0: initial hidden state
        self.b_h = theano.shared(np.zeros(N_hidden, dtype=dtype))
        self.h0 = theano.shared(np.zeros(N_hidden, dtype=dtype))
        self.W_ih = theano.shared(self.sample_weights(N_in, N_hidden))
        self.W_hh = theano.shared(self.sample_weights(N_hidden, N_hidden))
        self.W_ho = theano.shared(self.sample_weights(N_hidden, N_out))
        self.b_o = theano.shared(np.zeros(N_out, dtype=dtype))

        self.params = [self.W_ih, self.W_hh, self.b_h, self.h0]
        self.h_vals = None
        self.y_vals = None
        if self.contain_output:
            self.params.extend([self.W_ho, self.b_o])


    def build(self, matrix_in):
        if self.backwards:
            matrix_in = matrix_in[::-1]
        step_fun = self.one_step if self.contain_output else self.one_step_no_output
        [self.h_vals, self.y_vals], _ = theano.scan(fn=step_fun,
                                                    sequences=dict(input=matrix_in, taps=[0]),
                                                    outputs_info=[self.h0, None],  # corresponds to return type of fn
                                                    non_sequences=[self.W_ih, self.W_hh, self.b_h, self.W_ho, self.b_o])
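        # a short note on the theano.scan arguments used above:
        #   - `sequences` iterates over the rows of matrix_in, feeding one
        #     time step x_t to the step function per call
        #   - `outputs_info` seeds the recurrence: h0 initialises the hidden
        #     state, and None means y_t is collected but never fed back
        #   - `non_sequences` are passed unchanged to every call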


    def one_step(self, x_t, h_tm1, W_ih, W_hh, b_h, W_ho, b_o):
        """
        single recurrence step: h_t = tanh(x_t.W_ih + h_tm1.W_hh + b_h),
        with the squashed output y_t = sigmoid(h_t.W_ho + b_o)
        """
        h_t = T.tanh(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        y_t = sigma(theano.dot(h_t, W_ho) + b_o)
        return [h_t, y_t]


    def one_step_no_output(self, x_t, h_tm1, W_ih, W_hh, b_h, W_ho, b_o):
        """
        step function that skips the hidden->output projection;
        the hidden state also fills the output slot
        """
        h_t = T.tanh(theano.dot(x_t, W_ih) + theano.dot(h_tm1, W_hh) + b_h)
        return [h_t, h_t]


    def sample_weights(self, sizeX, sizeY):
        """
        sample an initial weight matrix; under the 'svd' scheme the matrix is
        scaled so that its largest singular value is 1, which helps avoid
        exploding activations in the RNN
        :param sizeX: number of rows of the matrix
        :param sizeY: number of columns of the matrix
        :return: the initial weight matrix
        """
        if self.W_initiation == 'random':
            return np.asarray(rng.normal(size=(sizeX, sizeY)), dtype=dtype)
        else:
            values = np.ndarray([sizeX, sizeY], dtype=dtype)
            for dx in xrange(sizeX):
                vals = np.random.uniform(low=-1., high=1., size=(sizeY,))
                values[dx, :] = vals
            _, svs, _ = np.linalg.svd(values)
            # svs[0] is the largest singular value
            values = values / svs[0]
            return values
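    # quick sanity check of the 'svd' scaling (a numpy-only sketch):
    #     m = RNN(N_in=10).sample_weights(10, 10)
    #     np.linalg.svd(m, compute_uv=False)[0]   # -> 1.0 (up to float error)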


    def get_parameter(self):
        return self.params

    def get_hidden(self):
        if self.only_return_final:
            return self.h_vals[-1]
        else:
            return self.h_vals

    def get_output(self):
        if self.only_return_final:
            return self.y_vals[-1]
        else:
            return self.y_vals
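# A plain RNN layer is used the same way as the LSTM demo in __main__ below;
# a minimal sketch:
#     rnn = RNN(N_in=10, N_hidden=50)
#     rnn.build(T.matrix('ins'))
#     hidden = rnn.get_hidden()  # symbolic (n_steps, N_hidden) state sequence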


class LSTM(RNN):
    """
    this is my implementation of an LSTM,
    borrowing heavily from this blog:
    http://christianherta.de/lehre/dataScience/machineLearning/neuralNetworks/LSTM.php
    you can get the learned parameters via get_parameter()
    """


    def __init__(self,
                 b_i_init=(-0.5, 0.5),
                 b_o_init=(-0.5, 0.5),
                 b_f_init=(0., 1.),
                 act=T.tanh,
                 **kwargs):
        # init parent attributes
        RNN.__init__(self, **kwargs)
        self.act = act

        # input gate parameters
        self.W_xi = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
        self.W_hi = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
        self.W_ci = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
        self.b_i = theano.shared(np.cast[dtype](np.random.uniform(b_i_init[0], b_i_init[1], size=self.N_hidden)))
        # forget gate parameters
        self.W_xf = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
        self.W_hf = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
        self.W_cf = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
        self.b_f = theano.shared(np.cast[dtype](np.random.uniform(b_f_init[0], b_f_init[1], size=self.N_hidden)))
        # cell (candidate state) parameters
        self.W_xc = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
        self.W_hc = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
        self.b_c = theano.shared(np.zeros(self.N_hidden, dtype=dtype))
        # output gate parameters (note: W_ho and b_o override the base-class
        # attributes of the same name with gate-shaped versions)
        self.W_xo = theano.shared(self.sample_weights(self.N_in, self.N_hidden))
        self.W_ho = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
        self.W_co = theano.shared(self.sample_weights(self.N_hidden, self.N_hidden))
        self.b_o = theano.shared(np.cast[dtype](np.random.uniform(b_o_init[0], b_o_init[1], size=self.N_hidden)))
        # hidden->output readout parameters
        self.W_hy = theano.shared(self.sample_weights(self.N_hidden, self.N_out))
        self.b_y = theano.shared(np.zeros(self.N_out, dtype=dtype))
        # initial cell state; the initial hidden state is derived from it
        self.c0 = theano.shared(np.zeros(self.N_hidden, dtype=dtype))
        self.h0 = T.tanh(self.c0)
        self.c_vals = None


        self.params = [self.W_xi, self.W_hi, self.W_ci, self.b_i, self.W_xf, self.W_hf, self.W_cf, self.b_f, self.W_xc,
                       self.W_hc, self.b_c, self.W_xo, self.W_ho, self.W_co, self.b_o, self.c0]
        if self.contain_output:
            self.params.extend([self.W_hy, self.b_y])


    def build(self, matrix_in):
        if self.backwards:
            matrix_in = matrix_in[::-1]
        lstm_step_fun = self.one_lstm_step if self.contain_output else self.one_lstm_step_no_output
        [self.h_vals, self.c_vals, self.y_vals], _ = theano.scan(fn=lstm_step_fun,
                                                                 sequences=dict(input=matrix_in, taps=[0]),
                                                                 outputs_info=[self.h0, self.c0, None],
                                                                 non_sequences=[self.W_xi, self.W_hi, self.W_ci, self.b_i,
                                                                                self.W_xf, self.W_hf, self.W_cf, self.b_f,
                                                                                self.W_xc, self.W_hc, self.b_c,
                                                                                self.W_xo, self.W_ho, self.W_co, self.b_o,
                                                                                self.W_hy, self.b_y])


    def get_cell(self):
        if self.only_return_final:
            return self.c_vals[-1]
        else:
            return self.c_vals


    def one_lstm_step(self, x_t, h_tm1, c_tm1, W_xi, W_hi, W_ci, b_i, W_xf, W_hf, W_cf, b_f, W_xc, W_hc, b_c, W_xo,
                      W_ho, W_co, b_o, W_hy, b_y):
        """
        single LSTM step with peephole connections

        the sigma function keeps the gate activations in (0, 1)

        :return: the hidden state h_t, cell state c_t and output y_t
        """
        i_t = sigma(theano.dot(x_t, W_xi) + theano.dot(h_tm1, W_hi) + theano.dot(c_tm1, W_ci) + b_i)  # input gate
        f_t = sigma(theano.dot(x_t, W_xf) + theano.dot(h_tm1, W_hf) + theano.dot(c_tm1, W_cf) + b_f)  # forget gate
        c_t = f_t * c_tm1 + i_t * self.act(theano.dot(x_t, W_xc) + theano.dot(h_tm1, W_hc) + b_c)  # new cell state
        o_t = sigma(theano.dot(x_t, W_xo) + theano.dot(h_tm1, W_ho) + theano.dot(c_t, W_co) + b_o)  # output gate
        h_t = o_t * self.act(c_t)
        y_t = sigma(theano.dot(h_t, W_hy) + b_y)
        return [h_t, c_t, y_t]


    def one_lstm_step_no_output(self, x_t, h_tm1, c_tm1, W_xi, W_hi, W_ci, b_i, W_xf, W_hf, W_cf, b_f, W_xc, W_hc, b_c,
                                W_xo, W_ho, W_co, b_o, W_hy, b_y):
        """
        same as one_lstm_step but skips the hidden->output projection
        """
        i_t = sigma(theano.dot(x_t, W_xi) + theano.dot(h_tm1, W_hi) + theano.dot(c_tm1, W_ci) + b_i)
        f_t = sigma(theano.dot(x_t, W_xf) + theano.dot(h_tm1, W_hf) + theano.dot(c_tm1, W_cf) + b_f)
        c_t = f_t * c_tm1 + i_t * self.act(theano.dot(x_t, W_xc) + theano.dot(h_tm1, W_hc) + b_c)
        o_t = sigma(theano.dot(x_t, W_xo) + theano.dot(h_tm1, W_ho) + theano.dot(c_t, W_co) + b_o)
        h_t = o_t * self.act(c_t)
        return [h_t, c_t, o_t]



class GRU:
    """
    placeholder for a GRU layer; the recurrence itself is not implemented yet
    """

    def __init__(self,
                 N_hidden=15,
                 N_in=None,
                 N_out=2,
                 act=T.tanh,
                 w_initiation='svd',
                 only_return_final=False,
                 backwards=False,
                 contain_output=False):
        self.N_hidden = N_hidden
        self.N_in = N_in
        self.N_out = N_out
        self.act = act
        self.w_initiation = w_initiation
        self.only_return_final = only_return_final
        self.backwards = backwards
        self.contain_output = contain_output
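# NOTE: the GRU class above is a stub. A minimal sketch of the standard GRU
# recurrence, following the same conventions as one_step above, might look
# like the function below (W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h
# are hypothetical shared variables that __init__ would create, just like
# the LSTM gate parameters):
def one_gru_step(x_t, h_tm1, W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h):
    z_t = sigma(theano.dot(x_t, W_xz) + theano.dot(h_tm1, W_hz) + b_z)  # update gate
    r_t = sigma(theano.dot(x_t, W_xr) + theano.dot(h_tm1, W_hr) + b_r)  # reset gate
    h_hat = T.tanh(theano.dot(x_t, W_xh) + theano.dot(r_t * h_tm1, W_hh) + b_h)  # candidate state
    h_t = (1. - z_t) * h_tm1 + z_t * h_hat  # mix old state and candidate
    return h_t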


if __name__ == '__main__':
    a = LSTM(N_in=10, N_hidden=50, only_return_final=False, backwards=False)
    # the input dtype must match theano.config.floatX, or compilation will fail
    ain = np.ones((6, 10), dtype=dtype)
    ins = T.matrix('ins')
    a.build(ins)
    hid = a.get_hidden()
    params = a.get_parameter()
    fun = theano.function([ins], hid)
    oo = fun(ain)
    print oo  # shape (6, 50): one 50-dim hidden state per time step




By 王炳宁 on Jan. 12, 2016 | Category: NLP
