20210620のdockerに関する記事は6件です。

DockerでSageMathを使ってみた

コンテナを起動する イメージをプルします。 docker pull sagemath/sagemath コンテナを起動します。 docker run -it sagemath/sagemath:latest /bin/bash 以下のように sage ユーザーでコンテナに入ることができました。 sage@a5eac0d438c4:~$ sage コマンドで SageMath を起動します。 sage@a5eac0d438c4:~$ sage ---------------------------------------------------------------------- | SageMath version 9.1, Release Date: 2020-05-20 | | Using Python 3.7.3. Type "help()" for help. | ---------------------------------------------------------------------- 例)$\mathbb{Q}$ 上の多項式環 sage: R.<x> = PolynomialRing(QQ) sage: R Univariate Polynomial Ring in x over Rational Field sage: f = x^2 - 1 sage: f.factor() (x - 1) * (x + 1) exit で対話モードを終了します。 sage: exit ファイルの入出力 ファイルを作成するために vim コマンドをインストールします。 sage@a5eac0d438c4:~$ sudo apt-get update sage@a5eac0d438c4:~$ sudo apt-get install vim 自分のプログラムコードを保存するディレクトリを作成し、sample.sage というファイルを作成します。 sage@a5eac0d438c4:~$ mkdir mysrc sage@a5eac0d438c4:~$ cd mysrc/ sage@a5eac0d438c4:~/mysrc$ vim sample.sage 10までの素数を output.txt ファイルに出力するプログラムを作成します。 sample.sage file = open('output.txt', 'w') for i in range(10): if is_prime(i): file.write(f'{i}は素数です。\n') file.close() sage コマンドで sagemath を起動します。 sage@a5eac0d438c4:~/mysrc$ sage 実行するには以下のように load(ファイル名) を実行します。 sage: load('sample.sage') 実行が完了したら一旦 sage を終了します。 sage: exit output.txt ファイルを確認します。 sage@a5eac0d438c4:~/mysrc$ cat output.txt 2は素数です。 3は素数です。 5は素数です。 7は素数です。 参考記事 プログラミング https://doc.sagemath.org/html/ja/tutorial/programming.html
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ちょ、、待てよ、、ちゃんとNGINX入るやん

はじめに 前回までにGCP上にUbuntuインスタンスを構築し、その中でDockerを動かすことに成功。 仮想技術を使って仮想化ソフトウェアをインストールという仮想仮想した世界に戸惑うも興奮冷めやまない夜を過ごすのであった。 Docker上にNGINXをインストールしてみた そもそも私がNGINXというソフトウェアを知ったのは3年前のことである。 新人営業時代に、これからはMicro Service時代だ!Micro Serviceを支える技術を提供しよう!ということで、疎結合環境をAPIゲートウェイ機能によって柔軟にコントロールするNGINX Plusという製品に初めて出会う。 そのときに初めてNGINXとはWebServerであり、OSSとして世界中で使用されているデファクトに近い存在だということを知った。 他にもWeb Serverを調べてみると、「Apache」なども有名なようだったが、過去の経験から今回はNGINXをインストールすることにする。 実践 まずはDocker環境にNGINXをインストールする手順を調べるべく、Google先生に相談。 すると、NGINXの公式ブログにたどり着く。 https://www.nginx.com/blog/deploying-nginx-nginx-plus-docker/ こちらをざっと読むと、 1.コンテナ環境でデフォルトのNGINXをインストールするコマンドを入力(1行) 2.Docker環境の確認 ちょ、、待てよ、、コマンド1つで終了だと!? ただ公式がそう言っているなら信じるしかない。 信じてやってみよう。 1.コンテナ環境でデフォルトのNGINXをインストールするコマンドを入力 公式に記載されているコマンドを読み解いてみると、いくつか発見があった。 1行のコマンドの中でいくつもの命令が含まれている。 ①Docker起動 ②名前の決定 ③ポート番号の決定 ④Docker Hubからインストールするイメージ決定 $ docker run --name mynginx1 -p 80:80 -d nginx 今回は名前を「chomateyonginx」とする。 実際に実行。 ステータスに「Download newer image for nginx:latest」の文字が! 2.Docker環境の確認 ステータスにも「最新版のNGINXをダウンロードしました」と表示されているが、本当にインストールされているのかDocker環境を確認してみる。 $ sudo docker ps 私が2日ほど寝かせたため41時間前の表示にはなっているが、「chomateyonginx」と命名したDocker上のNGINX環境が構築されていることがわかる。 ちょ、、待てよ、、一行しかコード書いてないのにちゃんとNGINXが入っている、、 では、NGINX上ではどのように表示されているんだ!? と、いうことでlocalhostでアクセスし、確認してみる。 $ curl http://localhost ぬおおおおお。 NGINX:「Welcome to nginx!」 私:「お会いできて光栄です。結婚してください」 NGINX:「世界から求められるためあなただけを選ぶことができません」 私:「・・・ちょ、、待てよ、、」 終劇 次回 NGINXのデフォルトhtmlを編集してみる
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ちょ、、待てよ、、ちゃんとNGINX入るじゃん

はじめに 前回までにGCP上にUbuntuインスタンスを構築し、その中でDockerを動かすことに成功。 仮想技術を使って仮想化ソフトウェアをインストールという仮想仮想した世界に戸惑うも興奮冷めやまない夜を過ごすのであった。 Docker上にNGINXをインストールしてみた そもそも私がNGINXというソフトウェアを知ったのは3年前のことである。 新人営業時代に、これからはMicro Service時代だ!Micro Serviceを支える技術を提供しよう!ということで、疎結合環境をAPIゲートウェイ機能によって柔軟にコントロールするNGINX Plusという製品に初めて出会う。 そのときに初めてNGINXとはWebServerであり、OSSとして世界中で使用されているデファクトに近い存在だということを知った。 他にもWeb Serverを調べてみると、「Apache」なども有名なようだったが、過去の経験から今回はNGINXをインストールすることにする。 実践 まずはDocker環境にNGINXをインストールする手順を調べるべく、Google先生に相談。 すると、NGINXの公式ブログにたどり着く。 https://www.nginx.com/blog/deploying-nginx-nginx-plus-docker/ こちらをざっと読むと、 1.コンテナ環境でデフォルトのNGINXをインストールするコマンドを入力(1行) 2.Docker環境の確認 ちょ、、待てよ、、コマンド1つで終了だと!? ただ公式がそう言っているなら信じるしかない。 信じてやってみよう。 1.コンテナ環境でデフォルトのNGINXをインストールするコマンドを入力 公式に記載されているコマンドを読み解いてみると、いくつか発見があった。 1行のコマンドの中でいくつもの命令が含まれている。 ①Docker起動 ②名前の決定 ③ポート番号の決定 ④Docker Hubからインストールするイメージ決定 $ docker run --name mynginx1 -p 80:80 -d nginx 今回は名前を「chomateyonginx」とする。 実際に実行。 ステータスに「Download newer image for nginx:latest」の文字が! 2.Docker環境の確認 ステータスにも「最新版のNGINXをダウンロードしました」と表示されているが、本当にインストールされているのかDocker環境を確認してみる。 $ sudo docker ps 私が2日ほど寝かせたため41時間前の表示にはなっているが、「chomateyonginx」と命名したDocker上のNGINX環境が構築されていることがわかる。 ちょ、、待てよ、、一行しかコード書いてないのにちゃんとNGINXが入っている、、 では、NGINX上ではどのように表示されているんだ!? と、いうことでlocalhostでアクセスし、確認してみる。 $ curl http://localhost ぬおおおおお。 NGINX:「Welcome to nginx!」 私:「お会いできて光栄です。結婚してください」 NGINX:「世界から求められるためあなただけを選ぶことができません」 私:「・・・ちょ、、待てよ、、」 終劇 次回 NGINXのデフォルトhtmlを編集してみる
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Docker】-v バインドマウントでエラーが出る時の原因 no.16

こんにちは、まゆみです。 Dockerについての記事をシリーズで書いています。 前回・前々回の記事で、『-vオプション』についての記事を書いてきました。 ただ、-vオプションの記事を読みながらコマンドをうっているはずなのに、上手くいかない場合もあると思います。 今回の記事では、-v オプションを使って、バインドマウントをした途端うまく行かなかった時にエラーとして最も考えられそうなことを書いていこうと思います。 では早速はじめて行きますね。 『-v』バインドマウントでエラーが出てしまう時に考えられるエラー local側に必要なdependencyがない。 順調にうまく進んでいたのに、『-v』オプションを付けて実行した瞬間上手くいかなくなった人は、まず、ローカルホスト側に『node_modules』など、ソースコードに必要なモジュールがあるかどうか確かめてください。(モジュールがある人は、今回の記事の内容には、あなたの求めている情報は書かれていないと思います。<(_ _)>) node.jsを使って何かアプリを作る時、必要なdependenciesは『package.json』に書かれていますよね。(下記のスクショ参考) localホスト側に、必要なdependenciesがないと、上手く実行されない原因になります localホスト側にnode_modulesが無いのがどうして原因なの? Dockerfile で『RUN npm install』でインストールしてイメージを作っているのに、なぜ、localホストにnode_modulesがないといけないの?? そんな疑問がわいてくるかも知れません。 一言でその答えを書くと『-v バインドマウント』で、DockerfileでCOPY した内容が全て上書きされるからです。 ですが、その後 -v <マウントさせたいフォルダー> : <Container側のフォルダー> build context全体をマウントさせると、先ほどDockerfileでCOPY したファイルを全て上書きするような形になってしまうのです。 ちょうど下記のイラストのような状況になります そうすると、localホスト側に『node_modules』がないのが、エラーの原因になってしまいます。 解決方法 上記のことが原因で『-v』でマウントした瞬間上手くいかなくなった場合、 Anonymous Volume を使う と解決する事ができます。 Anonymous Volumeって何? Anonymous Volumeは前回の記事で、Named Volumeと比較しながら軽く説明させていただいていました。 Named (名前の付けられた) Anonymous (匿名の) という言葉が示している通り、Anonymous Volumeは そのContainerに付属して作られた 他のContainerとは共有のできないVolume になります 今回の記事で取り上げているような原因でバインドマウントがうまく行かなくなった時、Anonymous Volumeを使って解決します。 具体的なコマンドを示します。 下記のコマンドになります。 -v /app/node_modules Named Volumeでは、:(コロン)をはさんで、ホスト側のディレクトリとContainer側のディレクトリーを指定しました。 -v <ホスト側> : <container側> 『Container側のディレクトリーから、ホスト側のディレクトリ―を参照してくれ』ということですね。 ただ、Anonymous Volumeは:(コロン)を書かず、Container側のディレクトリ―のみ指定します。 Anonymous Volumeを使うと『Containerの外の物を参照するな。Container側の指定されたディレクトリーをそのままプレイスフォルダーとして使ってくれ』という意味になります。 上記のスクショのようにコマンドを実行しました。 そしてlocalホスト側を見ると『node_modules』があるように表示されています まとめ 今回の記事はここで締めくくらせていただきます。 『-v』は理解に苦しんだところなのですが、1週間かけて、理解する事ができました。 少しでもこの記事が役に立てれば幸いです。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【GCP】Kubeflow Pipelinesを使った機械学習パイプラインの自動化

はじめに  初めてのQiitaへの投稿です。独学で半年間、機械学習やGCPについて学習した物のアウトプットとして記事を書きました。間違いなどございましたら、ご指摘いただけると幸いです。随時加筆修正します。  この記事では、Kubeflow Pipelinesを用いた機械学習パイプラインの自動化について記述します。Courseraの講座をベースにしました(当該GitHubレポジトリはこちら)。MLOpsについては、GCPのドキュメントが参考になります。今回使用するコードはこちらのレポジトリにあります。 目的  今回は、24時間分の気象データから、翌24時間分の気温の予測を作成するシステムを作ります。新しいコンポーネントの追加が容易なパイプラインの作成や、パイプラインの実行を自動化することを目的としています。これは、複数の機械学習パイプラインをサービスに組み込むような場面に有用だと考えられます。そして、これを実現するために今回はKubeflow PipelinesとGoogle Cloud Platformの各種サービスを利用します。 Architecture  今回作成するアーキテクチャの概要は、上のようになります。BigQueryからのデータ抽出、TensorFlow Transformでの前処理、Cloud Storageへの保存においては、Dataflowを利用します。モデルの訓練・評価・デプロイについては、AI Platformです。また、これらをKubeflow pipelinesを用いてひとまとめにし、リモートレポジトリへのpushなどの事前作成したトリガーによって自動で実行します。ここでは、Cloud Buildを用います。  また、BeautifulSoupを用いたスクレイピングで学習のためのデータセットの準備や、1日1回の追加のデータ収集をします。そして、収集したデータから予測を作成します(Cloud Functions/Cloud Scheduler)。 Requirements Python 3.7 TensorFlow 2.5.0 TensorFlow Tansform 1.1.0 TensorFlow Addons 0.13.0 Kubeflow Pipelines GCP AI Platform Cloud Storage BigQuery Dataflow Cloud Build Cloud Functions Cloud Scheduler Pub/Sub ディレクトリ構造 今回使用するディレクトリの構成は次のようになっています。 下準備  パイプラインの作成に入る前に、データセットとクラスターの作成をします。 データセットの作成 使用ファイル - weather_scrayper.py  まず始めに、今回使用するデータセットの準備をします。気象庁の過去の気象データ検索からスクレイピングし、BigQueryのテーブルにアップロードします。取得するデータは、東京都の1時間ごとの値で、「時、気圧(現地、海面)、降水量、気温、湿度、風向、風速、日照時間、全天日射量」の列を使用します。本サイトでは、毎日午前2時頃に前日分のデータが更新されます。  BeautifulSoupを用いてWebスクレイピングを行います。こちらの記事を参考にしました。ここで、BigQueryのテーブルへの書き込みを検討しましたが、大量の書き込みは不正なリクエストとして検知されてしまいます。そのため、今回はCSVファイルとして書き込んでからBigQueryにアップロードします。取得するデータの期間は、2011年1月1日の午前1時〜プログラム実行日の午前0時です。  こちらのプログラムを実行後、CSVファイルが作成されたら、BigQueryへとアップロードします。この際、スキーマは自動取得ではなく、以下の様に設定します。詳細設定から、ヘッダーの1行分をスキップします。テーブル名はtokyoとしました。  アップロードジョブが終了したら、レコードの確認をします。日照時間・全天日射量の欠損値は0.0で埋めています。 GKEクラスターの作成  次の手順で、GKEクラスターの作成とデプロイをします。  クラスターの作成後、画面最下部よりデプロイします。最後に、こちらを参考に、Kuberneteシークレットを使用して、クラスタにGoogle CloudリソースとAPIへのアクセスを許可します。比較的時間のかかるトレーニングジョブを実行した際にBrokenPipeLineが出てしまうので、それを防ぐためにこの設定をします。エラーの詳細は、こちらのissueが参考になります。 Kubeflow Pipelines  ここから、パイプラインを構成するcomponentsの作成を開始します。こちらの記事が参考になります。今回は、Predefined componentsとLightweight Python componentsを使います。前者は、kfp.ComponentStore.load_compontsを用いて簡単に利用できます。GCPから利用できるPredifined componentsは、こちらから確認できます。以下では、AI platform上でのトレーニングジョブやトレーニング済みモデルのデプロイを行うcomponentsをロードしています。 model_training_pipeline.py import kfp # COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/gcp/' COMPONENT_URL_SEARCH_PREFIX = os.getenv('COMPONENT_URL_SEARCH_PREFIX') # Create component factories component_store = kfp.components.ComponentStore( local_search_paths=None, url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX]) # Predefined components # component.yamlがあるディレクトリを指定 mlengine_train_op = component_store.load_component('ml_engine/train') mlengine_deploy_op = component_store.load_component('ml_engine/deploy')  後者は、kfp.components.func_to_container_opによって、Pythonで定義された関数をcomponentに変換するものです。 model_training_pipeline.py from helper_components import retrieve_best_run from helper_components import evaluate_model from preprocess_dataflow_pipeline import run_transformation_pipeline import kfp from kfp.components import func_to_container_op BASE_IMAGE = os.getenv('BASE_IMAGE') # docker_images/base_image/Dockerfile TRANSFORM_BASE_IMAGE = os.getenv('TRANSFORM_BASE_IMAGE') # docker_images/transform_image/Dockerfile EVALUATE_IMAGE = os.getenv('EVALUATE_IMAGE') # docker_images/evaluate_image/Dockerfile # Lightweight components run_transform_pipeline_op = func_to_container_op( run_transformation_pipeline, base_image=TRANSFORM_IMAGE) retrieve_best_run_op = func_to_container_op( retrieve_best_run, base_image=BASE_IMAGE) evaluate_model_op = func_to_container_op( evaluate_model, base_image=EVALUATE_IMAGE)  最後に、パイプライン上で必要なパラメータを引数としてパイプラインを定義します。 model_training_pipeline.py # Defining the pipeline @kfp.dsl.pipeline( name='Weather-forecast Model Training', description='The pipeline training and deploying the Weather-forecast pipeline' ) def weather_forecast_train(project_id, gcs_root, region, source_table_name, num_epochs_hypertune, num_epochs_retrain, num_units, evaluation_metric_name, evaluation_metric_threshold, model_id, version_id, replace_existing_version, hypertune_settings=HYPERTUNE_SETTINGS):  以下でそれぞれのcomponentsの詳細とパイプラインでの呼び出しを記述していきます。 データ抽出〜保存 使用ファイル: - pipeline/preprocess_dataflow_pipeline.py  ここではDataflowを用いて以下のフローを実行します。 BigQueryからのデータ抽出 Tensorflow Transformでの前処理 Cloud Storageへの保存  以下で、それぞれの処理をApache Beamのプログラミングモデルを用いて記述し、最後にDataflowのジョブとしてまとめて実行するrun_transformation_pipeline関数の記述をします。このステップでは、こちらのチュートリアルとこちらのドキュメントを参考にしました。 データ抽出  まず、BigQueryのテーブルからデータの抽出を行い、Train,Valid,Testの3つのデータセットを作成します。後述しますが、1日1回のスクレイピングでテーブルにレコードを自動追加されます。そのため、プログラムの実行タイミングによって、3つのデータセットの比率が変わらないよう動的に変化するクエリを記述します。ここではUNIX時間を使って、最初のレコードのDate(2011-01-01 01:00:00)からプログラムの実行までの経過時間を80:10:10の割合で分割し、テンプレートクエリ中のstartとendの値を決めます。 preprocess_dataflow_pipeline.py # Generating the query # train, valid, testセットのデータ量の比率が常に一定であるように、UNIX時間を使ってクエリを変化させる def generate_sampling_query(source_table_name, step): # Setting timestamp division start = datetime(2011, 1, 1, 1, 0, 0) end = datetime.now() diff = end.timestamp() - start.timestamp() train_start = start.timestamp() train_end = train_start + diff * 0.8 valid_end = train_end + diff * 0.1 test_end = valid_end + diff * 0.1 train_start = datetime.fromtimestamp(train_start) train_end = datetime.fromtimestamp(train_end) valid_end = datetime.fromtimestamp(valid_end) test_end = datetime.fromtimestamp(test_end) valid_start = train_end test_start = valid_end # Template query sampling_query_template=""" SELECT * FROM `{{source_table}}` WHERE Date BETWEEN '{{start}}' AND '{{end}}' ORDER BY Date """ # Changing query dependging on steps if step == "Train": start, end = train_start, train_end elif step == "Valid": start, end = valid_start, valid_end else: start, end = test_start, test_end query = Template(sampling_query_template).render( source_table=source_table_name, start=start, end=end) return query  読み込みの前にデータ形式の変換を行います。まず、Datetime形式のデータは読み取りができないので、Date列をtimestamp形式に変換します。また、window_direction列を文字列から、360°表記に変換します。 preprocess_dataflow_pipeline.py def prep_bq_row(bq_row): result = {} for feature_name in bq_row.keys(): result[feature_name] = bq_row[feature_name] date_time = pd.to_datetime(bq_row["Date"]) time_stamp = pd.Timestamp(date_time) result["Date"] = time_stamp.timestamp() wind_direction = tf.strings.regex_replace(bq_row["wind_direction"], "[\s+)]", "") wind_direction = tf.strings.regex_replace(wind_direction, "[x]", u"静穏") direction_list = [ "北", "北北東", "北東", "東北東", "東", "東南東", "南東", "南南東", "南", "南南西", "南西", "西南西", "西", "西北西", "北西", "北北西", "静穏" ] degree_list = [ 0.0, 22.5, 45.0, 67.5, 90.0, 112.5, 135.0, 157.5, 180.0, 202.5, 225.0, 247.5, 270.0, 292.5, 315.0, 337.5, 0.0 ] def direction_to_degree(direction): if direction in direction_list: index = direction_list.index(direction) return degree_list[index] else: return 0.0 result["wind_direction"] = direction_to_degree(wind_direction) return result  最後に、Apache BeamのI/Oコネクターを用いて、BigQueryからデータの読み込みを行います。 preprocess_dataflow_pipeline.py def read_from_bq(pipeline, source_table_name, step): # Generate query query = generate_sampling_query(source_table_name, step) # Read data from BigQuery raw_data = ( pipeline | 'Read{}DatafromBigQuery'.format(step) >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)) ) raw_dataset = (raw_data, raw_metadata) return raw_dataset TensorFlow Transform  Tensorflow Transformの変換には、AnalyzeフェーズとTransformフェーズとがあります。前者で、未加工のTrainデータを全走査し、変換に必要な数値を取得します。そして、後者でデータの変換を行います。Trainデータから得た統計量(平均・分散、最小・最大値など)を用いてTrain、Valid、Testデータをそれぞれ変換します。また、同じ変換プロセスをサービング後の予測時にも用いることで、Training-Serving skewを防ぐことができます。 前処理  前項で抽出したデータにTensorFlow Transformを用いて前処理を加えます。まず前処理関数の定義です。こちらのドキュメントを参考にしました。  Date列は周期性を表現できるように、sin波・cos波を使って変換します。また、wind_directionとwind_velocity列を併せてベクトルに変換します。また、air_pressure_ashoreとair_pressure_afloatの外れ値をクリップします。最後に、変数の正規化・標準化を行います。temperature列の平均・分散は、モデルの評価やサービング時の予測後のリスケーリングに使用するので、それぞれを1つの特徴量として一緒に書き込みます。(参照:https://github.com/tensorflow/transform/issues/185) preprocess_dataflow_pipeline.py def preprocess_fn(inputs): outputs = {} # Date timestamp_s = inputs["Date"] day = 24 * 60 * 60 year = 365.2425 * day outputs["day_sin"] = tf.sin(timestamp_s * 2 * math.pi / day) outputs["day_cos"] = tf.cos(timestamp_s * 2 * math.pi / day) outputs["year_sin"] = tf.sin(timestamp_s * 2 * math.pi / year) outputs["year_cos"] = tf.cos(timestamp_s * 2 * math.pi / year) # Air pressure STANDARDIZED_FEATURES_LIST = ["air_pressure_ashore", "air_pressure_afloat"] for feature in STANDARDIZED_FEATURES_LIST: outputs[feature] = tft.scale_to_0_1(tf.clip_by_value(inputs[feature], 860.0, 1100.0)) outputs["diff_air_pressure"] = outputs["air_pressure_ashore"] - outputs["air_pressure_afloat"] # Wind wind_direction_rad = inputs["wind_direction"] * math.pi / 180.0 outputs["wind_vector_x"] = inputs["wind_velocity"] * tf.cos(wind_direction_rad) outputs["wind_vector_y"] = inputs["wind_velocity"] * tf.sin(wind_direction_rad) # Others # Normalizing numerical features NORMALIZED_FEATURES_LIST = ["precipitation", "temperature", "humidity", "hours_of_daylight", "global_solar_radiation"] for feature in NORMALIZED_FEATURES_LIST: outputs[feature] = tft.scale_to_z_score(inputs[feature]) # Calcurating stats of Temperature and Converting to feature # preprocess_fn()で変換したデータに、trainセットのtemperature列の平均と分散が追加される def feature_from_scalar(value): batch_size = tf.shape(input=inputs["temperature"])[0] return tf.tile(tf.expand_dims(value, 0), multiples=[batch_size]) outputs["temp_mean"] = feature_from_scalar(tft.mean(inputs['temperature'])) outputs["temp_var"] = feature_from_scalar(tft.var(inputs['temperature'])) return outputs  上記の前処理関数をtrainセットに適用します。tft_beam.AnalyzeAndTransformDataset関数で、Analyze・Transformの両フェイズを実行します。 preprocess_dataflow_pipeline.py # Analyze and transform train dataset def analyze_and_transform(raw_dataset, step): transformed_dataset, transform_fn = ( raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocess_fn) ) return transformed_dataset, transform_fn  続いて、Validデータに同様の前処理関数を用いて変換を行います。繰り返しになりますが、ここでの変換には、testセットから作成された統計量が使用されます。 preprocess_dataflow_pipeline.py # Transform valid and test dataset def transform(raw_dataset, transform_fn, step): transformed_dataset = ( (raw_dataset, transform_fn) | '{}Transform'.format(step) >> tft_beam.TransformDataset() ) return transformed_dataset 保存  最後に、データセットをCSV形式に変換してCloud Storageへと保存します。testセットはTensorFlow Transformでの変換を加えずに保存します。 preprocess_dataflow_pipeline.py def to_train_csv(rawdata): TRAIN_CSV_COLUMNS = [ 'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure', 'precipitation', 'temperature', 'humidity', 'wind_vector_x', 'wind_vector_y', 'hours_of_daylight', 'global_solar_radiation', 'temp_mean', 'temp_var' ] data = ','.join([str(rawdata[k]) for k in TRAIN_CSV_COLUMNS]) yield str(data) def to_test_csv(rawdata): TEST_CSV_COLUMNS = [ 'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature', 'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation' ] data = ','.join([str(rawdata[k]) for k in TEST_CSV_COLUMNS]) yield str(data) # Cloud storageへの書き込み def write_csv(transformed_dataset, location, step): transformed_data, _ = transformed_dataset ( transformed_data | '{}Csv'.format(step) >> beam.FlatMap(to_csv) | '{}Out'.format(step) >> beam.io.Write(beam.io.WriteToText(location)) )  変換プロセスも保存しておきます。 preprocess_dataflow_pipeline.py # 変換プロセスの保存 def write_transform_artefacts(transform_fn, location): ( transform_fn | 'WriteTransformArtefacts' >> tft_beam.WriteTransformFn(location) ) Dataflowジョブの実行  以上のプロセスをDataflowで実行します。Kubeflow pipelinesのPredefined componentsから、Dataflowを実行できますが、このcomponentsは呼び出し時に、指定したpythonファイルを1つだけtmpディレクトリにダウンロードして実行するような設定になっているようです。[参考]そのため、beamパイプラインのoptionsでsetup_fileを指定できず、Dataflowジョブの実行時に、tensorflow-transformについて、"module not found error"が起きてしまいます。そこで今回は、Lightweight Python componentsを使います。  それでは上記の関数を使って、Dataflowジョブを実行するためのrun_transformation_pipeline関数の定義をしましょう。 preprocess_dataflow_pipeline.py from typing import NamedTuple def run_transformation_pipeline( source_table_name:str, job_name:str, gcs_root:str, project_id:str, region:str, dataset_location:str ) -> NamedTuple('Outputs', [('training_file_path', str), ('validation_file_path', str), ('testing_file_path', str)]): from datetime import datetime import os import tempfile import copy import tensorflow as tf import tensorflow_transform as tft import tensorflow_transform.beam as tft_beam from tfx_bsl.public import tfxio import apache_beam as beam from jinja2 import Template #------------------------------------------------------------------------------------- # 上記の関数の定義をする #------------------------------------------------------------------------------------- TRAINING_FILE_PATH = 'training/data.csv' VALIDATION_FILE_PATH = 'validation/data.csv' TESTING_FILE_PATH = 'testing/data.csv' options = { 'staging_location': os.path.join(gcs_root, 'tmp', 'staging'), 'temp_location': os.path.join(gcs_root, 'tmp'), 'job_name': job_name, 'project': project_id, 'max_num_workers': 3, 'save_main_session': True, 'region': region, 'setup_file': './setup.py', # pipeline内でtensorflow-transformを使えるようにsetup_fileの設定をする } opts = beam.pipeline.PipelineOptions(flags=[], **options) RUNNER = 'DataflowRunner' with beam.Pipeline(RUNNER, options=opts) as pipeline: with tft_beam.Context(temp_dir=tempfile.mkdtemp()): # Create training set step = "Train" training_file_path = '{}/{}'.format(dataset_location, TRAINING_FILE_PATH) tf_record_file_path = dataset_location raw_train_dataset = read_from_bq(pipeline, source_table_name, step) transformed_train_dataset, transform_fn = analyze_and_transform(raw_train_dataset, step) write_csv(transformed_train_dataset, training_file_path, step) # Create validation set step = "Valid" validation_file_path = '{}/{}'.format(dataset_location, VALIDATION_FILE_PATH) raw_eval_dataset = read_from_bq(pipeline, source_table_name, step) transformed_eval_dataset = transform(raw_eval_dataset, transform_fn, step) write_csv(transformed_eval_dataset, validation_file_path, step) # Create testing set step = "Test" testing_file_path = '{}/{}'.format(dataset_location, TESTING_FILE_PATH) raw_test_dataset = read_from_bq(pipeline, source_table_name, step) write_csv(raw_test_dataset, testing_file_path, step) # Sarving artefacts transform_artefacts_dir = os.path.join(gcs_root,'transform') write_transform_artefacts(transform_fn, transform_artefacts_dir) return (training_file_path, validation_file_path, testing_file_path, transform_artefacts_dir) run_transformation_pipeline()  それでは、最後にkfp.func_to_container_opを使って、このcomponentsを作成します。使用ファイルは、docker_images/transform_imageに格納されています。 model_training_pipeline.py import os from kfp.components import func_to_container_op TRANSFORM_IMAGE = os.getenv('TRANSFORM_IMAGE') run_transform_pipeline_op = func_to_container_op( run_transformation_pipeline, base_image=TRANSFORM_IMAGE)  以上で、このステップは終了です。次に、データセットの作成を行います。 データセットのwindow処理~モデルの保存 使用するファイル: ここでは、以下のフローを実行します。 - データセットのwindow処理 - Hyperparameter Tuning - 再トレーニング  tf.dataを用いてデータセットのwindow処理を行い、Cloud AI Platformの訓練ジョブでモデルのHyperparameter Tuningを行います。そして、最もパフォーマンスの良いハイパーパラメーターをセットして、再トレーニングを行います。 seq2seqモデル  今回使用するモデルはseq2seqです。ここでは、こちらの記事を参考にしました。このモデルを使って、24時間分の気象データの入力から、翌日の1時間ごとの気温の予測をします。 seq2seqモデルは、EncoderとDecoderからなるモデルです。Decoderの入力として、前時点での実現値(実際の気温)を使用する方法と、モデルの前時点での予測値を使用する方法とがあります。Teacher forcingでは、訓練時に前者、予測時に後者の方法を利用します。これは、訓練の収束を速めるために有効ですが、訓練時と予測時でモデルの評価値に大きな差が出る恐れがあります。今回は、前者の方法から後者の方法に段階的に切り替えるScheduled Samplingを使用します。まずは、このサンプリング方法に併せてデータセットの処理を行います。 window処理 使用ファイル: - docker_images/trainer_image/create_dataset.py  まず、window処理関数の定義をします。ここでは、こちらの記事を参考にしました。まず、48時点のデータを1まとめにして、前半の24時点をEncoderのインプットとします。後半の24時点はDecoderのインプットとアウトプットの作成に使います。訓練時のDecoderのインプットは、図のように最初の要素が0で、以降は前時点のDecoderアウトプットと同じ値を使います。 create_dataset.py from functools import partial import tensorflow as tf # Setting defaults CSV_COLUMNS = [ 'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure', 'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y", 'hours_of_daylight', 'global_solar_radiation', 'temp_mean', 'temp_var' ] SELECT_COLUMNS = [ 'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure', 'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y", 'hours_of_daylight', 'global_solar_radiation' ] DEFAULTS = [[0.0] for _ in range(len(SELECT_COLUMNS))] # Packing features def pack(features): packed_features = tf.stack(list(features.values()), axis=1) return tf.reshape(packed_features, [-1]) @tf.function def marshal(x, feature_keys): features = { k: x[:, feature_keys.index(k)] for k in feature_keys #pack時失われたkeyを付け直す } return features # Window processing def windowed_dataset(dataset, batch_size, mode): marshal_fn_partial = partial(marshal, feature_keys=SELECT_COLUMNS) dataset = dataset.map(pack) dataset = dataset.window(size=48, shift=1, drop_remainder=True) dataset = dataset.flat_map(lambda window: window.batch(48)) if mode == "train": dataset.shuffle(1000) encoder_input = dataset.map(lambda window: window[:24]).map(marshal_fn_partial) decoder_input = dataset.map(lambda window: tf.concat((tf.zeros((1)), window[24:-1, 8]), axis=0)) #Teacher Forcingのため、decoder_inputの先頭は、0にする decoder_output = dataset.map(lambda window: window[24:, 8]) inputs = tf.data.Dataset.zip((encoder_input, decoder_input)) dataset = tf.data.Dataset.zip((inputs, decoder_output)).cache() dataset = dataset.batch(batch_size, drop_remainder=True).repeat(1).prefetch(1) return dataset  tf.data.experimental.make_csv_dataset関数でロードしたデータに、window処理を施します。この際、select_columnsを設定して、temperature列の平均・分散であるtemp_mean/temp_varを省きます。 create_dataset.py # Loading dataset def load_dataset(filename, batch_size, mode): dataset = tf.data.experimental.make_csv_dataset( file_pattern=filename, column_names=CSV_COLUMNS, column_defaults=DEFAULTS, select_columns=SELECT_COLUMNS, batch_size=1, shuffle=False, header=False, num_epochs=1) dataset = windowed_dataset(dataset, batch_size, mode) return dataset モデルの作成 使用ファイル: - docker_images/trainer_image/create_model.py  KerasのFunctional APIを使ってseq2seqのモデルを作成します。まず、訓練時に使用するモデルです。 create_model.py import tensorflow as tf import tensorflow_addons as tfa # Creating model for training and evaluating def train_model(num_units=128, learning_rate=0.001, dropout_rate=0.35): SELECT_COLUMNS = [ 'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure', 'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y", 'hours_of_daylight', 'global_solar_radiation' ] # Input layer # tf.keras.experimental.SequenceFeaturesでの入力層は、モデルの保存ができず断念 encoder_input_layers = { colname: tf.keras.layers.Input(name=colname, shape=(24, 1), dtype=tf.float32) for colname in SELECT_COLUMNS } pre_model_input = tf.keras.layers.Concatenate(axis=-1, name="concatenate")(encoder_input_layers.values()) # Encoder encoder_lstm = tf.keras.layers.LSTM(num_units, return_sequences=True, name="encoder_lstm1")(pre_model_input) encoder_dropout = tf.keras.layers.Dropout(dropout_rate, name="encoder_dropout")(encoder_lstm) encoder_output, state_h, state_c = tf.keras.layers.LSTM(num_units, return_state=True, name="encoder_lstm2")(encoder_dropout) encoder_state = [state_h, state_c] # Scheduled Sampler sampler = tfa.seq2seq.sampler.ScheduledOutputTrainingSampler( sampling_probability=0., next_inputs_fn=lambda outputs: tf.reshape(outputs, shape=(1, 1)) ) sampler.sampling_probability = tf.Variable(0.) # Decoder decoder_input = tf.keras.layers.Input(shape=(24, 1), name="decoder_input") decoder_cell = tf.keras.layers.LSTMCell(num_units, name="decoder_lstm") output_layer = tf.keras.layers.Dense(1, name="decoder_output") decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell, sampler, output_layer=output_layer) decoder_output, _, _ = decoder(decoder_input, initial_state=encoder_state, sequence_length=[24]) final_output = decoder_output.rnn_output # Creating model model = tf.keras.Model( inputs=[encoder_input_layers, decoder_input], outputs=[final_output]) optimizer = tf.keras.optimizers.RMSprop(learning_rate) model.compile(loss="mse", optimizer=optimizer) return model, encoder_input_layers, encoder_state, decoder_cell, output_layer, sampler    次に、上記のモデルの学習済み層を利用して、予測用のモデルを作成します。Functional APIを使用して作成したモデルの保存を試みましたが、エラーの解消ができなかったため、サブクラス化しています。 create_model.py # Creating model for prediction # Functional APIではモデルの保存時にエラーが出るので、サブクラス化する def predict_model(encoder_input_layers, encoder_state, decoder_cell, output_layer): # Encoder Layer Class class Inference_Encoder(tf.keras.layers.Layer): def __init__(self, encoder_input_layers, encoder_state): super().__init__() self.model = tf.keras.models.Model(inputs=[encoder_input_layers], outputs=encoder_state) @tf.function def call(self, inputs): return self.model(inputs) # Decoder Layer Class class Inference_Decoder(tf.keras.layers.Layer): def __init__(self, decoder_cell, output_layer): super().__init__() # Inference sampler self.sampler = tfa.seq2seq.sampler.InferenceSampler( sample_fn = lambda outputs: tf.reshape(outputs, (1, 1)), sample_shape = [1], sample_dtype = tf.float32, end_fn = lambda sample_ids : False, ) self.decoder = tfa.seq2seq.basic_decoder.BasicDecoder( decoder_cell, self.sampler, output_layer=output_layer, maximum_iterations=24 ) @tf.function def call(self, initial_state): start_inputs = tf.zeros(shape=(1, 1)) decoder_output, _, _ = self.decoder(start_inputs, initial_state=initial_state) final_output = decoder_output.rnn_output return final_output # Inference Model Class class Inference_Model(tf.keras.Model): def __init__(self, encoder_input_layers, encoder_state, decoder_cell, output_layer): super().__init__() self.encoder = Inference_Encoder(encoder_input_layers, encoder_state) self.decoder = Inference_Decoder(decoder_cell, output_layer) @tf.function def call(self, inputs): inputs_copy = inputs.copy() # inputsは、transform_fnで処理したデータで、訓練セットの平均と分散が含まれている # rescaleのために、それらの統計量を取り出しておく temp_mean = inputs_copy.pop('temp_mean')[0][0] temp_var = inputs_copy.pop('temp_var')[0][0] initial_state = self.encoder(inputs_copy) outputs = self.decoder(initial_state) outputs_rescaled = outputs * tf.sqrt(temp_var) + temp_mean return outputs_rescaled inference_model = Inference_Model(encoder_input_layers, encoder_state, decoder_cell, output_layer) return inference_model モデルの保存 使用ファイル: - docker_images/trainer_image/saved_model.py  サービング時、TensorFlow Transformの前処理関数を適用できるよう、モデルにpreprocessing_layerを加えることがポイントです。(参考:https://github.com/tensorflow/tfx/issues/2199) saved_model.py import tensorflow as tf def export_serving_model(model, tf_transform_output, out_dir): TRANSFORM_FEATURE_COLUMNS = [ 'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature', 'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation' ] SELECT_COLUMNS = [ 'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure', 'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y", 'hours_of_daylight', 'global_solar_radiation', 'temp_mean', 'temp_var' ] # Building Model example = { x: tf.random.uniform(shape=(1, 24), name=x) for x in SELECT_COLUMNS } ex = model(example) # Transform raw features def get_apply_tft_layer(tf_transform_output): tft_layer = tf_transform_output.transform_features_layer() @tf.function def apply_tf_transform(raw_features_dict): unbatched_raw_features = { k: tf.squeeze(tf.reshape(v, (1, -1))) for k, v in raw_features_dict.items() } transformed_dataset = tft_layer(unbatched_raw_features) expanded_dims = { k: tf.reshape(v, (-1, 24)) for k, v in transformed_dataset.items() } return expanded_dims return apply_tf_transform def get_serve_raw_fn(model, tf_transform_output): model.preprocessing_layer = get_apply_tft_layer(tf_transform_output) @tf.function def serve_raw_fn(features): preprocessed_features = model.preprocessing_layer(features) return preprocessed_features return serve_raw_fn serving_raw_entry = get_serve_raw_fn(model, tf_transform_output) serving_transform_signature_tensorspecs = { x: tf.TensorSpec(shape=[None, 24], dtype=tf.float32, name=x) for x in TRANSFORM_FEATURE_COLUMNS } serving_signature_tensorspecs = { x: tf.TensorSpec(shape=[None, 24], dtype=tf.float32, name=x) for x in SELECT_COLUMNS } # Signatures signatures = {'serving_default': model.call.get_concrete_function(serving_signature_tensorspecs), 'transform': serving_raw_entry.get_concrete_function(serving_transform_signature_tensorspecs)} tf.keras.models.save_model(model=model, filepath=out_dir, signatures=signatures) モデルの訓練 使用ファイル: - docker_images/trainer_image/train.py  ここでは、上述のcreate_dataset.pyとcreate_model.py、saved_model.pyを使って、seq2seqモデルの訓練と保存を行います。Hyperparameter Tuningを行う際には、evalセットでのmseを用いてモデルの性能の評価をします。チューニング後に再訓練をしたら、学習済み層を用いて予測モデルを作成し、EncoderとDecoderに分けて保存します。 train.py import tensorflow as tf from tensorflow.keras.optimizers import RMSprop import tensorflow_transform as tft import fire import hypertune from create_dataset import load_dataset from create_model import train_model from create_model import predict_model from save_model import export_serving_model # Training and evaluating the model def train_evaluate(job_dir, training_dataset_path, validation_dataset_path, num_epochs, num_units, learning_rate, dropout_rate, hptune, transform_artefacts_dir): training_dataset = load_dataset(training_dataset_path + "*", 256, "train") validation_dataset = load_dataset(validation_dataset_path + "*", 128, "eval") print('Starting training: learning_rate={}, dropout_rate={}'.format(learning_rate, dropout_rate)) tf_transform_output = tft.TFTransformOutput(transform_artefacts_dir) model, encoder_input_layers, encoder_state, decoder_cell, output_layer, sampler = train_model( num_units=num_units, learning_rate=learning_rate, dropout_rate=dropout_rate ) def update_sampling_probability(epoch, logs): eps = 1e-16 proba = max(0.0, min(1.0, epoch / (num_epochs - 10 + eps))) sampler.sampling_probability.assign(proba) sampling_probability_cb = tf.keras.callbacks.LambdaCallback(on_epoch_begin=update_sampling_probability) history = model.fit(training_dataset, epochs=num_epochs, validation_data=validation_dataset, callbacks=[sampling_probability_cb] ) # Hyperparameter tuning if hptune: val_loss = history.history["val_loss"] print("val_loss: {}".format(val_loss)) hpt = hypertune.HyperTune() hpt.report_hyperparameter_tuning_metric( hyperparameter_metric_tag='val_loss', metric_value=val_loss[-1]) # Saving the model if not hptune: inference_model_dir = '{}/predict'.format(job_dir) inference_model = predict_model(encoder_input_layers, encoder_state, decoder_cell, output_layer) export_serving_model(inference_model, tf_transform_output, inference_model_dir) print('Inference model saved in: {}'.format(inference_model_dir)) # Execution if __name__ == '__main__': fire.Fire(train_evaluate)  それでは、以上をまとめて、Kubeflow pipelinesのPredefined componentsからCloud AI Platformの訓練ジョブを実行します。まずHyperparameter Tuningを行います。ここでは、Optimizerのlearning_rateとDropoutの割合(dropout_rate)をチューニングします。以下がチューニングの設定です。設定については、こちらが参考になります。 model_training_pipeline.py HYPERTUNE_SETTINGS = """ { "hyperparameters": { "goal": "MINIMIZE", "maxTrials": 3, "maxParallelTrials": 3, "hyperparameterMetricTag": "val_loss", "enableTrialEarlyStopping": True, "params":[ { "parameterName": "learning_rate", "type": "DOUBLE", "minValue": 0.00001, "maxValue": 0.1, "scaleType": "UNIT_LOG_SCALE" }, { "parameterName": "dropout_rate", "type": "DOUBLE", "minValue": 0.1, "maxValue": 0.4, "scaleType": "UNIT_LOG_SCALE" } ] } } """ ```  ```python:model_training_pipeline.py # Tune hyperparameters tune_args = [ '--training_dataset_path', create_dataset.outputs["training_file_path"], '--validation_dataset_path', create_dataset.outputs["validation_file_path"], '--num_epochs', num_epochs_hypertune, '--num_units', num_units, '--hptune', 'True', '--transform_artefacts_dir', create_dataset.outputs["transform_artefacts_dir"] ] job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir/hypertune', kfp.dsl.RUN_ID_PLACEHOLDER) hypertune = mlengine_train_op( project_id=project_id, region=region, master_image_uri=TRAINER_IMAGE, job_dir=job_dir, args=tune_args, training_input=hypertune_settings).apply(use_gcp_secret()) # Kubernetesシークレットを使用しないと、長時間の訓練が途中で停止します。  訓練が終了したら、最もパフォーマンスが良かったパラメーターの組み合わせを取得します。(参照:https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs?hl=ja) helper_components.py from typing import NamedTuple def retrieve_best_run( project_id:str, job_id:str ) -> NamedTuple('Outputs', [('metric_value', float), ('learning_rate', float), ('dropout_rate', float)]): from googleapiclient import discovery from googleapiclient import errors ml = discovery.build('ml', 'v1') job_name = 'projects/{}/jobs/{}'.format(project_id, job_id) request = ml.projects().jobs().get(name=job_name) try: response = request.execute() print(response) except errors.HttpError as err: print(err) except: print('Unexpected error') best_trial = response['trainingOutput']['trials'][0] print("best_trial:", best_trial) metric_value = best_trial['finalMetric']['objectiveValue'] learning_rate = float(best_trial['hyperparameters']['learning_rate']) dropout_rate = float(best_trial['hyperparameters']['dropout_rate']) return (metric_value, learning_rate, dropout_rate) model_training_pipeline.py #Retrive the best trial get_best_trial = retrieve_best_run_op( project_id, hypertune.outputs['job_id'])  それでは、改めて訓練ジョブを実行し、学習済みモデルをCloud Storageに保存します。 model_training_pipeline.py # Re-training the model job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir', kfp.dsl.RUN_ID_PLACEHOLDER) train_args = [ '--training_dataset_path', create_dataset.outputs["training_file_path"], '--validation_dataset_path', create_dataset.outputs["validation_file_path"], '--num_epochs', num_epochs_retrain, '--num_units', num_units, '--learning_rate', get_best_trial.outputs['learning_rate'], '--dropout_rate', get_best_trial.outputs['dropout_rate'], '--hptune', 'False', '--transform_artefacts_dir', create_dataset.outputs["transform_artefacts_dir"] ] train_model = mlengine_train_op( project_id=project_id, region=region, master_image_uri=TRAINER_IMAGE, job_dir=job_dir, args=train_args).apply(use_gcp_secret()) モデルの評価 使用ファイル: - pipelines/helper_components.py - docker_images/evaluate_image/create_dataset.py  まずはデータセットの作成についてですが、訓練時とは違いDecoderへのインプットはありません。 docker_images/evaluate_image/create_dataset.py import tensorflow as tf from functools import partial # Loading dataset def load_test_dataset(filename, batch_size): CSV_COLUMNS = [ 'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature', 'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation', 'weather', 'cloud cover' ] SELECT_COLUMNS = [ 'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature', 'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation' ] DEFAULTS = [[0.0] for i in SELECT_COLUMNS] # Packing features def pack(features): packed_features = tf.stack(list(features.values()), axis=1) return tf.reshape(packed_features, [-1]) @tf.function def marshal(x, feature_keys): features = { k: x[:, feature_keys.index(k)] for k in feature_keys } return features # Window processing def windowed_dataset(dataset, batch_size): marshal_fn_partial = partial(marshal, feature_keys=SELECT_COLUMNS) dataset = dataset.map(pack) dataset = dataset.window(size=48, shift=1, drop_remainder=True) dataset = dataset.flat_map(lambda window: window.batch(48)) x_test = dataset.map(lambda window: window[:24]).map(marshal_fn_partial).batch(batch_size, drop_remainder=True).repeat(1).prefetch(1) y_true = dataset.map(lambda window: window[24:, 4]).batch(batch_size, drop_remainder=True).repeat(1).prefetch(1) return x_test, y_true dataset = tf.data.experimental.make_csv_dataset( file_pattern=filename, column_names=CSV_COLUMNS, column_defaults=DEFAULTS, select_columns=SELECT_COLUMNS, header=False, batch_size=1, shuffle=False, num_epochs=1 ) x_test, y_true = windowed_dataset(dataset, batch_size) return x_test, y_true  ロードしたデータは、まずモデルのpreprocessing_layerで、前処理します。その後、予測値と実際の気温とを用いてmseを計算します。 pipeline/helper_components.py # モデルの予測値と、実現値を使ってモデルの評価を行う def evaluate_model( dataset_path: str, model_path: str, transform_artefacts_dir: str, metric_name: str ) -> NamedTuple('Outputs', [('metric_name', str), ('metric_value', float), ('mlpipeline_metrics', 'Metrics')]): import json import tensorflow as tf import numpy as np from create_dataset import load_test_dataset def calculate_loss(y_pred, y_true): mse = tf.keras.losses.MeanSquaredError() return mse(y_true, y_pred).numpy().astype(np.float64) model_path = '{}/predict'.format(model_path) model = tf.keras.models.load_model(model_path) x_test, y_true = load_test_dataset(dataset_path + "*", 256) x_test_transformed = x_test.map(model.preprocessing_layer) prediction = [] for item in x_test_transformed: prediction.append(model.predict(item)) y_pred = np.array(prediction).reshape(-1, 24) y_true = np.array(list(tf.data.Dataset.as_numpy_iterator(y_true))).reshape(-1, 24) if metric_name == "mse": metric_value = calculate_loss(y_pred, y_true) print("metric_value:", metric_value) else: metric_name = 'N/A' metric_value = 0 metrics = { 'metrics': [{ 'name': metric_name, 'numberValue': metric_value }] } return (metric_name, metric_value, json.dumps(metrics)) model_training_pipeline.py # Evaluatiing the model eval_model = evaluate_model_op( dataset_path=create_dataset.outputs['testing_file_path'], model_path=str(train_model.outputs['job_dir']), transform_artefacts_dir=create_dataset.outputs['transform_artefacts_dir'], metric_name=evaluation_metric_name) モデルのデプロイ 使用ファイル: - kfp-pipeline/custom_prediction/predictor.py - kfp-pipeline/custom_prediction/preprpcess.py - kfp-pipeline/custom_prediction/setup.py  事前に設定したevaluation_metric_thresholdよりtestセットでの評価指標が良い場合にはデプロイします。この際、カスタム予測ルーチンを作成してデプロイします。(参考:https://cloud.google.com/ai-platform/prediction/docs/custom-prediction-routines?hl=ja) model_training_pipeline.py # Deploying the model with kfp.dsl.Condition(eval_model.outputs['metric_value'] < evaluation_metric_threshold): model_uri = '{}/predict'.format(train_model.outputs["job_dir"]) deploy_model = mlengine_deploy_op( model_uri=model_uri, project_id=project_id, model_id=model_id, version_id=version_id, model = {"regions": [region], # "onlinePredictionLogging": True, # 同名のモデルが既にあると、デプロイ時にエラーが出るのでコメントアウトします。 "onlinePredictionConsoleLogging": True}, version = {"packageUris": ["gs://##########-kubeflowpipelines-default/staging/dist/my_custom_code-0.1.tar.gz"], "predictionClass": "predictor.MyPredictor"}, runtime_version=RUNTIME_VERSION, python_version=PYTHON_VERSION, replace_existing_version=replace_existing_version)  カスタム予測ルーチンは次のように実行されます。生のデータのDate列をtimestamp型へ、wind_direction列を数値型へと変換し、モデルに保存されている前処理層で変換します。変換後の特徴量をモデルへとインプットすることで、予測が返されます。 custom_prediction/predictor.py import os import pickle import numpy as np import tensorflow as tf import preprocess class MyPredictor(object): def __init__(self, model): self._model = model def predict(self, instances, **kwargs): preprocessed_inputs = {} for i in instances: for k, v in i.items(): if k not in preprocessed_inputs.keys(): preprocessed_inputs[k] = [v] else: preprocessed_inputs[k].append(v) preprocessed_inputs["Date"] = [preprocess.convert_to_timestamp(i) for i in preprocessed_inputs["Date"]] preprocessed_inputs["wind_direction"] = [preprocess.direction_to_degree(i) for i in preprocessed_inputs["wind_direction"]] preprocessed_inputs = { k: tf.reshape(np.array(v, dtype=np.float32), shape=(-1, 24)) for k, v in preprocessed_inputs.items() } transformed_inputs = self._model.preprocessing_layer(preprocessed_inputs) outputs = self._model.predict(transformed_inputs, steps=1).reshape(-1, 24) return outputs.tolist() @classmethod def from_path(cls, model_dir): model = tf.keras.models.load_model(model_dir) return cls(model)  モデルのデプロイができたら、オンライン予測ジョブを送信して、求める予測が返ってくるかを確かめます。 kfp-pipeline.ipynb from oauth2client.client import GoogleCredentials from googleapiclient import discovery from googleapiclient import errors service = discovery.build('ml', 'v1') def predict_json(project, model, instances, version=None): name = 'projects/{}/models/{}'.format(project, model) if version is not None: name += '/versions/{}'.format(version) response = service.projects().predict( name=name, body={'instances': instances} ).execute() if 'error' in response: raise RuntimeError(response['error']) return response['predictions']  インプットには、こちらのサンプルデータを使います。24個の数字のリストが返されます。 kfp-pipeline.ipynb import json VERSION_ID = "v01" instances = {} with open("daily_data.json", mode="r") as f: instances = json.load(f) predict_json(PROJECT_ID, MODEL_ID, instances, VERSION_ID) Pipelineの実行 使用ファイル: - kfp-pipeline.ipynb - cloudbuild.yaml  それでは、以上のKubeflow PipelinesをCloud AI Platformのノートブックから実行をします。ENDPOINTは次のようにして確認できます。ARTIFACT_STORE_URIは、クラスターの作成時に自動でCloud Storageに作成されるバケットを設定します。 kfp-pipeline.ipynb REGION = 'XXXXXXXXXXX' #change your code ENDPOINT = 'XXXXXXXXXX.pipelines.googleusercontent.com' #change your code ARTIFACT_STORE_URI = 'gs://XXXXXX-kubeflowpipelines-default' #change your code PROJECT_ID = !(gcloud config get-value core/project) PROJECT_ID = PROJECT_ID[0] このノートブックを順に実行することで、パイプラインが実行されます。学習が終了してモデルが保存されたら、saved_model_cliコマンドを使って、モデルの中身を確認できます。saved_modelの扱いに関してはこちら。 %%bash saved_model_cli show --dir gs://model_dir --all CI/CDの設定 使用ファイル: - kfp-pipeline.ipynb - cloudbuild.yaml  ここでは、リモートレポジトリへの新しいタグのpushをトリガーとして、パイプラインを自動実行するための設定を行います。前項において手動で実行していたdockerファイルのビルドなどの作業をyamlファイルにまとめて記述します。build構成ファイルの記述はこちらが参考になります。  "_"から始まる変数は、ユーザー定義の変数で後から置換するものを表します。$PROJECT_IDや$TAG_NAMEといった変数は自動で置換されます。(参考:https://cloud.google.com/build/docs/configuring-builds/substitute-variable-values?hl=ja) cloudbuild.yaml steps: # Build the trainer image - name: 'gcr.io/cloud-builders/docker' args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_TRAINER_IMAGE_NAME:$TAG_NAME', '.'] dir: $_PIPELINE_FOLDER/docker_images/trainer_image id: 'Build the trainer image' # Build the transform image - name: 'gcr.io/cloud-builders/docker' args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_TRANSFORM_IMAGE_NAME:$TAG_NAME', '.'] dir: $_PIPELINE_FOLDER/docker_images/transform_image id: 'Build the transform image' # Build the evaluate image - name: 'gcr.io/cloud-builders/docker' args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_EVALUATE_IMAGE_NAME:$TAG_NAME', '.'] dir: $_PIPELINE_FOLDER/docker_images/evaluate_image id: 'Build the evaluate image' # Build the base image for lightweight components - name: 'gcr.io/cloud-builders/docker' args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_BASE_IMAGE_NAME:$TAG_NAME', '.'] dir: $_PIPELINE_FOLDER/docker_images/base_image id: 'Build the base image' # Build the base image for lightweight components - name: 'gcr.io/cloud-builders/docker' args: ['build', '-t', 'gcr.io/$PROJECT_ID/kfp-cli:latest', '.'] dir: $_PIPELINE_FOLDER/docker_images/kfp-cli id: 'Build the kfp-cli' # Compile the pipeline - name: 'gcr.io/$PROJECT_ID/kfp-cli' args: - '-c' - | dsl-compile --py $_PIPELINE_DSL --output $_PIPELINE_PACKAGE env: - 'BASE_IMAGE=gcr.io/$PROJECT_ID/$_BASE_IMAGE_NAME:$TAG_NAME' - 'TRAINER_IMAGE=gcr.io/$PROJECT_ID/$_TRAINER_IMAGE_NAME:$TAG_NAME' - 'TRANSFORM_IMAGE=gcr.io/$PROJECT_ID/$_TRANSFORM_IMAGE_NAME:$TAG_NAME' - 'EVALUATE_IMAGE=gcr.io/$PROJECT_ID/$_EVALUATE_IMAGE_NAME:$TAG_NAME' - 'RUNTIME_VERSION=$_RUNTIME_VERSION' - 'PYTHON_VERSION=$_PYTHON_VERSION' - 'COMPONENT_URL_SEARCH_PREFIX=$_COMPONENT_URL_SEARCH_PREFIX' dir: $_PIPELINE_FOLDER/pipeline id: 'Compile the Pipeline' # Upload the pipeline - name: 'gcr.io/$PROJECT_ID/kfp-cli' args: - '-c' - | kfp --endpoint $_ENDPOINT pipeline upload -p ${_PIPELINE_NAME}_$TAG_NAME $_PIPELINE_PACKAGE dir: $_PIPELINE_FOLDER/pipeline id: 'Upload the Pipeline' # Deploy the pipeline in KFP - name: 'gcr.io/$PROJECT_ID/kfp-cli' args: - '-c' - | kfp --endpoint $_ENDPOINT run submit \ -e $_EXPERIMENT_NAME \ -r $_RUN_ID \ -p `kfp --endpoint $_ENDPOINT pipeline list | grep -w ${_PIPELINE_NAME}_$TAG_NAME | grep -E -o -e "([a-z0-9]){8}-([a-z0-9]){4}-([a-z0-9]){4}-([a-z0-9]){4}-([a-z0-9]){12}"` \ project_id=$PROJECT_ID \ gcs_root=$_GCS_STAGING_PATH \ region=$_REGION \ source_table_name=$_SOURCE_TABLE \ num_epochs_hypertune=$_NUM_EPOCHS_HYPERTUNE \ num_epochs_retrain=$_NUM_EPOCHS_RETRAIN \ num_units=$_NUM_UNITS \ evaluation_metric_name=$_EVALUATION_METRIC \ evaluation_metric_threshold=$_EVALUATION_METRIC_THRESHOLD \ model_id=$_MODEL_ID \ version_id=$TAG_NAME \ replace_existing_version=$_REPLACE_EXISTING_VERSION dir: $_PIPELINE_FOLDER/pipeline id: 'Deploy and Run the Pipeline' waitFor: ['Upload the Pipeline'] # Push the images to Container Registry images: - gcr.io/$PROJECT_ID/$_TRAINER_IMAGE_NAME:$TAG_NAME - gcr.io/$PROJECT_ID/$_TRANSFORM_IMAGE_NAME:$TAG_NAME - gcr.io/$PROJECT_ID/$_EVALUATE_IMAGE_NAME:$TAG_NAME - gcr.io/$PROJECT_ID/$_BASE_IMAGE_NAME:$TAG_NAME # Changing the timeout threshold timeout: 3600s 手動でのビルドを実行してみましょう。yamlファイル内の変数値の置換は、実行時にsubsutitutionsで指定します。 kfp-pipeline.ipynb SUBSTITUTIONS=""" _ENDPOINT={},\ _TRAINER_IMAGE_NAME=trainer_image,\ _TRANSFORM_IMAGE_NAME=transform_image,\ _EVALUATE_IMAGE_NAME=evaluate_image,\ _BASE_IMAGE_NAME=base_image,\ TAG_NAME=v01,\ _PIPELINE_FOLDER=.,\ _PIPELINE_DSL=model_training_pipeline.py,\ _PIPELINE_PACKAGE=model_training_pipeline.yaml,\ _PIPELINE_NAME=weather_forecast_continuous_training,\ _RUNTIME_VERSION=2.5,\ _PYTHON_VERSION=3.7,\ _COMPONENT_URL_SEARCH_PREFIX=https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/gcp/,\ \ _EXPERIMENT_NAME=Weather_Forecast_Training,\ _RUN_ID=Run_001,\ _GCS_STAGING_PATH=gs://##########-kubeflowpipelines-default/staging,\ _REGION=asia-northeast1,\ _SOURCE_TABLE=weather_data.tokyo,\ _NUM_EPOCHS_HYPERTUNE=1,\ _NUM_EPOCHS_RETRAIN=1,\ _NUM_UNITS=128,\ _EVALUATION_METRIC=mse,\ _EVALUATION_METRIC_THRESHOLD=10.0,\ _MODEL_ID=weather_forecast,\ _REPLACE_EXISTING_VERSION=True """.format(ENDPOINT).strip() # 手動ビルド !gcloud builds submit . --config cloudbuild.yaml --substitutions {SUBSTITUTIONS}  それでは、Cloud Buildの設定を行い、パイプラインの実行の自動化を実現します。こちらのドキュメントを参考に進めます。今回は、リポジトリへの新しいタグのpushをトリガーとして、cloudbuild.yamlファイルが読み込まれ、タスクが実行されます。  最後に、cloudbuild.yamlへの代入変数をリストアップします。  シェルで以下のコマンドを実施すると、パイプラインの実行が始まります。 Cloud Buildの履歴から実行のログが確認できます。 git tag (tag name) git push origin main --tag 自動スクレイピング 使用ファイル: - scheduled_scrayper/main.py - scheduled_scrayper/requirements.txt こちらのプログラムによって、BigQueryのテーブルに1日1回のレコードの追加が行われます。また同時に、翌日の気温の予測を行うためのオンライン予測ジョブをが実行され、予測値はCloud Storageへと保存されます。ここでは、このプログラムが定期実行されるようにCloud FunctionsとCloud Schedulerの設定を行います。こちらの記事通りに設定していきます。Cloud Schedulerの設定で頻度を次のように指定します。これは、実行を1日1回午前2時にスケジュールすることを意味します。 参考: - https://cloud.google.com/storage/docs/uploading-objects?hl=ja#storage-upload-object-code-sample - https://cloud.google.com/ai-platform/prediction/docs/online-predict?hl=ja 終わりに  今回は、Kubeflow Pipelinesを用いた機械学習パイプラインの自動化を実践しました。完全なパイプラインには程遠い部分もありますが、新しい要素の追加も比較的容易に行えるようになっていると思います。  また、実務未経験者の記事なので間違いなどがございましたら、ご指摘いただけると幸いです。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Dockerで環境を作ってLaravelプロジェクトを作るまで(Windows)

WSL2のおかげかWindowsでもDockerが楽に動かせるようになったので。 とりあえずPHP環境を作って、そこにLaravelをインストールする感の流れを書いておきます。 動作環境 Windows10 VisualStudioCode Docker WSL2のインストール docker関連のファイルを準備 以下のようなファイルを準備。 記述内容は各自好きな感じに変えるといいと思います。 困ったらこちらでも読めば解決すると思います。 https://docs.docker.com/compose/ docker-compose.yml version: '3' services: laravel: container_name: laravel build: context: . dockerfile: ./docker/php/Dockerfile volumes: - ./laravel:/var/www/html/laravel ports: - "8080:80" # - "443:443" mysql: container_name: mysql build: context: . dockerfile: ./docker/mysql/Dockerfile environment: - MYSQL_ROOT_PASSWORD=password - MYSQL_USER=laravel - MYSQL_PASSWORD=password - MYSQL_DATABASE=test volumes: - ./mysql:/var/lib/mysql ports: - 3306:3306 docker/php/Dockerfile FROM php:8.0-apache RUN apt-get update \ && apt-get install -y git unzip \ && docker-php-ext-install pdo_mysql \ && a2enmod rewrite docker/mysql/Dockerfile FROM mysql:5.7 VSCodeからDockerの拡張機能を導入 ms-azuretools.vscode-dockerをインストール。 Dockerの起動 起動していないとエラーになるので忘れずに。 VSCode上でdocker-compose.ymlを使って動かす 拡張機能のDockerを入れておけば、docker-compose.ymlを右クリックした際にメニューがいくつか追加されてます。 右クリック > Compose RestartでLaravel用とMySQL用のDockerのコンテナが2つ立ち上がります。 composerでLaravelのインストール composerが入っている環境でコマンドを実行します。 (Dockerfileにcomposerを導入するように書き換えておくと楽かも) 以下のコマンドでlaravelがインストールされます。 バージョンは各自好きなものを指定すれば良いと思います。 cd /var/www/html composer create-project "laravel/laravel=8.5.*" laravel 構築完了 http://localhost:8080/laravel/publicにアクセスすれば、LaravelのWelcomeページが出ます。 ここからいろいろ作り始める感じですね。 URLが気になる方はDocumentRootとか変えると良いかも。 Failed to open stream: Permission deniedと出た時 The stream or file "/var/www/html/laravel/storage/logs/laravel.log" could not be opened in append mode: Failed to open stream: Permission denied よくある権限エラーです。 storageフォルダの権限を変えてしまいましょう。 chown -R www-data:www-data laravel/storage # もしくは chmod -R 777 laravel/storage
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む