20191203のTensorFlowに関する記事は3件です。

文書解析タスクのTransformerをTensorflow officialから使う (TF2.0インストール付き)

0. Abstract

これまでの文書解析タスクでは、ある意味盲目的に時系列処理としてLSTM、RNNが用いられてきた。
しかし、ここ最近オートエンコーダベースのモデルにAttention機構という技術を組み合わせたモデルが登場してきており、これまでのLSTMやRNNの性能を上回っている。
Attention機構自体は昔から存在しているが、それを文書解析タスクで採用したものはTransformerというモデル、論文では「Attention Is All You Need」というものとなっている。
https://arxiv.org/abs/1706.03762

本技術の紹介サイトは様々あるが、王道のTensorflowを動作させるチュートリアルは探せなかったため、使い方を説明していきたい。

今回の参考サイトは以下である。
https://github.com/tensorflow/models/tree/master/official/transformer#model-training-and-evaluation

1. Installation

学習出来る環境まで構築していく。

Switch Tensorflow 2.0 with anaconda

TF2.0で使えるモデルらしいのでまずはTF2.0をインストール。
Cuda 10.0、CuDNN 7.4、Anacondaがよい(2019, 12月時点)。
Cuda 10.1の場合はライブラリパスが見つからなくなる。

$ conda update -n base conda
$ conda create --name tf2-gpu
$ conda activate tf2-gpu
$ pip install --upgrade pip
$ pip uninstall numpy
$ pip install numpy
$ pip install tensorflow-gpu==2.0.0-rc1
# cuda 10.1用
$ export LD_LIBRARY_PATH="/usr/local/cuda/lib64:/usr/lib/x86_64-linux-gnu:$LD_LIBRARY_PATH"

condaを使うと、どうやらIntel製MKLで高速演算可能なNumpyがインストールされるらしいが、上手く動かなかったりするので一度アンインストールして再インストールを行う。上手く動く人は飛ばしてOK。

これでTensorflow 2.0がインストールされるので以下でGPUを読み込めているかを確認する。

import tensorflow
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

たまにnvidia-smi -lがバグるらしいのでその場合は再起動。
もしくはCudaの入れ替えを行う。
なお、CUDA 10.1用にドライバ入れなおす場合は以下のようにする。

$ sudo apt-get purge nvidia*
$ sudo apt-get install cuda-libraries-10-0
$ sudo add-apt-repository ppa:graphics-drivers
$ sudo apt-get update
$ sudo apt-get install nvidia-410

Download source code

取り合えず何も考えずにオフィシャルのTensorflowのモデルをGit cloneする。
Masterブランチを使うと未だ公開されていないTensorflowの関数を使おうとしてバグるのでRC1にダウングレードさせる。

$ git clone https://github.com/tensorflow/models.git
$ git checkout tf_2_0_rc1

次にパスを通して、依存関係をインストールする。その後、Transformerのディレクトリへ移動
因みに、この辺の解説は飛ばされることが多いが、めちゃくちゃ重要

$ export PYTHONPATH="$PYTHONPATH:<YOUR_PATH>/models"
$ pip install -r models/official/requirements.txt
$ cd models/official/transformer/v2/

今後、<YOUR_PATH>/models/official/transformer/v2が作業ディレクトリとなる。
またExportが外れないようにbashrcにでも書き込んでおこう。
v2フォルダはKeras及びTF2.0に対応したバージョンのソースコードとなっている。

Variables

これから以下の変数をよく使うので、定義しておくと楽である。

PARAM_SET=big
DATA_DIR=$HOME/transformer/data
MODEL_DIR=$HOME/transformer/model_$PARAM_SET
VOCAB_FILE=$DATA_DIR/vocab.ende.32768

本稿は変数にしてしまうと分かりにくいので、ハードコーディングで進めて行く。
なお、変数で定義する場合は絶対パスを使わないと上手く動かない箇所が存在する。

Prepare datasets

今回のタスクはポルトガル語から英語への変換である。このため、これらのコーパスをダウンロードする。

$ mkdir datasets
$ python3 data_download.py --data_dir=./datasets/

亀の歩みでダウンロードが始まる。
image.png

ダウンロードが全て終わると、Shufflingが行われて以下のようなログが表示される
image.png

2. Training

ここまできてようやく学習を開始させられる。
まず重みを記録するフォルダを作成する。

$ mkdir checkpoints

次に、以下のコマンドで学習を開始する。

python transformer_main.py --data_dir=./datasets/ --model_dir=./checkpoints --vocab_file=./datasets/vocab.ende.32768 --param_set=big

学習が開始されればこのような画面が表示される。
image.png

3. Prediction

公式から急に説明がなくなるが、どうやらソースコードでは実装されている。

以下でPredictionができる。

python transformer_main.py --data_dir=./datasets/ --model_dir=./checkpoints --vocab_file=./datasets/vocab.ende.32768 --param_set=big --mode==predict

結果はこのような感じ。

image.png

対訳取得

次に対訳表現を取得したい。しかし、インプットデータはTokenizeされているし、TF.Dataset形式だし・・・という状況。

おもむろにtransformer_main.pyを開き、以下のように書き直す。

 def predict(self):
    """Predicts result from the model."""
    params = self.params
    flags_obj = self.flags_obj

    with tf.name_scope("model"):
      model = transformer.create_model(params, is_train=False)
      self._load_weights_if_possible(
          model, tf.train.latest_checkpoint(self.flags_obj.model_dir))
      model.summary()
    subtokenizer = tokenizer.Subtokenizer(flags_obj.vocab_file)

    ds = data_pipeline.eval_input_fn(params)
    ds = ds.map(lambda x, y: x).take(_SINGLE_SAMPLE)
    ret = model.predict(ds)
    val_outputs, _ = ret

    dataset = ds.batch(1)
    iterator = dataset.make_one_shot_iterator()
    batch = iterator.get_next()

    for i in range(62):
      translate.translate_from_input(batch[0][i].numpy(), subtokenizer)

    length = len(val_outputs)
    for i in range(length):
      translate.translate_from_input(val_outputs[i], subtokenizer)

TF2.0からDatasetの使い方変わりすぎ・・・
書き直して実行すると米語→独語の対訳が得られる。

英語

On the other hand, 76% of voters were white but these represented only 46% of early voters.

対訳

In den anderen anderen Worten, aber nicht nur wenige % des Jahres.

5分学習したくらいじゃ全然翻訳出来ないことが分かった。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Airflowでモデルの学習からデプロイまでをやってみた

こんにちは,NTTドコモ入社4年目の石井です.
業務では機械学習を用いたレコメンデーション機能の開発に取り組んでいます.

4日の記事では本業務とは直接的には関係ないですが,最近ではDevOpsと並んで盛り上がりを見せているMLOpsの領域から実行サイクルを円滑化してくれるワークフローエンジンである Airflow を用いたパイプライン実行について紹介します.

はじめに

最近ではCI/CDと行った技術はソフトウェア開発の現場では当たり前のように活用されていると思いますが,機械学習における継続的デプロイは明確なベストプラクティスがまだ定義されておらず,各々の置かれている状況や環境に応じて様々な形をとっていると思われます.特に研究開発の部署では議論の中心はモデリングになってくるため学習モデルの管理や継続的デプロイについては優先度が下がってしまうのが現状です.

一方でGoogle社が2015年に投稿した論文1内に記載されている以下の図のように基本的に機械学習システムにおけるモデリングの領域(機械学習)は非常に小さいことが分かると思います.最近では社内でも機械学習を活用したシステムを運用する機会が増えてきており,今後は研究開発によるモデリングからエンドユーザが利用するServingまでの全体最適をどうするかの議論が活発化すると考えられます.
figure_ml_system.png
出典: Hidden Technical Debt in Machine Learning Systemsより

そこで,今回はデータサイエンティストとしてモデリング外の領域を広げていくことを目指して個人的に興味を持っていたAirflowを用いたパイプラインを実践していこうと思います.

ゴール

今回のゴールですがAirflowを用いたパイプラインを作成して学習モデルの生成からモデルのオンライン推論までを実践していこうと思います.このゴール設定にした理由はモデルの学習からオンライン推論を実施する部分までを記載している記事を単純に私が見つけられなかったからです...
さて,具体的には以下のシナリオを実施して行こうと思います.

  1. Airflowを用いたDAG記述によるフローを定義とAirflowの実行
  2. APIをトリガーとして作成したタスク実行
  3. Titanicデータを使ったモデル学習タスクによる学習モデル生成
  4. 学習モデルのTensorflowServing環境へのデプロイ
  5. タスク完了ステータスのSlackチャネル通知
  6. 推論結果をAPIリクエストにて確認

Airflowとは

Airflow2はApacheのトップレベルプロジェクトとして開発されているOSSになります.その役割はワークフローの記述やスケジューリング,モニタリングといった機能を提供するプラットフォームとなっています.つまり,これまでパイプラインの作成に際して手動でCronバッチを作成したり,ログ管理を行ったり,データを取得するために個別でスクリプトを用意していたところをAirflowによって一元的に管理しながら継続実行していくことを可能にします.
そして,Airflowの仕組みについてですがAirflowでは大きく3つのコンポーネントからから構成されています.

  • WebServer
    WebServerはDAGの設定内容や進行状況の確認等のモニタリング機能を提供します.DAGなどの設定情報や実行履歴などは接続しているDBに保存しており,それらの情報を読み取ってWeb画面上に表示します.

  • Scheduler
    SchedulerはDAGの一覧を監視して実行可否を判断し,DAGを実行する機能を提供します.また,Brokerと呼ばれるタスクキューにジョブを投入する役割も担います.

  • Worker
    Brokerに積まれているジョブをdequeueしてタスクを実行するインスタンスとしての役割を担い,並列分散処理を実現する機能を提供します.

これらの3つのコンポーネントによって構成されているAirflowですが,Airflowを簡単に試したいという場合には単一処理としてシーケンシャルにタスクを実行することも可能です.その場合はBrokerを用いる必要はありませんが,基本的には並列分散で実行したいというシーンの方が多いと思いますのでBrokerはほぼ必須と考えて良いと考えられます.
また,AirflowではWebServerを介してREST APIが提供されているため,/api/experimental/のエンドポイントを利用して外部からAirflowを操作することもできます.この点は外部のシステムと連携しやすさを考えると非常に便利です.

TensorflowServingとは

TensorflowServing3は学習済みモデルをサービスとしてデプロイし、プロダクション環境で柔軟でパイパフォーマンスなオンライン推論APIを実現するソフトウェアです。Google社がApache2.0ライセンスとして公開しているOSSでtensorflow extended内のコンポーネントとしても利用されています.

ここではTensorflowServingの詳細は本題から外れてしまうため割愛しますが,ポイントを簡単にお伝えするとtensorflow servingはC++で記述されていることから高速で動作し,さらにはgRPCという通信プロトコルによるリクエストを受け付けるインターフェースを提供してくれるためモデルをプロダクション環境で運用する際には非常に有効なツールとなっています.また,学習モデルのバージョン管理や並列運用も可能となっており,至れり尽くせりの機能を提供しています.興味がある方は是非一度触って見てください.

ただ,個人的に唯一改善してほしいと感じたのは対応しているモデル形式です.Tensorflowで利用されているSavedModel形式のみに対応しているため,他のアルゴリズムやフレームワークで作成したモデルを簡単にデプロイすることができません.この点は今後拡張して行ってもらいると非常に嬉しいですね.

システムアーキテクチャ

最終的に構成するシステムアーキテクチャの全体像をまとめておきます.
今回は1つのサーバーノードの中にシステムを構築していき,以下のような構成を再現していこうと思います.

figure_system_architecture.png

Airflowの情報を管理するデータベースにはPostgresを利用します.そして,SchedulerからWorkerへジョブを受け渡すBrokerにはRedisを用いました.
データベースやBrokerの選定には特に理由はありませんが,Airflowの公式ドキュメントで推奨しているデータベースということでPostgresを,BrokerにはRedisを利用しました.バックエンドで利用できるデータベースに大きな制約はありませんが,AirflowがSqlAlchemyを利用してデータベースの操作を行う関係で,SqlAlchemyがサポートしているデータベースを採用すべきと公式ドキュメントに注意書きがあったのでやはり推奨のものを利用するのがベターのようです.

検証実行環境

  • Amazon Linux 2 (t2.large)
  • Python 3.7.4
  • Airflow 1.10.6
  • tensorflow 2.0
  • tensorflow serving 1.10.0

モデル作成

モデルはKaggleでお馴染みのTitaicのデータセットを利用して生存者を分類する学習モデルを生成します.データ取得はオンラインストレージ上に格納されている場所から取得するため,学習タスクの中でデータ取得の処理も併せて実行していきます.Titanicのデータがどのようなデータが見たことのない人のためにサンプルで以下にデータの一部を記載しておきます.
figure_airflow_dataframe.png
フレームワークはTensorflow Servingで扱うSavedModel形式のモデルを生成したいためTensorflowを利用し,モデルのアルゴリズムはGradientBoostingTreeを用いてモデルを学習します.以下に今回利用するモデルを生成するコードを記載します.

train.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import numpy as np
import pandas as pd
import tensorflow as tf

CATEGORICAL_COLUMNS   = ['sex', 'n_siblings_spouses', 'parch',
                         'class', 'deck', 'embark_town', 'alone']
NUMERIC_COLUMNS       = ['age', 'fare']

class Train:
    def __init__(self):
        self.model           = None
        self.X_train         = None
        self.X_eval          = None
        self.y_train         = None
        self.y_eval          = None
        self.feature_columns = []

    def load(self):
        self.X_train = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/train.csv')
        self.X_eval  = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/eval.csv')
        self.y_train = self.X_train.pop('survived')
        self.y_eval  = self.X_eval.pop('survived')

    def train(self):
        tf.random.set_seed(123)
        fc              = tf.feature_column
        num_examples    = len(self.y_train)

        def one_hot_cat_column(feature_name, vocab):
            return tf.feature_column.indicator_column(
                tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))

        def make_input_fn(X, y, num_example, n_epochs=None, shuffle=True):
            def input_fn():
                dataset = tf.data.Dataset.from_tensor_slices((X.to_dict(orient='list'), y))
                if shuffle:
                    dataset = dataset.shuffle(num_example)
                dataset = (dataset
                    .repeat(n_epochs)
                    .batch(num_example))
                return dataset
            return input_fn

        for feature_name in CATEGORICAL_COLUMNS:
            vocabulary = self.X_train[feature_name].unique()
            self.feature_columns.append(
                one_hot_cat_column(
                    feature_name,
                    vocabulary
                )
            )

        for feature_name in NUMERIC_COLUMNS:
            self.feature_columns.append(
                tf.feature_column.numeric_column(
                    feature_name,
                    dtype=tf.float32
                )
            )

        train_input_fn = make_input_fn(self.X_train, self.y_train, num_examples)
        eval_input_fn  = make_input_fn(self.X_eval, self.y_eval, num_examples, shuffle=False, n_epochs=1)

        params = {
            'n_trees': 50,
            'max_depth': 3,
            'n_batches_per_layer': 1,
            'center_bias': True
        }

        self.model = tf.estimator.BoostedTreesClassifier(self.feature_columns, **params)
        self.model.train(train_input_fn, max_steps=5)
        results = self.model.evaluate(eval_input_fn)
        return results

    def export(self, path):
        feature_spec = tf.feature_column.make_parse_example_spec(self.feature_columns)
        serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)

        self.model.export_saved_model(
            path,
            serving_input_receiver_fn
        )


def execute():
    output = "/".join([os.environ["AIRFLOW_HOME"],"models", "gbdt"] )
    tr = Train()
    tr.load()
    tr.train()
    tr.export(os.getenv("MODELPATH", output)


Dag記述

次にAirflow側のワークフローを記述していきます.
AirflowではDAG(Directed Acyclic Graph)と呼ばれる有効非巡回グラフを用いてタスクの記述や条件分岐,並列処理などをまとめていきます.
DAGの記載方法はインスタンス化したOperatorを用いてタスクを登録し,タスク間の依存関係を定義します.そして,Operatorにはいくつか種類がありメジャーなところだとBashシェルを実行するBashOperatorやPythonのメソッドを実行できるPythonOperator,GCP上のBigQueryにクエリを発行するBigQueryOperatorなどがあるので実現したいタスクに応じて様々なOperatorを使い分けることになります.もっと他のOperatorについて知りたいという方はAirflowの公式ドキュメント4をご覧ください.

それでは早速DAGを定義してきましょう.今回は以下の表のようにタスクをOperatorを用いて記述してDAGを定義しました.

タスク名 Operator 説明
start, end BashOperator Bashシェルコマンドを実行して標準出力を行います.実行開始や終了の出力をします.
train PythonOperator データの読み込みからTensorflowにおける学習モデル生成までの処理を実施します.
branch BranchPythonOperator 既存でTesorflow Servingが動作しているかを確認して次の処理を選択します.
deploy BashOperator TensorflowServingのDockerImageを用いて学習モデルをデプロイします.
update BashOperator Tensorflow Serving上のモデルを更新します.
join DummyOperator 条件分岐したタスクを結合するダミータスクを実施します.
slack SlackWebhookOperator slackのWebhhookを利用してDAGの処理が完了したことを通知します.

上記のタスクを定義したサンプルコードは以下になります.

pipeline.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import pytz
import docker
import datetime
from datetime import datetime, timedelta
from src.train import execute

import airflow
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

START_TIME = datetime.now(pytz.timezone('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S')
CLI        = docker.DockerClient(base_url=os.getenv('DOCKER_URL', "unix://var/run/docker.sock"))

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.today(),
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
}

dag = DAG(
    'ml_pipeline',
    description='DAG for machine learning pipeline that performs from model learning to deployment',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

def branch_func(**kwargs):
    try:
        CLI.containers.get("serving")
        return "update"
    except:
        return "deploy"

slack_message = " \
    【Slack ML Notification】\n\
    *ステータス:* Success\n\
    *タスクID:* ml_pipeline\n\
    *実行開始時間:* " + START_TIME


start_command = """
    echo "=================="
    echo " Airflow Task Run "
    echo "=================="   
"""

end_command = """
    echo "====================="
    echo " Airflow Task Finish "
    echo "====================="
"""

deploy_command = """
    docker run -d \
    -p 8500:8500 \
    -p 8501:8501 \
    --name "serving" \
    -e MODEL_NAME={{params.model}} \
    -v "$AIRFLOW_HOME/models:/models" \
    tensorflow/serving
"""

update_command = """
    docker cp $AIRFLOW_HOME/models serving:/models
    docker restart serving
"""

start = BashOperator(
    task_id='start',
    bash_command=start_command,
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command=end_command,
    dag=dag
)

train = PythonOperator(
    task_id='train',
    python_callable=execute,
    dag=dag        
) 

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=branch_func,
    dag=dag
)

deploy = BashOperator(
    task_id='deploy',
    bash_command=deploy_command,
    params={"model": "gbdt"},
    dag=dag,
)

update = BashOperator(
    task_id='update',
    bash_command=update_command,
    dag=dag
)

join = DummyOperator(
    task_id='join',
    trigger_rule='none_failed',
    dag=dag
)

slack = SlackWebhookOperator(
    task_id='slack',
    http_conn_id='slack_webhook',
    webhook_token=BaseHook.get_connection('slack_webhook').password,
    message=slack_message,
    channel='#learning-notice',
    username='Airflow',
    dag=dag
)

start >> train >> branching
branch >> deploy >> join
branch >> update >> join
join >> slack >> end

これで今回利用するファイルの説明は終了です.
学習によって出力したモデルを本来はバージョン管理したいところですが今回は簡略化のため,とりあえず最新のモデルを継続してデプロイするだけにしました.
次からはAirflowの環境構築を行って実行していきましょう.

環境構築

システムアーキテクチャの項目で記載した構成に従ってAirflowと各種ソフトウェア環境を構築していきます.
まず初めに周辺環境のセットアップから初めてAirflowの設定していこうと思います.

周辺環境のセットアップ

PostgresとRedisはDockerコンテナを用いて構築します.
以下のコマンドを実行するだけで簡単にセットアップできます.

$ docker run -d -p 6379:6379 redis:latest redis-server
$ docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=airflow -e POSTGRES_DB=airflow -e POSTGRES_USER=airflow  postgres:latest

データベースのデータを永続化したい場合はローカルのボリュームをマウントしてコンテナを立ち上げます.
また,データベースの認証情報はairflowに統一していますがセキュリティ上よろしくないのでご自身の環境で適宜修正してください.

サーバ環境変数の設定

Airflowの設定ファイルやDAGファイルの参照先は実行環境内で何も指定しない場合は$HOME/airflowに設定され,当該ディレクトリ内にある設定ファイルやDAGの情報を参照します.そのため,利用したいDAGが$HOME/airflow以外にある場合はAirflowのルートパスを変更する必要があります.その場合は環境変数のAIRFLOW_HOMEでルートパスを指定します.

以下内容を.bash_profile.bashrcファイルに追記して環境変数を読み込んでください.

.bashrc
export AIRFLOW_HOME=<プロジェクトのパス>  
terminal
$ source ~/.bashrc

Airflowのインストールと初期化

Airflowのインストールはpipコマンドで簡単にインストールできます.

terminal
$ pip install apache-airflow apache-airflow[postgres] apache-airflow[celery]

インストールが完了したらAirflowで利用するデータベースを初期化します.
このコマンドを実行するとAIRFLOW_HOME配下にairflow.cfgとairflow.dbというファイルが生成されます.
airflow.cfgはairflowの設定ファイルでairflow.dbはSQLiteにて作成されたデータベースを保存するファイルになります.デフォルトで利用されるデータベースはSQLiteであるため,以下のコマンドを実行したタイミングで自動的にairflow.dbが作成されますが今回はpostgresを利用するためこちらのファイルは利用しません.

terminal
$ airflow initdb

airflow.cfgファイルの修正

Airflowの設定ファイルは先ほど説明したようにairflow.cfgとなります.
設定ファイルでは接続するデータベースやBroker,ロギングの内容などを設定します.
今回は分散環境でタスクを実行するためExecutorにCeleryExecutorを指定する必要があります.airflow.cfgの初期設定ではSequntialExecutorになっているため以下の通りに修正していきます.また,併せて接続するデータベースの情報もDockerコンテナで作成したデータベース情報に合わせて変更します.

airflow.cfg
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = postgresql://airflow:airflow@localhost:5432/airflow

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = redis://localhost:6379/0

# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow

Airflow内の環境変数設定

SlackOperatorでSlakに通知する際に利用する認証情報をAirflowのConnectionsに設定します.
Dagのスクリプト内に認証情報を直接記載しても実行できますが,WebUIにアクセスする人が誰でもSlackの認証情報を閲覧できてしまうためセキュリティの観点でよろしくありません.そのためConnectionsに認証情報を設定するようにします.Slackの認証情報はSlackのWebhook設定ページにアクセスして,新しくIncoming WebHookを作成することで発行できます.発行した後はWebHookURLの項目にあるURLをコピーしてください.

slack_webhook.png

次にAirflowで利用するデータベースにSlackの認証情報を設定します.airflowのCLIを利用して以下のコマンドで取得したSlackの認証情報を反映させます.

terminal
$ airflow connections --add \
    --conn_id=slack_webhook \
    --conn_type=http \
    --conn_host="https://hooks.slack.com/services" \
    --conn_extra='{"webhook_token": "/TXXXXXXXXXXXX/BYYYYYYYYYYYYY/ZZZZZZZZZZZZZZZZZ"}'

上記のパラメータの説明ですが,conn_id はSlackOperator内で呼び出すキーとなりますのでご自身の環境で自由に書き換えてください.変更した場合はDAG内のSlackOperatorでhttp_conn_idに指定する環境変数のキーも設定したものに書き換える必要がありますのでご注意ください.その他のパラメータは固定でconn_typeは通信プロトコルになるためhttpを指定,conn_hostはSlackのサーバホスト名になるのでWebhookURLのserviceまでの部分,最後のconn_extraではWebhookURLのservice以降のトークンを指定します.

DAGファイルの配置

前述で指定したAIRFLOW_HOMEで指定したルートパスの配下にdagsフォルダを作成して,dags配下に作成したDAGのスクリプトを配置します.
dagsフォルダ配下に配置したファイルは自動的にairflow側で読み込まれるためワークフローを定義したDAGファイルを配置するだけで簡単にパイプラインを追加することができます.何も設定しなくても勝手に読み込んでくれるので非常に使い勝手が楽です.
また,PythonOperatorで利用する学習を実行するスクリプトファイルのtrain.pyはsrcフォルダ配下に配置してpipeline.pyから呼び出すようにしました.PYTHONPATHにはAIRFLOW_HOMEまでのパスを追加して実行します.

root/
  ├ README.md
  ├ requirements.txt
  ├ airflow.cfg
  ├ dags/
  │   └ pipeline.py
  ├ src/
  │   └ train.py

Airflowのタスク実行

ここまででAirflowを実行するための周辺環境整備はできたのでここからは実行していきます.

Airflowの起動

Airflowを実行するためには3つのコンポーネントを動作させる必要があるため,以下の3つのコマンドを実行してプロセスを立ち上げます.

terminal
$ airflow webserver &             # webserverの起動
$ airflow scheduler &             # schedulerの起動
$ airflow worker &                # workerの起動

Airflowによる管理画面

Airflowが正常に動作していることをブラウザで確認してみましょう.

ブラウザを起動してAirflowが動作しているサーバのホスト名にアクセスしてみます.正常に動作している場合はAirflowで実行できるDAG一覧画面が表示されます.

figure_airflow_ui.png

DAG一覧の中から自身で作成したDAGを選択して先ほどのGraphViewタブを選択するとDAG記述で記載した内容がフローとして表示されるので定義した通りのワークフローになっているかを確認できます.今回は学習を実施してモデルを生成し,TensorflowServingでデプロイが完了したらSlackに通知するということで想定通りのフローが表示されていました.

figure_airflow_workflow.png

APIトリガーによるタスク実行

作成したDAGを実行していきます.
トリガーとしてはいくつか種類がありますが,AirflowのREST APIにリクエストを投げて実行します.DAGを実行するAPIのエンドポイントはデフォルトでは/api/experimental/<自身のdag名>/dag_runsになりますので,実行したいDAGの名前を指定してエンドポイント対してPOSTリクエストを投げます.

terminal
$ curl -X POST -d "{}" http://<Host>:8080/api/experimental/dags/ml_pipeline/dag_runs

上記コマンドの<Host>にはAirflowが動作しているご自身のホスト名で置き換えて実行してください.リクエストが正常に行われるとAirflow側でタスクのインスタンスが生成されてタスクをキューに蓄積されていくのを確認できます.

figure_airflow_dag_runs.png

処理中画面

実行後はただ処理が完了するまで待つだけです.

figure_airflow_pipeline_video.gif

Slack通知

Slackタスクが完了したら実行した事前に登録したSlackのチャネルに通知してくれます.個人的にはモデルを学習させて逐一学習が終わったかを確認することが非常に煩わしいと感じているためコミュニケーションツールと簡単に連携できて学習が終了したタイミングで知らせてくれるのは非常にありがたいと感じました.

figure_airflow_slack.png

推論実施

最後に学習モデルがデプロイされた推論サーバに対してAPIリクエストを投げて推論結果を確認してみましょう.

terminal
$ curl -X POST -d \
  "{ \
    \"examples\": [{ \
      \"sex\": [\"male\"], \
      \"age\": [22.0], \
      \"n_siblings_spouses\": [1], \
      \"parch\": [0], \
      \"fare\": [7.2500], \
      \"class\": [\"Third\"], \
      \"deck\": [\"unknown\"], \
      \"embark_town\": [\"Southampton\"], \
      \"alone\": [\"y\"] \
    }] \
  }" \
 http://<Host>:8501/v1/models/gbdt:classify

無事にモデルがデプロイされて推論結果を取得することができました.
結果は "0" ラベルということで正しく推論できていそうですね.

{
    "results": [[["0", 0.65634954], ["1", 0.343650401]]]
}

まとめ

最後まで読んで頂きありがとうございました.皆様いかがでしたでしょうか.

今回はAirflowを用いたワークフローを用いて,モデルの学習からデプロイまでを実施してみました.
実際にAirflowに触ってみるとPythonスクリプトでワークフローを簡単に記述することができるお手軽さや外部システムと連携する際のハードルの低さといった多くのメリットを感じました.また,TensorflowServingによるオンライン推論まで自動化できると作成した学習モデルの民主化等にも貢献してくるのではないかと思います.

一方で課題だと感じる部分もありました.Airflow上で複数のタスクが同時に運用されている環境ではリソース管理は当然課題に上がってくると思いますし,分散処理によるマルチクラスタにおいてログをどうやって集約するのかといった問題などもありそうです.
今回の取り組みの中でも実際にAirflowをEC2上で動作させていた時にインスタンスタイプを少なめに見積もってしまった結果,リソース不足が発生してプロセスが落ちるといったことを経験しました.やはりリソース監視とスケーリングによる対策は必至でありそうな感覚を覚えました.

とは言えAirflowは有効なツールであることは間違いないので今後の動向は継続してウォッチしていこうと思います.まだまだ手探りの状態で扱いきれていない機能も多々あるので今度はBigQueryあたりと接続したワークフローとかにチャレンジしたいと思います.

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

[供養]ROS2Learnを使ってみた[取扱注意]

はじめに

もはや誰得感はありますが,年末のご供養ってコトで.

ROS2Learnとは?

ROS 2向け強化学習フレームワークです.Gazebo上でロボットアーム等の振る舞いを自律学習して,目標タスクを実行するモデルを自動生成します.openai_rosのROS 2対応版という位置づけで正しいと思われます.

Acutronic Roboticsが自社ロボットアームMARAをReference Modelとして強力に開発を推し進めていて,,,というところでお察しください( ´·௰·`)

公式ドキュメントなどはWebの海に離散しましたが,幾つか情報の痕跡が残っています.
- (ROS Discourse) Check out gym-gazebo2 and ROS2Learn!
- (arXiv) ROS2Learn: a reinforcement learning framework for ROS 2
- (YouTube) MARA robotic arm : Training MARAOrient using PPO2 through ROS2learn framework and gym-gazebo2.

GitHubリポジトリは残念ながらachivedされています.なんかあってもIssueも立てられんorz
- https://github.com/AcutronicRobotics/ros2learn

なんでこの記事書いたの?

筆者は2019年6月頃にコレを動かして遊んでみていました.なんかいろいろできそうだな〜と夢想している間に件の事態になったわけですが,,,
せっかく動かしてみた経験があってそのメモを残していたし,また『World MoveIt Day 2019 in Tokyo』の機会にまだちゃんと動くことが確認できたので,環境構築の整え方くらいでもROS2 Advent Calendar 2019に放流してみようかなと考えた次第です.

なお原典がすでに存在しませんので,情報の正当性は保証しません.いろいろ関係ツールをlatest versionにするとマズい可能性は高いので,なるべくそれぞれの当時のversionは明記しておきます.
違うバージョンでも動いたとか,この手順通りにやったら動かん!とか情報ありましたらお知らせくださいませ(でもどうすんだ??

前提とする環境

  • Ubuntu 18.04.2 LTS
  • ROS 2 Dashing
  • Gazebo 9

Eloquentがリリースされましたが,さてどうでしょうか??

ROS2Learnの環境構築

千里の路もgit cloneからです.

cd ~ && git clone -b dashing https://github.com/acutronicRobotics/ros2learn.git 
cd ros2learn 
git submodule update --init --recursive

各モジュールをインストールしていきます.

gym_gazebo2 の準備

https://github.com/AcutronicRobotics/gym-gazebo2

これもあくとろ(ry

INSTALL.mdに従ってやっていきます.

Gazebo のインストール

いつもどおりですね.
9.9.0以上が良いとのことで,古ければインストールし直しましょう.当方の環境では9.11.0で動いてます.

curl -sSL http://get.gazebosim.org | sh

依存パッケージのインストール

DDSはOpenSpliceを使ってほしいようです.今なら直っているかも.
あとTensorflow入れてます.当方の環境では当時入れた1.14.0ですが,最新の2.0.0でも動くかは分かりません.

sudo apt update && sudo apt install -y \
ros-dashing-action-msgs \
ros-dashing-message-filters \
ros-dashing-yaml-cpp-vendor \
ros-dashing-urdf \
ros-dashing-rttest \
ros-dashing-tf2 \
ros-dashing-tf2-geometry-msgs \
ros-dashing-rclcpp-action \
ros-dashing-cv-bridge \
ros-dashing-image-transport \
ros-dashing-camera-info-manager

# Install OpenSplice RMW implementation. Required for dashing until default FastRTPS is fixed.
sudo apt install ros-dashing-rmw-opensplice-cpp

sudo apt update && sudo apt install -y \
  build-essential \
  cmake \
  git \
  python3-colcon-common-extensions \
  python3-pip \
  python-rosdep \
  python3-vcstool \
  python3-sip-dev \
  python3-numpy \
  wget

# Install TensorFlow CPU. Feel free to get the GPU version at https://www.tensorflow.org/install/gpu.
pip3 install tensorflow

# Additional utilities
pip3 install transforms3d billiard psutil

# Fast-RTPS dependencies
sudo apt install --no-install-recommends -y \
  libasio-dev \
  libtinyxml2-dev

MARA用のWorkSpace作成とビルド

mkdir -p ~/ros2_mara_ws/src
cd ~/ros2_mara_ws
wget https://raw.githubusercontent.com/AcutronicRobotics/MARA/dashing/mara-ros2.repos
vcs import src < mara-ros2.repos
wget https://raw.githubusercontent.com/AcutronicRobotics/gym-gazebo2/dashing/provision/additional-repos.repos
vcs import src < additional-repos.repos
# Avoid compiling erroneus package
touch ~/ros2_mara_ws/src/orocos_kinematics_dynamics/orocos_kinematics_dynamics/COLCON_IGNORE

HRIMを生成します.

cd ~/ros2_mara_ws/src/HRIM
sudo pip3 install hrim
hrim generate models/actuator/servo/servo.xml
hrim generate models/actuator/gripper/gripper.xml

ビルドしていきます.
ちょっとパッケージのビルド順にクセがあるようです.

source /opt/ros/dashing/setup.bash
cd ~/ros2_mara_ws
colcon build --merge-install --packages-skip individual_trajectories_bridge
# Remove warnings
touch ~/ros2_mara_ws/install/share/orocos_kdl/local_setup.sh ~/ros2_mara_ws/install/share/orocos_kdl/local_setup.bash

なお当方が試したときはrosidl_typesupport_openspliceにあったバグを踏み抜いたのですが,すでに修正対応されています.(参考

OpenAI Gymのインストール

まずpip3 uninstall gymしてから最新版をソースからビルドしようなっ!て言ってますが,怖いですねぇ.
当方のcommit tagは0.14.0です.

cd ~
git clone https://github.com/openai/gym
cd gym
pip3 install -e .

gym-gazebo2 のインストール

以上の操作を行ってから,ros2learn/内でgym-gazebo2をインストールします.

cd ~/ros2learn/environments/gym-gazebo2
pip3 install -e .

環境変数の設定

このへんはご自由に.私はこういうものは.bashrcでfunction alias拵えています.

cd ~/ros2learn/environments/gym-gazebo2
echo "source `pwd`/provision/mara_setup.sh" >> ~/.bashrc
echo "export RMW_IMPLEMENTATION=rmw_opensplice_cpp" >> ~/.bashrc
source ~/.bashrc

baselines パッケージのインストール

これなんなのかよく分かっていません.誰か教えてください.
OpenAIの標準的なアルゴリズムが実装されたパッケージだと思ってます.

cd ~/ros2learn/algorithms/baselines
pip3 install -e .

参考:https://github.com/AcutronicRobotics/baselines

依存ツールのインストール

Tensorboard で可視化できるのですが,その関係ですね.

pip3 install pandas
pip3 install matplotlib
sudo apt install python3-tk

QuickStart with MARA_arm

まずはGazebo連携でMARAのモデルがちゃんと動くか確認します.

source ~/ros2learn/environments/gym-gazebo2/provision/mara_setup.sh
cd ~/gym-gazebo2/examples/MARA
python3 gg_random.py -g

-hオプションでなにができるか確認してみてください.

Train an agent

ロボットアームの姿勢制御がサンプルの学習対象のタスクのようです.

source ~/ros2learn/environments/gym-gazebo2/provision/mara_setup.sh
cd ~/ros2learn/experiments/examples/MARA
python3 train_ppo2_mlp.py -g

-gオプションを付与すれば,Gazeboが起動して学習状況を可視的に確認できます(PCはめっちゃ重くなります)
ずっと実行し続けて,やればやるほど精度が上がる(はず)です.

学習データは /tmp/ros2learn/MARA-v0/ppo2_mlp/<timestamp>/ に順次生成されていきます.

Run a trained policy

生成した学習データをもとに対象タスクを実行します.

$ cd ~/ros2learn/algorithms/baselines/baselines/ppo2/
$ vim default.py 
$ git diff 
$ diff --git a/baselines/ppo2/defaults.py b/baselines/ppo2/defaults.py
index cc7d4a2..4239923 100644
--- a/baselines/ppo2/defaults.py
+++ b/baselines/ppo2/defaults.py
@@ -51,7 +51,7 @@ def mara_mlp():
         # env_name = 'MARACollisionOrient-v0',
         transfer_path = None,
         # transfer_path = '/tmp/ros2learn/MARA-v0/ppo2_mlp/2019-02-19_12h47min/checkpoints/best',
-        trained_path = '/tmp/ros2learn/MARA-v0/ppo2_mlp/2019-04-02_13h18min/checkpoints/best'
+        trained_path = '/tmp/ros2learn/MARA-v0/ppo2_mlp/2019-06-12_14h37min/checkpoints/best'
     )

 def mara_lstm():

$ cd ~/ros2learn/experiments/examples/MARA
$ python3 run_ppo2_mlp.py -g -r -v 0.3

defaults.py を編集して,実行する学習データを指定します.
-g はGazebo起動で可視化
-r は実速度 (real speed) で実行
-v は速度 (velocity) の指定 (この例では 0.3 rad/sec)
という感じです.

Visualize training data on tensorboard

tensorboard機能で学習データの各種統計情報を表示することができます.

$ tensorboard --logdir=/tmp/ros2learn/MARA-v0/ppo2_mlp --port 8008 

としてやって,ブラウザで http://localhost:8008/ にアクセスしてみてください.

所感

当時のメモをそのまま貼り付けてみます.

  • Gazebo準拠に作成したモデルを用いて強化学習を実行するには非常に便利そう
    • Gazeboで学習経過/推論結果を可視的に確認できるのも良い
    • 対象のモデルとタスクを作成するのが大変ではあるかもしれない
    • 例えばTurtleBot3は,ROBOTIS公式のデモのものを流用するなど?
  • 学習アルゴリズムのスクリプトを自作することもできそう
    • baseline が標準のものだが,詳細は不明(TensorFlowの一般的なもの?)
    • 様々な学習モデルやアルゴリズムの比較評価をできそう
  • 自前ロボット適用には用意すべきファイルは多そう
    • 基本は,GazeboモデルとPython3スクリプトの作成になる模様

まだワカラナイことが多すぎるので,いろいろ深堀りしてみたさはありますが,はてさて,,,

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む