- 投稿日:2020-02-25T09:58:24+09:00
BigQuery ML で使える TensorFlow モデルを作る
はじめに
BigQuery ML は インポートした TensorFlow モデルでの予測 ができます。
BigQuery ML を使って TensorFlow モデルを管理すると、データの転送を省略したり、モデルを BigQuery と Cloud Storage に一任できます。
この記事では、BigQuery ML にインポート可能な TensorFlow モデルを作っていきます。なお、この記事は @K_Urushi のご協力で作成しています。
モデルの作り方
TensorFlow モデルをインポートする CREATE MODEL ステートメント にあるように、BigQuery ML で使える TensorFlow モデルは SavedModel として保存されている必要があります。
SavedModel を作る
シンプルな SavedModel を作る
tf.saved_model.save の例から始めましょう。
例では、tf.TensorSpec はshape=None
と定義されていますが、BigQuery ML から使う場合は必須のようですので、
shape=1
とします。関数を変える場合には、適切に tf.TensorSpec を書き換えてください。import tensorflow as tf class Adder(tf.Module): @tf.function(input_signature=[tf.TensorSpec(shape=1, dtype=tf.float32)]) def add(self, x): return x + x + 1. to_export = Adder() tf.saved_model.save(to_export, '/tmp/adder') # 認証しておけば Google Storage に直接転送できます # tf.saved_model.save(to_export, 'gs://tmp/adder')作った SavedModel を BigQuery にインポートする
TensorFlow モデルのインポート を参考にモデルをインポートします。
Cloud Storage にある SavedModel を参照できるので、予め転送しておきましょう。
クエリ 1 つでインポートできるのでとてもお手軽です。CREATE OR REPLACE MODEL example_dataset.imported_tf_model OPTIONS (MODEL_TYPE='TENSORFLOW', MODEL_PATH='gs://tmp/adder/*')インポートしたモデルを使う
インポートした TensorFlow モデルでの予測 を参考にモデルで予測します。
SELECT * FROM ML.PREDICT(MODEL example_dataset.imported_tf_model, ( SELECT * FROM UNNEST(GENERATE_ARRAY(1,10))x実行結果
行 output_0 x 0 3.0 1 1 5.0 2 2 7.0 3 3 9.0 4 4 11.0 5 5 13.0 6 6 15.0 7 7 17.0 8 8 19.0 9 9 21.0 10 無事に実行できました。
tf.function の特徴である AutoGraph によって、Python コードが TensorFlow グラフに変換可能です。JavaScript UDF の代わりに使えるかもしれません。
サポートされている型
サポートされている入力 にありますが、再掲します。
TensorFlow 型 BigQuery ML type tf.int8, tf.int16, tf.int32, tf.int64, tf.uint8, tf.uint16, tf.uint32, tf.uint64 INT64 tf.float16, tf.float32, tf.float64, tf.bfloat16 FLOAT64 tf.bool BOOL tf.string STRING 2020 年 2 月 12 日現在、対応している入出力型は限定的なため、BigQuery のデータ型とモデル作成時の型の自由度の差異に注意しましょう。
tf.estimator を使って SavedModel を作る
続いて、予測に使用する SavedModel のエクスポート を参考に、機械学習モデルの SavedModel を作っていきます。
ここでは、BigQuery ML のリリースが待たれる Boosted Trees として、Boosted trees using Estimators を見ながら、tf.estimator.BoostedTreesClassifier を作ってみましょう。
データのロード
import numpy as np import pandas as pd import tensorflow as tf dftrain = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/train.csv') dfeval = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/eval.csv') y_train = dftrain.pop('survived') y_eval = dfeval.pop('survived')入力値の作成
fc = tf.feature_column # カテゴリ値に対応すると煩雑になったため省略 # CATEGORICAL_COLUMNS = ['sex', 'n_siblings_spouses', 'parch', 'class', 'deck', # 'embark_town', 'alone'] NUMERIC_COLUMNS = ['age', 'fare'] def one_hot_cat_column(feature_name, vocab): return tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab)) feature_columns = [] for feature_name in CATEGORICAL_COLUMNS: # Need to one-hot encode categorical features. vocabulary = dftrain[feature_name].unique() feature_columns.append(one_hot_cat_column(feature_name, vocabulary)) for feature_name in NUMERIC_COLUMNS: feature_columns.append(tf.feature_column.numeric_column(feature_name, dtype=tf.float32)) NUM_EXAMPLES = len(y_train) def make_input_fn(X, y, n_epochs=None, shuffle=True): def input_fn(): dataset = tf.data.Dataset.from_tensor_slices((dict(X), y)) if shuffle: dataset = dataset.shuffle(NUM_EXAMPLES) # For training, cycle thru dataset as many times as need (n_epochs=None). dataset = dataset.repeat(n_epochs) # In memory training doesn't use batching. dataset = dataset.batch(NUM_EXAMPLES) return dataset return input_fn # Training and evaluation input functions. train_input_fn = make_input_fn(dftrain, y_train) eval_input_fn = make_input_fn(dfeval, y_eval, shuffle=False, n_epochs=1)tf.estimator の作成
est = tf.estimator.BoostedTreesClassifier(feature_columns, n_batches_per_layer=1) est.train(train_input_fn) # Eval. # result = est.evaluate(eval_input_fn) # print(pd.Series(result))SavedModel の作成
json_serving_input_fn を作って、export すると BigQuery ML から理想的な形で呼び出すことができます。
def json_serving_input_fn(): """Build the serving inputs.""" inputs = {} for feat in feature_columns: print(feat) inputs[feat.name] = tf.placeholder(shape=[None], dtype=feat.dtype) return tf.estimator.export.ServingInputReceiver(inputs, inputs) path = est.export_saved_model('gs://tmp/btc', json_serving_input_fn)BigQuery ML にする
CREATE OR REPLACE MODEL example_dataset.imported_tf_model OPTIONS (MODEL_TYPE='TENSORFLOW', MODEL_PATH='gs://tmp/btc/1581656284/*')使ってみる
SELECT * FROM ML.PREDICT(MODEL bigqueryml.gbdt, ( SELECT * FROM UNNEST([STRUCT(10 AS age, 0 AS fare), (80, 30)])))実行結果
行 all_class_ids all_classes class_ids classes logistic logits probabilities age fare 1 0 0 0 0 0.001847356558 -6.292150497 0.9981526732 10 0 1 1 0.001847356558 2 0 0 1 1 0.9975745082 6.019282341 0.00242551649 80 30 1 1 0.9975745082 いい感じに動いてそうです!
厄介なポイント
TensorFlow 1.x は tf.placeholder が使えるので上のコードが動作します。 TensorFlow 2.x は tf.placeholder が使えないため、以下の serving_input_fn で Proto Buffers を経由して頑張る方法しか見つけられていません。そのため予め Proto Buffers に変換したテーブルを用意するのが解になってしまいます。他には tensorflow/tensorflow/core/example の .proto を JavaScript にコンパイルして UDF を作成すると回避できる可能性があります。
serving_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn( tf.feature_column.make_parse_example_spec(feature_columns) )おわりに
BigQuery ML で使える TensorFlow の SavedModel を作って動作確認しました。
BigQuery ML でテンソルグラフ計算や、BigQuery ML では未リリースの BoostedTreesClassifier を実現できました。
BigQuery ML をうまく扱うと、データとモデルが近い位置におけるためぜひ活用していきたいですね。
- 投稿日:2020-02-25T00:32:20+09:00
ColaboratoryでTensorflow2.1.0でTPU使う場合は"%tensorflow_version 2.x"を使う
Colaboratory+TF2.1.0+TPUを使う場合について
pip install -U tensorflow==2.1.0
はうまくいかず
マジックコマンド%tensorflow_version 2.x
を使うようです.
Colabには、tensorflow 2.1.0がプリインストールされています。それを使用するには、マジック%tensorflow_version 2.xを実行します。
(tensorflowをインポートする前に)これを実行すると、2.xを選択するだけでなく、2.xを使用するようにクラウドTPUを構成するための追加作業も行います。
TPUに接続するためのコード
- Tensorflowを2.1.0にupgrade
%tensorflow_version 2.x
- バージョン確認
!pip list | grep '^tensorflow ' import tensorflow as tf print("Tensorflow version(import) : " + tf.__version__)
コンソールログ
tensorflow 2.1.0 Tensorflow version(import) : 2.1.0
- TPU接続
import tensorflow as tf import os tpu_grpc_url = "grpc://" + os.environ["COLAB_TPU_ADDR"] tpu_cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_grpc_url) tf.config.experimental_connect_to_cluster(tpu_cluster_resolver) tf.tpu.experimental.initialize_tpu_system(tpu_cluster_resolver) strategy = tf.distribute.experimental.TPUStrategy(tpu_cluster_resolver)
コンソールログ
INFO:tensorflow:Initializing the TPU system: 10.4.102.154:8470 INFO:tensorflow:Initializing the TPU system: 10.4.102.154:8470 INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
[付録]TF2.x系での検証結果
- TF2.0.0はpipで上げた場合でもうまくいきます.
- TF2.1.0の場合注意がいるようです.
pipでTensorflow==2.0.0の場合(TPU接続に成功する)
!pip install -U tensorflow==2.0.0import tensorflow as tf import os tpu_grpc_url = "grpc://" + os.environ["COLAB_TPU_ADDR"] tpu_cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_grpc_url) tf.config.experimental_connect_to_cluster(tpu_cluster_resolver) tf.tpu.experimental.initialize_tpu_system(tpu_cluster_resolver) strategy = tf.distribute.experimental.TPUStrategy(tpu_cluster_resolver)INFO:tensorflow:Initializing the TPU system: 10.31.187.66:8470 INFO:tensorflow:Initializing the TPU system: 10.31.187.66:8470 INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)pipでTensorflow==2.1.0の場合(TPU接続に失敗)
!pip install -U tensorflow==2.1.0import tensorflow as tf import os tpu_grpc_url = "grpc://" + os.environ["COLAB_TPU_ADDR"] tpu_cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_grpc_url) tf.config.experimental_connect_to_cluster(tpu_cluster_resolver) tf.tpu.experimental.initialize_tpu_system(tpu_cluster_resolver) strategy = tf.distribute.experimental.TPUStrategy(tpu_cluster_resolver)INFO:tensorflow:Initializing the TPU system: 10.31.187.66:8470 INFO:tensorflow:Initializing the TPU system: 10.31.187.66:8470 INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches --------------------------------------------------------------------------- NotFoundError Traceback (most recent call last) <ipython-input-3-a6bfd659f09b> in <module>() 8 tpu_cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_grpc_url) 9 tf.config.experimental_connect_to_cluster(tpu_cluster_resolver) ---> 10 tf.tpu.experimental.initialize_tpu_system(tpu_cluster_resolver) 11 strategy = tf.distribute.experimental.TPUStrategy(tpu_cluster_resolver) 3 frames /usr/local/lib/python3.6/dist-packages/six.py in raise_from(value, from_value) NotFoundError: '__inference__tpu_init_fn_4' is neither a type of a primitive operation nor a name of a function registered in binary running on n-cda3d9f6-w-0. Make sure the operation or function is registered in the binary running in this process.