- 投稿日:2021-03-21T23:29:37+09:00
Tensorflow.kerasの事前学習済みモデルで、どうしても入力チャンネル数を変えたい!
はじめに tensorflow.kerasには事前学習済みモデルがあります。 tf.keras.applications.EfficientNetB0(weights="imagenet")といった感じで、モデルが作れるので、ちょっとしたときにささっと活用できます。 ですが、入力チャンネルのデフォルトはRGB3チャンネルなので、 グレースケース1chで控えめにいきたいとき 多チャンネル入力でガンガンいきたいとき に対応することができません。 inputs = Input([None,None,5]) pretrained_model = EfficientNetB0(weights="imagenet", input_tensor=inputs) と試したところで、きっちりエラーをはいてくれます。 そんなこんなでお悩みな初心者向けの記事になります。 チャンネル数を減らしたい場合 『入力1チャンネルでいいんですよ』というケース。 これは比較的簡単に解決できます。 ① チャンネルをコピーして増やす 同じチャンネルを3つに増やして、無理やり辻褄を合わせます。 冗長な行列になりますが、numpy やtfのtileを使うと簡単にできます。たとえば、 inputs = Input([None,None,1]) x = Lambda(lambda x: tf.tile(x, [1,1,1,3]))(inputs) pretrained_model = EfficientNetB0(weights="imagenet", input_tensor=x) ② 最初に畳み込みを一個かます 事前学習済みモデルの入力前に出力チャンネルを3にする畳み込みをかまします。 任意のチャンネル数の入力で使えます。 inputs = Input([None,None,2]) x = Conv2D(filters=3, kernel_size=1)(inputs) pretrained_model = EfficientNetB0(weights="imagenet", input_tensor=None) outputs = pretrained_model(x) new_model = Model(inputs, outputs) チャンネル数を増やしたい場合 『私の入力は53チャンネルです』というケース。 これはちょっと厄介です。上述②の畳み込みも使えますが、チャンネル数が無駄に多いと事前学習済みモデルの入力前に情報が大幅に圧縮されてしまうのが難点です。 個人的には事前学習済みモデルの中の最初に出てくる畳み込みを書き換えるぐらいの気持ちでどうにかしていただきたいところですが、私のテクニックtf.keras理解不足か、なかなかうまくいきません。 こうなったら全部レイヤーばらして再構成してやりましょう。 レイヤーの入出力関係を記録しておいて、最初の畳み込みだけ再定義して繋ぎなおせばいけるはずです。 def rebuild_model(pretrained_model, inputs): name_to_num = {} for i, layer in enumerate(pretrained_model.layers): name = layer.name name_to_num[name] = i # 最初の畳み込みを書き換えるので、パラメータを保管しておく。ついでに前処理レイヤも保管。 preprocessing_layers = [] for i, layer in enumerate(pretrained_model.layers): if "preprocessing" in str(layer.__class__): preprocessing_layers.append(i) if "Conv2D" in str(layer.__class__):# rebuild first conv2d rebuild_layer_num = i rebuild_layer_params = {"filters":layer.filters, "kernel_size": layer.kernel_size, "strides": layer.strides, "activation": layer.activation, "use_bias": layer.use_bias, "name": layer.name} break # あとから再構成できるように、各レイヤーの入力を読んでおく。 # ResNetやEfficientNetは分岐があるので、複数入力になるケースがあることに注意。 in_layer_nums = [] for i, layer in enumerate(pretrained_model.layers): layer_in = layer.input if not type(layer_in)==list: in_names = layer_in.name.split("/")[0].split(":")[0] in_nums = name_to_num[in_names] else: in_names = [l_in.name.split("/")[0].split(":")[0] for l_in in layer_in] in_nums = [name_to_num[in_name] for in_name in in_names] in_layer_nums.append(in_nums) # 再構成する。最初の畳み込みだけ作り直し。 # 前処理レイヤは邪魔なので消す。(最初の0-255の変換処理でチャンネル数が違うことによるエラーが起きるので) new_layers = [inputs] for i, [layer, in_nums] in enumerate(zip(pretrained_model.layers, in_layer_nums)): if i==0: continue elif i in preprocessing_layers: print("skip preprocessing_layers: ", layer.name) x = new_layers[in_nums] elif i == rebuild_layer_num: print("rebuild conv2d layers: ", layer.name) x = Conv2D(**rebuild_layer_params)(new_layers[in_nums]) else: if type(in_nums)==list: x = layer([new_layers[num] for num in in_nums]) else: x = layer(new_layers[in_nums]) new_layers.append(x) model = Model(inputs, x) return model という感じに作ってやれば、あとは簡単。 pretrained_model = EfficientNetB0(weights="imagenet" ,input_shape=[None,None, 3], input_tensor=None) inputs = Input([None,None,5]) new_model = rebuild_model(pretrained_model, inputs) tf.kerasの事前学習済みモデルには255除算のような正規化処理が含まれており、チャンネルが変わると怒られるのでそれを消しています。あとは畳み込みを再定義しなおしていますのでここのweightは初期化されています。ご留意のほど。 ということで、任意の入力チャンネル数にすることができました。めでたしめでたし!
- 投稿日:2021-03-21T20:54:47+09:00
Lyapunov指数をTensorflow/KerasのRNNを使って計算する
この記事の内容: 「力学系のリアプノフ指数を求める標準的な数値計算は,RNNを使ったKerasのModelとして完全に書ける.」 Tensorflow のKeras RNNレイヤーを使って計算の流れを実装してみます. すでに微分可能な関数$f$がわかっている場合を扱います.リカレントニューラルネットワークを時系列データから学習させ力学系を同定し,リアプノフ指数を推定する,という内容ではありません. 環境: python 3.8 tensorflow 2.4 問題 状態量$x \in R^{d}$ の時間発展が x_{n+1}=f(x_{n}) で表される力学系を考えて,その軌道のリアプノフ指数をすべて数値的に求めます. QR分解(グラム・シュミット直交化)を行い基底を変換しながら接空間内の互いに直交する方向ごとの伸び率を推定するという標準的な数値計算法を使います. リアプノフ指数の定義については力学系に関するテキスト等を参考にしてください. Hénon写像 (x_{n+1}, y_{n+1}) = ( 1 -a x_{n}^2+y_{n}, bx_{n}) を例にして行います. 計算フロー 全体の計算フローはこの図のようになります. tenorflowでの実装はすべてbatch計算に対応した形になり,複数の軌道の同時計算できることを前提にしています. 下からみていきます. (1) 軌道の計算 初期値$x_0$ から軌道$(x_1, x_2,.., x_L)$ を求めます. 実装上は写像をRNNのcellとして定義して,軌道計算をRNNとして実装することで行うことができます. 前回に書いた記事でここまでやっています. RNNクラスの使い方は前回の記事参照してください. (2) Jacobianの計算 各$x_n$ からヤコビアン$J_{f}(x_n)=...$ を求めます. ヤコビアンを求める部分をKerasレイヤー化しておきます. (3) Jacobianの積をQR分解しながら計算 $Q_0$ を単位行列として準備します. nステップでは,ヤコビアン$J_{n}$を$Q_{n-1}$に掛け,結果をQR分解します: Q_{n}R_{n}=J_{n}Q_{n-1} そして$Q_{n}$を次のステップに送ります. この結果Jacobianの積$J_{L}\cdots J_{2}J_{1}$は Q_{L}R_{L}\cdots R_{2}R_{1}=J_{L}\cdots J_{2}J_{1} となります. この流れもRNNレイヤーとして実装できます. $Q_{n}$がRNNの内部状態(state), $J_{n}$が入力,$R_{n}$ が出力となります. (4) Rの対角項をとり,絶対値の対数をとる. (5) 要素毎に時間方向の平均を取る. これがリアプノフ指数の推定値になります. コード 準備 倍精度を使います. import numpy as np import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers import time import matplotlib.pyplot as plt tf.keras.backend.set_floatx('float64') mydtype = tf.float64 Hénon写像の定義・軌道を計算するRNN 前回に書いた記事 とやってることは同じなので省略します. 公開したgithubのコードでは実際は実装を少し変えてtf.stackを使わないようにしたため,見づらいですが速度は上がっています. Jacobian Layer Jacobianの計算を行うKerasレイヤーを定義します.(batch_size, 時間ステップ, 力学系の次元) のテンソル'X'を受け取って,各サンプル,各時刻ごとにヤコビアンを計算します. GradientTapeで記録するなかで各$x_n$で$f(x_n)$ をもう一度計算し,GradientTape.batch_jacobianを使いヤコビアンを出しますが,その仕様上,前後でreshapeをしています. レイヤーはステップ数をオプションの引数で与えることにしておきます. class Jacobian(layers.Layer): def __init__(self, func, **kwargs): ''' input-output shapes of func should be (batch, dim) -> (batch, dim) ''' self.func = func super(Jacobian, self).__init__(**kwargs) def call(self, X, t_length=None): # x have (batch, timestep, dim) if t_length == None: t_length = X.shape[1] batch_size = X.shape[0] X = tf.reshape(X, [batch_size*t_length, X.shape[2]]) with tf.GradientTape() as tape: tape.watch(X) X_next, _ = self.func(None, [X]) Jxs = tape.batch_jacobian(X_next, X) Jxs = tf.reshape(Jxs, [batch_size, t_length, X.shape[1], X.shape[1]]) return Jxs QR分解レイヤー (3)に対応する計算を行うKerasレイヤーをRNNで作るため,1ステップ計算を実装するcellを作ります. 初期状態として単位行列を設定するための関数get_initial_stateを忘れないように書いておきます. class QRDcell(layers.Layer): ''' performing successive QR decomposition. This class can be used as a RNN cell in Keras RNN Layer ''' def __init__(self, dim=2, **kwargs): super(QRDcell, self).__init__(**kwargs) self.dim = dim # d x d dimension (d is a dimension of dynamical systems) self.state_size = tf.TensorShape([dim, dim]) self.output_size = tf.constant(dim) def get_initial_state(self, inputs=None, batch_size=None, dtype=mydtype): ''' return identity matrices''' return tf.linalg.eye(self.dim, self.dim, batch_shape=[batch_size], dtype=mydtype) def call(self, inputs, states): # inputs is J_{n} (batch, dim, dim) # states is Q_{n-1} (batch, dim,dim). Q_{0} is identity matrix # Q_{n}R_{n} = J_{n}Q_{n-1} # Q_{n} is the next state. (Q_new) # R_{n} is the output. (R_new) J = inputs Q = states[0] Q_new, R_new = tf.linalg.qr(J@Q) return R_new, [Q_new] モデルの構築 Keras Functional API を使って計算の流れを記述しモデルを作成します. # parameters batch_size = tf.constant(10, dtype=tf.int32) dim = tf.constant(2, dtype=tf.int32) t_length = tf.constant(10000, dtype=tf.int32) # layers hc = HenonCell(a=1.4, b=0.3) henon_rnn = layers.RNN(hc, return_sequences=True) jl = Jacobian(hc) qrd_rnn = layers.RNN(QRDcell(), return_sequences=True) # inputs (initial conditions and time length) input_initial_condition = keras.Input(shape=(dim,), dtype=mydtype) # input object for keras model tlength_inp = keras.Input(batch_shape=(1,), dtype=tf.int32) # passing model length of timesteps # tracing forward calculation dummy_input = tf.zeros( shape=[batch_size, tlength_inp[0], dim], dtype=mydtype) X = henon_rnn(dummy_input, initial_state=input_initial_condition) # 軌道の計算 js = jl(X, t_length=tlength_inp[0]) # reshapeするときに必要なため,明示的にt_length_inp[0] を渡す.(これはt_lengthが決まっているtensorのときは必要ない) rs = qrd_rnn(js) # qr分解しながらRを集める. log_diag_r = tf.math.log(tf.math.abs( tf.linalg.diag_part(rs))) # Rの対角項をとって絶対値の対数を取る m_log_r = tf.reduce_mean(log_diag_r, axis=1) # 平均 # functional APIによりモデル作成 # 軌道計算するモデル gen_trj = keras.models.Model(inputs=[input_initial_condition, tlength_inp], outputs=X) # リアプノフ指数の計算までするモデル est_lyap = keras.models.Model(inputs=[input_initial_condition, tlength_inp], outputs=m_log_r) # 高速化する関数 @tf.function def calc_lyap(x0, t_length): return est_lyap([x0, t_length]) 計算実行・結果 実際に計算する部分です. 初期値が異なると有限の長さでは異なった値になるため,バッチ内のサンプル平均と標準偏差を表示しています. g1 = tf.random.Generator.from_non_deterministic_state() x0 = 0.1*g1.normal(shape=(batch_size, dim), dtype=mydtype) lyaps_batch = calc_lyap(x0, t_length) lyap_ave = tf.reduce_mean(lyaps_batch, axis=0) lyap_std = tf.math.reduce_std(lyaps_batch, axis=0) print(f'Estimation of lyapnov exponents (average over different trajectories +- std.):') for i,(av,st) in enumerate(zip(lyap_ave, lyap_std)): print(f'{i}: {av.numpy()} +- {st.numpy()} ') Estimation of lyapnov exponents (average over different trajectories +- std.): 0: 0.41938474252452246 +- 0.002342227276169231 1: -1.6233575468504582 +- 0.0023422272761692764 memo 改良する点: 初期遷移のステップを計算から除くなど Jacobianの計算のために$f$を2度計算するのはコストがかかるので,一度でできるのではないか? RNNcell内でJacobianを計算すればそのようにできるが,使い勝手が悪いのでは. Jacobianを求めるときの$f$は並列的に計算できるので,実際の計算時間はほとんど変わらないのでは. github コードを下記に公開しています. https://github.com/yymgch/sample_codes_public/blob/master/tf_lyapunov_exp.py