- 投稿日:2021-01-01T13:32:02+09:00
dm-haikuをよむ
はじめに
あけましておめでとうございます。今日から2021年です。新年ということでですね、haikuをよんでいこうと思います。
haiku(dm-haiku)とは
haiku(dm-haiku)はdeepmind製のjax向け深層学習モデル記述ライブラリです。まだ開発途中でβ版として公開されています。
sonnetをご存じの方には、haikuはsonnetのjax版と言うと分かりやすいと思います。sonnet → sonnet2 → haiku と進化する過程を見ると面白いです。
先月に、DeepMindのブログで、DeepMindがjaxを使っているよ、色々ライブラリ作って公開しているよ、という旨の内容が紹介されていました。また、ODE-GAN(論文、コード)という研究にhaikuが使われています(この論文もDeepMindの方々が執筆)。ODE-GANは、ルンゲクッタ法や、パラメータの勾配に対する正則化のように、柔軟な勾配計算を要求する手法で、tensorflowで書くと大変そうな手法なのですが、ODE-GANのコードはhaikuを使ってシンプルに書かれています。これが、haikuについて調べてみようと思った経緯です。haikuを使った例
まず、haikuの機能の紹介に、以下の例を用意しました。コメントの[A]から[G]の部分がhaikuの特徴的な部分です。コードの後でこれらを解説します。
import haiku as hk # [A] haikuのimport import jax import jax.numpy as jnp # 典型的なモジュールの書き方 class MyModule(hk.Module): def __init__(self, n_hidden_layer, dim_hidden, dim_out, name=None): super().__init__(name=name) # [B] name scopeを作成 self._hidden_layers = [hk.Linear(dim_hidden) for _ in range(n_hidden_layer)] self._dim_hidden = dim_hidden self._last_layer = hk.Linear(dim_out) def __call__(self, x): # [C] 計算内容の記述 for i, layer in enumerate(self._hidden_layers): # [D] 変数の取得 b = hk.get_parameter(f"b_{i}", shape=[self._dim_hidden], dtype=x.dtype, init=jnp.zeros) x = jax.nn.relu(layer(x)) + b return self._last_layer(x) # transformを使ったコンパクトな書き方 def get_my_model_by_function(n_hidden_layer, dim_hidden, dim_out): def my_module_by_function(x): for i in range(n_hidden_layer): b = hk.get_parameter(f"b_{i}", shape=[dim_hidden], dtype=x.dtype, init=jnp.zeros) x = jax.nn.relu(hk.Linear(dim_hidden)(x)) + b return hk.Linear(dim_out)(x) return hk.transform(my_module_by_function) # [E] 関数をモデルに変換する # transformを使ったモデルの使い方 model = get_my_model_by_function(5, 100, 10) key = hk.PRNGSequence(20210101) # [F] 乱数生成のキーの作成が簡単にできる便利イテレータ x = jnp.ones([1, 30]) params = model.init(next(key), x) # [G] transformで作ったモデルにはinitとapplyの関数がある y = model.apply(params, None, x)例の説明(import)
[A]のようにhaikuをhkと省略してimportするのがおすすめのようです。haikuを使う場合、同時にjaxやjax.numpyやnumpyも一緒にimportすることが多いでしょう。jax.numpyはjnpと省略される例が多いです。
例の説明(hk.Module)
モジュール(全結合層や畳み込み層などの層)の記述に
hk.Module
を使います。hk.Module
がhaikuの最も基礎的な機能です。モジュールの役割は、以下の2点です。
- そのモジュール用の名前空間を作成する(内部で作った変数等にそのモジュールのものと分かる名前が付く)
- モジュールの計算内容を記述する
hk.Module
は、sonnetのAbstractModule
やsonnet2のModule
に相当するものです。インターフェースはsonnet2のModule
と同じです。2系のtensorflowのtf.Module
も似た機能です。
1番はコードの[B]の部分です(親クラスでいい感じにやってくれます)。tensorflowでいうところのvariable_scope
やname_scope
にあたる機能です。sonnet(v1)の時代から似たものがあり、sonnet(v1)ではvariable_scope
とcustom_getter
で実現されていました。tensorflow2系からtf.Module
ができたため、sonnet2のModule
はtf.Module
をベースに作成されています。jaxにはname_scope
のようなものがないため、haikuはそれに相当するものを用意しています。
2番はコードの[C]の部分です。pythonでは__call__
というメソッドがあると、関数のようにmodel(x)
のように使えます(__call__
メソッドの内容を実行します)。ここもsonnet2の典型的な使い方と同じです。ただし、sonnet2では、変数の作成にonce
という関数でデコレートしたメソッドで、最初の一回だけパラメータ作成のために呼ぶメソッドを用意するパターンを採用していました。tensorflowが2系になったときにget_variable
等の関数が無くなり、その影響でこのような面倒が発生していました。haikuでは、get_parameter
という関数があり、最初に呼ばれる場合を意識することなく変数を扱うことができます。例の説明(hk.transform)
モジュールを組み合わせてモデルを作るわけですが(そのモデルを一つのモジュールで書いてもいいですが)、扱いやすいやすさ・インターフェースの整理のために、
transform
やtransform_with_state
という関数があります(コードの[E]の部分)。簡単のためtransform
だけに話を絞ります。transform
は(モジュールを使う)関数をモデルに変換するものです。返り値の型はTransformed
という型ですが、ただのnamedtupleです。Transformed
はinit
とapply
という属性を持ちます。init
はモデルの初期化を行う関数で、モデルの中のパラメータの値を返します。逆にapply
はパラメータと入力を引数に与えて、そのモデルの計算結果を返す関数です。コードの[G]の部分が該当します。init
の引数は、乱数生成のキー・入力の順で、apply
の引数は、モデルのパラメータ・乱数生成のキー・入力の順です。パラメータを引数にするあたりが特徴的です。冒頭のODE-GANのような手法を運用のに便利なインターフェースです。例の説明(その他)
jaxを触ってみると分かることですが、jaxの乱数の仕組みは他(numpyやtensorflow)と比べて複雑です。乱数生成のキーを生成する仕組みを使って・・・というような処理が必要です。高度なコントロールができるのが利点ですが、利用するには簡単なものがあると便利です。haikuの
PRNGSequence
はそれを簡単に使えるものです(コードの[F]の部分)。PRNGSequence
を使うと乱数生成のキーを簡単に用意できるため、苦労が減ることでしょう。haikuのよみ方
haikuの基本構造
haikuだけでなく、前身のsonnet・sonnet2でもそうですが、これらはシンプルな構造のライブラリになることを目指しています。その構造は少量のコア機能とそれを使った具体的な部品からなります。haikuの場合具体的には以下のものから構成されます。
役割 内容 補足 コア機能 hk.Module
部品 hk.Linear
などhk.Module
を継承して多くの層が用意されている便利機能 hk.transform
やhk.PRNGSequence
などhk.transform
はコア機能かもsonnetも似た構造をしています(「どうせ同じでしょ」と思ってhaikuをよんだらやっぱりそうでした)。
どこをよむといいか
どこをよむといいかなんて、よむ人次第ですが、「haikuの仕組みが知りたい」「haikuのどの仕組みが他ライブラリとの違いか」といった部分に興味があったので、コア機能を中心によみました。逆にhaikuを使って自作の層を作ってみたい人は、部品を中心によむとよいでしょう。
haikuをよむ
前章で説明した通り、コア機能に興味があるので、そこを中心によんで分かったことを紹介していきます。
また、そのときのバージョンのソースコードのURLは、https://github.com/deepmind/dm-haiku/tree/300e6a40be31e35940f0725ae7ed3457b737a5a3です。ディレクトリ構造
haikuの主要なファイル・ディレクトリの構造は以下の通りです。基本的にプログラムは
haiku/_src/XX.py
に書き、haiku
ディレクトリ直下のpythonファイルで公開APIの部分だけをインポートしています。. ├── WORKSPACE ├── haiku │ ├── BUILD │ ├── __init__.py │ ├── _src │ │ ├── base.py │ │ ├── data_structure.py │ │ ├── stateful.py │ │ ├── module.py │ │ ├── transform.py │ │ └── typing.py │ ├── data_structures.py │ ├── experimental.py │ ├── initializers.py │ ├── nets.py │ ├── pad.py │ ├── testing.py │ └── typing.py ├── requirements-jax.txt ├── requirements-test.txt ├── requirements.txt ├── setup.py └── test.sh
haiku/_src
ディレクトリの中身はコア機能に関係するものだけを書きました。haiku/_src/base.py
haiku/_src/base.py
には、name_scopeを支える仕組みやPRNGSequenceが書かれています。
name_scopeには、このスコープ中のこのスコープの・・・というようなスコープの階層構造があります。name_scopeでは今、どのスコープにいるかを把握しないといけません。そのために「今どこのスコープにいるか」を意味するグローバル変数を用意しています。具体的には、stackとして実現しており、スコープに入ったらpush、スコープから出たらpopする。スコープを意識した処理をするときは、例えばstackの最後を見て処理をするであったり、stackの先頭から順に何かを適用したりします(再帰で似たことを実現しているコードがなん箇所かにあります)。雑にname_scopeと書きましたが、このような階層構造を、変数・状態変数(BatchNormalizationの移動平均等)・乱数・モジュール・名前それぞれのためにstackを用意していました。haiku/_src/data_structure.py
haiku/_src/data_structure.py
にはhaikuのコア機能内のために使われる基礎的なデータ構造が書かれています。主要なものはStackとFlatMappingです。
Stackはbase.pyで説明したように、階層構造のどこにいるのかを表現するために使われます。
FlatMappingはモデルの変数一覧等に用いられます。haikuのモデルの変数は階層構造があります。その階層構造を表現するためにdictのかわりにこのFlatMappingを使います。Transformed
のinit
の返り値などで用いられます。FlatMappingはjax.tree_XXというような名前のjaxの関数と一緒に使うために準備されたものです。jax.tree_XXという関数を使って、パラメータの更新や、勾配同士の演算をしたり(ルンゲクッタ法のような)します。haiku/_src/stateful.py
haiku/_src/stateful.py
はtransofrom
等の中でjaxの関数とhaikuのスコープを整合的に運用するための機能が実装されています。jaxの基礎的な関数をhaiku用にラップしたものがあります。細かなパーツがたくさんある感じなのと、そこまでjaxのあれこれを知っているわけではないので、現状は深く踏み込めなくてこのファイルはこれくらいで諦めました。haiku/_src/module.py
haiku/_src/module.py
は名前からわかるとおり、Module
が書かれています。また、Module
の基本機能のname_scope自体もこのファイルにあります。haiku/_src/transform.py
haiku/_src/transform.py
も名前からわかるとおり、transform
が実装されています。実装を見ていくと、少したらいまわしにされますが、init
とapply
の二つの関数を適切なスコープで作ってTransformed
として返すだけの関数です。haiku/_src/typing.py
haiku/_src/typing.py
はよく使う型を特別に変数に代入しているだけです。一箇所気になるのは、次のような行です。Module = typing._ForwardRef("haiku.Module")
Module
が最も基礎的な機能なので、typing.pyで型ヒント用の変数を用意したいのはよくわかります。forward referenceという実験的な機能があるようで、それが使えたら使うということをしています。最後に
ということでね、haikuをよみました。
2020年はtransformerの高速化や画像認識への適用、GPT3、alphafoldの活躍といった話題がありましたが、2021年はどんな技術・手法が登場するのでしょうか。今はtensorflow/pytorchの2強状態ですが、jaxやhaikuじゃないと上手く書けないような高度な手法とかが登場したりするんでしょうかね?東京オリンピックもどうなるんでしょうね?開会式などで日本の技術力をアピールするような深層学習を使った何かがみれたりするんでしょうかね?
ポエム
新年や 離れて祝い haikuよむ
- 投稿日:2021-01-01T12:12:27+09:00
独自のデータセットをTFRecord 形式にする
TensorFlow Object Detection API で独自のデータセットをつかうには、TFRecord ファイル形式にする必要があります。
データセットからTFRecordを作る手順です。手順
tf_exampleを生成するコードスニペットを使うためにobject_detectionAPI をクローンします。
import os import pathlib if "models" in pathlib.Path.cwd().parts: while "models" in pathlib.Path.cwd().parts: os.chdir('..') elif not pathlib.Path('models').exists(): !git clone --depth 1 https://github.com/tensorflow/models %%bash cd models/research/ protoc object_detection/protos/*.proto --python_out=. cp object_detection/packages/tf2/setup.py . python -m pip install .個々のデータを tf_example に変換する関数を定義
個々の画像とアノテーション情報をバイトに変換して、
tf_exampleという形式に変えます。import tensorflow as tf from object_detection.utils import dataset_util def create_tf_example(height, width, filename, image_format, xmins,xmaxs, ymins, ymaxs, classes_text, classes): # TODO(user): Populate the following variables from your example. # height = None # Image height # width = None # Image width # filename = None # Filename of the image. Empty if image is not from file # encoded_image_data = None # Encoded image bytes # image_format = None # b'jpeg' or b'png' # xmins = [] # List of normalized left x coordinates in bounding box (1 per box) # xmaxs = [] # List of normalized right x coordinates in bounding box # # (1 per box) # ymins = [] # List of normalized top y coordinates in bounding box (1 per box) # ymaxs = [] # List of normalized bottom y coordinates in bounding box # # (1 per box) # classes_text = [] # List of string class name of bounding box (1 per box) # classes = [] # List of integer class id of bounding box (1 per box) with tf.io.gfile.GFile(filename, 'rb') as fid: encoded_jpg = fid.read() # encoded_jpg_io = io.BytesIO(encoded_jpg) tf_example = tf.train.Example(features=tf.train.Features(feature={ 'image/height': dataset_util.int64_feature(height), 'image/width': dataset_util.int64_feature(width), 'image/filename': dataset_util.bytes_feature(filename.encode('utf-8')), 'image/source_id': dataset_util.bytes_feature(filename.encode('utf-8')), 'image/encoded': dataset_util.bytes_feature(encoded_jpg), 'image/format': dataset_util.bytes_feature(image_format), 'image/object/bbox/xmin': dataset_util.float_list_feature(xmins), 'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs), 'image/object/bbox/ymin': dataset_util.float_list_feature(ymins), 'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs), 'image/object/class/text': dataset_util.bytes_list_feature(classes_text), 'image/object/class/label': dataset_util.int64_list_feature(classes), })) return tf_exampleデータセットをForLoop処理でtf_exampleにしてTFRecordWriterで書き込む
以下のようなアノテーションデータがあるとします。
boxは [minx, miny, maxx, maxy]{ "categories": [ { "id": 1, "name": "cat" }, { "id": 2, "name": "dog" } ], "annotations": [ { "filename": "train_000.jpg", "image_height": 3840, "image_width": 2160, "labels": [ 1, 1, 2 ], "label_texts": [ "cat", "cat", "dog" ], "boxes": [ [ 1250, 790, 1850, 1300 ], [ 920, 1230, 1310, 1550 ], [ 12, 1180, 550, 1450 ] ] }, ... } ] }データセットを1画像分ずつ tf_example にして tf_records に書き込みます。
tf_example の中身の tf.train.Feature はバイトしか受け付けないので、データをバイトにして与える必要があります。import tensorflow as tf import os import numpy as np from PIL import Image # from object_detection.utils import dataset_util output_path = './data.tfrecords' image_dir = './train_images/' writer = tf.io.TFRecordWriter(output_path) annotations = dataset['annotations'] for annotation in annotations: if annotation['boxes'] != []: height = annotation['image_height'] width = annotation['image_width'] filename = (image_dir + annotation['filename']).encode('utf-8') image_format = b'jpeg' xmins = [] xmaxs = [] ymins = [] ymaxs = [] for box in annotation['boxes']: xmins.append(box[0] / width) # 0~1に正規化 xmaxs.append(box[2] / width) ymins.append(box[1] / height) ymaxs.append(box[3] / height) classes_text = [] for text in annotation['label_texts']: classes_text.append(text.encode('utf-8')) classes = [] for label in annotation['labels']: classes.append(bytes([label])) tf_example = create_tf_example(height,width,filename,image_format,xmins,xmaxs,ymins,ymaxs,classes_text,classes) writer.write(tf_example.SerializeToString()) writer.close()分割して書き込む
データセットが大きい場合は、TFRecordを分割してファイルにすると便利です。
公式のドキュメントによるとtf.data.Dataset APIは、入力例を並行して読み取ることができ、スループットを向上させます。
tf.data.Dataset APIは、モデルのパフォーマンスをわずかに向上させるシャードファイルを使用して例をより適切にシャッフルできます。tf_example を生成し、分割して書き込みます。
import contextlib2 from object_detection.dataset_tools import tf_record_creation_util num_shards=10 output_filebase='./train_dataset.record' with contextlib2.ExitStack() as tf_record_close_stack: output_tfrecords = tf_record_creation_util.open_sharded_output_tfrecords( tf_record_close_stack, output_filebase, num_shards) annotations = dataset['annotations'] for i in range(len(annotations)): if annotations[i]['boxes'] != []: height = annotations[i]['image_height'] width = annotations[i]['image_width'] filename = (image_dir + annotations[i]['filename']).encode('utf-8') image_format = b'jpeg' xmins = [] xmaxs = [] ymins = [] ymaxs = [] for box in annotations[i]['boxes']: xmins.append(box[0] / width) # 0~1に正規化 xmaxs.append(box[2] / width) ymins.append(box[1] / height) ymaxs.append(box[3] / height) classes_text = [] for text in annotations[i]['label_texts']: classes_text.append(text.encode('utf-8')) classes = [] for label in annotations[i]['labels']: classes.append(bytes([label])) tf_example = create_tf_example(height,width,filename,image_format,xmins,xmaxs,ymins,ymaxs,classes_text,classes) output_shard_index = i % num_shards output_tfrecords[output_shard_index].write(tf_example.SerializeToString())分割したファイルが生成されます。
./train_dataset.record-00000-00010
./train_dataset.record-00001-00010
...
./train_dataset.record-00009-00010使用するときはConfigを以下に設定します
tf_record_input_reader { input_path:" /path/to/train_dataset.record-?????-of-00010 " }?
フリーランスエンジニアです。
お仕事のご相談こちらまで
rockyshikoku@gmail.comCore MLを使ったアプリを作っています。
機械学習関連の情報を発信しています。
- 投稿日:2021-01-01T09:29:47+09:00
TensorFlow Object Detection APIで物体検出モデルを簡易トレーニング
手軽に物体検出モデルをトレーニング
Colabサンプル
事前トレーニング済みモデルのファインチューニングで、
新たなオブジェクトを検出できるようになります。
トレーニング後はモデルの保存、復元もできます。
事前トレーニングしたモデルの最後の層の転移学習です。手順(Colabリンクでも実行できます)
0.TensorFlow2をインストール
!pip install -U --pre tensorflow=="2.2.0"1.リポジトリのクローン
import os import pathlib if "models" in pathlib.Path.cwd().parts: while "models" in pathlib.Path.cwd().parts: os.chdir('..') elif not pathlib.Path('models').exists(): git clone --depth 1 https://github.com/tensorflow/models2.Object Detection APIをインストール
%%bash cd models/research/ protoc object_detection/protos/*.proto --python_out=. cp object_detection/packages/tf2/setup.py . python -m pip install .3.モジュールのインポート
import matplotlib import matplotlib.pyplot as plt import os import random import io import imageio import glob import scipy.misc import numpy as np from six import BytesIO from PIL import Image, ImageDraw, ImageFont from IPython.display import display, Javascript from IPython.display import Image as IPyImage import tensorflow as tf from object_detection.utils import label_map_util from object_detection.utils import config_util from object_detection.utils import visualization_utils as viz_utils from object_detection.utils import colab_utils from object_detection.builders import model_builder %matplotlib inline4.画像読み込み関数
画像を Numpy array にします。
def load_image_into_numpy_array(path): """画像ファイルをNumpy配列にする. TensorFlowのグラフに食わせるために画像をNumpy配列に。 慣例として(高さ、幅、カラーチャネル)形状のNumpy配列にする。 引数: path: 画像ファイルのパス. 戻り値: uint8、(高さ, 幅, 3チャネル)形状のnumpy配列。 """ img_data = tf.io.gfile.GFile(path, 'rb').read() image = Image.open(BytesIO(img_data)) (im_width, im_height) = image.size return np.array(image.getdata()).reshape( (im_height, im_width, 3)).astype(np.uint8)5.結果を視覚化する関数
def plot_detections(image_np, boxes, classes, scores, category_index, figsize=(12, 16), image_name=None): """検出結果を視覚化するAPI機能のラッパー関数. 引数: image_np: uint8、(高さ, 幅, 3チャネル)形状のnumpy配列。 boxes: [数, 4]形状のnumpy配列。 classes: [数]形状のNumpy配列。 注:クラスのインデックスは「1」からはじまり(0ではなく) ラベルマップのキーの数と一致する。 scores: [数]形状のNumpy配列もしくはNone。 scores=Noneの場合, then この関数はプロットするボックスをグランドトゥルース(完全な正解)のボックスとして扱い、全てのボックスをクラスとスコアなしの 黒のボックスとしてプロットする。 category_index: カテゴリインデックスでキー付されたカテゴリ・ディクショナリ (それぞれがカテゴリ・インデックス:idとカテゴリ名:nameを持つ)を含む辞書。 figsize: 表示するサイズ。 image_name: 画像ファイルの名前。 """ image_np_with_annotations = image_np.copy() viz_utils.visualize_boxes_and_labels_on_image_array( image_np_with_annotations, boxes, classes, scores, category_index, use_normalized_coordinates=True, min_score_thresh=0.8) if image_name: plt.imsave(image_name, image_np_with_annotations) else: plt.imshow(image_np_with_annotations)6.画像とラベルマップ、アノテーション・データを用意
必要なものは以下。
1、画像パスの配列
2、ラベルマップ(どのIDにどのラベル名が対応するかの辞書)
3、ラベルIDの配列
4、バウンディング・ボックスの配列<例>
# 画像パスの配列 train_image_filenames = [ './datasets/train_images/train_image0001.jpg', './datasets/train_images/train_image0002.jpg' ] # ラベルマップ idは1から category_index = { 1: {'id': 1, 'name': 'cat'}, 2: {'id': 2, 'name': 'dog'} } # クラス数 num_classes = 2 # ラベルIDの配列 gt_labels = [ np.array([1,1]), np.array([1,2,2]) ] # バウンディング・ボックス[ miny, minx, maxy, maxx]のnumpy配列 gt_boxes = [ np.array([[0.436, 0.591, 0.629, 0.712],[0.539, 0.583, 0.73, 0.71]], dtype=np.float32), np.array([[0.464, 0.414, 0.626, 0.548],[0.313, 0.308, 0.648, 0.526],[0.256, 0.444, 0.484, 0.629]], dtype=np.float32) ]<要件>
画像はモデルの入力サイズにリサイズしておく必要があります。
画像・ラベル・ボックスの、配列内インデックスが一致している必要があります。<参考記事>
7、画像を numpy array に
train_image_dir = 'models/research/object_detection/test_images/ducky/train/' # 1、画像ディレクトリのパス train_images_np = [] for filename in train_image_filenames: train_images_np.append(load_image_into_numpy_array(filename)) # 画像を読み込んで表示してみる plt.imshow(train_image_np[0]) plt.show()8.クラスラベルをワンホットTensorに、画像とBoxデータをTensorに
ワンホットとは番号を0と1の配列で表したもの。
例えば、2クラス内の1は[1,0] 2は[0,1] と該当順番のみ1になっている。# クラスラベルをワンホットに変換; 全てをTensorに変換。 # ここで `label_id_offset`は、すべてのクラスを特定の数のインデックスだけシフト # バックグラウンド以外はモデルがワンホットラベルを受け取るように、ここでこれを行う # クラスは0から数え始める。 label_id_offset = 1 train_image_tensors = [] gt_classes_one_hot_tensors = [] gt_box_tensors = [] for (train_image_np, gt_box_np, gt_label_np) in zip( train_images_np, gt_boxes, gt_labels): train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor( train_image_np, dtype=tf.float32), axis=0)) # Numpy画像をTensorに gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32)) # Numpy boxをTensorに zero_indexed_groundtruth_classes = tf.convert_to_tensor( gt_label_np - label_id_offset) # Numpy labelを最小値0の配列にして、Tensorに gt_classes_one_hot_tensors.append(tf.one_hot( zero_indexed_groundtruth_classes, num_classes)) # label Tensorをワンホットに print('データの準備が終わりました')たとえば、ある一枚の画像に対する
[1,1,2]
のラベル配列が
array([ [1., 0.],[1., 0.],[0., 1.] ], dtype=float32)>
のワンホット配列になります9.アノテーションした正解ボックスを視覚化してチェック
dummy_scores = np.array([1.0], dtype=np.float32) # 100%のボックススコアを仮で入れる plt.figure(figsize=(30, 15)) for idx in range(5): plt.subplot(2, 3, idx+1) plot_detections( train_images_np[idx], gt_boxes[idx], gt_labels[idx], dummy_scores, category_index) plt.show()10.モデルをビルドして重みをリストアする
最後レイヤー以外の重みをリストアします。最後のレイヤーのみトレーニング用にランダムな重みで初期化されます。
ここではResnetバックボーンのRetinanetを使っています。
Object Detection API にはさまざまなモデルがあります。# モデルをダウンロード !wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz !tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz !mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/モデルごとにパイプライン定義辞書ファイルがあり、クラス数などパラメーターが書き込まれています。
パイプライン定義辞書ファイルは、Object Detection リポジトリの configs フォルダ、もしくはダウンロードしたモデルのディレクトリにあります。
定義ファイルのクラス数を、自前のデータのクラス数に書き換えます。チェックポイントからリストアする層をHeadで指定しています。今回は、クラス分類用の部分の重みはリストアしないので、ボックス分類用の部分の重みのみ指定しています。
tf.keras.backend.clear_session() print('簡易トレーニングのためにモデルをビルドして重みをリストアしています...', flush=True) pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config' checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0' # パイプライン定義を読み込んで物体検出モデルをビルド。 # デフォルトでは90クラスを検出するCOCOアーキテクチュアで作業しているので、 # パイプライン定義のクラス数をデータのクラス数に上書きする。 configs = config_util.get_configs_from_pipeline_file(pipeline_config) model_config = configs['model'] model_config.ssd.num_classes = num_classes model_config.ssd.freeze_batchnorm = True detection_model = model_builder.build( model_config=model_config, is_training=True) # 物体分類・検出の重みをリストアする --- RetinaNetは2つの推論チェックポイントHeadをもっている # --- 一つはクラス分類用, もう一つはボックス検出用. We will # ボックス検出用のチェックポイントHeadから重みをリストアするが、クラス分類用の重みはスクラッチで初期化する 両方のヘッドからリストアしたい場合は追加するラインをコメントアウトで以下に示している fake_box_predictor = tf.compat.v2.train.Checkpoint( _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads, # _prediction_heads=detection_model._box_predictor._prediction_heads, # (今回はリストアしないクラス分類用のHead) _box_prediction_head=detection_model._box_predictor._box_prediction_head, ) fake_model = tf.compat.v2.train.Checkpoint( _feature_extractor=detection_model._feature_extractor, _box_predictor=fake_box_predictor) ckpt = tf.compat.v2.train.Checkpoint(model=fake_model) ckpt.restore(checkpoint_path).expect_partial() # 部分的にリストア # モデルの重み値が作られるように、ダミーインプット(0配列)で実行 image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3])) prediction_dict = detection_model.predict(image, shapes) _ = detection_model.postprocess(prediction_dict, shapes) print('重みをリストアしました!')11.トレーニング
トレーニング時間は数分。
tf.keras.backend.set_learning_phase(True) # これらのパラメーターは調整できる; サンプルでは5枚の画像でトレーニングするため # 大きなバッチサイズは機能しない, 必要に応じてもっと大きいバッチでトレーニング画像を # メモリにフィットできる. batch_size = 4 learning_rate = 0.01 num_batches = 100 # 今回簡易トレーニングするトップ層の値を選択する. trainable_variables = detection_model.trainable_variables to_fine_tune = [] prefixes_to_train = [ 'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead', 'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead'] for var in trainable_variables: if any([var.name.startswith(prefix) for prefix in prefixes_to_train]): to_fine_tune.append(var) # 一回のトレーニングステップのフォワード・バックワード処理を設定 def get_model_train_step_function(model, optimizer, vars_to_fine_tune): """Get a tf.function for training step.""" # 速度をあげるために tf.function を使用. # 内部の具体的な値が欲しい場合(Eager Execution)は @tf.functionデコレーターをコメントアウト @tf.function def train_step_fn(image_tensors, groundtruth_boxes_list, groundtruth_classes_list): """一回分のトレーニングの計算. 引数: image_tensors: [1, 高さ, 幅, 3]のtf.float32タイプのTensorのリスト. 注。画像によってサイズは異なり、この関数内で640x640にリシェイプされる. groundtruth_boxes_list: バッチ内の各画像の正解ボックスを表す [N_i, 4]形状のtf.float32 タイプのTensorのリスト groundtruth_classes_list: バッチ内の各画像の正解ボックスを表す [N_i, num_classes]形状のtf.float32 タイプのTensorのリスト 戻り値: 入力バッチのトータルロスを表すスカラー(単一値)のTensor """ shapes = tf.constant(batch_size * [[640, 640, 3]], dtype=tf.int32) model.provide_groundtruth( groundtruth_boxes_list=groundtruth_boxes_list, groundtruth_classes_list=groundtruth_classes_list) with tf.GradientTape() as tape: preprocessed_images = tf.concat( [detection_model.preprocess(image_tensor)[0] for image_tensor in image_tensors], axis=0) prediction_dict = model.predict(preprocessed_images, shapes) losses_dict = model.loss(prediction_dict, shapes) total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss'] gradients = tape.gradient(total_loss, vars_to_fine_tune) optimizer.apply_gradients(zip(gradients, vars_to_fine_tune)) return total_loss return train_step_fn optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9) train_step_fn = get_model_train_step_function( detection_model, optimizer, to_fine_tune) print('簡易トレーニングをスタート!', flush=True) for idx in range(num_batches): # データセットのランダムなサブセットのキーを取得 all_keys = list(range(len(train_images_np))) random.shuffle(all_keys) example_keys = all_keys[:batch_size] # 注 このサンプルデモではデータ拡張を行わないが、楽しい練習のためにランダム横フリップと # ランダムクロップをお勧めする gt_boxes_list = [gt_box_tensors[key] for key in example_keys] gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys] image_tensors = [train_image_tensors[key] for key in example_keys] # トレーニングステップ (フォワードパス + バックワードパス) total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list) if idx % 10 == 0: print('batch ' + str(idx) + ' of ' + str(num_batches) + ', loss=' + str(total_loss.numpy()), flush=True) print('簡易トレーニングが終わりました!')batch 0 of 100, loss=1.2068503
batch 10 of 100, loss=0.12002414
batch 20 of 100, loss=0.10228661
batch 30 of 100, loss=0.0361837
batch 40 of 100, loss=0.011348422
batch 50 of 100, loss=0.0028579112
batch 60 of 100, loss=0.0032960502
batch 70 of 100, loss=0.002372135912.未知の画像でテスト!
テスト画像を numpy array にしてモデルの推論にかけます。結果は100個のバウンディングボックス、
100個のラベル、100個のスコアで返ってきます。
バウンディングボックスのi番目は、ラベルのi番目、スコアのi番目に対応しています。
どんな場合もこの100個という数は変わりません。この中で、スコアの高いものを視覚化したり、推論結果のボックスとして用います。
視覚化する場合のデフォルトのスコアの閾値は0.8です。(plot_detection関数で指定される)
手元で試した時も、大体の場合、例えばぼくが目で画像をみて推論対象のオブジェクトが2つ見える場合、モデルが出力した100個のスコアのうち0.5を超えるものは2つです。他のスコアは0.02など極端に低いです。なので、100個の結果のうち信頼できるボックスやラベルを見つけるのはそれほど難しくありません(トレーニングがうまくいっていれば)。pip install natsort #テスト画像の順番を保って推論するために、名前でソートするライブラリをインストールしています。from natsort import natsorted print(sorted_file_names) test_image_dir = './dataset/test' test_images_np = [] file_names = os.listdir(test_image_dir) test_paths = natsorted(file_names) for test_path in test_paths: test_images_np.append(np.expand_dims( load_image_into_numpy_array(test_path), axis=0)) # Again, uncomment this decorator if you want to run inference eagerly @tf.function def detect(input_tensor): """Run detection on an input image. Args: input_tensor: A [1, height, width, 3] Tensor of type tf.float32. Note that height and width can be anything since the image will be immediately resized according to the needs of the model within this function. Returns: A dict containing 3 Tensors (`detection_boxes`, `detection_classes`, and `detection_scores`). """ preprocessed_image, shapes = detection_model.preprocess(input_tensor) prediction_dict = detection_model.predict(preprocessed_image, shapes) return detection_model.postprocess(prediction_dict, shapes) # Note that the first frame will trigger tracing of the tf.function, which will # take some time, after which inference should be fast. label_id_offset = 1 for i in range(len(test_images_np)): input_tensor = tf.convert_to_tensor(test_images_np[i], dtype=tf.float32) detections = detect(input_tensor) # このdetectionsで結果が取れます。 plot_detections( test_images_np[i][0], detections['detection_boxes'][0].numpy(), detections['detection_classes'][0].numpy().astype(np.uint32) + label_id_offset, detections['detection_scores'][0].numpy(), category_index, figsize=(15, 20), image_name="gif_frame_" + ('%02d' % i) + ".jpg") # 指定したパスにスコア0.8を超えるボックスを描画した画像が保存されます。print(detections) # 以下出力結果。省略してあるがそれぞれ100個ある # 'detection_boxes' 'detection_classes' 'detection_scores'が最終結果 # 'detection_anchor_indices''raw_detection_boxes''raw_detection_scores'は最終結果を計算する際使った途中データ(だと思う。たぶん) {'detection_anchor_indices': <tf.Tensor: shape=(1, 100), dtype=int32, numpy= array([[49416, 50753, ... 51112, 26364]], dtype=int32)>, 'detection_boxes': <tf.Tensor: shape=(1, 100, 4), dtype=float32, numpy= array([[[0.43758985, 0.7465773 , 0.63472795, 0.9252911 ], [0.1677289 , 0.6480559 , 0.890319 , 1. ], ... [0.40918362, 0.3183376 , 1. , 0.9439225 ], [0.639281 , 0.8898159 , 0.7221419 , 0.97141266]]], dtype=float32)>, 'detection_classes': <tf.Tensor: shape=(1, 100), dtype=float32, numpy= array([[0., 0., ... 1., 0.]], dtype=float32)>, 'detection_multiclass_scores': <tf.Tensor: shape=(1, 100, 3), dtype=float32, numpy= array([[[5.47093153e-03, 3.10172260e-01, 1.57460570e-03], [3.18378210e-03, 2.98067868e-01, 1.27398968e-03], ... [1.98462605e-03, 7.14010894e-02, 1.30185485e-03]]], dtype=float32)>, 'detection_scores': <tf.Tensor: shape=(1, 100), dtype=float32, numpy= array([[0.31017226, 0.29806787, 0.26563442, 0.23411435, 0.22276634, 0.21396422, 0.20716852, 0.18401867, 0.17277354, 0.16559672, ... 0.14484483, 0.14467192, 0.13986477, 0.13589099, 0.13474342, 0.07329145, 0.0723871 , 0.07223672, 0.07157233, 0.07140109]], dtype=float32)>, 'num_detections': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([100.], dtype=float32)>, 'raw_detection_boxes': <tf.Tensor: shape=(1, 51150, 4), dtype=float32, numpy= array([[[-3.6555314e-03, -1.2414398e-02, 1.4784184e-02, 1.0699857e-02], [-9.5088510e-03, -2.2957223e-02, 3.9035182e-02, 1.7941574e-02], ..., [ 3.1216300e-01, 6.6491508e-01, 1.3707981e+00, 1.0911807e+00], [ 6.6202581e-02, 4.6959493e-01, 1.5031044e+00, 1.2707567e+00]]], dtype=float32)>, 'raw_detection_scores': <tf.Tensor: shape=(1, 51150, 3), dtype=float32, numpy= array([[[9.3629062e-03, 7.2856843e-03, 4.1753352e-03], [4.8707724e-03, 1.5826846e-06, 3.3203959e-03], ..., [7.2056055e-03, 1.9515157e-02, 1.4944762e-02], [8.9454055e-03, 1.9429326e-03, 1.5336275e-03]]], dtype=float32)>}12’.結果をGifで表示
imageio.plugins.freeimage.download() anim_file = 'test.gif' filenames = glob.glob('gif_frame_*.jpg') filenames = sorted(filenames) last = -1 images = [] for filename in filenames: image = imageio.imread(filename) images.append(image) imageio.mimsave(anim_file, images, 'GIF-FI', fps=5) display(IPyImage(open(anim_file, 'rb').read()))13.モデルの保存
import os ckpt_path = 'ckpt/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8' os.makedirs(ckpt_path, exist_ok=True) checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=detection_model) manager = tf.train.CheckpointManager(checkpoint, directory=ckpt_path, max_to_keep=5) manager.save()14.モデルの復元
trained_model = model_builder.build(model_config=model_config, is_training=False) ckpt_trained = tf.compat.v2.train.Checkpoint(model=ssd_model) # ダミー入力で実行して重み値を生成 image, shapes = trained_model.preprocess(tf.zeros([1, 640, 640, 3])) prediction_dict = trained_model.predict(image, shapes) _ = trained_model.postprocess(prediction_dict, shapes) ckpt_trained.restore('ckpt/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/ckpt-1') print('重みをリストアしました!')15.復元したモデルの実行
12、のテストの detect_model を trained_modelに書き換えて実行します。
?
フリーランスエンジニアです。
お仕事のご相談こちらまで
rockyshikoku@gmail.comCore MLを使ったアプリを作っています。
機械学習関連の情報を発信しています。
- 投稿日:2021-01-01T09:29:47+09:00
TensorFlow Object Detection APIで物体検出モデルを簡易学習
手軽に物体検出モデルをトレーニング
Colabサンプル
事前トレーニング済みモデルのファインチューニングで、
新たなオブジェクトを検出できるようになります。
トレーニング後はモデルの保存、復元もできます。
事前トレーニングしたモデルの最後の層の転移学習です。手順(Colabリンクでも実行できます)
0.TensorFlow2をインストール
!pip install -U --pre tensorflow=="2.2.0"1.リポジトリのクローン
import os import pathlib if "models" in pathlib.Path.cwd().parts: while "models" in pathlib.Path.cwd().parts: os.chdir('..') elif not pathlib.Path('models').exists(): git clone --depth 1 https://github.com/tensorflow/models2.Object Detection APIをインストール
%%bash cd models/research/ protoc object_detection/protos/*.proto --python_out=. cp object_detection/packages/tf2/setup.py . python -m pip install .3.モジュールのインポート
import matplotlib import matplotlib.pyplot as plt import os import random import io import imageio import glob import scipy.misc import numpy as np from six import BytesIO from PIL import Image, ImageDraw, ImageFont from IPython.display import display, Javascript from IPython.display import Image as IPyImage import tensorflow as tf from object_detection.utils import label_map_util from object_detection.utils import config_util from object_detection.utils import visualization_utils as viz_utils from object_detection.utils import colab_utils from object_detection.builders import model_builder %matplotlib inline4.画像読み込み関数
画像を Numpy array にします。
def load_image_into_numpy_array(path): """画像ファイルをNumpy配列にする. TensorFlowのグラフに食わせるために画像をNumpy配列に。 慣例として(高さ、幅、カラーチャネル)形状のNumpy配列にする。 引数: path: 画像ファイルのパス. 戻り値: uint8、(高さ, 幅, 3チャネル)形状のnumpy配列。 """ img_data = tf.io.gfile.GFile(path, 'rb').read() image = Image.open(BytesIO(img_data)) (im_width, im_height) = image.size return np.array(image.getdata()).reshape( (im_height, im_width, 3)).astype(np.uint8)5.結果を視覚化する関数
def plot_detections(image_np, boxes, classes, scores, category_index, figsize=(12, 16), image_name=None): """検出結果を視覚化するAPI機能のラッパー関数. 引数: image_np: uint8、(高さ, 幅, 3チャネル)形状のnumpy配列。 boxes: [数, 4]形状のnumpy配列。 classes: [数]形状のNumpy配列。 注:クラスのインデックスは「1」からはじまり(0ではなく) ラベルマップのキーの数と一致する。 scores: [数]形状のNumpy配列もしくはNone。 scores=Noneの場合, then この関数はプロットするボックスをグランドトゥルース(完全な正解)のボックスとして扱い、全てのボックスをクラスとスコアなしの 黒のボックスとしてプロットする。 category_index: カテゴリインデックスでキー付されたカテゴリ・ディクショナリ (それぞれがカテゴリ・インデックス:idとカテゴリ名:nameを持つ)を含む辞書。 figsize: 表示するサイズ。 image_name: 画像ファイルの名前。 """ image_np_with_annotations = image_np.copy() viz_utils.visualize_boxes_and_labels_on_image_array( image_np_with_annotations, boxes, classes, scores, category_index, use_normalized_coordinates=True, min_score_thresh=0.8) if image_name: plt.imsave(image_name, image_np_with_annotations) else: plt.imshow(image_np_with_annotations)6.画像とラベルマップ、アノテーション・データを用意
必要なものは以下。
1、画像パスの配列
2、ラベルマップ(どのIDにどのラベル名が対応するかの辞書)
3、ラベルIDの配列
4、バウンディング・ボックスの配列<例>
# 画像パスの配列 train_image_filenames = [ './datasets/train_images/train_image0001.jpg', './datasets/train_images/train_image0002.jpg' ] # ラベルマップ idは1から category_index = { 1: {'id': 1, 'name': 'cat'}, 2: {'id': 2, 'name': 'dog'} } # クラス数 num_classes = 2 # ラベルIDの配列 gt_labels = [ np.array([1,1]), np.array([1,2,2]) ] # バウンディング・ボックス[ miny, minx, maxy, maxx]のnumpy配列 gt_boxes = [ np.array([[0.436, 0.591, 0.629, 0.712],[0.539, 0.583, 0.73, 0.71]], dtype=np.float32), np.array([[0.464, 0.414, 0.626, 0.548],[0.313, 0.308, 0.648, 0.526],[0.256, 0.444, 0.484, 0.629]], dtype=np.float32) ]<要件>
画像はモデルの入力サイズにリサイズしておく必要があります。
画像・ラベル・ボックスの、配列内インデックスが一致している必要があります。<参考記事>
7、画像を numpy array に
train_image_dir = 'models/research/object_detection/test_images/ducky/train/' # 1、画像ディレクトリのパス train_images_np = [] for filename in train_image_filenames: train_images_np.append(load_image_into_numpy_array(filename)) # 画像を読み込んで表示してみる plt.imshow(train_image_np[0]) plt.show()8.クラスラベルをワンホットTensorに、画像とBoxデータをTensorに
ワンホットとは番号を0と1の配列で表したもの。
例えば、2クラス内の1は[1,0] 2は[0,1] と該当順番のみ1になっている。# クラスラベルをワンホットに変換; 全てをTensorに変換。 # ここで `label_id_offset`は、すべてのクラスを特定の数のインデックスだけシフト # バックグラウンド以外はモデルがワンホットラベルを受け取るように、ここでこれを行う # クラスは0から数え始める。 label_id_offset = 1 train_image_tensors = [] gt_classes_one_hot_tensors = [] gt_box_tensors = [] for (train_image_np, gt_box_np, gt_label_np) in zip( train_images_np, gt_boxes, gt_labels): train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor( train_image_np, dtype=tf.float32), axis=0)) # Numpy画像をTensorに gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32)) # Numpy boxをTensorに zero_indexed_groundtruth_classes = tf.convert_to_tensor( gt_label_np - label_id_offset) # Numpy labelを最小値0の配列にして、Tensorに gt_classes_one_hot_tensors.append(tf.one_hot( zero_indexed_groundtruth_classes, num_classes)) # label Tensorをワンホットに print('データの準備が終わりました')たとえば、ある一枚の画像に対する
[1,1,2]
のラベル配列が
array([ [1., 0.],[1., 0.],[0., 1.] ], dtype=float32)>
のワンホット配列になります9.アノテーションした正解ボックスを視覚化してチェック
dummy_scores = np.array([1.0], dtype=np.float32) # 100%のボックススコアを仮で入れる plt.figure(figsize=(30, 15)) for idx in range(5): plt.subplot(2, 3, idx+1) plot_detections( train_images_np[idx], gt_boxes[idx], gt_labels[idx], dummy_scores, category_index) plt.show()10.モデルをビルドして重みをリストアする
最後レイヤー以外の重みをリストアします。最後のレイヤーのみトレーニング用にランダムな重みで初期化されます。
ここではResnetバックボーンのRetinanetを使っています。
Object Detection API にはさまざまなモデルがあります。# モデルをダウンロード !wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz !tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz !mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/モデルごとにパイプライン定義辞書ファイルがあり、クラス数などパラメーターが書き込まれています。
パイプライン定義辞書ファイルは、Object Detection リポジトリの configs フォルダ、もしくはダウンロードしたモデルのディレクトリにあります。
定義ファイルのクラス数を、自前のデータのクラス数に書き換えます。チェックポイントからリストアする層をHeadで指定しています。今回は、クラス分類用の部分の重みはリストアしないので、ボックス分類用の部分の重みのみ指定しています。
tf.keras.backend.clear_session() print('簡易トレーニングのためにモデルをビルドして重みをリストアしています...', flush=True) pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config' checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0' # パイプライン定義を読み込んで物体検出モデルをビルド。 # デフォルトでは90クラスを検出するCOCOアーキテクチュアで作業しているので、 # パイプライン定義のクラス数をデータのクラス数に上書きする。 configs = config_util.get_configs_from_pipeline_file(pipeline_config) model_config = configs['model'] model_config.ssd.num_classes = num_classes model_config.ssd.freeze_batchnorm = True detection_model = model_builder.build( model_config=model_config, is_training=True) # 物体分類・検出の重みをリストアする --- RetinaNetは2つの推論チェックポイントHeadをもっている # --- 一つはクラス分類用, もう一つはボックス検出用. We will # ボックス検出用のチェックポイントHeadから重みをリストアするが、クラス分類用の重みはスクラッチで初期化する 両方のヘッドからリストアしたい場合は追加するラインをコメントアウトで以下に示している fake_box_predictor = tf.compat.v2.train.Checkpoint( _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads, # _prediction_heads=detection_model._box_predictor._prediction_heads, # (今回はリストアしないクラス分類用のHead) _box_prediction_head=detection_model._box_predictor._box_prediction_head, ) fake_model = tf.compat.v2.train.Checkpoint( _feature_extractor=detection_model._feature_extractor, _box_predictor=fake_box_predictor) ckpt = tf.compat.v2.train.Checkpoint(model=fake_model) ckpt.restore(checkpoint_path).expect_partial() # 部分的にリストア # モデルの重み値が作られるように、ダミーインプット(0配列)で実行 image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3])) prediction_dict = detection_model.predict(image, shapes) _ = detection_model.postprocess(prediction_dict, shapes) print('重みをリストアしました!')11.トレーニング
トレーニング時間は数分。
tf.keras.backend.set_learning_phase(True) # これらのパラメーターは調整できる; サンプルでは5枚の画像でトレーニングするため # 大きなバッチサイズは機能しない, 必要に応じてもっと大きいバッチでトレーニング画像を # メモリにフィットできる. batch_size = 4 learning_rate = 0.01 num_batches = 100 # 今回簡易トレーニングするトップ層の値を選択する. trainable_variables = detection_model.trainable_variables to_fine_tune = [] prefixes_to_train = [ 'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead', 'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead'] for var in trainable_variables: if any([var.name.startswith(prefix) for prefix in prefixes_to_train]): to_fine_tune.append(var) # 一回のトレーニングステップのフォワード・バックワード処理を設定 def get_model_train_step_function(model, optimizer, vars_to_fine_tune): """Get a tf.function for training step.""" # 速度をあげるために tf.function を使用. # 内部の具体的な値が欲しい場合(Eager Execution)は @tf.functionデコレーターをコメントアウト @tf.function def train_step_fn(image_tensors, groundtruth_boxes_list, groundtruth_classes_list): """一回分のトレーニングの計算. 引数: image_tensors: [1, 高さ, 幅, 3]のtf.float32タイプのTensorのリスト. 注。画像によってサイズは異なり、この関数内で640x640にリシェイプされる. groundtruth_boxes_list: バッチ内の各画像の正解ボックスを表す [N_i, 4]形状のtf.float32 タイプのTensorのリスト groundtruth_classes_list: バッチ内の各画像の正解ボックスを表す [N_i, num_classes]形状のtf.float32 タイプのTensorのリスト 戻り値: 入力バッチのトータルロスを表すスカラー(単一値)のTensor """ shapes = tf.constant(batch_size * [[640, 640, 3]], dtype=tf.int32) model.provide_groundtruth( groundtruth_boxes_list=groundtruth_boxes_list, groundtruth_classes_list=groundtruth_classes_list) with tf.GradientTape() as tape: preprocessed_images = tf.concat( [detection_model.preprocess(image_tensor)[0] for image_tensor in image_tensors], axis=0) prediction_dict = model.predict(preprocessed_images, shapes) losses_dict = model.loss(prediction_dict, shapes) total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss'] gradients = tape.gradient(total_loss, vars_to_fine_tune) optimizer.apply_gradients(zip(gradients, vars_to_fine_tune)) return total_loss return train_step_fn optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9) train_step_fn = get_model_train_step_function( detection_model, optimizer, to_fine_tune) print('簡易トレーニングをスタート!', flush=True) for idx in range(num_batches): # データセットのランダムなサブセットのキーを取得 all_keys = list(range(len(train_images_np))) random.shuffle(all_keys) example_keys = all_keys[:batch_size] # 注 このサンプルデモではデータ拡張を行わないが、楽しい練習のためにランダム横フリップと # ランダムクロップをお勧めする gt_boxes_list = [gt_box_tensors[key] for key in example_keys] gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys] image_tensors = [train_image_tensors[key] for key in example_keys] # トレーニングステップ (フォワードパス + バックワードパス) total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list) if idx % 10 == 0: print('batch ' + str(idx) + ' of ' + str(num_batches) + ', loss=' + str(total_loss.numpy()), flush=True) print('簡易トレーニングが終わりました!')batch 0 of 100, loss=1.2068503
batch 10 of 100, loss=0.12002414
batch 20 of 100, loss=0.10228661
batch 30 of 100, loss=0.0361837
batch 40 of 100, loss=0.011348422
batch 50 of 100, loss=0.0028579112
batch 60 of 100, loss=0.0032960502
batch 70 of 100, loss=0.002372135912.未知の画像でテスト!
テスト画像を numpy array にしてモデルの推論にかけます。結果は100個のバウンディングボックス、
100個のラベル、100個のスコアで返ってきます。
バウンディングボックスのi番目は、ラベルのi番目、スコアのi番目に対応しています。
どんな場合もこの100個という数は変わりません。この中で、スコアの高いものを視覚化したり、推論結果のボックスとして用います。
視覚化する場合のデフォルトのスコアの閾値は0.8です。(plot_detection関数で指定される)
手元で試した時も、大体の場合、例えばぼくが目で画像をみて推論対象のオブジェクトが2つ見える場合、モデルが出力した100個のスコアのうち0.5を超えるものは2つです。他のスコアは0.02など極端に低いです。なので、100個の結果のうち信頼できるボックスやラベルを見つけるのはそれほど難しくありません(トレーニングがうまくいっていれば)。pip install natsort #テスト画像の順番を保って推論するために、名前でソートするライブラリをインストールしています。from natsort import natsorted print(sorted_file_names) test_image_dir = './dataset/test' test_images_np = [] file_names = os.listdir(test_image_dir) test_paths = natsorted(file_names) for test_path in test_paths: test_images_np.append(np.expand_dims( load_image_into_numpy_array(test_path), axis=0)) # Again, uncomment this decorator if you want to run inference eagerly @tf.function def detect(input_tensor): """Run detection on an input image. Args: input_tensor: A [1, height, width, 3] Tensor of type tf.float32. Note that height and width can be anything since the image will be immediately resized according to the needs of the model within this function. Returns: A dict containing 3 Tensors (`detection_boxes`, `detection_classes`, and `detection_scores`). """ preprocessed_image, shapes = detection_model.preprocess(input_tensor) prediction_dict = detection_model.predict(preprocessed_image, shapes) return detection_model.postprocess(prediction_dict, shapes) # Note that the first frame will trigger tracing of the tf.function, which will # take some time, after which inference should be fast. label_id_offset = 1 for i in range(len(test_images_np)): input_tensor = tf.convert_to_tensor(test_images_np[i], dtype=tf.float32) detections = detect(input_tensor) # このdetectionsで結果が取れます。 plot_detections( test_images_np[i][0], detections['detection_boxes'][0].numpy(), detections['detection_classes'][0].numpy().astype(np.uint32) + label_id_offset, detections['detection_scores'][0].numpy(), category_index, figsize=(15, 20), image_name="gif_frame_" + ('%02d' % i) + ".jpg") # 指定したパスにスコア0.8を超えるボックスを描画した画像が保存されます。print(detections) # 以下出力結果。省略してあるがそれぞれ100個ある # 'detection_boxes' 'detection_classes' 'detection_scores'が最終結果 # 'detection_anchor_indices''raw_detection_boxes''raw_detection_scores'は最終結果を計算する際使った途中データ(だと思う。たぶん) {'detection_anchor_indices': <tf.Tensor: shape=(1, 100), dtype=int32, numpy= array([[49416, 50753, ... 51112, 26364]], dtype=int32)>, 'detection_boxes': <tf.Tensor: shape=(1, 100, 4), dtype=float32, numpy= array([[[0.43758985, 0.7465773 , 0.63472795, 0.9252911 ], [0.1677289 , 0.6480559 , 0.890319 , 1. ], ... [0.40918362, 0.3183376 , 1. , 0.9439225 ], [0.639281 , 0.8898159 , 0.7221419 , 0.97141266]]], dtype=float32)>, 'detection_classes': <tf.Tensor: shape=(1, 100), dtype=float32, numpy= array([[0., 0., ... 1., 0.]], dtype=float32)>, 'detection_multiclass_scores': <tf.Tensor: shape=(1, 100, 3), dtype=float32, numpy= array([[[5.47093153e-03, 3.10172260e-01, 1.57460570e-03], [3.18378210e-03, 2.98067868e-01, 1.27398968e-03], ... [1.98462605e-03, 7.14010894e-02, 1.30185485e-03]]], dtype=float32)>, 'detection_scores': <tf.Tensor: shape=(1, 100), dtype=float32, numpy= array([[0.31017226, 0.29806787, 0.26563442, 0.23411435, 0.22276634, 0.21396422, 0.20716852, 0.18401867, 0.17277354, 0.16559672, ... 0.14484483, 0.14467192, 0.13986477, 0.13589099, 0.13474342, 0.07329145, 0.0723871 , 0.07223672, 0.07157233, 0.07140109]], dtype=float32)>, 'num_detections': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([100.], dtype=float32)>, 'raw_detection_boxes': <tf.Tensor: shape=(1, 51150, 4), dtype=float32, numpy= array([[[-3.6555314e-03, -1.2414398e-02, 1.4784184e-02, 1.0699857e-02], [-9.5088510e-03, -2.2957223e-02, 3.9035182e-02, 1.7941574e-02], ..., [ 3.1216300e-01, 6.6491508e-01, 1.3707981e+00, 1.0911807e+00], [ 6.6202581e-02, 4.6959493e-01, 1.5031044e+00, 1.2707567e+00]]], dtype=float32)>, 'raw_detection_scores': <tf.Tensor: shape=(1, 51150, 3), dtype=float32, numpy= array([[[9.3629062e-03, 7.2856843e-03, 4.1753352e-03], [4.8707724e-03, 1.5826846e-06, 3.3203959e-03], ..., [7.2056055e-03, 1.9515157e-02, 1.4944762e-02], [8.9454055e-03, 1.9429326e-03, 1.5336275e-03]]], dtype=float32)>}12’.結果をGifで表示
imageio.plugins.freeimage.download() anim_file = 'test.gif' filenames = glob.glob('gif_frame_*.jpg') filenames = sorted(filenames) last = -1 images = [] for filename in filenames: image = imageio.imread(filename) images.append(image) imageio.mimsave(anim_file, images, 'GIF-FI', fps=5) display(IPyImage(open(anim_file, 'rb').read()))13.モデルの保存
import os ckpt_path = 'ckpt/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8' os.makedirs(ckpt_path, exist_ok=True) checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=detection_model) manager = tf.train.CheckpointManager(checkpoint, directory=ckpt_path, max_to_keep=5) manager.save()14.モデルの復元
trained_model = model_builder.build(model_config=model_config, is_training=False) ckpt_trained = tf.compat.v2.train.Checkpoint(model=ssd_model) # ダミー入力で実行して重み値を生成 image, shapes = trained_model.preprocess(tf.zeros([1, 640, 640, 3])) prediction_dict = trained_model.predict(image, shapes) _ = trained_model.postprocess(prediction_dict, shapes) ckpt_trained.restore('ckpt/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/ckpt-1') print('重みをリストアしました!')15.復元したモデルの実行
12、のテストの detect_model を trained_modelに書き換えて実行します。
?
フリーランスエンジニアです。
お仕事のご相談こちらまで
rockyshikoku@gmail.comCore MLを使ったアプリを作っています。
機械学習関連の情報を発信しています。