- 投稿日:2020-02-26T22:50:12+09:00
seabornのpairplotで下半分(上半分)だけ表示する
https://tutorialmore.com/questions-1150768.htmにやり方があったのでメモ。
import seaborn as sns import matplotlib.pyplot as plt def hide_current_axis(*args, **kwds): plt.gca().set_visible(False) # データフレーム準備 X = sns.load_dataset("iris") # ペアプロット pg = sns.pairplot(X) # 上のグラフを消す=下三角部分を表示 pg.map_upper(hide_current_axis) # 下のグラフを消す=上三角部分を表示 # pg.map_lower(hide_current_axis) # 対角部分を消す # pg.map_diag(hide_current_axis)もしくはpairgridで好きな位置に好きなグラフを配置する。
from itertools import groupby import seaborn as sns import matplotlib.pyplot as plt def hide_current_axis(*args, **kwds): plt.gca().set_visible(False) def stackedhist(data, stackby, **kwds): groups = groupby(zip(stackby, data), lambda x: x[0]) grouped_data = [[v for _, v in items] for key, items in groups] plt.hist(grouped_data, stacked=True, edgecolor='none') # データフレーム準備 X = sns.load_dataset("iris") g = sns.PairGrid(X, diag_sharey=False) g.map_lower(sns.scatterplot, data=X, hue='species', alpha=0.3, edgecolor='none') g.map_diag(stackedhist, stackby=X['species']) g.map_upper(hide_current_axis)
- 投稿日:2020-02-26T22:36:08+09:00
【Python】DICOMからPNGやCSVに変換
はじめに
AIを作成するために、MRI画像をDICOM形式からPNG形式あるいはCSV形式に変換する必要があった。
件数は1700件ほど。
有名なものとしてはDICOM Cenverterがあるが、大量のDICOMをInputすると固まってしまう。
(199.99$で買わなきゃいけないのか??)
そこで、pythonを使ってDICOMからPNG形式あるいはCSV形式に変換するコードを作成した。前準備
DICOMの読み取りに必要なpydicomとPNGとして保存する上でひつようなopencv-pythonを用意する。
$ pip3 install pydicom $ pip3 install opencv-python準備
1つのフォルダの中に変換したいDICOMを用意する。
terminal$ ls instance_1.dcm instance_14.dcm instance_19.dcm instance_6.dcm instance_10.dcm instance_15.dcm instance_2.dcm instance_7.dcm instance_11.dcm instance_16.dcm instance_3.dcm instance_8.dcm instance_12.dcm instance_17.dcm instance_4.dcm instance_9.dcm instance_13.dcm instance_18.dcm instance_5.dcm実行
以下のコードを実行する。
dcm2png_csv.pyimport numpy as np import pydicom import glob import cv2 def dcm2png_csv(dcm): d = pydicom.read_file(dcm) value = d.pixel_array wc = d.WindowCenter ww = d.WindowWidth max, min = wc + ww / 2, wc - ww / 2 value_std = 255 * (value - min) / (max - min) cv2.imwrite((dcm[:-4]+'.png'), value_std) np.savetxt((dcm[:-4]+'.csv'),value,delimiter=',') for dcm in glob.glob('*dcm'): dcm2png_csv(dcm)必要なmoduleのimport
import numpy as np import pydicom import glob import cv2DICOMの読み込み
DICOMのヘッダーから値まで読み込める。
"d.pixel_array"画像の画素値にあたる。
(大人の事情でヘッダー等はみせられません(泣))d = pydicom.read_file(dcm) value = d.pixel_arrayコントラストの調節
値を読み込んでそのままPNGに保存してはコントラストがおかしな画像がでてくる。
MRI画像は16 bitでデータを収集されているのに対し、PNGは8 bitであるためおかしくなる。
DICOMのタグの中でコントラストを決定するWindow Center(WC)とWindow Width(WW)を取得し、それが8 bitの状態で正しく見えるように正規化する。wc = d.WindowCenter ww = d.WindowWidth max, min = wc + ww / 2, wc - ww / 2 value_std = 255 * (value - min) / (max - min)ここのサイトの画像がわかりやすいです。この例では、12 bitのデータを8 bitに変換しています。
PNGおよびCSVの保存
PNGとCSVの保存をします。
PNGは8 bitですので正規化され"value_std"をPNGにします。
ですが、16 bitが8 bitですから情報としては少なくなってしまうわけです。
これがAIの判定精度に影響を与える場合があります。
ですので、生の値をCSVに保存することも重要です。
ということで、CSVにするのは"value"。# PNG保存 cv2.imwrite((dcm[:-4]+'.png'), value_std) # CSV保存 np.savetxt((dcm[:-4]+'.csv'),value,delimiter=',')結果
わかりずらいですが、PNGが増えたことがわかります。
$ ls instance_1.dcm instance_14.dcm instance_19.dcm instance_6.dcm instance_1.png instance_14.png instance_19.png instance_6.png instance_10.dcm instance_15.dcm instance_2.dcm instance_7.dcm instance_10.png instance_15.png instance_2.png instance_7.png instance_11.dcm instance_16.dcm instance_3.dcm instance_8.dcm instance_11.png instance_16.png instance_3.png instance_8.png instance_12.dcm instance_17.dcm instance_4.dcm instance_9.dcm instance_12.png instance_17.png instance_4.png instance_9.png instance_13.dcm instance_18.dcm instance_5.dcm instance_13.png instance_18.png instance_5.pngまとめ
DICOMからPNGやCSVに無料変換するコードを作成した。
このコードをforで回せば一括変換できるので便利かと!
- 投稿日:2020-02-26T22:07:29+09:00
将来の売上予測への挑戦:①時系列分析って何?
はじめに
最近は、小売業界もビッグデータやらAIが持て囃されていて、日々色々な相談が各部署からあります。
特に最近多いのが、店舗の部署から「来月の売上を予測して欲しい」「来週どれくらい売れれば良いと言えばいいのか」「来月追加で販促をやった方がいいか」といった将来の売上についてです。以前は、前年同月比105%とかが目標だったのが、少子高齢化やらインバウンドやら異常気象やら、世の中変動が大きすぎて、前年同月比が役だたなくなってきています。
そこで、店舗ではいつも通りやっていたらどれくらい売れるのか・売れないのかを知っておいて、そこにイベントとか広告等で、いくら上乗せしていくかを考えていく基準にしたいというわけです。何の分析がいいのか
私は今でこそデータを使う仕事をしていますが、バリバリの文系人間なので、統計の複雑な手法に詳しくありません。
それで、当初は、回帰分析で気象とか販促施策とか、周辺のイベントとかの情報を地道に入れて予測をしてみたのですが、精度が全く上がらない。。。そんなときに、色々と調べてみると、株とかを予測するのに「時系列分析」っていうのがあるよと知りました。
時系列分析って何?
「全人類がわかる統計学」、「TV朝日の視聴率推移をSARIMAモデルで予測してみる」などを参考にしながら、私の理解で時系列分析を整理したいと思います。
(間違っていたらすみません。ぜひ難しい数式なしで、教えて下さい。。。)1.時系列分析とは、過去の売上が予測変数に入ること
私が当初つくった回帰分析では、次のように売上を全く別の変数で説明しようとしていました。
売上 = a{1} * 気温 + a{2} * 販促費用 + ・・・しかし、ある日の売上が1,000万円だとして、その翌日の売上はいくらでしょう?
100万円にはならないでしょう。逆に1億円にもならないでしょう。
たぶん、1,200万円とか800万円とか、前日の売上からそこまで大外しはしないだろうと考えられます。そこで、次のように過去の売上を、説明変数につかっていくことで精度を上げていくというやり方です。
売上{n} = a{1} * 売上{n-1} + a{2} * 売上{n-2} + ・・・これを、AR(自己回帰)と言うらしいです。
2.時系列分析とは、過去の誤差を考慮すること
1の自己回帰に対して、先月の売上が本来よりも高ければ、売上の先食いがあったと考えて、今月は売上が減る可能性を考慮するものです。
こちらは、次のように表すことができます。売上{n} = b{1} * 誤差{n} + b{2} * 誤差{n-1} + ・・・これを、MA(移動平均)と言うらしいです。
3.時系列分析といっても、完全に同じ周期を繰り返すわけではない
周期が繰り返していれば簡単なのですが、そうで無いのが現実の時系列の厳しいところ。
それを、難しい言葉で「非定常過程」というらしいです。短期的ではなく、中長期的なトレンドとして、上昇傾向だったり下降傾向だったりするのを考慮するということのようです。
この1~3をあわせて、ARIMA(Auto Regressive Integrated Moving Average)モデルと呼ぶそうです。
ARとMAが合体した感じが、カッコいいですね。4.時系列なので、季節性も考慮しないといけない
ここまでやっても、中々精度は上がりません。
でも、それは小売の人間なら分かっています。2月・9月は毎年売上が上がらないといった、季節性があるはずが、それらをここまで考慮していないんです。季節性といっても、様々な周期があるように思います。
- 週の中の曜日 : 土日は、休日に買いだめが発生する店は売上が上がる
- 月の中の日 : 25日とか給料日後には、ちょっと高いものが売れて売上が上がる
- 年の中の月 : 上記の通り2,9月は売上が下がる
これらの周期も考慮していくことができるのが、SARIMAモデルというようです。
5.時系列以外の要素も考慮しないといけない
ここまで時系列の要素を見てきましたが、突発的な要素も組み込んでいきたいところです。
- 気象:雨が降るだけじゃなく、最近の異常気象もありますね
- イベント:店の近くで運動会やお祭りがあると、それだけで売上が大きくあがります
- 競合:ライバルの店が近くにできると、それ以降売上が10数~数10%の一定規模落ち込みます
これらの外部変数を考慮していくのが、ARIMAXモデルというようです。
これらをPythonで実現していくために参考にするサイト
Pythonによる状態空間モデル
ARIMAモデルに+データ解釈を加えたモデルを、状態空間モデルというようです時系列データの予測ライブラリ--PyFlux--
ARIMAやARIMAX、状態空間モデルを実装できるライブラリとしてPyFluxというものがあるみたいですRNNで来月の航空会社の乗客数を予測する
これらは、ニューラルネットワークっぽいですがおわりに
今回は、文字ばかりですみません。
次回以降、実際に時系列分析を試していこうと思います。
- 投稿日:2020-02-26T21:08:50+09:00
Top 30 Python Interview Questions with Answers 2020
Here are the top 30 python interview questions and answers in 2020 you should read before appearing in the interview.
click here to read more
https://www.positronx.io/python-interview-questions-with-answers/
- 投稿日:2020-02-26T20:46:59+09:00
Accuracyなどの混合行列の評価が使えない理由
前回の投稿からかなり時間があいてしまいましたが、これからも時々更新していく予定です。
機械学習の精度を評価する際によく利用されているAccuracyなどの指標ですが、金融分野ではほとんど使用されることはありません。
なぜ利用されないのかを説明する前に、混合行列とAccuracyなどの評価指標について説明します。説明が不要な方は問題点と結論だけをご覧ください。Confution Matrix (混合行列)
混合行列とは機械学習の2項分類において、予測と結果をマトリックスにしたものです。
機械学習の2項分類では、通常予測確率とそれに基づく予測分類がアウトプットとして出力できます。
例えば、Credit scoreにおいては、延滞になる確率と延滞になる・ならないを0と1で出力します。
それを元に、予測と結果の合致をマトリックスで表すと以下のようになります。
- 予測(延滞にならない) - Positive 予測(延滞になる) - Negative 結果(延滞でない) - Positive TP(True Positive) FN(False Negative) 結果(延滞) - Negative FP(False Positive) TN(True Negative) 学習結果で延滞にならないと予測して、結果が延滞でない場合はTPに該当し、延滞にならないと予測して、結果が延滞の場合はFNに該当します。
予測が正しかった場合は、TPかTNに該当することになります。評価指標
上記の混合行列を使用した評価指標には以下のような種類があり、それぞれ特徴があります。どれも数値が高いほど性能がよいとされています。
Accuracy (正確率、精度)
よく利用されているもので、全体の中で予測と結果が正しかったものの割合がどれくらいあるかを示しています。$Accuracy = \frac{TP + TN}{TP + FP + FN + TN}$
Precision (適合率)
Positiveと予測された(延滞にならない)中で結果もPositiveだった割合。$Precision = \frac{TP}{TP + FP}$
Recall (再現率)
結果がPositiveなものをどれだけ予測できていたかの割合。$Recaall = \frac{TP}{TP + FN}$
Specificity (特異度)
結果がNegativeなものをどれだけ予測できていたかの割合。$Specificity = \frac{TN}{FP + TN}$
評価指標の問題点と特徴
例えば以下のような予測結果だった場合は、
- 予測(延滞にならない) - Positive 予測(延滞になる) - Negative 結果(延滞でない) - Positive 980(TP) 0(FN) 結果(延滞) - Negative 20(FP) 0(TN) $$ Accuracy = \frac{TP + TN}{TP+ FP + FN + TN} = \frac{980}{1000} = 0.98 $$
となり、Accuracyの数値は高いものになります。
ただし、内訳を見てみると予測はすべてPositiveとなっており、結果がNegativeのものを1件も予測できていません。データの偏りによってはでたらめに全部Positiveと予測しても、Accuracyは高い数値になります。
そこで、Specificityを計算してみると$$ Specificity = \frac{TN}{FP + TN} = \frac{0}{20} = 0 $$
となり、Negativeになる予測がまったくできていないことがわかります。
その他指標
評価指標が数種類あり、それぞれが特徴を持つと評価が複雑になりがちです。そこでF値(F-score, F1 score, F-measure、F尺度)などと呼ばれる指標を使用することがあります。これはPrecisionとRecallの調和平均の値となります。
$$ F1 = 2\frac{Precision * Recall}{Precision + Recall} $$
問題点
混合行列を使用した様々な評価指標を述べてきましたが、これらを利用することはありません。混合行列の根本は、PositiveとNegativeの2項に分類し、その分類の正しさを測っています。分類の仕方は、予測された確率を基に決められたしきい値より低ければPositive、高ければNegativeになります。
最初の問題点として、データに偏りがあった場合です。元のデータでPositiveが99%、Negativeが1%のような場合、評価指標の数値は偏ったものになりがちです。
第二はしきい値の設定です。しきい値をどこにするかで当然PositiveとNegativeの割合が変化しますが、そのしきい値が適切なのかが曖昧になりがちです。
例えば、これをCredit scoreなどで利用すると、A社では延滞予測が5%以上は契約しないので、しきい値を5%に、B社では10%をしきい値に用いて、Negativeに分類された人は契約しないとしましょう。ではPositiveに分類された人がすべて延滞にならないかというとそうではなく、必ず延滞になる事案が発生します。これは、すべての与えられた情報(例えば属性や取引履歴)が同じでも、延滞にならない・なるケースが発生するためです。
Credit scoreで一番重要視されるのは、リスク度合いに合わせた商品設計なので、大事なことは何%の確率か(リスク度合い)ということで、PositiveやNegativeに分類される正確さではないのです。結論
リスク度合いを考慮せずに、いかに高い精度で2項に分類するかが重要な場合は、当然上記の評価指標が重要になることもあります。
結論として、どう利用されるかによって使う指標を選ばないと、なんの意味もない指標をもとに学習モデルの精度を測り、精度がいい・悪いを評価してしまうことになります。
モデルの評価や予測した理由を説明する方法についてはこちらを御覧ください。
理解されるPOC作成のために、機械学習したモデルをどう評価し説明するかのまとめ。
- 投稿日:2020-02-26T20:46:59+09:00
Accuracyなどの混同行列の評価が使えない理由
前回の投稿からかなり時間があいてしまいましたが、これからも時々更新していく予定です。
機械学習の精度を評価する際によく利用されているAccuracyなどの指標ですが、金融分野ではほとんど使用されることはありません。
なぜ利用されないのかを説明する前に、混同行列とAccuracyなどの評価指標について説明します。説明が不要な方は問題点と結論だけをご覧ください。Confution Matrix (混同行列)
混同行列とは機械学習の2項分類において、予測と結果をマトリックスにしたものです。
機械学習の2項分類では、通常予測確率とそれに基づく予測分類がアウトプットとして出力できます。
例えば、Credit scoreにおいては、延滞になる確率と延滞になる・ならないを0と1で出力します。
それを元に、予測と結果の合致をマトリックスで表すと以下のようになります。
- 予測(延滞にならない) - Positive 予測(延滞になる) - Negative 結果(延滞でない) - Positive TP(True Positive) FN(False Negative) 結果(延滞) - Negative FP(False Positive) TN(True Negative) 学習結果で延滞にならないと予測して、結果が延滞でない場合はTPに該当し、延滞にならないと予測して、結果が延滞の場合はFNに該当します。
予測が正しかった場合は、TPかTNに該当することになります。評価指標
上記の混同行列を使用した評価指標には以下のような種類があり、それぞれ特徴があります。どれも数値が高いほど性能がよいとされています。
Accuracy (正確率、精度)
よく利用されているもので、全体の中で予測と結果が正しかったものの割合がどれくらいあるかを示しています。$Accuracy = \frac{TP + TN}{TP + FP + FN + TN}$
Precision (適合率)
Positiveと予測された(延滞にならない)中で結果もPositiveだった割合。$Precision = \frac{TP}{TP + FP}$
Recall (再現率)
結果がPositiveなものをどれだけ予測できていたかの割合。$Recaall = \frac{TP}{TP + FN}$
Specificity (特異度)
結果がNegativeなものをどれだけ予測できていたかの割合。$Specificity = \frac{TN}{FP + TN}$
評価指標の問題点と特徴
例えば以下のような予測結果だった場合は、
- 予測(延滞にならない) - Positive 予測(延滞になる) - Negative 結果(延滞でない) - Positive 980(TP) 0(FN) 結果(延滞) - Negative 20(FP) 0(TN) $$ Accuracy = \frac{TP + TN}{TP+ FP + FN + TN} = \frac{980}{1000} = 0.98 $$
となり、Accuracyの数値は高いものになります。
ただし、内訳を見てみると予測はすべてPositiveとなっており、結果がNegativeのものを1件も予測できていません。データの偏りによってはでたらめに全部Positiveと予測しても、Accuracyは高い数値になります。
そこで、Specificityを計算してみると$$ Specificity = \frac{TN}{FP + TN} = \frac{0}{20} = 0 $$
となり、Negativeになる予測がまったくできていないことがわかります。
その他指標
評価指標が数種類あり、それぞれが特徴を持つと評価が複雑になりがちです。そこでF値(F-score, F1 score, F-measure、F尺度)などと呼ばれる指標を使用することがあります。これはPrecisionとRecallの調和平均の値となります。
$$ F1 = 2\frac{Precision * Recall}{Precision + Recall} $$
問題点
混同行列を使用した様々な評価指標を述べてきましたが、これらを利用することはありません。混同行列の根本は、PositiveとNegativeの2項に分類し、その分類の正しさを測っています。分類の仕方は、予測された確率を基に決められたしきい値より低ければPositive、高ければNegativeになります。
最初の問題点として、データに偏りがあった場合です。元のデータでPositiveが99%、Negativeが1%のような場合、評価指標の数値は偏ったものになりがちです。
第二はしきい値の設定です。しきい値をどこにするかで当然PositiveとNegativeの割合が変化しますが、そのしきい値が適切なのかが曖昧になりがちです。
例えば、これをCredit scoreなどで利用すると、A社では延滞予測が5%以上は契約しないので、しきい値を5%に、B社では10%をしきい値に用いて、Negativeに分類された人は契約しないとしましょう。ではPositiveに分類された人がすべて延滞にならないかというとそうではなく、必ず延滞になる事案が発生します。これは、すべての与えられた情報(例えば属性や取引履歴)が同じでも、延滞にならない・なるケースが発生するためです。
Credit scoreで一番重要視されるのは、リスク度合いに合わせた商品設計なので、大事なことは何%の確率か(リスク度合い)ということで、PositiveやNegativeに分類される正確さではないのです。結論
リスク度合いを考慮せずに、いかに高い精度で2項に分類するかが重要な場合は、当然上記の評価指標が重要になることもあります。
結論として、どう利用されるかによって使う指標を選ばないと、なんの意味もない指標をもとに学習モデルの精度を測り、精度がいい・悪いを評価してしまうことになります。
モデルの評価や予測した理由を説明する方法についてはこちらを御覧ください。
理解されるPOC作成のために、機械学習したモデルをどう評価し説明するかのまとめ。
- 投稿日:2020-02-26T20:44:46+09:00
Pythonの繰り返し処理
pythonの繰り返し処理についてまとめました。
初級者向けです。
pythonバージョン:3.6.9
公式ドキュメントは以下が参考になります。
for文・while文
rangeRangeを使用するfor
Rangeオブジェクトを使用する繰り返しは、主に回数指定のループ処理に使用します。
Rangeオブジェクトはrange(start, stop, step)
とすることで生成でき、for文中で用いることでstartからstop - 1 まで(stop自体は含まれません)をstepずつ増やしながら実行することができます。以下にいくつか例を挙げます。N回繰り返す
for i in range(5): print(i) # 0 ~ 4 が出力される #Rangeのイニシャライザは、startが0、stepが1のデフォルト値があります。1から始めて、2ずつ増やしながら繰り返し
#stepに与える値を変えることで任意の順序でループができます for i in range(1, 10, 2): print(i) #1, 3, 5, 7, 9 が出力される値を減らしながら繰り返す
逆に減らしながらの実行も可能です。
#startが10、stopが-1、stepが-1なので、10から始まり0までを1ずつ減らしながら処理します for i in range(10, -1, -1): print(i) #10 ~ 0が減少しながら出力されるFor
こちらは
range
を使用しない場合のfor
の使用方法です。
主にイテラブルなどの集合操作に使用します。リストなどの要素をすべて繰り返し処理する
Javaなどで言うところのforeach に該当するものです。
処理することができるのは、イテラブルと呼ばれるオブジェクトです。リストやセット、タプルなどがこれにあたります。
(__iter__()
を実装しているものがイテラブルと呼ばれるのですが、詳細な解説は割愛します。)#リストの全要素を繰り返す a = [1, 3, 5, 7] for n in a: print(n) #セットに対しても同様 s = {1, 2, 3, 4} for n in s: print(n)なお、ディクショナリについては、ディクショナリが持つメソッドを用いてキーと値を取り出します。
#キーをすべて処理する for k in m.keys(): print(k) #値をすべて処理する for v in m.values(): print(v) #キーと値のペアに対して処理をする for k, v in m.items(): print(str(k) + v)インデックス付きで繰り返す
要素を走査しつつインデックスも使用したい場合は
enumerate
を使用します。a = ['a', 'b', 'c', 'd', 'f'] for i, n in enumerate(a): print(i, n) #以下のように出力されます。 #0 a #1 b #2 c #3 d #4 f複数のイテラブルを同時に処理する
複数のイテラブルの要素を順に同時に扱いたいときは
zip()
を使います。a = [1, 2, 3, 4, 5] b = ['a', 'b', 'c', 'd', 'e'] for i, j in zip(a, b): print(str(i) + j)注意点として、イテラブルの要素数が異なる場合、余分な(要素数の多い方の)要素は無視されます。
#リストaが持つ要素のうち、後ろ3つは処理対象になりません a = [1, 2, 3, 4, 5, 6, 7, 8] b = ['a', 'b', 'c', 'd', 'e'] for i, j in zip(a, b): print(str(i) + ' ' + j) #1 a #2 b #3 c #4 d #5 e # 6 ~ 8 が出力されません!While
whileは、「ある条件を満たしている間」処理を実行し続けます。
#この例だと、iが10未満の間、処理を続けます i = 0 while i < 10: print(i) i += 1 # 0 ~ 9 が出力されます。条件を満たしている間実行し続けるので、下記のコードは無限ループです。
#無限ループの簡単な例 #whileの条件が常にTrueのため、無限ループ while True: print('hoge')continue, break, else
continue
,break
,else
は、それ単独ではループ処理を実現しませんが、上記に挙げたfor
やwhile
と組み合わせることで、ループ処理中に処理の中断やスキップなどができるようになります。continue
continue
は、以下の処理をスキップして、次のループ処理に移ります。
特定の条件の場合は処理をしたくないときなどに使用します。for i in range(10): if i % 2 == 0: #偶数の場合、continueが実行され、次のループ処理に移ります continue print(i) # 1,3,5,7,9 が出力されますbreak
break
は、ループ処理そのものを終了します。特定の条件を満たした場合にループ処理を終了させたい場合などに使用します。for i in range(10): # i が7の時に処理を終了するため、7以降の数字は出力されません。 if i == 7: break print(i) #0,1,2,3,4,5,6が出力されますelse
else
は、ループ処理が終了後、かつbreakでループが終了していないときに実行されるブロックです。for i in range(10): print(i) else: print('done!') #0 ~ 9が出力された後、done! が出力されます。breakでループから抜けた場合はelseは実行されないため、下記の例ではelse句は実行されません。
for i in range(10): if i == 7: break print(i) else: print('done!') # 0 ~ 6が出力され、処理が終了します以上、他にループ処理についての知見が増えたらまた編集します。
- 投稿日:2020-02-26T20:35:18+09:00
Pythonのthreadingとmultiprocessingを完全理解
threadingとmultiprocessing
現代の主なOSと言ったら、Mac OS,UNIX,Linux,Windowsなどがあります。これらのOSは「マルチタスク」機能をサポートしてます。
マルチタスクとは?と思うかもしれませんが、例えばブラウザーを立ち上げて、音楽聴きながら、Wordでレポートを書くというシチュエーションでは、少なくとも3つのタスクが同時進行してます。そして、表のタスク以外に、裏ではOS関連の様々なタスクがこっそり動いてます。
マルチコアのCPUで、マルチタスクが処理できるのは理解しやすいですが、シングルコアのCPUでもマルチタスクが可能です。OSはそれぞれのタスクを交替に実行してます。例えば、タスク1を0.01秒、タスク2を0.01秒、タスク3を0.01秒、タスク1を0.01秒......繰り返して実行していきます。CPUは速いので、ほぼ同時進行のように感じます。
もちろん、シングルコアCPUはあくまでも交替で実行してるので、本当の意味での同時進行はマルチコアCPUのみ可能です。でもほとんどの場合、実行してるタスクの数はコアの数を遥かに超えるため、マルチコアにおいても「交替実行」の作業が行われてます。
OSにとって、1個のタスクは1個のプロセス(Process)になります。例えば、ブラウザーを立ち上げると、1個のブラウザープロセスが作成されます。同じく、Wordを開いたら、Wordプロセスが作られます。
1個のプロセスは1個の処理とは限りません。例えば、Wordではユーザーの入力をモニタリングする処理と、スペルチェック、UI表示などたくさんの処理が行われてます。これらの「サブタスク」はスレッド(Thread)と言います。1個のプロセスには最低限1個のスレッドがあります。複数のスレッドがある時、プロセスと同じく交替に実行します。
Pythonでマルチタスクを同時に処理したい時は主に2通りのやり方があります。
- 複数のプロセスを立ち上げます。それぞれ1個のスレッドしか持ってないですが、プロセスが複数あるため、複数のタスクを処理できます。
- 1個のプロセスの中で複数のスレッドを立ち上げます。
もちろん、複数のプロセスで複数のスレッドを立ち上げることもできますが、モデルが複雑になるため、あまりお勧めしません。
マルチタスクを処理する時、タスク間の通信や協力が必要だったり、タスク2が実行する時タスク1の一時停止が必要だったり、タスク3とタスク4が同時進行できなかったりするケースがあるため、プログラムがやや複雑になります。
(出典:システムソフトウェア講義の概要)1. threading
Unix系のOSでは以下のシステムコール関数が使えます。
関数 説明 start() スレッドを開始する setName() スレッドに名前をつける getName() スレッドの名前を取得 setDaemon(True) スレッドをデーモンにする join() スレッドの処理が終わるまで待機 run() スレッドの処理をマニュアルで実行する Pythonのスレッドはプロセスでシミュレートしたものではなく、本物のPOSIXスレッドです。標準ライブラリーから、
_thread
とthreading
の2つのモジュールが使えます。そして、_thread
は低レベルのモジュールで、threading
はそれをカプセル化したモジュールです。なので、通常threading
を使います。
スレッドを立ち上げるには、関数などを導入してThread
のインスタンスを作成し、start
で開始させます。1-1. Threadクラスをインスタンス化して利用する
import threading import time def run(n): # threading.current_thread().nameはgetName()を呼び出す print("task: {} (thread name: {})".format(n, threading.current_thread().name)) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) t1 = threading.Thread(target=run, args=("t1",)) t2 = threading.Thread(target=run, args=("t2",), name='Thread T2') # ここではsetName()が呼び出される # start() t1.start() t2.start() # join() t1.join() t2.join() # join()を呼び出したため # メインスレッドは上記のスレッドが終わるまで待機し # 全部終わったらprintする print(threading.current_thread().name)実行結果:
task: t1 (thread name: Thread-1) task: t2 (thread name: Thread T2) 2s 2s 1s 1s 0s 0s MainThreadt1とt2が交替で実行されてることが確認できます。交替ルールの1つはIO操作(ここでは
1-2. threading.Threadを継承してスレッドクラスをカスタマイズする
import threading import time class MyThread(threading.Thread): def __init__(self, n): super(MyThread, self).__init__() self.n = n # run()を書き直す def run(self): print("task: {}".format(self.n)) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) t1 = MyThread("t1") t2 = MyThread("t2") t1.start() t2.start()実行結果:
task: t1 task: t2 2s 2s 1s 1s 0s 0s1-3. threading.active_count()でアクティブなスレッド数を数える
REPL環境ですと、モニタリングするスレッドが複数存在するため、スレッド数が多くなります。
以下のコードをスクリプトで実行してください。import threading import time def run(n): print("task: {}".format(n)) time.sleep(1) for i in range(3): t = threading.Thread(target=run, args=("t{}".format(i),)) t.start() time.sleep(0.5) print(threading.active_count())実行結果:
task: t0 task: t1 task: t2 4メインスレッドの
import threading import time def run(n): print("task: {}".format(n)) time.sleep(0.5) for i in range(3): t = threading.Thread(target=run, args=("t{}".format(i),)) t.start() time.sleep(1) print(threading.active_count())実行結果:
task: t0 task: t1 task: t2 1実行時間を調節し、メインスレッドの
1-4. デーモンスレッド
import threading import time def run(n): print("task: {}".format(n)) time.sleep(1) print('3') time.sleep(1) print('2') time.sleep(1) print('1') for i in range(3): t = threading.Thread(target=run, args=("t{}".format(i),)) # setDaemon(True) t.setDaemon(True) t.start() time.sleep(1.5) print('スレッド数: {}'.format(threading.active_count()))実行結果:
task: t0 task: t1 task: t2 3 3 3 スレッド数: 4t1、t2、t3はメインスレッドのデーモンスレッドに設定したので、メインスレッドの終了とともに停止します。
例えば、Wordのスペルチェックはデーモンスレッドで、無限ループで実行されますが、メインスレッドが落ちると一緒に落ちます。1-5. GIL
他のプログラミング言語で、マルチコアのCPUを利用する際、同時にコア数のスレッドが実行できます。しかし、Pythonでは1つのプロセスである時刻において1つのスレッドしか実行されません。なぜなら、GIL(Global Interpreter Lock)があるからです。
Pythonが設計された当初、データセキュリティーのため、GIL機能が組み込まれたそうです。あるスレッドが実行する時、GILを取得する必要があって、1つのPythonプロセスには1つのGILしかないので、同時に1つのスレッドしか実行されないわけです。GILはパスポートみたいなもので、GILを持ってないスレッドはCPUに入れません。ちなみに、GILはCPython(通常のPython distribution)にはありますが、PyPyとJythonには存在しません。
1-5-1. CPythonでのマルチスレッドの手順
- リソースを取得
- GILをリクエストする
- PythonインタプリターはOSのネイティブスレッドを調達
- OSはCPUを操作して計算する
- GIL回収ルールを満たしたら、計算が終わってか否か、GILが回収される
- 他のスレッドが上記手順を繰り返す
- GILがまた回ってきたら、GIL回収ルールを満たすまで前回の引き続きを処理する(context switch)
1-5-2. 異なるバージョンのGIL回収ルール
- Python 2.X
- IO操作が発生したら回収
- ticksが100になったら回収
- ticksはGIL用のカウンターで、Python仮想処理の数を記録する
- 100になったらGILが回収され、0にリセットする
sys.setcheckinterval
で閾値を設定できる- Python 3.X
- ticksは廃棄された
- タイマーで時間計測し、閾値を超えたら回収
実験として、簡単な無限ループを実行してみます。
import threading import multiprocessing def loop(): x = 0 while True: x = x ^ 1 for i in range(multiprocessing.cpu_count()): t = threading.Thread(target=loop) t.start()ご覧の通り、GILのせいで、シングルプロセスで、どんなに頑張っても、CPUの利用率は100%ぐらいに止まってます(クアッドコアのCPUで最大400%利用できるはず)。
1-5-3. 違う種類のタスクにおけるPythonの計算効率
- CPUバウンドタスク
- 一定時間後GILが回収され、スレッドを切り替えるため、余計に計算コストがかかり、遅くなります。
- IOバウンドタスク
- IO操作が行われる度に、スレッドを切り替えます。遅いファイルの読み書きなどを待たずに他の処理に回せるため、効率が良いです。
1-5-4. アドバイス
- マルチコアCPUを最大限利用したいなら、multiprocessingを利用したほうが良いです。各プロセスには違うGILがあるため、計算効率が良くなります。
1-6. スレッド制御
同じプロセスのスレッド間はリソースがシェアされます。そして、スレッドの切り替えは順番性がなくランダムに行われるため、データがおかしくなることがあります。
import threading # 貯金額とする balance = 0 def change_it(n): # 出金と入金でプラマイ0になるはず global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)上記のコードを何回か実行してみると分かると思いますが、結果が0でなくなります。
balance = balance + n
は2つの不可分操作(atomic operation)に分割できます。x = balance + n balance = xここの
x
はローカル変数で、それぞれのスレッドは独自のx
を持ってます。上記のコードが順番に実行されると以下のようになります。balance = 0 # 初期値 t1: x1 = balance + 5 # x1 = 0 + 5 = 5 t1: balance = x1 # balance = 5 t1: x1 = balance - 5 # x1 = 5 - 5 = 0 t1: balance = x1 # balance = 0 t2: x2 = balance + 8 # x2 = 0 + 8 = 8 t2: balance = x2 # balance = 8 t2: x2 = balance - 8 # x2 = 8 - 8 = 0 t2: balance = x2 # balance = 0 balance = 0 # 結果が正しいしかし、順番が違うと結果も異なります。
balance = 0 # 初期値 t1: x1 = balance + 5 # x1 = 0 + 5 = 5 t2: x2 = balance + 8 # x2 = 0 + 8 = 8 t2: balance = x2 # balance = 8 t1: balance = x1 # balance = 5 t1: x1 = balance - 5 # x1 = 5 - 5 = 0 t1: balance = x1 # balance = 0 t2: x2 = balance - 8 # x2 = 0 - 8 = -8 t2: balance = x2 # balance = -8 balance = -8 # 結果が間違ってるこのように、マルチスレッドにおいて計算結果が予測不可能になる現象をスレッドアンセーフ(Thread-unsafe)と言います。
これを解決するには、スレッドにロックをかけて制御する必要があります。
1-6-1. 排他制御(mutex)
import threading # 貯金額とする balance = 0 def change_it(n): # ロックを取得 lock.acquire() global balance balance = balance + n balance = balance - n # ロックを解放 lock.release() def run_thread(n): for i in range(100000): change_it(n) lock = threading.Lock() # ロックをインスタンス化 t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)排他制御を使うことで、ロックが解放されるまで、他のスレッドがリソースにアクセスできません。こうすることで、計算結果は必ず0になります。
1-6-2. 再帰的排他制御
import threading # 貯金額とする balance = 0 def add_it(n): lock.acquire() global balance balance = balance + n return balance def sub_it(n): lock.acquire() global balance balance = balance - n return balance def change_it(n): # ロックを取得 lock.acquire() global balance balance = add_it(n) balance = sub_it(n) # 再起的にロックを解放 lock.release() def run_thread(n): for i in range(1000): change_it(n) lock = threading.RLock() # ロックをインスタンス化 t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)ここでは
add_it
とsub_it
の内部もロックを取得してます。再帰的排他制御を使用することで、それぞれのロックの解放をする必要がなく、一発で全部解放できます。ただし、非常に計算コストがかかるため、ループ数を減らしてます。1-6-3. 有限セマフォ(BoundedSemaphore)制御
排他制御で、ある時刻において、リソースを処理できるのは1つのスレッドのみに制限されるのに対して、Semaphoreは一定数のスレッドの同時処理を許容する制限です。例えば、トイレに3つの便座があって、同時に3人が使ってて、他の人は並んで待つシチュエーションがセマフォに該当します。
import threading import time def run(n): semaphore.acquire() time.sleep(1) print("current thread: {}\n".format(n)) semaphore.release() semaphore = threading.BoundedSemaphore(5) # 5個のスレッドの同時処理を許容する for i in range(22): t = threading.Thread(target=run, args=("t-{}".format(i),)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print('-----全てのスレッドが終了した-----')上記のコードを実行すると、5個ずつcurrent threadの文字列が出力されることが確認できます。
1-6-4. イベント(Event)制御
スレッドのイベントはメインスレッドが他のスレッドをコントロールするためのものです。イベントには以下のメソッドが提供されます。
メソッド 説明 clear flagをFalseにする set flagをTrueにする is_set flagがTrueの時Trueを返す wait flagをモニタリングし続ける;flagがFalseの時はブロッキング(blocking)する import threading import time event = threading.Event() def lighter(): ''' flag=True: 青信号 flag=False: 赤信号 ''' count = 0 event.set() # 初期値は青信号 while True: if 5 < count <= 10: event.clear() # 赤信号にする print("\33[41;1m赤信号...\033[0m") elif count > 10: event.set() # 青信号にする count = 0 else: print("\33[42;1m青信号...\033[0m") time.sleep(1) count += 1 def car(name): while True: if event.is_set(): # 青信号がどうかをチェック print("[{}] 前進する...".format(name)) time.sleep(1) else: print("[{}] 赤信号のため、信号を待つ...".format(name)) event.wait() # flag=Trueになるまでここでブロッキングする print("[{}] 青信号のため、前進開始...".format(name)) light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car, args=("MINI",)) car.start()上記のコードで信号機と車のスレッド間の簡単な通信をイベントで実現しました。
1-6-5. タイマー(Timer)制御
タイマーを使って実行時間でスレッドを制御することもできます。
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # 1秒後helloが実行される1-6-6. 条件(Condition)制御
条件判定でスレッドを制御する方法もあります。
Condition
には以下のメソッドが提供されます。
メソッド 説明 wait 通知されるか引数のtimeout時間に達するまでスレッドをハングアップする notify ハングアップされたスレッド(デフォルトn=1)に通知する;ロックを取得した状態でしか使えない notifyAll ハングアップされた全てのスレッドに通知する import threading import time from random import randint from collections import deque class Producer(threading.Thread): def run(self): global stocks while True: if lock_con.acquire(): products = [randint(0, 100) for _ in range(5)] stocks.extend(products) print('生産者{}は{}を生産した。'.format(self.name, stocks)) lock_con.notify() lock_con.release() time.sleep(3) class Consumer(threading.Thread): def run(self): global stocks while True: lock_con.acquire() if len(stocks) == 0: # 商品が無くなったら生産されるまで待つ # notfifyされるまでスレッドをハングアップ lock_con.wait() print('お客様{}は{}を買った。在庫: {}'.format(self.name, stocks.popleft(), stocks)) lock_con.release() time.sleep(0.5) stocks = deque() lock_con = threading.Condition() p = Producer() c = Consumer() p.start() c.start()実行結果:
生産者Thread-1はdeque([73, 2, 93, 52, 21])を生産した。 お客様Thread-2は73を買った。在庫: deque([2, 93, 52, 21]) お客様Thread-2は2を買った。在庫: deque([93, 52, 21]) お客様Thread-2は93を買った。在庫: deque([52, 21]) お客様Thread-2は52を買った。在庫: deque([21]) お客様Thread-2は21を買った。在庫: deque([]) 生産者Thread-1はdeque([6, 42, 85, 56, 76])を生産した。 お客様Thread-2は6を買った。在庫: deque([42, 85, 56, 76]) お客様Thread-2は42を買った。在庫: deque([85, 56, 76]) お客様Thread-2は85を買った。在庫: deque([56, 76]) お客様Thread-2は56を買った。在庫: deque([76]) お客様Thread-2は76を買った。在庫: deque([])お客様に在庫を全部買われたら、生産者が5個商品を生産するという簡単なプログラムです。
1-7. ThreadLocal
スレッド間のデータは共有されるため、正確の出力を計算するためにロックをかける必要があるというのを説明しました。しかし、時々それぞれのスレッドに独自のローカル変数を処理させたい時があります。
import threading # グローバルスコープでThreadLocalオブジェクトを作成 local_school = threading.local() def process_student(): # 現在のスレッドに関連するstudentを獲得 std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # ThreadLocalのstudentにnameをバインドする local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join()実行結果:
Hello, Alice (in Thread-A) Hello, Bob (in Thread-B)ここの
local_school
はグローバル変数ですが、ThreadLocal
オブジェクトであるため、それぞれのスレッドからお互い影響なく、インスタンス変数student
を操作できます。local_school
を辞書としてみることができ、student
だけではなく、teacher
をバインドすることもできます。そして、それぞれのスレッドが任意に操作することができ、お互い影響し合うことがありません。ThreadLocal
の使い方として、それぞれのスレッドに独自のDBコネクション、httpリクエストなどを作ることができます。スレッドからすると、受け取った全てのデータはローカル変数同然で、他のスレッドを構わず操作することが可能です。2. multiprocessing
Unix系OSでは
fork()
というシステムコールで、プロセスを作成できます。fork()
を呼び出すと、現在のプロセスをコピーします。コピーされたプロセスを子プロセスと言い、元のプロセスはその親プロセスになります。fork()
の戻り値は、子プロセスと親プロセス両方に返します。そして、子プロセスの戻り値は0で、親プロセスの中では子プロセスのIDが返されます。その理由は、親プロセスは子プロセスのIDを記録しなければなりません。子プロセスからgetppid
で親プロセスのIDを取得できます。Pythonの
OS
モジュールでは、システムコール系をカプセル化してます。import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))実行結果:
Process (19148) start... I (19148) just created a child process (19149). I am child process (19149) and my parent is 19148.ここでは親プロセスと子プロセスはそれぞれ違う条件分岐に入ります。Windowsは
fork()
というシステムコールを持ってないため、実行できませんのでご注意ください。
fork()
を使うことで、プロセスが新しいタスクを引き受けた時、新しいプロセスを作って処理させることができます。例えば、有名なApacheサーバーは親プロセスがポートをモニタリングし、新しいhttpリクエストが来たら、子プロセスをfork()
して処理させます。Pythonのマルチプロセスのプログラムを作成する時は、標準ライブラリの
multiprocessing
モジュールを使うのをお勧めします。なぜなら、multiprocessing
モジュールはクロスプラットフォームで、Windowsでもマルチプロセスのプログラムを作成できます。前述のように、Windowsは
fork()
を持ってないため、multiprocessing
モジュールでプロセスを作る時は、擬似fork()
の処理をしてます。具体的にいうと、親プロセスの全てのPythonオブジェクトをPickle
でシリアライズして、子プロセスに渡すようにしてます。そのため、Windowsでmultiprocessing
モジュールの呼び出しが失敗したら、Pickle
のほうで失敗してる可能性があります。また、子プロセスを作って、外部コマンドを実行させたい時は、標準ライブラリの
subprocess
が使えますが、ここではまず、Python処理をマルチプロセスモジュールmultiprocessing
の機能ついて紹介します。2-1. プロセス(Process)
プロセスを使って簡単い子プロセスを作成できます。
from multiprocessing import Process import os # 子プロセスが実行する処理 def run_proc(name): print('Run child process {} ({})...'.format(name, os.getpid())) print('Parent process {}.'.format(os.getpid())) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.')実行結果:
Parent process 19218. Child process will start. Run child process test (19219)... Child process end.実行関数と引数を
Process
に渡して、インスタンスを作って、start
で起動します。fork()
より、簡単に子プロセスを作れます。ここのjoin
を使うことで、スレッドの時と同じく、親プロセスは子プロセスの実行が終わるまで待機します。2-2. プロセスプール(Process Pool)
子プロセスを作るには非常に計算コストがかかるため、大量に作りたい時は、
Pool
でプロセスプールを作ったほうが効率的です。Pool
の主なメソッドは以下のようになります。
メソッド 説明 apply 同期処理 apply_async 非同期処理 terminate 直ちに終了する join 親プロセスは子プロセスの処理が終わるまで待機する;プロセスのjoinはcloseかterminateの後でしか実行できない close 全てのプロセスの処理が終わったら終了する from multiprocessing import Pool import os import time import random def long_time_task(name): print('Run task {} ({})...'.format(name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task {} runs {} seconds.'.format(name, (end - start))) print('Parent process {}.'.format(os.getpid())) p = Pool(4) # 同時に最大4個の子プロセス for i in range(5): p.apply_async(long_time_task, args=(i,)) # 非同期処理のため、親プロセスは子プロセスの処理を待たずに、 # 次のprintをする print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.')実行結果:
Parent process 19348. Waiting for all subprocesses done... Run task 0 (19349)... Run task 1 (19350)... Run task 2 (19351)... Run task 3 (19352)... Task 1 runs 0.8950300216674805 seconds. Run task 4 (19350)... Task 2 runs 1.0132842063903809 seconds. Task 4 runs 0.3936619758605957 seconds. Task 3 runs 2.3689510822296143 seconds. Task 0 runs 2.776203155517578 seconds. All subprocesses done.プールサイズは4なので、task 4はtask 0からtask 3のどれかが終了してから実行し始めます。
2-3. プロセス間通信
スレッドと違って、プロセス間のデータはシェアされません。OSはプロセス間通信の方法をたくさん提供してます。
multiprocessing
はOSの低レベルの機能をカプセル化し、使いやすくしてます。2-3-1. キュー(Queue)
FIFOのデータ構造キューはよく、プロセス間通信に使われます。
from multiprocessing import Process, Queue import os import time import random # Queueにデータを書き込む def write(q): print('Process to write: {}'.format(os.getpid())) for value in ['A', 'B', 'C']: print('Put {} to queue...'.format(value)) q.put(value) time.sleep(random.random()) # Queueからデータを読み取り def read(q): print('Process to read: {}'.format(os.getpid())) while True: value = q.get(True) print('Get {} from queue.'.format(value)) # 親プロセスがQueueを作って、子プロセスに渡す q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # pwを起動し、書き込み開始 pw.start() # prを起動し、読み取り開始 pr.start() # pwが終了するのを待つ pw.join() # prは無限ループなので、強制終了 pr.terminate()実行結果:
Process to write: 19489 Put A to queue... Process to read: 19490 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.仮に読み取りが遅くても、FIFOのため正確の順番に取り出せます。
2-3-2. パイプ(Pipe)
名前の通りパイプはパイプ状のデータ構造と考えて良いと思います。パイプの片方にデータを入れて(
send
メソッド)、もう片方にデータ受け取る(recv
メソッド)というふうにデータが伝達されてます。2つのプロセスが同じタイプに同時にデータを入れたり受け取ったりすると、データが破損する可能性がありますのでご注意ください。from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join()実行結果:
[42, None, 'hello']2-3-3. マネージャ(Manager)
マネジャーはデータを伝達してるというよりシェアしてると言ったほうが的確かもしれません。
Manager()
はマネージャーオブジェクトを返してサーバープロセスを作ります。てサーバープロセスを通して、他のプロセスはプロキシ方式で、Pythonオブジェクトを操作することが可能になります。マネージャーオブジェクトはPythonのlist, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array
オブジェクトをサポートしています。from multiprocessing import Process, Manager def f(d, l, i): d[i] = i d[str(i)] = str(i) l.append(i) print(l) with Manager() as manager: shared_dict = manager.dict() shared_list = manager.list() p_list = [] # 10個のプロセスを作成 for i in range(10): p = Process(target=f, args=(shared_dict, shared_list, i)) p.start() p_list.append(p) for p in p_list: p.join() print('All subprocesses done.') print(shared_dict) print(shared_list)実行結果:
[0] [0, 1] [0, 1, 2] [0, 1, 2, 3] [0, 1, 2, 3, 4] [0, 1, 2, 3, 4, 5] [0, 1, 2, 3, 4, 5, 6] [0, 1, 2, 3, 4, 5, 6, 8] [0, 1, 2, 3, 4, 5, 6, 8, 7] [0, 1, 2, 3, 4, 5, 6, 8, 7, 9] All subprocesses done. {0: 0, '0': '0', 1: 1, '1': '1', 2: 2, '2': '2', 3: 3, '3': '3', 4: 4, '4': '4', 5: 5, '5': '5', 6: 6, '6': '6', 8: 8, '8': '8', 7: 7, '7': '7', 9: 9, '9': '9'} [0, 1, 2, 3, 4, 5, 6, 8, 7, 9]マネージャーでプロセス間共有のリストと辞書を作成してみました。ここでは、プロセスの処理は順番に行われてないことが確認できます。
2-3-4. プロセスのロック処理
スレッドと同じくプロセスにもロック処理があります。
from multiprocessing import Process, Lock def f(i): lock.acquire() try: print('hello world', i) finally: lock.release() lock = Lock() for num in range(10): Process(target=f, args=(num,)).start()実行結果:
hello world 0 hello world 1 hello world 2 hello world 3 hello world 4 hello world 5 hello world 6 hello world 7 hello world 8 hello world 9ロックがかかったことで前回と違って数字が順番に出力されてます。ただし、マルチプロセスの性能を発揮できなくなります。
2-4. 分散型プロセス処理
Pythonのプロセスは複数のマシンを利用して分散型プロセス処理が可能です。
multiprocessing
モジュールのmanagers
サブモジュールはプロセスを複数のマシンに分散できます。通信プロトコルが分からなくても、分散型プロセス処理のプログラムを書けます。分散型プロセス処理にはタスクを配るサーバープロセスと、タスクを実際に処理するワーカープロセスが必要です。まず、サーバープロセスの
task_master.py
を実装します。ここでは、
managers
でキューをapiとしてネットに公開します。サーバープロセスはキューを起動して、タスクを入れると、他のマシンはタスクにアクセスすることが可能になります。task_master.pyimport random import queue # ネット経由のため、標準ライブラリーのqueueで十分 from multiprocessing.managers import BaseManager # タスクを送るキュー task_queue = queue.Queue() # 結果を受け取るキュー result_queue = queue.Queue() class QueueManager(BaseManager): pass # 2つのキューをapiとして登録する # Windowsの場合はapiにlambdaが使えなので、素直に関数を定義してください QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # ポート5000を使い、認証暗号を'abc'にする # Windowsの場合はアドレスを明記する必要がある(127.0.0.1) manager = QueueManager(address=('', 5000), authkey=b'abc') # 起動する manager.start() # ネット経由でキューオブジェクトを取得 task = manager.get_task_queue() result = manager.get_result_queue() # タスクを入れてみる for i in range(10): n = random.randint(0, 10000) print('Put task {}...'.format(n)) task.put(n) # resultキューから結果を受け取る print('Try get results...') for i in range(10): # 10秒超えたらtimeoutで終了 r = result.get(timeout=10) print('Result: {}'.format(r)) # 終了 manager.shutdown() print('master exit.')次に、ワーカープロセスの
task_worker.py
を実装します。上で公開したmanager.get_task_queue
というapiでタスクを取得して、処理します。task_worker.pyimport time import queue from multiprocessing.managers import BaseManager # 同じQueueManagerを作る class QueueManager(BaseManager): pass # ネットからapiを取得してQueueManagerに登録する QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # サーバーに接続する server_addr = '127.0.0.1' print('Connect to server {}...'.format(server_addr)) # 同じポートと認証暗号を設定する m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 接続 m.connect() # それぞれのキューを取得 task = m.get_task_queue() result = m.get_result_queue() # taskキューからタスクを受け取って # 処理結果をresultキューに格納する for i in range(10): try: n = task.get(timeout=1) # ここでは簡単な二乗計算をタスクとする print('run task {} * {}...'.format(n, n)) r = '{} * {} = {}'.format(n, n, n*n) time.sleep(1) result.put(r) except queue.Empty: print('task queue is empty.') # 終了 print('worker exit.')ローカルマシンでも実行可能です。
実行結果:
サーバープロセスはまずタスクをtask_queue
に入れます。全部入れたら、result_queue
の中に結果が入るのを待ちます。task_master.pyPut task 7710... Put task 6743... Put task 8458... Put task 2439... Put task 1351... Put task 9885... Put task 5532... Put task 4181... Put task 6093... Put task 3815... Try get results...続いて、ワーカープロセスはサーバーに接続し、
task_queue
にあるタスクを取り出して、処理をします。処理結果はresult_queue
に送ります。task_worker.pyConnect to server 127.0.0.1... run task 7710 * 7710... run task 6743 * 6743... run task 8458 * 8458... run task 2439 * 2439... run task 1351 * 1351... run task 9885 * 9885... run task 5532 * 5532... run task 4181 * 4181... run task 6093 * 6093... run task 3815 * 3815... worker exit.
result_queue
の中に結果が入ってきたら、サーバープロセスは順に出力します。task_master.pyPut task 7710... Put task 6743... Put task 8458... Put task 2439... Put task 1351... Put task 9885... Put task 5532... Put task 4181... Put task 6093... Put task 3815... Try get results... Result: 7710 * 7710 = 59444100 Result: 6743 * 6743 = 45468049 Result: 8458 * 8458 = 71537764 Result: 2439 * 2439 = 5948721 Result: 1351 * 1351 = 1825201 Result: 9885 * 9885 = 97713225 Result: 5532 * 5532 = 30603024 Result: 4181 * 4181 = 17480761 Result: 6093 * 6093 = 37124649 Result: 3815 * 3815 = 14554225 master exit.ワーカープロセスではキューを作成してないので、全てのキューはサーバープロセスの中に存在します。
(出典:廖雪峰的官方网站)このように分散型プロセスをPythonで実現できます。複数ワーカーを使って処理させることで、強力な計算パワーが手に入ります。
3. subprocess
Unix系OSでは
fork()
で、子プロセスとして現在のプロセスのコピーを作成するのを説明しました。つまり、Pythonでos.fork
呼び出すと、Pythonプログラムの子プロセスが作成されます。しかし、Pythonプログラムではなく、外部コマンドが実行できる子プロセスが必要な時もあります。
Unix系OSにはもう1つexec()
というシステムコールが存在します。Pythonの中ではos.execve
として実装されてます。exec()
は現在プロセスを他のプログラムで置き換える関数です。つまり、os.fork
でPythonプログラムの子プロセスを作り、os.execve
で他のプログラム(シェルで実行できるls
、ping
のようなプログラムなど)で置き換えることができます。
標準ライブラリsubprocess
はPython処理ではなく、外部プログラムを実行する子プロセスを作成するためのモジュールです。そして、subprocess
で外部プログラムを実行する時は、Pythonプロセスと子プロセスの間にプロセス間通信用のパイプ(Pipe)が自動的に構築され、パラメータを渡したり、エラーメッセージを受け取ったりすることが可能になります。TODO
参考
threading --- スレッドベースの並列処理
multiprocessing --- プロセスベースの並列処理
subprocess --- サブプロセス管理
- 投稿日:2020-02-26T20:27:38+09:00
画像読み込みメモ
from matplotlib import pyplot as plt import cv2 img = cv2.imread("/content/test_images/test_1016805_0080.jpg") img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) plt.imshow(img)
- 投稿日:2020-02-26T20:27:32+09:00
時間の計算 演算子 datetime
import datetime now = datetime.datetime.now() d = datetime.timedelta(weeks=-1) d2 = datetime.timedelta(weeks=1) d3 = datetime.timedelta(days=365) print(now) print(now + d) print(now - d2) print(now - d3)実行結果2020-02-26 11:24:59.726696 2020-02-19 11:24:59.726696 2020-02-19 11:24:59.726696 2019-02-26 11:24:59.726696
- 投稿日:2020-02-26T20:27:32+09:00
時間の計算 演算子 datetime timedelta
import datetime now = datetime.datetime.now() d = datetime.timedelta(weeks=-1) d2 = datetime.timedelta(weeks=1) d3 = datetime.timedelta(days=365) print(now) print(now + d) print(now - d2) print(now - d3)実行結果2020-02-26 11:24:59.726696 2020-02-19 11:24:59.726696 2020-02-19 11:24:59.726696 2019-02-26 11:24:59.726696
- 投稿日:2020-02-26T20:16:14+09:00
AtCoderのテストツール for Pythonを作った
序
AtCoderの問題を解き、入出力サンプルによるテストで動作確認する際、
手作業でコピーして出力を確認…の繰り返しでは意外と時間がかかってしまい、
特に100点~200点問題の正解時間に影響を与えます。これを削減するために入出力データの組を与えて自動でテストを行うツールを作成しました。
ソース
https://github.com/mui-nyan/atcoder_testtool
導入
- 上のリンクからソースをダウンロードして適当なディレクトリに配置します。
.bashrc
などに関数として追加(↓)するか、atcoder_testtool
をPATHに追加します。actest_python() { bash ~/Documents/dev/atcoder_testtool/actest_python.sh $1 $2 }使い方
テストしたいソースファイルと同じディレクトリにデータファイルを作成します。ここでは
input
というファイル名としました。
データファイルのフォーマットは以下のように、入力---出力===
の繰り返しになっています。入力1 --- 出力1 === 入力2 --- 出力2 ===(最後の出力のあとも
===
が必要)
actest_python
を実行し、引数にソースファイル名とデータファイルを与えれば順にテストされます。$ actest_python B.py input AC 91ms Expect: 2 Actual: 2 AC 92ms Expect: 0 Actual: 0 AC 91ms Expect: 5 Actual: 5不正解や制限時間オーバーの場合の表示サンプルです。
制限時間は固定で2secとしました。$ actest_python B.py input WA 96ms Expect: 2 Actual: 3 TLE 2192ms Expect: 0 Actual: 0 AC 91ms Expect: 5 Actual: 5データファイルを楽して作りたい
テストは楽に行なえますが、データファイルの作成がやっぱり面倒なので、
データファイルの作成を補助するツールも作りました。上のGitHubにも含まれていますが、これです。
javascript: (function(){ let ans = ""; let i=0; while($(`.lang-ja #pre-sample${i}`).length > 0) { const input = $(`#pre-sample${i}`).html().trim(); const expect = $(`#pre-sample${i+1}`).html().trim(); ans += input + "\n---\n" + expect + "\n===\n"; i += 2; } console.log(ans); navigator.clipboard.writeText(ans); })();これをブックマークに登録し…
問題ページで実行して…
貼り付けると、
データファイルができます!
結果
嬉しい!
- 投稿日:2020-02-26T19:57:08+09:00
datetime その1
import datetime now = datetime.datetime.now() print(now) print(now.isoformat()) print(now.strftime('%d/%m/%Y-%H:%M:%S:%f')) print('##########') today = datetime.date.today() print(today) print(today.isoformat()) print(today.strftime('%d/%m/%y')) print('##########') t = datetime.time(hour=1, minute=15, second=30, microsecond=100) print(t) print(t.isoformat()) print(t.strftime('%H:%M:%S:%f'))実行結果2020-02-26 10:56:54.691417 2020-02-26T10:56:54.691417 26/02/2020-10:56:54:691417 ########## 2020-02-26 2020-02-26 26/02/20 ########## 01:15:30.000100 01:15:30.000100 01:15:30:000100
- 投稿日:2020-02-26T19:46:45+09:00
Pythonで掛け算 (paizaランク D 相当)を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題 単語のカウント (paizaランク C 相当)
https://paiza.jp/works/mondai/skillcheck_sample/word-count?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
sort-number.pys = input().split() word_list = [] for word in s: if not word in word_list: print(word, s.count(word)) word_list.append(word)参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
最後に
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T19:43:49+09:00
PythonでFizz Buzz (paizaランク C 相当)を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題 Fizz Buzz (paizaランク C 相当)
https://paiza.jp/works/mondai/skillcheck_sample/fizz-buzz?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
fizz-buzz.pyN = int(input()) for i in range(1, N+1): if i % 3 == 0 and i % 5 == 0: print("Fizz Buzz") elif i % 3 == 0: print("Fizz") elif i % 5 == 0: print("Buzz") else: print(i)参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
最後に
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T19:38:39+09:00
Pythonで数の並び替え (paizaランク D 相当)を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題 数の並び替え (paizaランク D 相当)
https://paiza.jp/works/mondai/skillcheck_sample/sort-number?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
バブルソートを自分で書いてみました。
sort-number.py# 入力された値を保存する n = int(input()) a = [] for i in range(n): a.append(int(input())) # バブルソート def bSort(a): for i in range(len(a)-1): for j in range(len(a)-1, i, -1): if a[j] < a[j-1]: a[j], a[j-1] = a[j-1], a[j] return a # 答えを出力する for i in bSort(a): print(i)参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
最後に
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T19:32:43+09:00
Pythonで文字の一致 (paizaランク D 相当)を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題 文字の一致 (paizaランク D 相当)
https://paiza.jp/works/mondai/skillcheck_sample/diff_str?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
diff_str.py# 入力された文字列を保存する a = input() b = input() # 文字列を比較し答えを出力する if a == b: print("OK") else: print("NG")解答コード その2
頑張って一文で書くとこんな感じになりました。
これが一般的なのですかね?diff_str.pyprint("OK" if input() == input() else "NG")参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
最後に
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T19:21:22+09:00
Pythonで一番小さい値 (paizaランク D 相当)を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題 一番小さい値 (paizaランク D 相当)
https://paiza.jp/works/mondai/skillcheck_sample/min_num?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
min関数を使っては面白くないかなと思い、あえて冗長な書き方をしてみました。
min_num.py# 入力された値を保存する n_1 = int(input()) n_2 = int(input()) n_3 = int(input()) n_4 = int(input()) n_5 = int(input()) # 一番小さい数字を見つける ans = n_1 if n_2 < n_1: ans = n_2 if n_3 < ans: ans = n_3 if n_4 < ans: ans = n_4 if n_5 < ans: ans = n_5 # 答えを出力する print(ans)解答コード その2
先ほどのをfor文とリスト構造を用いて少し見やすくしてみました。
min_num.py# 入力された値を保存する n = [int(input()) for i in range(5)] # 一番小さい数字を見つける ans = n[0] for i in range(4): if n[i+1] < ans: ans = n[i+1] # 答えを出力する print(ans)解答コード その3
min関数を使ってみました。
min_num.py# 入力された値を保存する n = [int(input()) for i in range(5)] # 一番小さい数字を見つける ans = min(n) # 答えを出力する print(ans)参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
最後に
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T19:11:56+09:00
websocketとwebsocket_clientの競合でうまくimportできないのを解決する
達成したいこと
- import websocketしたい
- websocket.create_connection()したい
やったこと
importできない
sumple_ws.pyfrom websocket import create_connection↓
Traceback (most recent call last): File "sumple_ws.py", line 10, in <module> from websocket import create_connection ImportError: cannot import name 'create_connection' from 'websocket' (unknown location)
websocket_clientとかいうやつがある
サーバ側とクライアント側で使うべきパッケージが違うらしい
今回のケースでは誤り
$ pip install websocketこっちが正解
$ pip install websocket-clientなのにimportするときはどっちも
websocket
リーダブルコード読んできてくださいwebsocket_clientを導入したのにimportできない
はい完璧
$ pip install websocket-client $ pip uninstall websocket $ pip freeze | grep websocket websocket-client==0.57.0が 再現
sumple_ws.pyfrom websocket import create_connectionTraceback (most recent call last): File "sumple_ws.py", line 10, in <module> from websocket import create_connection ImportError: cannot import name 'create_connection' from 'websocket' (unknown location)import websocketの名前空間がおかしい
対話モードで確認
>>> import websocket >>> dir(websocket) ['__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__']
websocket_clientも再インストールする
$ pip uninstall websocket-client $ pip install websocket-client 対話モード ```python >>> import websocket >>> dir(websocket) ['ABNF', 'DEFAULT_SOCKET_OPTION', 'STATUS_ABNORMAL_CLOSED', 'STATUS_BAD_GATEWAY', 'STATUS_GOING_AWAY', 'STATUS_INVALID_EXTENSION', 'STATUS_INVALID_PAYLOAD', 'STATUS_MESSAGE_TOO_BIG', 'STATUS_NORMAL', 'STATUS_POLICY_VIOLATION', 'STATUS_PROTOCOL_ERROR', 'STATUS_STATUS_NOT_AVAILABLE', 'STATUS_TLS_HANDSHAKE_ERROR', 'STATUS_UNEXPECTED_CONDITION', 'STATUS_UNSUPPORTED_DATA_TYPE', 'WebSocket', 'WebSocketAddressException', 'WebSocketApp', 'WebSocketBadStatusException', 'WebSocketConnectionClosedException', 'WebSocketException', 'WebSocketPayloadException', 'WebSocketProtocolException', 'WebSocketProxyException', 'WebSocketTimeoutException', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__', '_abnf', '_app', '_cookiejar', '_core', '_exceptions', '_handshake', '_http', '_logging', '_socket', '_ssl_compat', '_url', '_utils', 'continuous_frame', 'create_connection', 'debug', 'dump', 'enableTrace', 'error', 'frame_buffer', 'getdefaulttimeout', 'isEnabledForDebug', 'isEnabledForError', 'isEnabledForTrace', 'recv', 'recv_line', 'send', 'setdefaulttimeout', 'sock_opt', 'trace', 'warning']今度こそ完璧
要因
インストールとアンインストールのタイミングのせいでなんやかんやあったと思われる
これが
$ pip install websocket-client $ pip uninstall websocketこの順番ならよかったはず
$ pip uninstall websocket $ pip install websocket-client
- 投稿日:2020-02-26T19:03:09+09:00
Pythonで掛け算 (paizaランク D 相当)を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題 掛け算 (paizaランク D 相当)
https://paiza.jp/works/mondai/skillcheck_sample/multiplication?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
multiplication.py# 入力される値を変数に保存する a = int(input()) b = int(input()) # aとbを掛け合わして答えに保存する answer = a * b # 答えを出力する print(answer)参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
最後に
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T18:58:15+09:00
pythonでrabbimqを扱う
はじめに
RabbitMqはソフトウェア間でメッセージのやり取りやメッセージのキューイングなど(AMQP)ができるものです。使用方法については多くのサイトで書かれていますが実際にメッセージの管理については書かれているところが少ないため、RabbitMqやpythonのpikaの用意からメッセージの確認までをまとめました。
環境
- python:3.6.5
- イメージ:rabbitmq:3-management
RabbitMqを使うのに必要なもの
- RabbitMqサーバ:メッセージを受けたり送ったりするもの(メッセージをためる場所)
- producer:メッセージを送るもの(クライアント)
- consumer:メッセージを受け取るもの(ホスト)
1. RabbitMqの用意
dockerのRabbitMqイメージを取得
RabbitMqのイメージは標準のものと管理プラグインが有効なものの2つがあります。今回はキューの状況を見たいため管理プラグインのものをpullします。rabbitmqという名前でコンテナが提供されているのでイメージをpullします。
docker pull rabbitmq:3-management普通のrabbitmqの場合は、
rabbitmq:3-management
ではなくrabbitmq
をpullしてください。RabbitMqの起動
RabbitMqを起動します。今回はバックグラウンドで動き続けてほしいため
-d
オプションをつけています。また、ポートは管理画面を見たいため、コンテナ内の15672をホストの8080に、実際にキューのやり取りをするポートをコンテナ内の5672とホストの5672に紐づけています。さらにキューを特定するために--hostname
を分かる名前にしています。docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management普通のrabbitmqの場合は、
rabbitmq:3-management
ではなくrabbitmq
をrunして、-p 8080:15672
も不要です。RabbitMqの起動確認
今回は、管理プラグインが有効なコンテナを起動したので
localhost:8080
にブラウザからアクセスして中身を見てみます。アクセスするとログイン画面になるため、デフォルトのguest
で入ります。現在の状態がみれます。Nodesのところに先ほど--hostname
で指定した名前が入っています。2. producer:メッセージを送るもの(クライアント)の用意
producerの作成にはpythonでキューのやり取りを行うため、pikaというライブラリを使用してrabbitmqにアクセスします。
pikaのインストール
pikaのインストールは普通にpipでインストールします。
pip install pikaproducerの実装
メッセージの送信には次の手順が必要です。これらの手順はすべてrabbitmqに対して行われるため、consumerは必要ありません。
- コネクション作成
- チャンネル作成・取得
- キュー作成・取得
- メッセージの送信
コネクション作成
pythonからRabbitMqに向けて接続を行います。作成するパラメータにホスト名(IPアドレス)やポート番号、タイムアウトなどの設定を与えます。RabbitMqコンテナはホストの5672ポートに紐づいているため、パラメータにlocalhostを与えて、ポートはデフォルトの5672のままなので指定はしていません。その後、パラメータを与えてコネクションを生成して接続完了になります。
client_main.pyimport pika pika_param = pika.ConnectionParameters('localhost') connection = pika.BlockingConnection(pika_param)チャンネル作成・取得
接続が完了したら次はチャンネルの作成を行います。チャンネルとはRabbitMqへの道のようなイメージです。チャンネルが同じproducerとconsumerがメッセージのやり取りの対象になります。
client_main.pyimport pika pika_param = pika.ConnectionParameters('localhost') connection = pika.BlockingConnection(pika_param) channel = connection.channel()最後の1行だけが追加されています。必要であればこの引数にチャンネルの識別番号を入力します。
キュー作成・取得
チャンネルを作成できたらキューの作成を行います。このキューとはメッセージをためる場所のようなイメージになります。そのため、キューの名前が同じでなければメッセージのやり取りは行えません。チャンネルとは違い必ず指定が必要になります。
client_main.pyimport pika pika_param = pika.ConnectionParameters('localhost') connection = pika.BlockingConnection(pika_param) channel = connection.channel() channel.queue_declare(queue='hello')最後の1行だけが追加されています。必要であればこの引数にキューの設定を入力します。
メッセージの送信
事前の用意ができたため、メッセージの送信を行います。
basic_publish()
のrouting_key
にキューの名前を指定してbody
に送信したいメッセージを指定します。client_main.pyimport pika pika_param = pika.ConnectionParameters('localhost') connection = pika.BlockingConnection(pika_param) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') connection.close()最後に送信が完了したらコネクションを閉じます。
実際に送信してみる
RabbitMqへの送信用ソースができたため、実行してみます。コマンドはすぐに帰ってきます。
PS C:\Users\xxxx\program\python\pika> python .\client_main.pyRabbitMqの管理画面でも確認してみます。Queuesタブを見てみるとMessageがReady:1になっているのがわかります。
3. consumer:メッセージを受け取るもの(ホスト)の用意
consumerも、pythonでキューのやり取りを行うため、pikaというライブラリを使用してRabbitMqにアクセスします。
consumerの実装
メッセージの送信には次の手順が必要です。これらの手順はすべてRabbitMqに対して行われるため、起動にproducerは必要ありません。コネクション作成からキュー作成まではproducerと同じです。
- コネクション作成
- チャンネル作成・取得
- キュー作成・取得
- コールバック(受信時処理)の作成
- queueメッセージ受付開始
コールバック(受信時処理)の作成
メッセージを受信したときに処理させたい関数を記載します。関数の最後に受信したからキューからメッセージを削除するために応答関数
basic_ack()
を指定します。今回は例として受信したメッセージを表示する関数callback
を記載します。host_main.pyimport pika pika_param = pika.ConnectionParameters(host='localhost') connection = pika.BlockingConnection(pika_param) channel = connection.channel() channel.queue_declare(queue='hello') ![queue_receive.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/481713/d78bd155-4b9a-9093-998d-c85ff4af2cc7.png) ![queue_receive.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/481713/f6fbd5ef-322d-77d0-8e85-68626ac91783.png) def callback(ch, method, properties, body): print("{} Received".format(body)) ch.basic_ack(delivery_tag = method.delivery_tag)queueメッセージ受付開始
作成したチャンネルの
basic_consume()
にキュー名とコールバック関数を指定します。その後にstart_consuming()
関数でメッセージの受信を開始します。この関数を開始すると関数内で延々とメッセージ待ちをするため終了するときは強制終了かコールバック関数に終了の契機を入れておく必要があります。host_main.pyimport pika pika_param = pika.ConnectionParameters(host='localhost') connection = pika.BlockingConnection(pika_param) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("{} Received".format(body)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume( queue='hello', on_message_callback=callback) channel.start_consuming()実際に受信してみる
RabbitMqからの受信用ソースができたため、実行してみます。
PS C:\Users\xxxx\program\python\pika> python .\host_main.py b'Hello World!' Receivedproducerで送信したメッセージが受信できて標準出力されていることが確認できます。
RabbitMqの管理画面でも確認してみます。Queuesタブを見てみるとMessageがReady:0になっているのがわかります。
おわりに
RabbitMqを使用してpythonでメッセージのやり取りを行う方法をまとめました。とはいえほとんど公式の内容と同じになってしまいました。これを使用することで容易に非同期な処理やキューに関する処理を実施することができそうです。
- 投稿日:2020-02-26T18:56:16+09:00
Pythonで掛け算 (paizaランク D 相当)を解く
初めに
paizaスキルチェック基本問題を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaの練習問題 mod7占い (paizaランク S 相当)
https://paiza.jp/works/mondai/skillcheck_sample/addition?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
addition.py# 入力された値を保存する a, b = (int(x) for x in input().split()) # 値の足し算を計算して答えに保存する answer = a + b # 答えを出力する print(answer)参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
まとめ
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T18:56:16+09:00
Pythonで足し算 (paizaランク D 相当)を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題 足し算 (paizaランク D 相当)
https://paiza.jp/works/mondai/skillcheck_sample/addition?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
addition.py# 入力された値を保存する a, b = (int(x) for x in input().split()) # 値の足し算を計算して答えに保存する answer = a + b # 答えを出力する print(answer)参考
https://qiita.com/KoyanagiHitoshi/items/3286fbc65d56dd67737c
最後に
分からないところがあれば気軽にコメントしてくださいね。
できるだけ答えますよ!
- 投稿日:2020-02-26T18:37:19+09:00
Pythonでmod7占い (paizaランク S 相当)を解く
初めに
paizaスキルチェック基本問題を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaの練習問題 mod7占い (paizaランク S 相当)
https://paiza.jp/works/mondai/skillcheck_sample/mod7?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。ダメな解答コード
- 三重ループ
- 計算量: O(n^3)
- 大規模データではタイムオーバーで失敗してしまいます。
mod7_v1.pyn = int(input()) cards = [int(input())%7 for i in range(n)] total = 0 for n1 in range(len(cards)): for n2 in range(n1+1,len(cards)): for n3 in range(n2+1, len(cards)): if (cards[n1]+cards[n2]+cards[n3])%7==0: total+=1 print(total)ダメな解答コードその2
上記と同じ理由でダメです。
mod7_v2.pyimport itertools n = int(input()) cards = [int(input())%7 for i in range(n)] total = 0 for num_set in itertools.combinations(cards, 3): if sum(num_set)%7==0: total += 1 print(total)解答コード
- 足す前に剰余を取っても、結果は同じ
- (A+B+C)%7 は、(A%7 + B%7 + C%7)%7 と等しい
- 0~6の値のみ与えられたと考えてよい。
- 3つの値を選ぶ組み合わせは7*7*7 = 343
mod7.pyn = int(input()) cards = [int(input())%7 for i in range(n)] total = 0 for n1 in range(7): for n2 in range(7): for n3 in range(7): if (n1 + n2 + n3) %7 == 0: c1 = cards.count(n1) c2 = cards.count(n2) c3 = cards.count(n3) if n2 == n1: c2 -= 1 if n3 == n1: c3 -= 1 if n3 == n2: c3 -= 1 pat = c1*c2*c3 total += pat print(total//6)参考
https://www.slideshare.net/paiza_official/mod7-note
まとめ
いろいろ試行錯誤した結果、かなりシンブルになったので一人で感動してました。
今はこの感動を誰かに伝えられる語彙力と解説力が欲しいです。
- 投稿日:2020-02-26T17:22:49+09:00
先読み/後読みアサーションで重複マッチング
先読み/後読みアサーションで重複マッチング
Python3ではデフォルトで重複しないマッチングを行う。一度マッチした文字は「消費」されるため、例えば「AAAA」から「AA」を検索した場合、最初2文字と最後2文字の2箇所にマッチし、中間にはマッチしない。同じく、
_hoge_foo_baaar_
から_[a-z]+_
を検索すると、_foo_
はマッチしない。重複したマッチングを行うときは、文字を「消費」しない先読みアサーション/後読みアサーションを使う。
(参考:Pythonの正規表現で重複したマッチのやり方(先読みアサーション)ということで手を動かして試してみた。
import re s = "_hoge_foo_baaar_" key = "(?<=_)[a-z]+(?=_)" for m in regex.finditer(key, s, overlapped=True): print(m) # <regex.Match object; span=(1, 5), match='hoge'> # <regex.Match object; span=(6, 9), match='foo'> # <regex.Match object; span=(10, 15), match='baaar'> # 一致位置を取り出す m = [m.span() for m in re.finditer(key, s)] print(m) # [(1, 5), (6, 9), (10, 15)]macthには先/後読みアサーションで指定した部分は含まれない.
含みたい場合は多分 regex 使ったほうが早い(公式でも推奨されているreの上方互換ライブラリらしい)import regex s = "_hoge_foo_baaar_" key = "_[a-z]+_" for m in regex.finditer(key, s, overlapped=True): print(m) # <regex.Match object; span=(0, 6), match='_hoge_'> # <regex.Match object; span=(5, 10), match='_foo_'> # <regex.Match object; span=(9, 16), match='_baaar_'> ms = [m.span() for m in regex.finditer(key, s, overlapped=True)] print(ms) # [(0, 6), (5, 10), (9, 16)]参考
- 投稿日:2020-02-26T17:19:04+09:00
Pythonのimportをお手軽にカスタマイズする方法
何か特殊な事情があって動的にimportしたい時に使えます。
https://docs.python.org/ja/3/library/importlib.html
全てはドキュメントに書いてあったりするのですが、ちょっと分かりづらいので、手っ取り早く使いたい人は以下のコードを最初の方に仕込んでおくと良いと思います。
import importlib.machinery import sys class PackageFinder(importlib.machinery.PathFinder): @classmethod def find_spec(cls, fullname, path=None, target=None): if not path: path = [] # pathに探してほしいディレクトリのリストを指定する path.append("/path/to/package/root") return importlib.machinery.PathFinder.find_spec(fullname, path, target) sys.meta_path.append(PackageFinder)上記コードのpathを動的に切り替えるなりなんなりする事で、特定のディレクトリを同期して読み込んだり、リモートのディレクトリをダウンロードして読み込んだり、といったアクロバティックなこともできるようになります(推奨しているわけではないです)。
尚、sys.path_hooksというのもあって、こちらはパスを追加するのではなく、パスを引数にとって、それに対応するモジュールimportの挙動をカスタマイズしたい時に使うみたいです。
- 投稿日:2020-02-26T17:12:11+09:00
デコレータを評価するためのテストコード
Pythonのデコレータを評価するためのテストコードの書き方です.
もっといい方法あったら教えて下さい.デコレータ自体のテスト
hoge デコレータをつけると, 戻り値に 1 を加算します.
def hoge(func): @functools.wraps(func) def wrapper(n): return func(n) + 1 return wrapper @hoge def sample(n): return n * 2 if __name__ == '__main__': assert sample(3) == 7デコレータ自体は, 次のように評価できます (
ANY_VALUE
はなんでもよい).assert hoge(lambda n: 6)(ANY_VALUE) == 7引数があるデコレータのテスト
つぎの hoge デコレータを使うと, デコレータの引数に指定した値を戻り値に加算します.
def hoge(m): def decorator(func): @functools.wraps(func) def wrapper(n): return func(n) + m return wrapper return decorator @hoge(2) def sample(n): return n * 2 if __name__ == '__main__': assert sample(2) == 6デコレータ自体は, 次のように評価できます.
assert hoge(2)(lambda n: 4)(ANY_VALUE) == 6
- 投稿日:2020-02-26T15:50:41+09:00
Pythonではじめる機械学習 個人的備忘録 Part1
はじめに
Pythonで機械学習を学ぼうと思い立ち、
まず手に取った本「Pythonではじめる機械学習」を読んでいくときの個人的備忘録として書いています。IT企業には所属しておりませんが、これからいろいろと学んで仕事をしていけるよう精進してまいります。
初学者の方に少しでも力になればと思い、ログを残しています。scikit-learnのインストール
scikit-learn(サイキット・ラーン)と読みます。
下のリンクに取扱い説明書があります。
一部の人だけかもしれませんが、リンク先の図を見ているとgnuplotを思い出しますね。
・scikit-learnのドキュメント
・scikit-learnのユーザーガイドJupyter Notebookをインストールするかどうか悩んだのですが、最初なので本に従うことにしました。
使ってみれば便利かもしれないですしね。
Jupyter NotebookをインストールするためにはAnacondaをインストールします。
・Anaconda公式サイトのダウンロードページAnacondaをインストールするとこの本で用いるパッケージはすべて使うことができます。
既にPythonを使っている人はターミナルでpip install numpy scipy matplotlib ipython scikit-learn pandas pillowと打てばこの本で利用するライブラリをインストールすることができます。
Jupyter Notebookの使い方は下のリンク先に書いてあります。
【Jupyter Notebook】効果的な使い方を知ろう[Python/機械学習]NumpyとSciPyの違い
NumPy(ナンパイorナムパイ)とSciPy(サイパイ)です。
SciPyは各種ソフトの集合のことで、NumPyの機能はSciPyで全部使えます。
・SciPyとその仲間たち(NumPy, IPython など)の違いと関係
・NumpyとScipy
私のような初学者からすると、大した違いもまだ分からないので気にせずいきます。CSR
CSRはCompressed Sparse Rowの略で疎行列を圧縮した形になっています。
行列計算ってだいたいの成分が0の行列を扱うことが多いので、
無駄にデータが増えないような便利な形にしているんだと思います。COO形式はCoordinate Formatの略で普通に行列の番号を指定してあげるやり方です。
pandas
本には
from IPython import displayと書かれているのですが、その後
display(data_pandas)の箇所で
TypeError: 'module' object is not callableとエラーが出てしまいます。
僕の環境では、from IPython.display import displayとするとうまくいきました。
多分バージョンが違うからだと思います。アイリスのクラス分類
scatter_matrix関数を使うときに本では
grr = pd.scatter_matrix(iris_dataframe, c=y_train, figsize=(15,15), marker='o', hist_kwds={'bins': 20}, s=60, alpha=.8, cmap=mglearn.cm3)と記述があるが、僕のバージョンだと
grr = pd.plotting.scatter_matrix(iris_dataframe, c=y_train, figsize=(15,15), marker='o', hist_kwds={'bins': 20}, s=60, alpha=.8, cmap=mglearn.cm3)で動いた。
感想
今回は第1章はじめにを読んでわからなかったところを書いた。
環境の整備がほとんどであったが、最後に少しだけ機械学習についても触れた。
k-最近接法でアイリスの分類を行ったがモデルの詳しい部分は触れられていない。
パラメータがいろいろとあるが変更するパラメータはこの後の章で紹介していくと記述があった。
いまのところ、パラメータいっぱいでわけわからん状態ですがこつこつ続けていきたいと思っております。次回からは本格的に機械学習の内容、教師あり学習について学んでいく。
- 投稿日:2020-02-26T15:49:46+09:00
pythonのクラスへの理解奮闘記(2)仕事それだけじゃないよね?_クラスの継承について
前回までのあらすじ
ひとまず、クラスのメリットを享受したコーディングと問題に対する(ここでは稟議書に対する処理)アプローチはクリアでき、さらに有難いことにクラスについてのコーディングお作法もご指摘かなうことができた。だが、しかし経理部員の私としては稟議書担当係君にもっと仕事をしてもらいたいという情念があります。
継続的な処理の話
前回確かに「新しく回ってきた稟議書」に対しての処理を具現化することはできました。しかし、「既に回ってきたかつ未だ使用開始していない固定資産」に対する処理がおざなりです。この「既に回ってきたかつ未だ使用開始していない固定資産」についてはまた新しく稟議書が回ってくることはありません。そのため、継続的にケアして毎月利用開始しているかどうか担当者にメールしなければならないのです。
新しく発生する仕事
ここで新しく発生する仕事としては具体的に、前回利用開始に至らなかった案件に対して定期的にメールを送る仕事です。これをクラスの承継という方法で実現しようと試みます。前のコーディングは「新しく回ってきた稟議書」に対する処理だったしね。
新しく書いたコード
qiita.rbimport openpyxl import datetime class New_Assets(): def __init__(self,apdnum,apdmen,unitname,assetname,money): self.apdnum=apdnum self.apdmen=apdmen self.unitname=unitname self.assetname=assetname self.money=money #メールにPDFを添付する機能 担当者と担当部署から送り先メールアドレスを拾ってくる作業 def make_mail(self): mail=("{}{}様お疲れ様です。経理部の固定資産担当です。稟議書No{}の{}についてですが、\ \n当月の状況について利用開始しているか教えてください。また請求書もあれば請求書の添付も\ \nよろしくお願いいたします。".format(self.unitname,self.apdmen,self.apdnum,self.assetname)) print(mail) def insert_excel(self): wb=openpyxl.load_workbook("../data/発注固定資産管理表.xlsx") sh=wb.active for dt_row in range(2,50): if sh["A"+str(dt_row)].value!=None: continue else: sh["A"+str(dt_row)].value=self.apdnum sh["B"+str(dt_row)].value=self.unitname sh["C"+str(dt_row)].value=self.apdmen sh["D"+str(dt_row)].value=self.assetname sh["E"+str(dt_row)].value=self.money break wb.save("../data/発注固定資産管理表_{}_johannesrome作成.xlsx".format(datetime.date.today())) #上記前回書いた基幹部分にあたるコード #以下新規コード #New_AssetsクラスからサブクラスであるBuying_Assetsを作成し属性としてconsume(費消)を追加 class Buying_Assets(New_Assets): def __init__(self,apdnum,apdmen,unitname,assetname,money,consume): super().__init__(apdnum,apdmen,unitname,assetname,money) self.consume=consume def make_buying_mail(self): if self.consume=="未": print("{}の{}様お疲れ様です。経理部の固定資産担当です。先月未使用であった稟議書No{}の{}についてですが\ \n当月の状況について利用開始しているか教えてください。また請求書もあれば請求書の添付も\ \nよろしくお願いいたします。".format(self.unitname,self.apdmen,self.apdnum,self.assetname)) sh["G"+str(dt_row)].value="✔" else: None #以下出力コード wb=openpyxl.load_workbook("../data/発注固定資産管理表_johannesrome作成.xlsx") sh=wb.active for dt_row in range(2,50): apdnum=sh["A"+str(dt_row)].value unitname=sh["B"+str(dt_row)].value apdmen=sh["C"+str(dt_row)].value assetname=sh["D"+str(dt_row)].value money=sh["E"+str(dt_row)].value consume=sh["F"+str(dt_row)].value m=Buying_Assets(apdnum,apdmen,unitname,assetname,money,consume) m.make_buying_mail() wb.save("../data/発注固定資産管理表_johannesrome作成.xlsx")前回ご指摘のあったコーディング作法は修正済み
クラスの継承としてBuyin_Assetsをサブクラスとして作成し、属性としてconsumeを追加しただけ
そして、今回してほしいプログラムとして未使用資産のメール文作成と、作成したら✔をエクセルに入れる機能。そして実行すると
チェックが入りました(∩´∀`)∩
上手く来ました( ◠‿◠ )
総括
継承前のメソッドやデータを今回使うわけではなかったので設定テーマとアプローチとしては筋が悪いかなぁと思いました。もっと、個人的には継承前のメソッドもバリバリ活かすイメージがあったので。
ただ、コーディングをしてる最中わざわざdef make_buying_mail(self):のところでopenpyxlを開いて処理するコーディングだったところ途中クラスの属性を活かしたコーディングに変えられることに気づけたところがクラスのメリットかなぁと気づきを得られた次第です。クラスはどんどん使って早く慣れ親しみたいところです。
また、もっとこういうテーマだとクラスのメリットは享受できるよ~などいろいろご意見ありましたら、コメントお願いいたします!(`・ω・´)
- 投稿日:2020-02-26T14:38:45+09:00
Pythonで島探し (paizaランク S 相当) を解く
初めに
paizaのレベルアップ問題集を解いていたのですが、模範解答がなかったので自分で作ってみました。
言語はPython3です。問題
Paizaのスキルチェック見本問題「島探し (paizaランク S 相当)」
https://paiza.jp/works/mondai/skillcheck_sample/search-island?language_uid=python3
ログインしないと問題文が見れませんでした。
登録は無料ですぐにできるので、とりあえず登録してみることをおススメします。解答コード
find_lands.pycol, row = map(int, input().split()) map_list = [[0]* (col+2)] for _ in range(row): map_list.append([0] + list(map(int, input().split())) + [0]) map_list.append(map_list[0]) def check(x, y): lands = [[x,y]] while lands: x, y = lands.pop() map_list[y][x] = 0 # down if map_list[y+1][x] == 1: lands.append([x, y+1]) # right if map_list[y][x+1] == 1: lands.append([x+1, y]) # up if map_list[y-1][x] == 1: lands.append([x, y-1]) # left if map_list[y][x-1] == 1: lands.append([x-1, y]) count = 0 for r in range(1, row+1): for c in range(1, col+1): if map_list[r][c] == 1: check(c, r) count += 1 print(count)参考
https://maro28.com/paiza-s-rank-mod7
まとめ
初めでQiitaの記事を書いてみましたが、シンプルで使いやすくていいですね!
これからちょくちょく使っていけたらいいなと思います。