20200722のPythonに関する記事は30件です。

Auto-sklearn2.0試してみた

はじめに

2020年7月8日にAuto-sklearn2.0の論文( https://arxiv.org/abs/2007.04074 )がarXiv上に公開されました。
今までAuto-sklearnの存在は知っていたものの、使ったことがなかったのでこれを機に試してみました。

論文のななめ読み

著者はFreiburg-Hannoverのグループ( https://www.automl.org/ )の方々。
論文によるとAuto-sklearn2.0のアップデート内容は、モデル選択のストラテジーの改善、ポートフォリオの構築、自動ポリシー選択とのことですが、詳細は論文を参照してください。
(モデル選択、ハイパラチューニング、前処理部分含めた全体の構成の3点を改善したと理解しましたが、チラ見程度なので間違えていたらすみません)

やること

あるデータに対して、Auto-sklearn1.0、Auto-sklearn2.0をそれぞれ実行して、予測精度がどうなるのか比較してみたいと思います。
また、Auto-sklearnの中で閉じるのは面白くないので、同じデータに対してAuto-Gluonでも予測させて精度がどうなるのか見てみたいと思います。

環境

今回はGoogle Colaboratoryを使用します(GPUは使っていません)。

実施した内容

Auto-sklearnのインストール

https://automl.github.io/auto-sklearn/master/installation.html
こちらに記載されているように以下の内容を実行します。
2020年7月22日時点でインストール時にエラーなどは起こりませんでした。

!sudo apt-get install build-essential swig
!curl https://raw.githubusercontent.com/automl/auto-sklearn/master/requirements.txt | xargs -n 1 -L 1 pip install
!pip install auto-sklearn

https://automl.github.io/auto-sklearn/master/
こちらにAuto-sklearn2.0のインポート方法が記載されているので、以下のように実行します。
なお、Github( https://github.com/automl/auto-sklearn )の中身を見ると、Auto-sklearn2.0のRegressorは見当たりませんでした。今後のアップデートに期待です。

import sklearn.model_selection
import sklearn.datasets
import sklearn.metrics
from sklearn import preprocessing

from autosklearn.experimental.askl2 import AutoSklearn2Classifier

データのロード

Auto-sklearn2.0がClassifierしかないので、公式のexample( https://automl.github.io/auto-sklearn/master/examples/example_feature_types.html )を参考にしました。
データはこちらにある、収入が一定の額以上か否かを当てるタスクの物を使用しています。

X, y = sklearn.datasets.fetch_openml(data_id=179, return_X_y=True)

# y needs to be encoded, as fetch openml doesn't download a float
y = preprocessing.LabelEncoder().fit_transform(y)

X_train, X_test, y_train, y_test = \
     sklearn.model_selection.train_test_split(X, y, random_state=1)

# Create feature type list from openml.org indicator and run autosklearn
data = sklearn.datasets.fetch_openml(data_id=179, as_frame=True)
feat_type = ['Categorical' if x.name == 'category' else 'Numerical' for x in data['data'].dtypes]

Auto-sklearn2.0の実行

今回はお試しなので、時間を5分に設定して次のように実行しました。

%%time 

cls = AutoSklearn2Classifier(
    time_left_for_this_task=300,
    seed=1,
    metric=autosklearn.metrics.log_loss
)
cls.fit(X_train, y_train, feat_type=feat_type)
#CPU times: user 16.8 s, sys: 557 ms, total: 17.3 s
#Wall time: 4min 56s

ただし、このまま実行すると以下のwarningが大量に表示されました。

/usr/local/lib/python3.6/dist-packages/sklearn/base.py:197: FutureWarning: From version 0.24, get_params will raise an AttributeError if a parameter cannot be retrieved as an instance attribute. Previously it would return None.
  FutureWarning)

とくに問題ないと思いますが、もし気になる場合は、warningsで無視するといいかもしれません。

import warnings
warnings.simplefilter('ignore')

AccuracyとAUCで予測精度を出すと次のような結果になりました。

predictions = cls.predict(X_test)
print("Accuracy score ", sklearn.metrics.accuracy_score(y_test, predictions))
print("AUC ", sklearn.metrics.roc_auc_score(y_test, predictions))
#Accuracy score  0.8585701416755385
#AUC  0.7749035627902573

Auto-sklearn1.0の実行

従来のAuto-sklearnを使う場合は、import autosklearn.classificationでライブラリをインポートして、同じように実行します。

%%time 

cls = autosklearn.classification.AutoSklearnClassifier(
    time_left_for_this_task=300,
    per_run_time_limit=30,
    seed=1,
    metric=autosklearn.metrics.log_loss
)
cls.fit(X_train, y_train, feat_type=feat_type)
#CPU times: user 5.77 s, sys: 1.08 s, total: 6.86 s
#Wall time: 4min 55s
predictions = cls.predict(X_test)
print("Accuracy score ", sklearn.metrics.accuracy_score(y_test, predictions))
print("AUC ", sklearn.metrics.roc_auc_score(y_test, predictions))
#Accuracy score  0.8490705101957252
#AUC  0.7580882714036629

今回はモデル構築時にCross-validationをしていないので、もう少し慎重に比較しないといけないところですが、単純に数値だけ比較するとAuto-sklearn2.0の方が良い結果となりました。

Auto-Gluonの実行

Auto-Gluonについての詳細は別の記事( https://qiita.com/dyamaguc/items/dded739f35e59a6491c8 )を参照してください。
Auto-Gluonの場合は、インプットのデータにnumpyndarrayを指定できないため、pandasDataFrameに変換する処理を追加しています。
それ以外は基本的に同じにしています。

X, y = sklearn.datasets.fetch_openml(data_id=179, return_X_y=True)

# y needs to be encoded, as fetch openml doesn't download a float
y = preprocessing.LabelEncoder().fit_transform(y)

X_train, X_test, y_train, y_test = \
     sklearn.model_selection.train_test_split(X, y, random_state=1)

X_train_ = pd.DataFrame( X_train )
y_train_ = pd.DataFrame(y_train, columns=['class'])
train_data = pd.concat( [X_train_, y_train_ ], axis=1)

auto-stackを使用する設定として、実行します。

%%time

long_time = 5*60 # for quick demonstration only, you should set this to longest time you are willing to wait
dir = 'agModels-predictClass-autostack' # specifies folder where to store trained models
predictor = task.fit(
    train_data=train_data, 
    label='class', 
    auto_stack=True, 
    output_directory = dir,
    eval_metric='log_loss',
    time_limits=long_time)
#CPU times: user 9min 21s, sys: 11.3 s, total: 9min 32s
#Wall time: 5min 15s
import sklearn.metrics
X_test_ = pd.DataFrame( X_test )
predictions = predictor.predict(X_test_)
print("Accuracy score ", sklearn.metrics.accuracy_score(y_test, predictions))
print("AUC ", sklearn.metrics.roc_auc_score(y_test, predictions))
#Accuracy score  0.8609450495454918
#AUC  0.780458426444995

今回のデータセットと設定では、Auto-Gluonが一番良い結果となりました。

まとめ

今回はAuto-sklearn2.0に触れて見ましたが、この手のAutoMLは本当に楽で、そこそこの結果を出してくれるので、便利だなと思いました。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Raspberry PiでMinecraftプログラミングやろうとした

環境

トラブルシュート

現象

Minecraft-piは起動されず、何も起こらない。試しに、minecraft-piコマンドを手動実行すると、次のエラーが表示された。

error while loading shared libraries: libGLESv2.so: cannot open shared object file: No such file or directory

原因と対策

次のサイトの記事を参考に、Raspberry Piのファームウェアをアップデートすることで解消した。
※参考:Games that uses libGLESv2.so doen't work in raspbian stretch

sudo rpi-update
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

MacでPythonの開発環境構築

#pyenvのインストール

pyenvとは

pythonのバージョンを簡単に切り替えるて利用できるようにするツールです

Homebrewでpyenvをインストールする場合

% brew install pyenv

gitでpyenvをインストールする場合

% git clone git://github.com/yyuu/pyenv.git ~/.pyenv

pyenvを利用する前にターミナルで利用するシェルの確認

% echo $SHELL

/bin/bash か /bin/zsh が表示される

bashの場合

% vi ~/.bash_profile
export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
if command -v pyenv 1>/dev/null 2>&1; then
  eval "$(pyenv init -)"
fi

% source ~/.bash_profile

zshの場合

% vi ~/.zshrc
export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
if command -v pyenv 1>/dev/null 2>&1; then
  eval "$(pyenv init -)"
fi

% zsh -l

pyenvでpythonをインストールする

インストール可能なpythonのバージョンを確認し、インストールする

% pyenv install --list
% pyenv install 3.8.3
% pyenv install 3.8.2
% pyenv versions

pythonをインストールしてきた後はshimのリフレッシュをしておく。

% pyenv rehash

特定のディレクトリ配下で利用するpythonのバージョンを設定する

% pyenv local 3.8.2
% ls -lah
.python-version
% cat .python-version
3.8.2

全体で利用するpythonのバージョンを設定する

% pyenv global 3.8.3

バージョンの確認

% pyenv version

アンインストール

pyenv uninstall 3.8.2

pipの利用

pipとは

python標準のパッケージはたいていpythonをインストールする時点で
自動的にインストールされます。
サードパーティのパッケージは別途インストールが必要で、
PyPIという下記のサイトで配布されています。
https://pypi.org/
このサードパーティが配布しているパッケージをインストールするために、
pipを使ってパッケージの管理が行います。

pipのバージョンを確認する

% python -m pip list
Package    Version
---------- -------
pip        19.2.3
setuptools 41.2.0
WARNING: You are using pip version 19.2.3, however version 20.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

pipのバージョンアップ

% python -m pip install pip --upgrade
Collecting pip
  Using cached https://files.pythonhosted.org/packages/43/84/23ed6a1796480a6f1a2d38f2802901d078266bda38388954d01d3f2e821d/pip-20.1.1-py2.py3-none-any.whl
Installing collected packages: pip
  Found existing installation: pip 19.2.3
    Uninstalling pip-19.2.3:
      Successfully uninstalled pip-19.2.3
Successfully installed pip-20.1.1

pipenvの利用

pipenvとは

Pipenvは、Python公式が正式に推薦する依存関係管理・パッケージングのためのツールです。

pipenvのインストール

% pip install pipenv

pipenvの利用方法

プロジェクト「project1」を作成する

% mkdir /develop/python/tutorial/project1
% cd /develop/python/tutorial/project1

プロジェクトでpython3.8.3を利用するとする

% pipenv --python 3.8.3
% ls -lah
 Pipfile

numpyパッケージをインストールする

% pipenv install numpy
% ls -lah
 Pipfile
 Pipfile.lock

Pipfileに利用するパッケージの情報が書かれている

% cat Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
numpy = "*"

[requires]
python_version = "3.8"

Pipfile.lockにインストール済みパッケージの詳細が書き込まれる

% cat Pipfile.lock
{
    "_meta": {
        "hash": {
            "sha256": "b59842a2e4aca58430e4c67380e4d495bd0cb8b31d65c29e51235c48f9456e4b"
        },
        "pipfile-spec": 6,
        "requires": {
            "python_version": "3.8"
        },
        "sources": [
            {
                "name": "pypi",
                "url": "https://pypi.org/simple",
                "verify_ssl": true
            }
        ]
    },
    "default": {
        "numpy": {
            "hashes": [
                "sha256:13af0184177469192d80db9bd02619f6fa8b922f9f327e077d6f2a6acb1ce1c0",
                "sha256:26a45798ca2a4e168d00de75d4a524abf5907949231512f372b217ede3429e98",
                "sha256:26f509450db547e4dfa3ec739419b31edad646d21fb8d0ed0734188b35ff6b27",
                "sha256:30a59fb41bb6b8c465ab50d60a1b298d1cd7b85274e71f38af5a75d6c475d2d2",
                "sha256:33c623ef9ca5e19e05991f127c1be5aeb1ab5cdf30cb1c5cf3960752e58b599b",
                "sha256:356f96c9fbec59974a592452ab6a036cd6f180822a60b529a975c9467fcd5f23",
                "sha256:3c40c827d36c6d1c3cf413694d7dc843d50997ebffbc7c87d888a203ed6403a7",
                "sha256:4d054f013a1983551254e2379385e359884e5af105e3efe00418977d02f634a7",
                "sha256:63d971bb211ad3ca37b2adecdd5365f40f3b741a455beecba70fd0dde8b2a4cb",
                "sha256:658624a11f6e1c252b2cd170d94bf28c8f9410acab9f2fd4369e11e1cd4e1aaf",
                "sha256:76766cc80d6128750075378d3bb7812cf146415bd29b588616f72c943c00d598",
                "sha256:7b57f26e5e6ee2f14f960db46bd58ffdca25ca06dd997729b1b179fddd35f5a3",
                "sha256:7b852817800eb02e109ae4a9cef2beda8dd50d98b76b6cfb7b5c0099d27b52d4",
                "sha256:8cde829f14bd38f6da7b2954be0f2837043e8b8d7a9110ec5e318ae6bf706610",
                "sha256:a2e3a39f43f0ce95204beb8fe0831199542ccab1e0c6e486a0b4947256215632",
                "sha256:a86c962e211f37edd61d6e11bb4df7eddc4a519a38a856e20a6498c319efa6b0",
                "sha256:a8705c5073fe3fcc297fb8e0b31aa794e05af6a329e81b7ca4ffecab7f2b95ef",
                "sha256:b6aaeadf1e4866ca0fdf7bb4eed25e521ae21a7947c59f78154b24fc7abbe1dd",
                "sha256:be62aeff8f2f054eff7725f502f6228298891fd648dc2630e03e44bf63e8cee0",
                "sha256:c2edbb783c841e36ca0fa159f0ae97a88ce8137fb3a6cd82eae77349ba4b607b",
                "sha256:cbe326f6d364375a8e5a8ccb7e9cd73f4b2f6dc3b2ed205633a0db8243e2a96a",
                "sha256:d34fbb98ad0d6b563b95de852a284074514331e6b9da0a9fc894fb1cdae7a79e",
                "sha256:d97a86937cf9970453c3b62abb55a6475f173347b4cde7f8dcdb48c8e1b9952d",
                "sha256:dd53d7c4a69e766e4900f29db5872f5824a06827d594427cf1a4aa542818b796",
                "sha256:df1889701e2dfd8ba4dc9b1a010f0a60950077fb5242bb92c8b5c7f1a6f2668a",
                "sha256:fa1fe75b4a9e18b66ae7f0b122543c42debcf800aaafa0212aaff3ad273c2596"
            ],
            "index": "pypi",
            "version": "==1.19.0"
        }
    },
    "develop": {}
}

開発環境用のパッケージをインストールする

$ pipenv install --dev ptvsd pytest

パッケージをアップデートする

$ pipenv update pytest

パッケージをアンインストールする

$ pipenv uninstall --all

開発用のパッケージのみアンインストールする

$ pipenv uninstall --all-dev

PipfileとPipfile.lockの再利用

Pipfileの利用

Pipfileをもとにパッケージをインストールします。
このときにインストールした内容でPipfile.lockが更新されます.

$ pipenv install

開発用のパッケージもインストールしたい場合は--devをつける

$ pipenv install --dev 

Pipfile.lockの利用

Pipfile.lockをもとにパッケージをインストールします。

$ pipenv sync

開発用のパッケージもインストールしたい場合は--devをつける

$ pipenv sync --dev 
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

pythonを利用したS3とEC2内一時ディレクトリのファイルやり取り

はじめに

オンプレ環境のGPUを持っていないので、私はAWSでGPUインスタンスを立ててディープラーニングを行っている。
学習データを利用する際、EC2内に一時ディレクトリを作成して、そこにS3から学習データをダウンロードして使うことで、以下のような利点が得られるんじゃないかと思ったので、紹介する。

EC2インスタンス上にデータを入れっぱなしにしておくよりも、プログラムの実行が終わった際にEC2インスタンスからデータが消去される、かつS3に出力結果が上がるため、セキュリティの強化、出力結果紛失の危険性低減、EBS(ストレージ)の節約になる。

この記事では、EC2インスタンス内に一時ディレクトリを作成し、そこにS3からファイルをダウンロードする。
そして、一時ディレクトリからS3へのアップロードするまでを目指す。

必要作業

  • EC2にIAMロールの設定
  • pythonモジュールのインストール(python3を利用)
  • コードの記述

IAMロールの設定

ロールの作成

①AWSマネジメントコンソールからIAMのページに移動
②左のメニュー一覧からロールを選択
③ロールを作成を選択

_2020-07-17_17.28.55.png

④ユースケースはEC2を選択
⑤次のステップへ

_2020-07-17_17.35.44.png

⑥検索窓でS3と入力し検索
⑦AmazonS3FullAccessにチェックマークを入れる
⑧次のステップへ

_2020-07-17_17.42.51.png

⑨タグは設定してもしなくてもどちらでも良いが、複数人でAWSを利用している場合は、管理のしやすさのため、Userのみ設定しておくと良い
⑩次のステップへ

_2020-07-17_17.46.21.png

⑪任意のロール名をつける(ロール名から何ができるロールなのかわかるようにしておくと楽)
⑫ロールの作成

_2020-07-17_17.50.16.png

EC2インスタンスへのIAMロールの割当

EC2インスタンスを作成する際に先程作成したIAMロールを割り当てる

※インスタンス作成時にしかロールの割当できないかも?作成後にロールが割り当てられるかは要検証

_2020-07-17_18.04.46.png

pythonモジュールのインストール

必要モジュール

AWSモジュール
- boto3
一時ディレクトリモジュール
- tempfile(python標準モジュールのためインストール不要)

インストールコマンド

pip install boto3

コードの記述

以下のコードをノートブックファイルや.pyファイルにコピペすればダウンロード及びアップロードの完了

ダウンロード元やアップロード先の設定

import boto3
import tempfile

#ダウンロード元バケット名
bucket_name = 'hogehoge'
#アップロード先バケット名(今回はダウンロード先と同じ)
out_bucket_name = bucket_name

#バケットにある動画ファイル名
file_name = 'input.mp4'

#アップロード先のフォルダ名
#下記の例だとS3resultフォルダが作成され、そこに出力される
out_folder_name = 'result'

一時ディレクトリの作成及びS3からのダウンロード

#一時保存用ディレクトリの作成
tmpdir = tempfile.TemporaryDirectory()
tmp = tmpdir.name + '/'

#一時的な入力ファイル名(適当な名前で固定)
#固定された名前で一時フォルダにダウンロードされる
tmp_input = tmp + 'tmp.mp4'

#S3からファイルをダウンロード
s3_client = boto3.client('s3')
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
bucket.download_file(file_name, tmp_input)

ダウンロードされたファイルの使用例

以下の例はopencvで読み込みをし、何も処理せず一時ディレクトリに別名保存している

(一時ディレクトリにtmp.mp4とoutput.mp4がある状態)

import cv2

#一時ディレクトリに保存するビデオ名の設定
output_video =tmp + 'output.mp4'

#動画ファイルを読み込む
video = cv2.VideoCapture(tmp_input)

# 幅と高さを取得
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
size = (width, height)

#総フレーム数を取得
frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

#フレームレート(1フレームの時間単位はミリ秒)の取得
frame_rate = int(video.get(cv2.CAP_PROP_FPS))

# 保存用
fmt = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
writer = cv2.VideoWriter(output_video, fmt, frame_rate, size)

for i in range(frame_count):
    ret, frame = video.read()
    ### ここに加工処理などを記述する ###
    writer.write(frame)

writer.release()
video.release()
cv2.destroyAllWindows()#ビデオの読み込み

S3へのアップロード

以下の例は先に設定したS3アップロード先にoutput_video.mp4という名前で上で作成したビデオがアップロードされている。

s3.Bucket(out_bucket_name).upload_file(output_video,out_folder_name+'/'+ 'output_video.mp4')

一時ディレクトリの削除

tmpdir.cleanup()

まとめ

それぞれの技術についてもっと細かく解説しているサイトはたくさんあると思うが、それをどうやってうまく組み合わせるのかが意外と難しいと思いこの記事を作成した。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

IAM認証のAWS API GatewayにPythonからSigV4署名してアクセスするには

AWSのAPI GatewayのAPIのメソッドの設定で、IAM認証を使っている場合、APIリクエスト時に署名が必要です。

API Gatewayのメソッド設定画面はこんな感じです。英語のマネジメントコンソールだと「Authorization」、日本語だと「認可」という欄です。

image.png

デフォルトはNONEという設定になっていますが、これをAWS_IAMにするとAPI Gatewayは届いたリクエストの署名を確認するようになります。リクエストする側が署名をする必要があります。

API Gatewayが署名を確認するようになるとAPI GatewayのリソースポリシーにてIAMユーザやIAMロールで制限をかけられるようになります。

Pythonにて署名を付けてAPIリクエストするサンプルコードを書いておきます。

前提

IAMユーザのアクセスキーが ~/.aws/credentials に設定されていて、そのIAMユーザからIAMロールにスイッチするIAM権限があり、そのIAMロールでAPI Gatewayにアクセスするものとします。(~/.aws/credentials があればよいので、以下のサンプルコードはGCPのCompute Engineインスタンスで動かしました)

Pythonコード

import boto3
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
import urllib.request
import sys

endpoint_host = "xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com"
endpoint = "https://" + endpoint_host + "/stage"
path = "/hello"
url = endpoint + path

# .aws/credentials または環境変数でアクセスキーが設定されているIAMユーザのセッションを生成。
session = boto3.session.Session()
# .aws/credentials に複数のIAMユーザがある場合は profile_name を指定。
#session = boto3.session.Session(profile_name = "foo")

sts_client = session.client("sts")

# IAMロールのARNを指定して assume_role 。
# IAMロールが不要でIAMユーザのままでAPIリクエストする場合は、ここは不要。
assume_role_response = sts_client.assume_role(
    RoleArn = "arn:aws:iam::XXXXXXXXXXXX:role/ROLENAME",
    RoleSessionName = "test")

# assume_role で得られたトークンなどからIAMロールでのセッションを生成。
# IAMロールが不要でIAMユーザのままでAPIリクエストする場合は、ここは不要。
session = boto3.session.Session(
    aws_access_key_id = assume_role_response['Credentials']['AccessKeyId'],
    aws_secret_access_key = assume_role_response['Credentials']['SecretAccessKey'],
    aws_session_token = assume_role_response['Credentials']['SessionToken'])

# セッション情報からAPIリクエストに署名。
credentials = session.get_credentials()
awsreq = AWSRequest(method = "GET", url = url)
SigV4Auth(credentials, "execute-api", "ap-northeast-1").add_auth(awsreq)

# urllib.request.Request 生成。
# この4つのリクエストヘッダーが必須。
# IAMロールが不要でIAMユーザのままでAPIリクエストする場合は、X-Amz-Security-Token は不要。
req = urllib.request.Request(url, headers = {
    "Authorization": awsreq.headers['Authorization'],
    "Host": endpoint_host,
    "X-Amz-Date": awsreq.context['timestamp'],
    "X-Amz-Security-Token": assume_role_response["Credentials"]["SessionToken"]
})

# APIリクエスト実行
try:
    with urllib.request.urlopen(req) as response:
        # レスポンス出力
        sys.stdout.buffer.write(response.read())
except urllib.error.HTTPError as err:
    # 403などの場合はここに到達
    # エラーを出力
    print(err)
    # レスポンスヘッダを出力
    print(err.headers)

エラーメッセージの例

IAMユーザに assume_role する権限がないと assume_role を呼び出したところで

botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the AssumeRole operation

という例外が発生します。

X-Amz-Security-Token が必要なのに足りてないと、403 Forbidden が返され、レスポンスヘッダに次のように書かれます。

x-amzn-ErrorType: UnrecognizedClientException

API GatewayのリソースポリシーでこのIAMロールからのリクエストを拒否していると、403 Forbidden が返され、レスポンスヘッダに次のように書かれます。

x-amzn-ErrorType: AccessDeniedException

なお、IAMロールにAPI GatewayにアクセスするIAM権限がなくても、API Gatewayのリソースポリシーで許可していれば、AccessDeniedException にはならずに正常に処理できるようです。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PyCharm+Djangoでの開発環境の書き分け方の一例

開発環境は通常マシンごとに

# 開発環境ラベル
export DEPLOY_ENV=<dev/stg/prd>

のように開発環境を記述する環境変数を書き分けると便利だが、PyCharmを用いて開発するときには、楽をしようとするとちょっとややこしい。そこで備忘録を残しておく。

PyCharmで開発環境ラベルが必要になる状況は、以下2つのパターンがある。

  1. 画面からRun/Debugする場合
  2. PyCharmのターミナルでpython manage.py ...を実行する場合(migration実行時など)

前者の場合(画面からRun/Debugする場合)の環境変数設定は別に難しくない。Run/Debug Configurationsから環境変数を追加すれば可能だ。

ただし、Configurationsの数 × 開発ラベルの種類の数だけ設定しなくてはいけないのでちょっと面倒ではある。本当はProjectレベルの環境変数があったら便利なのかもしれないが、ここは良い解決策がないので地道に行おう。

さて、このように設定してしても、後者の場合(PyCharmのターミナル、もしくは通常のターミナルで実行する場合)の環境変数には(当然ながら)開発ラベルの環境変数は反映されない。上で設定したものはあくまでPyCharmの画面から実行するときにのみ反映される環境変数だ。

そこで、別の環境変数設定方法が必要になる。しかし、いくつかアイデアを検討したが、あまりぱっとしたものが思うかばない。

というのは、ターミナルからexportしたり、.bash_profileなどでexportしても、1つのマシンで複数のProjectや開発環境がある場合を考えるとあまり筋がよくないからだ。また、manage.py系の実行をRun/Debug Configurationに設定してしまうことも考えたが、インタラクティブシェルを必要とすることが多く、そのような場合に立ち上がらないケースがあったのでこれも諦めた。PyCharm側で良さげな機能があればよいのだが、これも残念ながら見つからない。

そこで、ちょっと荒業だが、virtualenvのbin/activateの最後で直接exportするように運用ルールを設定した。具体的には以下のように設定するようにした。

vi ~/.virtualenvs/<Virtualenv環境名>/bin/activate
# 最後にexport DEPLOY_ENV=<dev/stg/prd>を記述

こうすることで、activateと同時に開発環境ラベルが設定できて楽になる。

なお、このように書き分けるために、当然Virtualenv環境は<Project名称>_<開発環境ラベル>(例:project_dev)のように書き分ける必要がある。

まとめ

PyCharmにおける開発環境ラベルは、

  • Run/Debug Configurationsで、それぞれの開発環境ごとにラベルをきちんと書く

  • また、python manage.py...でも反映させるために、virtualenvactivateの最後でもマシンごとに環境変数をexportする。

と良い。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

[Python]gspreadでCSVをスプレッドシートへインポート

目的

作成したCSVファイルをスプレッドシートへ自動でインポートしたい。

Pythonのgspreadを使用して実際にやってみた。

前提条件

・Googleアカウント作成
・プロジェクト作成
・API有効化
・サービスアカウントキーの取得

こちらが全て完了している事が前提です。
筆者は下記リンクを参考にしました。

pythonでGoogle Driveの任意のフォルダにスプレッドシートを作成・編集する
PythonでGoogleスプレッドシートを編集

CSVをインポート

インポートテスト用CSVファイル
a,b,c
1,2,3
コード
import_test.py
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import csv

# GoogleAPI
scope = ['https://spreadsheets.google.com/feeds',
         'https://www.googleapis.com/auth/drive']
# 認証用キー
json_keyfile_path = '認証用キーのファイルパス'
# サービスアカウントキーを読み込む
credentials = ServiceAccountCredentials.from_json_keyfile_name(json_keyfile_path, scope)
# pydrive用の認証
gauth = GoogleAuth()
gauth.credentials = credentials
drive = GoogleDrive(gauth)
# インポートするCSVファイル
csv_path = 'CSVファイルパス'
# スプレッドシート格納フォルダ(https://drive.google.com/drive/folders/<フォルダのID>)
folder_id = 'フォルダID'
# スプレッドシート新規作成
f = drive.CreateFile({
    'title': 'test',
    'mimeType': 'application/vnd.google-apps.spreadsheet',
    "parents": [{"id": folder_id}]})
f.Upload()
# gspread用の認証
gc = gspread.authorize(credentials)
# 作成したスプレッドシートのIDを取得
sheet_id = f['id']
# スプレッドシートを開く
workbook = gc.open_by_key(sheet_id)
# 作成したスプレッドシートにCSVをインポート
workbook.values_update(
    'Sheet1',
    params={'valueInputOption': 'USER_ENTERED'},
    body={'values': list(csv.reader(open(csv_path, encoding='utf_8_sig')))}
)
実行結果

test_spreadsheets.png

無事にインポート出来ました。

ドライブ用の認証処理

作成したいフォルダの指定

空のスプレッドシート作成

スプレッドシート用の認証処理

CSVインポート

という流れです。

既存のブックにシートを追加→CSVインポート

インポートテスト用CSVファイル
d,e,f
4,5,6
コード
import_test.py
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import csv

# GoogleAPI
scope = ['https://spreadsheets.google.com/feeds',
         'https://www.googleapis.com/auth/drive']
# 認証用キー
json_keyfile_path = '認証用キーのファイルパス'
# サービスアカウントキーを読み込む
credentials = ServiceAccountCredentials.from_json_keyfile_name(json_keyfile_path, scope)
# pydrive用の認証
gauth = GoogleAuth()
gauth.credentials = credentials
drive = GoogleDrive(gauth)
# インポートするCSVファイル
csv_path = 'CSVファイルパス'
# スプレッドシート格納フォルダ(https://drive.google.com/drive/folders/<フォルダのID>)
folder_id = 'フォルダID'
# スプレッドシート格納フォルダ内のファイル一覧取得
file_list = drive.ListFile({'q': "'%s' in parents and trashed=false" % folder_id}).GetList()
# gspread用の認証
gc = gspread.authorize(credentials)
# スプレッドシートIDを取得
book_name = 'test'
sheet_id = [file['id'] for file in file_list if file['title'] == book_name]
sheet_id = sheet_id[0]
# スプレッドシートを開く
workbook = gc.open_by_key(sheet_id)
# ワークシート追加
sheet_name = 'Sheet2'
workbook.add_worksheet(title=sheet_name, rows=1000, cols=26)
# スプレッドシートにCSVをインポート
workbook.values_update(
    sheet_name,
    params={'valueInputOption': 'USER_ENTERED'},
    body={'values': list(csv.reader(open(csv_path, encoding='utf_8_sig')))}
)
実行結果

test_spreadsheets2.png

新たなシートを作成してCSVをインポートする手順はこんな感じになります。

フォルダ内に存在するファイル一覧を作成

ファイル一覧からCSVインポートしたいブックを検索してシートIDを取得

CSVインポート

という流れ。

ブックが既に存在する場合は新規ワークシートを作成、存在しない場合は新規ブックを作成してCSVをインポート

import_test.py
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import csv
import sys

# GoogleAPI
scope = ['https://spreadsheets.google.com/feeds',
         'https://www.googleapis.com/auth/drive']
# 認証用キー
json_keyfile_path = '認証用キーのファイルパス'
# サービスアカウントキーを読み込む
credentials = ServiceAccountCredentials.from_json_keyfile_name(json_keyfile_path, scope)
# pydrive用の認証
gauth = GoogleAuth()
gauth.credentials = credentials
drive = GoogleDrive(gauth)
# インポートするCSVファイル
csv_path = 'CSVファイルパス'
# スプレッドシート格納フォルダ(https://drive.google.com/drive/folders/<フォルダのID>)
folder_id = 'フォルダID'
# スプレッドシート格納フォルダ内のファイル一覧取得
file_list = drive.ListFile({'q': "'%s' in parents and trashed=false" % folder_id}).GetList()
# ファイル一覧からファイル名のみを抽出
title_list = [file['title'] for file in file_list]
# 任意のブック名
book_name = '任意のブック名'
# ブックの存在判定
if book_name not in title_list:
    # ブック新規作成
    f = drive.CreateFile({
        'title': book_name,
        'mimeType': 'application/vnd.google-apps.spreadsheet',
        "parents": [{"id": folder_id}]})
    f.Upload()
    # gspread用に認証
    gc = gspread.authorize(credentials)
    # スプレッドシートIDを取得
    sheet_id = f['id']
    # ブックを開く
    workbook = gc.open_by_key(sheet_id)
    # ワークシート名を任意のシート名に変更
    worksheet = workbook.worksheet('Sheet1')
    sheet_name = '任意のワークシート名'
    worksheet.update_title(sheet_name)
else:
    # gspread用の認証
    gc = gspread.authorize(credentials)
    # スプレッドシートIDを取得
    sheet_id = [file['id'] for file in file_list if file['title'] == book_name]
    sheet_id = sheet_id[0]
    # ブックを開く
    workbook = gc.open_by_key(sheet_id)
    # 存在するワークシートの情報を全て取得
    worksheets = workbook.worksheets()
    # ワークシート名のみをリストへ格納
    worksheets_title_list = [sheet.title for sheet in worksheets]
    # 任意のワークシート名
    sheet_name = '任意のワークシート名'
    # 同一ワークシート名の存在判定
    if sheet_name == worksheets_title_list:
        # 同一のワークシート名が既に存在する場合はシートを作成せずに処理終了
        sys.exit()
    else:       
        # ワークシート追加
        workbook.add_worksheet(title=sheet_name, rows=1000, cols=26)
# スプレッドシートにCSVをインポート
workbook.values_update(
    sheet_name,
    params={'valueInputOption': 'USER_ENTERED'},
    body={'values': list(csv.reader(open(csv_path, encoding='utf_8_sig')))}
)

先程紹介した新規ブック作成パターンと新規ワークシート追加パターンを組み合わせた例。

ブックの新規作成が必要な場合と既存ブックへ書き込む場合のどちらにも対応出来る。

日次処理等で定期的にデータを追加する場合に便利。

同一ワークシート名の存在判定も追加する事によってエラーを回避。

最後に

お手軽にCSVをインポート出来てめちゃ便利。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

djsoerでのユーザ認証周りテスト

DRF(Django Rest Framework)とdjoser(jwt認証型のauthシステム)で開発しているが、認証周りのテストが初めてだったので、色々戸惑った。

なので、ユーザー認証テストと認証されたユーザでの投稿テストをメモしておく。

from rest_framework.test import APITestCase
from config.settings import TEST_USER_ID_PW


# TEST_USE_ID_PWは'username'と'password'をキーにもつdict。なお、ここのパスワードは平文で行う。

class TestSomeSerializer(APITestCase):
    def setUp(self):
        # テスト共通で用いるユーザーを作成
        self.register_response = self.client.post('/api/v1/auth/users/', TEST_USER_ID_PW) # login done
        self.assertEqual(self.register_response.status_code, 201)  # 201 Created

    def test_create_something(self):
        # 作成したユーザで投稿をテスト
        res = self.client.post('/api/v1/something/',
                               {'user': self.register_response.data['id'], 'message': 'abc'})
        self.assertEqual(res.status_code, 201)  # 201 Created

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonを使ってCSVの予定をGoogleカレンダーに載っけてみた

目標:Googleカレンダー⇔CSVファイル⇔別アプリのようにカレンダーと他アプリを連携すること。

その第一段階として、CSVのファイルに記述した予定をGoogleカレンダー上に登録するプログラムを作りました。

GoogleカレンダーAPIの利用方法

こちらのサイトを参考にさせていただきました。

【Python】Google Calendar APIを使ってGoogle Calendarの予定を取得・追加する | 無次元日記
https://non-dimension.com/python-googlecalendarapi/

Google Calendar APIのReference > Events: insert
https://developers.google.com/calendar/v3/reference/events/insert

新規イベントを登録するソースコード

上記サイトを参考にしてカレンダーに"変数eventの予定"を追加するコードは以下のようになりました。

CalendarAPI.py
from __future__ import print_function
import datetime
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/calendar']
CID = 'Your Calendar ID'

def insert(event):
    creds = None
    # The file token.pickle stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)

    service = build('calendar', 'v3', credentials=creds)

    # Call the Calendar API
    create_event = service.events().insert(calendarId=CID, body=event).execute()
    return create_event['id']

変数eventの中身

変数イベントの中身は上記Googleサイトを確認すると、start(開始時刻),end(終了時刻)が必須でその他様々なオプションがあります。
今回はSummary(イベントタイトル)、location(場所)、description(説明)を一緒に載っけます。

ソースコード

ソースコードは以下となります。

csv2calendar.py
import pandas as pd
import pprint
import datetime
import CalendarAPI
TIMEZONE = 'Japan'

def TransTime(t):
    if len(t) > 11:
        strtime = datetime.datetime.strptime(t, '%Y/%m/%d %H:%M').isoformat()
        dictime = {
            'dateTime': strtime,
            'timeZone': TIMEZONE
        }
    else :
        strtime = datetime.datetime.strptime(t, '%Y/%m/%d').isoformat()
        dictime = {
            'date': strtime[0:10],
            'timeZone': TIMEZONE
        }
    return dictime


csv = pd.read_csv('test.csv')
d = csv.to_dict(orient='index')

for h, num in zip(d.values(), d.keys()):
    h['start'] = TransTime(h['start'])
    h['end']   = TransTime(h['end'])
    del h['id']
    try:
        event_id = insert_calendar.insert(h)
    except:
        print("Error: Can't put id={} on your calendar".format(csv.loc[num, 'id']))
    else:
        print("Put id={} on your calendar as new_id={}".format(csv.loc[num, 'id'], event_id))
        csv.loc[num, 'id'] = event_id

csv.to_csv('test.csv',index=False)

CSVは以下のように記述しました。

test.csv
id,summary,location,description,start,end
1,会議1,会議室1,memo,2020/7/22 20:40,2020/7/23 0:30
2,会議2,会議室2,メモ,2020/7/30 12:00,2020/7/30 13:00
3,終日イベント,家,,2020/7/31,2020/07/31
4,終日イベント2,家,,2020/8/2,2020/8/3 12:00

説明

関数TransTimeについて

TransTimeは開始時刻、終了時刻の「年/月/日 時:分」を変換する関数です
送信するeventは以下の書式をとります。

{'summary': '会議1',
 'location': '会議室1',
 'description': 'memo',
 'start':  {'dateTime': '2020-07-22T20:40:00', 'timeZone': 'Japan'},
 'end': {'dateTime': '2020-07-23T00:30:00', 'timeZone': 'Japan'},      
},

時刻をISOフォーマットに変換します。このときタイムゾーンを設定する必要があります。
また、終日予定の場合は時刻を書かずに以下のように記します。

{'summary': '終日イベント',
 'location': '家',
 'description': nan,
 'start': {'date': '2020-07-31', 'timeZone': 'Japan'},
 'end': {'date': '2020-07-31', 'timeZone': 'Japan'}
 }

そのため、CSV内で日付のみのデータは後者こととしました。

本体

プログラムの挙動はCSVを読み込みPandas DataFrameを介して辞書型に変換します。
この辞書型それぞれに対し、eventのフォーマットに変換しAPIを通します。
CSVにはidの行を用意します。Googleカレンダーに予定を追加した場合、固有のidが帰ってきます。
これで上書きすることで、後々予定のアップデートなどを実装した際にイベントの識別ができるようにしました。

次の目標

CSVの変更に対しGoogleカレンダーを更新する。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

写真を上げるだけ書籍管理linebot作ってみた

はじめに

一人暮らしを始めてからというものの、読書が趣味になりました。
家には本棚が6か所あり、そのどこに何の本があるかをしりたくなったので、書籍管理line bot作ってみました。line botベンリィ

完成イメージ

写真をアップロード⇒本の登録

本情報の手打ちはしんどいなあと思ったので、本の写真アップロードで代用。
背表紙にあるISBN番号をOCRで読み取って、Google Books APIで書籍情報を呼び出します。
「これ登録しますけどどの棚に入れますか~」と聞かれるので、棚番号を応えて登録。
登録済のものについては※で忠告を出しておきます。
Screenshot_20200722-014617.png

検索 (本の名前)⇒本の検索

「検索」(スペース)(本の名前の一部)とすると、その言葉を含む本がどの棚になるかを教えてくれます。複数の場合も対応、該当がない場合は該当するものがないよ、と言ってくれます。
Screenshot_20200722-181829.png

スタンプ⇒キレる

スタンプに対しては厳しめの対応です。
そりゃあ書籍管理botだもんね。
スタンプなんて意味わかんないよね。
Screenshot_20200722-182148.png
スタンプかわいい

全体構造

本の登録

ざくっとした流れは
①写真アップロード
②cloud functionsが写真をcloud vision APIへ。写真内の文字を読み取る。
③cloud functionsでISBN番号を取り出し、google books APIへ。書籍情報を取得。
④cloud functionsからgoogle apps scriptに書籍情報を送り、スプレッドシートに入力。
となっています。

直接GASとやり取りすればいいじゃんと思った人もいると思います。
その通りです(作ってる最中に思いました)。
その通りなんです。。。クソウセッカクツクッチャッタカラ

本の検索

こちらの処理は
①検索_(書籍名の一部) と送る
②cloud functionsがgoogle apps scriptへ受け流す
③google apps scriptでスプレッドシート内の検索処理
④cloud functions経由でlineへ返す。
となっています。

GASメインの処理ですね。いよいよcloud functionsなぜ使った感。
見切り発車でcloud functionsを起動したのが仇でしたね。。。
だからline botとGASつなげればいいじゃんとか言わないでやめて石投げなry

サンプルコード

Cloud Functions

line botがpostする先の処理になります。
基本的にはpost内容に応じて処理を変えるようになっています。
言語はpythonで書いております。

import os
import base64, hashlib, hmac
import logging
import requests
import json
import re
import base64
import urllib.request
from flask import abort, jsonify
from linebot import (
    LineBotApi, WebhookParser
)
from linebot.exceptions import (
    InvalidSignatureError
)
from linebot.models import (
    MessageEvent, TextMessage, TextSendMessage
)

def main(request):
    #環境変数から各トークンの取得
    channel_secret = os.environ.get('LINE_CHANNEL_SECRET')
    channel_access_token = os.environ.get('LINE_CHANNEL_ACCESS_TOKEN')

    #botインスタンス
    line_bot_api = LineBotApi(channel_access_token)
    parser = WebhookParser(channel_secret)

    #lineから運ばれてきたデータ
    body = request.get_data(as_text=True)

    #認証周り
    hash = hmac.new(channel_secret.encode('utf-8'),
        body.encode('utf-8'), hashlib.sha256).digest()
    signature = base64.b64encode(hash).decode()
    if signature != request.headers['X_LINE_SIGNATURE']:
        return abort(405)
    try:
        events = parser.parse(body, signature)
    except InvalidSignatureError:
        return abort(405)

    #内容に応じて分岐
    for event in events:
        #メッセージタイプに応じて処理を変化
        message_type = event.message.type
        response_text = ""
        print(event)

        #テキストの場合、検索か棚番号
        if message_type == "text":
            #テキストの内容を取得
            message_text = event.message.text
            message_list = message_text.split()
            message = message_list[0]

            #検索の場合
            if message == "検索" and len(message_list) >= 2:
                body = {"mode":"search","target":message_list[1]}
                res = send_to_GAS(body)
                response_text = res["content"]

            #数値の場合(写真を挙げた後の棚番号)
            elif message in ["0","1","2","3","4","5","6"]:
                body = {"mode":"int", "int":message}
                res = send_to_GAS(body)
                print(res)
                response_text = res["content"]

            #その他
            else: 
                response_text = "本の背表紙のアップロードをするか、\n「検索(スペース)(本の名前の一部)」で検索してください"

        #写真の場合、本の登録
        elif message_type == "image":

            #写真取得
            message_id = event.message.id
            message_content = line_bot_api.get_message_content(message_id)

            #OCRで文字を取得
            res_json = vision_api(message_content)

            try:
                #文字からISBN番号を取得
                res = res_json['responses'][0]['textAnnotations'][0]['description']
                num = extractISBNNumbers(res)

                #ISBN番号から書籍を特定し一時保存
                info_json = searchBooksFromISBN(num)
                res = send_to_GAS(info_json)

                #登録済の場合、警告を足す
                addition = ""
                if res["content"] >= 1:
                    addition = "\n※既に登録済の書籍となっています"
                else:
                    pass

                #確認レスポンス
                response_text = "「"+info_json["book"]["title"]+"」を登録します。登録先の棚番号を1~6の数字で記入してください。"+addition

            except KeyError:
                print('画像の品質が悪いようです')

        #スタンプはキレる
        elif message_type == "sticker":
            response_text = "は???????"

        #その他はキレない
        else:
            response_text = "本の背表紙のアップロードをするか、\n「検索(スペース)(本の名前の一部)」で検索してください"

        #lineに返信
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(text=response_text))

    return jsonify({ 'message': 'ok'})

#共通method
#Google Apps ScriptとJSONのやり取りをする処理
def send_to_GAS(info_json):
    #基本パラメータ
    url = (Google Apps ScriptのURL)
    headers = {"Content-Type": "application/json"}

    #引数の内容をdumpsしてPOST
    body = json.dumps(info_json).encode("utf-8")
    post_request = urllib.request.Request(url, data=body, method="POST", headers=headers)

    #レスポンスをdictにparse
    with urllib.request.urlopen(post_request) as response:
        response_binary = response.read().decode("utf-8")
        response_dict = json.loads(response_binary)
        return response_dict

#ここから先は写真に対する処理
#取得した写真をcloud visionへ送って文字取得
def vision_api(image_content):

    #基本パラメータ
    GOOGLE_CLOUD_VISION_API_URL = 'https://vision.googleapis.com/v1/images:annotate?key='
    API_KEY = (cloud visionのAPI KEY)
    api_url = GOOGLE_CLOUD_VISION_API_URL + API_KEY

    #画像データをエンコード
    image_base64 = base64.b64encode(image_content.content).decode('utf-8')

    #POSTするJSONの用意
    req_body = json.dumps({
        'requests': [
            {
            'image': {
                'content': image_base64
            },
            'features': [
                {
                'type': 'TEXT_DETECTION',
                'maxResults': 1,
                }
            ]
        }]
    })
    res = requests.post(api_url, data=req_body)
    return res.json()

#取得した文字列からISBN番号を取得
def extractISBNNumbers(words):

    #ISBNが始まる文字の番号を取得し、そこから多めにとる
    startLetterNumber = words.find('ISBN')
    endLetterNumber = startLetterNumber + 21
    ISBN_Number = words[startLetterNumber:endLetterNumber]

    #数値のみ取り出す
    number = re.sub("\\D", "", ISBN_Number)
    if len(number)==13 or len(number)==10:
        return number
    else:
        print('ISBNナンバーが読み取れません')
        return 'null'

#取り出したISBN番号を利用して書籍情報を取得
def searchBooksFromISBN(num):
    #基本パラメータ
    API_URL = 'https://www.googleapis.com/books/v1/volumes?q=isbn:'
    FULL_API_URL = API_URL + num
    res = requests.get(FULL_API_URL).json()['items'][0]

    #取得したデータ
    title = res['volumeInfo']['title']
    authors_list = res['volumeInfo']['authors']
    authors_str = ''
    for author in authors_list:
        authors_str += author + ' '

    #returnするデータ
    res = {"mode":"input","book":
        {
        "box_num":"box1",
        "title":title,
        "authors":authors_str
        }}
    return res

Cloud Functionsこまごま

環境変数から取得

#環境変数から各トークンの取得
    channel_secret = os.environ.get('LINE_CHANNEL_SECRET')
    channel_access_token = os.environ.get('LINE_CHANNEL_ACCESS_TOKEN')

cloud functionsで環境変数に設定したものを取り出す処理です。
今回初めて知ったので備忘録的にメモさせてください。
QiitaとかGitHubに上げるときとかに、いちいちマスキングする必要がなくなるので便利そうですね。
参考:(https://qiita.com/spre55/items/da2ded18ac4652abb936)

Cloud Vision APIへのPOST処理

#取得した写真をcloud visionへ送って文字取得
def vision_api(image_content):

    #基本パラメータ
    GOOGLE_CLOUD_VISION_API_URL = 'https://vision.googleapis.com/v1/images:annotate?key='
    API_KEY = 'Cloud VisionのAPI Key'
    api_url = GOOGLE_CLOUD_VISION_API_URL + API_KEY

    #画像データをエンコード
    image_base64 = base64.b64encode(image_content.content).decode('utf-8')

    #POSTするJSONの用意
    req_body = json.dumps({
        'requests': [
            {
            'image': {
                'content': image_base64
            },
            'features': [
                {
                'type': 'TEXT_DETECTION',
                'maxResults': 1,
                }
            ]
        }]
    })
    res = requests.post(api_url, data=req_body)
    return res.json()

こちらがCloud Vision APIを叩く際の処理。
base64エンコードをしたり、jsonでdumpsするなどのお作法の実装に苦労しました。
テキスト検出がこんなに簡単にできるなんて便利な時代ですよね。
参考:https://qiita.com/atomyah/items/25db0c9c2ecd319218df
参考:「独学プログラマーのためのAIアプリ開発がわかる本」https://www.amazon.co.jp/%E7%8B%AC%E5%AD%A6%E3%83%97%E3%83%AD%E3%82%B0%E3%83%A9%E3%83%9E%E3%83%BC%E3%81%AE%E3%81%9F%E3%82%81%E3%81%AEAI%E3%82%A2%E3%83%97%E3%83%AA%E9%96%8B%E7%99%BA%E3%81%8C%E3%82%8F%E3%81%8B%E3%82%8B%E6%9C%AC-%E6%B2%B3%E5%90%88-%E5%A4%A7/dp/4046040076

Google Books APIへのPOST処理

#取り出したISBN番号を利用して書籍情報を取得
def searchBooksFromISBN(num):
    #基本パラメータ
    API_URL = 'https://www.googleapis.com/books/v1/volumes?q=isbn:'
    FULL_API_URL = API_URL + num
    res = requests.get(FULL_API_URL).json()['items'][0]

ISBNさえ手に入れば、書籍情報の引っこ抜きはお手軽にできるようです。
authorsやtitle以外の情報も何かに使ってみたいところ。
自分の読む本の傾向分析とかいいかもしれませんね。

Google Apps Script

こちらがスプレッドシートと直接やり取りをする処理になります。
言語はjavascriptですね。

function doPost(e){
  //シートオブジェクト
  var spreadsheet = SpreadsheetApp.openById(スプレッドシートのID);
  var sheet = spreadsheet.getSheetByName("mode");

  //POSTされた内容を確認
  var str = e.postData.contents;
  var json = JSON.parse(str);
  var mode = json["mode"];

  //①写真アップロードの場合
  //①A 一時セルに本情報を保存して確認する
  if (mode == "input"){
    inputTemp(json);
    res_list = searchTarget(json["book"]["title"])
    return returnAsJSON(res_list.length);

  //①B 棚番号を応えてきたら、そこに本情報を記入する
  } else if (mode == "int"){
    tempMode = sheet.getRange("B2").getValue();
    //写真アップロード後ならば、棚に記入
    if (tempMode == "input"){
      sheet.getRange("B2").setValue("")
      inputSheet(json);
      return returnAsJSON("完了しました");
    } 
    //写真をアップロードしていないならば、エラーを返す
    else {
      return returnAsJSON("本の背表紙のアップロードをするか、「検索(スペース)(本の名前の一部)」で検索してください");
    }

  //②検索の場合
  } else if (mode == "search"){
    res_list = searchTarget(json["target"]);
    res = buildResponse(res_list);
    return returnAsJSON(res);
  }
}

//共通パーツ
//引数をJSONでくるんでGoogle functionsに返す処理
function returnAsJSON(res){
  dict = {"content": res};

  // dictデータをjsonに変換
  // payloadをreturnするだけではだめ
  // ContentServiceを利用して、responseを作成
  payload = JSON.stringify(dict);

  //正味よくわからない
  ContentService.createTextOutput();
  var output = ContentService.createTextOutput();
  output.setMimeType(ContentService.MimeType.JSON);
  output.setContent(payload);
  return output;
}

//検索処理
function searchTarget(target){
  var ss = SpreadsheetApp.openById(スプレッドシートのID);

  //検索結果を入れるリスト
  var res_list = [];

  //検索してリストにぶち込む
  //シートを一つ一つ調べていく
  for (var sheetNum = 1; sheetNum < 6; sheetNum++){
    var sheet = ss.getSheets()[sheetNum];
    var textFinder = sheet.createTextFinder(target);
    var ranges = textFinder.findAll();

    //検索結果から取り出してリストにぶち込む
    for (var i = 0; i < ranges.length; i++ ) {
      var row = Math.round(ranges[i].getRow());
      var authorLocation = "A" + row;
      var titleLocation = "B" + row;
      author = sheet.getRange(authorLocation).getValue()
      title = sheet.getRange(titleLocation).getValue()
      res_list.push("\n棚"+sheetNum+":"+author+ "著 "+title);
    }
  }
  return res_list
}

//上記で作ったレスポンスのリストを文面にする処理
function buildResponse(res_list){

  //検索内容がある場合とない場合で返答を変える
  res = "検索結果:"
  if (res_list.length == 0){
    res = "該当するものはありませんでした";
  } else {
    for (i = 0; i < res_list.length; i++){
      res = res + res_list[i];
    }
  }
  Logger.log(res);
  return res;
}

//一時セルにスプレッドシートに一時保存
function inputTemp(json) {
  var spreadsheet = SpreadsheetApp.openById(スプレッドシートのID);
  var modeLocation = "B2";
  var authorLocation = "B3";
  var titleLocation = "C3";
  var sheet = spreadsheet.getSheetByName("mode");
  var location = sheet.getLastRow() +1;

  //スプレッドシートに一時保存
  sheet.getRange(modeLocation).setValue("input");
  sheet.getRange(authorLocation).setValue(json["book"]["authors"]);
  sheet.getRange(titleLocation).setValue(json["book"]["title"]);
}

//一時セルから棚のシートに記入
function inputSheet(json) {
  var spreadsheet = SpreadsheetApp.openById(スプレッドシートのID);
  var sheet = spreadsheet.getSheetByName("mode");

  //一時セルから本情報を取る
  author = sheet.getRange("B3").getValue();
  title = sheet.getRange("C3").getValue();

  //記入先のセル情報
  var sheet = spreadsheet.getSheetByName("box"+json["int"]);
  var location = sheet.getLastRow()+1;
  var authorLocation = "A" + location;
  var titleLocation = "B" + location;

  //記入処理
  sheet.getRange(authorLocation).setValue(author);
  sheet.getRange(titleLocation).setValue(title);
}

スプレッドシートの構成

写真から書籍情報を取り出して、linebotに「この書籍をどこに入れますか」と返す際に、取り出した書籍情報をどこかに保持する必要があるなあと思いました。
そこで今回はスプレッドシートにmodeというシートを作りました。
もっとスマートにやりたいんですが、この辺り何か良いアイデアがありましたら教えてください・・・!
image.png

こちらが本棚になります。
box1~box6までそれぞれシートを分けて書いております。
検索処理もここを調べて結果を返します。
image.png

今後やってみたいこと

ランダムで棚の中の本を出してくれる機能欲しいなあと思ってます。
処理自体はそれほど難しくないと思うんですが、まだまだ不慣れでGASとの疎通がうまくいかないときがしばしば。
本当になんでcloud functions挟んだろうか(白目)

あとは本の情報を一元管理できるようになったら、自分の読む本の傾向とか分析してみたいなあと思います。
どんな傾向の本を読むのか分析して、違うジャンルの本を読み始めてみるとか。
機械学習モデルをどこかにデプロイして、自分向けリコメンドシステムを作ってみるとか面白いかも。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonでデータの挙動を見やすくする可視化ツールを作成してみた

概要

Pythonのグラフ描画ライブラリ「seaborn」をベースにして、
相関係数や散布図などを一括で確認できる可視化ツールを作ってみました。
ggpairs_titanic_box_huesurvived.png
試行錯誤の結果、様々なデータパターンに対応した、汎用性の高いツールが出来た!
と感じています

背景と機能

本ツールの機能と、作成した背景を解説します

seabornとは?

Pythonのグラフ描画ライブラリで、ベースとなっているライブラリmatplotlibよりも、簡単なコードで綺麗なグラフを描くことができます

中でも、散布図とヒストグラムを組み合わせた「pairplot」は、
多変数の関係性を一度に可視化することができるため、
データ分析の初期段階で非常によく使われています。
pairplot_iris_huespecies.png

pairplotの課題

とても便利なpairplotですが、下記のようにいくつか課題があります。

課題1:右上の散布図が情報量ゼロ

pairplotの左下(下の図の赤枠)と右上(青枠)の散布図を見比べてください
diag_pairplot_iris_huespecies.png

よく見ると右上の散布図は左下の散布図の軸を反転させただけで、表示している情報は全く同じです。

右上の散布図は情報量ゼロ‥むしろ分析者に意味のない情報を見せている分、マイナスの効果すら生み出していると言えます。
(一応、縦軸横軸どちらの要素からスタートしても目的の2変数に辿り着けるというメリットはありますが‥)

課題2:離散的な変数は、散布図だと挙動が分かりずらい

今まで扱ってきた「iris」データセットは全て連続的な変数なので散布図がフィットしますが、
「titanic」データセットではどうでしょう?
under2_pairplot_titanic_nohue.png
"adult_male"、"alone"などの、離散的でとる値の少ない変数(赤枠で囲った部分)では、
点が重なってしまって挙動がよく分からないですね。

しかも、このような離散的な変数こそ実際のデータ分析では重要な場面も多い(下の「余談」を参照ください)ので、無視できる課題ではありません。

追加した機能

上記課題1、2をクリアするため、
R言語の可視化ライブラリ「GGally」等を参考にしつつ、
下記の2つの機能を加えてデータの傾向を見やすくしたメソッド

pairanalyzer

を作成しました

機能1:相関係数の表示

「課題1:右上の散布図の情報量がゼロ」をクリアするため、意味のある情報を載せたいと思い、
右上には相関係数を表示することとしました
ggpairs_iris_box_huespecies.png

相関係数の大小を直感的に分かりやすくするため、下記の機能も付け加えました
・相関係数の大小に応じて文字サイズを変更
・全体の相関係数を黒で、hue(後で解説します)ごとの相関係数を色分けして表示

機能2:箱ひげ図、バブルチャートへの自動表示変更

課題2で問題となっていた離散的な変数の傾向を把握しやすいよう、

・X,Yいずれかの変数の取る値が2種類以下のとき、箱ひげ図で表示
・もう1つの変数が4種類以下のとき箱ひげ図が作れないので、バブルチャートの大きさでデータの重複数を表示

に自動で表示変更する機能を追加し、データの挙動を見やすくしました
ggpairs_titanic_box_huesurvived.png
hueによる色分け機能もあるので、元のpairplotとそう変わらない感覚で使えると思います

使用に必要なもの

・Python本体 (動作確認時は3.7.7を使用)
・Matplotlibで表示可能な環境 (Jupyter等)
・下記ライブラリ (参考として動作確認時のライブラリバージョンも記載)
 seaborn (0.10.1)
 numpy (1.18.5)
 pandas (1.0.5)
 matplotlib (3.2.2)
 scipy (1.5.0)

コード

モジュールcustom_pair_plot.py内のクラスCustomPairPlotに、必要な処理をまとめました。
GitHubにもアップロードしています

モジュール本体

custom_pair_plot.py
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats

class CustomPairPlot():
    #初期化
    def __init__(self):
        self.df = None
        self.hue = None
        self.hue_names = None
        self.corr_mat = None

    #hueごとに相関係数計算
    def _corrfunc(self, x, y, **kws):
        if self.hue_names==None:
            labelnum=0
            hue_num = 0
        else:
            labelnum=self.hue_names.index(kws["label"])
            hue_num = len(self.hue_names)
        #xまたはyがNaNの行を除外
        mask = ~np.logical_or(np.isnan(x), np.isnan(y))
        x, y = x[mask], y[mask]
        #相関係数算出&0.4ごとにフォントサイズ拡大
        r, _ = stats.pearsonr(x, y)
        fsize = min(9, 45/hue_num) + min(4.5, 22.5/hue_num) * np.ceil(abs(r)/0.4)
        fsize = min(9, 45/hue_num) if np.isnan(fsize) else fsize
        #該当マスのaxを取得
        ax = plt.gca()
        #既に表示したhueの分だけ下にさげて相関係数表示
        ax.annotate("r={:.2f}".format(r), xy=(.1, .65-min(.15,.75/hue_num)*labelnum), xycoords=ax.transAxes, size=fsize, color=kws["color"])

    #hueを分けない相関係数計算して上半分に表示
    def _corrall_upper(self, g):
        #右上を1マスずつ走査
        for i, j in zip(*np.triu_indices_from(g.axes, 1)):
            #該当マスのaxesを取得
            ax = g.axes[i, j]
            plt.sca(ax)
            #フィールド名を取得
            x_var = g.x_vars[j]
            y_var = g.y_vars[i]
            #相関係数
            r = self.corr_mat[x_var][y_var]
            #相関係数0.2ごとにフォントサイズ拡大
            fsize = 10 + 5 * np.ceil(abs(r)/0.2)
            fsize = 10 if np.isnan(fsize) else fsize
            #一番上に表示
            ax.annotate("r={:.2f}".format(r), xy=(.1, .85), xycoords=ax.transAxes, size=fsize, color="black")

    #重複数に応じたバブルチャート
    def _duplicate_bubblescatter(self, data, x, y, hue=None, hue_names=None, hue_slide="horizontal", palette=None):
        #hueの要素数および値を取得
        if hue is not None:
            #hue_nameを入力していないとき、hueの要素から自動生成
            if hue_names is None:
                hue_names = data[hue].dropna().unique()
            hue_num = len(hue_names)
            hue_vals = data[hue]
        #hueを入力していないときも、groupby用にhue関係変数生成
        else:
            hue_names = ["_nolegend_"]
            hue_num = 0
            hue_vals = pd.Series(["_nolegend_"] * len(data),
                                      index=data.index)
        #hueで区切らない全データ数(NaNは除外)をカウント
        ndata = len(data[[x,y]].dropna(how="any"))

        ######hueごとにGroupByして表示処理######
        hue_grouped = data.groupby(hue_vals)
        for k, label_k in enumerate(hue_names):
            try:
                data_k = hue_grouped.get_group(label_k)
            except KeyError:
                data_k = pd.DataFrame(columns=data.columns,
                                      dtype=np.float)
            #X,Yごとに要素数をカウント
            df_xy = data_k[[x,y]]
            df_xy["xyrec"] = 1
            df_xycount = df_xy.dropna(how="any").groupby([x,y], as_index=False).count()
            #hueが2個以上存在するとき、表示位置ずらし量(対象軸のユニーク値差分最小値÷4に収まるよう設定)を計算
            if hue_num >=2:
                if hue_slide == "horizontal":
                    x_distinct_sort = sorted(data[x].dropna().unique())
                    x_diff_min = min(np.diff(x_distinct_sort))
                    x_offset = k * (x_diff_min/4)/(hue_num - 1) - x_diff_min/8
                    y_offset = 0
                else:
                    y_distinct_sort = sorted(data[y].dropna().unique())
                    y_diff_min = min(np.diff(y_distinct_sort))
                    x_offset = 0
                    y_offset = k * (y_diff_min/4)/(hue_num - 1) - y_diff_min/8
            else:
                x_offset = 0
                y_offset = 0
            #散布図表示(要素数をプロットサイズで表現)
            ax = plt.gca()
            ax.scatter(df_xycount[x] + x_offset, df_xycount[y] + y_offset, s=df_xycount["xyrec"]*1000/ndata, color=palette[k])

    #plotter=scatterかつ要素数が2以下なら箱ひげ図、それ以外ならscatterplotを使用
    def _boxscatter_lower(self, g, **kwargs):
        #kw_color = kwargs.pop("color", None)
        kw_color = g.palette
        #左下を走査
        for i, j in zip(*np.tril_indices_from(g.axes, -1)):
            ax = g.axes[i, j]
            plt.sca(ax)
            #軸表示対象のフィールド名を取得
            x_var = g.x_vars[j]
            y_var = g.y_vars[i]
            #XY軸データ抽出
            x_data = self.df[x_var]
            y_data = self.df[y_var]
            #XY軸のユニーク値
            x_distinct = x_data.dropna().unique()
            y_distinct = y_data.dropna().unique()

            #箱ひげ図(x方向)
            if len(x_distinct) ==2 and len(y_distinct) >= 5:
                sns.boxplot(data=self.df, x=x_var, y=y_var, orient="v",
                     hue=self.hue, palette=g.palette, **kwargs)
            #重複数に応じたバブルチャート(x方向)
            elif len(x_distinct) ==2 and len(y_distinct) < 5:
                self._duplicate_bubblescatter(data=self.df, x=x_var, y=y_var, hue=self.hue, hue_names=g.hue_names, hue_slide="horizontal", palette=g.palette)
            #箱ひげ図(y方向)
            elif len(y_distinct) ==2 and len(x_distinct) >= 5:
                sns.boxplot(data=self.df, x=x_var, y=y_var, orient="h",
                     hue=self.hue, palette=g.palette, **kwargs)
            #重複数に応じたバブルチャート(y方向)
            elif len(y_distinct) ==2 and len(x_distinct) < 5:
                self._duplicate_bubblescatter(data=self.df, x=x_var, y=y_var, hue=self.hue, hue_names=g.hue_names, hue_slide="vertical", palette=g.palette)
            #散布図
            else:
                if len(g.hue_kws) > 0:#マーカー指定あるとき
                    markers = dict(zip(g.hue_names, g.hue_kws["marker"]))
                else:#マーカー指定ないとき
                    markers = None
                sns.scatterplot(data=self.df, x=x_var, y=y_var, hue=self.hue,
                     palette=g.palette, style=self.hue, markers=markers)
            #凡例を追加
            g._clean_axis(ax)
            g._update_legend_data(ax)

        if kw_color is not None:
            kwargs["color"] = kw_color
        #軸ラベルを追加
        g._add_axis_labels()

    #メイン関数
    def pairanalyzer(self, df, hue=None, palette=None, vars=None,
             lowerkind="boxscatter", diag_kind="kde", markers=None,
             height=2.5, aspect=1, dropna=True,
             lower_kws={}, diag_kws={}, grid_kws={}, size=None):
        #メンバ変数入力
        if diag_kind=="hist":#ヒストグラム表示のとき、bool型の列を除外してデータ読込
            self.df = df.select_dtypes(exclude=bool)
        else:#kde表示のとき、bool型を含めデータ読込
            self.df = df
        self.hue = hue
        self.corr_mat = df.corr(method="pearson")
        #文字サイズ調整
        sns.set_context("notebook")

        #PairGridインスタンス作成
        plt.figure()
        diag_sharey = diag_kind == "hist"
        g = sns.PairGrid(self.df, hue=self.hue,
                 palette=palette, vars=vars, diag_sharey=diag_sharey,
                 height=height, aspect=aspect, dropna=dropna, size=None, **grid_kws)

        #マーカーを設定
        if markers is not None:
            if g.hue_names is None:
                n_markers = 1
            else:
                n_markers = len(g.hue_names)
            if not isinstance(markers, list):
                markers = [markers] * n_markers
            if len(markers) != n_markers:
                raise ValueError(("markers must be a singleton or a list of "
                                "markers for each level of the hue variable"))
            g.hue_kws = {"marker": markers}

        #対角にヒストグラム or KDEをプロット
        if diag_kind == "hist":
            g.map_diag(plt.hist, **diag_kws)
        elif diag_kind == "kde":
            diag_kws.setdefault("shade", True)
            diag_kws["legend"] = False
            g.map_diag(sns.kdeplot, **diag_kws)

        #左下に散布図etc.をプロット
        if lowerkind == "boxscatter":
            self._boxscatter_lower(g, **lower_kws)
        elif lowerkind == "scatter":
            g.map_lower(sns.scatterplot, **lower_kws)
        else:
            g.map_lower(sns.regplot, **lower_kws)

        #色分け(hue)有無で場合分けしてプロット&相関係数表示実行
        #hueなし
        if self.hue == None:
            #右上に相関係数表示
            self.hue_names = None
            self._corrall_upper(g)
        #hueあり
        else:
            #右上に相関係数表示(hueごとに色分け&全体の相関係数を黒表示)
            self.hue_names = g.hue_names
            g.map_upper(self._corrfunc)
            self._corrall_upper(g)
            g.add_legend()

各メソッドの解説

・pairanalyzer():メインの処理。外部から呼び出すのはこのメソッド
boxscatter_lower():変数の取る値に応じ、散布図・箱ひげ図・バブルチャートを切り替えるメソッド(上記「機能2」)
・_duplicate_bubblescatter():バブルチャート表示メソッド
corrall_upper():hueを分けない相関係数計算して上半分に表示
・_corrfunc():hueごとに相関係数計算して表示

使用方法

実行方法と引数の解説をします

実行方法

実行元スクリプトと同フォルダcustom_pair_plot.pyを置き、下記のように実行します

#参考としてtitanicデータを読み込んでいます
import seaborn as sns
titanic = sns.load_dataset("titanic")

#ここから実行部分
from custom_pair_plot import CustomPairPlot
gp = CustomPairPlot()
gp.pairanalyzer(titanic, hue='survived')

引数の解説

hue、lowerkind以外の引数は、
ベースとしたseabornのクラス「PairGrid」の引数と同仕様なので、
こちらもご参照ください

引数を指定しないとき

下記の引数が自動入力されます
(df, hue=None, palette=None, vars=None, lowerkind="boxscatter", diag_kind="kde", markers=None, height=2.5, aspect=1, dropna=True, lower_kws={}, diag_kws={}, grid_kws={}, size=None)

・表示例

gp.pairanalyzer(titanic)

ggpairs_titanic_box_nohue.png

hue

ここで指定した列名で色分け表示します。相関係数もhueごとに計算されます。
離散変数以外を指定するととんでもない事になるので、注意してください‥(笑)

gp.pairanalyzer(titanic, hue='survived')

ggpairs_titanic_box_huesurvived.png

palette

hueによる色分け用のカラーパレットを指定します

gp.pairanalyzer(titanic, hue='survived', palette='Blues')

ggpairs_titanic_box_hueblue.png

vars

グラフ化する列を選択する(指定しなければ全ての数値型&Boolean型の列を使用)

gp.pairanalyzer(titanic, hue='survived', vars=['pclass', 'age', 'adult_male'])

ggpairs_titanic_box_vars.png

lowerkind

左下に表示するグラフを下記3種類から選択します(指定しなければ、'boxscatter'を選択)

'boxscatter':前記「機能2」に従い、散布図・箱ひげ図・バブルチャートを自動切替
'scatter':散布図(通常のpairplotと同じ)
'reg':回帰直線

boxscatter

gp.pairanalyzer(titanic, hue='survived', lowerkind='boxscatter')

ggpairs_titanic_box_huesurvived.png

scatter

gp.pairanalyzer(titanic, hue='survived', lowerkind='scatter')

ggpairs_titanic_scatter.png

reg

gp.pairanalyzer(titanic, hue='survived', lowerkind='reg')

ggpairs_titanic_reg.png

diag_kind

対角に表示するグラフを下記2種類から選択します(指定しなければ、'kde'を選択)

'hist':ヒストグラム
'kde':ヒストグラムをカーネル密度推定で円滑化

hist

gp.pairanalyzer(titanic, hue='survived', lowerkind='hist')

ggpairs_titanic_hist.png

※pairplotおよびベースとなるseabornのクラスPairGridには、bool型をヒストグラム表示するとエラーを吐くバグがあるので、pairanalyzerではbool型の列(titanicでは'adult_male','alone')を事前に削除する処理を加えています

kde

gp.pairanalyzer(titanic, hue='survived', lowerkind='kde')

ggpairs_titanic_box_huesurvived.png

markers

hueにより色分けしたデータの、散布図プロット形状を指定します
指定方法はこちらを参照ください

gp.pairanalyzer(titanic, hue='survived', markers='+')

ggpairs_titanic_marker.png

List指定することで、hueごとに形状を変えることもできます。

gp.pairanalyzer(titanic, hue='survived', markers=['s','X'])

ggpairs_titanic_manymarker.png

height, aspect

グラフの高さと縦横比を指定します

gp.pairanalyzer(titanic, hue='survived', height=2, aspect=2)

ggpairs_titanic_heightaspect.png

その他の引数(dropna, lower_kws, diag_kws, grid_kws, size)

ベースとしたPairGridの仕様をそのまま使用していますが、
使いどころが分からなかったので本ページでは割愛します。
仕様はこちら参照ください
※その他の引数は動作テストも実施していないので、もしバグ等見つけた場合はコメント頂けるとありがたいです。

余談

本題とは直接関係ありませんが、タイタニック号のデータを本ツールで分析して面白い事実が可視化できたので、記載します

タイタニック号生還率と相関係数

titanicのデータで、生還を表す"survived"との相関係数を見てみます(下の図の赤枠)
titanic_corr_survived.png

相関係数の絶対値順位は下記のようになります

順位 変数名 相関係数
1 adult_male -0.56
2 pclass -0.34
3 fare 0.26

・2位のpclass(等級)、3位のfare(運賃)

どちらも乗船に支払った金額と関係するもので、

「金持ちの生還率が高い」
という結果が読み取れます(1等室はデッキが近く、避難距離が短く済んだそうです)
やはり世の中カネなのか‥というやるせない気分になります‥

・1位のadult_male(成人男性かどうか)

R = -0.56と生還率と強い相関がありそうです。
これは一見「成人男性は体力があるので生存率が高い」という、これまた当たり前の結論に見えます

しかしよく見ると、相関係数にマイナスがついています。
すなわち「体力のある成人男性の生存率が有意に低い」という、
感覚と矛盾した結果が導き出されます。
このことは上図緑枠で囲ったバブルチャートでもはっきりと確認できます

英国紳士はダテではなかった

気になって調べてみたところ、下のような記事が見付かりました。
https://www.afpbb.com/articles/fp/2564918

どうやら多くの乗客が、
女性や子供を優先して助ける
という行動をとった結果が、上記の相関として現れたようです。

驚くべきは、極限状況で数百人もの成人男性が、このような理性的な行動を取った事です。
下の記事のように、船長を始めとしたクルーの指揮が的確だったことも、秩序だった行動の理由として挙げられていますが、
いずれにせよ、英国紳士の自己犠牲の精神には頭が下がりますね
http://blog.livedoor.jp/science_q/archives/1652343.html

おわりに

以上のように分析からドラマが生まれる事もあるのだと、データの価値を再認識することができました。
私も100年後の分析者から後ろ指を差されないよう、清く正しく生きたいと思います…(笑)

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

あれってこの言語でどうやって書くのだろう?を解決するHyperpolyglot

1つのプログラミング言語ですべて完結すればいいけど、実際には複数の言語を使って組み合わせて使うのが一般的。そのときに直面するのが「あれ、これこの言語でどうやって書くのだろう?」の問題です。

実際に自分も仕事のメインプロジェクトではサーバーサイドはPython3でフロントエンドはTypeScriptを使っており、よくどっちがどっちかわからなくなって困ります。

この問題を解決するために最近、Hyperpolyglotを使い始めました。

Hyperpologlotは言語間のイディオムの違いを表でまとめてくれているサイトで、オブジェクトもしくは機能毎にイディオムをまとめてくれているサイトです。

自分のプロジェクトにずばりあう比較表は用意されていないのですが、Node.jsとPythonの比較でも基本的には問題ないので、自分はScripting Languages I: Node.js, Python, PHP, Ruby - Hyperpolyglotをよく使っています。node.jsではlodashを使って書いてあるところもあるので、今時そんなん使っているやついるんか…と思わんでもないですが。

余談

これをいちいち調べるのも面倒なので、エディタが間違ったらsuggestしてほしいところ。すでにそのようなプロジェクトはあるのか?なければ自分で作ろうかなと考えています。

参考

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pygameで簡単なスペースインベーダーゲームを作る方法

0.はじめに

今回作るものがどういう感じで動くのか見てみたい、この記事を読むのが面倒くさい方はこちら(Youtubeの動画)をご覧ください。長くなってしまったので記事を読んだ方が早いかもしれません。

1.実装

space_invader.py
import pygame
import math
import numpy as np

win = pygame.display.set_mode((600,400))

class Ship:
    def __init__(self,win):
        self.x = 300
        self.y = 350
        self.width = 20
        self.height = 40
        self.vel = 1
        self.win = win

    def draw(self):
        pygame.draw.rect(self.win,(255,255,255),(self.x,self.y,self.width,self.height),0)

    def move(self,keys):
        if keys[pygame.K_RIGHT] and self.x < 600 - self.width - self.vel:
            self.x += self.vel
        elif keys[pygame.K_LEFT] and self.x > self.vel:
            self.x -= self.vel

class Flower:
    def __init__(self,win,x):
        self.x = x
        self.y = 100
        self.radius = 30
        self.win = win
        self.hitcircle = (self.x,self.y)
        self.xdir = 1
        self.ydir = 0
        self.edge = False
        self.moveloop = 0

    def draw(self):
        pygame.draw.circle(self.win,(255,0,200), (self.x,self.y), self.radius)
        self.hitcircle = (self.x,self.y)

    def grow(self,flower):
        self.radius += 2

    def move(self):
        self.x += self.xdir

    def shiftDown(self):
        self.xdir *= -1
        self.y += self.radius
        self.edge = False


class Drop:
    def __init__(self,win,x):
        self.win = win
        self.y =  350
        self.vel = 10
        self.x = x
        self.r = 8

    def draw(self):
        pygame.draw.rect(self.win,(150,0,255),(self.x,self.y,self.r*2,self.r*2),0)

    def collide(self,drop,flower):
        dist = math.sqrt(np.abs(flower.hitcircle[0]-drop.x)**2+np.abs(flower.hitcircle[1]-drop.y)**2)
        if dist < 38:
            return True
        return False 

def drawWindow(ship,drops):
    ship.draw()

    for i in drops:
        i.draw()

    for b in flowers:
        b.draw()
        b.move()  

    pygame.display.update()

drops = []
flowers = []
shootloop = 0
for i in range(6):
    x = i*80 + 80
    flowers.append(Flower(win,x))
ship = Ship(win)
run = True
while run:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            run = False   

    keys = pygame.key.get_pressed()

    if shootloop > 0:
        shootloop += 1
    if shootloop > 20:
        shootloop = 0

    if keys[pygame.K_SPACE] and shootloop == 0:
        if len(drops) <= 10:
            drops.append(Drop(win,ship.x))

        shootloop = 1

    for drop in drops:
        if drop.y > 0:
            drop.y -= 1
        else:
            drops.pop(drops.index(drop))    

        for flower in flowers:
            if drop.collide(drop,flower):
                flower.grow(flower)
                drops.pop(drops.index(drop))

    for flower in flowers:
        if flower.x > 600 or flower.x < 0:
            flower.edge = True

        if flower.edge:
            flower.shiftDown()

    ship.move(keys)
    win.fill((51,51,51))
    drawWindow(ship,drops)

pygame.quit()

ここでは敵(Flower)、玉(Drop)、自分の船(Ship)の3つのクラスを作って動かしています。船は左右に動き、玉はスペースキーで発射され当たったかどうかの判定もし敵はだんだん近づいて来るようになっています。敵の近づいてくる処理は少し難しいかもしれませんがよく考えて見れば分かると思います。

最後に

今回作ったものはYoutubeでも解説しているのでそちらも良かったらご覧ください。質問等がございましたらその動画のコメント欄もしくは、この記事のコメント欄でどうぞ。また、いいなと思ったらチャンネル登録お願いします。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

[Python, OpenCV]動画から検出した円をぴえんに変換する

ぴえんという顔文字をご存じでしょうか。
2019年ぐらいからJKやJCの間で流行りはじめた顔文字で、2020年には上半期のインスタ流行語大賞に選ばれたらしいです。
実際に見てもらった方が早いですね。

002・pien.png

いつからか誰かがこの顔文字と一緒に「ぴえん」という単語を使い始めて、それが定着してこの顔文字自体がぴえんと呼ばれるようになった・・・らしいです。

なんというかカルト的な人気を博したこのぴえん?ですが、この顔をした八頭身に追いかけられるようなゲームが出たり、「ぴえんこえてぱおん」というまた新しいよくわからない単語が登場したりしています。

さて、今回はこのぴえんをPythonとOpenCVを使ってカメラの映像に表示したいと思います。
まだどちらも触って1,2ヶ月程度なのでおかしな部分があるかもしれませんが、よろしくお願いします。

開発環境

  • macOS Catalina(使っているのはMacBookPro Mid 2014)
  • Python 3.7.7
  • numpy 1.18.5
  • OpenCV 3.4.2

Anacondaで仮想環境を作成しています。
カメラはeMeetのNovaを使っています。

ソース

pien.py
# -*- coding: utf-8 -*-
import cv2
import numpy as np

#カメラの取得
cap = cv2.VideoCapture(1)

while True:
    ret, frame = cap.read()
    #ハフ変換用にリサイズし、グレーカラーに変更
    frame = cv2.resize(frame, dsize=(640, 480))
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    gray = cv2.GaussianBlur(gray, (33, 33), 1)

    #ハフ変換による円形の抽出
    circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, 1, 60, param1=10, param2=85, minRadius=1, maxRadius=80)

    #変換結果がある場合にぴえんの描写を行う
    if circles is not None:
        circles = np.uint16(np.around(circles))
        for i in circles[0,:]:
            #i[0]=x座標 i[1]=y座標 i[2]=半径
            x = i[0]; y = i[1]; r = i[2]
            #cv2.circleは円形 cv2.ellipseは楕円や楕円弧
            #輪郭
            cv2.circle(frame, (x, y), r, (0, 215, 255), -1)
            #眉毛右
            cv2.ellipse(frame, (x+int(r*2/3), y-int(r*2/3)), (int(r/3), int(r/4)), 0, 90, 165, (19, 69, 139), 3)
            #眉毛左
            cv2.ellipse(frame, (x-int(r*2/3), y-int(r*2/3)), (int(r/3), int(r/4)), 0, 15, 90, (19, 69, 139), 3)
            #涙右
            cv2.circle(frame, (x+int(r*2/5), y), int(r*3/10), (255, 255, 255), -1)
            #涙左
            cv2.circle(frame, (x-int(r*2/5), y), int(r*3/10), (255, 255, 255), -1)
            #黒目右
            cv2.circle(frame, (x+int(r*2/5), y-int(r/15)), int(r*3/10), (0, 0, 0), -1)
            #黒目左
            cv2.circle(frame, (x-int(r*2/5), y-int(r/15)), int(r*3/10), (0, 0, 0), -1)
            #白目大右
            cv2.ellipse(frame, ((x+int(r/3), y-int(r/6)), (int(r/3), int(r/4)), 135), (255, 255, 255), -1)
            #白目大左
            cv2.ellipse(frame, ((x-int(r/2), y-int(r/6)), (int(r/3), int(r/4)), 135), (255, 255, 255), -1)
            #白目小右
            cv2.ellipse(frame, ((x+int(r/2), y), (int(r/10), int(r/15)), 135), (255, 255, 255), -1)
            #白目小左
            cv2.ellipse(frame, ((x-int(r/3), y), (int(r/10), int(r/15)), 135), (255, 255, 255), -1)
            #口
            cv2.ellipse(frame, (x, y+r), (int(r/4), int(r/2)), 0, 245, 295, (19, 69, 139), 5)

    #ウィンドウにカメラの映像を表示 この時点でframeにはぴえんが描写されている
    cv2.imshow("frame", frame)

    #qキーを押したら終了する
    if cv2.waitKey(1)&0xff == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

コードの説明はそのままコメントで残しました。
重要なのは中程のハフ変換による円形の検出部分と、検出した円の座標・半径をもとにぴえんを作成する部分です。
ぴえんは画像を貼り付けているわけではなく、全て円と線で描画しているので、どんな大きさにも対応できるようになっています。
文字列も描写できるので?をそのまま使えないかと思ったのですがダメでした。

やってみた結果

仮想環境をconda activateした上でpien.pyを実行すると、カメラからの映像がウィンドウで表示されるかと思います。
そこでカメラを丸いものに向けてみましょう。

001・pien.gif

ぴえんになりました?
しっかり大きさが違ってもそれに合わせて変換されていますね。
今回は円を検出指定いるので、円の中に何が入っているかは特に関係ありません。時計でもキャップでもメガネでもぴえんになります。
コードを丸くすればそれもぴえんになります。
ちなみに瞳もぴえんになりました。(?ω?)みたいな感じです。

OpenCVを使えば円だけでなく例えば人の顔も検出できます。
つまりカメラに移った顔を全て?にすることができるということです。
そこら辺はそのうち試してみたいと思います。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pygameで紫の雨を降らせる方法

0.はじめに

今回作るものがどういう感じで動くのか見てみたい、この記事を読むのが面倒くさい方はこちら(Youtubeの動画)をご覧ください。

1.実装

purple_rain.py
import pygame
import random

WIDTH = 640
HEIGHT = 360

win = pygame.display.set_mode((WIDTH,HEIGHT))

class Drop:
    def __init__(self,win):
        self.x = random.uniform(0,WIDTH)
        self.y = -1
        self.yspeed = random.uniform(1,3)
        self.win = win
        self.length = random.uniform(10,20)

    def fall(self):
        self.y += self.yspeed

    def show(self):
        pygame.draw.rect(self.win,(138,43,226),(self.x,self.y,2,self.length))

def drawWindow(rain):
    for water in rain:
        water.show()
        if water.y <= 360:
            water.fall()
        else:
            rain.pop(rain.index(water))

    pygame.display.update()

rain = []
run = True
while run:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            run = False

    win.fill((230,230,250))

    if len(rain) < 500:
        rain.append(Drop(win))

    drawWindow(rain)
pygame.quit()

クラスを使ったので長くなってしまいましたが、やってることは雨をランダムな長さ、位置、速度で落としていき画面の外に消えたら追加というのをやっています。簡単ですね!

最後に

今回作ったものはYoutubeでも解説しているのでそちらも良かったらご覧ください。質問等がございましたらその動画のコメント欄もしくは、この記事のコメント欄でどうぞ。また、いいなと思ったらチャンネル登録お願いします。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

exp_01/ exp_02/ exp_03/ .../ 「このディレクトリ何やったけ?」を解消したい!

こんな経験ありませんか?

  • なんのディレクトリか忘れた
  • 先輩からもらったデータのディレクトリ構造がなんのこっちゃわからん
  • 初めて使うフレームワークのディレクトリ構造のメモがほしい

つくったもの

ls-Improvedというコマンド上で動作するlsっぽいコマンドです.

どんなもの?

image.png

このように,ディレクトリの説明をつけることができます.また,少しだけlsをリッチに表示してくれます.
例はrails tutorialの説明をrailsのデフォルトディレクトリの説明文としてつけました.

こんなことができます!

  • 機械学習等の実験パラメータの整理
    lsi_using.png

  • チーム開発におけるディレクトリ構造のコメント
    image (3).png

  • ターミナル映え
    image (11).png

導入方法

ローカルにpython環境が必要です.一応2系でも3系でも動きます!
pip install ls-Improved
python環境さえあれば,コマンドプロンプト等でも動きます.

基本的な使い方

lsi

ファイルと説明文が付いたディレクトリを表示してくれます.説明文が無い時はDirがデフォルトで付きます.
image (12).png

lsi -F

ファイルのみの表示をしてくれます.
image (4).png

lsi -D

説明文が付いたディレクトリを表示してくれます.
image (5).png

lsi -a

隠しファイルも含めて全て表示してくれます.
image (6).png

lsi -s "<検索ワード>"

このようにして,説明文から該当するディレクトリを検索することが可能です.
image (7).png

ディレクトリと説明文の両方で検索することができます.
image (9).png

mkdiri <ディレクトリ名> "<説明文>"

説明文を付けたディレクトリを作成することができます.
image (8).png

説明文を装飾する

  • \n:改行
  • ;r;:赤色にする
  • ;g;:緑色にする
  • ;b;:青色にする
  • ;w;:白色にする
  • ;p;:紫にする
  • ;_;:アンダーラインをつける
  • ;e;:装飾を解除する mkdiri_decoration.png

mkdiri -a <ディレクトリ名> "<説明文>"

すでに存在するディレクトリに説明文を上書きすることができます.
image (10).png

どうなってんの?

ディレクトリ内に隠しファイルとして.description.lsiというファイルが自動で作成されます.このファイルの中身を直接編集することも可能です.

最後に

筆者はvimが大好きなので,Vi-Improvedにあやかってls-Improvedと命名しました.ふざけた名前にしてしまい申し訳ありません.?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

exp_01/ exp_02/ exp_03/ .../ 「このディレクトリ何やったっけ?」を解消したい!

こんな経験ありませんか?

  • なんのディレクトリか忘れた
  • 先輩からもらったデータのディレクトリ構造がなんのこっちゃわからん
  • 初めて使うフレームワークのディレクトリ構造のメモがほしい

つくったもの

ls-Improvedというコマンド上で動作するlsっぽいコマンドです.

どんなもの?

image.png

このように,ディレクトリの説明をつけることができます.また,少しだけlsをリッチに表示してくれます.
例はrails tutorialの説明をrailsのデフォルトディレクトリの説明文としてつけました.

こんなことができます!

  • 機械学習等の実験パラメータの整理
    lsi_using.png

  • チーム開発におけるディレクトリ構造のコメント
    image (3).png

  • ターミナル映え
    image (11).png

導入方法

ローカルにpython環境が必要です.一応2系でも3系でも動きます!
pip install ls-Improved
python環境さえあれば,コマンドプロンプト等でも動きます.

基本的な使い方

lsi

ファイルと説明文が付いたディレクトリを表示してくれます.説明文が無い時はDirがデフォルトで付きます.
image (12).png

lsi -F

ファイルのみの表示をしてくれます.
image (4).png

lsi -D

説明文が付いたディレクトリを表示してくれます.
image (5).png

lsi -a

隠しファイルも含めて全て表示してくれます.
image (6).png

lsi -s "<検索ワード>"

このようにして,説明文から該当するディレクトリを検索することが可能です.
image (7).png

ディレクトリと説明文の両方で検索することができます.
image (9).png

mkdiri <ディレクトリ名> "<説明文>"

説明文を付けたディレクトリを作成することができます.
image (8).png

説明文を装飾する

  • \n:改行
  • ;r;:赤色にする
  • ;g;:緑色にする
  • ;b;:青色にする
  • ;w;:白色にする
  • ;p;:紫にする
  • ;_;:アンダーラインをつける
  • ;e;:装飾を解除する mkdiri_decoration.png

mkdiri -a <ディレクトリ名> "<説明文>"

すでに存在するディレクトリに説明文を上書きすることができます.
image (10).png

どうなってんの?

ディレクトリ内に隠しファイルとして.description.lsiというファイルが自動で作成されます.このファイルの中身を直接編集することも可能です.

最後に

筆者はvimが大好きなので,Vi-Improvedにあやかってls-Improvedと命名しました.ふざけた名前にしてしまい申し訳ありません.?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️?‍♂️

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

グラフで理解する独立成分分析

はじめに

 混ざった信号を分離する方法として、独立成分分析があります。
例えば、音源が3つあったとして、それらの混ざり具合の異なる信号が3つあれば分離できる場合があります。
ブラインド音源分離と呼ばれる分野です。
 ブラインド音源分離をやって見る前に、数学的な理論は置いておいて、独立成分分析の手順をグラフを書きながら学びたいと思います。

この投稿の大まかな流れは次の通りです。

  1. 主成分分析 無相関化の話
  2. 中心極限定理 どんな分布も足すとガウス分布になる話
  3. 独立成分分析 非ガウス性を最大化するように変換する話

2020_0722_ICA_rotation.gif

参考

独立成分解析の信号処理への応用

データ化学工学研究室 独立成分分析 (Independent Component Analysis, ICA) ~PCAの無相関より強力な ”独立” な成分を抽出~

使用するモジュールと関数

import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.decomposition import FastICA

円を描くときに使います。

def circle(n,r=1):
    rad = np.linspace(0.0,2.0*np.pi,n)
    return np.asarray([np.cos(rad),np.sin(rad)]).T

データを回転させるときに使います。

def rotation_matrix(rad):
    return np.array([[np.cos(rad),-np.sin(rad)],[np.sin(rad),np.cos(rad)]])

独立成分分析

 無相関、独立という言葉はあまり日常では馴染みがありません。
数学で言う無相関とは、何の関係もないという意味ではなく、変数間に線形な関係がないことを表します。
非線形な関係があっても線形な関係がなければ無相関です。
独立は、無相関より厳しい条件です。非線形な関係もない状態を指します。
独立であるなら無相関となります。

独立な成分を抜き出したいのですが、独立であるなら無相関なので、まずは無相関化を行います。

無相関化(主成分分析)

 細かい導出はしませんが、データが多変量正規分布に従うとして、その分散共分散行列を対角化することを目指すか、基底の貼る空間に射影したときのデータの分散が最大になるような直交基底変換行列を探すことを目指します。
 結果的にはどちらも同じになり、分散共分散行列の固有値・固有ベクトルを求めることになります。

主成分分析の手順は以下のとおりです。

  1. データから平均を引いて平均を0にする
  2. 分散共分散行列を計算する
  3. 分散共分散行列を固有値分解する

実際に主成分分析を行うと何がどうなるのかグラフで見てみます。
使用するデータは多変量ガウス分布です。

np.random.seed(0)
mean = np.array([0.0, 0.0])
cov = np.array([[1.0, 0.6], [0.6, 1.0]])
data = np.random.multivariate_normal(mean, cov, size=100)

主成分分析を行います。

data = data - np.mean(data,axis=0) #各次元の平均を0にする
cov = np.cov(data, rowvar=False) #分散共分散行列
u,w,_ = np.linalg.svd(cov) #固有値分解 uが固有ベクトル wが固有値 基底変換後のcovの対角になる

#分散共分散行列の楕円を描く 確率長円と違って大きさは適当
c = circle(200,r=1.0)
c = c*np.sqrt(w) #楕円にする
c = c.dot(u) #回転させる

fig,axes = plt.subplots(figsize=(6,8))
axes.scatter(data.T[0],data.T[1])
axes.quiver(0,0,u[0,0],u[0,1],color = "red",angles = "xy", scale_units = "xy",scale = 0.5)
axes.quiver(0,0,u[1,0],u[1,1],color = "red",angles = "xy", scale_units = "xy",scale = 0.5)
axes.plot(c.T[0]*3,c.T[1]*3)
axes.set_aspect("equal")
axes.set_ylim(-4.0,4.0)
axes.set_xlim(-4.0,4.0)

円は確率長円もどきです。大きさは適当です。

多変量正規分布.png

元のデータに分散共分散行列の固有ベクトルをかけて、基底変換します。

pca_data = data.dot(u) #回転
pca_ax = u.dot(u) #赤い矢印の回転
c = c.dot(u) #楕円の回転

グラフの描画のコードは上とほとんど同じなので省略します。

PCA_kaiten.png

軸が回転した様子がわかると思います。

続いて分散共分散行列が単位行列になるように正規化します。
基底変換した後のデータを固有値の平方根で割ります。

norm_pca_data = pca_data/np.sqrt(w) #データの正規化
norm_c = c / np.sqrt(w) #円の正規化

PCA正規化.png

確認のため、このデータの分散共分散行列を計算してみます。

np.cov(norm_pca_data,rowvar=False)

array([[ 1.00000000e+00, -2.14306523e-17],
       [-2.14306523e-17,  1.00000000e+00]])

データを回転させても、分散共分散行列は単位行列のままです。

np.cov(norm_pca_data.dot(rotation_matrix(np.pi/2)),rowvar=False)

array([[1.00000000e+00, 3.23453187e-18],
       [3.23453187e-18, 1.00000000e+00]])

独立成分分析で残ったやることは、この回転だけです。

中心極限定理

どんな分布も足し合わせると正規分布に近づいていきます。
これを中心極限定理と言います。
試しに一様分布を足し合わせてみます。

np.random.seed(0)
uni_dist = np.random.random(10000)
uni_dist_5 = np.random.random(10000)+np.random.random(10000)+np.random.random(10000)+np.random.random(10000)+np.random.random(10000)

fig,axes = plt.subplots()
axes.hist(uni_dist,bins=30)

fig,axes = plt.subplots()
axes.hist(uni_dist_5,bins=30)

一様分布.png

中心極限定理.png

一様分布も足し合わせるとガウス分布に近づきました。

 独立成分分析では、分離後にガウス分布から離れるように変換を行います。
これは、混ざる前の分布が非ガウス分布であることを仮定しているためです。
 どれくらいガウス分布から離れたかの指標に、ここでは高次統計量の尖度を使います。
尖度は正規分布であれば定数ですが、尖った分布になると値が大きくなります。

 先程は多変量ガウス分布をテストデータに使いましたが、実は主成分分析した時点で独立になってしまうので次は違うデータでやります。

非ガウス性の最大化

ガウス分布に従わないデータを作ります。

x = 3*np.random.random(5000)
x -= x.mean()
y = 0.2*np.random.random(5000)
y -= y.mean()
data = np.array([x,y]).T
data = np.vstack([data,data.dot(rotation_matrix(np.pi/4))])

ICA_originData.png

まずは主成分分析して無相関化します。

data = data - np.mean(data,axis=0) #各次元の平均を0にする
cov = np.cov(data, rowvar=False) #分散共分散行列
u,w,_ = np.linalg.svd(cov) #固有値分解

pca_data = data.dot(u) #無相関化

ICA_PCAしたあと.png

標準偏差が1になるように正規化します。

norm_pca_data = pca_data/np.sqrt(w)

ICA_PCAして正規化.png

回転させて最もガウス分布から離れる角度を調べます。
通常は勾配法などを使うそうですが、ここでは総当りです。
アニメーションで見てみます。グラフ描画のコードが煩雑になってしまいました。

rad_array = np.linspace(0.0,np.pi/2.0,100)

kurt_array = []
for rad in rad_array:
    rot_data = norm_pca_data.dot(rotation_matrix(rad))
    kurt = np.sum(stats.kurtosis(rot_data))
    kurt_array.append(kurt)

for i,rad in enumerate(rad_array):
    rot_data = norm_pca_data.dot(rotation_matrix(rad))
    kurt = np.sum(stats.kurtosis(rot_data))

    fig = plt.figure(figsize=(10,10))
    ax1,ax2,ax3,ax4 = fig.add_subplot(2,2,1),fig.add_subplot(2,2,2),fig.add_subplot(2,2,3),fig.add_subplot(2,2,4)

    ax1.plot(rad_array,kurt_array)
    ax1.scatter(rad,kurt,color="red")
    ax1.set_xlabel("Rotation angle rad")
    ax1.set_xlim(-0.1,1.6)
    ax1.set_ylim(-2.5,1.3)

    ax2.scatter(rot_data.T[0],rot_data.T[1],alpha=0.03) #元データ回転
    ax2.plot(c.T[0]*3,c.T[1]*3)
    ax2.set_aspect("equal")
    ax2.set_ylim(-4.0,4.0)
    ax2.set_xlim(-4.0,4.0)
    ax2.set_xlabel("s1")
    ax2.set_ylabel("s2")

    ax3.hist(rot_data.T[0],bins=30,label="s1")
    ax4.hist(rot_data.T[1],bins=30,label="s2")

    ax3.legend(loc="upper right")
    ax4.legend(loc="upper right")

    ax1.set_ylabel("kurtosis")
    fig.savefig("{}.png".format(i),dpi=100)
    plt.close()

2020_0722_ICA_rotation.gif

尖度が最大になったときのデータはこれです。
ICA.png

合っているのか確認のためsklearnのFastICAでも独立成分分析を行ってみます。

decomposer = FastICA(n_components = 2)
decomposer.fit(data)
S = decomposer.transform(data)

fig,axes = plt.subplots(figsize=(6,6))
axes.scatter(S.T[0],S.T[1],alpha=0.1)
axes.set_aspect("equal")

sklearn_ICA.png

絶対値は同じになりませんが、見た目は同じです。問題なさそうです。

まとめ

独立成分分析は、無相関化、正規化、非ガウス性の最大化の順に行えば良いことがわかりました。
次は音声信号で試してみたいと思います。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

テストデータ用の自然な名字と名前を同姓同名無しで作るPythonのスクリプト

美しいテストデータを作るために

私の会社では人事向けのシステムを販売しているのですが
テストデータがいかにもテストデータのようで美しくありません
例えば人物名が「AAA」とか「テスト太郎」といった具合です。

普通の名前って意外と難しい

私の場合全国の都道府県でもっともありふれている苗字と明治安田生命が発表した新生児につけたお名前のランキングをランダムで出力するといったところにいったん落ち着きました。

ソースコード

# -*- coding:utf8 -*-
import numpy as np
import random
arr1 = ["佐藤","鈴木","田中","高橋","田中","伊藤","渡辺","山本","中村","小林"," 加藤","吉田","山田","佐々木","山口","松本","根岸","小池","関口","飯山","飯田"]
arr2 = ["健斗","誠","怜","琉生","花","蓮","陽翔","樹","悠真","咲良","悠斗","大翔","凛","陽葵","さくら","美月","結愛","大和","颯真","芽依"]

def cartesian(arrays, out=None):
    """
    Generate a cartesian product of input arrays.

    Parameters
    ----------
    arrays : list of array-like
        1-D arrays to form the cartesian product of.
    out : ndarray
        Array to place the cartesian product in.

    Returns
    -------
    out : ndarray
        2-D array of shape (M, len(arrays)) containing cartesian products
        formed of input arrays.

    Examples
    --------
    >>> cartesian(([1, 2, 3], [4, 5], [6, 7]))
    array([[1, 4, 6],
           [1, 4, 7],
           [1, 5, 6],
           [1, 5, 7],
           [2, 4, 6],
           [2, 4, 7],
           [2, 5, 6],
           [2, 5, 7],
           [3, 4, 6],
           [3, 4, 7],
           [3, 5, 6],
           [3, 5, 7]])

    """

    arrays = [np.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = np.prod([x.size for x in arrays])
    if out is None:
        out = np.zeros([n, len(arrays)], dtype=dtype)

    m = n / arrays[0].size
    out[:,0] = np.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian(arrays[1:], out=out[0:m, 1:])
        for j in xrange(1, arrays[0].size):
            out[j*m:(j+1)*m, 1:] = out[0:m, 1:]
    return out

if __name__ == '__main__':
    res = cartesian((arr1,arr2))
    result = list()
    for row in res:
        fullname = ' '.join(row)
        result.append(fullname)

    random.shuffle(result)
    for ln in result:
        print ln



感想

男性らしい名前や女性らしい名前って敬遠されてるんですね。。

改訂履歴

  • 2020/7/22 新規作成
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

テストデータ用の自然な姓名を同姓同名無しで作るPythonのスクリプト

美しいテストデータを作るために

私の会社では人事向けのシステムを販売しているのですが
テストデータがいかにもテストデータのようで美しくありません
例えば人物名が「AAA」とか「テスト太郎」といった具合です。

普通の名前って意外と難しい

私の場合全国の都道府県でもっともありふれている苗字と明治安田生命が発表した新生児につけたお名前のランキングをランダムで出力するといったところにいったん落ち着きました。

ソースコード

# -*- coding:utf8 -*-
import numpy as np
import random
arr1 = ["佐藤","鈴木","田中","高橋","田中","伊藤","渡辺","山本","中村","小林"," 加藤","吉田","山田","佐々木","山口","松本","根岸","小池","関口","飯山","飯田"]
arr2 = ["健斗","誠","怜","琉生","花","蓮","陽翔","樹","悠真","咲良","悠斗","大翔","凛","陽葵","さくら","美月","結愛","大和","颯真","芽依"]

def cartesian(arrays, out=None):
    """
    Generate a cartesian product of input arrays.

    Parameters
    ----------
    arrays : list of array-like
        1-D arrays to form the cartesian product of.
    out : ndarray
        Array to place the cartesian product in.

    Returns
    -------
    out : ndarray
        2-D array of shape (M, len(arrays)) containing cartesian products
        formed of input arrays.

    Examples
    --------
    >>> cartesian(([1, 2, 3], [4, 5], [6, 7]))
    array([[1, 4, 6],
           [1, 4, 7],
           [1, 5, 6],
           [1, 5, 7],
           [2, 4, 6],
           [2, 4, 7],
           [2, 5, 6],
           [2, 5, 7],
           [3, 4, 6],
           [3, 4, 7],
           [3, 5, 6],
           [3, 5, 7]])

    """

    arrays = [np.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = np.prod([x.size for x in arrays])
    if out is None:
        out = np.zeros([n, len(arrays)], dtype=dtype)

    m = n / arrays[0].size
    out[:,0] = np.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian(arrays[1:], out=out[0:m, 1:])
        for j in xrange(1, arrays[0].size):
            out[j*m:(j+1)*m, 1:] = out[0:m, 1:]
    return out

if __name__ == '__main__':
    res = cartesian((arr1,arr2))
    result = list()
    for row in res:
        fullname = ' '.join(row)
        result.append(fullname)

    random.shuffle(result)
    for ln in result:
        print ln



感想

男性らしい名前や女性らしい名前って敬遠されてるんですね。。

改訂履歴

  • 2020/7/22 新規作成
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【pytest】をFlask+docker開発で初めて使ってみた

Flask + docker-composeでのWebアプリ開発に初めてpytestを使用しました。
その際のテストファイルと、設定内容をザックリまとめました。

親の記事はコチラとなります。
■Flask+Docker+Vue.js+AWS...でゲームWebAppを作ってみた。

■Githubにソースコード公開しています

テスト書くとこんなに良い事ある

今まで、テストプログラムを避けていた人生でした:scream_cat:
偶然ですが、テストを必要としないプロジェクトばかりで使う機会が無かったといえば言い訳ですが、心のなかで「あー、本当はテストとか書いといた方が良いんだろうなー」と思いつつも、重い腰が持ち上がらず。。。
そんな方は、私だけでなく多いと思います!

心機一転でテストを書こうと決心したのは、こんなメリットがあります。
テストコードを書くメリット

  • コード変更した際に、挙動確認が簡単&確実になる。
  • テストを通すことで、チーム内のエンジニア同士で、問題なく動くことのエビデンスとなる。
  • エラーがいつからどこで、おかしくなったのか、把握することが容易になる。
  • チーム内のエンジニア同士の仕様の齟齬を無くすことができる。

などメリットは多くありますが、
「そもそもテストを書いて、開発しながら同時にチェックすれば、デバッグ作業の延長でプチプチ不具合を潰しながら早く確実に開発できるのでは!!」
と思ったのが一番の理由です。(人はそれをテストドリブン開発と呼ぶ)

なんでpytest

  • 読みやすくて、書きやすい
  • プラグインが豊富

この2つが主なpytestを選んだ理由です。
特に読みやすさ、書きやすさの点にですが、誤解を恐れず言い切ってしまえば、
pytestはassert文があっているか、どうかを判定するパッケージ。
判定箇所はassertしか使いません。

assert 1 + 2 == 3

assert文がTrueであれば、PASSED(正常)、FalseであればFAILED(テスト失敗)となります。
シンプルで分かりやすい。

ディレクトリ構成

Flask+dockerのプロジェクトで使ったテストコードのディレクトリ構成を紹介します。
主要なファイル、ディレクトリ以外は割愛しています。

# 概略

.
├── introdon
│  ├── __init__.py
│  ├── models
│  ├── templates
│  └── views
├── pytest.ini
├── server.py
└── tests
  ├── __init__.py
  ├── conftest.py
  ├── func
  │  ├── __init__.py
  │  ├── test_games.py
  │  └── test_users.py
  ├── sql
  │  ├── introdon_games.sql
  │  ├── introdon_logs.sql
  │  ├── introdon_songs.sql
  │  └── introdon_users.sql
  └── unit
    ├── __init__.py
    └── test_units.py

コンテンツルートから、テストコマンドを実行するつもりで構成しています。

introdonディレクトリservr.pyはFlaskのWebアプリファイルやディレクトリです。
テストを受ける対象になるファイルとなります。

pytest.iniと、testsディレクトリがpytestの関連ファイルです。

pytest.ini

pytestのデフォルトの振る舞いを変更できるようにするメインの構成ファイルです。
初期設定のようなもので、テスト実行するにあたり、ユーザーが細かく設定することができます。
コマンドに毎回書かなくても、テスト範囲を限定したり、オプションを追加するなどできるので便利です。

以下を設定しました。

[pytest]
addopts = --emoji -rsxX -l --strict
norecursedirs = .* *.egg sql
testpaths = tests
markers =
    use_mock: Used mock, Not SQL

addopts

コマンドオプションを追加しています。

  • --emoji: テスト結果に絵文字を追加
  • -rsxX: skipped,xfailed,xpassedになった全てのテストで理由が表示されるようになる。
  • --strict: markの厳密さ、 markersで定義していないものを使うとエラーにする

norecursedirs

テストファイルを検索しないディレクトリ名を指定しています。

testpaths

テストファイルを検索するディレクトリを限定する

pytestが、実行するテストを検索することをテストディスカバリと言います。
testpaths, norecursedirsの指定により、テストディスカバリをかなり限定することで検索時間や負荷を削減しています。

テストディスカバリは、以下の条件で自動でテストコードを検索します。

  • ファイル名がtest_*.py, *_test.py
  • テストメソッド、関数の名前が、test_*という形式
  • テストクラスの名前が、Test*という形式

つまり、設定内容と合わせると、
1. testsディレクトリの中のみ検索し、
2. .* *.egg sqlファイル、ディレクトリを無視し、
3. test_, _testがついたファイルを検索します。

テスト関連のファイルはまとめてtestsディレクトリに集約させています。

markers

使用するmark名の定義と、説明を記述します。

markerとは、その名の通り、テスト実行時にマーキングすることです。
任意の名前を指定してそのマークがついたテストのみを実行する為に使います。

今回は、
CircleCI実行用に、モックのみ使用しているテストにuse_mockをマーキングしています。

それでは、testsディレクトリにある実際のテストファイルについて触れていきます:arrow_down:

__init__.py

ディレクトリにテストファイルを分けるなら、条件反射で作成しましょう。
牛丼についてくる紅生姜のように、ジャイアンについてくるスネ夫のようにです。

pythonはパッケージ管理の際、パッケージディレクトリ内のinit.pyをまず見て、そのファイルに初期設定があれば読み込み、その後パッケージ内の処理を実行します。つまり、ディレクトリ内の初期設定ファイルとなります。
init.pyが何も書いてない場合は、なんの意味もない。。。ってわけではなく、存在するだけで名前解決してくれます。

異なるディレクトリに分けられた名前が同じメソッドを、同一プログラムで呼び出しても、init.pyがあれば、ディレクトリ名(パッケージ)により名前解決されて使用することができます、コイツは助かります!

conftest.py

テストにおける共通内容を書くファイルとなります、便利で大事なファイルです。
影響範囲は保存ディレクトリと、そのサブディレクトリ全てです。

改めて、後ほどfixtureで触れます。

fixtureとは

端的に、
テストをするための、準備+後片付けです。

少し補足すると、

  • setup
    テストの準備:テスト用のデータの追加や、前提条件を調整したりすること
  • teardown
    必要に応じてテストの後処理を行う

この2つを合わせてfixtureと言います。

今回のFlask+dockerのWebアプリでは、テストで使用するデータをfixtureで用意したいので、

  1. テスト用のDBコンテナを立ち上げる
  2. DBの初期化
  3. テスト用のデータを取り込む
  4. テストを実行する
  5. 後片付けとして、取り込んだテスト用データを破棄する

このテスト前後の工程をfixtureにやらせます。

どこにfixtureを書くか

conftest.pyに書く

テストファイルで共通して使いたい場合はconftest.pyに書きます。
簡単にfixtureとして取り込むことができるので、おすすめです。

必ず全テストに共通しなくてもかまいません。
テストが10個あるけど、その内2つのテストファイルだけに適用したい場合も、conftest.pyに書いておいて取り込むかどうかを後で制御するほうが良いです。後日、追加のテストファイルにも適用したいとか、一括でfixtureを変更したいなどよくあることだと思います。

ファイルをまたいでfixtureを使う可能性があるならconftest.pyに書いておきます。

テストコードに直接

そのテストファイル内だけでしか使わないのであれば、コードの見通しや編集しやすさから、テストファイル内に直接書くのもありです。
ただし、別ファイルで使う可能性が少しでも発生したら、面倒くさがらずconftest.pyに引っ越しさせましょう。後々その方が便利で楽です。

conftest.py

テストファイルで共通するfixtureを書きます。
ローカルのプラグインとも言えますね。
適用範囲はそのディレクトリと全てのサブディレクトリで利用可能になります。

コードは以下となります。

tests/conftest.py
import os
import pytest

import introdon
from introdon.scripts.db import InitDB


@pytest.fixture(scope='session')
def client():
    # 環境変数がTESTでないなら、エラー起こして止める
    if os.environ['FLASK_ENV'] != 'TEST':
        raise

    # form.validate_on_submit()解除に必要
    introdon.app.config['WTF_CSRF_ENABLED'] = False
    introdon.app.config['TESTING'] = True

    # データ全消去
    introdon.db.drop_all()
    # Tableの初期化
    InitDB().run()

    # テストDATAのインサート
    for line in open('tests/sql/introdon_users.sql'):
        introdon.db.session.execute(line)

    for line in open('tests/sql/introdon_songs.sql'):
        introdon.db.session.execute(line)

    for line in open('tests/sql/introdon_games.sql'):
        introdon.db.session.execute(line)

    for line in open('tests/sql/introdon_logs.sql'):
        introdon.db.session.execute(line)

    with introdon.app.test_client() as client:
        with introdon.app.app_context():
            yield client

    introdon.db.session.close()

ポイント解説です。

@pytest.fixture(scope='session')
fixtureの宣言はpytest.fixture()で宣言します。
scope='session'はfixtureの有効範囲を設定します。

  • session; テストコマンドを実行してから終わるまで
  • function: デフォルト、何も書かなければコレ。テスト関数ごとに毎回実行される。
  • class: テストクラスごと1回実行される。
if os.environ['FLASK_ENV'] != 'TEST':
    raise

Dockerでflaskのアプリコンテナを起動しているのですが、環境変数FLASK_ENV=TESTの時のみpytestを実行させるようにしています。本番、開発モードで間違ってtestが実行されるのを防ぐためです。

introdon.app.config['WTF_CSRF_ENABLED'] = False
flaskアプリでのフォーム入力で使用しているパッケージflask-wtfのCSRF対策機能をオフにするためにFalseに設定します。

# データ全消去
introdon.db.drop_all()
# Tableの初期化
InitDB().run()

余計なデータがあると、テスト結果が毎回変わってしまう危険性があるので、
テスト用のDBデータを全消去、初期化してキレイなDBテーブルを用意しています。

なお、docker-composeでコンテナを立ち上げる際に、環境変数FLASK_ENV=TESTにすることで、自動で以下の内容に切り替わるようにしています。
1. テストモードでコンテナを立ち上げる
2. テスト用にFlaskのモードを変更
3. テスト用のDBコンテナに接続して使用する。
4. コンテナが立ち上がったら、テストコマンドが自動で実行され、結果とともにcoverageも表示される

# テストDATAのインサート
for line in open('tests/sql/introdon_users.sql'):
  introdon.db.session.execute(line)

初期化されたテストDBにテストデータをインサートします。
使用するデータは予め用意したsqlで、内容は以下となります。

INSERT INTO introdon_test.logs (id, user_id, game_id, question_num, judge, score, correct_song_id, select_song_id, created_at) VALUES (1, 1, 1, 1, 1, 10, 2, 2, '2020-07-01 08:19:46');

1行ごとにSQL文の羅列なので、for文で行ごとに取り出し、
SQLAlchemyのsession.executeで直接SQL命令でデータをインサートしています。

with introdon.app.test_client() as client:
  with introdon.app.app_context():
    yield client

introdon.db.session.close()

yieldまでが、setupとなり、テスト実行の準備内容となります。
yieldclientインスタンスをテストコードに投げてテストの制御をテストコードに渡します。
このclientを引数で使用したテストコードが、このfixtureを使用するテストコードとなります。
テストコードが終了したら、制御が戻ってきてyield以降の、teardown箇所を実行します。

機能テスト(FuncTest)

ファンクショナルテスト、機能テスト。1つの機能をチェックするテストです。
「記事を投稿する、ユーザーを作成するなど」、一連のひとつの機能がちゃんと動くかのテストを指します。

tests/funcディレクトリにまとめています。
機能確認なので、テストDBから取得したデータを使ってテストしています。

以下は、ポイントを抜粋したコードです。

test_users

tests/func/test_users.py
#抜粋しています

import pytest

class TestUser():

    def login(self, client, username, password):
        return client.post('/', data=dict(
            username=username,
            password=password
        ), follow_redirects=True)

    def create_user(self, client, username, password):
        return client.post('/user/new', data=dict(
            username=username,
            password=password
        ), follow_redirects=True)

    invalided_users = (
        ('han', 'solo', '4文字以上10文字以内でお願いします'),
        ('DarthTyranus', 'DarthTyranus', '4文字以上10文字以内でお願いします'),
        ('Qui-Gon', 'Jinn', '半角英数のみでお願いします'),
        ('ダースモール', 'ダースモール', '半角英数のみでお願いします')
    )

    task_ids = ['{}-{}'.format(u[0], u[1]) for u in invalided_users]

    @pytest.mark.parametrize(['name', 'pw', 'comment'], invalided_users, ids=task_ids)
    def test_fail_login2(self, client, name, pw, comment):
        rv = self.login(client, name, pw)
        assert comment.encode() in rv.data

以下、ポイント解説です。

関数: login
条件を変えて頻繁にテストをする内容は、先に関数でまとめてしまってから組み込んだ方が見通しがスッキリします。
clientを引数にすることで、conftestで宣言したfixtureのyieldから制御が渡されます。
clientはflaskのテスト用インスタンスなので、clientを通してflaskアプリの挙動を確認できます。
client.postでそのページにアクセスしたhtmlページ内容を取得します。

タプル: invalided_users
テストで使用するパラメーターをタプルで事前に準備しています。
タプルで切り分けることで、複数のテストで使用できるので便利なのと、コード全体の見通しが良くなります。

task_ids = ['{}-{}'.format(u[0], u[1]) for u in invalided_users]
パラメータを使ってテストをした場合、パラメータの内容を用いてテスト結果の名前が自動で生成されます。
それはそれでいいのですが、日本語が文字化けしたり、すごく長くなったりと、分りづらい場合は、ユーザー指定することをおすすめします。

task_idsで、テスト結果名のフォーマットを定義しています。

@pytest.mark.parametrize(['name', 'pw', 'comment'], invalided_users, ids=task_ids)
    def test_fail_login2(self, client, name, pw, comment):
        rv = self.login(client, name, pw)
        assert comment.encode() in rv.data

この箇所がテスト関数となります。

@pytest.mark.parametarizeデコレーターでパラメータの取り込みを設定。
第1引数で、パラメータを順番にローカル変数名をつけています。
第2引数で、パラメータの内容
第3引数のids=task_idsでtask_idsのフォーマットをid,つまりテスト結果名のフォーマットに指定しています。

次のdef test_fail_login2がテスト関数の本体です。
test_の接頭辞でpytestがテストコードであると認識して実行します。
第2引数のclientでconftestのfixtureが使用できるようにしています。

rv.dataはflaskのテストからレスポンスされた内容で、上記の場合は、ログインの失敗ページになります。

最後で、テスト判定です。
assert comment.encode() in rv.data
それぞれのパラメータでcomment引数にした3番目のテキストをエンコードします。
エンコードした文字内容が、ログインの失敗ページに中に含まれるか否かを判定しています。

test_games

もうひとつの機能テストファイルも抜粋、ポイント解説します。

tests/func/test_games.py
#抜粋しています

import pytest

class TestGame():

  @pytest.fixture()
  def login_1111(self, client):
    return client.post('/', data=dict(
      username=1111,
      password=1111
    ), follow_redirects=True)

  #省略

  # みんなであそぶ:未指定
  @pytest.mark.usefixtures('login_1111')
  def test_start_game_multi(self, client):
    with client.session_transaction() as sess:
        sess['creatable'] = True

    rv = client.post('/game/start_multi', data=dict(
        artist='',
        genre='',
        release_from='1900',
        release_end='2100',
    ), follow_redirects=True)

    assert 'メンバー受付中・・'.encode() in rv.data

テストファイル内で、fixtureを設定しています。
  @pytest.mark.usefixtures('フィクスチャー名')
上記のデコレータを持つことで、テスト関数にfixtureを設定することができます。

テスト関数test_start_game_multiの処理は、

  1. fixtureのlogin_1111によってユーザーログイン
  2. セッション内容のsess['creatable']=Trueに設定
  3. 指定したページに任意のデータをポスト
  4. レスポンス内容をrvで受ける
  5. assertで任意のテキストが、レスポンス内容に含まれるか判定する。

上記のように、fixtureはひとつだけではなく複数設定することが可能です。

ユニットテスト(UnitTest)

関数やクラスなど、コードのごく一部をチェックするテストです。

一部のユニットテストを、ローカル環境のテスト実行だけでなく、CircleCIでテスト実行します。
そのため、テスト用のDBコンテナ利用だけではなく、モック使ってダミーデータを使ってテストします。

以下は、モック使用を抜粋したコードです。

test_units

tests/unit/test_units.py
#抜粋
import pytest

from introdon.models.songs import Song

@pytest.mark.use_mock
def test_add_song(self, mocker):
  # ダミーのレスポンス作成
  responseMock = mocker.Mock()
  responseMock.status_code = 404
  # requests.getの戻り値をpatch
  mock_res = mocker.patch('requests.get')
  mock_res.return_value = responseMock

  term = 'Shmi'
  attribute = ' Skywalker'
  limit = 9
  validate, status_code = Song.add_song(term, attribute, limit)

  assert validate == False
  assert status_code == 404

@pytest.mark.use_mock
マーカーuse_mockを利用しています。

テスト関数は以下です。
def test_add_song(self, mocker):
mockを利用してます。

mockとは仮の作り物の意味で、
pytestで端的に言えば、仮のデータを用意することです。

SQLやAPIでのデータの代わりに用意することが多いです。
pytest-mockパッケージの使用が便利なのでおすすめします。

mockを使ったテスト関数の流れです。

  1. mocker.Mock(): mockインスタンスを宣言(mockデータを入れるインスタンスを用意)
  2. mockインスタンスのアトリビュートにmock内容を入れる(ダミーデータを用意)
  3. mocker.path(requests.get) mockでパッチする関数を指定する(ダミーと差し替える関数を指定)
  4. mock_res.return_value = responseMock 置き換えを指定した関数の返り値をmockインスタンスにする(データを入れ替える)

上記の工程で、mockの準備ができました。
Song.add_song()メソッド内にあるrequests.get()関数の返り値が、mockのデータに差し替えられテストが実行されます。

最初は分かりにくいですが、慣れてしまえば、テストコード書くのがすごく便利になります。

ちなみに、テストのデータとしてmockを使用するためDBは使いません。
conftestのfixtureは不要のため、clientを引数に持つ必要ないです。

pytestの実行コマンド

目的や、環境に応じてpytestコマンドは複数使い分けています。

CIでクラウド環境、ローカルでTDDしながらテスト実行

use-mockがマーカーされているテストコードのみ実行します。

pytest -m use-mock

CircleCI上で実行しています。
マーカーでmock使用のみ切り分けてのユニットテストは、ローカル環境でTDDしながら開発していくのに使い勝手よさそうです。

全体の一括テスト

flask_env.sh
pytest --cov=introdon

コンテンツルートで実行します。

docker-composeで環境変数FLASK_ENV=TESTなら、
コンテナ起動後に、shellファイルで自動で一括テスト実行をしています。

--cov=introdon
coverageの有効範囲を指定しています。
coverageとは、テスト実行がコードの何パーセントを通っているか(確認しているか)の指標です。
pytestでcoverageを使うにはpytest-covをインストールしておきます。
上記オプションは、introdonディレクトリにおいてカバレッジを表示せよの意味です。

intellij で簡単テスト実行

IntelliJだけでなく、他IDEにも似た機能があると思いますので読み替えてください。
Intelliでテスト実行を1度設定すると、その後はクリック1,2回で簡単にテスト実行することができるようになります。楽なので激推しです:point_up_tone2:

設定の方法は、別記事をご参考ください
■Flask Webアプリのサクッと作れる『docker-compose構成』をまとめてみた

テスト実行は、簡単です。実行したいテストファイルを選択したら、実行ボタンを押すだけです。
スクリーンショット 2020-07-21 22.23.55.png

テスト結果はサイドバーで一覧表示されます。
スクリーンショット 2020-07-21 22.26.35.png
失敗したテストは何が原因でどこで失敗したのか。
失敗したテストファイルのリンクが表示されクリックだけで該当箇所にアクセスできます。
失敗したテストだけで再度テスト実行もでき、指定した時間間隔で自動テストを繰り返し実行することもできます。
過去のテスト結果内容も閲覧することも可能です。

さらに、
テストコードでもデバッグ可能です。
実行ボタン横の虫(バグ)ボタンのクリックでデバッグ実行です。
通常のコードと同様、テストコード自体もデバッグモードでコーディングできます。

さらに、さらに、
IntelliJの、coverage機能が見やすくて秀逸です。
バグボタンの更に右「盾ボタン」で実行できます。
スクリーンショット 2020-07-21 22.43.30.png
リストにして各ファイルや、ディレクトリが何パーセントか一覧表示してくれて、クリックで該当のファイルを開くこともできます。

スクリーンショット 2020-07-21 22.44.29.png
コードファイルは、どの箇所がテストが走って確認されていて、どこが未確認なのか色分け表示してくれます。

わかりやすい:eye:慣れてしまったらIntelliJ抜きでcoverageする気が無くなりそうです。

おわりに

正直言いますと、プロジェクトが9割以上完成した段階でテストコードを導入しました。
そのため、全然テスト書けてない。カバー率かなり低いです:skull:

テストコードの真価が発揮されるのは、やはりTDD(テストドリブン開発)だと思います。
1. まずは仕様をテストコードに落とし込み、
2. テストコードをクリアしながら、
3. プロジェクトを進行させる。
4. 更に新たに必要となったテストコードを追加しつつ、
5. またテストコードをクリアしつつプロジェクトを進行させる。

TDDは大きなメリットがあると思います。

  • プロジェクトの仕様をテストコードで明確にする。
  • プライオリティを意識しながら開発する。
  • プロジェクトコードの質をテストで随時チェックして、品質を保証して次の実装へと進む。

テスト無しだと、いつ・どこでおかしくなったのか特定が困難になったり、仕様から外れた機能を見当外れの優先度で作ってしまう危険性があります。身に覚えあります。。。

テストコードはプロジェクトのチェック機能として初期から導入するが良いです。
最初から、ガッツリと細かくテストコードを実装するよりも、
使うタイミングに応じて少しずつ慣れていき、増やしていけばいいと思います:raised_hands_tone4:

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

競プロ精進日記23日目~27日目(7/17~7/21)

感想

土日にAtCoderのコンテストがないのでコドフォに出ました。特に対策をしたわけではないですが青コーダーになることができました。AtCoderは肩の力が入りすぎてる気がするので、リラックスして臨むようにしたいです。
大学の課題のせいで精進計画が崩れ気味でしたが、直近のものは全て倒したのでここから立て直します。

23日目

ABC143-E Travel by Car

かかった時間

30分で投了、解説AC

間違えた原因

やっぱりもう一押しが足りないです。
集中力ややる気が欠けてしまっている時もあるし、やったことのあるパターンでもうまく応用できていないこともあるし、単純に知らないパターンの時もあります。しかし、多くの場合は最初のパターンで今回もそうなので、問題をやる時くらいTwitterチラチラ覗いたりとかはやめようと思いました(当たり前)。
こんな当たり前のことができないのは人間としてカスなので改善します。

考察

まず、あとのクエリで様々な頂点間を問われるので全点間での最小の燃料補給の回数を求めることを考えます。まず、辺の重みは非負なのでダイクストラ法を考えますが、最短距離の時に燃料補給も最小回数になるとは限りません。

つまり、ある頂点においてどれだけ燃料が残っているかが必要です。したがって、(燃料の補給回数,補給後に燃料をどの程度使ったか)を組にして考えれば、この組を最短距離と定義したダイクストラ法を行うことができます。しかし、ダイクストラ法に慣れていなかったため計算量の解析ができませんでした(普通のダイクストラ法と計算量的には同じです…)。

そこでより良い方法を考えようと思ったのですが、思いつくことができませんでした。

ここで、上記で難しいのが、それぞれの頂点で補給後に燃料をどの程度使ったかが必要であることです。つまり、ある頂点において燃料が足りているのか足りていないのかという二値に置き換えると問題が見通しやすくなります。さらに考察すれば、燃料が補給される頂点はどこかに注目できると思います。したがって、ある頂点から他の頂点へと移動する時に使用する燃料が$L$以下であれば補給する必要がなく、他の場合は途中の頂点で補給する必要があります。したがって、全点間での最小の燃料消費量を求めた時に$L$以下であるような頂点間はその最小の燃料の補給回数は1回となります。

以上より、WF法を用いて全点間の最小の燃料消費量を求めた後に、$0$の場合はそのまま$0$、$L$以下のものは1、$L$より大きいものはINFと書き換えて、再度WF法を行えば最小の燃料の補給回数を求めることができます。

また、WF法の方針は制約が$N=300$であることから発想できても良いかもしれません。さらに、グラフがそのままだと扱えないので題意に合うように作り直すと考えれば思いつきやすそうです。

Pythonのコード
abc143-E.py
n,m,l=map(int,input().split())
inf=100000000000
wf=[[inf]*n for i in range(n)]
for i in range(m):
    a,b,c=map(int,input().split())
    wf[a-1][b-1]=c
    wf[b-1][a-1]=c
for i in range(n):
    wf[i][i]=0
for k in range(n):
    for i in range(n):
        for j in range(n):
            wf[i][j]=min(wf[i][j],wf[i][k]+wf[k][j])
for i in range(n):
    for j in range(n):
        if i!=j:
            wf[i][j]=1 if wf[i][j]<=l else inf
for k in range(n):
    for i in range(n):
        for j in range(n):
            wf[i][j]=min(wf[i][j],wf[i][k]+wf[k][j])
q=int(input())
for i in range(q):
    s,t=map(int,input().split())
    x=wf[s-1][t-1]
    print(-1 if x==inf else wf[s-1][t-1]-1)

24,25日目

前半二日間で長めのレポートをダラダラとやっていたので、精進があまりできませんでした。また、25日目にはCodeforces Round #657 (Div. 2)に出ていました。その復習記事はこちらです。

ABC157-E Simple String Queries

かかった時間

20分で投了

間違えた原因

使ったこともないセグ木を使おうとしました。
結果的にセグ木の別解があったのでそちらも今度実装してみようと思います。
また、セグ木に囚われすぎなければ解けたと思います。

考察

まず、それぞれのクエリで区間を聞かれるので、区間の両端を累積和により先に求めることを考えますが、setを考えるので扱いにくそうです。次にとるべき方針はBITやセグ木などの区間を高速で計算するデータ構造を用いることであり今回はその方針でも行うことができます。

しかし、もしこのようなデータ構造を持っていない場合は別の方法を考える必要があります。一つのよく使う方法として、インデックスベースで要素を持つのではなく要素ベースでインデックスを持つことを考えます。今回は要素が26種類(アルファベット)しかないので、これが可能です。

つまり、それぞれのアルファベットについてそのアルファベットになるインデックスの集合を昇順で保存しておき、クエリで$l_q,r_q$に何種類の文字が含まれるかと聞かれたら、それぞれのアルファベットに対応するインデックスの集合中に$l_q$以上$r_q$以下のものが一つでも存在するかを探せば良いです。また、これはlower_bound($l_q$)で探したインデックスが$r_q$以下であればよく、$\log{n}$で検索することができます。

C++のコード
abc157e.cc
//コンパイラ最適化用
#pragma GCC optimize("Ofast")
//インクルード(アルファベット順)
#include<algorithm>//sort,二分探索,など
#include<bitset>//固定長bit集合
#include<cmath>//pow,logなど
#include<complex>//複素数
#include<deque>//両端アクセスのキュー
#include<functional>//sortのgreater
#include<iomanip>//setprecision(浮動小数点の出力の誤差)
#include<iostream>//入出力
#include<iterator>//集合演算(積集合,和集合,差集合など)
#include<map>//map(辞書)
#include<numeric>//iota(整数列の生成),gcdとlcm(c++17)
#include<queue>//キュー
#include<set>//集合
#include<stack>//スタック
#include<string>//文字列
#include<unordered_map>//イテレータあるけど順序保持しないmap
#include<unordered_set>//イテレータあるけど順序保持しないset
#include<utility>//pair
#include<vector>//可変長配列

using namespace std;
typedef long long ll;

//マクロ
//forループ関係
//引数は、(ループ内変数,動く範囲)か(ループ内変数,始めの数,終わりの数)、のどちらか
//Dがついてないものはループ変数は1ずつインクリメントされ、Dがついてるものはループ変数は1ずつデクリメントされる
#define REP(i,n) for(ll i=0;i<ll(n);i++)
#define REPD(i,n) for(ll i=n-1;i>=0;i--)
#define FOR(i,a,b) for(ll i=a;i<=ll(b);i++)
#define FORD(i,a,b) for(ll i=a;i>=ll(b);i--)
//xにはvectorなどのコンテナ
#define ALL(x) x.begin(),x.end() //sortなどの引数を省略したい
#define SIZE(x) ll(x.size()) //sizeをsize_tからllに直しておく
//定数
#define INF 1000000000000 //10^12:極めて大きい値,∞
#define MOD 1000000007 //10^9+7:合同式の法
#define MAXR 100000 //10^5:配列の最大のrange(素数列挙などで使用)
//略記
#define PB push_back //vectorヘの挿入
#define MP make_pair //pairのコンストラクタ
#define F first //pairの一つ目の要素
#define S second //pairの二つ目の要素
#define Umap unordered_map
#define Uset unordered_set



signed main(){
    ll n;cin>>n;
    char s[500000];cin>>s;
    //それぞれのアルファベットのインデックス
    vector<set<ll>> alph(26);
    vector<ll> a(n);
    REP(i,n){alph[ll(s[i]-'a')].insert(i);a[i]=ll(s[i]-'a');}
    ll q;cin>>q;
    vector<ll> ans;
    REP(_,q){
        ll x;cin>>x;
        if(x==1){
            //iを--し忘れていた
            ll i;char c;cin>>i>>c;i--;
            ll j=ll(c-'a');ll k=a[i];a[i]=j;
            //ここで永遠にバグらせた
            alph[k].erase(i);
            alph[j].insert(i);
        }else{
            ll ans=0;
            ll l,r;cin>>l>>r;l--;r--;
            REP(i,26){
                auto y=alph[i].lower_bound(l);
                if(y!=alph[i].end())if(*y<=r)ans++;
            }
            cout<<ans<<endl;
        }
    }
}

26,27日目

大学の授業のレポートを書いていました。競プロに関わることなのでこちらの記事にまとめました。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

IMAP4で受信したメールをJSONに変換するAPIサーバーをPythonで作る

概要

Pythonの標準ライブラリを使ってメールを受信し、JSON形式に変換して返すREST APIサーバーを作ったところ、シンプルで汎用性の高そうなコードが書けたので公開することにしました。

Pythonでメール受信する方法を調べると、レガシーなメソッドが使われたサンプルばかりがヒットして苦労したので、公式ドキュメントをよく読んで、とりあえず現時点(Python3.8)ではレガシーなメソッドが含まれないコードにしたつもりです。

ソースコード
https://github.com/sikkimtemi/imap_to_json

主な機能は以下の通りです。

  • IMAP4でメールを受信し、主要な項目をJSON形式で返す
  • 添付ファイルをローカルストレージに出力し、ファイル名と絶対パスを返す
  • 指定した日数より古いメールをサーバーから削除する

ソースコードの解説

モジュールのインポートと認証関連の定義

DOMAINはIMAP4サーバーのホスト名です。USER_IDPASSEORDはIMAP4のユーザーIDとパスワードです。
もしIMAPでなくPOP3を使いたい場合はimaplibの代わりにpoplibをインポートしてください。

import datetime
import email
import os
import imaplib
import shutil
from email import policy
from email.parser import BytesParser
from email.utils import parsedate_to_datetime
from flask import Flask, jsonify, request
from flask_restful import Resource, Api

DOMAIN = 'xxxxxxxxxxxxxxxxxxxxxxxxxxx'
USER_ID = 'xxxxxxxxxxxxxxxxxxxxxxxxxxx'
PASSWORD = 'xxxxxxxxxxxxxxxxxxxxxxxxxxx'

IMAP4サーバーへの接続とメールの検索

IMAP4サーバーに接続し、認証してメールの検索を行うまでの処理は以下のようになります。IMAP4ではメールを取得する前にメールボックスの選択と検索が必要です。メールボックスは通常はINBOXという名前ですが、ほかの名前でメールボックスを設定している場合は明示的に指定する必要があります。検索オプションは下記のリンクが参考になります。

https://www.atmarkit.co.jp/fnetwork/rensai/netpro09/imap4-searchoption.html

検索オプションを省略した場合は、UNSEENを設定して未読メールだけを読み込むようにしています。

class FetchMail(Resource):
    '''
    メール受信用クラス。
    '''

    def get(self):
        '''
        メールを受信し、内容と添付ファイルの情報をJSON形式で返す。
        '''

        result = []

        # IMAP4の検索条件
        # 設定可能な内容は下記を参照
        # https://www.atmarkit.co.jp/fnetwork/rensai/netpro09/imap4-searchoption.html
        search_option = request.args.get('option')

        # 検索条件を指定しない場合は未読メールの検索とする
        if not search_option:
            search_option = 'UNSEEN'

        # メールサーバーに接続
        cli = imaplib.IMAP4_SSL(DOMAIN)

        try:
            # 認証
            cli.login(USER_ID, PASSWORD)

            # メールボックスを選択(標準はINBOX)
            cli.select()

            # 指定されたオプションを用いてメッセージを検索
            status, data = cli.search(None, search_option)

            # 受信エラーの場合はエラーを返して終了
            if status == 'NO':
                print('受信エラー')
                res = {
                    'status': 'ERROR'
                }
                return jsonify(res)

メールの解析

検索したメールを1件ずつ解析していく処理です。ByteParserはデフォルトでは引数がpolicy=policy.compat32になり、戻り値としてemail.message.EmailMessageではなくemail.message.Messageが返ってくるので注意が必要です。(今回一番ハマったところです)

メールヘッダーの大抵の項目はEmailMessagegetメソッドで取得することができます。日付情報はSun, 19 Jul 2020 22:00:08 +0900 (JST)のような文字列として取得することできますが、これでは扱いにくいのでparsedate_to_datetimeを用いてPythonのDatetime形式に変換しています。また、このままだとJSTとUTCが混在していて不便なので、タイムゾーンをJSTに統一する処理も入れています。

CommonUtilの各メソッドについては後述します。

出力されるJSONデータの項目は以下のようになっています。

項目 内容
msg_id メールのMessage-Id。通常は一意の値になっている。
header メールヘッダーをすべて結合したテキスト。
from メールのFrom。
to メールのTo。
cc メールのCc。
subject メールのタイトル。
date メールの作成日。
time メールの作成時刻。
format メール本文のフォーマット。「text/plain」や「text/html」が入る。
charset メール本文の文字コード。「iso-2022-jp」や「utf-8」などが入る。
body メール本文。
attachments 添付ファイルの情報が配列で格納される。

attachmentsの配列の中身は以下のような構造になっています。

項目 内容
file_path 添付ファイルのファイルパス。絶対パスで記述。
file_name 添付ファイルのファイル名。
            # メールの解析
            for num in data[0].split():
                status, data = cli.fetch(num, '(RFC822)')
                msg = BytesParser(policy=policy.default).parsebytes(data[0][1])
                msg_id = msg.get('Message-Id', failobj='')
                from_ = msg.get('From', failobj='')
                to = msg.get('To', failobj='')
                cc = msg.get('Cc', failobj='')
                subject = msg.get('Subject', failobj='')
                date_str = msg.get('Date', failobj='')
                date_time = parsedate_to_datetime(date_str)
                if date_time:
                    # タイムゾーンを日本国内向けに上書き
                    date_time = date_time.astimezone(datetime.timezone(datetime.timedelta(hours=9)))
                date = date_time.strftime('%Y/%m/%d') if date_time else ''
                time = date_time.strftime('%H:%M:%S') if date_time else ''
                header_text = CommonUtil.get_header_text(self, msg)
                body, format_, charset = CommonUtil.get_main_content(self, msg)
                attachments = CommonUtil.get_attachments(self, msg, num)
                json_data = {}
                json_data['msg_id'] = msg_id
                json_data['header'] = header_text
                json_data['from'] = from_
                json_data['to'] = to
                json_data['cc'] = cc
                json_data['subject'] = subject
                json_data['date'] = date
                json_data['time'] = time
                json_data['format'] = format_
                json_data['charset'] = charset
                json_data['body'] = body
                json_data['attachments'] = attachments
                result.append(json_data)
            res = {
                'status': 'OK',
                'result': result
            }
            return jsonify(res)

        except Exception as error:
            print(error)
            res = {
                'status': 'ERROR'
            }
            return jsonify(res)

        finally:
            cli.close()
            cli.logout()

下記のcurlコマンドを実行すると呼び出すことができます。

curl http://127.0.0.1:8080/fetchmail

検索オプションを指定する場合は下記のようにします。この例では2020年7月21日以降に届いたメッセージを取得しています。

curl "http://127.0.0.1:8080/fetchmail?option=SINCE+21-JUL-2020"

共通処理クラス ヘッダー情報の結合

メールヘッダーを文字列として返すメソッドが標準では用意されていなかったので自作しました。改行区切りで結合しているだけです。

class CommonUtil():
    '''
    共通処理クラス。
    '''

    def get_header_text(self, msg:email.message.EmailMessage):
        '''
        ヘッダー部をまとめて文字列として返す。
        '''

        text = ''
        for key, value in msg.items():
            text = text + '{}: {}\n'.format(key, value)
        return text

共通処理クラス メール本文、フォーマット、文字コード情報の取得

ほとんどのメールについては、get_bodyget_contentを組み合わせればメール本文を取得することができます。本文のフォーマット情報(text/plaintext/html)はget_content_typeで、文字コードの情報はget_content_charsetでそれぞれ取得できます。

get_bodyは賢いメソッドですが、たまに例外が発生します。例外になるパターンを調べるたところ、設定された文字コードと実際の文字コードが異なる場合にエラーになるようです。ほかのパターンもあるかもしれませんが・・・。そこでリカバリー処理としてメッセージオブジェクトをwalkメソッドでひとつずつ走査していき、フォーマットがtext/plainのものが見つかったらデコードせずに返すことにしました。デコードしていないので当然受信側では文字化けしますが、何も返さないよりは良いだろうと判断しました。文字化けした本文は手動で正しい文字コードに変換してやれば解読は可能です。

    def get_main_content(self, msg:email.message.EmailMessage):
        '''
        メール本文、フォーマット、キャラクターセットを取得する。
        '''

        try:
            body_part = msg.get_body()
            main_content = body_part.get_content()
            format_ = body_part.get_content_type()
            charset = body_part.get_content_charset()

        except Exception as error:
            print(error)
            main_content = '解析失敗'
            format_ = '不明'
            charset = '不明'
            # get_bodyでエラーになるのは文字コード設定がおかしいメールを受信した場合なので、
            # decodeせずにテキスト部分をそのまま返す。
            for part in msg.walk():
                if part.get_content_type() == 'text/plain':
                    format_ = part.get_content_type()
                    main_content = str(part.get_payload())
                    charset = part.get_content_charset()

        return main_content, format_, charset

共通処理クラス 添付ファイルの出力

添付ファイルをローカル環境に出力し、ファイル名と絶対パスを返すメソッドです。同一のファイル名があっても上書きされないように、メール番号でディレクトリを作成してからファイルを出力しています。今回の要件ではAPIサーバーをクライアントアプリと同じ環境で動かす想定だったのでファイルパスを返していますが、もしリモート環境にサーバーを設置したい場合はBase64で変換するなどして、添付ファイルのオブジェクトそのものを返すようにすればよいかと思います。

この実装では添付ファイルの出力時に例外が発生するケースが確認されています。特にWindows環境で日本語の添付ファイルを出力しようとしたときに多く発生するようで、軽く調べたところファイル名に制御文字が含まれているような動きになるようです。必要であればリカバリ処理を入れてください。

    def get_attachments(self, msg:email.message.EmailMessage, b_num:bytes):
        '''
        添付ファイルが存在する場合はファイルに出力し、ファイルパスとファイル名を返す。
        ファイルはメール番号のディレクトリに格納する。
        '''

        files = []
        for part in msg.iter_attachments():
            try:
                filename = part.get_filename()
                if not filename:
                    continue
                # メール番号でディレクトリを作成
                new_dir = os.path.join('./tmp/{}/'.format(b_num.decode('utf-8')))
                os.makedirs(new_dir, exist_ok=True)
                # 添付ファイルを出力
                file_path = os.path.join(new_dir, filename)
                with open(file_path, 'wb') as fp:
                    fp.write(part.get_payload(decode=True))
                # ファイルパス(絶対パス)とファイル名を保存
                abs_path = os.path.abspath(file_path)
                files.append({'file_path':abs_path, 'file_name':filename})

            except Exception as error:
                print(error)

        return files

共通処理クラス 添付ファイル出力用ディレクトリのクリア

上記の処理で一時的に出力した添付ファイルはクライアントアプリが取得したら用済みになるので、ディレクトリごと削除するメソッドも作りました。

    def clean_tmp_dir(self):
        '''
        tmpディレクトリを空にする。
        '''

        shutil.rmtree('./tmp')
        os.mkdir('./tmp')

下記のクラスから呼び出しています。

class Clean(Resource):
    '''
    tmpディレクトリの掃除用クラス。
    '''

    def get(self):
        '''
        tmpディレクトリを空にする。
        '''

        CommonUtil.clean_tmp_dir(self)

        res = {
            'status': 'OK'
        }

        return jsonify(res)

curlコマンドで呼び出す場合の例です。

curl http://127.0.0.1:8080/clean

メールサーバー上のメールを削除

IMAPは基本的にサーバー上にメールデータを保持しますが、古いメールを削除したい場合はよくあるので、削除用のメソッドも作りました。

    def delete(self):
        '''
        メールを検索し、対象のメッセージをメールサーバーから削除する。
        '''

        # 日数の指定(指定された日数以前のメールが削除対象となる)
        data = request.json
        days = data.get('days')

        # 未指定時は90日を指定
        if not days:
            days = 90

        # 検索条件の生成
        target_date = datetime.datetime.now() - datetime.timedelta(days=days)
        search_option = target_date.strftime('BEFORE %d-%b-%Y')

        # メールサーバーに接続
        cli = imaplib.IMAP4_SSL(DOMAIN)

        try:
            # 認証
            cli.login(USER_ID, PASSWORD)

            # メールボックスを選択(標準はINBOX)
            cli.select()

            # 指定されたオプションを用いてメッセージを検索
            status, data = cli.search(None, search_option)

            # 受信エラーの場合はエラーを返して終了
            if status == 'NO':
                print('受信エラー')
                res = {
                    'status': 'ERROR'
                }
                return jsonify(res)

            # 対象メッセージの削除
            for num in data[0].split():
                cli.store(num, '+FLAGS', '\\Deleted')
            cli.expunge()

            res = {
                'status': 'OK',
            }
            return jsonify(res)

        except Exception as error:
            print(error)
            res = {
                'status': 'ERROR'
            }
            return jsonify(res)

        finally:
            cli.close()
            cli.logout()

curlコマンドから呼び出す場合の例です。実行すると100日前までのメールがサーバーから削除されます。

curl -X DELETE -H 'Content-Type: application/json' -d '{"days":100}' http://127.0.0.1:8080/fetchmail

所感

標準ライブラリがよくできているおかげで、Pythonを用いたメール受信は意外と簡潔に書けることが分かりました。メール受信のコア部分の実装はわずか数十行です。
ただし、ドキュメントはかなり分かりにくいので、正しいメソッドを調べるのが大変でした。Pythonでメールを扱う人に、この記事が多少なりとも役に立てば幸いです。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

画像の中から特定の文字を探し出す問題との格闘

概要

こんな問題が出ていたのです。

くいなちゃんさんはTwitterを使っています_「うなぎゲーム!_縦_↓_、横_→_、ナナメ_↙↘__のどれかに、「うなぎ」の並びを見つけましょう! 見つけたら、答えは言わずに_「見つけた」_とだけリプください。 _※暇人専用__https___t_co_w1sfIjrA7c」___Twitter.png

https://twitter.com/kuina_ch/status/759362808685342720

この問題をどうやったら画像解析で解けるか?にチャレンジしてみました。その備忘録です。

まずはopenCV+Python環境を作ります。

tensorflowとかいろいろまとめて使いたかったのでいつもこんな感じのDockerfileからイメージを利用しています。

FROM continuumio/miniconda3
ENV DEBCONF_NOWARNINGS yes

RUN apt-get update -y

RUN conda update conda \
  && conda update --all \
  && conda install jupyter numpy numexpr pandas matplotlib scipy statsmodels scikit-learn tensorflow keras  && conda clean --all && conda install -c menpo opencv=3.4.2

※ 実は社内の勉強会でKerasを使ってVGGのディープラーニングの勉強会をするとのことだったので、その準備をしようとしていたらこの問題に出会ってしまい解くことにしました。

ここで使ったOpenCVの基本

画像の読み書きとグレースケール化

import cv2
import numpy as np

im = cv2.imread('img/unagi.jpeg')
gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)

print(im, gray)
(base) root@27117ac02af7:/data# python sample.py
[[[255 255 242]
  [255 255 245]
  [255 255 248]
  ...
  [251 254 255]
  [253 251 255]
  [255 249 255]]

 [[255 255 254]
  [255 255 254]
  [255 255 254]
  ...
  [251 255 255]
  [253 252 255]
  [255 251 255]]

 [[241 253 255]
  [241 253 255]
  [241 255 255]
  ...
  [253 255 254]
  [253 255 255]
  [255 254 255]]

 ...

 [[255 255 254]
  [255 255 251]
  [255 255 249]
  ...
  [255 254 255]
  [255 255 254]
  [255 255 249]]

 [[255 254 255]
  [255 255 255]
  [255 255 252]
  ...
  [255 255 254]
  [255 255 249]
  [255 255 247]]

 [[250 255 255]
  [250 255 255]
  [251 255 252]
  ...
  [255 255 251]
  [255 255 247]
  [255 255 247]]] [[251 252 253 ... 254 252 251]
 [255 255 255 ... 255 253 253]
 [252 252 253 ... 254 255 254]
 ...
 [255 254 253 ... 254 255 253]
 [254 255 254 ... 255 253 253]
 [254 254 254 ... 254 253 253]]

テンプレートマッチ

複数物体のテンプレートマッチング が最高に参考になりました。

切り出した画像「う」をテンプレートファイルにします。

u.jpg

解像度は同じものを作らないとピクセル比較を行うテンプレートマッチでは発見することができないので注意しましょう。

import cv2
import numpy as np

# 問題の画像
im = cv2.imread("img/unagi.jpeg")
gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
# 切り出した画像「う」をグレースケールに
tpl = cv2.imread("template/u.jpg", 0)
w,h = tpl.shape[::-1]

# テンプレートマッチ
res = cv2.matchTemplate(gray, tpl, cv2.TM_CCOEFF_NORMED)
# しきい値以上のものだけを残す
loc = np.where (res > 0.9)
for pt in zip(*loc[::-1]):
    # pt[0], pt[1] がそれぞれ検出された画像の左上のx,y座標となるので
    # テンプレートを一致させた分の幅・高さを足して、1pixelの赤線で矩形を書く
    cv2.rectangle(im, pt, (pt[0] + w, pt[1] + h), (255, 0, 0), 1)

cv2.imwrite("result.png", im)

結果

result.png

ここで注意したいのが、左上のほうにある「う」などが太い線で囲われていることです。

先のコードを下記のようにすると

for pt in zip(*loc[::-1]):
    cv2.rectangle(im, pt, (pt[0] + w, pt[1] + h), (255, 0, 0), 1)
    print( "({}, {}) => ({}, {})".format(pt[0], pt[0]+w, pt[1], pt[1]+h))
(6, 26) => (6, 29)
(56, 76) => (6, 29)
(57, 77) => (6, 29)
(132, 152) => (6, 29)
...

のように、同じ文字が二回検出され、少し座標がずれた状態となっています。
この問題もあり、「縦に並んでいるかどうか」などを判別するのがちょっと難しいなと思いました。

縦横斜めに並んでいるかどうかを調べる

ここからはOpenCVは関係ありません。ここからはかなり乱暴なのですが、とりあえず、「画像の中心同士を比較して、距離が長すぎないこと・角度が斜め45度に近いこと」 を条件とすることにしました。

画像の中心点座標は

    for pt in zip(*loc[::-1]):
        cv2.rectangle(im, pt, (pt[0] + w, pt[1] + h), template["color"], 1)
        centers[template["name"]].append((pt[0] + w//2, pt[1] + h//2))

のようにして左上から幅の半分、高さの半分を足した値としてます(厳密でなくていいので整数として扱っています)

点の距離と角度

はじめ、座標 c1(x1, y1)と c2(x2, y2) なので、 三平方の定理で math.sqrt((x2-x1)**2 + (y2-y1)**2) としかけましたが、

def distance(c1, c2):
    d = math.hypot(c2[0]-c1[0], c2[1]-c1[1])
    return d

どうせmathモジュールを使うのであれば hypot を使えばよかったようです。

同じく角度は

def direction(c1, c2):
    # c1から見たときの点c2の位置が右か右下か下かを調べる
    theta = math.atan2(c2[0]-c1[0], c2[1]-c1[1])
    # 90: 右, -90:左。真下が0になる座標系
    return math.degrees(theta)

で求めることができます。thetaはラジアンなのでdegrees変換しておきました。

最終形

  • 「う」「な」「ぎ」のテンプレートを探索し、中心座標を全件リストにして持っておく
  • 座標同士を比較。距離はテンプレートの幅の2倍を超えない
  • 最初に「う」「な」を比較して、「な」「ぎ」がほぼ同様の角度に存在するかチェックする

という条件を書き効率が悪いですが全件検索をして

unagi_solver.py
import cv2
import numpy as np
import math
threshold = 0.9

im = cv2.imread('img/unagi.jpeg')
gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)

templates = [
        {"name": "u", "path": "template/u.jpg", "color": (0,0,255)},
        {"name": "na", "path": "template/na.jpg", "color": (255,0,0)},
        {"name": "gi", "path": "template/gi.jpg", "color": (0,255,0)},
]
points = {}
centers = {"u": [], "na":[], "gi": [] }

def distance(c1, c2):
    d = math.hypot(c2[0]-c1[0], c2[1]-c1[1])
    return d

def direction(c1, c2):
    # c1から見たときの点c2の位置が右か右下か下かを調べる
    theta = math.atan2(c2[0]-c1[0], c2[1]-c1[1])
    # 90: 右?, -90:左
    return math.degrees(theta)

for template in templates:
    tpl = cv2.imread(template["path"], 0)
    w,h = tpl.shape[::-1]

    res = cv2.matchTemplate(gray, tpl, cv2.TM_CCOEFF_NORMED)
    loc = np.where (res > threshold)
    for pt in zip(*loc[::-1]):
        cv2.rectangle(im, pt, (pt[0] + w, pt[1] + h), template["color"], 1)
        centers[template["name"]].append((pt[0] + w//2, pt[1] + h//2))




for u in centers["u"]:
    # uを用いて naの中から距離が一定以下のものを抽出する
    for na in centers["na"]:
        if distance(u, na) < 50 and direction(u,na) > -90 and direction(u, na) < 90:
            for gi in centers["gi"]:
                if distance(na, gi) < 50 and abs(direction(u, na) - direction(na, gi)) < 10:
                    # 見つかったら中心同士を5pixelの黒線でつなぐ
                    cv2.line(im, u, na, (0, 0, 0), 5)
                    cv2.line(im, na, gi, (0, 0, 0), 5)

cv2.imwrite("result.png", im)

下記のように該当する部分に線を引くことができました(もちろん下記に線を引いた答えは大嘘です)

貼り付けた画像_2020_07_22_15_18.png

もっと効率よく探索する方法などもあると思いますが、とりあえず。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Discord.pyで、あれこれをやりたいためのサンプルコード集

まえがき

この記事はDiscord.pyを使用してBOTを作る際に役立つかもしれないサンプルソースコードを掲載しています。

いずれもベストプラクティスであるとは限りませんが、DiscordのBOT作りの参考になればいいなという思いで、記事にしました。

環境

BOTが反応する頭文字を変えたい

Botインスタンス作成時にcommand_prefixという引数に頭文字を指定します。

from discord.ext import commands

bot = commands.Bot(
  command_prefix="!"
)

標準搭載のヘルプ機能を無効にしたい

Botインスタンス作成時にhelp_commandという引数をNoneにします。

from discord.ext import commands

bot = commands.Bot(
  help_command=None
)

Botアカウントに「~をプレイ中」と表示したい

discord.Gameインスタンスを作成し、Botの接続完了時にchange_presenceで変更します。

import discord
from discord.ext import commands

bot = commands.Bot()

presence = discord.Game("Qiita") # Qiitaをプレイ中

@bot.event
async def on_ready():
  await bot.change_presence(activity=presence)

特定のリアクションをしたらロールを付与、解除したら剥奪する

イベントハンドラのon_raw_reaction_addon_raw_reaction_removeを使用してロールの付与・剥奪を処理します。

from discord.ext import commands

class RoleCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot # commands.Botインスタンスを代入

    @commands.Cog.listener()
    async def on_raw_reaction_add(self, payload):
        if payload.member.bot: # BOTアカウントは無視する
            return

        if payload.channel_id != 123456789123: # 特定のチャンネル以外でリアクションした場合は無視する
            return

        if payload.emoji.name == "?": # 特定の絵文字
            await payload.member.add_roles(
                payload.member.guild.get_role(123456789123) # ロールID
            )

    @commands.Cog.listener()
    async def on_raw_reaction_remove(self, payload):
        guild = self.bot.get_guild(payload.guild_id)
        member = guild.get_member(payload.user_id)
        if guild is None or member is None: # サーバーやメンバー情報が読めなかったら無視
            return

        if member.bot: # BOTアカウントは無視する
            return

        if payload.channel_id != 123456789123: # 特定のチャンネル以外でリアクションを解除した場合は無視する
            return

        if payload.emoji.name == "?": # 特定の絵文字
            await payload.member.remove_roles(
                payload.member.guild.get_role(123456789123) # ロールID
            )

「~が入力中」と表示させる

async with ctx.typing():(ctxはテキストチャンネル)を使用することで入力中のサインを表示できます。

import asyncio
from discord.ext import commands

class TypingCog(commands.Cog):
    @commands.command()
    async def delay_send(self, ctx):
        async with ctx.typing(): # 送られてきたチャンネルで入力中と表示させる
            await asyncio.sleep(5) # 重い処理をwith内で書く。(一例)
            await ctx.send("処理完了")

カテゴリを取得したい

テキストチャンネル等と同様に、get_channel関数でカテゴリを取得することができます。

from discord.ext import commands

bot = commands.Bot()

@bot.event
async def on_ready():
    category = bot.get_channel(123456789123) # カテゴリのID

コマンドにDiscordのメンバーが予め入ったものを使用したい

メンバーを入れる引数の後にdiscord.Memberを指定すると、Memberインスタンスとして代入されます。
なお、メンバーが見つからなかった場合は例外commands.BadArgumentが送出されます。
エラーハンドリングは後述にて説明します。

import discord
from discord.ext import commands

class CommandArgCog(commands.Cog):
    @commands.command()
    async def say_hello(self, ctx, target: discord.Member): # targetにメンバー指定する
        await ctx.send(f"Hello, {target.mention}!")

各コマンドのエラーハンドリングを行いたい

@{コマンドの関数名}.errorというデコレータ付け、引数にctxerrorを定義します。
ちなみに、コマンド処理内でエラーが発生すると、commands.CommandInvokeErrorが送出されます。

import discord
from discord.ext import commands

class ErrorHandlingCog(commands.Cog):
    @commands.command()
    async def say_hello(self, ctx, target: discord.Member):
        await ctx.send(f"Hello, {target.mention}!")

    @say_hello.error
    async def say_hello_error(self, ctx, error):
        if isinstance(error, commands.BadArgument): # errorはBadArgumentか
            await ctx.send(f"{ctx.message.author.mention} ターゲットを指定して下さい。もしくはユーザーが見つかりませんでした。")
            return

あとがき

紹介したのは一例でしたが、BOT製作の役に立てたら幸いです。

気が向いたらまたこの記事を更新するかもしれません。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PowerBIにおけるPythonビジュアルの役割と注意点

対象の人

PowerBIのビジュアル眺めてて「お、Py?」ってなった人

PowerBIにおいてのPythonの使いどころ

  • [取得] データ取得するところ
  • [編集] クエリとしてPythonを実行したいとき
  • [視覚化] PowerBI既定やカスタムビジュアルで実装していないとき

20200722.png

PowerBIではインタラクティブな可視化を得意としていますが、予測値などは表示できません。
データをもとに機械学習して予測値を出して視覚化したいときなど場合はPythonの出番になります。
ただし処理が遅いということもありますし、インタラクティブではない点を考えると、あらかじめ予測したデータソースをもって視覚化したほうが使い勝手が良いのかもしれません。(個人的な意見です)

となると、PowerBIにおけるPythonって、現時点においての私としてはやっぱりビジュアルなのかなーと思っています。
BI標準やカスタムで実装されていないグラフで視覚化したい、、とか。
seabornとか使うとキレイに表現できたりします。

Pythonビジュアルとは

Pythonで作図したものです。
PowerBIの既定ビジュアルやカスタムビジュアルでは表現できないグラフは、Pythonビジュアルを使って表現することができます。
matplotlibやseabornなどのライブラリが代表的で、データ加工して作図なんてことも可能です。
ただし注意すべき点もあるようです。

PowerBIサービスでは「Pro」ライセンスが必要

下図は単純な棒グラフです。(左側:Pythoビジュアル、右側:積み上げ横棒グラフ)
PowerBI Desktopで作成しています。
コメント 2020-07-20 174845.png

PowerBI Desktopでは問題なく作成できて、表示もできています。
これをPowerBIサービスに発行する場合はライセンスに注意です。
Free版だと、Pythonビジュアルで作成したグラフはこのとおり。
コメント 2020-07-20 180139.png

Pythonビジュアルで作成したグラフが表示されていません。
PowerBI DesktopではOKだが、PowerBIサービスはProライセンスが必要になります($9.99/月)。

Proライセンスだと、PowerBIサービス上でも表示できます。
でも表示に時間がかかります。(単純なグラフでも)
コメント 2020-07-22 135103.png

現状、Web公開には対応していない

Pythonビジュアルは現状、Web公開もサポートしていない様子です。
でもPowerBI DesktopからのPDFエクスポートはOKです。(表示だけになりますが)
コメント 2020-07-20 180106.png

Pythonビジュアル起点での絞り込みなどはできない

PowerBIといえば、相互作用が便利で、一方のビジュアルのあるデータをクリックすると、他のビジュアルも連動してデータを絞り込み表示したりできます。
これが、Pythonビジュアル起点だと無理なようです。
ただし、他のビジュアルやスライサーなどの起点の場合は、Pythonビジュアルも絞り込みなどが行われます。

現時点では、ProライセンスがあればPowerBIサービスでは利用できるし、Web公開さえ我慢すればPythonビジュアルならではの表現で相手に伝えることができます。

個人的見解

表示速度などを踏まえた場合、なるべくPowerBI標準ビジュアルやカスタムビジュアル優先で、どうしても表現できない場合にPythonビジュアルを使用するような順位付けになりそうです。

Power BIでサポートされているPythonパッケージ

※投稿時点

パッケージ 概要
matplotlib Pythonで静的、アニメーション、インタラクティブな視覚化を作成するための包括的なライブラリ
numpy Pythonを使用した配列計算の基本パッケージ
pandas データ分析、時系列、統計のための強力なデータ構造
scikit-learn 機械学習とデータマイニング用のPythonモジュールのセット
scipy Python用の科学ライブラリ
seaborn 統計データの視覚化、見た目はキレイ
stasmodels Pythonの統計計算とモデル
xgboost XGBoost Pythonパッケージ

参考
https://docs.microsoft.com/ja-jp/power-bi/connect-data/service-python-packages-support

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonによるマルコフ・スイッチングモデル

1. 経済データの「レジーム」

ITバブル崩壊やリーマンショックなど、経済に急激で大きな変化が生じた時とき、背景で根本的に何らかの状態が変化してしまっていると考えられる場合がある。このような根本的な状態の変化モデルを「レジームスイッチングモデル」という。
変化した状態が観測不可能な場合、レジームスイッチをモデル化することは難しそうである。しかし、レジームの確率分布をパラメータ付きで与え、レジーム間の遷移をマルコフ連鎖と捉えることで、モデル化が可能になる。MCMCによって尤度を最大化するパラメータ(正規分布の平均・標準偏差、レジームの遷移確率)を求めることができるのである。

2. マルコフ・スイッチングモデルの解説

レジーム付き線形回帰

以下の線形回帰モデルのレジームスイッチングモデルを考える。

\begin{eqnarray}
y\left( t \right) &=& x\left( t \right)\beta + \epsilon\left( t \right)\\
\epsilon\left( t \right)&\sim& N\left(0,\sigma^2\right)
\end{eqnarray}

各時刻$t$における第$1$〜第$n$レジームを以下のように表す。

\begin{eqnarray}
I\left(t\right) \in { 1,2,\cdots ,n}
\end{eqnarray}

レジームを観測できる場合、回帰式はレジームを添え字として以下のように表される。

\begin{eqnarray}
y\left( t \right) &=& x\left( t \right)\beta_{I\left( t\right)} + \epsilon\left( t \right)\\
\epsilon\left( t \right)&\sim& N\left(0,\sigma^2_{I\left( t\right)}\right)
\end{eqnarray}

このとき、$y\left( t\right)$は、平均$x\left( t \right)\beta_{I\left( t\right)}$、分散$\sigma^2_{I\left( t\right)}$の正規分布に従う。対数尤度を最大化する最尤法を適用する。すなわち、以下の尤度関数を最大とするパラメータを求める。

\begin{eqnarray}
\ln{\mathcal{L}}&=&\sum_{t=1}^{T}\ln{\{f\left( y\left( t\right)|I\left( t\right)\right)\} }\\
f\left( y\left( t\right)|I\left( t\right)\right)&=&\frac{1}{\sqrt{2\pi\sigma^2_{I\left( t\right)}}}\exp\left( -\frac{\{y\left( t\right)-x\left( t\right)\beta_{I\left( t \right)}\}^2}{2\sigma^2_{I\left( t\right)}}\right)
\end{eqnarray}

レジームを直接観測できない場合、$I\left( t\right)$も過去の履歴の条件付き確率分布に従うと考える。
時点$t$までに得られる情報を$\mathcal{F}\left( t\right)$とする。
$f\left( t\right)$は$\mathcal{F}\left( t\right)$に条件付けられないことに注意する。

\begin{eqnarray}
f\left( y\left( t\right) | \mathcal{F\left( t-1\right)}\right) &=& \sum_{I\left( t\right)=1}^{n}f\left( y\left( t\right),I\left( t\right) |\mathcal{F}\left( t-1\right)\right)\\
&=& \sum_{I\left( t\right)=1}^{n}f\left( y\left( t\right)|I\left( t\right),\mathcal{F}\left( t-1\right)\right)f\left( I\left( t\right)|\mathcal{F}\left( t-1\right)\right)\\
&=& \sum_{I\left( t\right)=1}^{n}\frac{1}{\sqrt{2\pi\sigma^2_{I\left( t\right)}}}\exp\left( -\frac{\{y\left( t\right)-x\left( t\right)\beta_{I\left( t \right)}\}^2}{2\sigma^2_{I\left( t\right)}}\right)
f\left( I\left( t\right)|\mathcal{F}\left( t-1\right)\right)
\end{eqnarray}

したがって、最大化する対数尤度は以下の通りである。

\begin{eqnarray}
\ln{\mathcal{L}}&=&\sum_{t=1}^{T}\ln{\left\{\sum_{I\left( t\right)=1}^{n}f\left( y\left( t\right)|I\left( t\right),\mathcal{F}\left( t-1\right)\right)f\left( I\left( t\right)|\mathcal{F}\left( t-1\right)\right)\right\} }\\

f\left( y\left( t\right)|I\left( t\right),\mathcal{F}\left( t-1\right)\right)&=&\frac{1}{\sqrt{2\pi\sigma^2_{I\left( t\right)}}}\exp\left( -\frac{\{y\left( t\right)-x\left( t\right)\beta_{I\left( t \right)}\}^2}{2\sigma^2_{I\left( t\right)}}\right)
\end{eqnarray}

マルコフ連鎖

$f\left( I\left( t\right)|\mathcal{F}\left( t-1\right)\right)$を与えることで、$y\left( t\right)$の確率分布が定まる。$I\left( t\right)$が1次のマルコフ性を満たすと仮定する。

\begin{eqnarray}
P\left( I\left( t\right)=i|\mathcal{F}\left( t-1\right)\right)
&=&\sum_{j=1}^{n}P\left( I\left( t\right)=i,I\left( t-1\right)=j|\mathcal{F}\left( t-1\right)\right)\\
&=&\sum_{j=1}^{n}P\left( I\left( t\right)=i|I\left( t-1\right)=j\right)P\left( I\left( t-1\right)=j|\mathcal{F}\left( t-1\right)\right)
\end{eqnarray}

状態jから状態iへの遷移確率$P\left( I\left( t\right)=i|I\left( t-1\right)=j\right)$を$p_{i,j}$と表記し、以下の遷移確率行列を考える。

\begin{eqnarray}
P &=&
\begin{pmatrix}
p_{1,1}&p_{1,2}&\cdots&p_{1,n}\\
p_{2,1}&\ddots&&\vdots\\
\vdots&&\ddots&\vdots\\
p_{n,1}&\cdots&\cdots&p_{n,n}
\end{pmatrix}
\end{eqnarray}

時点$t$までの情報が与えられたときの、レジームの条件付き確率$P\left( I\left( t\right)=j|\mathcal{F}\left( t\right)\right)$は、ベイズ更新によって求められる。

\begin{eqnarray}
P\left( I\left( t\right)=i|\mathcal{F}\left( t\right)\right)
&=&P\left( I\left( t\right)=i|\mathcal{F}\left( t-1\right), y\left( t\right)\right)\\
&=&\frac{f\left( I\left( t\right) =i,y\left( t\right)|\mathcal{F}\left( t-1\right)\right)}{f\left(y\left( t\right)|\mathcal{F}\left( t-1\right)\right)}\\
&=&\frac{f\left(y\left( t\right)|I\left( t\right)=i,\mathcal{F}\left( t-1\right)\right)f\left(I\left( t\right)=i|\mathcal{F}\left( t-1\right)\right)}{\sum_{i=1}^{n}f\left(y\left( t\right)|I\left( t\right)=i,\mathcal{F}\left( t-1\right)\right)f\left(I\left( t\right)=i|\mathcal{F}\left( t-1\right)\right)}\\
&=&\frac{\sum_{j=1}^{n}f\left(y\left( t\right)|I\left( t\right)=j,\mathcal{F}\left( t-1\right)\right)p_{i,j}f\left(I\left( t-1\right)=j|\mathcal{F}\left( t-1\right)\right)}{\sum_{i=1}^{n}\sum_{j=1}^{n}f\left(y\left( t\right)|I\left( t\right)=j,\mathcal{F}\left( t-1\right)\right)p_{i,j}f\left(I\left( t-1\right)=i|\mathcal{F}\left( t-1\right)\right)}
\end{eqnarray}

初期確率は、マルコフ連鎖の定常確率とする。遷移確率行列$P$、単位行列$I$、全要素が$1$のベクトル$1$とし、定常確率$\pi^{\ast}$は以下のように求められることが知られている。

\begin{eqnarray}
A &=& 
\begin{pmatrix}
I-P\\
1^{\mathrm{T}}
\end{pmatrix}\\

\pi^{\ast}&=&\left(A^{\mathrm{T}}A\right)^{-1}A^{\mathrm{T}}のM+1列目
\end{eqnarray}

マルコフ連鎖モンテカルロ法

対数尤度を最大化するパラメータを求める手法として、マルコフ連鎖モンテカルロ法(MCMC)を用いる。
ここでは解説を省略する。

提案されたパラメータは、レジームを区別する条件を満たす必要がある。
例えば、低リスクレジームと高リスクレジームに区別する場合、提案された$\sigma_1$、$\sigma_2$が$\sigma_1<\sigma_2$を満たす必要がある。
この条件は、個々のモデルごとに指定する必要がある点に注意する。

3. Pythonによるマルコフスイッチングモデルの実装

任意のレジーム数を対象としたマルコフ・スイッチングを実装する。

import numpy as np
import pandas as pd
from scipy import stats

import matplotlib.pyplot as plt
import japanize_matplotlib
%matplotlib inline

from tqdm.notebook import tqdm
regime = 3

#各レジームが従う正規分布の初期パラメータ
mu = [-1.4,-0.6,0]
sigma = [0.07,0.03,0.04]

レジームの遷移確率自体をMCMCで推定するのではなく、遷移確率をロジスティック関数値として、その引数をMCMCで推定する。その際、非対角成分に対してロジスティクス関数を適用し、対角成分は余事象として推移確率行列を生成する。
ロジスティクス関数の引数をgen_prob、ロジスティック関数値をprobとして別々に扱う。

gen_prob = np.ones((regime,regime))*(-3)
gen_prob = gen_prob - np.diag(np.diag(gen_prob))
#対角成分が0、非対角成分が-3の正方行列

prob = np.exp(gen_prob)/(1+np.exp(gen_prob))
#ロジスティック関数を適用

prob = prob + np.diag(1-np.dot(prob.T,np.ones((regime,1))).flatten())
#非対角成分の余事象として、対角成分の確率を求め、遷移確率行列とする。
# マルコフ連鎖の定常確率を求める関数
def cal_stationary_prob(prob, regime):
    # prob: 2-d array, shape = (regime, regime), 遷移確率行列
    # regime: int, レジーム数

    A = np.ones((regime+1, regime))
    A[:regime, :regime] = np.eye(regime)-prob

    return np.dot(np.linalg.inv(np.dot(A.T,A)),A.T)[:,-1]
# 対数尤度を計算する関数
def cal_logL(x, mu, sigma, prob, regime):
    # x: 1-d array or pandas Series, 時系列データ
    # mu: 1=d array, len = regime, 各レジームが従う正規分布の平均の初期値
    # sigma: 1=d array, len = regime, 各レジームが従う正規分布の分散の初期値
    # prob: 2-d array, shape = (regime, regime), 遷移確率行列
    # regime: int, レジーム数

    likelihood = stats.norm.pdf(x=x, loc=mu[0], scale=np.sqrt(sigma[0])).reshape(-1,1)
    for i in range(1,regime):
        likelihood = np.hstack([likelihood, stats.norm.pdf(x=x, loc=mu[i], scale=np.sqrt(sigma[i])).reshape(-1,1)])

    prior = cal_stationary_prob(prob, regime)

    logL = 0
    for i in range(length):
        temp = likelihood[i]*prior
        sum_temp = sum(temp)

        posterior = temp/sum_temp

        logL += np.log(sum_temp)

        prior = np.dot(prob, posterior)

    return logL
# 各時点が各レジームに属する確率を計算する関数
def prob_regime(x, mu, sigma, prob, regime):
    # x: 1-d array or pandas Series, 時系列データ
    # mu: 1=d array, len = regime, 各レジームが従う正規分布の平均の初期値
    # sigma: 1=d array, len = regime, 各レジームが従う正規分布の分散の初期値
    # prob: 2-d array, shape = (regime, regime), 遷移確率行列
    # regime: int, レジーム数

    likelihood = stats.norm.pdf(x=x, loc=mu[0], scale=np.sqrt(sigma[0])).reshape(-1,1)
    for i in range(1,regime):
        likelihood = np.hstack([likelihood, stats.norm.pdf(x=x, loc=mu[i], scale=np.sqrt(sigma[i])).reshape(-1,1)])

    prior = cal_stationary_prob(prob, regime)

    prob_list = []
    for i in range(length):
        temp = likelihood[i]*prior

        posterior = temp/sum(temp)

        prob_list.append(posterior)

        prior = np.dot(prob, posterior)

    return np.array(prob_list)
# MCMCで更新するパラメータを生成する関数
def create_next_theta(mu, sigma, gen_prob, epsilon, regime):
    # mu: 1=d array, len = regime, 各レジームが従う正規分布の平均の初期値
    # sigma: 1=d array, len = regime, 各レジームが従う正規分布の分散の初期値
    # gen_prob: 2-d array, shape = (regime, regime), ロジスティック関数の引数
    # epsilon: float, 提案するパラーメータの更新幅
    # regime: int, レジーム数

    new_mu = mu.copy()
    new_sigma = sigma.copy()
    new_gen_prob = gen_prob.copy()

    new_mu += (2*np.random.rand(regime)-1)*epsilon
    new_mu = np.sort(new_mu)
    new_sigma = np.exp(np.log(new_sigma) + (2*np.random.rand(regime)-1)*epsilon)
    new_sigma = np.sort(new_sigma)[[2,0,1]]
    new_gen_prob += (2*np.random.rand(regime,regime)-1)*epsilon*0.1

    new_gen_prob = new_gen_prob - np.diag(np.diag(new_gen_prob))
    new_prob = np.exp(new_gen_prob)/(1+np.exp(new_gen_prob))
    new_prob = new_prob + np.diag(1-np.dot(new_prob.T,np.ones((regime,1))).flatten())

    return new_mu, new_sigma, new_gen_prob, new_prob
#MCMCを実行する関数
def mcmc(x, mu, sigma, gen_prob, prob, epsilon, trial, regime):
    # x: 1-d array or pandas Series, 時系列データ
    # mu: 1=d array, len = regime, 各レジームが従う正規分布の平均の初期値
    # sigma: 1=d array, len = regime, 各レジームが従う正規分布の分散の初期値
    # gen_prob: 2-d array, shape = (regime, regime), ロジスティック関数の引数
    # epsilon: float, 提案するパラーメータの更新幅
    # trial: int, MCMCの実行回数
    # regime: int, レジーム数

    mu_list = []
    sigma_list = []
    prob_list = []
    logL_list = []
    mu_list.append(mu)
    sigma_list.append(sigma)
    prob_list.append(prob)

    for i in tqdm(range(trial)):
        new_mu, new_sigma, new_gen_prob, new_prob = create_next_theta(mu, sigma, gen_prob, epsilon, regime)

        logL = cal_logL(x, mu, sigma, prob, regime)
        next_logL = cal_logL(x, new_mu, new_sigma, new_prob, regime)

        ratio = np.exp(next_logL-logL)
        logL_list.append(logL)

        if ratio > 1:
            mu, sigma, gen_prob, prob = new_mu, new_sigma, new_gen_prob, new_prob

        elif ratio > np.random.rand():
            mu, sigma, gen_prob, prob = new_mu, new_sigma, new_gen_prob, new_prob

        mu_list.append(mu)
        sigma_list.append(sigma)
        prob_list.append(prob)

        if i%100==0:
            print(logL)

    return np.array(mu_list), np.array(sigma_list), np.array(prob_list), np.array(logL_list)

4. データ例

東京電力の超過収益率

期間は2003年〜2019年。マーケット株価はtopix、無リスク資産は日本国債(10年)とした。

超過収益率の時系列データをPandasのDataFramexとしている。

regime = 3

mu = [-1.4,-0.6,0]
sigma = [0.07,0.03,0.04]

gen_prob = np.ones((regime,regime))*(-3)
gen_prob = gen_prob - np.diag(np.diag(gen_prob))
prob = np.exp(gen_prob)/(1+np.exp(gen_prob))
prob = prob + np.diag(1-np.dot(prob.T,np.ones((regime,1))).flatten())

trial = 25000
epsilon = 0.1

mu_list, sigma_list, prob_list, logL_list = mcmc(x, mu, sigma, gen_prob, prob, epsilon, trial, regime)

prob_series = prob_regime(x, mu_list[-1], sigma_list[-1], prob_list[-1], regime)

各レジームの確率を積み上げ棒グラフとして可視化する。

fig = plt.figure(figsize=(10,5),dpi=200)
ax1 = fig.add_subplot(111)
ax1.plot(range(length), x, linewidth = 1.0, label="return")

ax2 = ax1.twinx()
for i in range(regime):
    ax2.bar(range(length), prob_series[:,i], width=1.0, alpha=0.4, label=f"regime{i+1}", bottom=prob_series[:,:i].sum(axis=1))

h1, l1 = ax1.get_legend_handles_labels()
h2, l2 = ax2.get_legend_handles_labels()
ax1.legend(h1+h2, l1+l2, bbox_to_anchor=(1.05, 1), loc='upper left')

ax1.set_xlabel('t')
ax1.set_ylabel(r'超過収益率')
ax2.set_ylabel(r'確率')
ax1.set_title(f"東京電力・高分散レジームである確率, epsilon={epsilon}, trial={trial}")

plt.subplots_adjust(left = 0.1, right = 0.8)
#plt.savefig("probability_regime2.png")
plt.show()

以下のグラフにおいて、高分散低収益率レジームを青、低分散低収益率レジームを黄色、中分散高収益レジームを緑とした。
probability_regime_with_excess_return.png

収益率ではなく、実際の株価をプロットしたのが下のグラフである。東日本大震災後の原発事故による株価の暴落は、レジームのスイッチではなくジャンプと推測されていることが分かる。
probability_regime_stock_price.png

参考文献

  • 小松高広(2018), 最適投資戦略 : ポートフォリオ・テクノロジーの理論と実践, 東京 : 朝倉書店
  • 沖本竜義(2014), マルコフスイッチングモデルのマクロ経済・ファイナンスへの応用, 日本統計学会誌, 44 (1), 137-157
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

多変量正規分布に従う乱数生成の背景にはCholesky分解がいた

分散共分散行列のChoresky分解で、独立な標準正規分布に従う乱数を、多変量正規分布に従う乱数に変換できる。
どゆこと?

1. 数学的背景

  • 確率変数の変数変換
  • 多次元正規分布の導出
  • 分散共分散行列の性質
  • Cholesky分解のアルゴリズム

を解説する。

1.1. 多次元確率分布の変数変換

確率変数

    \begin{eqnarray}
        X_1,\ X_2,\  \cdots ,\  X_n 
\end{eqnarray}

を、確率変数

\begin{eqnarray}
        Y_1,\ Y_2,\  \cdots ,\  Y_n 
    \end{eqnarray}

に変換する。
ただし、$X$と$Y$は、それぞれ

\begin{eqnarray}
        &f_X&\left(x_1,\ x_2,\ \cdots ,\ x_n\right)\\
        &f_Y&\left(y_1,\ y_2,\ \cdots ,\ y_n\right)
\end{eqnarray}

の確率密度関数をもつものとする。

各$Y$を、$X$の全単射として、

    \begin{eqnarray}
        \begin{cases}
            Y_1 = h_1\left(X_1,\ X_2,\  \cdots ,\  X_n\right)&\\
            Y_2 = h_2\left(X_1,\ X_2,\  \cdots ,\  X_n\right)&\\
            \ \vdots &\\
            Y_n = h_n\left(X_1,\ X_2,\  \cdots ,\  X_n\right)
        \end{cases}
    \end{eqnarray}

と表す。各$h_i$は全単射ゆえ逆関数が存在し、

    \begin{eqnarray}
        \begin{cases}
            X_1 = g_1\left(Y_1,\ Y_2,\  \cdots ,\  Y_n\right)&\\
            X_2 = g_2\left(Y_1,\ Y_2,\  \cdots ,\  Y_n\right)&\\
            \ \vdots &\\
            X_n = g_n\left(Y_1,\ Y_2,\  \cdots ,\  Y_n\right)
        \end{cases}
    \end{eqnarray}

と表される。$n$次元実数空間の任意の部分集合$D$に対して、積分の変数変換を施して、

    \begin{eqnarray}
        &\int\cdots\int_D& f_X\left(x_1,x_2,\cdots ,x_n\right) dx_1dx_2\cdots dx_n \nonumber \\
        = &\int\cdots\int_D& f_X\left(y_1,y_2,\cdots ,y_n\right)\left|\frac{\partial\left( x_1,x_2,\cdots x_n\right) }{\partial\left(y_1,y_2,\cdots y_n\right) }\right| dy_1dy_2\cdots dy_n 
    \end{eqnarray}

が成立する。
ただし、ヤコビアンを、

    \begin{eqnarray}
        \left|\frac{\partial\left( x_1,x_2,\cdots x_n\right) }{\partial\left(y_1,y_2,\cdots y_n\right) }\right| =
        \begin{pmatrix}
            \frac{\partial g_1}{\partial y_1}&\frac{\partial g_1}{\partial y_2}&\cdots&\frac{\partial g_1}{\partial y_n}\\
            \frac{\partial g_2}{\partial y_1}&\ddots&&\vdots\\
            \vdots &&\ddots&\vdots\\
            \frac{\partial g_n}{\partial y_1}&\cdots&\cdots&\frac{\partial g_n}{\partial y_n}
        \end{pmatrix}
    \end{eqnarray}

と表記している。

ここから、確率密度関数の変換式

    \begin{eqnarray}
        f_Y\left(y_1,y_2,\cdots ,y_n\right)
        = f_X\left(y_1,y_2,\cdots ,y_n\right)\left|\frac{\partial\left( x_1,x_2,\cdots x_n\right) }{\partial\left(y_1,y_2,\cdots y_n\right) }\right|

    \end{eqnarray}

を得る。

1.2. 多次元正規分布の導出

$1$次元標準正規分布を、$N\left(0,1\right)$と表記する。

互いに独立な$n$個の標準正規分布

    \begin{eqnarray}
        Z_1,\ Z_2,\cdots ,\ Z_n \sim N\left(0,1\right)\ \textrm{i.i.d}
    \end{eqnarray}

を定める。
これらの同時密度関数は、独立分布であるため積で表され、

    \begin{eqnarray}
        f_Z\left(z\right) = \frac{1}{\left(2\pi\right)^{n/2}} \exp\left(-\frac{1}{2}z^Tz\right)
    \end{eqnarray}

となる。

ここで、

    \begin{eqnarray}
        X &=& AZ + \mu
\end{eqnarray}

の変数変換を施す。
ただし、$Z,X$は$n$次元確率変数ベクトル、$A$は$n\times n$行列、$\mu$は$n$次元ベクトルである。

\begin{eqnarray}
        X&=&\left(X_1,\ X_2,\ \cdots\ X_n\right)^{\mathrm{T}} \nonumber \\
        Z&=&\left(Z_1,\ Z_2,\ \cdots\ Z_n\right)^{\mathrm{T}} \nonumber \\

        A&=&
        \begin{pmatrix}
            a_{1,1}&a_{1,2}&\cdots&a_{1,n}\\
            a_{2,2}&\ddots&&\vdots\\
            \vdots&&\ddots&\vdots\\
            a_{n,1}&\cdots&\cdots&a_{n,n}
        \end{pmatrix} \nonumber \\

        \mu&=&\left(\mu_1,\ \mu_2,\ \cdots\ \mu_n\right)^{\mathrm{T}} \nonumber
    \end{eqnarray}

変数変換のヤコビアンは、

    \begin{eqnarray}
        \left|\frac{\partial\left( x_1,x_2,\cdots x_n\right) }{\partial\left(z_1,z_2,\cdots z_n\right) }\right| &=& \det\left( A\right)
    \end{eqnarray}

より、

\begin{eqnarray}
        \left|\frac{\partial\left( z_1,z_2,\cdots z_n\right) }{\partial\left(x_1,x_2,\cdots x_n\right) }\right| &=& \frac{1}{\det\left( A\right)}

    \end{eqnarray}

を得る。

よって、$X$の同時密度関数は、

    \begin{eqnarray}
        f_X\left(x\right)&=&f_Z\left(A^{\mathrm{T}}\left(x-\mu\right)\right)\frac{1}{\det\left(A\right)} \nonumber \\
        &=&\frac{1}{\left(2\pi\right)^{n/2}\det\left(A\right)}\exp\left\{-\frac{1}{2}\left(x-\mu\right)^{\mathrm{T}}AA^{\mathrm{T}}\left(x-\mu\right)\right\}
    \end{eqnarray}

となる。

変換された確率変数の平均ベクトルと、分散共分散行列は、

    \begin{eqnarray}
        E\left[X\right]&=&AE\left[Z\right]+\mu \nonumber \\
        &=&\mu 
         \\
        Cov\left(X\right)&=&E\left[\left(X-\mu\right)\left(X-\mu\right)^{\mathrm{T}}\right] \nonumber \\
        &=&E\left[AZZ^{\mathrm{T}}A^{\mathrm{T}}\right] \nonumber \\
        &=&AE\left[Z\right] E\left[Z^{\mathrm{T}}\right] A^{\mathrm{T}} \nonumber \\
        &=&AA^{\mathrm{T}}
    \end{eqnarray}

分散共分散行列を、

    \begin{eqnarray}
        \Sigma = AA^{\mathrm{T}}
    \end{eqnarray}

と表記する。

これを適用して、多次元正規分布の同時密度関数

    \begin{eqnarray}
        f_X\left(x\right)
        &=&\frac{1}{\left(2\pi\right)^{n/2}\det\left(A\right)}\exp\left\{-\frac{1}{2}\left(x-\mu\right)^{\mathrm{T}}\left(AA^{\mathrm{T}}\right)^{-1}\left(x-\mu\right)\right\} \nonumber \\
        &=&\frac{1}{\left(2\pi\right)^{n/2}\det\left(\Sigma\right)^{1/2}}\exp\left\{-\frac{1}{2}\left(x-\mu\right)^{\mathrm{T}}\Sigma^{-1}\left(x-\mu\right)\right\}
    \end{eqnarray}

を得る。

1.3. 分散共分散行列の性質

分散共分散行列からCholesky分解によって乱数列を生成するにあたり、分散共分散行列が正定値であるという制約条件が必要になる。

ここでは、一般に分散共分散行列の性質を確認し、Cholesky分解可能性を確かめる。

ここで、線形代数の知識を確認する。

行列の半正定値性

以下は同値。
- $n\times n$対称行列$A$が半正定値である
- 任意のベクトル$x$に対して、$x^\mathrm{T}Ax\geq 0$
- $A$の任意の主小行列式が非負
- 全ての固有値は非負

行列の正定値性

以下は同値。
- $n\times n$対称行列$A$が正定値である
- 任意のベクトル$x$に対して、$x^\mathrm{T}Ax> 0$
- $A$の任意の主小行列式が正
- 全ての固有値は正

以下の主張を証明する。
分散共分散行列$\Sigma$は半正定値対称行列である。

分散$\sigma_i$、共分散$\rho_{i,j}$を成分とする$n\times n$分散共分散行列を$\Sigma$とする。
$\rho_{i,j}=\rho_{j,i}$より、$\Sigma$は対称行列である。
$a$を任意の$n$次元定数ベクトルとする。

\begin{eqnarray}
    a^{\mathrm{T}}\Sigma a &=& a^{\mathrm{T}}E\left[\left(X-E\left[X\right]\right)\left(X-E\left[X\right]\right)^{\mathrm{T}}\right]a \nonumber \\
    &=&E\left[a^{\mathrm{T}}\left(X-E\left[X\right]\right)\left(X-E\left[X\right]\right)^{\mathrm{T}}a\right] \nonumber \\
    &=&E\left[\langle a,X-E\left[X\right]\rangle^2\right] \nonumber \\
    &\geq&0 \nonumber
\end{eqnarray}

よって、$\Sigma$は半正定値である。

実用上、分散共分散行列は多くの場合で正定値対称となることが期待されるが、
正定値対称とならない場合は、以下の乱数生成ができないため注意。

1.4. Cholesky分解

正定値対称行列$A$に対して、下三角行列$L$を用いて分解を与えることを、Cholesky分解という。

    \begin{eqnarray}
        A&=&LL^{\mathrm{T}}
        \\
        \nonumber \\
        A&=&
        \begin{pmatrix}
            a_{1,1}&a_{1,2}&\cdots&a_{1,n}\\
            a_{2,2}&\ddots&&\vdots\\
            \vdots&&\ddots&\vdots\\
            a_{n,1}&\cdots&\cdots&a_{n,n}
        \end{pmatrix} \nonumber \\
        L&=&
        \begin{pmatrix}
            l_{1,1}&0&\cdots&\cdots&0\\
            l_{2,1}&l_{2,2}&0&&\vdots\\
            \vdots&&\ddots&\ddots&\vdots\\
            \vdots&&&l_{n-1,n-1}&0\\
            l_{n,1}&\cdots&\cdots&l_{n,n-1}&l_{n,n}
        \end{pmatrix} \nonumber
    \end{eqnarray}

Cholesky分解の構成法は、以下の通りである。

    \begin{eqnarray}
            l_{i,i}&=&\sqrt{a_{i,i}-\sum_{k=1}^{i-1}l_{i,k}^2}\ \ \ \ \ \ \left(\textrm{for}\ i=1,\cdots ,n\right) \\
            l_{j,i}&=&\left(a_{j,i}-\sum_{k=1}^{i-1}l_{j,k}\ l_{i,k}\right)/l_{i,i}\ \ \ \left(\textrm{for}\ j=i+1,\cdots ,n\right) 
    \end{eqnarray}

1.5. Cholesky分解による多変量正規分布に従う乱数列の生成

分散$\sigma_i$、共分散$\rho_{i,j}$を成分とする分散共分散行列$\Sigma$に対し、正定値の制約を設ける。

$\Sigma$に対してCholesky分解を適用して、$A=L$(下三角行列)を得る。

    \begin{eqnarray}
        x&=&Lz+\mu \\
        \nonumber \\
        L&=&
        \begin{pmatrix}
            l_{1,1}&0&\cdots&\cdots&0\\
            l_{2,1}&l_{2,2}&0&&\vdots\\
            \vdots&&\ddots&\ddots&\vdots\\
            \vdots&&&l_{n-1,n-1}&0\\
            l_{n,1}&\cdots&\cdots&l_{n,n-1}&l_{n,n}
        \end{pmatrix} \nonumber \\
        l_{i,i}&=&\sqrt{\sigma_{i}-\sum_{k=1}^{i-1}l_{i,k}^2}\ \ \ \ \ \ \left(\textrm{for}\ i=1,\cdots ,n\right) \nonumber \\
        l_{j,i}&=&\left(\rho_{j,i}-\sum_{k=1}^{i-1}l_{j,k}\ l_{i,k}\right)/l_{i,i}\ \ \ \left(\textrm{for}\ j=i+1,\cdots ,n\right) \nonumber
    \end{eqnarray}

2. 実装

multivariate_normal_distribution.py
import numpy as np

#平均
mu = np.array([[2],
               [3]])

#分散共分散行列
cov = np.array([[1,0.2],
                 0.2,1]])

L = np.linalg.cholesky(cov)
dim = len(cov)

random_list=[]

for i in range(n):
    z = np.random.randn(dim,1)
    random_list.append((np.dot(L,z)+mu).reshape(1,-1).tolist()[0])

いろいろ遊んでみた。
norm_dist1.png

norm_dist2.png

norm_dist3.png

norm_dist4.png

(数字は分散共分散行列の要素を表しています。)
plot_num.png

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonで差分計測用のStopWatchを作る

すでに多くの記事でPythonを使った処理時間計測方法が提示されているので、詳しい内容は割愛。
自分で使いやすいように、下記の通りStopWatchを作ってみる。

PCで及びPython処理時間計測する事自体にあまり精度が期待できないので、ある程度自分が信じる精度を決めておけば今後の計測において役にたつかなぁと思い、何度か実験しましたが ms 程度の精度で十分な気もするので少数第4位くらいまでで経過時間を戻す仕様としてみました。
何点か同時に測定できるように、識別子付きのStopWatchとなっております。

StopWatch.py
import time

class StopWatch():
    def __init__(self, name=None):
        self.name = name
        self.start_time = 0
        self.stop_time = 0

    def start(self):
        self.start_time = time.perf_counter()

    def stop(self):
        self.stop_time = time.perf_counter()
        tm = self.stop_time - self.start_time
        return(round( tm, 4))

    def clear(self):
        self.start_time = 0

    def isName(self):
        return self.name


if __name__ == "__main__":
    sw1 = StopWatch("A")
    sw2 = StopWatch("B")    
    sw1.start()
    time.sleep(1)

    print(sw1.stop())
    sw2.start()
    for i in range(1000000):
        i=i+0.01
    print(sw1.name + ":" + str(sw1.stop()))
    print(sw2.name + ":" + str(sw2.stop()))    


  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Python】Skyfieldで衛星軌道予測

はじめに

以前、こちらの記事で、Pyorbitalを使った衛星の軌道計算についてご紹介しました。
しかし、PyorbitalのライセンスはGPLなので、GPLのパッケージを使うのはちょっと・・・
という方向けに、今回はSkyfieldを使って衛星の軌道計算をやってみました。

Skyfieldとは

Pythonの天文学計算パッケージです。
地球の周りの軌道にある星、惑星、衛星の位置などを計算することができます。
ライセンスはMITライセンスです。

環境

  • Windows 10
  • Python 3.7.3
  • skyfield 1.23

インストール

$ pip install skyfield

使ってみた

TLE読み込み

衛星のTLEを用意。今回はCELESTRAKから地球観測衛星AquaのTLEを入手。

aqua.txt
AQUA                    
1 27424U 02022A   20203.40846355  .00000031  00000-0  17013-4 0  9997
2 27424  98.2101 143.5537 0000452 148.6720 321.9438 14.57110132968816

tle_read.py
from skyfield.api import load


satellite = load.tle_file("./aqua.txt")[0]  # TLE読み込み
sgp4_model = satellite.model

print("衛星名: ", satellite.name)
print("元期: ", satellite.epoch.utc_jpl())
print("衛星番号: ", sgp4_model.satnum)
print("エポック年: ", sgp4_model.epochyr)
print("エポック日: ", sgp4_model.epochdays)
print("エポックのユリウス日: ", sgp4_model.jdsatepoch)
print("ndot: ", sgp4_model.ndot)
print("nddot: ", sgp4_model.nddot)
print("弾道抗力係数 B*: ", sgp4_model.bstar)
print("軌道傾斜角[rad]: ", sgp4_model.inclo)
print("昇交点赤経[rad]: ", sgp4_model.nodeo)
print("離心率: ", sgp4_model.ecco)
print("近地点引数[rad]: ", sgp4_model.argpo)
print("平均近点角[rad]: ", sgp4_model.mo)
print("平均運動[rad/min]: ", sgp4_model.no_kozai)

実行結果

衛星名: AQUA
元期: A.D. 2020-Jul-21 09:48:11.2507 UT
衛星番号: 27424
エポック年: 20
エポック日: 203.40846355
エポックのユリウス日: 2459051.5
ndot: 9.39326507149726e-13
nddot: 0.0
弾道抗力係数 B*: 1.7013e-05
軌道傾斜角[rad]: 1.714089603712883
昇交点赤経[rad]: 2.505484718420184
離心率: 4.52e-05
近地点引数[rad]: 2.5948159055250097
平均近点角[rad]: 5.618979316382121
平均運動[rad/min]: 0.06357842341892297

衛星の位置情報取得

satellite_position.py
from skyfield.api import load


satellite = load.tle_file("./aqua.txt")[0]  # TLE読み込み

ts = load.timescale()  # タイムスケールを生成
now = ts.now()
geocentric = satellite.at(now)  # 地球中心を基準としたx, y, zの位置を取得
subpoint = geocentric.subpoint()  # 緯度、経度、高度情報に変換

print("時刻: ", now.utc_jpl())
print("緯度[degree]", subpoint.latitude.degrees)
print("経度[degree]", subpoint.longitude.degrees)
print("高度[m]", subpoint.elevation.m)

実行結果

時刻: A.D. 2020-Jul-22 00:49:04.0105 UT
緯度[degree]: 29.340257418205187
経度[degree]: 16.29265032283678
高度[m]: 705898.890496908

パス時刻の取得

get_pass_time.py
from datetime import datetime, timedelta, timezone

from skyfield.api import load, Topos


satellite = load.tle_file("./aqua.txt")[0]  # TLE読み込み

ts = load.timescale()  # タイムスケールを生成

# 観測地点(東京タワー)の位置情報を設定
tokyo_tower_pos = Topos(
    latitude_degrees=35.66, longitude_degrees=139.75, elevation_m=333
)

start = datetime.now(timezone.utc)
end = start + timedelta(days=1)

# 24時間以内に観測点(東京タワー)から衛星が見える時間を計算(仰角5度以上)
# パス開始時刻、最大仰角時刻、パス終了時刻の情報が取得できる
t, events = satellite.find_events(
    tokyo_tower_pos, ts.utc(start), ts.utc(end), altitude_degrees=5.0
)

difference = satellite - tokyo_tower_pos
for ti, event in zip(t, events):
    name = ("パス開始: ", "最大仰角: ", "パス終了: ")[event]
    print(name, ti.utc_strftime("[UTC] %Y %b %d %H:%M:%S"))
    if event == 1:
        topocentric = difference.at(ti)
        alt, _, _ = topocentric.altaz()  # 観測点から観た衛星の仰角を取得
        print("最大仰角[degree]: ", alt.degrees)
    if event == 2:
        print()

実行結果

パス開始:  [UTC] 2020 Jul 22 03:30:00
最大仰角:  [UTC] 2020 Jul 22 03:35:40
最大仰角[degree]:  58.23825328597088
パス終了:  [UTC] 2020 Jul 22 03:41:22

パス開始:  [UTC] 2020 Jul 22 05:09:57
最大仰角:  [UTC] 2020 Jul 22 05:13:44
最大仰角[degree]:  11.91191755398313
パス終了:  [UTC] 2020 Jul 22 05:17:32

パス開始:  [UTC] 2020 Jul 22 15:33:05
最大仰角:  [UTC] 2020 Jul 22 15:37:48
最大仰角[degree]:  18.970133046864028
パス終了:  [UTC] 2020 Jul 22 15:42:30

パス開始:  [UTC] 2020 Jul 22 17:10:10
最大仰角:  [UTC] 2020 Jul 22 17:15:37
最大仰角[degree]:  37.15713012973206
パス終了:  [UTC] 2020 Jul 22 17:21:03

まとめ

Pyorbitalでできることは、Skyfieldでも大体できそう。
注意点としては、計算に使うタイムスケールオブジェクトをdatetimeから生成するときに、タイムゾーンをUTCに設定しとかないと怒られます。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む