20210731のPythonに関する記事は30件です。

Gunicornの設定ファイルの書き方

はじめに DjangoやFlaskなどのPythonアプリケーションを動かすwsgiサーバ、Gunicornの設定ファイルの書き方についてまとめてみました。 Gunicornの設定ファイルはPythonのスクリプトとして、各種設定を変数や関数として列挙していく形となる。 中身はPythonスクリプトなので通常のソースコードと同じように、ライブラリをimportしたり、Pythonのプログラムコードを記述することも可能。 環境 Python 3.9 Gunicorn 20.1 設定ファイル Gunicornでよく使う設定値としては、下記のようなものがある。 settings.py # # Gunicorn config file # wsgi_app = 'hogeapp.wsgi:application' # Server Mechanics #======================================== # current directory chdir = '/work/python/gunic/gu' # daemon mode daemon = False # enviroment variables raw_env = [ 'ENV_TYPE=dev', 'HOGEHOGE_KEY=xxxxxxxxxxxxxxxxxxxxxxxxx' ] # Server Socket #======================================== bind = '0.0.0.0:8000' # Worker Processes #======================================== workers = 2 # Logging #======================================== # access log accesslog = '/work/python/gunic/gu/logs/access.log' access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"' # gunicorn log errorlog = '-' loglevel = 'info' wsgi_app wsgiのアプリケーションオブジェクト名。module_name:value形式でWSGIモジュールを指定する。 chdir Gunicornプロセスが動作する際のカレントディレクトリ。 WSGIアプリケーションの起点となるホームディレクトリを設定しておく。 daemon デーモン(バックグラウンド)モードで起動するか否かの設定値。 Trueではバックグラウンドでデーモンとして動作するが、Falseではフォアグラウンドでの動作となる。 raw_env Gunicornのプロセスで使用する環境変数を指定する。WSGIのPythonアプリケーションで使用することもできるため、アプリ内で必要な環境変数類はここで定義する。 KEY=VALUE形式の文字列で配列で記述する。 bind GunicornのListenポートとして待ち受けるポートを記載。 ネットワーク:ポートの形で指定する。 workers WSGIのワーカープロセス数を指定する。 accesslog Gunicornのアクセスログのファイル名を指定する。-を指定すると標準出力に出力される。 これはApacheやnginxのログようなクライアントからのリクエストに関するアクセスログとなる。 access_log_format アクセスログのフォーマット形式。 設定できるパラメータ変数は公式ドキュメントを参照。 LTSVやJSON(ndjson)形式でログを書き出したい場合は、ここで各フォーマットに適合した形式で設定することで行うことができる。 errorlog Gunicornログのファイル名。-を指定すると標準出力に出力される。 errorlogという名前になっているが、エラーだけでなくGunicorn本体の動作全般のログとなる。 loglevel errorlog のログレベルを指定する。 指定可能なレベルは、debug、info、warning、error、criticalが指定可能。 設定ファイルの使い方 通常 gunicornコマンドに、--configオプションで作成した設定ファイルを指定することで、使用することができる。 $ gunicorn --config /path/to/app/settings.py gunicornコマンドは--configオプションと併せて任意のオプションも利用可能。 設定ファイル内で設定された項目と重複して指定した場合、オプション引数側が優先されるため、これを利用して設定値を部分的に差し替えたりすることも可能。 $ gunicorn --config /path/to/app/settings.py --chdir /path/to/dev/app 仮想環境(venv)を利用する場合 Gunicornに限ったことでは無いが、仮想環境(venv)にインストールしたコマンドを利用するには、仮想環境ディレクトリ内にあるGunicornコマンドを利用する。 今回の場合、仮想環境内のgunicornを利用するには、仮想環境ディレクトリ内にあるgunicornコマンドを実行することで、使用することができる。 この場合、Gunicornで実行するアプリケーションも仮想環境の構成を利用して起動される。 $ /path/to/venv/bin/gunicorn --config /path/to/app/settings.py 参考 Settings — Gunicorn 20.1.0 documentation
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ffmpeg-pythonで動画をトリミング(切抜き)

前回の続きのつもりでしたが、簡単すぎて思いの外やることがありませんでした。 input.mp4の開始5秒後を起点に30秒間を切り抜いてoutput.mp4として出力するなら以下のような感じ。 import ffmpeg stream = ffmpeg.input("input.mp4", ss=5, t=30).output("output.mp4") ffmpeg.run(stream) もちろんss=やt=に変数を渡してやってもいいし、何なら小数渡しちゃってもOK。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Qiitaエンジニアフェスタ_ブラウザ選手権

Qiitaエンジニアフェスタ_ブラウザ選手権 https://qiita.com/official-events/be8c5ab5a9ddf90055cd 参加記事です。 過去の事故記事と、最近書いたり、つぶやいたものを集め、 自分の中での選手権を初めてみます。 できれば、1週に2項目書いて、8/25までには10項目の順位をつけたいかなって思っています。 1. play with docker で python 最近は、Pythonといえばdockerで利用している。 あなたもdocker, 私もdocker https://qiita.com/kaizen_nagoya/items/8f2746f10f30b575d0a8 dockerがうまく導入できていない方には、 ウェブでの利用をお勧めしている。 65歳からのプログラミング入門。docker(126)転職(16) https://qiita.com/kaizen_nagoya/items/1561f910c275b22d7c9f play with docker www.play-with-docker.com Docker hubのIDとパスワードを入れる。 「ADD new instance」をクリックする。 文字列をコピペする docker run -it ubuntu /bin/bash apt update apt -y upgrade apt -y install python3-dev python3 最初のpython1行プログラムをインタプリタで入力する。文字列のコピペ LangPro100-2015-00 >>> print("stressed"[::-1]) desserts みごとに逆順になった。 「>>>」 の行が入力、その次の行が出力。 文字を逆順にするのがこんなに簡単。 ここまで1時間で来れれば大丈夫。 自分では、下記のように、ありとあらゆるエラーにみまわれ、ここまで来るのに2時間、 この資料をまとめるのに1時間かかっている。泣) 参考資料(reference) 第11回 TOPPERS活用アイデア・アプリケーション開発コンテスト https://qiita.com/kaizen_nagoya/items/91162a9b258a2a06f5e0 TOPPERSに関連する操作なら、この部門への投稿の表題を変えて、TOPPERS活用アイデア・アプリケーション開発コンテストに応募するのも手かも。 あなたもdocker, 私もdocker https://qiita.com/kaizen_nagoya/items/8f2746f10f30b575d0a8 今まで書いてよかった技術書を紹介しよう! https://qiita.com/kaizen_nagoya/items/d31b7c158541d345a7ef 開発環境を豊かにする開発事例 過去・現在・未来 https://qiita.com/kaizen_nagoya/items/d9bf0c2c671fe7f1c749 Microsoftとの歴史 Cコンパイラを中心に https://qiita.com/kaizen_nagoya/items/d7c0cc257e99de0573cf TOPPERS ソースを積み上げよう https://qiita.com/kaizen_nagoya/items/65c15aed086f2da0928d
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

因果推論 Uplift Modeling X Learner for Multiple Treatments【Python】

はじめに この記事は、Uplift Modeling for Multiple Treatments with Cost Optimization の内容をベースとしています。 余談ですが、執筆時2021年7月31日より幾月前からUber EatsさんがYouTube上で音声のない広告を流しています。とてもユーザの的を得ているように思えその発想にアルゴリズムが関与しているのであればとても興味深いです。 X-Learner & Extending X-Learner for Multiple Treatments X-Learner と Extending X-Learner for Multiple Treatments それぞれのアルゴリズムを紹介します。 X-Learner の考え方は 2 つの段階に分けられます。 1 段階目では Treatment / Control それぞれの効果 $\mu_{1}, \ \mu_{0}$ を推定。 2 段階目では既に推定された効果と結果変数の差分を利用し条件付き期待値を算出。さらに傾向スコアで重みづけをすることで精度を高めています。 CATE の推定において「CATE がスパースな場合」「CATE の滑らかさ」「Treatment / Control 群の間に偏りがある場合」に対しある程度の対応をすることができます。 Extending X-Learner for Multiple Treatments は X-Learner を拡張し複数介入のケースにも対応できるようにした Meta-Learner のアルゴリズムです。 これはつまり「X-Learner は $t_j ∈ \{t_0, t_1\}$ を考えている」のに対し「Extending X-Learner for Multiple Treatments は $t_j ∈ \{t_0, t_1, ..., t_m\} \ (j ∈ \mathbb{Z})$ を考えている」ということを意味しています。 数式からも見て取れるように考え方の流れは X-Learner と同じです。 X-Learner “pseudo-effects” ${D_{i}}$ for the observations in the control group as \tilde{D_{i}^{0}} = \widehat{\mu }_{1}\left( x\right) - Y_{i}^{0} \tag{1} and for the individuals in the treatment groups as \tilde{D_{i}^{1}} = Y_{i}^{1} - \widehat{\mu }_{0}\left( x\right) \tag{2} where ${Y_{i}}$ is the observed value for the user. The pseudo-effects are then used as the outcome in another pair of regression methods to obtain the response functions ${\tilde{\tau_{1}^{}}(x)}$ and ${\tilde{\tau_{1}^{}}(x)}$ for the control and treatment groups, respectively. \begin{aligned}\widehat{\tau }\left( x\right) =e\left( x\right) \widehat{\tau }_{0}\left( x\right) +\left( 1-e\left( x\right) \right) \widehat{\tau }_{1}\left( x\right) \end{aligned} \tag{3} where ${e(x)}$ is the propensity score $P[W_{i} = 1 \ | \ X_{i} = x]$ with ${W_{i}}$ indicating the treatment assignment. Extending X-Learner for Multiple Treatments Consider an experiment with a control group and m treatment groups. Here, we use any suitable regression method to estimate the response functions under each group: \begin{aligned}\mu _{t_{j}}\left( x\right) = \mathbb{E}[ Y\left( t_{j}\right) \ | \ X=x] \end{aligned} \tag{4} where $t_j ∈ \{t_0, t_1, ..., t_m\}$ with t0 denoting the control group. We estimate ${\mu_{t_{0}}(x)}$ using data from the control group and${\mu_{t_{1}}(x)}$ using data from the j-th treatment group. We then proceed to estimate the pseudo-effects analogously, {\begin{equation} \begin{split} \tilde{D_{i}^{t_{0}}} = \hat{\mu} _{t_{j}}(x) - Y_{i} \\ \tilde{D_{i}^{t_{j}}} = Y_{i} - \hat{\mu} _{t_{0}}(x) \end{split} \end{equation} } \tag{5} where ${\tilde{D_{i}^{t_{0}}}}$ is estimated using control group data and ${\tilde{D_{i}^{t_{j}}}}$ is estimated using the data from the j-th treatment group. Finally, as in the two-group experiment, we use the pseudo-effects as the outcomes in another pair of regressions to obtain $\hat{\tau} _{t_0}$ and $\hat{\tau}_{t_j}$. In the multiple treatment group case, we need to estimate the propensity score e_{t_{j}}(x) \ = \ \mathbb{P}[W_{i} = t_{j} \ | \ X = x] \tag{6} for each m experiment groups. Focusing on the case in which we compare each treatment group against the control, we then estimate the CATEor a given treatment group as follows : \hat{\tau}^{t_{j}}(x) = \dfrac{\hat{e}_{t_j}(x)}{\hat{e}_{t_j}(x) + \hat{e}_{t_0}(x)}\hat{\tau}_{t_0}(x) \ + \ \dfrac{\hat{e}_{t_0}(x)}{\hat{e}_{t_j}(x) + \hat{e}_{t_0}(x)}\hat{\tau}_{t_j}(x) \tag{7} Finally, to predict the best treatment group for an individual, we estimate $\tau^{t_j}$ as \hat{\tau}^{t_{j}}(X_i) = \dfrac{\hat{e}_{t_j}^{(-1)}(X_i)}{\hat{e}_{t_j}^{(-1)}(X_i) + \hat{e}_{t_0}^{(-1)}(X_i)}\hat{\tau}_{t_0}^{(-1)}(X_i) + \dfrac{\hat{e}_{t_0}^{(-1)}(X_i)}{\hat{e}_{t_j}^{(-1)}(X_i) + \hat{e}_{t_0}^{(-1)}(X_i)}\hat{\tau}_{t_j}^{(-1)}(X_i) \tag{8} where notation of the form $\hat{e} _{t_j}^{(-1)}$ indicates that $\hat{e}_{t_j}$ has been estimated without using the $i$-th observation. We then simply compare which treatment group gives the highest predicted uplift for the individual and recommend that as the treatment group if the costs of treatments are equal. Implementation Check versions of libraries import pandas import numpy import scipy import sklearn import lightgbm import causalml library_dict = {0: 'pandas', 1: 'numpy', 2: 'scipy', 3: 'sklearn', 4: 'lightgbm', 5: 'causalml'} for num, library in enumerate([pandas, numpy, scipy, sklearn, lightgbm, causalml]): print('{}: v{}'.format(library_dict[num], library.__version__)) >> pandas: v1.3.1 numpy: v1.18.5 scipy: v1.4.1 sklearn: v0.23.2 lightgbm: v3.1.1 causalml: v0.10.0 X Learner execution class import pandas as pd import numpy as np import sys import logging logger = logging.getLogger() handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) logger.addHandler(handler) logger.setLevel(logging.INFO) import matplotlib.pyplot as plt %matplotlib inline from copy import deepcopy from sklearn.model_selection import KFold, StratifiedKFold from sklearn.metrics import mean_squared_error as mse from sklearn.metrics import log_loss from sklearn.linear_model import LogisticRegression from sklearn.ensemble import RandomForestClassifier import lightgbm as lgb from causalml.dataset import synthetic_data, get_synthetic_summary_holdout from causalml.propensity import compute_propensity_score from causalml.metrics import plot, auuc_score from causalml.inference.meta import BaseXRegressor class XLearner(object): def __init__(self, covariance_df: pd.core.frame.DataFrame, intervention_df: pd.core.frame.DataFrame) -> None: """ covariance_df: Covariate DataFrame. intervention_df: Intervention DataFrame. If there are two types of interventions, three types of data will be stored together with the control. """ self.covariance_df = covariance_df self.intervention_df = intervention_df self.groups = list(self.intervention_df.iloc[:, 0].unique()) self.num_groups = len(self.groups) def propensity_score_RF(self, n_splits: int, kfold_type: str): self.n_splits = n_splits self.kfold_type = kfold_type self.preds_rf = {group_num: np.zeros(self.covariance_df.shape[0]) for group_num in range(self.num_groups)} self.logloss_rf_train = {group_num: [] for group_num in range(self.num_groups)} self.logloss_rf_valid = {group_num: [] for group_num in range(self.num_groups)} if self.kfold_type == 'KFold': self.c_v_instance = KFold(n_splits = self.n_splits, shuffle = False) self.split = self.c_v_instance.split(self.covariance_df) elif self.kfold_type == 'StratifiedKFold': self.c_v_instance = StratifiedKFold(n_splits = self.n_splits, shuffle = True) self.split = self.c_v_instance.split(self.covariance_df, self.intervention_df) else: raise Exception("kfold_type {} is invalid argument".format(self.kfold_type)) logger.info("RandomForestClassifier is progressing ...") # self.valid_index_list = list() for n_fold, (train_index, valid_index) in enumerate(self.split): logger.info('{} Cross Validation {}'.format(self.kfold_type, n_fold)) # self.valid_index_list.extend(list(valid_index)) X_train, z_train = self.covariance_df.iloc[train_index], self.intervention_df.iloc[train_index] X_valid, z_valid = self.covariance_df.iloc[valid_index], self.intervention_df.iloc[valid_index] model_rf = RandomForestClassifier() model_rf.fit(X_train, z_train.values.ravel()) pred_prob_train = {group_num: model_rf.predict_proba(X_train)[:, group_num] for group_num in range(self.num_groups)} logloss_train = {group_num: log_loss(z_train, pred_prob_train[group_num]) for group_num in range(self.num_groups)} [self.logloss_rf_train[group_num].append(logloss_train[group_num]) for group_num in range(self.num_groups)] pred_prob_valid = {group_num: model_rf.predict_proba(X_valid)[:, group_num] for group_num in range(self.num_groups)} logloss_valid = {group_num: log_loss(z_valid, pred_prob_valid[group_num]) for group_num in range(self.num_groups)} [self.logloss_rf_valid[group_num].append(logloss_valid[group_num]) for group_num in range(self.num_groups)] for group_num in range(self.num_groups): self.preds_rf[group_num][valid_index] = pred_prob_valid[group_num] self.logloss_rf_train_mean = {group_num: sum(self.logloss_rf_train[group_num]) / self.n_splits for group_num in range(self.num_groups)} self.logloss_rf_valid_mean = {group_num: sum(self.logloss_rf_valid[group_num]) / self.n_splits for group_num in range(self.num_groups)} self.eps = np.finfo(float).eps self.preds_rf = {group_num: np.where(self.preds_rf[group_num] < 0 + self.eps, 0 + self.eps * 1.001, self.preds_rf[group_num]) for group_num in range(self.num_groups)} self.preds_rf = {group_num: np.where(self.preds_rf[group_num] > 1 - self.eps, 1 - self.eps * 1.001, self.preds_rf[group_num]) for group_num in range(self.num_groups)} logger.info("RandomForestClassifier is complete.") return self.preds_rf, self.logloss_rf_train_mean, self.logloss_rf_valid_mean def learners(self, learner, control_outcome_learner = None, treatment_outcome_learner = None, control_effect_learner = None, treatment_effect_learner = None) -> None: self.learner = learner self.control_outcome_learner = control_outcome_learner self.treatment_outcome_learner = treatment_outcome_learner self.control_effect_learner = control_effect_learner self.treatment_effect_learner = treatment_effect_learner def cate(self, target_df: pd.core.frame.DataFrame, intervention_col_name: str, covariance_col_name: list[str], target_col_name: str, ps_col_list: list[str], verbose: bool = False): if self.control_outcome_learner is None: self.model_mu_c = deepcopy(learner) else: self.model_mu_c = control_outcome_learner if self.treatment_outcome_learner is None: self.model_mu_t = deepcopy(learner) else: self.model_mu_t = treatment_outcome_learner if self.control_effect_learner is None: self.model_tau_c = deepcopy(learner) else: self.model_tau_c = control_effect_learner if self.treatment_effect_learner is None: self.model_tau_t = deepcopy(learner) else: self.model_tau_t = treatment_effect_learner self.target_df = target_df self.intervention_col_name = intervention_col_name self.covariance_col_name = covariance_col_name self.target_col_name = target_col_name self.verbose = verbose self.control_name = 0 self.t_groups = list(self.target_df[self.intervention_col_name].loc[self.target_df[self.intervention_col_name] != self.control_name].unique()) self.t_groups.sort() # ascending self.ps = {group: np.array(self.target_df[ps_col]) for group, ps_col in enumerate(ps_col_list)} self.te = np.zeros((self.target_df.shape[0], len(self.t_groups))) self.dhat_cs = {} self.dhat_ts = {} self.models_mu_c = {group: deepcopy(self.model_mu_c) for group in self.t_groups} self.models_mu_t = {group: deepcopy(self.model_mu_t) for group in self.t_groups} self.models_tau_c = {group: deepcopy(self.model_tau_c) for group in self.t_groups} self.models_tau_t = {group: deepcopy(self.model_tau_t) for group in self.t_groups} for group in self.t_groups: self.models_mu_c[group].fit(self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.covariance_col_name], self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.target_col_name]) self.models_mu_t[group].fit(self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.covariance_col_name], self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.target_col_name]) self.d_c = self.models_mu_t[group].predict(self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.covariance_col_name]) - self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.target_col_name] self.d_t = self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.target_col_name] - self.models_mu_c[group].predict(self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.covariance_col_name]) self.models_tau_c[group].fit(self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.covariance_col_name], self.d_c) self.models_tau_t[group].fit(self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.covariance_col_name], self.d_t) self.dhat_cs[group] = self.models_tau_c[group].predict(self.target_df[self.covariance_col_name]) self.dhat_ts[group] = self.models_tau_t[group].predict(self.target_df[self.covariance_col_name]) self.te_ = ((self.ps[group] / (self.ps[group] + self.ps[0])) * self.dhat_cs[group] + (self.ps[0] / (self.ps[group] + self.ps[0])) * self.dhat_ts[group]).reshape(-1, 1) self.group_ = int(group) - 1 self.te[:, self.group_] = np.ravel(self.te_) if self.verbose: for group in self.t_groups: logger.info("Treatment group: {}".format(group)) y_hat_control = self.models_mu_c[group].predict(self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.covariance_col_name]) y_hat_treatment = self.models_mu_t[group].predict(self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.covariance_col_name]) # RMSE rmse = np.sqrt(mse(np.array(self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.target_col_name]), np.array(y_hat_control))) logger.info("RMSE Control: %s" %rmse) rmse = np.sqrt(mse(np.array(self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.target_col_name]), np.array(y_hat_treatment))) logger.info("RMSE Treatment: %s" %rmse) # Gini Coefficient arr_control = np.array([np.array(self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.target_col_name]), y_hat_control]).transpose() arr_treatment = np.array([np.array(self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.target_col_name]), y_hat_treatment]).transpose() for num, arr in enumerate([arr_control, arr_treatment]): if num == 0: n_samples = self.target_df.loc[self.target_df[self.intervention_col_name] == 0, self.target_col_name].shape[0] elif num == 1: n_samples = self.target_df.loc[self.target_df[self.intervention_col_name] == group, self.target_col_name].shape[0] # sort rows on prediction column from largest to smallest true_order = arr[arr[:, 0].argsort()][::-1, 0] pred_order = arr[arr[:, 1].argsort()][::-1, 0] # get Lorenz curves l_true = np.cumsum(true_order) / np.sum(true_order) l_pred = np.cumsum(pred_order) / np.sum(pred_order) l_ones = np.linspace(1 / n_samples, 1, n_samples) # get Gini coefficients, area between curves g_true = np.sum(l_ones - l_true) g_pred = np.sum(l_ones - l_pred) # normalize to true Gini coefficient true_gini = g_pred / g_true if num == 0: logger.info("Gini Control: %s" %true_gini) elif num == 1: logger.info("Gini Treatment: %s" %true_gini) return self.te y, X, treatment, _, _, e = synthetic_data(mode = 1, n = 10000, p = 5, sigma = 1.0) synthetic_data_ = np.hstack(( y.reshape(len(y), 1), X, treatment.reshape(len(y), 1), e.reshape(len(y), 1) )) synthetic_df = pd.DataFrame(data = synthetic_data_) synthetic_df.rename(columns = {0 : 'Y', 1 : 'cova_1', 2 : 'cova_2', 3 : 'cova_3', 4 : 'cova_4', 5 : 'cova_5', 6 : 'inte', 7 : 'ps'}, inplace = True ) covariance_df = synthetic_df[['cova_1', 'cova_2', 'cova_3', 'cova_4', 'cova_5']] synthetic_df['inte'] = synthetic_df['inte'].astype(int) intervention_df = synthetic_df['inte'] ins = XLearner(covariance_df, pd.DataFrame(intervention_df)) ps, logloss_rf_train, logloss_rf_valid = ins.propensity_score_RF(n_splits = 5, kfold_type = 'StratifiedKFold') learner = lgb.LGBMRegressor( boosting_type = 'gbdt', objective = 'regression', reg_alpha = 1, reg_lambda = 1, subsample = 0.6, max_depth = 10, num_leaves = 20, colsample_bytree = 0.5, min_child_weight = 10, n_estimators = 1000 ) ins.learners(learner) synthetic_df['ps_0'] = ps[0] synthetic_df['ps_1'] = ps[1] cate = ins.cate(synthetic_df, 'inte', ['cova_1', 'cova_2', 'cova_3', 'cova_4', 'cova_5'], 'Y', ['ps_0', 'ps_1'], True) learner_x = BaseXRegressor(learner = learner) cate_x = learner_x.fit_predict(X = X, treatment = treatment, y = synthetic_df['Y'], p = e, return_ci = False) CATE alpha = 0.2 bins = 30 plt.figure(figsize = (12, 8)) plt.hist(cate_x, alpha = alpha, bins = bins, label = 'X Learner') plt.hist(cate, alpha = alpha, bins = bins, label = 'X Learner Multiple') plt.legend() plt.show() AUUC synthetic_df['uplift_score_1'] = cate # synthetic_df['uplift_score_causalml'] = cate_x accu_score_ = auuc_score(df = synthetic_df[['Y','inte', 'uplift_score_1']], kind = 'gain', outcome_col = 'Y', treatment_col = 'inte') logger.info('AUUC : %s' %accu_score_) threshold_index = int(accu_score_.uplift_score_1 * synthetic_df.shape[0]) threshold = synthetic_df['uplift_score_1'].sort_values(ascending = False).reset_index(drop = True)[threshold_index] logger.info('Threshold : %s' %threshold) plot(df = synthetic_df[['Y','inte', 'uplift_score_1']], kind = 'gain', outcome_col = 'Y', treatment_col = 'inte' ) # plot(df = synthetic_df[['Y','inte', 'uplift_score_causalml']], kind = 'gain', # outcome_col = 'Y', # treatment_col = 'inte' # ) >> AUUC : uplift_score_1 0.789995 Random 0.498822 dtype: float64 Threshold : -0.011212206037982897 まとめ Uplift Modeling for Multiple Treatments with Cost Optimization の内容をもとに Extending X-Learner for Multiple Treatments を実装してみました。 認識や実装の間違いなどありましたらご指摘のほどよろしくお願いいたします。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Bayesian AB Testing Implemented in Two Methods 【Python】

はじめに この記事ではベイジアンABテストを2つの方法で実装しています。 1. Beta分布を共役事前分布に採用する方法 (Adopt Beta distribution as the conjugate prior distribution) 2. Bernoulli分布のパラメータをベイズ推定する方法 (Bayesian estimation of parameters of Bernoulli distribution) Check versions of libraries import numpy import scipy import pymc3 library_dict = {0: 'numpy', 1: 'scipy', 2: 'pymc3'} for num, library in enumerate([numpy, scipy, pymc3]): print('{}: v{}'.format(library_dict[num], library.__version__)) >> numpy: v1.18.5 scipy: v1.4.1 pymc3: v3.11.2 Load libraries import numpy as np from scipy.stats import beta, norm, bernoulli import pymc3 as pm import json import sys import logging logger = logging.getLogger() handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) logger.addHandler(handler) logger.setLevel(logging.INFO) from matplotlib import pyplot as plt %matplotlib inline from decimal import Decimal, ROUND_HALF_UP Adopt Beta distribution as the conjugate prior distribution class BayesianABN(object): def __init__(self, n_comparisons: int, params: list = None, n_samples: int = 50000) -> None: """ n_comparisons: If u wanna compare two creatives, n_comparisons is 2. params: If u wanna compare two creatives, params is [[num of conversions, num of impressions], [num of conversions, num of impressions]]. n_samples: Number of samples to generate from conjugate prior distribution. """ if params is None: self.params = list() for _ in range(n_comparisons): self.params.append([1, 1]) elif params is not None: self.params = params if n_comparisons != len(self.params): raise Exception("The value of n_comparisons does not match the number of elements in params.") if n_comparisons * 2 != sum([len(list_) for list_ in self.params]): raise Exception("The value of n_comparisons does not match the number of elements contained in each element of params.") self.n_comparisons = n_comparisons self.data = [[0, 0] for _ in range(n_comparisons)] self.n_samples = n_samples self.sampling() logger.info(json.dumps({'Number of comparisons': self.n_comparisons, 'Number of beta samples': self.n_samples})) def sampling(self) -> None: self.posterior_distribution_list = [beta(*p).rvs(self.n_samples) for p in self.params] def update(self, additional_data) -> None: if self.n_comparisons * 2 != sum([len(list_) for list_ in additional_data]): raise Exception("The value of n_comparisons does not match the number of elements contained in each element of additional_data.") for num, data in enumerate(additional_data): cov = data[0] imp = data[1] self.data[num][0] += cov self.data[num][1] += imp self.params[num][0] += cov self.params[num][1] += imp - cov self.sampling() def significant_diff(self, target1: int, target2: int) -> float: self.diff = np.round((self.posterior_distribution_list[target1] <= self.posterior_distribution_list[target2]).mean(), 1) if self.diff < 0.5: return 1.0 - self.diff return self.diff def mean_var(self) -> dict: mean_var_dict = {chr(num + 65): {'mean': np.round(self.posterior_distribution_list[num].mean(), 2), 'var': Decimal(self.posterior_distribution_list[num].var()).quantize(Decimal('.000000'), rounding = ROUND_HALF_UP)} for num in range(self.n_comparisons)} return mean_var_dict def current_sample(self) -> dict: current_sample_dict = dict() for num in range(self.n_comparisons): current_sample_dict[chr(num + 65)] = dict() current_sample_dict[chr(num + 65)]['cv'] = self.data[num][0] current_sample_dict[chr(num + 65)]['imp'] = self.data[num][1] return current_sample_dict def distribution_drawing(self, title: str = '', save: bool = False, labels: str = None) -> None: plt.figure(figsize = (10, 5)) plt.title(title) cmap = plt.get_cmap('jet') color_list = list() for num, posterior in enumerate(self.posterior_distribution_list): color = cmap(0.15 * (num + 1)) color_list.append(color) plt.hist(posterior, bins = 100, histtype = 'stepfilled', density = True, color = color, alpha = 0.5) handles = [plt.Rectangle((0, 0), 1, 1, color = c, ec = 'k', alpha = 0.5) for c in color_list] if labels is None: labels = [chr(num + 65) for num in range(self.n_comparisons)] plt.legend(handles, labels) if save: plt.saving('{}.png'.format(title)) plt.show() def delta_posterior_distribution_drawing(self, hdi_prob: float, target1: int, target2: int, title: str = None, save: bool = False) -> None: self.hdi_prob = hdi_prob self.pos_dis_1 = self.posterior_distribution_list[target1] self.pos_dis_2 = self.posterior_distribution_list[target2] self.delta_posterior_distribution = self.pos_dis_1 - self.pos_dis_2 self.hdi = norm.interval(alpha = self.hdi_prob, loc = self.delta_posterior_distribution.mean(), scale = self.delta_posterior_distribution.std()) plt.figure(figsize = (10, 5)) if title is None: title = 'Difference in the posterior distributions of the parameters of {0} and {1} / {0} - {1} / {2}% HDI'.format(chr(target1 + 65), chr(target2 + 65), int(self.hdi_prob * 100)) plt.title(title) cmap = plt.get_cmap('jet') color_list = list() color = cmap(0.15 * (0 + 1)) color_list.append(color) plt.hist(self.delta_posterior_distribution, bins = 100, histtype = 'stepfilled', density = True, color = color, alpha = 0.5, label = 'delta') self.mean = self.delta_posterior_distribution.mean() plt.vlines(self.mean, 0, 10, linestyle = '-', label = 'mean: {}'.format(np.round(self.mean, 3))) if int(str(hdi_prob)[3]) == 0: plt.vlines(self.hdi[0], 0, 10, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round((1.0 - self.hdi_prob) / 2.0, 2) * 100, np.round(self.hdi[0], 3))) plt.vlines(self.hdi[1], 0, 10, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round(1.0 - (1.0 - self.hdi_prob) / 2.0, 2) * 100, np.round(self.hdi[1], 3))) elif int(str(hdi_prob)[3]) != 0: plt.vlines(self.hdi[0], 0, 10, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round((1.0 - self.hdi_prob) / 2.0, 3) * 100, np.round(self.hdi[0], 3))) plt.vlines(self.hdi[1], 0, 10, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round(1.0 - (1.0 - self.hdi_prob) / 2.0, 3) * 100, np.round(self.hdi[1], 3))) handles = [plt.Rectangle((0, 0), 1, 1, color = c, ec = 'k', alpha = 0.5) for c in color_list] plt.legend() if save: plt.saving('{}.png'.format(title)) plt.show() BayesianABN_instance = BayesianABN(3) >> {"Number of comparisons": 3, "Number of beta samples": 50000} BayesianABN_instance.update([[2, 204], [3, 200], [50, 300]]) BayesianABN_instance.significant_diff(0, 1) >> 0.7 BayesianABN_instance.current_sample() >> {'A': {'cv': 2, 'imp': 204}, 'B': {'cv': 3, 'imp': 200}, 'C': {'cv': 50, 'imp': 300}} BayesianABN_instance.mean_var() >> {'A': {'mean': 0.01, 'var': Decimal('0.000070')}, 'B': {'mean': 0.02, 'var': Decimal('0.000094')}, 'C': {'mean': 0.17, 'var': Decimal('0.000461')}} BayesianABN_instance.distribution_drawing() Bayesian estimation of parameters of Bernoulli distribution class BayesianAB(object): def __init__(self, sample_dict: dict): """ sample_dict: If u wanna compare two creatives, sample_dict = {'A': creative_a, 'B': creative_b}. The values in the dict contain a sampling of cv = 1/ not cv = 0 for each creative. """ self.sample_dict = sample_dict if len(self.sample_dict) != 2: raise Exception("The number of elements in sample_dict must be two.") def modeling(self) -> None: with pm.Model() as self.model: self.p_A = pm.Uniform('p_A', 0, 1.0) self.p_B = pm.Uniform('p_B', 0, 1.0) self.obs_A = pm.Bernoulli('obs_A', self.p_A, observed = self.sample_dict['A']) self.obs_B = pm.Bernoulli('obs_B', self.p_B, observed = self.sample_dict['B']) self.delta_prob = pm.Deterministic('delta_prob', self.p_B - self.p_A) def estimate(self) -> None: with self.model: self.start = pm.find_MAP() self.step = pm.Slice() self.trace = pm.sample(20000, step = self.step, start = self.start, return_inferencedata = False) self.burned_trace = self.trace[1000:] def model_to_graphviz(self) -> None: return pm.model_to_graphviz(self.model) def tracing(self) -> None: with self.model: pm.plot_trace(self.trace) def posterior_distribution_drawing(self, hdi_prob) -> None: with self.model: pm.plot_posterior(self.burned_trace, hdi_prob = hdi_prob) def summary(self, hdi_prob: float): with self.model: self.summary_ = pm.summary(self.burned_trace, hdi_prob = hdi_prob) return self.summary_ def significant_diff(self) -> float: self.diff = np.round((self.burned_trace['p_A'] <= self.burned_trace['p_B']).mean(), 1) if self.diff < 0.5: return 1.0 - self.diff return self.diff def delta_posterior_distribution_drawing(self, hdi_prob: float, title: str = None, save: bool = False) -> None: self.hdi_prob = hdi_prob try: self.summary_ except: with self.model: self.summary_ = pm.summary(self.burned_trace, hdi_prob = self.hdi_prob) delta_prob = self.burned_trace['delta_prob'] plt.figure(figsize = (10, 5)) if title is None: title = 'Posterior distribution of delta' plt.title(title) cmap = plt.get_cmap('jet') color_list = list() color = cmap(0.15 * (0 + 1)) color_list.append(color) plt.vlines(self.summary_.iloc[2]['mean'], 0, 50, linestyle = '-', label = 'mean: {}'.format(self.summary_.iloc[2]['mean'])) if int(str(hdi_prob)[3]) == 0: self.hdi_lower = self.summary_.iloc[2]['hdi_{}%'.format(np.round((1.0 - self.hdi_prob) / 2.0, 2) * 100)] self.hdi_upper = self.summary_.iloc[2]['hdi_{}%'.format(np.round(1.0 - (1.0 - self.hdi_prob) / 2.0, 2) * 100)] plt.vlines(self.hdi_lower, 0, 50, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round((1.0 - self.hdi_prob) / 2.0, 2) * 100, self.hdi_lower)) plt.vlines(self.hdi_upper, 0, 50, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round(1.0 - (1.0 - self.hdi_prob) / 2.0, 2) * 100, self.hdi_upper)) elif int(str(hdi_prob)[3]) != 0: self.hdi_lower = self.summary_.iloc[2]['hdi_{}%'.format(np.round((1.0 - self.hdi_prob) / 2.0, 3) * 100)] self.hdi_upper = self.summary_.iloc[2]['hdi_{}%'.format(np.round(1.0 - (1.0 - self.hdi_prob) / 2.0, 3) * 100)] plt.vlines(self.hdi_lower, 0, 50, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round((1.0 - self.hdi_prob) / 2.0, 3) * 100, self.hdi_lower)) plt.vlines(self.hdi_upper, 0, 50, linestyle = '--', label = 'hdi_{}%: {}'.format(np.round(1.0 - (1.0 - self.hdi_prob) / 2.0, 3) * 100, self.hdi_upper)) plt.hist(delta_prob, bins = 100, histtype = 'stepfilled', density = True, color = color, alpha = 0.5) handles = [plt.Rectangle((0, 0), 1, 1, color = c, ec = 'k', alpha = 0.5) for c in color_list] plt.legend() if save: plt.saving('{}.png'.format(title)) plt.show() Adopt Beta distribution as the conjugate prior distribution VS Bayesian estimation of parameters of Bernoulli distribution Create Dummy Data creative_a = bernoulli.rvs(p = 0.15, size = 15000) creative_b = bernoulli.rvs(p = 0.17, size = 13000) Comparison Adopt Beta distribution as the conjugate prior distribution BayesianABN_instance = BayesianABN(2) >> {"Number of comparisons": 2, "Number of beta samples": 50000} BayesianABN_instance.update([[sum(creative_a), len(creative_a)], [sum(creative_b), len(creative_b)]]) BayesianABN_instance.significant_diff(0, 1) >> 1.0 BayesianABN_instance.current_sample() >> {'A': {'cv': 2258, 'imp': 15000}, 'B': {'cv': 2244, 'imp': 13000}} BayesianABN_instance.distribution_drawing() BayesianABN_instance.delta_posterior_distribution_drawing(0.95, 0, 1) Bayesian estimation of parameters of Bernoulli distribution instance = BayesianAB({'A': creative_a, 'B': creative_b}) instance.modeling() instance.model >> p_A_interval__ ~ TransformedDistribution p_B_interval__ ~ TransformedDistribution p_A ~ Uniform p_B ~ Uniform delta_prob ~ Deterministic obs_A ~ Bernoulli obs_B ~ Bernoulli instance.model_to_graphviz() instance.estimate() >> 100.00% [7/7 00:00<00:00 logp = -19,408, ||grad|| = 6,752.2] Multiprocess sampling (4 chains in 4 jobs) CompoundStep >Slice: [p_B] >Slice: [p_A] 100.00% [84000/84000 01:15<00:00 Sampling 4 chains, 0 divergences] Sampling 4 chains for 1_000 tune and 20_000 draw iterations (4_000 + 80_000 draws total) took 120 seconds. instance.tracing() posterior_distributions = instance.posterior_distribution_drawing(hdi_prob = 0.95) summary = instance.summary(hdi_prob = 0.95) summary >> mean sd hdi_2.5% hdi_97.5% mcse_mean mcse_sd ess_bulk ess_tail r_hat p_A 0.151 0.003 0.145 0.156 0.0 0.0 76771.0 58106.0 1.0 p_B 0.173 0.003 0.166 0.179 0.0 0.0 73469.0 56450.0 1.0 delta_prob 0.022 0.004 0.013 0.031 0.0 0.0 75624.0 66133.0 1.0 instance.significant_diff() >> 1.0 (instance.burned_trace['p_B'] - instance.burned_trace['p_A'] > 0).mean() >> 1.0 instance.delta_posterior_distribution_drawing(hdi_prob = 0.95, title = 'Posterior distribution of delta / 95% HDI') Result The same average, HID2.5%, and HDI 97.5% were calculated for the two methods compared. Since the difference of the estimated parameters is 1.0 and the difference of the average is 0.022, we can say that the probability that B is more likely to convert is close to 100% and the conversion rate could have been increased by 2.2%. まとめ ベイジアンABテストを2つの方法で実装し結果を比較してみました。 認識や実装の間違いなどありましたらご指摘のほどよろしくお願いいたします。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Progateで作ったWebアプリをDjangoで作ってみる2! Part5 -データベース編-

目標物の確認 ProgateのNode.jsコースで作ったブログアプリと同じものをDjangoで作ってみます。 Djangoでのアプリ開発の一連の流れを整理するために記していきます。 完成イメージ models.py models.pyファイルの作成を進めていきます。 今回作成するブログに使う情報は、タイトル、要約、本文、カテゴリーの4つです。 BlogModelというclassを定義して、modelsモジュールの中のModelクラスを継承させるために、BlogModelの中でmodels.Modelというコードを書きます(☆1)。 ブログのタイトルはCharFieldを定義しました。CharFieldは短めの文字列情報を扱うフィールドです。CharFieldを作成するときはMax_length(最大文字数)という引数を設定します。今回は最大100文字とします(☆2)。 要約と本文はTextFieldを定義します。TextFieldはCharFieldと同じイメージで長めの文字列情報を扱うときに使います(☆3)。 最後のカテゴリーはまた、CharFieldです。ですが、さっきのタイトルと異なり、choices=CATEGORYという引数が入っています。右の表示がhtmlファイル上で表示される項目、左の表示がhtmlファイル上で表示される項目です。つまり、CATEGORY = (('all', '全員'), ('limited', '会員限定'))の左側は機械が理解できる項目で、右側が人が理解できる項目です blogapp/blog/models.py from django.db import models # ☆1 CATEGORY = (('all', '全員'), ('limited', '会員限定')) # ☆4 class BlogModel(models.Model): # ☆1 title = models.CharField(max_length=100) # ☆2 summary = models.TextField() # ☆3 content = models.TextField() # ☆3 category = models.CharField( # ☆4 max_length=50, choices=CATEGORY) def __str__(self): return self.title マイグレーションしていきます。 > python manage.py makemigrations > python manage.py migrate admin.py admin.pyファイルにコードを追加し、管理画面にBlogModelを表示させることも忘れずに。 blogapp/blog/admin.py from django.contrib import admin from .models import BlogModel admin.site.register(BlogModel) ブログ記事の追加 最後に管理者ツールからBlogModelを選択して、個々の記事を入れていきます。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

AWS QuickSight Boto3【Python】

はじめに この記事は、Boto 3 Docs 1.9.185 documentation の内容をベースとしています。 AWS外部のWebアプリケーションで組織やユーザが登録された場合にPython Boto3, API Gateway, Lambda, Step Functionsから成るAPIを利用してQuickSightでも同様の組織やユーザを作成するケースがあり、本記事ではその際に利用したPython Boto3 QuickSight Clientの関数を記載しています。 Preparation import boto3 import logging logger = logging.getLogger() logger.setLevel(logging.DEBUG) # set Variables REGION_NAME = os.getenv("REGION_NAME") AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID") NAME_SPACE = 'default' """ QuickSightへの必要に応じた権限を付与されているRoleをAssume RoleとしてLambdaで利用することにより、 aws_access_key_id, aws_secret_access_keyをパラメータに設定する必要がなくセキュア。 """ qs = boto3.client('quicksight', region_name = REGION_NAME) """ ダッシュボードをqs.create_dashboardで作成するにはテンプレートが必要なのであらかじめ作成。 QuickSightコンソール上で作成しておいた分析のIDをANALYSIS_IDに設定。 SourceEntityには分析および分析に利用しているデータセットのリソースネームを設定。 qs.create_templateで設定されるDataSetPlaceholderはqs.create_dashboardでも利用。 """ ANALYSIS_ID = 'analysis-id' qs.create_template( AwsAccountId = AWS_ACCOUNT_ID, Name = 'Dashboard Template', Permissions = [ { 'Principal': 'arn:aws:quicksight:{}:{}:user/{}/<QuickSightUserName>'.format(REGION, AWS_ACCOUNT_ID, NAME_SPACE), 'Actions': [ 'quicksight:UpdateTemplatePermissions', 'quicksight:DescribeTemplate', ] }, ], TemplateId = 'Dashboard-Template', SourceEntity = { 'SourceAnalysis': { 'Arn': 'arn:aws:quicksight:{}:{}:analysis/{}'.format(REGION, AWS_ACCOUNT_ID, ANALYSIS_ID), 'DataSetReferences': [ { 'DataSetPlaceholder': 'DataSetPlaceHolder1', 'DataSetArn': 'arn:aws:quicksight:{}:{}:dataset/<DataSetId1>'.format(REGION, AWS_ACCOUNT_ID) }, { 'DataSetPlaceholder': 'DataSetPlaceHolder2', 'DataSetArn': 'arn:aws:quicksight:{}:{}:dataset/<DataSetId2>'.format(REGION, AWS_ACCOUNT_ID) }, ] } }, Tags=[ { 'Key': 'name', 'Value': 'Dashboard Template' }, ], # VersionDescription = '1' ) Organization # create Group """ default Namespaceに組織IDと等しいGroupNameのグループを作成。 """ def CreateGroup(qs, AwsAccountId, Namespace, GroupName): qs.create_group( GroupName = GroupName, AwsAccountId = AwsAccountId, Namespace = Namespace ) # create Dataset """ ダッシュボードに表示するデータをデータセットとして作成。 ImportMode = 'DIRECT_QUERY'としてDBからクエリしたデータをQuickSightのデータセットとして利用。 DBのデータをQuickSightのデータセットとして利用するには対象のDBをデータソースとしてQuickSightに登録しておく必要があり、 DataSourceArnにはQuickSightに登録してあるデータソースのリソースネームを指定。 データセットとして読み込むデータとしてクエリデータのカラム情報をColumnsに指定。 QuickSightに作成してあるユーザやグループをPermissionsに設定することでデータセットへの権限の付与が可能。 """ def CreateDataset(qs, AwsAccountId, Region, Namespace, DataSetId, DataSetName, TableName, DataSourceArn, SqlQuery, Columns): qs.create_data_set( AwsAccountId = AwsAccountId, DataSetId = DataSetId, Name = DataSetName, PhysicalTableMap = { TableName: { 'CustomSql': { 'DataSourceArn': DataSourceArn, 'Name': 'Query', 'SqlQuery': SqlQuery, 'Columns': Columns }, } }, ImportMode = 'DIRECT_QUERY', Permissions = [ { 'Principal': 'arn:aws:quicksight:{}:{}:user/{}/<QuickSightUserName>'.format(Region, AwsAccountId, Namespace), 'Actions': [ "quicksight:DescribeDataSet", "quicksight:DescribeDataSetPermissions", "quicksight:PassDataSet", "quicksight:DescribeIngestion", "quicksight:ListIngestions", "quicksight:UpdateDataSet", "quicksight:DeleteDataSet", "quicksight:CreateIngestion", "quicksight:CancelIngestion", "quicksight:UpdateDataSetPermissions" ] }, ] ) # create Dashboard """ 組織名は変更されることが想定されるため、組織IDをDashboardIdとして利用してダッシュボードを作成。。 Permissionsにグループを設定することで、設定されたグループに属しているユーザにダッシュボード閲覧権の付与が可能。 SourceEntityではダッシュボードに読み込むデータセットおよびグラフビジュアルを作成する基となるテンプレートのリソースネームを設定。 DataSetIdとDataSetPlaceholderに1と2が存在しており、ダッシュボードに2つのデータセットを読み込むことを想定。 """ def CreateDashboard(qs, AwsAccountId, Region, Namespace, DashboardId, DashboardName, GroupName, DataSetId1, DataSetId2, DataSetPlaceholder1, DataSetPlaceholder2, TemplateID): qs.create_dashboard( AwsAccountId = AwsAccountId, DashboardId = DashboardId, Name = DashboardName, Permissions = [ { 'Principal': 'arn:aws:quicksight:{}:{}:user/{}/<QuickSightUserName>'.format(Region, AwsAccountId, Namespace), 'Actions': [ 'quicksight:DescribeDashboard', 'quicksight:ListDashboardVersions', 'quicksight:UpdateDashboardPermissions', 'quicksight:QueryDashboard', 'quicksight:UpdateDashboard', 'quicksight:DeleteDashboard', 'quicksight:DescribeDashboardPermissions', 'quicksight:UpdateDashboardPublishedVersion' ] }, { 'Principal': 'arn:aws:quicksight:{}:{}:group/{}/{}'.format(Region, AwsAccountId, Namespace, GroupName), 'Actions': [ "quicksight:DescribeDashboard", "quicksight:QueryDashboard", "quicksight:ListDashboardVersions" ] } ], SourceEntity = { 'SourceTemplate': { 'DataSetReferences': [ { 'DataSetPlaceholder': DataSetPlaceholder1, 'DataSetArn': 'arn:aws:quicksight:{}:{}:dataset/{}'.format(Region, AwsAccountId, DataSetId1) }, { 'DataSetPlaceholder': DataSetPlaceholder2, 'DataSetArn': 'arn:aws:quicksight:{}:{}:dataset/{}'.format(Region, AwsAccountId, DataSetId2) }, ], 'Arn': 'arn:aws:quicksight:{}:{}:template/{}'.format(Region, AwsAccountId, TemplateID) } }, Tags = [ { 'Key': 'name', 'Value': DashboardId }, ], # VersionDescription = '1' ) # delete Group def DeleteGroup(qs, AwsAccountId, Namespace, GroupName): qs.delete_group( GroupName = GroupName, AwsAccountId = AwsAccountId, Namespace = Namespace ) # delete Dataset def DeleteDataset(qs, AwsAccountId, DataSetId): qs.delete_data_set( AwsAccountId = AwsAccountId, DataSetId = DataSetId ) # delete Dashboard def DeleteDashboard(qs, AwsAccountId, DashboardId): qs.delete_dashboard( AwsAccountId = AwsAccountId, DashboardId = DashboardId ) # update Dashboard """ Permissionsを除き、qs.create_dashboardと同じ引数を利用。 引数に対応する値はqs.create_dashboardの際に指定した引数の値に等しい。 Nameのみを変更することでダッシュボード名を変更することが可能。 """ def UpdateDashboardName( qs, AwsAccountId, DashboardId, DataSetId1, DataSetPlaceholder1, DataSetId2, DataSetPlaceholder2, NewDashboardName, TemplateId ): update_dashboard_info = qs.update_dashboard( AwsAccountId = AwsAccountId, DashboardId = DashboardId, Name = NewDashboardName, SourceEntity = { 'SourceTemplate': { 'DataSetReferences': [ { 'DataSetPlaceholder': DataSetPlaceholder1, 'DataSetArn': 'arn:aws:quicksight:{}:{}:dataset/{}'.format(Region, AwsAccountId, DataSetId1) }, { 'DataSetPlaceholder': DataSetPlaceholder2, 'DataSetArn': 'arn:aws:quicksight:{}:{}:dataset/{}'.format(Region, AwsAccountId, DataSetId2) }, ], 'Arn': 'arn:aws:quicksight:{}:{}:template/{}'.format(Region, AwsAccountId, TemplateId) } }, # VersionDescription = '15', ) return update_dashboard_info # update Dashboard Published Version """ qs.update_dashboardでのダッシュボード名の変更を表示に反映。 ダッシュボード名が変更されたダッシュボードのバージョンナンバーをVersionNumberに設定。 qs.update_dashboardの返り値に含まれるバージョンナンバーを VersionArn = update_dashboard_info['VersionArn'] target = '/version/' idx = VersionArn.find(target) version_str = VersionArn[idx + len(target):] VersionNumber = int(version_str) のように取得することが可能。 """ def UpdateDashboardPublishedVersion (AwsAccountId, DashboardId, VersionNumber): qs.update_dashboard_published_version( AwsAccountId = AwsAccountId, DashboardId = DashboardId, VersionNumber = VersionNumber ) User # register User """ IdentityType = 'IAM'として登録されたユーザはパスワードを入力せずにEmbed URLからダッシュボードやコンソールのページを表示することが可能。 QuickSightに登録されるユーザ名はIamArnとSessionNameから成る。 IamArn = 'arn:aws:iam::{}:role/<Role>'.format(AWS_ACCOUNT_ID), SessionName = 'TestUser'として設定した場合の QuickSightに登録されるユーザ名は'<Role>/TestUser'。 登録するユーザが属している組織のIDをOrganizationListを指定することで、組織IDに等しいグループ名のグループがQuickSightに存在していれば グループに作成したユーザを追加。 """ def RegisterUser(qs, AwsAccountId, Email, IamArn, RegisterUserName, Namespace, qsUserName, UserList, OrganizationList): if qsUserName not in UserList: result = qs.register_user( IdentityType = 'IAM', Email = Email, UserRole = 'READER', IamArn = IamArn, SessionName = RegisterUserName, AwsAccountId = AwsAccountId, Namespace = Namespace, CustomPermissionsName = 'DataExplorer' ) if result['Status'] == 201: if OrganizationList != []: for GroupName in OrganizationList: qs.create_group_membership( MemberName = qsUserName, GroupName = GroupName, AwsAccountId = AwsAccountId, Namespace = Namespace ) else: logger.debug('The organization that {} belongs to does not exist in QuickSight.'.format(qsUserName)) # log else: logger.debug('User {} creation failed'.format(qsUserName)) # log else: logger.debug('User {} already exists'.format(qsUserName)) # log # delete User def DeleteUser(qs, AwsAccountId, qsUserName, Namespace): qs.delete_user( UserName = qsUserName, AwsAccountId = AwsAccountId, Namespace = Namespace ) # update User permission for dashboard def CreateGroupMembership(qs, AwsAccountId, qsUserName, GroupName, Namespace): qs.create_group_membership( MemberName = qsUserName, GroupName = GroupName, AwsAccountId = AwsAccountId, Namespace = Namespace ) # update User permission for dashboard def DeleteGroupMembership(qs, AwsAccountId, qsUserName, GroupName, Namespace): qs.delete_group_membership( MemberName = qsUserName, GroupName = GroupName, AwsAccountId = AwsAccountId, Namespace = Namespace ) まとめ 実際に開発を行う場合にはAPI仕様書や詳細設計書、シーケンス図の作成、DB設計などを行い、AWSであれば必要に応じてセキュリティグループやサブネットを設定、デプロイはserverless frameworkを利用しGitHubでソースコードのバージョン管理を行うなどの流れがありますが、ここではQuickSightに対して処理を行うPythonの関数を載せてみました。 認識の間違いなどありましたらご指摘のほどよろしくお願いいたします。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

属性に対する加算・減算代入演算子(+=, -=)の振る舞いを定義したいとき

例として以下のクラスを定義する。 example.py class Example: def __init__(self) -> None: self.num: int = 0 加算・演算代入演算子を利用する際も、属性に定義されているsetterが呼びだされる。 これを利用して振る舞いを定義できる。 プロパティを用いて書き直すと example.py class Example: def __init__(self) -> None: self.__num: int = 0 @property def num(self): return self.__num @num.setter def num(self, value): self.__num = value self.__numに対して演算した際の、setterの様子を見てみる。 example.py example = Example() example.num = 2 example.num += 1 examaple.num = 2の場合、setterの引数valueには、右辺の2が入る。 example_num += 1の場合、setterの引数valueには、計算後の値(ここでは3)が入る。 もちろんself.__numの値はsetterのself.__num = valueの行で代入されるまでは計算前の値になっている。 これを利用して、valueを代入する前のself.__numの値とvalueの値を比較することで、 加算・減算代入を場合分けして振る舞いを定義できる。 example.py @num.setter def num(self, value): if value < self.__num: # 成立するのは加算代入をしている場合。ただし通常の代入でも成り立つ可能性はある pass # += のときの振る舞い elif value > self.__num: # 成立するのは減算代入をしている場合。ただし通常の代入でも成り立つ可能性はある pass # -= のときの振る舞い self.__num = value
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

クラスの属性に対する加算・減算代入演算子(+=, -=)の振る舞いを定義したいとき

例として以下のクラスを定義する。 example.py class Example: def __init__(self) -> None: self.num: int = 0 加算・演算代入演算子を利用する際も、属性に定義されているsetterが呼びだされる。 これを利用して振る舞いを定義できる。 プロパティを用いて書き直すと example.py class Example: def __init__(self) -> None: self.__num: int = 0 @property def num(self): return self.__num @num.setter def num(self, value): self.__num = value self.__numに対して演算した際の、setterの様子を見てみる。 example.py example = Example() example.num = 2 example.num += 1 examaple.num = 2の場合、setterの引数valueには、右辺の2が入る。 example_num += 1の場合、setterの引数valueには、計算後の値(ここでは3)が入る。 もちろんself.__numの値はsetterのself.__num = valueの行で代入されるまでは計算前の値になっている。 これを利用して、valueを代入する前のself.__numの値とvalueの値を比較することで、 加算・減算代入を場合分けして振る舞いを定義できる。 example.py @num.setter def num(self, value): if value < self.__num: # 成立するのは加算代入をしている場合。ただし通常の代入でも成り立つ可能性はある pass # += のときの振る舞い elif value > self.__num: # 成立するのは減算代入をしている場合。ただし通常の代入でも成り立つ可能性はある pass # -= のときの振る舞い self.__num = value
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

フォルダと拡張子を指定してフォルダ内のファイルを別の1つのフォルダにコピー

はじめに サーバーに存在するサブフォルダを含む大量のファイルをローカルの1つのフォルダにダウンロード(コピー)して処理したい時が割とある。 フォルダごとに同一名のファイルが存在する場合はコピーする際に上書きされる可能性がある。 また、フォルダのデータ全てをコピーするのではなく、特定の拡張子のファイルだけコピーしたい場合が結構ある。 そんな課題を解決するためのGUIを作成した。 仕様 ・上下2つの「フォルダを選択」ボタンで「コピー元フォルダ(1行目)」と「コピー先フォルダ(2行目)」を選択する。 ・3行目にコピー対象の拡張子を「.」抜きで入力する※最大5つ ・「実行ボタン」でコピー処理が実行される ・コピー先のファイル名はコピー元のディレクトリを起点とした相対パスのパス文字がアンダーバーに変換された名前が付けられる コード import PySimpleGUI as sg import os import re, shutil import pathlib INPUT_BOX_SIZE = 5 HEADER_ROW = 1 INI_EXT_LINE = 2 MAX_NUM_EXT = 5 #### 関数 # ファイル名変換関数 def name_savefile_dirname(path ,org_dir): file_path = path.replace('/', os.sep) #パスの区切り文字を統一 org_path = pathlib.Path(org_dir) sub_path = pathlib.Path(file_path) file_name = sub_path.relative_to(org_path) # 指定ディレクトリ基準のファイルの相対パスを取得 return str(file_name).replace("\\","_") #パスの区切り文字をアンダーバーに変換 # ファイルコピー関数 def copy_files(org_dir, save_dir, target_ext): file_list = [] #コピーするファイルのリスト filepath_list = [] #コピーするファイルパスのリスト for dirpath, dirnames, filenames in os.walk(org_dir): #コピー元のディレクトリを基準にtarget_extで指定した拡張子のファイルをリスト化 for file in filenames: for ext in target_ext: m = re.search(re.escape(ext), file) #指定した拡張子のファイルを探す if m : file_list.append(file) #ファイル名をリストに追加 filepath_list.append(os.path.join(dirpath, file)) #コピー元ファイルのパスをリストに追加 for i in range(len(file_list)): save_file_path = os.path.join(save_dir, name_savefile_dirname(filepath_list[i], org_dir)) #コピー先ディレクトリ名とコピー元ディレクトリを付けたファイル名を結合 shutil.copy2(filepath_list[i], save_file_path) #ファイルをコピー print(save_file_path) #### GUI # セクション1 - オプションの設定と標準レイアウト sg.theme('Dark Blue 3') layout = [ #1行目 [sg.InputText('コピー元フォルダ',enable_events=True,), sg.FolderBrowse('フォルダを選択', key='folder_from')], #2行目 [sg.InputText('コピー先フォルダ',enable_events=True,), sg.FolderBrowse('フォルダを選択', key='folder_to')], #3行目 [sg.Text('拡張子(ex:csv)', size=(15, 1)),sg.InputText('csv', size=(INPUT_BOX_SIZE, 1)), sg.InputText('xlsx', size=(INPUT_BOX_SIZE, 1)),sg.InputText('', size=(INPUT_BOX_SIZE, 1)), sg.InputText('', size=(INPUT_BOX_SIZE, 1)), sg.InputText('', size=(INPUT_BOX_SIZE, 1))], #4行目 [sg.Submit(button_text='実行ボタン')] ] # セクション 2 - ウィンドウの生成 window = sg.Window('ファイルコピー', layout) # セクション 3 - イベントループ while True: event, values = window.read() if event is None: print('exit') break if event == '実行ボタン': # フォルダのパスを抽出 path_from = values['folder_from'] path_to = values['folder_to'] # 入力がある場合コピー対象とするように拡張子をリスト化 ext_list =[] for i in range(MAX_NUM_EXT): if not len(values[INI_EXT_LINE+i].strip())==0: ext_list.append("." + values[INI_EXT_LINE+i].strip()) # コピー関数実行 copy_files(path_from, path_to, ext_list) # ポップアップ sg.popup("終了~") # セクション 4 - ウィンドウの破棄と終了 window.close()
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

GUIでフォルダ内にある指定した拡張子のファイルを別フォルダにコピー

はじめに サーバーに存在するサブフォルダを含む大量のファイルをローカルにダウンロード(コピー)して処理したい時が割とある。 フォルダごとに同一名のファイルが存在する場合はコピーする際に上書きされる可能性がある。 また、フォルダのデータ全てをコピーするのではなく、特定の拡張子のファイルだけコピーしたい場合が結構ある。 そんな課題を解決するためのGUIを作成した。 仕様 ・上下2つの「フォルダを選択」ボタンで「コピー元フォルダ(1行目)」と「コピー先フォルダ(2行目)」を選択する。 ・3行目にコピー対象の拡張子を「.」抜きで入力する※最大5つ ・「実行ボタン」でコピー処理が実行される ・コピー先のファイル名はコピー元のディレクトリを起点とした相対パスのパス文字がアンダーバーに変換された名前が付けられる コード import PySimpleGUI as sg import os import re, shutil import pathlib INPUT_BOX_SIZE = 5 HEADER_ROW = 1 INI_EXT_LINE = 2 MAX_NUM_EXT = 5 #### 関数 # ファイル名変換関数 def name_savefile_dirname(path ,org_dir): file_path = path.replace('/', os.sep) #パスの区切り文字を統一 org_path = pathlib.Path(org_dir) sub_path = pathlib.Path(file_path) file_name = sub_path.relative_to(org_path) # 指定ディレクトリ基準のファイルの相対パスを取得 return str(file_name).replace("\\","_") #パスの区切り文字をアンダーバーに変換 # ファイルコピー関数 def copy_files(org_dir, save_dir, target_ext): file_list = [] #コピーするファイルのリスト filepath_list = [] #コピーするファイルパスのリスト for dirpath, dirnames, filenames in os.walk(org_dir): #コピー元のディレクトリを基準にtarget_extで指定した拡張子のファイルをリスト化 for file in filenames: for ext in target_ext: m = re.search(re.escape(ext), file) #指定した拡張子のファイルを探す if m : file_list.append(file) #ファイル名をリストに追加 filepath_list.append(os.path.join(dirpath, file)) #コピー元ファイルのパスをリストに追加 for i in range(len(file_list)): save_file_path = os.path.join(save_dir, name_savefile_dirname(filepath_list[i], org_dir)) #コピー先ディレクトリ名とコピー元ディレクトリを付けたファイル名を結合 shutil.copy2(filepath_list[i], save_file_path) #ファイルをコピー print(save_file_path) #### GUI # セクション1 - オプションの設定と標準レイアウト sg.theme('Dark Blue 3') layout = [ #1行目 [sg.InputText('コピー元フォルダ',enable_events=True,), sg.FolderBrowse('フォルダを選択', key='folder_from')], #2行目 [sg.InputText('コピー先フォルダ',enable_events=True,), sg.FolderBrowse('フォルダを選択', key='folder_to')], #3行目 [sg.Text('拡張子(ex:csv)', size=(15, 1)),sg.InputText('csv', size=(INPUT_BOX_SIZE, 1)), sg.InputText('xlsx', size=(INPUT_BOX_SIZE, 1)),sg.InputText('', size=(INPUT_BOX_SIZE, 1)), sg.InputText('', size=(INPUT_BOX_SIZE, 1)), sg.InputText('', size=(INPUT_BOX_SIZE, 1))], #4行目 [sg.Submit(button_text='実行ボタン')] ] # セクション 2 - ウィンドウの生成 window = sg.Window('ファイルコピー', layout) # セクション 3 - イベントループ while True: event, values = window.read() if event is None: print('exit') break if event == '実行ボタン': # フォルダのパスを抽出 path_from = values['folder_from'] path_to = values['folder_to'] # 入力がある場合コピー対象とするように拡張子をリスト化 ext_list =[] for i in range(MAX_NUM_EXT): if not len(values[INI_EXT_LINE+i].strip())==0: ext_list.append("." + values[INI_EXT_LINE+i].strip()) # コピー関数実行 copy_files(path_from, path_to, ext_list) # ポップアップ sg.popup("終了~") # セクション 4 - ウィンドウの破棄と終了 window.close()
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

kivyMDチュートリアル其の弍什捌 Components - Slider篇

ハロー、Youtube!どうもHIKAKI・・・ 危ない、危ない。危うく自己紹介を間違えるところでした。 いやー、相変わらず「あつがなつい」ですね。体がトロけてなくなるのでは・・・ と無駄な心配をしているばかりな日々を過ごしています。皆さんはお元気に 過ごせていますでしょうか? かといって涼しむ手段も限定されて、なかなか思うようにいかないものですね。 はやく、元の生活様式に戻りたいものです。 ですが!(暑苦しい)、kivyMDチュートリアルはそんなことおかまいなしに進んで いく所存です!今日も元気にSlider篇をやっていこうと思います。 Slider 元気にマテリアルデザインのリンク先は飛ばしていこうと思います。 思い切りだけはいいよね。 まぁでも、マニュアルでもリンク先でもアイコンやバーなどを見るとパッと何に使う かとかは分かりやすいのではないでしょうか。どちらも音量を調節するアイコンが表示 されていますが、この使い方が多いのではないですかね。リンク先では、RGB値を設定 するということも例として挙げられていますね。 一応マニュアルの方ではこのような説明があります。 Sliders allow users to make selections from a range of values. 先週のSelection Controlsコンポーネントと関連がありそうですが、主に設定関連 で使われそうなものになりますね。あとはこんなものも使えるんじゃない?みたいな ことがあれば教えてもらえると嬉しく思います。 ということで、前置きにウェイトを置いてもしょうがないので、さっそくコードと実行 結果の方を見るようにします。今日はとてもとてもシンプルな使いかた・表示となって います。 Code(1つだけ紹介する形なのでタイトルは適当に) xxviii/slider.py from kivy.lang import Builder from kivymd.app import MDApp KV = ''' Screen MDSlider: min: 0 max: 100 value: 40 ''' class Test(MDApp): def build(self): return Builder.load_string(KV) Test().run() 本当にシンプル・・・ たったこれだけになります。 もう今まで見られた方は入門1日目くらいの難易度です。 説明するくらいも省けるようなテイストはありますが、一応軽く触れておきます。 Usage(ここも適当に) # With value hint MDSlider: min: 0 max: 100 value: 40 # Without value hint MDSlider: min: 0 max: 100 value: 40 hint: False # Without custom color !!!!!注意!!!!! → 現行バージョン(0.104.1)だと有効化されない MDSlider: min: 0 max: 100 value: 40 hint: False color: app.theme_cls.accent_color どの表示方法でもmin、max、valueプロパティは必ずくっ付いています。詳細はAPI の方に後回しする形となりますが、最初の2つは名前から分かる通りSliderの取り得る 値の最小・最大値になりますよね。また初期表示される設定値としてvalueプロパティ があります。今回は40に変更してみました。 「Without value hint」で出てくるhintプロパティは、これも名前の通り値のヒン トになります。注意点としては、デフォルトだとTrueになっているのでこれを表示させ たくないということであればFalseに変更する必要があります。 最後のcolorプロパティですが、なぜかこれは有効化されませんでした。次回バージョン での課題ということでしょうか。あぁTodoタスクが山積みだなぁ。。 結果 ということであっさりした終わり方ですが、以上となります。 ここからはどのようになるのか、というかほぼほぼマニュアルの通りですが、どうなって いるか見てみましょう。 初めて動画キャプチャというのを採用してみました。動きがもっちゃりしていますね。 あと最初の動きがキモい・・・ ということはどうでも良いことですが、これは最初の「With value hint」のところ になります。ちゃんとhintプロパティ(デフォはTrueが入る)が有効化されていること が分かりますね。 次に「Without value hint」がどうなっているか見てみましょう。 見事にヒントが現れていませんね。少しでもスリム化したいとなった場合は良いかもし ません。 あと言い忘れていましたが、Codeのところでは紹介できていませんが試しているのは 別々のコードとなります。一緒にしようと思ったのですが、なぜかSlider同士がくっ 付く仕様となっていて見やすくするために分けました。これもGitHubの方では、それ ぞれのコードがあるので参考にしてもらえれば。 API - kivymd.uix.slider 最後にまとめに入る前に、使用したAPIを触れておきます。 class kivymd.uix.slider.MDSlider(**kwargs) Class for creating a Slider widget. Check module documentation for more details. Sliderウィジェットを作るためのクラスとなります。 詳細はコードの方を見てねとも言われています。 active If the slider is clicked. active is an BooleanProperty and defaults to False. 気になったので、試してみましたがなんのためのモノか不明でした。 初期表示からhintが有効化(ヒントが表示)されていて、そのあとは特に「With value hint」の挙動と変わらないという・・・ 少し発展途上にあるかもしれません。 hint If True, then the current value is displayed above the slider. hint is an BooleanProperty and defaults to True. 言わずとしれたモノですよね。デフォルトはTrue(値が見える)となっています。 またColor系のプロパティは、試したところでは有効化されていることは確認 出来ませんでした。あと、どうでもよいことですが、なぜvalueプロパティの 説明がないぃぃ・・・いらないということですか?説明がいらないということ なのですかぁぁ?(狂気的) まとめ はい、今日はあっさりとした終わり方でしたがいかがだったでしょうか。 値をユーザーに決めるということでいうと、Text Fieldとかよりも直感的で 操作しやすいのではないでしょうか。On/Offとかだと先週のSelectionCont- rolsにお任せする形になりそうですね。 ということで今日はこのくらいにしておきます。来週はSnackbarが動かないので 1つ飛ばしてSpinner篇の予定となります。 なんかソワソワした終わり方ですが、以上で。たまにはこんな形でもいいですよね。 # 独裁政権 それでは、ごきげんよう。 参照 Components » Slider https://kivymd.readthedocs.io/en/latest/components/slider/
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Docker + Keycloak + FastAPIを用いたローカル OAuth2.0 Authorization Code Grant 検証環境構築 メモ

学習目的でKeycloakを使ったAuthorization Code Grantを検証するための環境を構築したので、メモとして残しておく。 アクセストークンを取得するところまで。 構成 Dockerを用いた以下の構成で検証環境を構築する。 keycloak ___ docker-compose.yml |__ app.env |__ app ___ Dockerfile |__ requirements.txt |__ api _ main.py docker-compose.yml FastAPI - Keycloak - MySQLコンテナ定義 version: "3" volumes: mysql_data: driver: local services: mysql: image: mysql:5.7 volumes: - mysql_data:/var/lib/mysql environment: MYSQL_ROOT_PASSWORD: root MYSQL_DATABASE: keycloak MYSQL_USER: keycloak MYSQL_PASSWORD: password keycloak: image: quay.io/keycloak/keycloak:latest container_name: "keycloak" environment: DB_VENDOR: MYSQL DB_ADDR: mysql DB_DATABASE: keycloak DB_USER: keycloak DB_PASSWORD: password KEYCLOAK_USER: admin KEYCLOAK_PASSWORD: P@ssw0rd ports: - 8080:8080 depends_on: - mysql app: container_name: "app" env_file: app.env build: ./app volumes: - ./app/api:/usr/src/server ports: - "8000:8000" app.env 環境変数設定 一度Keycloakコンテナを起動して、登録したクライアント情報を設定する。 APP_BASE_URL=http://localhost:8000/ KEYCLOAK_BASE_URL_LOCALHOST=http://localhost:8080/ KEYCLOAK_BASE_URL_CONTAINER_NAME=http://keycloak:8080/ CLIENT_ID=YOUR_CLIENT_ID CLIENT_SECRET=YOUR_CLIENT_SECRET REDIRECT_URI=http://localhost:8000/auth/callback requirements.txt Python依存ライブラリ fastapi requests uvicorn Dockerfile 依存ライブラリインストールとFastAPI起動オプションを指定。 FROM python:3.7 WORKDIR /usr/src/server ADD requirements.txt . RUN pip install -r requirements.txt # uvicornのオプションに--reloadを付与し、 # main.pyの編集と同時に変更内容を反映させる。 CMD ["uvicorn", "main:app", "--reload", "--host", "0.0.0.0", "--port", "8000"] main.py 以下を行う。 Keycloak Authorization Endpointへのリダイレクト(login) Keycloak Authorization Responseの受信+トークンリクエスト(auth) import ast import urllib.parse as parse import os import requests import uvicorn from fastapi import FastAPI from starlette.requests import Request from starlette.responses import RedirectResponse import hashlib # 環境変数取得 # FastAPI アプリ用 APP_BASE_URL = os.getenv("APP_BASE_URL") APP_CLIENT_ID = os.getenv("CLIENT_ID") APP_CLIENT_SECRET = os.getenv('CLIENT_SECRET') APP_REDIRECT_URI = os.getenv('REDIRECT_URI') # Keycloak用 # Authorization Endpoint KEYCLOAK_BASE_URL_LOCALHOST = os.getenv("KEYCLOAK_BASE_URL_LOCALHOST") # "Master"は自身の環境のレルム名を指定する。 AUTH_BASE_URL = ( f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/Master" "/protocol/openid-connect/auth" ) # Token Endpoint # コンテナ間通信するため、localhostではなくコンテナ名を指定 # "Master"は自身の環境のレルム名を指定する。 KEYCLOAK_BASE_URL_CONTAINER_NAME = os.getenv( "KEYCLOAK_BASE_URL_CONTAINER_NAME") TOKEN_URL = ( f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/master" "/protocol/openid-connect/token" ) app = FastAPI() # Keycloak Authorization Endpointへのリダイレクト @app.get("/auth/login") async def login() -> RedirectResponse: # ステート生成 state = hashlib.sha256(os.urandom(32)).hexdigest() # Authorization Endpointへリダイレクト AUTH_URL = AUTH_BASE_URL + '?{}'.format(parse.urlencode({ 'client_id': APP_CLIENT_ID, 'redirect_uri': APP_REDIRECT_URI, 'state': state, 'response_type': 'code' })) response = RedirectResponse(AUTH_URL) # ステート保存 response.set_cookie(key="AUTH_STATE", value=state) return response # Token Request def get_token(code): params = { 'client_id': APP_CLIENT_ID, 'client_secret': APP_CLIENT_SECRET, 'grant_type': 'authorization_code', 'redirect_uri': APP_REDIRECT_URI, 'code': code } x = requests.post(TOKEN_URL, params, verify=False).content.decode('utf-8') return ast.literal_eval(x) # Redirection Endpoint # ステートと認可コードを受け取る。 # ステート検証後、トークンリクエストを実行する。 @app.get("/auth/callback") async def auth(request: Request, code: str, state: str) -> RedirectResponse: # State検証 if state != request.cookies.get("AUTH_STATE"): return {"error": "state_verification_failed"} return get_token(code) if __name__ == "__main__": uvicorn.run(app, port=8000, loop="asyncio") 動作確認 コンテナを起動する docker-compose up Keycloak Admin コンソールにアクセスする。 http://localhost:8080 ※ログイン情報はdocker-composeに記載 クライアント(Clients)を登録する。 ※各種登録情報は、Keycloak: Authorization Code Grant Exampleを参考にする。 テストユーザー(Users)を登録する。 コンテナを再起動する。 docker-compose down docker-compose build docker-compose up Keycloak Authorization Endpointリダイレクト用エンドポイントにアクセスする。 http://localhost:8000/auth/login ユーザー認証を行う。 FastAPI側のリダイレクトURIにリダイレクトされ、ブラウザに以下のようなトークンレスポンスJSONが表示される。 { "access_token": "...", "expires_in": 60, "refresh_expires_in": 1800, "refresh_token": "...", "token_type": "Bearer", "not-before-policy": 0, "session_state": "...", "scope": "profile email" } 参考情報 Keycloak: Authorization Code Grant Example fastapi-keycloak-oidc-auth
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Python 桁の端数切り捨て処理

百円・千円の端数を切り捨て丸め処理を行いたい場合の方法 num = 14520 num // 1000 * 1000 14000
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

[Python]FastAPIテスト方法 メモ

Python WebAPI開発用フレームワークFastAPIを使用して作成したAPIのテスト方法についてメモする。 大まかなテストコードの書き方 TestClient をインポートする。 TestClient を作成し、FastAPIに渡す。 test_ 始まりの名前の関数を作成する (pytest 仕様)。 TestClient オブジェクトを使用し、テスト用リクエストを記述する。 チェック内容の assert 文を記述する。 事前準備 pytestインストール pip install pytest コード テスト対象 main.py ユーザー情報の登録・取得を行うAPI from fastapi import FastAPI, Header, HTTPException from pydantic import BaseModel # 正常トークン correct_token = "correct_token" # 模擬ユーザーデータ dummy_user_db = { "abcde12345": {"id": "abcde12345", "name": "Yamada", "email_address": "yamada@example.com"}, "fghij67890": {"id": "fghij67890", "name": "Tanaka", "email_address": "tanaka@example.com"}, } app = FastAPI() class User(BaseModel): id: str name: str email_address: str # ユーザー情報取得API @app.get("/users/{user_id}", response_model=User) async def get_user(user_id: str, token: str = Header(...)): # トークン不正時 if token != correct_token: raise HTTPException( status_code=400, detail="token_verification_failed") # ユーザー非存在時 if user_id not in dummy_user_db: raise HTTPException(status_code=404, detail="user_not_found") return dummy_user_db[user_id] # ユーザー情報登録API @app.post("/users/", response_model=User) async def create_user(user: User, token: str = Header(...)): # トークン不正時 if token != correct_token: raise HTTPException( status_code=400, detail="token_verification_failed") # ユーザーID重複時 if user.id in dummy_user_db: raise HTTPException(status_code=400, detail="user_id_duplicated") dummy_user_db[user.id] = user return user テストコード 各APIの正常/異常系リクエスト時のレスポンスを検証する。 from fastapi.testclient import TestClient from main import app client = TestClient(app) # ユーザー情報取得API 正常系テスト def test_get_user_NR001(): response = client.get( "/users/abcde12345", headers={"token": "correct_token"}) # レスポンス検証 # ステータスコード assert response.status_code == 200 # レスポンスボディ assert response.json() == { "id": "abcde12345", "name": "Yamada", "email_address": "yamada@example.com", } # ユーザー情報取得API 異常系テスト # トークン違い def test_get_user_ABR001(): response = client.get("/users/abcde12345", headers={"token": "dummy_token"}) assert response.status_code == 400 assert response.json() == {"detail": "token_verification_failed"} # ユーザー情報取得API 異常系テスト # 存在しないユーザー指定 def test_get_user_ABR002(): response = client.get( "/users/hogefuga", headers={"token": "correct_token"}) assert response.status_code == 404 assert response.json() == {"detail": "user_not_found"} # ユーザー情報登録API 正常系テスト def test_create_user_NR001(): # 正常系リクエスト # ヘッダーとボディは辞書型で渡す。 response = client.post( "/users/", headers={"Token": "correct_token"}, json={"id": "klmno12345", "name": "Kobayashi", "email_address": "kobayashi@example.com"}, ) assert response.status_code == 200 assert response.json() == { "id": "klmno12345", "name": "Kobayashi", "email_address": "kobayashi@example.com", } # ユーザー情報登録API 異常系テスト # トークン違い def test_create_user_ABR001(): response = client.post( "/users/", headers={"Token": "dummy_token"}, json={"id": "klmno12345", "name": "Kobayashi", "email_address": "kobayashi@example.com"}, ) assert response.status_code == 400 assert response.json() == {"detail": "token_verification_failed"} # ユーザー情報登録API 異常系テスト # 登録済みユーザーID指定 def test_create_user_ABR002(): response = client.post( "/users/", headers={"Token": "correct_token"}, json={ "id": "abcde12345", "name": "Kobayashi", "email_address": "kobayashi@example.com" }, ) assert response.status_code == 400 assert response.json() == {"detail": "user_id_duplicated"} 動作確認 pytest ================================================================ test session starts ================================================================ platform win32 -- Python 3.8.3, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 rootdir: C:\Users\user\Documents\dev\vscode\ws_git\fastapi\test plugins: anyio-3.2.1, cov-2.11.1 collected 6 items test_main.py ...... [100%] ================================================================= 6 passed in 0.56s ================================================================= 参考情報 FastAPI - テスト
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

3D scatterのRuntimeWarningエラー

RuntimeWarning: invalid value encountered in sqrt 問題コード plt.subplot(111, projection='3d') plt.scatter(X, Y, Z) 出力 RuntimeWarning: invalid value encountered in sqrt scale = np.sqrt(self._sizes) * dpi / 72.0 * self._factor 解決策 解決コード fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.scatter(X, Y, Z) 全体コード import matplotlib.pyplot as plt from sklearn.datasets import make_swiss_roll X, t = make_swiss_roll(n_samples=1000) # RuntimeWarningエラー plt.subplot(111, projection='3d') plt.scatter(X[:,0], X[:,1], X[:,2], c=t) plt.show() # 成功 fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.scatter(X[:,0], X[:,1], X[:,2], c=t) plt.show()
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PythonでPriority R-Treeを使って近傍の道路を超高速検索

超高速な近傍道路探索の実行例 ディズニーランドを中心とした1辺が1kmの長方形に含まれる道路を抽出? クエリ実行時間0.01秒 道路の座標データは要らず,インデックスのみでいい場合? クエリ実行時間0.00008秒 はじめに 本記事では,空間インデックスなどに使用されるR-Treeの亜種で,最悪実行時間を保証した Priority R-Tree1(PR-Tree)を,Pythonから近傍道路探索の用途に使ってみます。 ライブラリは C++とPybind11で実装された python-prtree を用います。x86_64であれば,ビルド済みwheelがpypiに登録されているので以下でインストール可能です。M1 Mac等の場合は,cmakeとpybind11を事前にインストールする必要があります(ローカルでビルドが走るため)。また,R-Treeのライブラリであるrtreeも比較用にインストールします。 # {{ if aarch64 }} pip install cmake pybind11 # {{ endif }} pip install python-prtree tree 道路NWデータの準備 https://qiita.com/kkdd/items/1c39f87fe34c2fb406f3 を参考にOpenStreetMapから道路データのみを抽出します(リンク先では高速のみにフィルターされているためそれを外す)。抽出したファイル名を japan-way.osm.bf とします。 そして,https://qiita.com/kkdd/items/a900d2dd60e3215a1138 を参考に作成した以下のコードで道路NWデータだけを1つのdictにまとめてpickleで保存します。手元ではdata.pklは3GB程度になりましたので,ディスク・メモリ容量には気をつけてください2。 import osmium import pickle import copy from tqdm import tqdm ONEWAY = 1 BIDIRECTIONAL = 0 def wayDirection(oneway): if oneway in [None, "no", "false", "0", "reversible"]: return BIDIRECTIONAL if oneway in ["reverse", "-1"]: return -ONEWAY else: return ONEWAY class DataHandler(osmium.SimpleHandler): def __init__(self): super().__init__() self.d = {} self.d["nodes"] = {} self.d["ways"] = {ONEWAY: {}, BIDIRECTIONAL: {}} self.pbar = tqdm() def update(self): self.pbar.update(1) def node(self, n): self.d["nodes"][n.id] = (n.location.lon, n.location.lat) self.update() def way(self, w): nodes = [n.ref for n in w.nodes] oneway = wayDirection(w.tags.get("oneway")) if oneway == -ONEWAY: oneway = ONEWAY nodes = nodes[::-1] self.d["ways"][oneway][w.id] = { "nodes": nodes, "tags": {k: v for k, v in w.tags}, } self.update() if __name__ == "__main__": h = DataHandler() h.apply_file("japan-way.osm.pbf", locations=False) with open("data.pkl", "wb") as f: pickle.dump(h.d, f) data.pklは,以下のような構造になります。 { "nodes": { node_id: (lon, lat), ..., }, "ways": { ONEWAY: { way_id: { "nodes": [1, 2, 3, ...], "tags": {key1: value1, ...} }, }, BIDIRECTIONAL: ... } R-Tree/PR-Treeの構築と比較 せっかくなので,C++で実装されたlibspatialindexのPython Wrapperであるrtreeライブラリとの構築時間の比較・速度比較を行ってみたいと思います。なお,rtreeもbuildのwheelが手に入るので,Pythonだけで完結します。 Bounding Boxの作成 まずは各ライブラリに突っ込むデータを用意するために,抽出した道路NWデータから各道路(way_idごと)にBounding Box(BB)を作成していきます。BBは,(x_min, y_min, x_max, y_max)から決まる4点を頂点とする長方形です。 import pickle import numpy as np from tqdm import tqdm with open("data.pkl", "rb") as f: d = pickle.load(f) nodes = dict() for k, v in tqdm(d["nodes"].items()): nodes[k] = np.array(list(v), dtype=np.float64) nodes_boxes = np.array(list(nodes.values()), dtype=np.float64) ways = dict() tmp = d["ways"][0] | d["ways"][1] for k, v in tqdm(tmp.items()): try: minima = np.stack([nodes[n] for n in v["nodes"]]).min(axis=0) maxima = np.stack([nodes[n] for n in v["nodes"]]).max(axis=0) bb = np.hstack([minima, maxima]) ways[k] = bb except: continue idxes = np.array(list(ways.keys()), dtype=np.int64) boxes = np.array(list(ways.values()), dtype=np.float64) R-Tree/PR-Treeの構築 構築にかかった時間は,rtreeの場合は1時間程度,python-prtreeの一括構築の場合は20分程度でした。時間の都合上逐次挿入のpython-prtreeの実行時間はありませんが,小データではrtreeよりも高速に動作することは確認できました。 R-Tree import time import rtree p = rtree.index.Property() p.dat_extension = 'data' p.idx_extension = 'index' s = time.time() file_idx = rtree.index.Index('rtree', properties=p) for j, (i, v) in tqdm(enumerate(ways.items()), total=len(ways)): datum = { "nodes": tuple(tmp[i]["nodes"]), "polylines": tuple([tuple(nodes[n]) for n in tmp[i]["nodes"]]), "mbr": tuple(v.ravel()), "tags": tmp[i]["tags"], } # 逐次に挿入して構築していく file_idx.insert(i, boxes[j], obj=datum) print(time.time() - s) PR-Tree(一括で構築) import time from python_prtree import PRTree2D s = time.time() # ここで一気に構築する prtree = PRTree2D(idxes, boxes) for i, v in tqdm(ways.items()): datum = { "nodes": tuple(tmp[i]["nodes"]), "polylines": tuple([tuple(nodes[n]) for n in tmp[i]["nodes"]]), "mbr": tuple(v.ravel()), "tags": tmp[i]["tags"], } # python objectの設定は後で prtree.set_obj(i, datum) prtree.save("prtree.bin") print(time.time() - s) PR-Tree(逐次で構築) 推奨はされないですが,R-Tree likeな逐次構築も可能です。なお,rebuild3 すると最悪計算時間の保証がされます。 import time from python_prtree import PRTree2D s = time.time() prtree = PRTree2D() for j, (i, v) in tqdm(enumerate(ways.items()), total=len(ways)): datum = { "nodes": tuple(tmp[i]["nodes"]), "polylines": tuple([tuple(nodes[n]) for n in tmp[i]["nodes"]]), "mbr": tuple(v.ravel()), "tags": tmp[i]["tags"], } # 逐次に挿入して構築していく prtree.insert(i, boxes[j], obj=datum) # 最悪計算時間保障のため構築し直す prtree.rebuild() prtree.save("prtree.bin") print(time.time() - s) 近傍の道路を探索 検索速度を検証するために,ディズニーランドを中心とした1辺が1kmの長方形に含まれる道路を全て列挙して,foliumを使って可視化してみたいと思います。得られる結果はrtree/python-prtreeを用いた場合も同一で,以下のようになりました。 また,気になるクエリの実行時間ですが,rtreeの場合は0.03秒程度・python-prtreeの場合は0.01秒程度でした。 さらに,今回は使用していないですが,python-prtreeにはbatch_queryというメソッドがあり,数十万のクエリでもC++の中で並列に実行してくれて一括で結果を返してもらえるので便利そうです。 R-Tree import numpy as np import rtree import folium import time p = rtree.index.Property() p.dat_extension = 'data' p.idx_extension = 'index' file_idx = rtree.index.Index('rtree', properties = p) lat, lng = (35.6329, 139.8804) dx = (1./3600/27) # 1m dy = (1./3600/30) # 1m k = 1000 # m rect = (lng - k * dx, lat - k * dy, lng + k * dx, lat + k * dy) s = time.time() for _ in range(10): res = list(file_idx.intersection(rect, objects=True)) res = [r.object for r in res] print(time.time() - s) m = folium.Map(location=[lat, lng], zoom_start=14) folium.Marker([lat, lng], popup="here").add_to(m) locations = [ (rect[1], rect[0]), (rect[3], rect[0]), (rect[3], rect[2]), (rect[1], rect[2]), ] folium.Polygon( locations=locations, color="red", weight=10, fill=True, fill_opacity=0.3, ).add_to(m) for r in res: locations = [x[::-1] for x in r["polylines"]] popup = r["tags"] folium.PolyLine(locations=locations, popup=popup).add_to(m) m.save("rtree.html") PR-Tree import numpy as np from python_prtree import PRTree2D import folium import time prtree = PRTree2D("prtree.bin") print(prtree.n) lat, lng = (35.6329, 139.8804) dx = (1./3600/27) # 1m dy = (1./3600/30) # 1m k = 1000 # m rect = (lng - k * dx, lat - k * dy, lng + k * dx, lat + k * dy) s = time.time() for _ in range(10): res = prtree.query(rect, return_obj=True) print(time.time() - s) m = folium.Map(location=[lat, lng], zoom_start=14) folium.Marker([lat, lng], popup="here").add_to(m) locations = [ (rect[1], rect[0]), (rect[3], rect[0]), (rect[3], rect[2]), (rect[1], rect[2]), ] folium.Polygon( locations=locations, color="red", weight=10, fill=True, fill_opacity=0.3, ).add_to(m) for r in res: locations = [x[::-1] for x in r["polylines"]] popup = r["tags"] folium.PolyLine(locations=locations, popup=popup).add_to(m) m.save("prtree.html") まとめ R-Treeの亜種で最悪実行時間を保証した Priority R-Tree(PR-Tree)のライブラリであるpython-prtreeを使って,道路NWからR-Tree/PR-Treeの近傍道路探索を行ってみました(on M1 MacBook)。その結果,PR-Treeのライブラリであるpython-prtreeは,R-Treeのライブラリであるrtreeよりも構築・検索ともに3倍ほど高速に動作しました。 また,batch_queryというrtreeにはないメソッドもpython-prtreeにはあり,多くのクエリを投げる際にはさらに高速に動作しそうです。さらに,python-prtreeは,構築やバッチクエリの処理にマルチスレッド対応しているため,本検証で使用した4+4コアのM1よりも多くのコアがあるCPUを利用する際には更なる高速化が望めます。 最後になりますが,python-prtreeに関する改善要望等ありましたら,https://github.com/atksh/python_prtree/issues/new からissueを立てていただければ可能な範囲で対応しますので,ご意見/ご要望をお待ちしています。 R-TreeとPR-Treeの主な違いは,木構造の構築方法にあります。前者は逐次で木を拡張していきますが,後者は全データを所与としてkd-tree likeな構築方法を採用しています。 ↩ Macの場合にはswapを勝手に切ってくれるのでOOMにはならないですが ↩ このrebuildはBBの数が1億でも30秒程度で終わるので,オーバーヘッドはほぼないと考えて良いです。 ↩
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

正規分布の再生性を実際に確かめてみた

再生性とは (1) 確率分布$F$について、2つの独立な確率分布$X,Y$が$F$に従えば、それらの和 $$Z=X+Y$$ も常に$F$に従うとき、確率分布$F$は再現性をもつという (2) すなわち、確率分布$F$が再生性をもつとは、次が成立することである。 $$ X \sim F, Y \sim F, XとYは独立 \Rightarrow X+Y \sim F $$ このように再生性は定義されます。 正規分布の再生性 そもそも正規分布は再生性をもつのか(証明) 互いに独立な確率変数$X \sim N(μ_1,σ_1^2)$と$Y \sim N(μ_2,σ_2^2)$とおく。 $X$と$Y$の積率母関数は、それぞれ $$\begin{array}{l} m_{X}(t)=\mathrm{e}^{\mu_{1} t+\frac{\sigma_{1}^{2} t^{2}}{2}} \end{array}$$ $$\begin{array}{l} m_{Y}(t)=\mathrm{e}^{\mu_{2} t+\frac{\sigma_{2}^{2} t^{2}}{2}} \end{array}$$ したがって、$X+Y$の積率母関数は、 $$\begin{aligned} m_{X+Y}(t) &=m_{X}(t) m_{Y}(t)\\ &=\mathrm{e}^{\mu_{1}+\frac{\sigma_{1}^{2} t^{2}}{2}} \mathrm{e}^{\mu_{2} t+\frac{\sigma_{2}^{2} t^{2}}{2}}\\ &=\mathrm{e}^{\left(\mu_{1}+\mu_{2}\right) t+\frac{\left(\sigma_{1}{ }^{2}+\sigma_{2}^{2}\right) t^{2}}{2}} \end{aligned}$$ これは、$N(μ_1+μ_2,σ_1^2+σ_2^2)$の積率母関数であり、積率母関数の一意性から、$X+Y$の確率分布は$N(μ_1+μ_2,σ_1^2+σ_2^2)$といえる。 よって、正規分布は再生性をもつと言える。 という感じで証明できます。 正規分布が再生性をもつということは 上記の$X, Y$に関して、$X+Y \sim N(μ_1+μ_2,σ_1^2+σ_2^2)$に従うということである。 検証 試しに、$X \sim N(50,100)$, $Y \sim N(70,25)$で実験してみます。 グラフには、$X, Y, F(足した分布),T(理論値)$をプロットしました。 np.random.normal()は、平均と標準偏差が引数になるので、分散の平方根を入力とします。 import numpy as np import matplotlib.pyplot as plt x_mu, x_sigma = 50, 10 y_mu, y_sigma = 70, 5 X = np.random.normal(x_mu, x_sigma, 50000) # 平均50, 標準偏差10, 分散100 Y = np.random.normal(y_mu, y_sigma, 50000) # 平均70, 標準偏差5, 分散25 F = X + Y theoretical = np.random.normal(x_mu+y_mu, np.sqrt(x_sigma**2+y_sigma**2), 50000) plt.figure(figsize=(15,7)) plt.hist(X, bins=1000, label='X', alpha=0.7) plt.hist(Y, bins=1000, label='Y', alpha=0.7) plt.hist(F, bins=1000, label='F', alpha=0.5) plt.hist(theoretical, bins=1000, label='theretical', alpha=0.5) plt.legend() plt.savefig("dist.png") plt.show() それっぽくなりました。 一応値を変えてみて、再度検証してみます import numpy as np import matplotlib.pyplot as plt x_mu, x_sigma = 50, 10 y_mu, y_sigma = -100, 10 X = np.random.normal(x_mu, x_sigma, 50000) # 平均50, 標準偏差10, 分散100 Y = np.random.normal(y_mu, y_sigma, 50000) # 平均-100, 標準偏差10, 分散100 F = X + Y T = np.random.normal(x_mu+y_mu, np.sqrt(x_sigma**2+y_sigma**2), 50000) plt.figure(figsize=(15,7)) plt.hist(X, bins=1000, label='X', alpha=0.7) plt.hist(Y, bins=1000, label='Y', alpha=0.7) plt.hist(F, bins=1000, label='F', alpha=0.5) plt.hist(T, bins=1000, label='T', alpha=0.5) plt.legend() plt.savefig("dist.png") plt.show() 裾の広がりも二つの分布に比べて広がってるのがわかります。 参考 積率母関数の証明 正規分布の積率母関数
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

深層強化学習(A3C)を用いたシステムトレーディング

はじめに  近年、人工知能ブームにより、人工知能を使ったトレーディング手法が盛んである。そこで、今回は深層強化学習を用いたシステムトレーディングを実施した。  まず、基本的な深層強化学習を用いたトレーディングモデルである。agentの行動として、 BUY、HOLD、SELLの三つの内一つを選択する。環境の戻り値として、状態(今現在保有しているポジションの価格、市場価格、手持ちのキャッシュ)、報酬(手持ちのキャッシュの変化値(含む益も含む))、終了(取引の終了か否か)、情報(ターミナルにディスプレイする情報)を返す。 使用データについて トレンド傾向の掴みやすさから、yahoo financeからGSPCの日足を使用した。 訓練データの期間:2015/1/1 - 2017/6/30 テストデータの期間:2017/7/1 - 2021/1/1 以下ソースコード TD学習 TD学習(時間的差分学習: Temporal Difference Learning)とは、代表的な価値ベース手法一つである。逐次的にデータ更新ができるのが特徴。 以下、TD学習での状態価値の更新式。 \Delta V(s) = r + \gamma V(s_{t+1}) - V(s_t) \\ A2C1  A2C(Advantage Actor-Critic)とは、A3C(Asynchronous Advantage Actor-Critic)から、非同期の部分を抜いたものであり、A3Cと比べてGPUの負荷が低い。  Q学習で使用したQ値は、状態価値関数V(s)とアドバンテージ値A(s, a)の2つに分解できる。このアドバンテージ関数とは、ある状態において、ある行動が他の行動に比較しどの程度優れているかを表す。価値関数とは、その状態に優位度を表す。  つまり、Actorはアドバンテージ値を通しQ値を学ぶ。これによりある行動の評価は、その行動がどれだけ良いかだけでなく、どれだけ良くなるかにも基づいて行われる。アドバンテージ関数の利点は、政策ネットワークの高い分散を減らし、モデルを安定させる。 A(s_t,a_t) = Q(s_t,a_t) - V(s_t) \\ A3C2 A3Cは並列分散処理を行なってる。処理速度が早い。個別のスレッドが学習した重みはmasterBrainに蓄積される。新しい、学習が始まる時に、各スレッドがmasterBrainから重みをコピーする。 マルチスレッド マルチスレッドの特徴は以下の通り ・平行処理 ・GILロックを持っているスレッドのみ、実行可能、他のスレッドは待機する。 ・cpuのコア数に依存しない。 マルチプロセスの特徴は以下の通り ・並列処理 ・メモリーが共有されていないので、値の受け渡しが大変。 ・tensowflowだと、modelが動かないことがある。 ・cpuのコア一つにプロセスを割り当てする。 cpuバウンドとはcpu内で行なっている数値計算などの処理 ループ class Task: def __init__(self, name): self.name = name self.num = 0 self.sum = 0.0 def run(self): while True: sleep(1) if self.num == 3: break print('roop_name = ' + self.name + ' :num = '+ str(self.num) + ' :sum = '+ str(self.sum)) self.num += 1 self.sum += random.random() name = 'nomal-roop' start = time.time() for i in range(4): w = Task('roop_point_'+str(i)) w.run() end = time.time() - start arr.append("name:" + name + " process_time:{0}".format(end) + "[s]") roop_name = roop_point_0 :num = 0 :sum = 0.0 roop_name = roop_point_0 :num = 1 :sum = 0.642469181212962 roop_name = roop_point_0 :num = 2 :sum = 1.5964812171373977 roop_name = roop_point_1 :num = 0 :sum = 0.0 roop_name = roop_point_1 :num = 1 :sum = 0.8876820994429431 roop_name = roop_point_1 :num = 2 :sum = 1.627826300716026 roop_name = roop_point_2 :num = 0 :sum = 0.0 roop_name = roop_point_2 :num = 1 :sum = 0.03546302344611851 roop_name = roop_point_2 :num = 2 :sum = 1.0239282875765587 roop_name = roop_point_3 :num = 0 :sum = 0.0 roop_name = roop_point_3 :num = 1 :sum = 0.602393530385244 roop_name = roop_point_3 :num = 2 :sum = 1.555539488491399 マルチスレッド class Task: def __init__(self, name): self.name = name self.num = 0 self.sum = 0.0 def run(self): while True: sleep(1) if self.num == 3: break print('roop_name = ' + self.name + ' :num = ' + str(self.num) + ' :sum = ' + str(self.sum)) self.num += 1 self.sum += random.random() name = 'thread-pool' start = time.time() thread_num = 4 threads = [] for i in range(thread_num): threads.append(Task(name=f'thread_{i}')) datas = [] with ThreadPoolExecutor(max_workers = thread_num) as executor: for task in threads: job = lambda: task.run() datas.append(executor.submit(job)) end = time.time() - start arr.append("name:" + name + " process_time:{0}".format(end) + "[s]") roop_name = thread_0 :num = 0 :sum = 0.0 roop_name = thread_1 :num = 0 :sum = 0.0 roop_name = thread_2 :num = 0 :sum = 0.0 roop_name = thread_3 :num = 0 :sum = 0.0 roop_name = thread_0 :num = 1 :sum = 0.7829927782861958 roop_name = thread_2 :num = 1 :sum = 0.7264674393557742 roop_name = thread_1 :num = 1 :sum = 0.4721450639806136 roop_name = thread_3 :num = 1 :sum = 0.2746835685320669 roop_name = thread_0 :num = 2 :sum = 0.8189509274906515 roop_name = thread_1 :num = 2 :sum = 0.7522106668563098 roop_name = thread_2 :num = 2 :sum = 1.3346477522815392 roop_name = thread_3 :num = 2 :sum = 0.33216049073474685 損失関数3 損失関数(更新式)は、損失関数(Actor)と損失関数(Critic)に分解できる。 L = L_v + L_π 損失関数(Actor)と損失関数(Critic)は以下の式に分解できる。アドバンテージ値A(s, a)は、一般的なTD学習での状態価値の更新式より、時系列を加味した形式になっている。Hはエントロピー。 L_v = (A(s_t,a_t))^2 \\ L_π = -log(π(a_t,s_t))A(s_t,a_t) - \beta H(π(s_t)) \\ A(s_t,a_t) \approx TD error= \sum_{k=1}^{n-1} \gamma^k r_{t+k} + \gamma^n V(s_{t+n}) - V(s_t) \\ H(π(s_t)) = 1/2(log(2π \sigma ^2) + 1) \\ \beta:ハイパーパラメータ、\gamma:割引係数、\sigma:分散 期待収益 G_t = \sum_{t'=t}^{T} \gamma ^{t'-t}r_{t'} \\ r:報酬,\gamma :割引係数 期待収益とは、行動に対する収益の期待を示したものである。割引係数により、現在より離れた報酬は小さくなるようになる。 売買ルール 1.空売りは認めない 2.ポジションを持っている場合、追加注文を出せない。 3.最後のステップでポジションを全て売却する。 4.ポジションは全買い、全売り 5.所持金は1000000ドル 実装と結果 ソースコードはこちら 当然の事ながら、A2Cと同じパフォーマンスである。 ソースコードはこちら V. Mnih, A. P. Badia, M. Mirza, A. Graves, T. Lillicrap, T. Harley, D. Silver, and K. Kavukcuoglu, “Asynchronous methods for deep rein- forcement learning,” in International conference on machine learning, (2016), pp. 1928–1937. ↩ V. Mnih, A. P. Badia, M. Mirza, A. Graves, T. P. Lillicrap, T. Harley, D. Silver, K. Kavukcuoglu, "Asynchronous Methods for Deep Reinforcement Learning",(2016), pp. 1602–1783. ↩ S. Kuutti, R. Bowden, H. Joshi, R.D Temple, and S. Fallah, "End-to-end Reinforcement Learning for Autonomous Longitudinal Control Using Advantage Actor Critic with Temporal Context", IEEE Intelligent Transportation Systems Conference (ITSC),(2019) ↩
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

MinecraftサーバーをPythonから制御

MinecraftサーバーをPythonから制御 PythonからMinecraftサーバーを制御できるモジュールがあるらしい。 環境 Python3.9(だったっけ?) Windows 10 Pro Mcipc 2.3.3 server設定ファイルで enable-rcon=True にしておいてください いんすとーる pip3 install mcipc python3 -m pip3 install mcipc 前者でできなければ後者を試してみてください。 できなかったら「python パス 通し方」ググってください。 コードを書く。 サーバーを停止する まずはサーバーをストップさせることから。 from mcipc.rcon.je import Client with Client('127.0.0.1', 25575, passwd='minecraft') as client: log = client.stop() print(log) 補足 二行目のClient()はの引数は左からサーバーのIPアドレス、RCONクライアントのポート、 パスワードの順です。 log変数には実行結果が帰ってきます。 よく使うコマンドをまとめる from mcipc.rcon.je import Client while True: ans = input("") with Client('127.0.0.1', 25575, passwd='minecraft') as client: if ans == "1": log = client.stop() if ans == "2": log = client.kill("@e[type=player]") #[]の文字も同じ引数の中に入れる if ans == "3" log =client.effect.give("@p","speed") #client.effect("give","@p","speed")ではないことに注意 print(log) (工事中) djangoから制御できるようにする 参考 公式ドキュメント
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ライブラリ追加によるdockerイメージの作成(Python, tensorflow, sklearn)

目的 前回の記事でdockerをインストールしてベーシックな機械学習環境を構築することまで行いましたが、拾ってきたdockerイメージには必要なライブラリが入っていませんでした。そこで拾ってきたdockerイメージにライブラリを追加して新しいdockerイメージを作成することを行います。 実施内容概要 元となるdockerイメージ:tensorflow/tensorflow 追加するライブラリ:ScikitLearn、Pandas、imbalanced-learn 作成するdockerイメージ:my_ml 実施環境 MacBookAir  early 2020, RAM 8GB, CPU intel core i5 (非M1) OS: macOS Big Sur Ver. 11.4 実施内容 dockerの起動 docker desktopのインストールとtensorflowのdockerイメージのダウンロードが完了していれば、以下のコマンドをterminalに入力するだけでdockerを起動できます。 -d を追加して、バックグラウンドで動作してもらうことにしました。 --name TF を追加して、コンテナの名前をTFと固定しました。 % docker run -d -it --name TF -p 8081:8888 tensorflow/tensorflow:latest-jupyter 注意 2回目以降、上記コマンドではTFはすでにあるとのエラーが出ます。この時はdocker desktopの再生ボタンを押せば良いみたいです。 CLIの起動 docker desktopのCLIと書いてあるボタンをクリックするとコマンドラインが起動します。どうやらここがdockerコンテナ上のterminal相当の場所の様です。例えば、このコマンドライン上で「Python3」と入力するとPythonが起動します。(今は起動しません) pipによるライブラリのインストール 起動したコマンドライン上で以下のコマンドを実行してライブラリをインストールします。 $ pip install scikit-learn $ pip install pandas $ pip install imbalanced-learn インストールの確認 インストール完了後、起動しているコマンドライン上で以下を入力し、Python3を起動させます。 $ Python3 その後、起動したPython3にて以下を実行してエラーが出ないことを確かめます。 >>> import pandas >>> import sklearn >>> import imbalanced-learn その後一旦起動したCLIを終了させます。(terminalを左上のバッテンマークで消しておきます。) terminalの再起動とdockerイメージ化 terminalを再度起動して、以下のコマンドを入力します。これで新たなdockerイメージの作成が完了しました!! $ docker commit TF my_ml commitの後の第一引数で起動中のコンテナを指定(今はTF) 第二引数で作成するイメージの名前をつける(今回はmy_ml) イメージの名前には大文字は使えない様です。 $ docker images と入力すれば、my_mlと記載されたイメージが追加されていることが確認できるはずです。 結果 例えば、以下のコマンドをterminal上で実行すれば、 my_mlというdockerイメージから、 MLという名前のdockerコンテナを作成できます。 そしてこれにはすでに元にしたtensorflowはもちろん、pandas、Scikit-Learnとimbalanced-learnも含まれています! $ docker run -d -it --name ML -p 8088:8888 my_ml
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Macにおけるdocker構築。Pythonと機械学習に向けて。

記事の目的 Pythonによる機械学習環境として、以下の二つを実施できる環境をdocker上に構築しました。 ・データサイエンス100本ノックが実施できる環境 ・Tensorflowが使用できる環境 当方、本職はプログラマーではなく、pythonや機械学習は初学者です。本記事は他のページを参考にしまくっておりキュレーションサイトの様ですがご容赦ください。。。 実施環境 MacBookAir  early 2020, RAM 8GB, CPU intel core i5 (非M1) OS: macOS Big Sur Ver. 11.4 dockerについて dockerとは Wikiによると、「Dockerは、コンテナ仮想化を用いてアプリケーションを開発・配置・実行するためのオープンソースソフトウェアあるいはオープンプラットフォームである」とのことです。 コンテナ仮想化などの難しいキーワードがありますが、現時点では以下の目的を達成するための手段としています。 「Dockerは、パソコン(私の場合はMac)でPythonを使用して機械学習を実行するための環境」 *Pythonによらず、色々な言語の開発環境を整えられる様ですが、詳しくは分かりません・・・。 *anacondaなどPythonを使用するための環境は他にもありますが、dockerのメリットは以下の様です。 dockerのメリット 機械学習のフレームワーク用の環境を簡単に作成して、使い分けができる 例えば、TensorflowとPytorchなど機械学習用のフレームワークを動作させる環境を、docker(正確にはdockerコンテナ)上に個別に作成できます。 初心者目線ではとりあえずTensorflowを使用できる環境を簡単に作れることがメリットですが、上級者目線ではフレームワークのバージョンも含めて個別に環境構築ができるので、論文の検証などにも便利な様です。 開発環境を即座に構築し、配布できる。グループで開発する時に有効 初心者目線では、グループでアプリを作るとき、グループ内の一番詳しい人が作成してくれた環境を共有できる、といったところでしょうか。パソコン引越しの時にも役に立つはずです。近々パソコン買い替えを検討している方にも良いかと。 dockerコンテナ上で開発したアプリケーションを別の場所に移動させる際、コンテナイメージに変換することで環境をそのまま移動させることができる。(移動先の環境原因のバグを減らせる) 上のメリットと似てますね。Macからwindows、その逆も簡単にできる様です 開発環境を除去することも簡単なので、PCに不要なファイルが残ったりしない 初心者目線からこのメリットを理解するのは少々難しいのですが、要らないファイルが残らないのでパソコンがスッキリするという感じですかね・・・。 この他にもメリットはたくさんあるようです。詳しくは割愛します。 dockerのインストール 初心者にdockなんて必要?と思われるかもしれませんが、何事も試してみたい派なので、とりあえずインストールしました。参考にしたのは、以下のサイトのMacの箇所です。 Docker hubにサインイン DockerHubのサイトでユーザー登録をしておきます。 Docker Desktop on Macをインストール 以下のサイトにアクセスし、「Mac with intel chip」をクリックしてダウンロードし、 ダウンロード完了後に「docker.dmg」をダブルクリックしてインストールしました。 詳細は以下のサイトに記載してあります。 初回起動とチュートリアル 初回起動時はチュートリアルに沿って進めました。以下の画面です。 青い箇所をクリックするだけで、クローン、ビルド、ラン、シェアの順に進められます。 シェアのところで、先ほど作成したDocker hubに登録したユーザ名とパスワードを入力します。 もしかして、この初回起動時のチュートリアルは実行不要かも とりあえずdocker自体のインストールはこれで完了の様なので、次にデータサイエンス100本ノックの環境構築を行いました。 データサイエンス100本ノックが実施できる環境の構築 ここから先は以下のサイトを参考にしました。 Dockerセットアップまでは完了しているので、「Docker上で動かす」以降を実施しました。 初回起動 docker desktopが起動している状態で、以下のコマンドをteriminal上で実行します。 $ git clone https://github.com/The-Japan-DataScientist-Society/100knocks-preprocess $ cd 100knocks-preprocess $ docker-compose up -d --build 次に、safariなどのブラウザ上で以下にアクセスします。(terminalではありません!) http://localhost:8888 Jupyter Notebookが起動します。Jupyter notebook上からworkのフォルダにアクセスし、その中の「preprocess_knock_Python.ipynb」を起動すれば、データサイエンス100本ノックの環境構築完了でした! 再開 再開時は、docker desktopを起動して100knocks-preprocessをstartさせます。 そして、ブラウザから以下にアクセスすれば再開できました!簡単! http://localhost:8888 Tensorflowのライブラリが使用できる環境の構築 ここから先は以下のサイトを参考にしました。 Keras/Tensorflowのdockerイメージのダウンロード 以下のコマンドをMacのterminalに入力し、Jupyter付きの最新のtensorflowをダウンロードします。 $ docker pull tensorflow/tensorflow:latest-jupyter dockerとjupyter notebookの起動 次に以下のコマンドをterminalに入力してdockerを起動します。 $ docker run -it -p 8081:8888 tensorflow/tensorflow:latest-jupyter この時、tokenが発行されます。terminalの返答にtoken=xxxxxxxxxxxxxxxと長い文字と数字の列があると思いますので、コピーしておきます。 次に、ブラウザ上で以下にアクセスします。起動時にポート8001を指定してるので、100本ノックとは異なります。 http://localhost:8081/ ブラウザ上でtokenを聞かれますので、そして先ほどコピーしたtokenを入力してJypyter notebookが使用できるようになりました! Jypyter notebook上では右上の「New▼」をクリックしてPython3を選べば開始できました。 テスト Jypyter notebookで以下をテストで実行したところ、エラーは出なかったのでひとまず成功している様です! 入力 import tensorflow as tf print(tf.reduce_sum(tf.random.normal([1000, 1000]))) 出力 tf.Tensor(-138.44626, shape=(), dtype=float32) ライブラリ確認 普段使用しているライブラリに関して、この段階で使用可能か確認しました 入っている ・・・numpy、matplotlib 入っていない・・・Pandas、sklearn、imbalanced-learn 次のステップとしてdockerイメージにライブラリを追加していく方法を探してみます。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Discord読み上げbot自作ノウハウ⑤ オンラインステータス編

読み上げbotが今どのくらいのサーバーに参加していて、 今どのくらいのサーバーで読み上げをしているのか オンラインステータスに表示させてみましょう。 例えば、喋太郎は今接続しているサーバー数のみ、 Albotは参加しているサーバーの総数のみを表示させる仕様になっています。 今回は両方表示させるコードにしてありますので、好みに合わせて調整してください。 一番基本的なコードは以下になります。 presence = f'{len(client.voice_clients)}/{len(client.guilds)}サーバー' await client.change_presence(activity=discord.Game(name=presence)) あとは、上記のコードを以下のイベントで呼び出してあげれば良いだけです。 botが起動した時 サーバーに参加した時 サーバーから退出した時 ボイスチャンネルに参加した時 ボイスチャンネルから退出した時 #botが起動した時 @client.event async def on_ready(): presence = f'{len(client.voice_clients)}/{len(client.guilds)}サーバー' await client.change_presence(activity=discord.Game(name=presence)) #サーバーに参加した時 @client.event async def on_guild_join(guild): presence = f'{len(client.voice_clients)}/{len(client.guilds)}サーバー' await client.change_presence(activity=discord.Game(name=presence)) #サーバーから退出した時 @client.event async def on_guild_remove(guild): presence = f'{len(client.voice_clients)}/{len(client.guilds)}サーバー' await client.change_presence(activity=discord.Game(name=presence)) @client.event async def on_voice_state_update(member, before, after): if member.id == client.user.id: #ボイスチャンネルに参加した時 if before.channel is None: presence = f'{len(client.voice_clients)}/{len(client.guilds)}サーバー' await client.change_presence(activity=discord.Game(name=presence)) #ボイスチャンネルから退出した時 elif after.channel is None: presence = f'{len(client.voice_clients)}/{len(client.guilds)}サーバー' await client.change_presence(activity=discord.Game(name=presence))
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Z3Py 例題 ハノイの塔

問題 以下のルールに従って、すべての円盤を右端の杭に移動する。 3本の杭と、中央に穴の開いた大きさの異なる複数の円盤から構成される。 最初はすべての円盤が左端の杭に小さいものが上になるように順に積み重ねられている。 円盤を一回に一枚ずつどれかの杭に移動させることができるが、小さな円盤の上に大きな円盤を乗せることはできない。 回答 円盤が4つの場合 example_tower_of_hanoi.py from z3 import * import itertools disk_num = 4 peg = 3 # 存在する状態を列挙 (disk_4,disk_3,disk_2,disk_1) の形 states = itertools.product(range(peg), repeat=disk_num) dict_state = {} # 状態を辞書化 for i, state in enumerate(states): dict_state[i] = state # 状態の逆引き辞書作成 def inverse_dict(d): return {v: k for k, v in d.items()} inverse_dict_state = inverse_dict(dict_state) # 制約定義 t_max = 16 # 移動させる回数 T = [Int("T%s" % i) for i in range(t_max)] f = Function('f', IntSort(), IntSort(), BoolSort()) s = Solver() s.add([And(0 <= T[i], T[i] <= len(dict_state)-1) for i in range(len(T))]) def check_state_change(before, after): # 状態遷移が可能かどうかチェックする changed_disk = None for x in range(len(before)): # 一度に1枚だけしか移動しない。 if (before[x] - after[x]) != 0: if changed_disk != None: changed_disk = None break else: changed_disk = x else: if changed_disk != None: # 前状態において、一番上のDiskしか移動しない if (before[changed_disk] - before[x]) == 0: changed_disk = None break # 小さな円盤の上に大きな円盤を乗せることはできない if (after[changed_disk] - after[x]) == 0: changed_disk = None break if changed_disk != None: return True else: return False for i in range(len(dict_state)): for j in range(len(dict_state)): if check_state_change(dict_state[i], dict_state[j]): s.add(f(i, j) == True) else: s.add(f(i, j) == False) # 全ての遷移において、遷移可能な状態である。 for i in range(len(T)-1): s.add(f(T[i], T[i+1])) # 初期状態と終了状態を指定する。 s.add(T[0] == inverse_dict_state[(0, 0, 0, 0)]) s.add(T[-1] == inverse_dict_state[(2, 2, 2, 2)]) # 結果 print(s.check()) if s.check() == sat: # print(s.model()) m = s.model() for i in range(len(T)): print('T['+str(i)+']=' + str(dict_state[m[T[i]].as_long()])) 出力 sat T[0]=(0, 0, 0, 0) T[1]=(0, 0, 0, 1) T[2]=(0, 0, 2, 1) T[3]=(0, 0, 2, 2) T[4]=(0, 1, 2, 2) T[5]=(0, 1, 2, 0) T[6]=(0, 1, 1, 0) T[7]=(0, 1, 1, 1) T[8]=(2, 1, 1, 1) T[9]=(2, 1, 1, 2) T[10]=(2, 1, 0, 2) T[11]=(2, 1, 0, 0) T[12]=(2, 2, 0, 0) T[13]=(2, 2, 0, 1) T[14]=(2, 2, 2, 1) T[15]=(2, 2, 2, 2) 解説 存在できる状態は、各円盤の位置が決まれば1状態しかないことから、左から(一番大きな円盤の位置、二番目に大きな円盤の位置、、、、一番小さな円盤の位置)とした。 解く方針として、初期状態から最終状態への一連の遷移状態が成立していることを解くために、ある状態からすべての状態に対し遷移できるかできないかを定義した。 z3pyとは関係関係ないが、順列組み合わせを求めるのに、itertoolsを用いた。( 参考 ) m=s.model()で解を見るとき、数値に変換するには.as_long()を用いる。(Int()は使えない) 他の例題 Z3Py個人的ポータル へ 前の例題(グラフ彩色問題)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

pipでpsycopg2をインストールしようとするとCommand errored out with exit status 1とエラーが出る

DjangoでPostgresを使おうと思い、調べているとpsycopg2というパッケージが必要とのことで $ pip install psycopg2 とコマンドを打ちました。するとエラーが発生。 $ pip install psycopg2 Collecting psycopg2 Using cached psycopg2-2.9.1.tar.gz (379 kB) Using legacy 'setup.py install' for psycopg2, since package 'wheel' is not installed. Installing collected packages: psycopg2 Running setup.py install for psycopg2 ... error ERROR: Command errored out with exit status 1: command: /home/(username)/django/myvenv/bin/python -u -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-3167j200/psycopg2_ce00c2bb1d2342c1811e76887cb05c30/setup.py'"'"'; __file__='"'"'/tmp/pip-install-3167j200/psycopg2_ce00c2bb1d2342c1811e76887cb05c30/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-c1vq00ua/install-record.txt --single-version-externally-managed --compile --install-headers /home/(username)/django/myvenv/include/site/python3.8/psycopg2 cwd: /tmp/pip-install-3167j200/psycopg2_ce00c2bb1d2342c1811e76887cb05c30/ Complete output (38 lines): running install running build running build_py creating build creating build/lib.linux-x86_64-3.8 creating build/lib.linux-x86_64-3.8/psycopg2 copying lib/errorcodes.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/pool.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/errors.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/sql.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/tz.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/_range.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/extensions.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/__init__.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/extras.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/_ipaddress.py -> build/lib.linux-x86_64-3.8/psycopg2 copying lib/_json.py -> build/lib.linux-x86_64-3.8/psycopg2 running build_ext building 'psycopg2._psycopg' extension creating build/temp.linux-x86_64-3.8 creating build/temp.linux-x86_64-3.8/psycopg gcc -pthread -B /home/(username)/anaconda3/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -fPIC -DPSYCOPG_VERSION=2.9.1 (dt dec pq3 ext lo64) -DPSYCOPG_DEBUG=1 -DPG_VERSION_NUM=120007 -DHAVE_LO64=1 -DPSYCOPG_DEBUG=1 -I/home/(username)/django/myvenv/include -I/home/(username)/anaconda3/include/python3.8 -I. -I/usr/include/postgresql -I/usr/include/postgresql/12/server -I/usr/include/libxml2 -I/usr/include/mit-krb5 -c psycopg/psycopgmodule.c -o build/temp.linux-x86_64-3.8/psycopg/psycopgmodule.o -Wdeclaration-after-statement In file included from psycopg/psycopgmodule.c:28: ./psycopg/psycopg.h:36:10: fatal error: libpq-fe.h: No such file or directory 36 | #include <libpq-fe.h> | ^~~~~~~~~~~~ compilation terminated. error: command 'gcc' failed with exit status 1 ---------------------------------------- ERROR: Command errored out with exit status 1: /home/(username)/django/myvenv/bin/python -u -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-3167j200/psycopg2_ce00c2bb1d2342c1811e76887cb05c30/setup.py'"'"'; __file__='"'"'/tmp/pip-install-3167j200/psycopg2_ce00c2bb1d2342c1811e76887cb05c30/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-c1vq00ua/install-record.txt --single-version-externally-managed --compile --install-headers /home/(username)/django/myvenv/include/site/python3.8/psycopg2 Check the logs for full command output. どうやらlibpq-fe.hってのが無いらしい。ということで調べていると以下の記事を見つけました。 https://stackoverflow.com/questions/56230187/no-such-file-error-libpq-fe-h-when-installing-postgresql-package-for-r-on-linux 最終的に sudo apt install libpq-dev で解決しました。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Python3ではじめるシステムトレード:大偏差理論

R.S.Ellis Entropy, Large Deviations and Statistical Mechanicsを学ぶために、適当に関連する英語版のウィキメディアを翻訳(www.DeepL.com/Translator(無料版)) しメモとして残すことで、後々の見なおしに活用したいと思います。 Ellisのまとめ 確率論と統計力学の共通した1つのテーマはカオスの中で規則性を探すことである。大数の法則と中心極限定理を含む確率論ではいくつかの母数を用いて、確率的な系のふるまいを表現する。統計力学では各々の分子の間の複雑に絡み合った相互作用を表現する確率分布から物質のマクロな特性を導き出す。これらの2つの分野を結び付ける概念がエントロピー(多重度)だ。この用語は1865年にクラウジウスによりもたらされた。その後、ボルツマンが統計的なエントロピーの解釈をあたえた。それは有名な$S=k \log W$である。kはボルツマン定数、$W$は熱力学的確率である。確率過程と統計力学で系を分析する際には、系そのものを定義したミクロレベルでの表現から、マクロレベルへと展開していく。この2つのレベルを結び付けるのがエントロピーである。マクロ状態は多くのミクロ状態と整合性があり、かつマクロ状態は集合を成す。無限になるマクロ状態からエントロピー最大のものが実現し、観測可能となる。 コイントスを考える。コイントスの状態空間は$\Gamma=${0,1}である。{$X_j;j\in Z$}はベルヌーイ試行で連続したコイントスの結果である。、また$\rho=\rho_1 \delta_0+\rho_1 \delta_1$である。表は1,裏は0とする。試行は一定の時間の経過にそって行われ、無限の試行は分離している。公平なコインは$\rho=1/2 \delta_0+1/2 \delta_1$。このモデルは配位空間$\Omega=${0,1}$^Z$上の積測度$P_{\rho}$で表現される。マクロ的にはコインの動きは平均値で表現される。可能な平均値は$z\in(0,1)$の値であれば、いくつでもよい。これを$z \in [0,1]$の集合と呼ぶ。トスを$\omega_1,\cdots,\omega_n$とし、$n=1,\cdots$とし、$S_n(\omega)=\sum_{j=1}^nX_j(\omega)$とすると、平均値は$S_n/n$である。$(\omega_1,\cdots,\omega_n)$は配位である。この配位上の積測度は$P_{\rho}$である。$n$回の試行で$k$回表が出るとすると、表の出る確率は2項係数C(n,k)$\rho^k$で与えられる。 $C(n,k)=\frac{n!}{(n-k)}!$ 階乗(!)をスタンリーングの近似($\log(n!)=n\log(n)-n+\beta_n$を用いて、C(n,k)の対数を取ってnで割ると、 $\frac{1}{n}\log(C(n,k)\rho^n)=\log(\rho)-\frac{k}{n}\log(\frac{k}{n})-(1-\frac{k}{n})\log(1-\frac{k}{n})+O(\frac{\log n}{n})$ となる。k\n=z,$\rho=1/2$とすると右辺は $-z\log(2z)-(1-z)\log(21-z)+O(\log n/n)$ となる。 $I(z)=z\log(2z)+(1-z)\log(21-z)$ と置くと、これは凸関数となり、エントロピー関数、またはレート関数と呼ばれる。 $Q_n$を$S_n/n$の分布とすると $Q_n${$A_{z,\epsilon}$}$\approx \exp(-nI(z))$ となる。 これらの結果をPythonで表現してみる。エントロピー関数を青、確率$Q$を橙で示した。zはz$\in[0,1]$で等間隔の変数として発生させた。 def Iz_2(z): return z*np.log(2*z)+(1-z)*np.log(2*(1-z)) def Qz_2(n,z): return np.exp(-n*Iz_2(z)) n=10 z=np.arange(1,n)/n plt.plot(z,Iz_2(z)) plt.plot(z,Qz_2(n,z)) nの数を10,50,1000,10000000と変化させた。zがnが大きくなるに従い1/2に収束していくのが分かる。これは、データから経験分布を算出し、その分布がどの程度収束しているかを見ることができる。 for n in (10,50,1000,10000000): z=np.arange(1,1000)/1000 plt.plot(z,Qz_2(n,z),label=n) plt.legend() plt.plot(z,Iz_2(z)) plt.show() つぎに乱数を発生させ、実現値と理論値の差を見てみよう。ここではベルヌーイ試行として乱数をは発生する。それを100個に分類し、そのエントロピー関数と確率を算出する。x軸はz$\in[0,1]$である。グリーンが経験分布、青が確率、橙がエントロピー関数である。 from scipy.stats import bernoulli p=0.5 mk=[] n=15 nn=10000 for i in range(nn): r = bernoulli.rvs(p, size=n) mk.append(sum(r)/n) a=np.histogram(mk,bins=100,density=False) y=np.array(a[:][0].tolist()) y=y/y[np.argmax(y)] x=np.array((a[:][1]).tolist()) plt.plot(x,Qz_2(n,x)) plt.plot(x,Iz_2(x)) plt.plot(x[1:],y) n=100000 nn=100000で 参考にしたウィキのページ 大偏差原理(Large Deviation Theory) 測度(Measure) 完全測度(Complete measure) 可分集合(Separable space) 経験測度(Empirical measure) 開集合(Open set) σ-代数(σ-algebra) 計量(metric) 空間(space) 積位相(Product topology) ディラック測度(Dirac measure) 大偏差原理(Large Deviation Theory)te 確率論において、大偏差の理論は、列の確率分布のテールの漸近的な振る舞いに関するものである。この理論の基本的な考え方はラプラスにまで遡ることができる。定式化は保険数理、すなわちクラメルとルンドバーグによる保険の破産理論から始まる。大偏差理論の統一的な定式化は、1966年にVaradhanの論文で行われた。大偏差理論は、測度の集中というヒューリスティックな考え方を公式化したもので、確率測度の収束という概念を広く一般化したものである。 大まかに言えば、大偏差理論は、ある種の極端なイベントやテールイベントの確率測度が指数関数的に減少することに関係している。 応用 大偏差の原理では、確率モデルから効果的に情報を得ることができる。そのため、情報理論やリスク管理に応用される。物理学では、熱力学や統計力学(エントロピーと速度関数の関係)に大偏差理論が応用されている。 大偏差とエントロピー 統計力学では、レート関数はエントロピーと関係がある。これは次のようにヒューリスティックに見ることができる。統計力学では、特定のマクロ状態のエントロピーは、そのマクロ状態に対応するミクロ状態の数とを結び付けている。コイン投げの例では、平均値$M_{N}$が特定のマクロ状態を指定する。そして、ある特定の値を生み出す表と裏の特定の列が、特定のミクロ状態を構成する。大雑把に言えば、ミクロ状態の数が多いマクロ状態は、高いエントロピーを持つ。そして、エントロピーが大きい状態は、実際の実験で実現される。平均値が1/2(表の数と裏の数が同じ)のマクロ状態は、それを生み出すミクロ状態の数が最も多く、まさにエントロピーが最も高い状態である。そして、ほとんどの実用的な状況では、このマクロ状態を大量の試行で実際に得ることができる。一方、「レート関数」は、特定のマクロ状態が出現する確率を測定する。レート関数が小さければ小さいほど、あるマクロ状態が出現する確率は高くなる。コイン投げでは、平均値が1/2の時の「レート関数」の値は0となる。このように、「レート関数」は「エントロピー」のマイナスと見ることができる。 大偏差理論の「レート関数」とカルバック・ライブラー情報量の関係は、Sanovの定理によって明確にされた。 測度(Measure) 数学では、ある集合の部分集合に数を割り当てる体系的な方法を「集合の測度」といい、直感的には部分集合の大きさと解釈される。このような数値を割り当てることができる集合を、可測集合と呼ぶ。この意味で、測度とは、長さ、面積、体積の概念を一般化したものである。特に重要な例として、ユークリッド空間のルベーグ測度がある。これは、通常の長さ、面積、体積を、与えられたユークリッド空間のある部分集合に割り当てるものだ。例えば、実数の区間についてのルベーグ測度は、通常は長さであるが、区間の長さと矛盾しない形で、他の種類の集合に数字を割り当てることができる。 技術的には、測度は、集合Xの部分集合に、非負の実数または+∞を割り当てる関数である。測度はさらに可算加法性(countably additive)でなければならない。「大きな」部分集合が、可測な有限個(または可算無限個)の「より小さな」素部分集合(disjoint subset)に分解できる場合、「大きな」部分集合は可測であり、その測度は、「より小さな」部分集合の測度の和となる。 一般に、測度の他の公理(axioms)を満たしながら、与えられた集合のすべての部分集合に一貫した大きさを関連付けたい場合、数え上げ測度(counting measure)のような例ぐらいしか見あたらない。この問題は、σ-代数を形成するのに必要な、すべての部分集合の部分的な集まり、いわゆる部分集合が可測(measurable subset)である場合に対してのみ測度を定義することで解決された。これは、可測部分集合の可算和集合(countable union)、可算積集合(countable intersection)、および可測な部分集合の補集合が可測であることを意味する。ルベーグ測度が一貫して定義できないユークリッド空間の非可測集合は、その補体との混ざり具合が悪いという意味で、必然的に複雑になる。 実際、その存在は選択の公理が自明でなくなる。 測度の主な応用例としては、ルベーグ積分の基礎、アンドレイ・コルモゴロフによる確率論の公理化、エルゴード理論などがある。積分理論では、測度を指定することで、ユークリッド空間の部分集合よりも一般的な空間上で積分を定義することができる。さらに、ユークリッド空間上のルベーグ測度に関する積分は、その前身であるリーマン積分よりも一般的で、豊かな理論を持っている。確率論では、全集合に1の大きさを与える測度を考え、測度可能な部分集合を測度によって確率が与えられる事象と考える。エルゴード理論では、力学系の下で不変であるか、または力学系から自然に生じる測度を考える。 測度とは、AがBの部分集合であるとき、Aの測度はBの測度以下であるという意味で、単調であるという性質を持つ。また、空集合の測度は0であることが求められる。 測度μの可算加法性。加算疎集合(countable disjoint union)の測度は、各部分集合のすべての測度の合計と同じである。 完全測度(Complete measure) 数学では、完全測度(正確には完全測度空間)とは、すべての空集合のすべての部分集合が測定可能(測度ゼロを持つ)な測度空間のことをいう。より正式には、測度空間(X, Σ, μ)が完全であるのは、次の場合に限られる。 可分集合(Separable space) 数学では、位相空間(Topological space)が数えられるほどの密な部分集合を含む場合、分離可能と呼ばれる。すなわち、空間のすべての空でない開部分集合が、少なくとも1つの要素を含むような、空間の要素の列{$x_n$}$_{n=1}^\infty$が存在する。 可算性の他の公理と同様に、分離可能性は「サイズの制限」であり、必ずしもカーディナリティの観点からではなく(ただし、Hausdorff公理の存在下では、これは事実。特に、イメージがHausdorff空間の部分集合である分離可能な空間上のすべての連続関数は、数えられる密な部分集合上のその値によって決定される。 経験測度(Empirical measure) 確率論において、経験測度とは、(通常は有限の)確率変数の列の特定の実現値から生じる確率的測度である。 経験測度を研究する動機は、真の基礎的な確率測度を知ることはしばしば不可能だからである。観測値を集め、相対度数を計算する。経験的尺度または経験的分布関数によって、それぞれP、または関連する分布関数Fを推定することができる。これらはある条件のもとで一様に良い推定値となる。経験過程の分野における定理は、この収束の率を提供する。 開集合(Open set) 例 青い円は、$x^2 + y^2 = r^2$を満たす点(x, y)の集合を表す。赤い円盤は、$x^2 + y^2 < r^2$を満たす点(x, y)の集合を表す。赤色の集合は開集合、青色の集合はその境界集合、赤色と青色の集合の和は閉集合である。 σ-代数(σ-algebra) 数学解析や確率論において、集合Xのσ代数(σ場ともいう)とは、Xの部分集合の集まり$\Sigma$であり、X自身を含み、補集合の下で閉じており、可算和集合の下で閉じている。この定義は、空の部分集合も含み、可算積集合の下で閉じていることを意味する。 このような対$(X,\Sigma)$を可測空間またはボレル空間と呼ぶ。 σ-代数は集合の代数の一種である。σ代数の主な用途は測度の定義であり,具体的には,ある測度が定義されている部分集合の集まりは,必然的にσ代数となる.この概念は、ルベーグ積分の基礎となる数理解析や、確率を割り当てることができる事象の集まりと解釈される確率論において重要である。また、確率論では、条件付き期待値の定義にσ-代数が重要な役割を果たしている。 統計学では、十分統計量の正式な数学的定義に(副)σ-代数が必要であり、特に統計量が関数や確率過程であり、条件付き密度の概念が適用できない場合に必要である。 $X=${a,b,c,d}とすると、X上のσ代数は、$\Sigma =\varnothing$ ,{a,b},{c,d},{a,b,c,d}であり、$\varnothing$は空集合である。一般に、有限代数は常にσ代数である。{$ A_{1},A_{2},A_{3},\dots$} がXの可算な分割であるとすると、分割内のすべての集合の組合(空集合を含む)のコレクションはσ代数である。 より有用な例としては、すべての開区間から始めて、すべての可算回の合併、可算回の交叉、および相対補集合を加え、関連する閉合特性が得られるまでこのプロセスを続けることによって形成される実線上の部分集合の集合がある。このプロセスによって生成されるσ代数は実線上のボレル代数として知られており、最小の(すなわち「最も粗い」)代数と考えることもできる。 このプロセスによって生成されるσ代数は実線上のボレル代数として知られており、すべての開集合を含む、あるいは同等にすべての閉集合を含む最小の(すなわち「最も粗い」)σ代数と考えることもできる。また、ボレル階層と呼ばれる関連した構造は、記述的集合論に関連する。 計量(metric) 数学の世界では、計量(metric)、または距離関数は集合の対となる点の間の距離を与える関数である。計量(metric)を持つ集合を距離空間と呼ぶ。計量(metric)は集合に位相(tpology)を誘導するが、すべての位相が距離で生成できるわけではない。位相が計量(metric)で記述できる位相空間を距離化可能(metrizable)という。 微分幾何学では、微分可能な多様体の接線ベクトルからスカラー上に定義される双線型形式である計量テンソルが重要な情報源となる。計量テンソルは、曲線に沿った距離を積分によって求めることができ、それによって計量(metric)を決定することができる。 タクシーメトリックと平面上のユークリッド距離を比較した図。タクシーメトリックによると、赤、黄、青の経路は同じ長さ(12)。ユークリッド距離では、緑の道の長さは6$\sqrt {2}\approx 8.49$であり、唯一の最短の道。 空間(space) 可測空間、測度空間、確率空間 距離や角度を捨て、(幾何学的な体の)体積を残すと、測度理論にたどり着く。体積の他に、測度は面積、長さ、質量(または電荷)分布の概念を一般化し、さらに、アンドレイ・コルモゴロフの確率論のアプローチによれば、確率分布の概念もある。 古典数学の「幾何学的体」は、単なる点の集合よりもはるかに規則的である。体の境界は体積がゼロである。したがって、体の体積はその内部の体積であり、内部は無限の立方体の列によって尽きることができる。これに対して、任意の点の集合の境界は、ゼロではない体積を持つことができる(例:ある立方体の内側にあるすべての有理点の集合)。測度論は、体積の概念を膨大な種類の集合、いわゆる可測集合に拡張することに成功した。実際、非可測集合は、応用上ほとんど存在しない。 可測集合は、可測空間に定義によって与えられ、可測な関数や写像につながる。位相空間を可測空間にするためには、その空間にσ代数を与える必要がある。ボレル集合のσ-代数が最も一般的であるが、唯一の選択肢ではない。すべてのσ代数がある位相のボレルσ代数になるわけではない。実際、σ代数は位相に関係なく、与えられた集合(または関数)の集まりによって生成される。可測空間のすべての部分集合は,それ自体が可測空間である. 標準可測空間(標準ボレル空間とも呼ばれる)は、コンパクト空間との類似性から特に有用である。標準可測空間の間の双射的な可測写像はすべて同型であり、つまり逆の写像も可測である。また、このような空間間の写像は、そのグラフが積空間において可測である場合にのみ、可測である。同様に、コンパクトな計量空間間の双射的な連続写像はすべて同型であり、すなわち逆写像も連続である。また、このような空間間の写像は、そのグラフが積空間で閉じている場合に限り、連続的である。 ユークリッド空間(より一般的には、完全分離可能な計量空間)におけるすべてのボレル集合は、ボレルσ代数が付与されており、標準可測空間である。すべての数えられない標準可測空間は相互に同型である。 測度空間とは、可測空間に測度を付与したものだ。ルベーグ測度を持つユークリッド空間は測度空間である。積分理論は、測度空間上の可測関数の積分可能性と積分値を定義する。 測度0の集合はnull集合と呼ばれ、無視できるものである。したがって、「mod 0 同型」とは、完全測度の部分集合(つまり無視できる補集合)同士の同型と定義される。 確率空間とは、空間全体の測度が1になるような測度空間であり、確率空間の族(有限であるか否かを問わない)の積が確率空間となる。一方、測度空間一般では、有限個の空間の積しか定義されない。したがって、無限次元の確率測度(特に、ガウス測度)はたくさんあるが、無限次元のルベーグ測度はない。 特に、標準確率空間が有用である。標準確率空間では、条件付き期待値は条件付き測度に対する積分として扱うことができる。2つの標準確率空間が与えられると、それらの測度代数の同型はすべて、ある測度保存写像によって誘導される。標準可測空間上のすべての確率測度は、標準確率空間につながる。標準確率空間の列(有限であるか否かを問わない)の積は標準確率空間である。すべての非原子標準確率空間はmod 0で相互に同型であり、そのうちの1つはルベーグ測度を持つ区間(0,1)である。 これらの空間はあまり幾何学的ではない。特に、他のすべての空間に(何らかの形で)適用される次元の考え方は、可測空間、測度空間、確率空間には適用されない。 積位相(Product topology) 位相幾何学(topology)やそれに関連する数学の分野では、積空間(product space)とは、積位相(Product topology)と呼ばれる自然な位相を備えた位相空間の族の直積(Cartesian product)である。この位相は、箱型位相と呼ばれる、より明白な位相とは異なる。箱型位相(box topology)も積空間に与えることができ、積が有限個の空間のみに及ぶ場合には積位相と一致する。しかし、箱型位相が細かすぎるのに対し、積空間をその因子の圏論的積(Categorical product )にするという点で、積位相は「正しい」ものであり、その意味で積位相は直積の自然な位相である。 ディラック測度(Dirac measure) 数学では、ある集合が固定要素xを含むかどうかだけでその集合の大きさを決める「ディラック測度」がある。物理学などの技術分野で重要なツールであるディラック・デルタ関数の考え方を形式化したものである。 定義 Dirac測度とは、集合X(Xの部分集合の任意のσ代数を含む)上の測度δxであり、与えられたx∈Xと任意の(測定可能な)集合A⊆Xに対して次のように定義される。 ここで、1AはAの指標関数である。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

MISIAの国歌独唱が最高だったのでその時のツイートをWord Cloudでまとめてみた。

 この間のオリンピック開会式のMISIAの国歌独唱が最高だったので、Twitter上でどんな反応があったのかWordCloudでまとめてみました。  今回使用する主なライブラリはtweepy, janome, WordCloudになります。  tweepyを使用するのにはAPIキー、トークンが必要になるので、以下の記事を参考にして取得してください。  2021年度版 Twitter API利用申請の例文からAPIキーの取得まで詳しく解説 ライブラリのインポート import re import pandas as pd import tweepy from janome.tokenizer import Tokenizer import emoji from wordcloud import WordCloud tweepyの認証設定 api_key = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" api_key_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" access_token= "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" access_token_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" auth = tweepy.OAuthHandler(api_key, api_key_secret) auth.set_access_token(access_token, access_token_secret) api = tweepy.API(auth) tweetの取得 tweepy.Cursorでは一度で最大100件のツイートを取得可能で、Twitter APIでは15分間に180回までの制限があるので、取得できるツイートは最大でも15分で1万8千件になります。 今回の場合、開会式でのMISIAの歌唱の時間が2021/7/23 20:18~20:20ぐらいだったので、その時間を指定しています。 (※今回は指定時刻の2分間で約1万5千件のツイートがあります。さすがオリンピック開会式・・・) q = "MISIA -filter:retweets" #検索ワード:MISIA リツイートは含まない since = "2021-07-23_20:18:00_JST" #指定開始時刻(日本時間) until = "2021-07-23_20:20:00_JST" #指定終了時刻(日本時間) tweet_data = [] for tweet in tweepy.Cursor(api.search, q=q, result_type='recent', lang='ja', tweet_mode='compat', since=since, until=until, count=100).items(17900): tweet_data.append([tweet.text.replace('\n','')]) tweetの整形 t = Tokenizer() words = [] for text in tweet_data: text = str(text) # ユーザー名削除 text = re.sub(r'@[0-9a-zA-Z_:]*', "", text) # ハッシュタグ削除 text = re.sub(r'#.*', "", text) # URL削除 text = re.sub(r'(https?)(:\/\/[-_.!~*\'()a-zA-Z0-9;\/?:\@&=+\$,%#]+)', "", text) #記号の削除 text=re.sub(r'[!-~]', "", text) text=re.sub(r'[︰-@]', "", text) #改行の削除 text=re.sub('\n', " ", text) tokens = t.tokenize(text) for token in tokens: pos = token.part_of_speech.split(',')[0] if pos == '名詞': words.append(token.surface) elif pos == "動詞": words.append(token.base_form) elif pos == "形容詞": words.append(token.base_form) text = ' '.join(words) wordcloudで可視化(※文字の色はMISIAの衣装に合わせてレインボー) 出力された画像や前のブロックのwordsをsort_valuesして表示しない単語を設定するといいと思います。 #表示しない単語の指定 stop_words = ["MISIA", "さん", "てる", "くる", "する", "なる", "そう", "れる", "ない", "ある", "ーー", "ここ","しまう", "える", "よう", "いう", "っぽい", "ちゃん", "ぇな"] wordcloud = WordCloud( background_color="white", #背景:白 colormap = 'rainbow', #文字のカラーマップ:レインボー collocations = False, #単語を連結するか font_path='C:/Windows/Fonts/YuGothM.ttc', #使用するフォント width=600, height=400, #画像サイズ min_font_size=8, #一番小さいフォントサイズ font_step=2, #フォントサイズの間隔 stopwords=set(stop_words) ).generate(text) wordcloud.to_file('./wordcloud.png') 圧倒的な「かき氷」の多さ・・・ 参考記事 TwitterAPIの上限までTweetを取得する|つぶやきをWordcloudで可視化①
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

MISIAの国歌独唱に感動したのでその時のツイートをWord Cloudでまとめてみた

 この間のオリンピック開会式のMISIAの国歌独唱が最高だったので、Twitter上でどんな反応があったのかWordCloudでまとめてみました。  今回使用する主なライブラリはtweepy, janome, WordCloudになります。  tweepyを使用するのにはAPIキー、トークンが必要になるので、以下の記事を参考にして取得してください。  2021年度版 Twitter API利用申請の例文からAPIキーの取得まで詳しく解説 ライブラリのインポート import re import pandas as pd import tweepy from janome.tokenizer import Tokenizer import emoji from wordcloud import WordCloud tweepyの認証設定 api_key = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" api_key_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" access_token= "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" access_token_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" auth = tweepy.OAuthHandler(api_key, api_key_secret) auth.set_access_token(access_token, access_token_secret) api = tweepy.API(auth) tweetの取得 tweepy.Cursorでは一度で最大100件のツイートを取得可能で、Twitter APIでは15分間に180回までの制限があるので、取得できるツイートは最大でも15分で1万8千件になります。 今回の場合、開会式でのMISIAの歌唱の時間が2021/7/23 20:18~20:20ぐらいだったので、その時間を指定しています。 (※今回は指定時刻の2分間で約1万5千件のツイートがあります。さすがオリンピック開会式・・・) q = "MISIA -filter:retweets" #検索ワード:MISIA リツイートは含まない since = "2021-07-23_20:18:00_JST" #指定開始時刻(日本時間) until = "2021-07-23_20:20:00_JST" #指定終了時刻(日本時間) tweet_data = [] for tweet in tweepy.Cursor(api.search, q=q, result_type='recent', lang='ja', tweet_mode='compat', since=since, until=until, count=100).items(17900): tweet_data.append([tweet.text.replace('\n','')]) tweetの整形 t = Tokenizer() words = [] for text in tweet_data: text = str(text) # ユーザー名削除 text = re.sub(r'@[0-9a-zA-Z_:]*', "", text) # ハッシュタグ削除 text = re.sub(r'#.*', "", text) # URL削除 text = re.sub(r'(https?)(:\/\/[-_.!~*\'()a-zA-Z0-9;\/?:\@&=+\$,%#]+)', "", text) #記号の削除 text=re.sub(r'[!-~]', "", text) text=re.sub(r'[︰-@]', "", text) #改行の削除 text=re.sub('\n', " ", text) tokens = t.tokenize(text) for token in tokens: pos = token.part_of_speech.split(',')[0] if pos == '名詞': words.append(token.surface) elif pos == "動詞": words.append(token.base_form) elif pos == "形容詞": words.append(token.base_form) text = ' '.join(words) wordcloudで可視化(※文字の色はMISIAの衣装に合わせてレインボー) 出力された画像や前のブロックのwordsをsort_valuesして表示しない単語を設定するといいと思います。 #表示しない単語の指定 stop_words = ["MISIA", "さん", "てる", "くる", "する", "なる", "そう", "れる", "ない", "ある", "ーー", "ここ","しまう", "える", "よう", "いう", "っぽい", "ちゃん", "ぇな"] wordcloud = WordCloud( background_color="white", #背景:白 colormap = 'rainbow', #文字のカラーマップ:レインボー collocations = False, #単語を連結するか font_path='C:/Windows/Fonts/YuGothM.ttc', #使用するフォント width=600, height=400, #画像サイズ min_font_size=8, #一番小さいフォントサイズ font_step=2, #フォントサイズの間隔 stopwords=set(stop_words) ).generate(text) wordcloud.to_file('./wordcloud.png') 圧倒的な「かき氷」の多さ・・・ 参考記事 TwitterAPIの上限までTweetを取得する|つぶやきをWordcloudで可視化①
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ubuntu環境へのpython開発環境構築 参照サイト

以下のサイトを参照して、自分のマシン(Ubuntu 20.04 LTS)にpython開発環境を構築した。 1. UbuntuにAnacondaをインストールしてPythonとJupyter Notebookを動かすまでの手順 2. ubuntu環境でのanaconda3のインストール / condaが使えないとき 1のサイトより、anacondaをインストールすると、プロジェクトごとに異なるバージョンのpythonをインストールして使用できるよう「仮想環境」を構築することができる模様。 例えば、「このディレクトリではpython3.5、別のディレクトリではpython3.6使いたい」ときなどがある場合に、作成した仮想環境を切り替えることで同一のパソコン上でも異なるpythonのバージョンやライブラリを使用できるとのこと 仮想環境の有効化に戸惑ったため、以下のサイトを参照した conda activate の CommandNotFoundError への対処方法 jupyter notebookを起動して、pandasをimportしたところ、下記のようなエラーが出た。 ModuleNotFoundError: No module named ‘pandas’ このエラーの対処には下記のサイトを参照した Anaconda の Jupyter Notebook で ModuleNotFoundError が発生した場合のトラブルシュート 今回はpandasをターミナルからインポートした。 pandasのインストール場所を確認したところ、以下の場所に存在していた。 $find -name pandas ./anaconda3/pkgs/pandas-1.1.5-py36ha9443f7_0/lib/python3.6/site-packages/pandas ./anaconda3/pkgs/pandas-1.2.4-py38h2531618_0/lib/python3.8/site-packages/pandas ./anaconda3/pkgs/astropy-4.2.1-py38h27cfd23_1/lib/python3.8/site-packages/astropy/io/misc/pandas ./anaconda3/envs/my_env/lib/python3.6/site-packages/pandas ./anaconda3/lib/python3.8/site-packages/pandas ./anaconda3/lib/python3.8/site-packages/astropy/io/misc/pandas 確かに仮想環境であるmy_envにpandasパッケージがインポートされていることが確認できた。juypter notebookを起動して、改めてpandasをインポートしてみると、インポートができた。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

タイタニックデータを使って探索的データ分析を行うまで

ubuntu環境へのpython開発環境構築 以下のサイトを参照して、自分のマシン(Ubuntu 20.04 LTS)にpython開発環境を構築した。 1. UbuntuにAnacondaをインストールしてPythonとJupyter Notebookを動かすまでの手順 2. ubuntu環境でのanaconda3のインストール / condaが使えないとき 1のサイトより、anacondaをインストールすると、プロジェクトごとに異なるバージョンのpythonをインストールして使用できるよう「仮想環境」を構築することができる模様。 例えば、「このディレクトリではpython3.5、別のディレクトリではpython3.6使いたい」ときなどがある場合に、作成した仮想環境を切り替えることで同一のパソコン上でも異なるpythonのバージョンやライブラリを使用できるとのこと 仮想環境の有効化に戸惑ったため、以下のサイトを参照した conda activate の CommandNotFoundError への対処方法 jupyter notebookを起動して、pandasをimportしたところ、下記のようなエラーが出た。 ModuleNotFoundError: No module named ‘pandas’ このエラーの対処には下記のサイトを参照した Anaconda の Jupyter Notebook で ModuleNotFoundError が発生した場合のトラブルシュート 今回はpandasをターミナルからインポートした。 pandasのインストール場所を確認したところ、以下の場所に存在していた。 $find -name pandas ./anaconda3/pkgs/pandas-1.1.5-py36ha9443f7_0/lib/python3.6/site-packages/pandas ./anaconda3/pkgs/pandas-1.2.4-py38h2531618_0/lib/python3.8/site-packages/pandas ./anaconda3/pkgs/astropy-4.2.1-py38h27cfd23_1/lib/python3.8/site-packages/astropy/io/misc/pandas ./anaconda3/envs/my_env/lib/python3.6/site-packages/pandas ./anaconda3/lib/python3.8/site-packages/pandas ./anaconda3/lib/python3.8/site-packages/astropy/io/misc/pandas 確かに仮想環境であるmy_envにpandasパッケージがインポートされていることが確認できた。juypter notebookを起動して、改めてpandasをインポートしてみると、インポートができた。 jupyter notebookでpandas_profilingのインポートも同様に行う コーディング開始 import pandas as pd import pandas_profiling as pdp X = pd.read_csv('/home/akihiro/train.csv') profile = pdp.ProfileReport(X) profile この際、pandas_profilingパッケージ関連で多くのエラーが生じた。その際に参考にしたサイトを書いておく。自分の場合は、panadas_profilingのバージョンが問題だったらしく、condaでインポートすることが原因なのではと推測した。そこで参照したサイトがこちら↓↓ pandas_proflingを使うとTypeError: concat() got an unexpected keyword argument 'join_axes'が出た話(Google Colab) pipでインストールし直したところ、新たなエラーが生じた。 /home/akihiro/anaconda3/envs/my_env/lib/python3.6/site-packages/pandas_profiling/pandas_profiling.mplstyle' not found in the style library and input is not a valid URL or path; see `style.available` for list of available styles jupyter notebookのカーネルを再起動することでエラーを消すことができた その時参照したサイトがこちら↓↓ - pandas_profiling.mplstyle not found OSError #233 - Pandas-profiling: pandas_profiling.mplstyleが見つかりませんOSError これで訓練データの概要を表示できた。表示まで10秒ほどかかった。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む