- 投稿日:2019-12-03T21:07:49+09:00
文書解析タスクのTransformerをTensorflow officialから使う (TF2.0インストール付き)
0. Abstract
これまでの文書解析タスクでは、ある意味盲目的に時系列処理としてLSTM、RNNが用いられてきた。
しかし、ここ最近オートエンコーダベースのモデルにAttention機構という技術を組み合わせたモデルが登場してきており、これまでのLSTMやRNNの性能を上回っている。
Attention機構自体は昔から存在しているが、それを文書解析タスクで採用したものはTransformerというモデル、論文では「Attention Is All You Need」というものとなっている。
https://arxiv.org/abs/1706.03762本技術の紹介サイトは様々あるが、王道のTensorflowを動作させるチュートリアルは探せなかったため、使い方を説明していきたい。
今回の参考サイトは以下である。
https://github.com/tensorflow/models/tree/master/official/transformer#model-training-and-evaluation1. Installation
学習出来る環境まで構築していく。
Switch Tensorflow 2.0 with anaconda
TF2.0で使えるモデルらしいのでまずはTF2.0をインストール。
Cuda 10.0、CuDNN 7.4、Anacondaがよい(2019, 12月時点)。
Cuda 10.1の場合はライブラリパスが見つからなくなる。$ conda update -n base conda $ conda create --name tf2-gpu $ conda activate tf2-gpu $ pip install --upgrade pip $ pip uninstall numpy $ pip install numpy $ pip install tensorflow-gpu==2.0.0-rc1 # cuda 10.1用 $ export LD_LIBRARY_PATH="/usr/local/cuda/lib64:/usr/lib/x86_64-linux-gnu:$LD_LIBRARY_PATH"
conda
を使うと、どうやらIntel製MKLで高速演算可能なNumpyがインストールされるらしいが、上手く動かなかったりするので一度アンインストールして再インストールを行う。上手く動く人は飛ばしてOK。これでTensorflow 2.0がインストールされるので以下でGPUを読み込めているかを確認する。
import tensorflow from tensorflow.python.client import device_lib device_lib.list_local_devices()たまに
nvidia-smi -l
がバグるらしいのでその場合は再起動。
もしくはCudaの入れ替えを行う。
なお、CUDA 10.1用にドライバ入れなおす場合は以下のようにする。$ sudo apt-get purge nvidia* $ sudo apt-get install cuda-libraries-10-0 $ sudo add-apt-repository ppa:graphics-drivers $ sudo apt-get update $ sudo apt-get install nvidia-410Download source code
取り合えず何も考えずにオフィシャルのTensorflowのモデルをGit cloneする。
Masterブランチを使うと未だ公開されていないTensorflowの関数を使おうとしてバグるのでRC1にダウングレードさせる。$ git clone https://github.com/tensorflow/models.git $ git checkout tf_2_0_rc1次にパスを通して、依存関係をインストールする。その後、Transformerのディレクトリへ移動
因みに、この辺の解説は飛ばされることが多いが、めちゃくちゃ重要$ export PYTHONPATH="$PYTHONPATH:<YOUR_PATH>/models" $ pip install -r models/official/requirements.txt $ cd models/official/transformer/v2/今後、
<YOUR_PATH>/models/official/transformer/v2
が作業ディレクトリとなる。
またExportが外れないようにbashrc
にでも書き込んでおこう。
v2
フォルダはKeras及びTF2.0に対応したバージョンのソースコードとなっている。Variables
これから以下の変数をよく使うので、定義しておくと楽である。
PARAM_SET=big DATA_DIR=$HOME/transformer/data MODEL_DIR=$HOME/transformer/model_$PARAM_SET VOCAB_FILE=$DATA_DIR/vocab.ende.32768本稿は変数にしてしまうと分かりにくいので、ハードコーディングで進めて行く。
なお、変数で定義する場合は絶対パスを使わないと上手く動かない箇所が存在する。Prepare datasets
今回のタスクはポルトガル語から英語への変換である。このため、これらのコーパスをダウンロードする。
$ mkdir datasets $ python3 data_download.py --data_dir=./datasets/ダウンロードが全て終わると、Shufflingが行われて以下のようなログが表示される
2. Training
ここまできてようやく学習を開始させられる。
まず重みを記録するフォルダを作成する。$ mkdir checkpoints次に、以下のコマンドで学習を開始する。
python transformer_main.py --data_dir=./datasets/ --model_dir=./checkpoints --vocab_file=./datasets/vocab.ende.32768 --param_set=big3. Prediction
公式から急に説明がなくなるが、どうやらソースコードでは実装されている。
以下でPredictionができる。
python transformer_main.py --data_dir=./datasets/ --model_dir=./checkpoints --vocab_file=./datasets/vocab.ende.32768 --param_set=big --mode==predict結果はこのような感じ。
対訳取得
次に対訳表現を取得したい。しかし、インプットデータはTokenizeされているし、TF.Dataset形式だし・・・という状況。
おもむろに
transformer_main.py
を開き、以下のように書き直す。def predict(self): """Predicts result from the model.""" params = self.params flags_obj = self.flags_obj with tf.name_scope("model"): model = transformer.create_model(params, is_train=False) self._load_weights_if_possible( model, tf.train.latest_checkpoint(self.flags_obj.model_dir)) model.summary() subtokenizer = tokenizer.Subtokenizer(flags_obj.vocab_file) ds = data_pipeline.eval_input_fn(params) ds = ds.map(lambda x, y: x).take(_SINGLE_SAMPLE) ret = model.predict(ds) val_outputs, _ = ret dataset = ds.batch(1) iterator = dataset.make_one_shot_iterator() batch = iterator.get_next() for i in range(62): translate.translate_from_input(batch[0][i].numpy(), subtokenizer) length = len(val_outputs) for i in range(length): translate.translate_from_input(val_outputs[i], subtokenizer)TF2.0からDatasetの使い方変わりすぎ・・・
書き直して実行すると米語→独語の対訳が得られる。英語
On the other hand, 76% of voters were white but these represented only 46% of early voters.対訳
In den anderen anderen Worten, aber nicht nur wenige % des Jahres.5分学習したくらいじゃ全然翻訳出来ないことが分かった。
- 投稿日:2019-12-03T15:10:18+09:00
Airflowでモデルの学習からデプロイまでをやってみた
こんにちは,NTTドコモ入社4年目の石井です.
業務では機械学習を用いたレコメンデーション機能の開発に取り組んでいます.4日の記事では本業務とは直接的には関係ないですが,最近ではDevOpsと並んで盛り上がりを見せているMLOpsの領域から実行サイクルを円滑化してくれるワークフローエンジンである Airflow を用いたパイプライン実行について紹介します.
はじめに
最近ではCI/CDと行った技術はソフトウェア開発の現場では当たり前のように活用されていると思いますが,機械学習における継続的デプロイは明確なベストプラクティスがまだ定義されておらず,各々の置かれている状況や環境に応じて様々な形をとっていると思われます.特に研究開発の部署では議論の中心はモデリングになってくるため学習モデルの管理や継続的デプロイについては優先度が下がってしまうのが現状です.
一方でGoogle社が2015年に投稿した論文1内に記載されている以下の図のように基本的に機械学習システムにおけるモデリングの領域(機械学習)は非常に小さいことが分かると思います.最近では社内でも機械学習を活用したシステムを運用する機会が増えてきており,今後は研究開発によるモデリングからエンドユーザが利用するServingまでの全体最適をどうするかの議論が活発化すると考えられます.
出典: Hidden Technical Debt in Machine Learning Systemsよりそこで,今回はデータサイエンティストとしてモデリング外の領域を広げていくことを目指して個人的に興味を持っていたAirflowを用いたパイプラインを実践していこうと思います.
ゴール
今回のゴールですがAirflowを用いたパイプラインを作成して学習モデルの生成からモデルのオンライン推論までを実践していこうと思います.このゴール設定にした理由はモデルの学習からオンライン推論を実施する部分までを記載している記事を単純に私が見つけられなかったからです...
さて,具体的には以下のシナリオを実施して行こうと思います.
- Airflowを用いたDAG記述によるフローを定義とAirflowの実行
- APIをトリガーとして作成したタスク実行
- Titanicデータを使ったモデル学習タスクによる学習モデル生成
- 学習モデルのTensorflowServing環境へのデプロイ
- タスク完了ステータスのSlackチャネル通知
- 推論結果をAPIリクエストにて確認
Airflowとは
Airflow2はApacheのトップレベルプロジェクトとして開発されているOSSになります.その役割はワークフローの記述やスケジューリング,モニタリングといった機能を提供するプラットフォームとなっています.つまり,これまでパイプラインの作成に際して手動でCronバッチを作成したり,ログ管理を行ったり,データを取得するために個別でスクリプトを用意していたところをAirflowによって一元的に管理しながら継続実行していくことを可能にします.
そして,Airflowの仕組みについてですがAirflowでは大きく3つのコンポーネントからから構成されています.
WebServer
WebServerはDAGの設定内容や進行状況の確認等のモニタリング機能を提供します.DAGなどの設定情報や実行履歴などは接続しているDBに保存しており,それらの情報を読み取ってWeb画面上に表示します.Scheduler
SchedulerはDAGの一覧を監視して実行可否を判断し,DAGを実行する機能を提供します.また,Brokerと呼ばれるタスクキューにジョブを投入する役割も担います.Worker
Brokerに積まれているジョブをdequeueしてタスクを実行するインスタンスとしての役割を担い,並列分散処理を実現する機能を提供します.これらの3つのコンポーネントによって構成されているAirflowですが,Airflowを簡単に試したいという場合には単一処理としてシーケンシャルにタスクを実行することも可能です.その場合はBrokerを用いる必要はありませんが,基本的には並列分散で実行したいというシーンの方が多いと思いますのでBrokerはほぼ必須と考えて良いと考えられます.
また,AirflowではWebServerを介してREST APIが提供されているため,/api/experimental/
のエンドポイントを利用して外部からAirflowを操作することもできます.この点は外部のシステムと連携しやすさを考えると非常に便利です.TensorflowServingとは
TensorflowServing3は学習済みモデルをサービスとしてデプロイし、プロダクション環境で柔軟でパイパフォーマンスなオンライン推論APIを実現するソフトウェアです。Google社がApache2.0ライセンスとして公開しているOSSでtensorflow extended内のコンポーネントとしても利用されています.
ここではTensorflowServingの詳細は本題から外れてしまうため割愛しますが,ポイントを簡単にお伝えするとtensorflow servingはC++で記述されていることから高速で動作し,さらにはgRPCという通信プロトコルによるリクエストを受け付けるインターフェースを提供してくれるためモデルをプロダクション環境で運用する際には非常に有効なツールとなっています.また,学習モデルのバージョン管理や並列運用も可能となっており,至れり尽くせりの機能を提供しています.興味がある方は是非一度触って見てください.
ただ,個人的に唯一改善してほしいと感じたのは対応しているモデル形式です.Tensorflowで利用されているSavedModel形式のみに対応しているため,他のアルゴリズムやフレームワークで作成したモデルを簡単にデプロイすることができません.この点は今後拡張して行ってもらいると非常に嬉しいですね.システムアーキテクチャ
最終的に構成するシステムアーキテクチャの全体像をまとめておきます.
今回は1つのサーバーノードの中にシステムを構築していき,以下のような構成を再現していこうと思います.Airflowの情報を管理するデータベースにはPostgresを利用します.そして,SchedulerからWorkerへジョブを受け渡すBrokerにはRedisを用いました.
データベースやBrokerの選定には特に理由はありませんが,Airflowの公式ドキュメントで推奨しているデータベースということでPostgresを,BrokerにはRedisを利用しました.バックエンドで利用できるデータベースに大きな制約はありませんが,AirflowがSqlAlchemyを利用してデータベースの操作を行う関係で,SqlAlchemyがサポートしているデータベースを採用すべきと公式ドキュメントに注意書きがあったのでやはり推奨のものを利用するのがベターのようです.検証実行環境
- Amazon Linux 2 (t2.large)
- Python 3.7.4
- Airflow 1.10.6
- tensorflow 2.0
- tensorflow serving 1.10.0
モデル作成
モデルはKaggleでお馴染みのTitaicのデータセットを利用して生存者を分類する学習モデルを生成します.データ取得はオンラインストレージ上に格納されている場所から取得するため,学習タスクの中でデータ取得の処理も併せて実行していきます.Titanicのデータがどのようなデータが見たことのない人のためにサンプルで以下にデータの一部を記載しておきます.
フレームワークはTensorflow Servingで扱うSavedModel形式のモデルを生成したいためTensorflowを利用し,モデルのアルゴリズムはGradientBoostingTreeを用いてモデルを学習します.以下に今回利用するモデルを生成するコードを記載します.train.py#!/usr/bin/env python # -*- coding: utf-8 -*- import os import sys import numpy as np import pandas as pd import tensorflow as tf CATEGORICAL_COLUMNS = ['sex', 'n_siblings_spouses', 'parch', 'class', 'deck', 'embark_town', 'alone'] NUMERIC_COLUMNS = ['age', 'fare'] class Train: def __init__(self): self.model = None self.X_train = None self.X_eval = None self.y_train = None self.y_eval = None self.feature_columns = [] def load(self): self.X_train = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/train.csv') self.X_eval = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/eval.csv') self.y_train = self.X_train.pop('survived') self.y_eval = self.X_eval.pop('survived') def train(self): tf.random.set_seed(123) fc = tf.feature_column num_examples = len(self.y_train) def one_hot_cat_column(feature_name, vocab): return tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab)) def make_input_fn(X, y, num_example, n_epochs=None, shuffle=True): def input_fn(): dataset = tf.data.Dataset.from_tensor_slices((X.to_dict(orient='list'), y)) if shuffle: dataset = dataset.shuffle(num_example) dataset = (dataset .repeat(n_epochs) .batch(num_example)) return dataset return input_fn for feature_name in CATEGORICAL_COLUMNS: vocabulary = self.X_train[feature_name].unique() self.feature_columns.append( one_hot_cat_column( feature_name, vocabulary ) ) for feature_name in NUMERIC_COLUMNS: self.feature_columns.append( tf.feature_column.numeric_column( feature_name, dtype=tf.float32 ) ) train_input_fn = make_input_fn(self.X_train, self.y_train, num_examples) eval_input_fn = make_input_fn(self.X_eval, self.y_eval, num_examples, shuffle=False, n_epochs=1) params = { 'n_trees': 50, 'max_depth': 3, 'n_batches_per_layer': 1, 'center_bias': True } self.model = tf.estimator.BoostedTreesClassifier(self.feature_columns, **params) self.model.train(train_input_fn, max_steps=5) results = self.model.evaluate(eval_input_fn) return results def export(self, path): feature_spec = tf.feature_column.make_parse_example_spec(self.feature_columns) serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec) self.model.export_saved_model( path, serving_input_receiver_fn ) def execute(): output = "/".join([os.environ["AIRFLOW_HOME"],"models", "gbdt"] ) tr = Train() tr.load() tr.train() tr.export(os.getenv("MODELPATH", output)Dag記述
次にAirflow側のワークフローを記述していきます.
AirflowではDAG(Directed Acyclic Graph)と呼ばれる有効非巡回グラフを用いてタスクの記述や条件分岐,並列処理などをまとめていきます.
DAGの記載方法はインスタンス化したOperatorを用いてタスクを登録し,タスク間の依存関係を定義します.そして,Operatorにはいくつか種類がありメジャーなところだとBashシェルを実行するBashOperatorやPythonのメソッドを実行できるPythonOperator,GCP上のBigQueryにクエリを発行するBigQueryOperatorなどがあるので実現したいタスクに応じて様々なOperatorを使い分けることになります.もっと他のOperatorについて知りたいという方はAirflowの公式ドキュメント4をご覧ください.
それでは早速DAGを定義してきましょう.今回は以下の表のようにタスクをOperatorを用いて記述してDAGを定義しました.
タスク名 Operator 説明 start, end BashOperator Bashシェルコマンドを実行して標準出力を行います.実行開始や終了の出力をします. train PythonOperator データの読み込みからTensorflowにおける学習モデル生成までの処理を実施します. branch BranchPythonOperator 既存でTesorflow Servingが動作しているかを確認して次の処理を選択します. deploy BashOperator TensorflowServingのDockerImageを用いて学習モデルをデプロイします. update BashOperator Tensorflow Serving上のモデルを更新します. join DummyOperator 条件分岐したタスクを結合するダミータスクを実施します. slack SlackWebhookOperator slackのWebhhookを利用してDAGの処理が完了したことを通知します. 上記のタスクを定義したサンプルコードは以下になります.
pipeline.py#!/usr/bin/env python # -*- coding: utf-8 -*- import os import sys import pytz import docker import datetime from datetime import datetime, timedelta from src.train import execute import airflow from airflow import DAG from airflow.hooks.base_hook import BaseHook from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import BranchPythonOperator from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator START_TIME = datetime.now(pytz.timezone('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S') CLI = docker.DockerClient(base_url=os.getenv('DOCKER_URL', "unix://var/run/docker.sock")) default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.today(), 'email': None, 'email_on_failure': False, 'email_on_retry': False, } dag = DAG( 'ml_pipeline', description='DAG for machine learning pipeline that performs from model learning to deployment', default_args=default_args, schedule_interval=timedelta(days=1) ) def branch_func(**kwargs): try: CLI.containers.get("serving") return "update" except: return "deploy" slack_message = " \ 【Slack ML Notification】\n\ *ステータス:* Success\n\ *タスクID:* ml_pipeline\n\ *実行開始時間:* " + START_TIME start_command = """ echo "==================" echo " Airflow Task Run " echo "==================" """ end_command = """ echo "=====================" echo " Airflow Task Finish " echo "=====================" """ deploy_command = """ docker run -d \ -p 8500:8500 \ -p 8501:8501 \ --name "serving" \ -e MODEL_NAME={{params.model}} \ -v "$AIRFLOW_HOME/models:/models" \ tensorflow/serving """ update_command = """ docker cp $AIRFLOW_HOME/models serving:/models docker restart serving """ start = BashOperator( task_id='start', bash_command=start_command, dag=dag ) end = BashOperator( task_id='end', bash_command=end_command, dag=dag ) train = PythonOperator( task_id='train', python_callable=execute, dag=dag ) branch = BranchPythonOperator( task_id='branch', python_callable=branch_func, dag=dag ) deploy = BashOperator( task_id='deploy', bash_command=deploy_command, params={"model": "gbdt"}, dag=dag, ) update = BashOperator( task_id='update', bash_command=update_command, dag=dag ) join = DummyOperator( task_id='join', trigger_rule='none_failed', dag=dag ) slack = SlackWebhookOperator( task_id='slack', http_conn_id='slack_webhook', webhook_token=BaseHook.get_connection('slack_webhook').password, message=slack_message, channel='#learning-notice', username='Airflow', dag=dag ) start >> train >> branching branch >> deploy >> join branch >> update >> join join >> slack >> endこれで今回利用するファイルの説明は終了です.
学習によって出力したモデルを本来はバージョン管理したいところですが今回は簡略化のため,とりあえず最新のモデルを継続してデプロイするだけにしました.
次からはAirflowの環境構築を行って実行していきましょう.環境構築
システムアーキテクチャの項目で記載した構成に従ってAirflowと各種ソフトウェア環境を構築していきます.
まず初めに周辺環境のセットアップから初めてAirflowの設定していこうと思います.周辺環境のセットアップ
PostgresとRedisはDockerコンテナを用いて構築します.
以下のコマンドを実行するだけで簡単にセットアップできます.$ docker run -d -p 6379:6379 redis:latest redis-server $ docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=airflow -e POSTGRES_DB=airflow -e POSTGRES_USER=airflow postgres:latestデータベースのデータを永続化したい場合はローカルのボリュームをマウントしてコンテナを立ち上げます.
また,データベースの認証情報はairflow
に統一していますがセキュリティ上よろしくないのでご自身の環境で適宜修正してください.サーバ環境変数の設定
Airflowの設定ファイルやDAGファイルの参照先は実行環境内で何も指定しない場合は
$HOME/airflow
に設定され,当該ディレクトリ内にある設定ファイルやDAGの情報を参照します.そのため,利用したいDAGが$HOME/airflow
以外にある場合はAirflowのルートパスを変更する必要があります.その場合は環境変数のAIRFLOW_HOME
でルートパスを指定します.
以下内容を.bash_profile
や.bashrc
ファイルに追記して環境変数を読み込んでください..bashrcexport AIRFLOW_HOME=<プロジェクトのパス>terminal$ source ~/.bashrcAirflowのインストールと初期化
Airflowのインストールはpipコマンドで簡単にインストールできます.
terminal$ pip install apache-airflow apache-airflow[postgres] apache-airflow[celery]インストールが完了したらAirflowで利用するデータベースを初期化します.
このコマンドを実行するとAIRFLOW_HOME配下にairflow.cfgとairflow.dbというファイルが生成されます.
airflow.cfgはairflowの設定ファイルでairflow.dbはSQLiteにて作成されたデータベースを保存するファイルになります.デフォルトで利用されるデータベースはSQLiteであるため,以下のコマンドを実行したタイミングで自動的にairflow.dbが作成されますが今回はpostgresを利用するためこちらのファイルは利用しません.terminal$ airflow initdb
airflow.cfgファイルの修正
Airflowの設定ファイルは先ほど説明したようにairflow.cfgとなります.
設定ファイルでは接続するデータベースやBroker,ロギングの内容などを設定します.
今回は分散環境でタスクを実行するためExecutorにCeleryExecutorを指定する必要があります.airflow.cfgの初期設定ではSequntialExecutorになっているため以下の通りに修正していきます.また,併せて接続するデータベースの情報もDockerコンテナで作成したデータベース情報に合わせて変更します.airflow.cfg# The executor class that airflow should use. Choices include # SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor executor = CeleryExecutor # The SqlAlchemy connection string to the metadata database. # SqlAlchemy supports many different database engine, more information # their website sql_alchemy_conn = postgresql://airflow:airflow@localhost:5432/airflow # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally # a sqlalchemy database. Refer to the Celery documentation for more # information. # http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings broker_url = redis://localhost:6379/0 # The Celery result_backend. When a job finishes, it needs to update the # metadata of the job. Therefore it will post a message on a message bus, # or insert it into a database (depending of the backend) # This status is used by the scheduler to update the state of the task # The use of a database is highly recommended # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings result_backend = db+postgresql://airflow:airflow@localhost:5432/airflowAirflow内の環境変数設定
SlackOperatorでSlakに通知する際に利用する認証情報をAirflowのConnectionsに設定します.
Dagのスクリプト内に認証情報を直接記載しても実行できますが,WebUIにアクセスする人が誰でもSlackの認証情報を閲覧できてしまうためセキュリティの観点でよろしくありません.そのためConnectionsに認証情報を設定するようにします.Slackの認証情報はSlackのWebhook設定ページにアクセスして,新しくIncoming WebHookを作成することで発行できます.発行した後はWebHookURLの項目にあるURLをコピーしてください.次にAirflowで利用するデータベースにSlackの認証情報を設定します.airflowのCLIを利用して以下のコマンドで取得したSlackの認証情報を反映させます.
terminal$ airflow connections --add \ --conn_id=slack_webhook \ --conn_type=http \ --conn_host="https://hooks.slack.com/services" \ --conn_extra='{"webhook_token": "/TXXXXXXXXXXXX/BYYYYYYYYYYYYY/ZZZZZZZZZZZZZZZZZ"}'上記のパラメータの説明ですが,conn_id はSlackOperator内で呼び出すキーとなりますのでご自身の環境で自由に書き換えてください.変更した場合はDAG内のSlackOperatorでhttp_conn_idに指定する環境変数のキーも設定したものに書き換える必要がありますのでご注意ください.その他のパラメータは固定でconn_typeは通信プロトコルになるためhttpを指定,conn_hostはSlackのサーバホスト名になるのでWebhookURLのserviceまでの部分,最後のconn_extraではWebhookURLのservice以降のトークンを指定します.
DAGファイルの配置
前述で指定したAIRFLOW_HOMEで指定したルートパスの配下にdagsフォルダを作成して,dags配下に作成したDAGのスクリプトを配置します.
dagsフォルダ配下に配置したファイルは自動的にairflow側で読み込まれるためワークフローを定義したDAGファイルを配置するだけで簡単にパイプラインを追加することができます.何も設定しなくても勝手に読み込んでくれるので非常に使い勝手が楽です.
また,PythonOperatorで利用する学習を実行するスクリプトファイルのtrain.py
はsrcフォルダ配下に配置してpipeline.py
から呼び出すようにしました.PYTHONPATHにはAIRFLOW_HOMEまでのパスを追加して実行します.root/ ├ README.md ├ requirements.txt ├ airflow.cfg ├ dags/ │ └ pipeline.py ├ src/ │ └ train.pyAirflowのタスク実行
ここまででAirflowを実行するための周辺環境整備はできたのでここからは実行していきます.
Airflowの起動
Airflowを実行するためには3つのコンポーネントを動作させる必要があるため,以下の3つのコマンドを実行してプロセスを立ち上げます.
terminal$ airflow webserver & # webserverの起動 $ airflow scheduler & # schedulerの起動 $ airflow worker & # workerの起動Airflowによる管理画面
Airflowが正常に動作していることをブラウザで確認してみましょう.
ブラウザを起動してAirflowが動作しているサーバのホスト名にアクセスしてみます.正常に動作している場合はAirflowで実行できるDAG一覧画面が表示されます.DAG一覧の中から自身で作成したDAGを選択して先ほどのGraphViewタブを選択するとDAG記述で記載した内容がフローとして表示されるので定義した通りのワークフローになっているかを確認できます.今回は学習を実施してモデルを生成し,TensorflowServingでデプロイが完了したらSlackに通知するということで想定通りのフローが表示されていました.
APIトリガーによるタスク実行
作成したDAGを実行していきます.
トリガーとしてはいくつか種類がありますが,AirflowのREST APIにリクエストを投げて実行します.DAGを実行するAPIのエンドポイントはデフォルトでは/api/experimental/<自身のdag名>/dag_runs
になりますので,実行したいDAGの名前を指定してエンドポイント対してPOSTリクエストを投げます.terminal$ curl -X POST -d "{}" http://<Host>:8080/api/experimental/dags/ml_pipeline/dag_runs上記コマンドの
<Host>
にはAirflowが動作しているご自身のホスト名で置き換えて実行してください.リクエストが正常に行われるとAirflow側でタスクのインスタンスが生成されてタスクをキューに蓄積されていくのを確認できます.処理中画面
実行後はただ処理が完了するまで待つだけです.
Slack通知
Slackタスクが完了したら実行した事前に登録したSlackのチャネルに通知してくれます.個人的にはモデルを学習させて逐一学習が終わったかを確認することが非常に煩わしいと感じているためコミュニケーションツールと簡単に連携できて学習が終了したタイミングで知らせてくれるのは非常にありがたいと感じました.
推論実施
最後に学習モデルがデプロイされた推論サーバに対してAPIリクエストを投げて推論結果を確認してみましょう.
terminal$ curl -X POST -d \ "{ \ \"examples\": [{ \ \"sex\": [\"male\"], \ \"age\": [22.0], \ \"n_siblings_spouses\": [1], \ \"parch\": [0], \ \"fare\": [7.2500], \ \"class\": [\"Third\"], \ \"deck\": [\"unknown\"], \ \"embark_town\": [\"Southampton\"], \ \"alone\": [\"y\"] \ }] \ }" \ http://<Host>:8501/v1/models/gbdt:classify無事にモデルがデプロイされて推論結果を取得することができました.
結果は "0" ラベルということで正しく推論できていそうですね.{ "results": [[["0", 0.65634954], ["1", 0.343650401]]] }
まとめ
最後まで読んで頂きありがとうございました.皆様いかがでしたでしょうか.
今回はAirflowを用いたワークフローを用いて,モデルの学習からデプロイまでを実施してみました.
実際にAirflowに触ってみるとPythonスクリプトでワークフローを簡単に記述することができるお手軽さや外部システムと連携する際のハードルの低さといった多くのメリットを感じました.また,TensorflowServingによるオンライン推論まで自動化できると作成した学習モデルの民主化等にも貢献してくるのではないかと思います.一方で課題だと感じる部分もありました.Airflow上で複数のタスクが同時に運用されている環境ではリソース管理は当然課題に上がってくると思いますし,分散処理によるマルチクラスタにおいてログをどうやって集約するのかといった問題などもありそうです.
今回の取り組みの中でも実際にAirflowをEC2上で動作させていた時にインスタンスタイプを少なめに見積もってしまった結果,リソース不足が発生してプロセスが落ちるといったことを経験しました.やはりリソース監視とスケーリングによる対策は必至でありそうな感覚を覚えました.とは言えAirflowは有効なツールであることは間違いないので今後の動向は継続してウォッチしていこうと思います.まだまだ手探りの状態で扱いきれていない機能も多々あるので今度はBigQueryあたりと接続したワークフローとかにチャレンジしたいと思います.
- 投稿日:2019-12-03T12:16:48+09:00
[供養]ROS2Learnを使ってみた[取扱注意]
はじめに
もはや誰得感はありますが,年末のご供養ってコトで.
ROS2Learnとは?
ROS 2向け強化学習フレームワークです.Gazebo上でロボットアーム等の振る舞いを自律学習して,目標タスクを実行するモデルを自動生成します.openai_rosのROS 2対応版という位置づけで正しいと思われます.
Acutronic Roboticsが自社ロボットアームMARAをReference Modelとして強力に開発を推し進めていて,,,というところでお察しください( ´·௰·`)
公式ドキュメントなどはWebの海に離散しましたが,幾つか情報の痕跡が残っています.
- (ROS Discourse) Check out gym-gazebo2 and ROS2Learn!
- (arXiv) ROS2Learn: a reinforcement learning framework for ROS 2
- (YouTube) MARA robotic arm : Training MARAOrient using PPO2 through ROS2learn framework and gym-gazebo2.
GitHubリポジトリは残念ながらachivedされています.なんかあってもIssueも立てられんorz
- https://github.com/AcutronicRobotics/ros2learnなんでこの記事書いたの?
筆者は2019年6月頃にコレを動かして遊んでみていました.なんかいろいろできそうだな〜と夢想している間に件の事態になったわけですが,,,
せっかく動かしてみた経験があってそのメモを残していたし,また『World MoveIt Day 2019 in Tokyo』の機会にまだちゃんと動くことが確認できたので,環境構築の整え方くらいでもROS2 Advent Calendar 2019に放流してみようかなと考えた次第です.なお原典がすでに存在しませんので,情報の正当性は保証しません.いろいろ関係ツールをlatest versionにするとマズい可能性は高いので,なるべくそれぞれの当時のversionは明記しておきます.
違うバージョンでも動いたとか,この手順通りにやったら動かん!とか情報ありましたらお知らせくださいませ(でもどうすんだ??前提とする環境
- Ubuntu 18.04.2 LTS
- ROS 2 Dashing
- Gazebo 9
Eloquentがリリースされましたが,さてどうでしょうか??
ROS2Learnの環境構築
千里の路も
git clone
からです.cd ~ && git clone -b dashing https://github.com/acutronicRobotics/ros2learn.git cd ros2learn git submodule update --init --recursive各モジュールをインストールしていきます.
gym_gazebo2 の準備
https://github.com/AcutronicRobotics/gym-gazebo2
これもあくとろ(ry
INSTALL.mdに従ってやっていきます.
Gazebo のインストール
いつもどおりですね.
9.9.0以上が良いとのことで,古ければインストールし直しましょう.当方の環境では9.11.0
で動いてます.curl -sSL http://get.gazebosim.org | sh
依存パッケージのインストール
DDSはOpenSpliceを使ってほしいようです.今なら直っているかも.
あとTensorflow入れてます.当方の環境では当時入れた1.14.0
ですが,最新の2.0.0でも動くかは分かりません.sudo apt update && sudo apt install -y \ ros-dashing-action-msgs \ ros-dashing-message-filters \ ros-dashing-yaml-cpp-vendor \ ros-dashing-urdf \ ros-dashing-rttest \ ros-dashing-tf2 \ ros-dashing-tf2-geometry-msgs \ ros-dashing-rclcpp-action \ ros-dashing-cv-bridge \ ros-dashing-image-transport \ ros-dashing-camera-info-manager # Install OpenSplice RMW implementation. Required for dashing until default FastRTPS is fixed. sudo apt install ros-dashing-rmw-opensplice-cpp sudo apt update && sudo apt install -y \ build-essential \ cmake \ git \ python3-colcon-common-extensions \ python3-pip \ python-rosdep \ python3-vcstool \ python3-sip-dev \ python3-numpy \ wget # Install TensorFlow CPU. Feel free to get the GPU version at https://www.tensorflow.org/install/gpu. pip3 install tensorflow # Additional utilities pip3 install transforms3d billiard psutil # Fast-RTPS dependencies sudo apt install --no-install-recommends -y \ libasio-dev \ libtinyxml2-devMARA用のWorkSpace作成とビルド
mkdir -p ~/ros2_mara_ws/src cd ~/ros2_mara_ws wget https://raw.githubusercontent.com/AcutronicRobotics/MARA/dashing/mara-ros2.repos vcs import src < mara-ros2.repos wget https://raw.githubusercontent.com/AcutronicRobotics/gym-gazebo2/dashing/provision/additional-repos.repos vcs import src < additional-repos.repos # Avoid compiling erroneus package touch ~/ros2_mara_ws/src/orocos_kinematics_dynamics/orocos_kinematics_dynamics/COLCON_IGNOREHRIMを生成します.
cd ~/ros2_mara_ws/src/HRIM sudo pip3 install hrim hrim generate models/actuator/servo/servo.xml hrim generate models/actuator/gripper/gripper.xmlビルドしていきます.
ちょっとパッケージのビルド順にクセがあるようです.source /opt/ros/dashing/setup.bash cd ~/ros2_mara_ws colcon build --merge-install --packages-skip individual_trajectories_bridge # Remove warnings touch ~/ros2_mara_ws/install/share/orocos_kdl/local_setup.sh ~/ros2_mara_ws/install/share/orocos_kdl/local_setup.bashなお当方が試したときはrosidl_typesupport_openspliceにあったバグを踏み抜いたのですが,すでに修正対応されています.(参考)
OpenAI Gymのインストール
まず
pip3 uninstall gym
してから最新版をソースからビルドしようなっ!て言ってますが,怖いですねぇ.
当方のcommit tagは0.14.0
です.cd ~ git clone https://github.com/openai/gym cd gym pip3 install -e .gym-gazebo2 のインストール
以上の操作を行ってから,
ros2learn/
内でgym-gazebo2をインストールします.cd ~/ros2learn/environments/gym-gazebo2 pip3 install -e .環境変数の設定
このへんはご自由に.私はこういうものは.bashrcでfunction alias拵えています.
cd ~/ros2learn/environments/gym-gazebo2 echo "source `pwd`/provision/mara_setup.sh" >> ~/.bashrc echo "export RMW_IMPLEMENTATION=rmw_opensplice_cpp" >> ~/.bashrc source ~/.bashrcbaselines パッケージのインストール
これなんなのかよく分かっていません.誰か教えてください.
OpenAIの標準的なアルゴリズムが実装されたパッケージだと思ってます.cd ~/ros2learn/algorithms/baselines pip3 install -e .参考:https://github.com/AcutronicRobotics/baselines
依存ツールのインストール
Tensorboard で可視化できるのですが,その関係ですね.
pip3 install pandas pip3 install matplotlib sudo apt install python3-tkQuickStart with MARA_arm
まずはGazebo連携でMARAのモデルがちゃんと動くか確認します.
source ~/ros2learn/environments/gym-gazebo2/provision/mara_setup.sh cd ~/gym-gazebo2/examples/MARA python3 gg_random.py -g
-h
オプションでなにができるか確認してみてください.Train an agent
ロボットアームの姿勢制御がサンプルの学習対象のタスクのようです.
source ~/ros2learn/environments/gym-gazebo2/provision/mara_setup.sh cd ~/ros2learn/experiments/examples/MARA python3 train_ppo2_mlp.py -g
-g
オプションを付与すれば,Gazeboが起動して学習状況を可視的に確認できます(PCはめっちゃ重くなります)
ずっと実行し続けて,やればやるほど精度が上がる(はず)です.学習データは
/tmp/ros2learn/MARA-v0/ppo2_mlp/<timestamp>/
に順次生成されていきます.Run a trained policy
生成した学習データをもとに対象タスクを実行します.
$ cd ~/ros2learn/algorithms/baselines/baselines/ppo2/ $ vim default.py $ git diff $ diff --git a/baselines/ppo2/defaults.py b/baselines/ppo2/defaults.py index cc7d4a2..4239923 100644 --- a/baselines/ppo2/defaults.py +++ b/baselines/ppo2/defaults.py @@ -51,7 +51,7 @@ def mara_mlp(): # env_name = 'MARACollisionOrient-v0', transfer_path = None, # transfer_path = '/tmp/ros2learn/MARA-v0/ppo2_mlp/2019-02-19_12h47min/checkpoints/best', - trained_path = '/tmp/ros2learn/MARA-v0/ppo2_mlp/2019-04-02_13h18min/checkpoints/best' + trained_path = '/tmp/ros2learn/MARA-v0/ppo2_mlp/2019-06-12_14h37min/checkpoints/best' ) def mara_lstm(): $ cd ~/ros2learn/experiments/examples/MARA $ python3 run_ppo2_mlp.py -g -r -v 0.3
defaults.py
を編集して,実行する学習データを指定します.
-g
はGazebo起動で可視化
-r
は実速度 (real speed) で実行
-v
は速度 (velocity) の指定 (この例では 0.3 rad/sec)
という感じです.Visualize training data on tensorboard
tensorboard機能で学習データの各種統計情報を表示することができます.
$ tensorboard --logdir=/tmp/ros2learn/MARA-v0/ppo2_mlp --port 8008としてやって,ブラウザで http://localhost:8008/ にアクセスしてみてください.
所感
当時のメモをそのまま貼り付けてみます.
- Gazebo準拠に作成したモデルを用いて強化学習を実行するには非常に便利そう
- Gazeboで学習経過/推論結果を可視的に確認できるのも良い
- 対象のモデルとタスクを作成するのが大変ではあるかもしれない
- 例えばTurtleBot3は,ROBOTIS公式のデモのものを流用するなど?
- 学習アルゴリズムのスクリプトを自作することもできそう
- baseline が標準のものだが,詳細は不明(TensorFlowの一般的なもの?)
- 様々な学習モデルやアルゴリズムの比較評価をできそう
- 自前ロボット適用には用意すべきファイルは多そう
- 基本は,GazeboモデルとPython3スクリプトの作成になる模様
まだワカラナイことが多すぎるので,いろいろ深堀りしてみたさはありますが,はてさて,,,