- 投稿日:2022-01-31T23:46:25+09:00
foliumを使って東京のコロプレスマップを作成する
はじめに 今回は僕の記事では同じみになったLeafletをPythonで使えるようにするためのライブラリfoliumを使って、東京のコロプレスマップを作成してみたいと思います。 foliumを使って東京のコロプレスマップを作成する 使用するデータ 以下のレポジトリに保存されているデータを利用させていただきます。 実際のコード # モジュールの読み込み import folium import geopandas as gpd # 都道府県データの読み込み geojson_path = "https://raw.githubusercontent.com/niiyz/JapanCityGeoJson/master/geojson/prefectures/13.json" geojson = gpd.read_file(geojson_path) # 元となる地図の作成 map = folium.Map( location=[36, 140], tiles='cartodbpositron', zoom_start=9 ) # 都道府県のデータを描画 folium.Choropleth( geo_data=geojson, name='choropleth' ).add_to(map) map 結果 このように東京のコロプレスマップをPythonで作成することができました。 さいごに 今回はfoliumを使って東京のコロプレスマップを作成してみました。以下のレポジトリには各都道府県のGeoJsonがあるので、興味のある都道府県のデータを使って何か可視化してみるのもいいかもしれません。
- 投稿日:2022-01-31T23:27:18+09:00
pythonでdequeの中身を出力するには`*dq`とすればOK
pythonでdequeの中身を出力するには*dqとすればOK test.py from collections import deque L = deque() L.append("a") L.append("b") L.append("c") # deque 1つの場合 print(*(L)) L2 = deque() L2.append("d") L2.append("e") L2.append("f") # deque 2つ足す場合 print(*(L + L2)) # 上記だと、なぜか空白が一緒にprint()されるので、、、文字列にして空白を削除する str = "".join(list(L + L2)) print(str) 実行 $ python test.py a b c a b c d e f abcdef 参考
- 投稿日:2022-01-31T23:27:11+09:00
foliumと日本の人口データを使ってコロプレスマップを作成してみた
はじめに 以前の記事でfoliumを使ってコロプレスマップを作成できることはわかったので、今回は実際に日本の人口のデータを用いてコロプレスマップを作成してみました。 以下は以前foliumを使ってコロプレスマップを作成してみた記事です。 foliumと日本の人口データを使ってコロプレスマップを作成してみた 使用したデータ 人口のデータはe-Statよりダウンロードしたものを整形して使用しました。 またコロプレスマップに使用するデータは、以下のリポジトリに上がっているjapan.geojsonを使用させていただきました。 実際のコード # ライブラリの読み込み import folium import geopandas as gpd import pandas as pd # 人口のデータの読み込み df = pd.read_csv('./population.csv') # カラムの型変換 df['population'] = df['population'].map(lambda x: int(x.replace(',',''))) # geojsonの読み込み geojson_path = "https://raw.githubusercontent.com/dataofjapan/land/master/japan.geojson" geojson = gpd.read_file(geojson_path) # 元となる地図の作成 map = folium.Map( location=[36.288055, 138.097424], # 日本が地図の中心になるように設定、位置は長野県上田市 tiles='cartodbpositron', zoom_start=4) # 都道府県のデータを描画 folium.Choropleth( geo_data=geojson, name = 'choropleth', data = df, columns=['code', 'population'], key_on='feature.properties.id', fill_color='BuPu', fill_opacity=1, ).add_to(map) map 結果 さいごに 今回はfoliumと日本の人口データを使ってコロプレスマップを作成してみました。簡単にできるので、興味のあるひとはぜひ試してみてください。
- 投稿日:2022-01-31T23:17:13+09:00
pythonのprint()で改行つけないためには`print("test", end='')`みたいにすればOK
pythonのprint()で改行つけないためにはprint("test", end='')みたいにすればOK test.py print("test") print("test", end='') print("test", end='') 実行 $ python test.py test testtest 以上 参考
- 投稿日:2022-01-31T22:38:57+09:00
pixivの小説作品のURLからテキストを保存する
どうして作ったの? ※あくまでブラウザ内で確認するのが難しいテキスト情報を手元で確認するためのツールとしての利用を想定しています TRPGの一つ、CoC(コールオブクトゥルフ)を最近頻繁にプレイしていてページ内だけで確認するのが手間だな・・・と思ったので作りました pixivで一般公開されている名作のシナリオのテキスト量は膨大で、数十ページにおよぶためpixivサイト内だけで情報を確認することは難しい! ということでpixivの小説作品のURLから本文のテキストだけを抽出するコードを作ってみました 環境 Google Colab ざっくりやってること seleniumを使って仮想ウェブブラウザを構築 html内のタグ情報を使って本文のテキストを取得 ページ遷移をさせつつ、リダイレクト後に同じページに飛ばされたらwhile文をbreakしてファイルを出力する ソース Colabへの環境構築 !pip install selenium !apt-get update !apt install chromium-chromedriver !cp /usr/lib/chromium-browser/chromedriver /usr/bin ライブラリの読み込みと環境設定 import urllib.request from bs4 import BeautifulSoup import time from selenium import webdriver options = webdriver.ChromeOptions() options.add_argument('--headless') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') 関数 def main(url): lists = [] idx = 1 while True: browser = webdriver.Chrome('chromedriver',options=options) search_url = f'{url}#{str(idx)}' res = browser.get(search_url) time.sleep(1) current_url = browser.current_url print(idx) print(search_url) print(current_url) # リダイレクトが発生した際に同じURLに転送された場合処理を終了する if search_url != current_url: if idx == 1: pass else: break source = browser.page_source.replace("<br>", "\n") # レイアウト上の改行要素を事前にテキスト用に変換しておく soup = BeautifulSoup(source, 'html.parser') sentences = soup.find('p', attrs={'class': 'sc-dIvrsQ leiCDy'}) # pixivサイトの仕様変更に伴い変更の可能性あり # 文字部分のみを抽出する sentences = sentences.get_text().replace("\r", "").replace("\u3000", "") lists.append(sentences) idx += 1 lists = '\n'.join(lists) return lists 呼び出し例 url = 'https://www.pixiv.net/novel/show.php?id=16394819' lists = main(url) print(lists) このあとは好きにテキストファイルに保存するなりコピペするなりできます 終わりに 良きCoCライフを!!! 個人的にリダイレクトを待つコードの位置を間違えてリダイレクト前のURLを一生参照していたのが最初は何故か分かりませんでしたが気づいてしまったら綺麗に動いてくれて一安心 一応pixiv小説とかにも汎用性があるコードになっているはずですが、pixivさんのサイト仕様の変更に伴って利用できなくなると思うのでニーズがあれば更新いたします
- 投稿日:2022-01-31T22:38:06+09:00
データ分析未経験者が社内データ分析コンペに出場してわかったこと
はじめに 昨年末から今年頭にかけ,会社でデータ分析推進部署主催の社内の実データを使ったデータ分析のコンペがあり,未経験ながらも興味があったので本物のデータを使ってデータ分析を実体験できるチャンスと捉えて参加しました. コンペの振り返りと頭の整理のために自分なりに理解したデータ活用・分析のイメージをまとめます. 注) 初心者が振り返りも含めて自分の言葉でまとめたものなので間違い等あるかと思います.何か問題点・間違い等ありましたらコメントいただけますようよろしくお願いします. 参加したデータ分析コンペの概要 データや評価内容の詳細は載せられませんが,当たり障りの無いレベルでざっくりと今回参加したコンペの概要をまとめます. 目的:データ活用が可能な人材の育成 + 社内業務へのフィードバック 参加者:自分で参加したいと応募した人(データ分析・活用の経験問わず) 利用データ:社内業務で収集した実データ(正解あり) データ形式:表形式データ 予測対象:複数分類(グループ分け) 期間:3週間 コンペ環境:社内IT部門等の関係部署が協力して専用環境を構築 わかったこと 今回参加したコンペは正解ありの教師あり学習で予測精度を競ったため,教師あり学習を前提にわかったことをまとめます. データ分析・活用とは? ある定まったフォーマットのデータをインプットに欲しい結果を予測・判断できる数式的なもの(データ活用の世界ではモデルと言う)を作成し,必要なデータを準備すれば誰でも一定のレベルで今後の予測や判断ができるようにすること. インプットと予測の例:天気・温度・湿度・曜日 → 入場者 や 売上高 の予測 など インプットと判断の例:画像 → 文字の識別 や 偽札の判定 など データ分析コンペとは? 企業などがスポンサーになり,所有するデータと予測したい内容を提示して,予測に使える計算式を参加者に作成してもらい予測精度を競いあう大会.参加者が作成した計算式に予測したい内容をマスキングしたデータを投入し,予測結果の精度を競いあう.上位入賞者には賞金が支払われるものもあり. 有名なデータ分析コンペとして Kaggle がある. データ分析をする環境 データ分析はPythonを使うのが一般的. PythonとVisualStudioCodeと関連する拡張機能をPCにそれぞれインストールして環境構築することもできる.処理速度はPCのスペックに依存するので大規模な分析には不向き. GoogleはGoogleアカウントがあれば無料で利用できるブラウザベースのデータ分析環境を提供している.簡単な分析ではストレスなく利用できた.大規模な分析では??? データ分析コンペで有名なKaggleでは,Kaggle内にデータ分析環境があり利用可能.簡単な分析ではストレスなく利用できた.大規模な分析では??? AWSのSavgeMaker等のSaaSもあり.AWSの場合,利用するインスタンスを選択できるようなので必要に応じてリソースを選択することが可能.ただし,大規模リソースを利用すればその分コストは増加することにつながるのでリソースサイズの検討が必要になりそう. モデルの作り方 インプットデータの形式と予測したいものによって,この手法を使えばだいたいOK,という指針がある. それぞれの手法はpythonのライブラリで提供されているので,作りたいモデルを指定して学習用データを動作パラメータを設定すれば予測するモデルを作成してくれる. 手法ごとにパラメータが違う.パラメータチューニングで予測精度が上がったり下がったりする. モデル学習用データの準備方法 コンペでは提供されたデータを学習用データ(参考書)と評価用データ(模擬試験)に分けて,学習用データで勉強→評価用データで正解率の確認をしてモデルの精度を上げていく. 学習用データ(参考書)と評価用データ(模擬試験)の準備方法にもいろいろあり,初めに比率で分けてそれを使い続けるホールドアウト法や,全データで学習用データ・評価用データの組み合わせを網羅する交差検証などの手法がある. 予測精度を上げるには? インプットデータと結果の間で影響度が高いインプットがどれか?を見つけ出すのが重要. インプットと結果の関係性を推測するのも重要. 物理現象が絡む場合は,データの物理現象の意味を知ることも必要. インプットと結果は1:1の関係ではなく,N:1で関係しあっている場合が多いので,複合的に関係性を見つけ出す必要あり. インプットのデータの特徴をより如実に表すようにインプットデータを加工する必要あり. 加工方法の例:対数をとる,平均値/中央値/標準偏差 コンペの結果と感じたこと 今回のデータ分析コンペでは,インプットデータの加工はガイド通りの加工にとどまり,何種類かのデータ分析モデルを実装して精度比較をしましたが,ある一定の精度で頭打ちとなってしまいました. コンペの上位者のコメントを聞くと,インプットデータと予測対象との関係性に着目してインプットデータの特徴を表すように加工することに注力したとのことで,やはりモデル作成の前の人間によるデータ分析や推論が重要であると実感しました. まとめ データ分析を実際に手を動かして体験しながら学んだことで,実は手の届く世界の技術であることがわかり大変有意義な経験をすることができました. とは言え,データ活用をがっつり使うには数学的知識や統計学の知識などの知識は弱いなということを実感したので,ちょっとずつでも勉強していきたいな. あと,良いデータ活用をするには実際の業務背景や業務モデル,対象の物理現象などを理解しないといけないということも聞きましたが,ここらへんはシステム開発と一緒ですね. 最後に,このような機会を作ってくれたコンペ企画・運営をしていただいた方々,本当にありがとうございました. 補足 2022年の目標である「外に発信をしよう」の第一歩として,初めての記事投稿をしてみました.自分で分かったつもりでいることでも,文章にまとめるのは難しいですね.
- 投稿日:2022-01-31T22:20:08+09:00
ABC237 A~D問題 ものすごく丁寧でわかりやすい解説 python 灰色~茶色コーダー向け #AtCoder
ABC237(AtCoder Beginner Contest 237) A~D問題の解説記事です。 灰色~茶色コーダーの方向けに解説しています。 その他のABC解説、動画などは以下です。 A - Not Overflow 問題文の通りNが-2^31以上かつ2^31未満かを確認します。 条件を満たすなら「Yes」そうでないならば「No」を出力します。 「aのx乗」はa**xと書きます。 「以上」は「<=」 未満は「<」 です。 「Yes」「No」は文字列なので出力の際、"Yes","No"とダブルクオーテーションをつけてください。 入力の受け取り、出力がわからない方は以下の記事を参考にしてください。 【提出】 #入力の受け取り N=int(input()) # -2^31以上2^31未満ならば if(-2)**31<=N<2**31: # 「Yes」 を出力 print("Yes") #そうでないならば else: # 「No を出力」 print("No") B - Matrix Transposition 上から1行目、左からj列目の要素をA[i][j]と書きます。 この問題は要するに B[i][j]=A[j][i]となるように行列を作って出力せよ という問題です。 まず入力を受け取ります。二次元配列を使います。 二次元配列は空のリストを作り、そこにさらにリストを入れていくことで作れます。 まずAという空のリストを作り、1行ずつ受け取ってAへ入れる、ということを繰り返します。 #空のリストを作る A=[] # H回 for i in range (H): # 一行ずつリストとして受け取り A_tmp=list(map(int,input().split())) # Aへ入れる A.append(A_tmp) こうすることでAの上からi行目、左からj列目をA[i][j]と確認できます。 ただし、0インデックスとなるため、行列の左上はA[1][1]ではなくA[0][0]となることに注意してください。 続いてBを確認していきます。 BはまずW*H(W行H列)の二次元配列を作ってしまい。そこに値を埋めていくという方法を取ります。 W*Hの二次元配列を作るには以下のように書きます。 # W*Hの二次元配列を作成 B=[[0]*H for i in range(W)] 初期値はすべて「0」になっていますが、これはどうせ更新するのでなんでもいいです。 あとはB[i][j]=A[j][i]と順に埋めて、最後にBを一行ずつ出力すれば終わりです。 Bの出力の際 print(B[行番号]) とすると[]つきで出力されますが、 print(*B[行番号]) とすると[]なしで出力できるので楽です。 【提出】 # 入力の受け取り H,W=map(int,input().split()) #空のリストを作る A=[] # H回 for i in range (H): # 一行ずつリストとして受け取り A_tmp=list(map(int,input().split())) # Aへ入れる A.append(A_tmp) # W*Hの二次元配列を作成 B=[[0]*H for i in range(W)] # i=0~(W-1) for i in range (W): #j=0*(H-1) for j in range (H): #Bの値を更新 B[i][j]=A[j][i] # i=0~(W-1) for i in range (W): # 一行ずつ出力 print(*B[i]) C - kasaka 回文であるなら先頭と後ろの「a」の個数は一致している必要があります。 後ろの「a」の個数を数える場合はiを一つずつ減らしながらS[i]を確認します。 iを一つずつ減らしていくには以下のようにfor文を書きます。 for(開始,(終了-1),減る量): i=(Sの文字数-1)~0まで一つずつ減らしていく場合なら以下のようになります。 # i=(Sの文字数-1)~0 -1ずつ for i in range(len(S)-1,-1,-1): 先頭の「a」の個数が後ろの「a」の個数より多ければ回文にすることはできません。 例:aaabbaa よって「後ろの「a」の個数」<「先頭の「a」の個数」なら「No」です。 そうでないケース、すなわち「先頭の「a」の個数」≤「後ろの「a」の個数」場合、先頭に足りない「a」をつけ足して回文かどうかの判定をします。 回文かの判定は単純に1文字ずつ見ていくだけです。 pythonでは0インデックスになるため、i=0~(N-1)についてS[i]=S[(N-1)-i]となっていることが回文であることの条件です。 【提出】 # 入力の受け取り S=input() # 先頭にある"a"の数 a_front=0 # i=0~(Sの文字数-1) for i in range(len(S)): # i文字目が"a" なら if S[i]=="a": #カウント a_front+=1 # そうでないなら else: # for を抜ける break # 後ろにある"a"の数 a_back=0 # i=(Sの文字数-1)~0 -1ずつ for i in range(len(S)-1,-1,-1): # i文字目が"a" なら if S[i]=="a": #カウント a_back+=1 # そうでないなら else: # for を抜ける break # 「後ろの「a」の個数」<「先頭の「a」の個数」 if a_back<a_front: # 「No」を出力 print("No") # 終了 exit() # 先頭に足りない個数分「a」を追加 S="a"*(a_back-a_front)+S # Sの長さ N=len(S) # i=0~(N-1) for i in range(N): # 回文か判定 # 1文字目と(N-i-1)文字目が違うなら if S[i]!=S[N-i-1]: # 「No」を出力 print("No") # 終了 exit() # ここまで来たら回文 # 「Yes」を出力 print("Yes") D - LR insertion pythonはdeque(両端キュー)と呼ばれる機能を使えます。 これは先頭と末端への要素追加が0(1)でできるリストだと思えばいいです。 (通常のリストだと要素数Nの場合、先頭への要素追加がO(N)となります) dequeがわからない人は以下を参照してください。 dequeについて dequeはリストのようなものですが、先頭から要素を取り出す操作をO(1)で行うことができます。 (リストだとO(N)かかります) インポート:from collections import deque 作成:変数名=deque() 先頭(左端)に要素追加【O(1)】:変数名.appendleft(要素) 末尾(右端)に要素追加【O(1)】:変数名.append(要素) 先頭(左端)から要素を取り出して削除【O(1)】:変数名.popleft() 末尾(右端)から要素を取り出して削除【O(1)】:変数名.pop() 【使用例】 # インポート:from collections import deque from collections import deque # 作成:変数名=deque() que=deque() # 先頭(左端)に要素追加【O(1)】:変数名.appendleft(要素) que.appendleft(1) # 末尾(右端)に要素追加【O(1)】:変数名.append(要素) que.append(3) # 先頭(左端)から要素を取り出して削除【O(1)】:変数名.popleft() x=que.popleft() # 末尾(右端)から要素を取り出して削除【O(1)】:変数名.pop() y=que.pop 詳しく知りたい人は以下のページを見てください。 dequeなら両端への要素追加はO(1)で出来ますが、問題文の通り「0」から始めると両端でなく真ん中に要素を追加するケースでTLEします。 こういった指示に従って操作を行う系の問題では「逆から操作する」というのがよく使うテクニックになります。 出力例1(1 2 4 5 3 0)を眺めてみると、「5」から逆順で追加すれば 先頭に「4」(4 5) 末端に「3」(4 5 3) 先頭に「2」(2 4 5 3) と先頭か末尾への要素追加のみで解けることがわかります。 よってSを逆順に確認して 「R」→先頭に追加 「L」→末端に追加 としていけばOKです。 先頭(左端)への追加はappendleftを使います。 【提出】 # 入力の受け取り N=int(input()) S=input() # deque のインポート from collections import deque # キューを用意 que=deque() # Nを追加 que.append(N) # i=(N-1)~0 -1ずつ for i in range(N-1,-1,-1): # Sのi文字目が「R」 if S[i]=="R": # 先頭(左端)へ「i」を追加 que.appendleft(i) # そうでない場合(Sのi文字目が「L」) else: # 末端(右端)へ「i」を追加 que.append(i) # 答えの出力 # ([]がいらない場合は先頭に「*」をつける) print(*que) 【広告】 「AtCoder 凡人が『緑』になるための精選50問詳細解説」 AtCoderで緑になるための典型50問をひくほど丁寧に解説した本(kindle)、pdf(booth)を販売しています。 値段:100円(Kindle Unlimited対象) 【kindle】 【booth(pdf)】 1~24問目まではサンプルとして無料公開しています
- 投稿日:2022-01-31T22:15:23+09:00
SQLAlchemy同期/非同期の比較
PythonのフレームワークであるFastAPIを触る機会があって、その中で非同期プログラミングの存在を知りました。 非同期プログラミングについて色々調べていくと、DB接続も非同期処理に対応してきていることが分かり、PythonのSQLライブラリで有名なSQLAlchemyもバージョン1.4から非同期処理に対応しているようです。 時代は非同期プログラミングみたいですね。 ただSQLAlchemyの非同期処理の記事がまだ少なく、同期処理から非同期処理に変えるときに、苦労しそうなので、今回はSQLAlchemy同期処理と非同期処理で設定やクエリがどう違うの比較しようと思います。 SQLAlchemyにはcoreとormがありますが、ここではormで書いていきます。 APIにはFastAPI、DBはPostgreSQLを使っていきます。 動作確認用コードは下記リポジトリに置いています。 同期処理:https://github.com/y-p-e/sync_postgresql_fastapi 非同期処理:https://github.com/y-p-e/async_postgresql_fastapi 事前準備と前提条件 まずは、APIとDBを接続するための設定を比較していきます。 PostgreSQL-ドライバー まずは、アプリ(FastAPI)とDB(PostgreSQL)を接続するためのドライバーからです。 同期接続 PostgreSQLと同期接続するために使えるドライバーはいくつかあります。 psycopg2 pg8000 py-postgresql この中ではpsycopg2が一番有名かと思うので、psycopg2を選択します。 参考:PythonからPostgreSQLに接続する方法 非同期接続 PostgreSQLと非同期接続するためのドライバーもいくつかあります。 Databases Tortoise ORM ormantic GINO asyncpg ほとんどがSQLAlchemy Coreの方に対応しているのですが、FastAPIでコードを書いていくならSQLAlchemy ORMに対応していた方が良いので、ここではasyncpgを選択します。 参考:データベースの非同期処理 ちなみに、asyncpgはpsycopg2よりも3倍早いそうです。これだけでもasyncpgにする価値はありそうですね。 In our testing asyncpg is, on average, 3x faster than psycopg2 (and its asyncio variant -- aiopg). 参考:MagicStack/asyncpg SQLAlchemy-エンジン & セッション SQLAlchemyを動かすためのエンジンの設定の違いを見てみます。 同期接続 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+psycopg2", "admin", "password", "db", "5432", "sync_db") engine = create_engine(DB_URL, echo=True) Session = scoped_session( sessionmaker( autocommit = False, autoflush = False, bind = engine)) 非同期接続 from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+asyncpg", "admin", "password", "db", "5432", "async_db") engine = create_async_engine(DB_URL, echo=True) Session = scoped_session( sessionmaker( autocommit = False, autoflush = False, bind = engine, class_=AsyncSession)) 非同期の場合は非同期用のcreate_async_engineとAsyncSessionをインポートする必要があります。 そして、sessionmakerの引数にclass_=AsyncSessionを設定する必要があります。 ドライバー&エンジン&セッション DBと接続するために、ドライバー、エンジン、セッションというものが出てきました。 最初これらの違いが良くわからなかったので、現実世界と照らし合わせて自分の理解を示しておきます。 自分の理解なので厳密には違うと思いますが、大きくは外れていないかと思っています。 まずドライバーは、DBへ続く道なのかなと思っています。そもそも道がなかったら目的地へも辿りつけないので、一番最初に道を作ってあげるイメージです。 その次に、エンジンです。エンジンというと車のエンジンが一番身近にあるかなと思います。なので、目的地に行くための乗り物がSQLAlchemyというイメージです。 最後にセッションです。セッションは、SQLAlchemy専用の道路というイメージです。大きな道にはバス専用の道路とかあるみたいな感じかなと思っています。 マイグレーション では、実際にCRUD処理の違いを見ていく前に、テーブルを作っておきましょう。 テーブルを作るにはマイグレーションをする必要がありますが、マイグレーションを非同期でする必要はないかなと思うので同期処理で作っておきます。 SQLAlchemyで作られたテーブルのマイグレーションを実行するファイルを用意しました。 from sqlalchemy import create_engine from api.models.task import Base DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+psycopg2", "admin", "password", "db", "5432", "async_db") engine = create_engine(DB_URL, echo=True) def reset_database(): Base.metadata.drop_all(bind=engine) Base.metadata.create_all(bind=engine) if __name__ == "__main__": reset_database() CRUD 準備ができたので、CRUD処理のそれぞれの違いを見ていきます。 Create 同期接続 def create_task(db: Session, task_create: task_schema.TaskCreate): task = task_model.Task(title=task_create.title) db.add(task) db.commit() return task 非同期接続 async def create_task(db: AsyncSession, task_create: task_schema.TaskCreate): task = task_model.Task(title=task_create.title) db.add(task) await db.commit() await db.refresh(task) return Read 全件取得 同期接続 def get_tasks(db: Session): tasks = db.query(task_model.Task).all() return 非同期接続 async def get_tasks(db: AsyncSession): result = await (db.execute(select(task_model.Task.id,task_model.Task.title,))) return result.all() Read 1件取得 同期接続 def get_task(db: Session, task_id): task = db.query(task_model.Task).filter(task_model.Task.id == task_id).first() return task 非同期接続 async def get_task(db: AsyncSession, task_id): result = await (db.execute(select(task_model.Task.id,task_model.Task.title,).filter(task_model.Task.id == task_id))) return result.first() Update 同期接続 def update_task(db: Session, task_id, task_create: task_schema.TaskCreate): task = db.query(task_model.Task).filter(task_model.Task.id == task_id).first() task.title = task_create.title db.add(task) db.commit() return task 非同期接続 async def update_task(db: AsyncSession, task_id, task_create: task_schema.TaskCreate): result = await (db.execute(select(task_model.Task).filter(task_model.Task.id == task_id))) task = result.first() task[0].title = task_create.title db.add(task[0]) await db.commit() await db.refresh(task[0]) return task[0] Delete 同期接続 def delete_task(db: Session, task_id): task = db.query(task_model.Task).filter(task_model.Task.id == task_id).delete() db.commit() return 非同期接続 async def delete_task(db: AsyncSession, task_id): result = await db.execute(select(task_model.Task).filter(task_model.Task.id == task_id)) task = result.first() await db.delete(task[0]) await db.commit() return task まとめ SQLAlchemyのORMも非同期処理に対応したということで、同期処理と非同期処理の違いをまとめてみました。 非同期処理でも同期処理と似たような書き方はできるけど、微妙に違うので気をつけて書いていかないとハマるポイントかなと思います。 あとは、使えるドライバーがそもそも違うのでここも注意するところかなと思います。ただ、非同期処理対応のasyncpgはpsycopg2よりも3倍早いということなので、かなりメリットはありそうな気がします。
- 投稿日:2022-01-31T22:10:13+09:00
Flickrの写真を使って写真の位置情報を可視化してみた
はじめに SNSにはたくさんの写真がアップロードされています。地図を使って可視化することが好きな僕は、アップロードされている写真の位置情報を使って地図に可視化できないかなと思いました。調べてみると位置情報の取得が簡単にできるサービスでFlickrというものがありました。今回はFlickrに投稿されている写真を使って、その写真の位置情報をもとに地図に可視化してみたいと思います。 Flickrの写真を使って写真の位置情報を可視化してみた 実現したいこと 今回はFlickrのAPIを扱うPythonライブラいを使って写真の情報を取得し、そのデータをLeafletをPythonで使えるようにしたPythonライブラリのfoliumを使って可視化していこうと思います。地図上に写真の位置情報をもとにピンをたて、ピンには実際のその写真が見えるようにします。 Flickrとは Flickrとは、写真を共有することのできるサービスです。Instagramのようなサービスですね。今回はこのFlickrのAPIを使って写真の情報を取得しました。 Flickr APIの利用方法については、以下の記事を参考にしました。 実際のコード Flickr APIを利用して写真の情報を取得方法については、以下の記事と公式ドキュメントを参考にしました。 # 必要なライブラリ from flickrapi import FlickrAPI import folium import json import pandas as pd # Flickr APIを利用するための情報を記載 key = "XXXXXXXXXXXXXXXXXXX" secret = "XXXXXXXXXXXXXXXXXX" # Flickr APIを使って写真の情報を取得 # 今回は写真のタイトルに"fish"が関連しているものを500件を上限に取得 # 位置情報は、extrasでgeoを指定 flickr = FlickrAPI(key, secret, format='parsed-json') result = flickr.photos.search( text = 'fish', per_page = 500, media = 'photos', sort = 'relevance', safe_seach = 1, extras = 'url_q, licence, geo' ) photos = result['photos']['photo'] # 地図の中心を、写真に保存されている緯度経度の平均を出すため lat_sum, lon_sum = 0,0 for photo in photos: lat_sum += float(photo['latitude']) lon_sum += float(photo['longitude']) lat_mean = lat_sum / len(photos) lon_mean = lon_sum / len(photos) # 元となる地図を作成 map = folium.Map(location=[lat_mean, lon_mean], zoom_start=3) # 写真に保存されている位置にピンをたて、そのピンに写真を載せる for photo in photos: folium.Marker( location=[float(photo['latitude']), float(photo['longitude'])], popup="<p>{}<br /><img src='{}'</p>".format(photo['title'],photo['url_q']) ).add_to(map) # 地図を描画 map 結果 ピンをプロットした地図は、以下のような感じです。 ピンをクリックすると写真が表示されます。 さいごに 今回はFlickrに投稿されている写真の位置情報をもとに地図に可視化してみました。写真の位置情報の利用は色々な使い道があると思うので、今度は他の使い方を検討してみたいと思います。
- 投稿日:2022-01-31T21:35:45+09:00
Streamlit PythonをIBM Cloudへデプロイするよ
手軽にPythonにUIを追加してアプリケーションを作成できるStremlitを使い始めています。ローカルで実行できるので開発やテストは問題ないのですが、できあがったアプリを他の人に見てもらうにはどこかにデプロイする必要があります。 StremlitではStremlitSharingというStremlitの専用の公開サイトがあるのでそこへ手軽にデプロイできるのですが、他のデプロイ先も試してみようと思います。 Herokuとかへのデプロイする情報が多いのですが、今回はあまり情報のないIBM Cloud Code Engineへのデプロイを試してみました。 事前チェック 下記のものを準備しておいてください。 * IBM Cloud アカウント (ライトアカウント以上) * GitHub アカウント * Git CLI IBM Cloud IBMのクラウドサイトです。企業向けに力を入れてる感じです。開発したアプリケーションをデプロイする方法としてはいろいろあるのですが、昨年2021年に、Code Engineというサーバーレスランタイム環境が正式公開されたのでこちらを試してみます。 Code Engine IBM Cloud Code Engine サーバーレスのコンテナ、アプリケーションコード、バッチを実行できる環境として提供されています。開発者がサーバーとかインフラとか意識せずにアプリケーションを公開できる環境として便利ですね。 ここにアプリケーションをデプロイする方法としてはあらかじめDockerイメージを作成する方法とGithubに配置したソース・ファイルを元にしてDockerイメージを作成してデプロイする方法があります。(HerokuとかStremlitSharingと同じような使い方というとGithubソースの方なのでこちらをやってみます。) Github準備 サンプルソースの準備 作成したソース・ファイルをGithubで公開します。参照してください。 https://github.com/saboten10/simple_streamlit 必要なものは下記の3つのファイルになります。 * python program file * Dockerfile * requirement.txt Dockerfileの説明 IBM Cloude Code Engineへデプロイするのに必要となるDockerfileになります。Dockerイメージファイルのサイズが気になる人はSlimも利用できます。 Dockerfile # For more information, please refer to https://aka.ms/vscode-docker-python # you can reduce the docker image size by using python:3.x-slim-buster #FROM python:3.8 FROM python:3.8-slim-buster # Keeps Python from generating .pyc files in the container ENV PYTHONDONTWRITEBYTECODE=1 # Turns off buffering for easier container logging ENV PYTHONUNBUFFERED=1 # for python:fullpackage RUN apt-get update && apt-get upgrade -y WORKDIR /app # Exposing default port for streamlit EXPOSE 8501 # Install requirements COPY requirements.txt . RUN pip install -r requirements.txt #RUN python -m pip install -r requirements.txt # Copy necessary files COPY . /app # Creates a non-root user with an explicit UID and adds permission to access the /app folder # For more info, please refer to https://aka.ms/vscode-docker-python-configure-containers RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /app USER appuser # Launch app when container is run CMD streamlit run sample1.py Code Engine準備 IBM Cloudの中の操作を入れて大体の準備を記述する。 Code Engineのアクティベート IBM Cloudのカタログの中のCotainerの中にCode Engineがあります。 デプロイ操作 Githubにアップロードされているソースコードを元にIBM Cloud Code Engineへデプロイする方法を最初から手順を順番にやっていきます。 ソース・コードから始める Code EngineのスタートページにDockerイメージから始めるか、ソースコードから始めるか選ぶところがあるのでソースコードを選択してください。 ソースコードリポジトリには自分のGithubのソースコードリポジトリを指定します。自分のはパブリックにしていますが非公開でも対応できます。 ソースコードリポジトリに自分のGithubを指定して、「作成の開始」ボタンを押します。 作成の開始 なんか翻訳が変なのかな、日本語的におかしな感じがしますが作成するアプリケーションの情報を入れていきます。 上から順番に指定していきます。 アプリケーション名:simple-streamlit-application-20220130 としておきます。アンダーバーが使えないので注意してください。 プロジェクトの作成ボタンを押すと画面の右側に次のような画面が出てきます。 アプリケーションの作成 なんかプロジェクトがロードできないとかエラーが出たのでCode Engineのメニューから作成したプロジェクトを選択してアプリケーションの作成を選択 ※本当はこのまま進めるはずです。 赤枠部分を入力して、「ビルド詳細の指定」ボタンを押す。 ビルド詳細の指定画面が出てくるのでブランチ名を「master」として「次」へ その後はディフォルトのまま進むと作成するDockerイメージを格納するリポジトリが作成されて元の画面に戻ります。 ビルドとデプロイの実行 いよいよ「作成」ボタンを押します。 イメージビルドから始まって、成功すると自動的に環境へデプロイまで進みます。 「準備完了」が表示されるとデプロイが完了してアプリケーションの準備ができました。 アプリケーションの実行 アプリケーションのテストを押して、「アプリケーションのURL」を押すと対象のURLへアクセスされます。 はい、作成したStreamlitアプリケーションが表示されました!! おつかれさまでした。 これでアプリケーションをデプロイする環境ができたので、もとのGithub側を変更してビルド、デプロイすればアプリケーションを変更することもできます。 今回のサンプルはStreamlitのタイトルとマークダウンしか出力していないシンプルなプログラムですがベースはできましたので自分のコードで試してみてください。 お金の話 IBM Cloud Code Engineはクレジットカードを登録したライトアカウント以上で利用できます。 費用のページに詳細が書かれていますが、 * 作成したDockerイメージを格納するレジストリが512MBまでは無料ですが、それ以上だと無料プランからアップグレードする必要があります。この場合は超過した容量が有料(今回のサンプルは273MBなので無料枠です) * 同じくレジストリからPULLする容量にも制限があります。月間5GB以内が無料枠、それ以上は同じくアップグレードが必要(従量課金) * 作成したアプリケーションについてもヒット数によって課金です。月間で10万アクセスなのでテストプログラムとかならまぁ大丈夫かな。 実際、今回のDockerファイルではslimイメージを指定しているけど、普通にStreamlitでイメージ作ったら512MBを超えちゃってる。 はみ出した費用はわずかだけど完全に無料でやりたいよね。 終わりに 今回はPythonで手軽にUIを作成できるStremlitをIBM Cloudへデプロイする方法について書きました。 * Streamlit Sharing (Streamlit Cloud) * Heroku * IBM Cloud とデプロイを試してきましたがStremlit SharingはStremlitを使ったアプリのみが対象でGithub側のソースはパブリック公開しておかないといけないです。それに対してHeroku、IBM CloudはStremlit以外でも大丈夫ですし、ソースコードもパブリック公開する必要はないのでソースを公開したくない人には選択肢としていいと思います。 特にIBM Cloudには他にもたくさんのサービスがあるのでそれを利用したPythonアプリケーションを作成するのであれば今回のIBM Cloud Code Engineにデプロイするメリットはあると思います。 (もちろんIBM Cloud Code EngineからAzureとかGoogleとか他のAPIも呼び出すことはできます。)
- 投稿日:2022-01-31T21:28:49+09:00
2011年3月11日に起きた地震情報を可視化してみた
はじめに 2022年1月22日に九州で大きな地震が発生しました。地震といえば、僕は2011年の東日本大震災を思い出します。今回は東日本大震災が発生した2011年3月11日の地震情報をfoliumを使って可視化してみたいと思います。 2011年3月11日に起きた地震情報を可視化してみた 実現したいこと LeafletをPythonで使えるようにしたPythonライブラリのfoliumを使って、震央の位置とマグニチュードを円の大きさで表現したいと思います。 使用するデータ 今回使用するデータは、気象庁が提供している震度データベース検索からダウンロードしたものを使わせていただきました。 ここから、2011年3月11日に発生した最大震度1以上をどこかしらで観測した地震を集計しています。 実際のコード # 必要なライブラリ import folium import pandas as pd import geopandas as gpd import numpy as np import datetime as dt import re # ダウンロードしたCSVを読み込む # データの整形が必要なので、もろもろ修正していく df_tmp = pd.read_csv('./earthquake.csv') # マグニチュードが不明なところは除く df_tmp = df_tmp[df_tmp['Magnitude'] != '不明'] # 使用するために空のデータフレームを作成しておく df = pd.DataFrame(index=[], columns=['datetime','lat','lon','magnitude']) # 緯度経度が六十進法で保存されるので、それを十進法に変更する関数を用意する # 緯度 def Sixty2TenOfLat(row): lat_splited = re.split('\D+', row['lat']) lat = float(lat_splited[0]) + float(lat_splited[1]) / 60 + float(lat_splited[2]) / 60 / 60 return lat # 経度 def Sixty2TenOfLon(row): lon_splited = re.split('\D+', row['lon']) lon = float(lon_splited[0]) + float(lon_splited[1]) / 60 + float(lon_splited[2]) / 60 / 60 return lon # 緯度経度を六十進法から十進法に変更して、使用するデータフレームに格納 df['lat'] = df_tmp.apply(Sixty2TenOfLat, axis=1) df['lon'] = df_tmp.apply(Sixty2TenOfLon, axis=1) # マグニチュードは文字列で保存されていたので、`float`に変更して使用するデータフレームに格納 df['magnitude'] = df_tmp['Magnitude'].map(lambda x: float(x)) # 使用する元の地図を作成 # 地図の中心は、震央の緯度経度の平均にしている map = folium.Map(location=[df['lat'].mean(), df['lon'].mean()], zoom_start=5, tiles='cartodbdark_matter') # 震央の位置に、マグニチュードの大きさを円の大きさとして描画 # 円の半径は、描画の関係で大きくしている for i, row in df.iterrows(): folium.CircleMarker( location=[row['lat'], row['lon']], radius=row['magnitude']*5, color='#0000ff', fill_color='#000055' ).add_to(map) # 地図を描画 map 実際の結果 上記のコードを実行すると、このような結果が得られました。東北ではたくさんの地震が発生していることがわかります。僕もたくさんの地震が発生したのを直で経験していたので、すごく覚えています。 さいごに 今回は気象庁のデータとLeafletをPythonで使えるようにしたPythonライブラリのfoliumを使って、2011年3月11日に起きた地震情報を可視化してみました。foliumを使うと簡単に、インタラクティブな地図を描画することができます。
- 投稿日:2022-01-31T20:18:59+09:00
StreamlitとCOTOHA APIで簡単にテキスト感情分析Webアプリを作る
StreamlitとCOTOHA APIを使って超簡単にテキスト感情分析Webアプリを作ります。 Streamlitとは StreamlitとはPythonでフロントエンドを構築できるWebフレームワークです。同じPythonのwebフレームワークであるDjango, Flask, Fast APIなどと比べるとHTTPやCSSなどを理解しなくてもWebアプリを作成することができます。 ※ただし、その分デザインの凝ったWebアプリの作成には向いてないかもしれません。 Streamlitのインストール ターミナルからpipでstreamlitをインストールして下さい。 pip install streamlit Streamlitのユーザー登録 StreamlitのHPからユーザー登録をおこなってGithubのアカウントを連動させることにより簡単にWebアプリケーションを公開することができます。 COTOHA APIとは NTTコミュニケーションズが提供する自然言語処理のAPIサービスで、構文解析、キーワード抽出、感情分析、類似度算出等を無料(※一部サービスは有料)で使うことができるサービスとなっています。 詳しい説明はこちらの記事が分かりやすいと想います。 自然言語処理を簡単に扱えると噂のCOTOHA APIをPythonで使ってみた ソースコード こちらが実際に作成したソースコードになります。 secret.jsonにはCOTOHA APIでアカウントを作成した際に受け取ったBase URL, Client id, Client secretを記述して下さい。 secret.json { "COTOHA_BASE_URL":"https://*************************", "COTOHA_ID":"********************************", "COTOHA_SECRET":"****************" } requirements.txtには使用するライブラリを記述して下さい。 requirements.txt streamlit==1.3.1 pandas あとはソースコードです。 sentiment_analyze_app.py import json import pandas as pd import requests import streamlit as st with open('secret.json') as f: secret = json.load(f) BASE_URL = secret["COTOHA_BASE_URL"] CLIENT_ID = secret["COTOHA_ID"] CLIENT_SECRET = secret["COTOHA_SECRET"] def get_cotoha_acces_token(): token_url = "https://api.ce-cotoha.com/v1/oauth/accesstokens" headers = { "Content-Type": "application/json", "charset": "UTF-8" } data = { "grantType": "client_credentials", "clientId": CLIENT_ID, "clientSecret": CLIENT_SECRET } response = requests.post(token_url, headers=headers, data=json.dumps(data)) access_token = response.json()["access_token"] return access_token def cotoha_sentiment_analyze(access_token, sentence): base_url = BASE_URL headers = { "Content-Type": "application/json", "charset": "UTF-8", "Authorization": "Bearer {}".format(access_token) } data = { "sentence": sentence, } response = requests.post(base_url + "nlp/v1/sentiment", headers=headers, data=json.dumps(data)) return response.json() st.title('テキスト感情分析アプリ') input_option =st.selectbox( '入力データの選択', ('直接入力', 'テキストファイル') ) input_data = None if input_option == '直接入力': input_data = st.text_area('こちらにテキストを入力して下さい。') else: uploaded_file = st.file_uploader('テキストファイルをアップロードして下さい。', ['txt']) if uploaded_file is not None: content = uploaded_file.read() input_data = content.decode() if input_data is not None: st.write('入力データ') st.write(input_data) if st.button('実行'): accses_token = get_cotoha_acces_token() response = cotoha_sentiment_analyze(accses_token, input_data) sentiment = response["result"]["sentiment"] score = response["result"]["score"] emotional_phrase = response["result"]["emotional_phrase"] df_emotional_phrase = pd.DataFrame(emotional_phrase) st.write(f'## 分析結果:{sentiment}') st.write(f'### スコア:{score}') st.dataframe(df_emotional_phrase) あとは、これらをGithubにpushしてStreamlitのサイトでデプロイすれば簡単にWebアプリを公開することができます。 作成したアプリ 作成したアプリの画面はこんな感じになっていて、テキストを直接入力するかテキストファイルの読み込みをさせます。 実行結果 それでは直接入力で「死は全く怖くない一番恐れるのはこの怒りがやがて風化してしまわないかということだ」と入力して実行すると という結果が返って来ました。 まとめ Streamlitでテキスト感情分析アプリを作ってみました。気軽に作れるのでちょっとしたプロトタイプの作成などにはいいかもしれません。
- 投稿日:2022-01-31T19:58:27+09:00
pythonのリストでn番目に大きな値とそのindexを取得する
はじめに pythonでリストの最大値・最小値をとってくる方法は簡単です。 しかし、n番目の値やindexをとってくる方法は、調べても出てこなかったため高速に行えるものを書きました。 これはもともとdifflibライブラリ内にあったものを参考にしました。 アルゴリズム プログラムの順番としては、 値とインデックスをセットにしたリストを作る 高速化のためn番目までソートする ソートしたリストのn番目の中身を取得 です。 ソート時、n番目までしかソートしていないので、高速です。 プログラム from heapq import nlargest, nsmallest lst = [5,2,6,2,7,5,9,5] n = 3 #この値までソートする result = [(value,i)for i,value in enumerate(lst)] max_result = nlargest(n, result) min_result = nsmallest(n, result) print("{}番目に大きな値はindex{}の{}です".format(n, max_result[n-1][1], max_result[n-1][0])) print("{}番目に小さな値はindex{}の{}です".format(n, min_result[n-1][1], min_result[n-1][0])) #3番目に大きな値はindex2の6です #3番目に小さな値はindex0の5です 応用 元のリストの中身を関数で数値化し、その大きさでソートするなら、以下のように書き換えればいいです。 今回は文字の長さで並べ替えています。 タプルのリストを作る際に、len関数を使用しています。 from heapq import nlargest, nsmallest lst = ["a", "sadfsd", "bas", "dg", "kgkgkg"] n = 2 #この値までソートする result = [(len(value),i,value)for i,value in enumerate(lst)] max_result = nlargest(n, result) min_result = nsmallest(n, result) print("{}番目に大きな値はindex{}の{}です".format(n, max_result[n-1][1], max_result[n-1][2])) print("{}番目に小さな値はindex{}の{}です".format(n, min_result[n-1][1], min_result[n-1][2])) #2番目に大きな値はindex1のsadfsdです #2番目に小さな値はindex3のdgです まとめ pythonのリストでn番目に大きな値とインデックスを取得する方法を書きました。 プログラムを勉強し始めた人でもわかる内容ですが、いざ書いてみようと思うとなかなかめんどくさくて、ライブラリというものの恩恵を改めて実感しました。 ぜひアレンジして使用してください。
- 投稿日:2022-01-31T18:07:48+09:00
SudachiPyを使ってみる
SudachiPyはワークス徳島人工知能NLP研究所が開発している形態素解析器SudachiのPython版です。 バージョン0.6.0からは実装をCythonからRustに変更し、以前のバージョンより20倍ほど実行速度が速くなっています。 導入 最初はPythonの仮想環境を作ります。 python -m venv supy-test bashやzshを使う場合は環境を有効にするために以下のようにします。 source supy-test/bin/activate Windows上のPowershellの場合はPowershell版の有効化スクリプトを使います。 .\supy-test\Scripts\Activate.ps1 ARMのMac (Apple Silicon) の場合は追加のステップが必要です。 ARMのMacの場合はバイナリーのパッケージはまだ存在していないので、ビルドのためにRustのToolchainとPythonのパッケージも必要になります。 pip install setuptools_rust インストール 準備が終わったら、SudachiPyをインストールします。 pip install sudachipy sudachidict_core 試してみる 使い方は二通りあります。 まずは、コマンドラインツールsudachipyから起動確認をします。 $ echo "吾輩は猫である" | sudachipy 吾輩 代名詞,*,*,*,*,* 我が輩 は 助詞,係助詞,*,*,*,* は 猫 名詞,普通名詞,一般,*,*,* 猫 で 助動詞,*,*,*,助動詞-ダ,連用形-一般 だ ある 動詞,非自立可能,*,*,五段-ラ行,終止形-一般 有る EOS ライブラリとしての使い方 コマンドラインでの利用は簡単な試用にはいいですが、醍醐味があるのはライブラリとしての使い方です。 基本的な使い方は以下の3つのステップです: >>> dict = sudachipy.Dictionary() # まずは辞書を作る >>> tokenizer = dict.create() # 辞書から分割器を作る >>> tokenizer.tokenize("吾輩は猫である") # 分割自体を行う <MorphemeList[ <Morpheme(吾輩, 0:2, (0, 350242))>, <Morpheme(は, 2:3, (0, 122101))>, <Morpheme(猫, 3:4, (0, 571106))>, <Morpheme(で, 4:5, (0, 101816))>, <Morpheme(ある, 5:7, (0, 12492))>, ]> 分かち書きの出力: >>> morphemes = tokenizer.tokenize("吾輩は猫である") >>> print(*[m.surface() for m in morphemes]) 吾輩 は 猫 で ある 今後、SudachiPyは解析中にGILを使わないようにする予定です。スレッドごとに別のTokenizerのインスタンスを使うようにしてください。 コンポーネントの説明 Dictionaryは辞書データを持つ部品です。辞書データはimmutableなので、基本的に複数のDictionaryを作る必要がなく、スレッド間の共有もできます。 Tokenizerは解析中に可変な状態をもつコンポーネントです。中身は可変なので、スレッド間の共有は不可能です。 MorphemeListは解析の結果で、形態素列を表す型です。追加の状態を持つ必要がありPythonのListではなく独自実装ですが、Listと同じように使えます。 Morphemeは形態素です。様々な情報を持っています。大事なのは以下のものです。 begin() \ end():入力の文字列にたいして形態素の始まりと終わりの文字(正確に言えばUnicodeのCodepoint)のオフセット surface():形態素の見出し、入力の文字列中の文字列 part_of_speech():UniDic体系の品詞 dictionary_form():活用可能な形態素の辞書形 normalized_form():正規化情報、例)あんまり→余り reading_form():形態素の読み 分割の単位 Sudachiは複数の単位に分割できます。 C:固有表現レベル(既定値) B:人間に自然なレベル A:Unidicの短単位に相当するレベル 分割のレベルの指定の仕方は3通りあります。 すべての解析を1つのレベルで行う この場合はTokenizerを作る際にDictionary.create()にmodeの引数として渡します。 >>> tokenizer = dict.create(mode=sudachipy.SplitMode.C) # 既定値はC >>> tokenizer.tokenize("関西国際空港") <MorphemeList[ <Morpheme(関西国際空港, 0:6, (0, 1564531))>, ]> >>> tokenizer = dict.create(mode=sudachipy.SplitMode.A) >>> tokenizer.tokenize("関西国際空港") <MorphemeList[ <Morpheme(関西, 0:2, (0, 735344))>, <Morpheme(国際, 2:4, (0, 365535))>, <Morpheme(空港, 4:6, (0, 602797))>, ]> この使い方は一番おすすめです。 解析ごとにレベルを変える この場合はTokenizer.tokenize()にmodeの引数として渡します。 >>> tokenizer.tokenize("関西国際空港", mode=sudachipy.SplitMode.C) <MorphemeList[ <Morpheme(関西国際空港, 0:6, (0, 1564531))>, ]> >>> tokenizer.tokenize("関西国際空港", mode=sudachipy.SplitMode.B) <MorphemeList[ <Morpheme(関西, 0:2, (0, 735344))>, <Morpheme(国際, 2:4, (0, 365535))>, <Morpheme(空港, 4:6, (0, 602797))>, ]> >>> tokenizer.tokenize("関西国際空港", mode=sudachipy.SplitMode.A) <MorphemeList[ <Morpheme(関西, 0:2, (0, 735344))>, <Morpheme(国際, 2:4, (0, 365535))>, <Morpheme(空港, 4:6, (0, 602797))>, ]> 解析ごとに一時的にレベルが変更されるので、Tokenizerのデフォルトの分割モードは変更されません。 同時に複数のレベルを使いたい場合 この場合はMorpheme.split()を使います。 >>> tokenizer = dict.create(mode=sudachipy.SplitMode.C) >>> morphemes = tokenizer.tokenize("関西国際空港") >>> morphemes <MorphemeList[ <Morpheme(関西国際空港, 0:6, (0, 1564531))>, ]> >>> morphemes[0].split(sudachipy.SplitMode.A) <MorphemeList[ <Morpheme(関西, 0:2, (0, 735344))>, <Morpheme(国際, 2:4, (0, 365535))>, <Morpheme(空港, 4:6, (0, 602797))>, ]> 良いSudachi Lifeを。
- 投稿日:2022-01-31T17:53:49+09:00
pip-compileでAssertionErrorを回避する(2022-01-31時点での一時しのぎ)
pip-compileでAssertionErrorを回避する(2022-01-31時点での一時しのぎ) 背景 kedro使っていて、kedro build-reqsでライブラリの依存関係を解決しようとするとエラーが出た。 結構ハマったのでメモ kedro build-reqsは内部的にpip compile(pip-tools)を使っているらしい。 調べてみると、pipのバグのようだった。 問題 2022-01-31現在、以下のようにpip-compileしようとすると、AssertionErrorが発生する模様。 $ pip-compile src/requirements.in 既知のバグらしく、現在対応中とのこと https://github.com/jazzband/pip-tools/pull/1559 解決策 pipのバージョンを21系にダウングレードすれば大丈夫 $ pip install pip==21.3.1
- 投稿日:2022-01-31T17:49:57+09:00
ココだけの話だけどF1-Scoreは少数ラベル評価指標ではない。
先に結論 F1-Scoreは「少数ラベル評価」ではなく「Positiveサンプル評価」であり、positiveラベルが少数である時のみ、少数ラベルを評価できる。 negativeラベルが少数であるときや、positiveとnegativeのどちらが少数か不明な場合では期待する動きをしない可能性があるので気を付けましょう コードで見てみよう sample.py from sklearn.metrics import f1_score, accuracy_score import numpy as np # 990件のnegativeと10件のpositive y_true = np.zeros(1000) y_true[:10] = 1 y_pred = np.zeros(1000) y_pred[5:15] = 1 print(accuracy_score(y_true, y_pred)) # 0.99 print(f1_score(y_true, y_pred)) # 0.5 # 990件のpositiveと10件のnegative y_true = np.ones(1000) y_true[:10] = 0 y_pred = np.ones(1000) y_pred[5:15] = 0 print(accuracy_score(y_true, y_pred)) # 0.99 print(f1_score(y_true, y_pred)) # 0.9949494949494949 見てわかる通り、positiveが多数ラベルの際にはF1はほぼAccuracyと同じ値になる。 簡単な理由 F1スコアは以下の式で計算される。 \begin{align} F1 = \frac{2 \times Recall \times Precision}{Recall + Precision} \\ Recall = \frac{TP}{TP + FN} \\ Precision = \frac{TP}{TP + FP} \end{align} 直観的に行くため、ここでTPやFNにどういった数字が入っているかを考える。 pos(GT) neg(GT) pos(予測) TP FP neg(予測) FN TN ここで、positive >> negativeにおいてはおよそTP >> FN, FP > TNとなる。まずpos(GT)が圧倒的に大きいのでTP+FN(positiveサンプルの総量)が大きな値に。不均衡的にpositiveサンプルの多くはTPに。一部がFNになる。また、pos(予測)のうちでも多くがTPになるが、一部がFPになる。そうなると、RecallやPrecisionは Recall = \frac{1}{1 + \frac{FN}{TP}} , (TP >> FNとすると\frac{FN}{TP}⇒0.000に近くなるため) \\ =\frac{1}{1+0.000} = 0.999... 結局RecallやPrecisionはTPがどれだけできているかを見ているので、positiveサンプルがどれだけ正解できているかを見ているのであって、少数ラベルの出来具合をみているわけではないんですよね。 もちろん、positiveサンプルが少数の場合は、少数ラベルの評価指標にもなりえますが、そこをすっ飛ばしてとりあえずラベルの偏りがあるからF1scoreみたいにするのは良くないと思います。では
- 投稿日:2022-01-31T17:00:44+09:00
multiprocessingで並列処理 【備忘録】
Poolオブジェクトによる基本的なプロセス管理と、Pipeオブジェクトを使った返り値処理を記載する。 multiprocessing Documentation(accessed: 2022/01/31) 実行環境 Mac OS Monterey 12.1 Python 3.9.2 iMac (Retina 5K, 27-inch, 2017) プロセッサ 3.4 GHz クアッドコアIntel Core i5 Pool Object 引数 processesに整数を与えるとその数のプロセスを生成して並列処理を行う。何も与えないと自動的にos.cpu_count()が返す値を使ってプロセスを生成する。 imap_unorderd(func, iterable[, chunksize]) プロセスにタスクを渡す方法は複数あるが、タスクの処理順を気にしない場合はimap_unordered()が最も速い。 funcに処理したい関数を渡し、iterableにその引数を渡す。chunksizeはタスクの分割サイズを決めるものだがunorderdの場合処理が終わったプロセスから順番にタスクが振られるので気にする必要はない。 pool = Pool() pool.imap_unordered(api_task, params) # これだけで並列処理してくれる。 # tqdm(プログレスバー)を組み合わせたバージョン with tqdm(total=len(params)) as t: for _ in pool.imap_unordered(api_task, params): t.update(1) Pipe Object 他プロセスからの返り値を受け取るために使う。 片方の端で読み書きを同時に行うとPipe内データが破損する恐れがある。 引数にduplexを取り、 duplex=Trueで双方向性,Falseで一方向性のPipeを生成できる。 データの破損を避けるためにFalseで運用するのが無難。 from multiprocessing import Process, Pipe def double(x, sender): # 並列処理に渡す関数 sleep(3) y = x*x sender.send(y) return None nums = range(5) process_list = [] connections = [] # Pipe for name in names: receiver, sender = Pipe(False) # 受信側、送信側の順で生成される p = Process(target=double, args=(nums, sender)) connections.append(receiver) process_list.append(p) p.start() # プロセスの処理を開始する。 for process in process_list: # タスクの終了待ち処理 この間プロセスはロックされる。 process.join(1) # 1秒間のタイムアウトをとる タスクひとつひとつが十分に高速に処理できる場合いらないかもしれない。(要動作確認) results = [conn.recv() for conn in connections] print(results) for process in process_list: process.terminate() # プロセスを終了させる 終わりに プロセスの学問的理解をしたいと考えながら既に数年経ちました。 参考文献に載せたCPUのコアとスレッド、プロセスの関係性の記事がわかりやすかったです。 ハードウェアの勉強とソフトウェアの勉強は交互にすべきですね。 参考文献 multiprocessing の基本(accessed 2022/01/31) 【図解】CPUのコアとスレッドとプロセスの違い(accessed 2022/01/31)
- 投稿日:2022-01-31T16:57:18+09:00
urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failedの対処法
問題 PythonでWebスクレイピングをする時に import urllib.request response=urllib.request.urlopen('https://yahoo.co.jp') しようとしたら, ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1056) と出力されてしまった. 解決方法 /Applications/Python\ 3.7/Install\ Certificates.command とコマンドを叩くと証明書の検証がうまくいった.
- 投稿日:2022-01-31T16:56:17+09:00
Google Colaboratoryでりんなの13億パラメータ版GPT-2(japanese-gpt-1b)を試してみる
13億パラメータ版GPT-2で生成した文章: GPT-2のような文章生成モデルは、読み手が熟達することで使いやすくなることが期待されている。日本語では「文章が上手い」という表現があるが、この熟達性とは一定のレベルをクリアしなければならないというもので、また一定レベルを満たしていればその文章を読んだだけで「うまい文章」だと思えるというものでもない。文章を書くこと自体はその場の思いつきや書き手と読み手の相性などによって変容する。 みなさんこんにちは。 Transformerの登場以降、目覚ましい発展を遂げている自然言語処理界隈ですが、最近ビッグニュースが飛び込んできましたね。rinna株式会社さんが、これまでよりも大規模なGPT-2の事前学習モデルを公開してくださいました。冒頭の引用文は私が実際にGPT-2で生成した文章です。これ、すごいですよね。 パラメータ数は約13億。とんでもない数ですね・・。 事前学習モデルのバイナリサイズは2.5GBほどになるようです。 「なんだかすごい気がするし、実際試してみたいけど・・・難しいんでしょう?」 という方も多くいらっしゃると思い、簡単に試せる方法をここに記しておこうと思います。 大きな流れ 今回は「試す」ことを目的に、環境構築には時間をかけずサクッといきたいと思います。 このため、Google Colaboratory での動作を前提とします。 やることはたったの5ステップです。 Google Colaboratryを開く Transformersをインストールする Sentence Pieceをインストールする GPT-2の事前学習モデルをロードする 文章を生成する ちょっとした説明とサンプルコード 1. Google Colaboratryを開く Google ColaboratryはGoogleが提供する、WEBブラウザ上で使用できるPython実行環境です。 GPUを使うと文章の生成が速く終わるので、「ランタイムのタイプを変更」からGPUを選択しておきます。 なお、これ以降の手順でコードをコピペするのが面倒という方は↓のボタンからどうぞ。 2. Transformersをインストールする Google Colaboratory の環境を立ち上げた最初の状態ではTransformersがインストールされていません。このため、pipコマンドを使ってライブラリのインストールを行います。 # Transformersのインストール !pip install transformers 3. Sentence Pieceをインストールする rinna株式会社さんが公開しているGPT-2の事前学習モデルは、トークナイズにSentencePieceを使っています。こちらも、pipコマンドを使ってライブラリのインストールを行います。 # SentencePieceのインストール !pip install sentencepiece 4. GPT-2の事前学習モデルをロードする さきほどインストールしたTransformersのモジュールを使って事前学習済みモデルをダウンロードし読み込みます。こちらは少し時間がかかります。最後の2行はGPUを使って文章生成をするための設定です。GPUを使うと文章の生成が速くなります。 # ライブラリのインポート import torch from transformers import T5Tokenizer, AutoModelForCausalLM # トークナイザの読み込み tokenizer = T5Tokenizer.from_pretrained("rinna/japanese-gpt-1b") # モデル本体の読み込み model = AutoModelForCausalLM.from_pretrained("rinna/japanese-gpt-1b") # GPU使用時の設定 if torch.cuda.is_available(): model = model.to("cuda") 5. 文章を生成する さて、いよいよ文章を生成してみます。 こちらはHuggingFaceにあるサンプルコードを使わさせて頂きました。 「GPT-2のような文章生成モデルは、」という文からスタートして文章を作成します。 # 入力文 text = "GPT-2のような文章生成モデルは、" # 入力文のトークナイズ token_ids = tokenizer.encode(text, add_special_tokens=False, return_tensors="pt") # 入力文とパラメータの設定にしたがって文章を生成 with torch.no_grad(): output_ids = model.generate( token_ids.to(model.device), max_length=100, min_length=100, do_sample=True, top_k=500, top_p=0.95, pad_token_id=tokenizer.pad_token_id, bos_token_id=tokenizer.bos_token_id, eos_token_id=tokenizer.eos_token_id, bad_word_ids=[[tokenizer.unk_token_id]] ) # 生成された文章を単語IDから実際の単語に変換 output = tokenizer.decode(output_ids.tolist()[0]) print(output) 生成された文章: GPT-2のような文章生成モデルは、読み手が熟達することで使いやすくなることが期待されている。日本語では「文章が上手い」という表現があるが、この熟達性とは一定のレベルをクリアしなければならないというもので、また一定レベルを満たしていればその文章を読んだだけで「うまい文章」だと思えるというものでもない。文章を書くこと自体はその場の思いつきや書き手と読み手の相性などによって変容する。しかし「うまい文章」は実際に精読することでしか評価できないのであり、この精読をすることによってようやく うーん、なるほど・・・ 文章生成モデルの使い勝手は読み手にかかっているんですね。なんだか、意思を持ってるんじゃないかってくらい自然な文章が出てきて怖いくらいですね・・・ おわりに 今回は事前学習済みのモデルをそのまま使用して文章を生成してみました。BERTやGPT-2のようなモデルは、さらに少量の訓練データを用意しファインチューニング(微調整)をすることで、より多様なタスクに適応させることができます。ファインチューニングにはそれほど計算リソースを必要としませんが、事前学習においては莫大なリソースを要します。これほど大規模な日本語版事前学習モデルを公開してくださったrinna株式会社さんには、感謝の念に堪えません。
- 投稿日:2022-01-31T16:40:37+09:00
Selenium経由でSlackのDMを取得・送信・スクショする
ブラウザの操作を自動化するライブラリ「Selenium」を使ってSlackへ自動でログインし、特定のメンバーのDM欄からテキストを自動で取得・送信したり、スクリーンショットを保存したりするプログラムの一例です。 先日の投稿の上位互換プログラムです。 auto-dm.py # (Selenium 4.1.0 で動作確認済み) # webdriver は使用しているChromeのバージョンに合わせて以下のページから取得する。 # https://chromedriver.chromium.org/downloads from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.chrome import service as fs import time import sys import os # パラメータ slackurl = 'sample.slack.com' # Slack URL mailaddress = 'sample@gmail.com' # 登録済みメールアドレス mypassword = 'abrakadabra' # ログインパスワード waittime = 10.0 # (sec.) 待機時間 (通信速度 & PCスペックに応じて調整) logintime = 10.0 # (sec.) ログインを維持する時間 (適宜調整) driverpath = 'C:/Users/[user名]/Downloads/chromedriver_win32/chromedriver.exe' # webdriver が置かれている path を指定 workpath = 'C:/Users/[user名]/Downloads/' # 取得した画像などの保存先を指定 url = 'https://app.slack.com/client/XXXYYYZZZ/AAABBBCCC' # 目的地となる Slack WorkSpace のURL recipient = '山田' # 相手(DMの受信者)のアカウント名 mymessage = 'Yes My Darkness!' # DMの内容 order = 1 # 何番目に最新のメッセージを取得するか dm_mode = 0 # 0 -> 何もしない // 1 -> メッセージを送信する ss_mode = 1 # 0 -> 何もしない // 1 -> 最新のメッセージのスクリーンショットを保存 // 2 -> DM欄全体のスクリーンショットを保存 # フォルダの存在確認 if os.path.exists(workpath) == False: print('% Error: The "workpath" is invalid...') sys.exit() # webdriverの存在確認 if os.path.exists(driverpath) == False: print('% Error: The "driverpath" is invalid...') sys.exit() try: # 自動ログイン try: # Webページにアクセス chrome_service = fs.Service(executable_path=driverpath) driver = webdriver.Chrome(service=chrome_service) driver.maximize_window() #全画面表示にする (オプション) driver.get(url) driver.implicitly_wait(waittime) # 遷移待ち # Slack WorkSpaceの選択 ch_element = driver.find_element(By.ID, 'domain') ch_element.send_keys(slackurl) ch_element.send_keys(Keys.ENTER) driver.implicitly_wait(waittime) # 遷移待ち # email, password の自動入力 mail_element = driver.find_element(By.ID, 'email') mail_element.send_keys(mailaddress) driver.implicitly_wait(waittime) # 待機 (オプション) password_element = driver.find_element(By.ID, 'password') password_element.send_keys(mypassword) password_element.send_keys(Keys.ENTER) driver.implicitly_wait(waittime) # 遷移待ち (少し長め) # 自動ログインに成功 print('% Successfully logged in:') url = driver.current_url # URLを取得する (オプション) title = driver.title # タイトルを取得する (オプション) print("% URL :", url, ) print("% title :", title) except: # ログインに失敗した場合はセッションを終了する driver.quit() print("% Error: Your login has been failed...") sys.exit() # 送信先(recipient)のDMを開く recipient_list = driver.find_elements(By.CSS_SELECTOR, '[data-qa="channel_sidebar_name_' + recipient + '"]') if len(recipient_list) > 0: # DMリストに名前が表示されている場合 print("% DM is showing in the current list.") driver.find_element(By.CSS_SELECTOR, '[data-qa="channel_sidebar_name_' + recipient + '"]').click() driver.implicitly_wait(waittime) # 遷移待ち else: # DMリストに名前が表示されていない場合 print("% DM is NOT showing in the current list.") searchbox_element = driver.find_element(By.CSS_SELECTOR, '[class="c-button-unstyled p-top_nav__search p-top_nav__search--windows-linux"]') searchbox_element.send_keys("@" + recipient) driver.implicitly_wait(waittime) # 待機 (オプション) driver.find_element(By.CSS_SELECTOR, '[class="c-truncate c-truncate--break_words"]').click() driver.implicitly_wait(waittime) # 待機 (オプション) # DMの要素をリストとして取得 message_element_list = driver.find_elements(By.CSS_SELECTOR, '[class="c-message_kit__gutter"]') # メッセージの要素を特定 messagenum = -1*order target_massage = message_element_list[messagenum] # 最新のメッセージをテキストで取得 print("% sender :", target_massage.find_element(By.CSS_SELECTOR, '[class="c-message__sender c-message_kit__sender"]').text) # 最新のメッセージの送信者 print("% timestamp :", target_massage.find_element(By.CSS_SELECTOR, '[class="c-timestamp__label"]').text) # 最新のメッセージの送信時刻 if target_massage.find_elements(By.CSS_SELECTOR, '[class="p-rich_text_section"]'): print("% message :", target_massage.find_element(By.CSS_SELECTOR, '[class="p-rich_text_section"]').text, sep="\n") # 最新のメッセージの本文 else: print("% (There is no text in the message.)") # テキストを含まないメッセージの場合 ss_mode = 1 # スクリーンショットの保存を強制する (オプション) # メッセージを送る (オプション) if dm_mode == 1: post_element = driver.find_element(By.CSS_SELECTOR, '[class="ql-editor ql-blank"]') post_element.click() driver.implicitly_wait(waittime) # 待機 (オプション) post_element.send_keys(mymessage) # メッセージ入力 post_element.send_keys(Keys.ENTER) driver.implicitly_wait(waittime) # 待機 (オプション) driver.find_element(By.CSS_SELECTOR, '[class="c-icon c-icon--paperplane-filled"]').click() print("% Your message was successfully sent.") # DMの画像を取得する (オプション) if ss_mode == 1: time.sleep(waittime) # 強制的な待機 (オプション) png = message_element_list[messagenum].screenshot_as_png # 最新のメッセージのみをpngで取得 with open(workpath + 'image_from_slack.png', 'wb') as f: f.write(png) # 画像の書き出し print("% The image of the latest message has been saved successfully.") elif ss_mode == 2: time.sleep(waittime) # 強制的な待機 (オプション) dmfield_element = driver.find_elements(By.CSS_SELECTOR, '[class="p-workspace__primary_view"]') # DM欄の要素を特定 png = dmfield_element[messagenum].screenshot_as_png #DM欄をpngで取得 with open(workpath + 'image_from_slack.png', 'wb') as f: f.write(png) # 画像の書き出し print("% The image of the messages being displayed has been saved successfully.") time.sleep(logintime) # 任意時間ログイン状態を維持 finally: driver.quit() # セッションの終了 print("% Done.") sys.exit() セキュリティの問題が生じない範囲で適宜改変してご利用下さい。 Slack内のクラス名などは予告なく変更されることがありますので、継続的に使用する場合は適宜保守して下さい。
- 投稿日:2022-01-31T16:02:40+09:00
PythonでFXチャートを表示してみる
PythonでFXチャートを表示するのをGoogle Colabでやってみます。 https://colab.research.google.com Google ColabでTA-libを使う方法 https://laid-back-scientist.com/talib このページではyahoo_finance_api2で株価を取得していますが、アキシオリーのヒストリカルデータをダウンロードしてきます。 https://www.axiory.com/jp/how-to-install/historical-data import talib import pandas as pd import mplfinance as mpf from google.colab import drive # Googleの株価を取得 # my_share = share.Share('GOOG') # ohlcv = my_share.get_historical( # share.PERIOD_TYPE_YEAR, 1, # share.FREQUENCY_TYPE_DAY, 1) #df = pd.DataFrame(ohlcv) # unix時間をtimestampに変換 #df['timestamp'] = #pd.to_datetime(df['timestamp'].astype(int), unit='ms') drive.mount('/content/drive') df = pd.read_csv(filepath_or_buffer='drive/My Drive/USDJPY_2021_all.csv', names = ['date','time','open','high','low','close','volume']) df['timestamp'] = pd.to_datetime(df['date'] +" "+ df['time'], format='%Y.%m.%d %H:%M') df.drop({'date','time'},axis = 1,inplace = True) df.set_index("timestamp", inplace=True) df
- 投稿日:2022-01-31T15:56:45+09:00
第3回 株価予測_LSTM ver.2
初めに 全部で4本の記事に分けて書いています! 第1回 株価予測_可視化編 第2回 株価予測_LSTM ver.1 第3回 株価予測_LSTM ver.2 第4回 株価予測_アプリ化編 実装コード 前回の記事ではLSTMをkerasで実装して株価を予測しました!時系列データの予測にはRNNかLSTMが主流になってきていますからね。今回はPytorchを使って意思決定のためのデータ分析を前回と同様にLSTMで行いたいと思います。 このシリーズで行っている株価のチャート分析は意思決定のためのテクニカル指標に過ぎません。なので、予測に絶対は無いことと、株やFXで損する・得することの判断は自己責任でお願いします。ちなみに私は過去にFX(レバレッジ20倍)でテクニカル指標を過信し過ぎて。。。。なので皆さんも実際に取引する時はファンダメンタルズ分析もしっかり取り入れつつ、楽しく取引しましょう!! 今回の章立て 今回の趣旨 前処理 予測実装 結果 最後に 参考文献 今回の趣旨 以前私は、IWSMAI 2020で発表されたStock Market Prediction Using LSTM Recurrent Neural Networkを読みました。率直な感想としては、やはり現代でも株価予測に関するゴールデンスタンダードが無いから、株価を予測することは難しい。ということです。株価は不規則に動くのでシンプルなモデル(単回帰など)ではやはり予測が困難です。最近だとAIの株価予測などのサービスも出てきていますが、やはり確実に!とは言えないでしょう(そもそも論、予測に確実なんてない☜)。さて、私は前回(第2回 株価予測_LSTM ver.1)も使用していますが、今回もLSTMによる予測モデルを構築していきたいと思います。 何がしたいのか? 株価予測をするときにはさまざまなコンセプトがあると思います。以下では、なぜ今回「LSTM ver.2」を作成したのかについて 「第2回 株価予測_LSTM ver.1」 「論文:Stock Market Prediction Using LSTM Recurrent Neural Network」 と比較しながら示しています。 第2回 株価予測_LSTM ver.1 前回のモデルでは月曜日から木曜日までのデータを使用して金曜日の終値が上がるか下がるかの2クラス分類をLSTMで行いました。このとき、精度の評価指標としてはaccuracyを用いました。私みたいに短期的に利益を上げたい場合は明日の株価が気になるので2値分類は効果を発揮するでしょう。☜精度はいまいちでしたが。。。 論文:Stock Market Prediction Using LSTM Recurrent Neural Network この論文では、LSTMを用いて株価を正確に予測するためにはどのくらい学習をすれば良いのか?について調べています。 モデル構造はこんな感じで、LSTM4層で、途中にDropoutを挟んでいるという具合です。 Stock Market Prediction Using LSTM Recurrent Neural Network データはNew York Stock ExchangeからGoogle(2004/8/19~2019/12/19)とNKE(2010/1/4~2019/12/19)のデータを取得して、学習データ80%、テストデー20%に分割して予測しています。 NKEのデータに関しては1980年からあるのですが、今回は使用していません(下図参照)。なぜなら、図からもわかるように1980年から2010年頃まで株価の変動率が低いためです。30年分の学習にオーバーフィッティングしていることによって、その後の大幅な株価の変動に予測が追いつけなくなっているわけです。 Stock Market Prediction Using LSTM Recurrent Neural Network なので私も、どのようなデータを使用して株価予測をしようかと悩んだときに、過学習が起きないようなデータを収集することが重要だと考え、Zホールディングス(株)様からデータを取得しました。 前回の記事より図を参照 また、この論文ではEpochsが多くなるほどMSEが小さくなりました。という結論を出しています。 Stock Market Prediction Using LSTM Recurrent Neural Network しかし、私としてはMSEではなく株価の値段も知りたいので、今回は前述を踏まえてLSTMのモデルを構築していきました。 今回の構築したLSTMモデルの特徴 使用するのはCloseデータのみです。今回は「データの期間:2005/01/01~2022/01/01」の25MAを利用して、「データの期間:2021/01/01~2022/01/01」の株価の予測を行いたいと思います。評価はMAEにしたので何円の誤差になるのかわかると思います。 前処理 今回もGoogle Colabで実装しています。 初めにライブラリのインポートをします。 # モジュールのインポート import numpy as np import pandas as pd import pandas_datareader.data as data from matplotlib import pyplot as plt %matplotlib inline # 標準化関数(StandardScaler)をインポート from sklearn.preprocessing import StandardScaler from sklearn.metrics import mean_absolute_error import torch from torch.utils.data import TensorDataset, DataLoader from torch import nn import torch.nn.functional as F from torch import optim 前回と同様にデータを取得して、csvに保存していきます。 start = '2005-01-01' end = '2022-01-01' df = data.DataReader('4689.JP', 'stooq', start, end) df.to_csv('Z_Holdings2.csv') データの確認をします。 df = pd.read_csv('Z_Holdings2.csv') df カラムのデータ型の確認、欠損値の確認を行い、データを確認したら、Dateデータがobject型なので、datatime64型へ変更します。 df['Date'] = pd.to_datetime(df['Date']) df.info() 始値(open)、安値(low)、高値(high)、出来高(volume)を消して、終値(close)のみを残します。 df = df.drop(['Open', 'Low', 'High', 'Volume'], axis=1) 次にデータの並び替えを行います。 df.sort_values(by='Date', ascending=True, inplace=True) 終値の25日移動平均(25MA)を算出してみます。 df['25MA'] = df['Close'].rolling(window=25, min_periods=0).mean() print(df.head()) 終値と25日移動平均を見てみましょう。 plt.figure() plt.title('Z_Holdings') plt.xlabel('Date') plt.ylabel('Stock Price') plt.plot(df['Date'], df['Close'], color='black', linestyle='-', label='close') plt.plot(df['Date'], df['25MA'], color='red', linestyle='--', label='25MA') plt.legend() # 凡例 plt.savefig('Z_Holdings.png') # 図の保存 plt.show() 問題ないようなので、入力する25MAを平均値が0、標準偏差が1になるように標準化を行います。 ma = df['25MA'].values.reshape(-1, 1) scaler = StandardScaler() ma_std = scaler.fit_transform(ma) print("ma: {}".format(ma)) print("ma_std: {}".format(ma_std)) 予測実装 いよいよ予測を行います。まず、現在から過去25日分の株価の移動平均を入力値として、1日後の株価の移動平均を予測するためのデータセットを作成します。このとき、効率的な処理をするためにN-dimensional arrayに変換します。 data = [] # 入力データ(過去25日分の移動平均) label = [] # 出力データ(1日後の移動平均) for i in range(len(ma_std) - 25): data.append(ma_std[i:i + 25]) label.append(ma_std[i + 25]) # ndarrayに変換 data = np.array(data) label = np.array(label) print("data size: {}".format(data.shape)) print("label size: {}".format(label.shape)) 今回の株価データを訓練データとテストデータに分けます。 test_len = int(252) # 1年分(252日分) train_len = int(data.shape[0] - test_len) # 訓練データ train_data = data[:train_len] train_label = label[:train_len] # テストデータ test_data = data[train_len:] test_label = label[train_len:] # データの形状を確認 print("train_data size: {}".format(train_data.shape)) print("test_data size: {}".format(test_data.shape)) print("train_label size: {}".format(train_label.shape)) print("test_label size: {}".format(test_label.shape)) 次に、ndarrayをPyTorchのTensorに変換します。 train_x = torch.Tensor(train_data) test_x = torch.Tensor(test_data) train_y = torch.Tensor(train_label) test_y = torch.Tensor(test_label) 最後にTensorDatasetで特徴量とラベルを結合したデータセットを作成します。 train_dataset = TensorDataset(train_x, train_y) test_dataset = TensorDataset(test_x, test_y) DataLoaderを使用して、データセットを128個のミニバッチに分割します。 train_batch = DataLoader( dataset=train_dataset, # データセットの指定 batch_size=128, # バッチサイズの指定 shuffle=True, # シャッフルするかどうかの指定 num_workers=2) # コアの数 test_batch = DataLoader( dataset=test_dataset, batch_size=128, shuffle=False, num_workers=2) # ミニバッチデータセットの確認 for data, label in train_batch: print("batch data size: {}".format(data.size())) # バッチの入力データサイズ print("batch label size: {}".format(label.size())) # バッチのラベルサイズ break 次に、株価を予測するためのニューラルネットワークを定義していきます。 LSTMの実装に関して、一部はゼロから作るDeep Learning ❷ ―自然言語処理編を参照しています。 モデル構造 1層のLSTMと、1層の全結合層で構成しています。 class Net(nn.Module): def __init__(self, D_in, H, D_out): super(Net, self).__init__() self.lstm = nn.LSTM(D_in, H, batch_first=True, num_layers=1) self.linear = nn.Linear(H, D_out) def forward(self, x): output, (hidden, cell) = self.lstm(x) output = self.linear(output[:, -1, :]) return output 次にハイパーパラメータの定義をします。 D_in = 1 # 入力次元: 1 H = 200 # 隠れ層次元: 200 D_out = 1 # 出力次元: 1 epoch = 100 # 学習回数: 100 今回の実装でCPUとGPUどちらを使うかを指定します。 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') net = Net(D_in, H, D_out).to(device) print("Device: {}".format(device)) Device: cpu 損失関数(平均二乗誤差: MSE)の定義と最適化関数(Adam)の定義を行います。 criterion = nn.MSELoss() optimizer = optim.Adam(net.parameters()) 学習・評価損失を保存するリストを作成します。そして学習を実行します。 train_loss_list = [] # 学習損失 test_loss_list = [] # 評価損失 # 学習(エポック)の実行 for i in range(epoch): # エポックの進行状況を表示 print('---------------------------------------------') print("Epoch: {}/{}".format(i+1, epoch)) # 損失の初期化 train_loss = 0 # 学習損失 test_loss = 0 # 評価損失 # ---------学習パート--------- # # ニューラルネットワークを学習モードに設定 net.train() # ミニバッチごとにデータをロードし学習 for data, label in train_batch: # GPUにTensorを転送 data = data.to(device) label = label.to(device) # 勾配を初期化 optimizer.zero_grad() # データを入力して予測値を計算(順伝播) y_pred = net(data) # 損失(誤差)を計算 loss = criterion(y_pred, label) # 勾配の計算(逆伝搬) loss.backward() # パラメータ(重み)の更新 optimizer.step() # ミニバッチごとの損失を蓄積 train_loss += loss.item() # ミニバッチの平均の損失を計算 batch_train_loss = train_loss / len(train_batch) # ---------学習パートはここまで--------- # # ---------評価パート--------- # # ニューラルネットワークを評価モードに設定 net.eval() # 評価時の計算で自動微分機能をオフにする with torch.no_grad(): for data, label in test_batch: # GPUにTensorを転送 data = data.to(device) label = label.to(device) # データを入力して予測値を計算(順伝播) y_pred = net(data) # 損失(誤差)を計算 loss = criterion(y_pred, label) # ミニバッチごとの損失を蓄積 test_loss += loss.item() # ミニバッチの平均の損失を計算 batch_test_loss = test_loss / len(test_batch) # ---------評価パートはここまで--------- # # エポックごとに損失を表示 print("Train_Loss: {:.2E} Test_Loss: {:.2E}".format( batch_train_loss, batch_test_loss)) # 損失をリスト化して保存 train_loss_list.append(batch_train_loss) test_loss_list.append(batch_test_loss) このとき、出力された値は標準化された株価の平均二乗誤差ですが、epochsを重ねるごとに損失は減少しています。学習データとテストデータに対するepochsごとの損失をプロットします。 # 損失 plt.figure() plt.title('Train and Test Loss') plt.xlabel('Epoch') plt.ylabel('Loss') plt.plot(range(1, epoch+1), train_loss_list, color='blue', linestyle='-', label='Train_Loss') plt.plot(range(1, epoch+1), test_loss_list, color='red', linestyle='--', label='Test_Loss') plt.legend() # 凡例 plt.show() # 表示 学習開始後すぐに損失の値が大幅に下がっていることがわかります。 学習していない最新の株価データ一年分を予測していきます。まず、ニューラルネットワークを評価モードに設定して株価の予測値と正解値を取得します。view(-1).tolist()で取得した予測値と正解値をTensorを1次元listに変換します。 net.eval() # 推定時の計算で自動微分機能をオフにする with torch.no_grad(): # 初期化 pred_ma = [] true_ma = [] for data, label in test_batch: # GPUにTensorを転送 data = data.to(device) label = label.to(device) # 予測値を計算:順伝播 y_pred = net(data) pred_ma.append(y_pred.view(-1).tolist()) true_ma.append(label.view(-1).tolist()) 次に取得したlistが入れ子構造なので、Tensor数値データを取り出してlistを1次元配列にします。さらに今回は実際の株価の価格を知りたいので、標準化を解除して元の株価に変換します。 pred_ma = [elem for lst in pred_ma for elem in lst] true_ma = [elem for lst in true_ma for elem in lst] pred_ma = scaler.inverse_transform(pred_ma) true_ma = scaler.inverse_transform(true_ma) 結果 平均絶対誤差を計算します。 mae = mean_absolute_error(true_ma, pred_ma) print("MAE: {:.3f}".format(mae)) MAE: 2.043 平均絶対誤差は約2ドルでした!これは、一年間の株価予測の予測値と正解値の結果の差が約2ドルということになります。最後に終値と25日移動平均を図示して結果を見てみましょう! date = df['Date'][-1*test_len:] # テストデータの日付 test_close = df['Close'][-1*test_len:].values.reshape(-1) # テストデータの終値 plt.figure() plt.title('YHOO Stock Price Prediction') plt.xlabel('Date') plt.ylabel('Stock Price') plt.plot(date, test_close, color='black', linestyle='-', label='close') plt.plot(date, true_ma, color='dodgerblue', linestyle='--', label='true_25MA') plt.plot(date, pred_ma, color='red', linestyle=':', label='predicted_25MA') plt.legend() # 凡例 plt.xticks(rotation=30) plt.show() 最後に 今回は過去数年分の株価データを用いることで、2021/1/1~2022/1/1までの株価を予測しました!結果としては一年間の株価予測の予測値と正解値の結果の差が約2ドルなので悪くはないと。。。個人的には思いますがどうでしょうか?今回作成したモデルのアーキテクチャや、前処理、データ取得時のデータの剪定で精度をより高めることができると思います。なのでこれからも時系列データ分析を勉強してもっと精度を高められるように頑張ります!(^○^) 参考文献 以下の書籍には大変お世話になりました、著者の皆様大変ありがとうございました! 最短コースでわかる ディープラーニングの数学 最短コースでわかる PyTorch &深層学習プログラミング 動かしながら学ぶ PyTorchプログラミング入門 FXデイトレード・スイングトレード 東大院生が考えたスマートフォンFX ザ・トレーディング──心理分析・トレード戦略・リスク管理・記録管理 ガチ速FX 27分で256万を稼いだ“鬼デイトレ" デイトレード 株とPython─自作プログラムでお金儲けを目指す本 Pythonで将来予測
- 投稿日:2022-01-31T15:46:23+09:00
Pythonで収集したデータをInfluxDBに格納してGrafanaで可視化する
はじめに 前回の記事ではPythonを使ってModbus TCP通信でデータを収集する方法を紹介しました。 今回は収集したデータをエッジデバイスにインストールしたInfluxDBに格納し、Grafanaで可視化する方法を紹介します。 エッジデバイスのみで手軽に装置のデータを可視化することができます。 動作確認済みデバイス 動作確認済デバイス(OS) e-RT3 Plus F3RP70-2L1(Ubuntu 18.04 32bit) Raspberry Pi 4 Model B (Ubuntu Server 20.04 32bit) これらのデバイスでは armhf アーキテクチャのパッケージが動作します。 構成 以下の構成でデータの収集~モニタリングを行います。 データ収集 Modbus TCP通信を行ってエッジデバイスが装置からデータを収集します。 データ蓄積 収集したデータをInfluxDBに蓄積します。 可視化 InfluxDBに蓄積したデータをGrafanaを使って可視化します。 モニタリング PCでエッジデバイスのGrafanaにアクセスして結果をモニタリングします。 準備 Grafanaのインストール エッジデバイスにGrafanaをインストールします2。 必要なパッケージをインストールします。 sudo apt install -y apt-transport-https sudo apt install -y software-properties-common wget wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add - リポジトリを追加してGrafanaをインストールします。 echo "deb https://packages.grafana.com/oss/deb stable main" | sudo tee -a /etc/apt/sources.list.d/grafana.list sudo apt update sudo apt install grafana サービスを起動して状態を確認します。 sudo systemctl daemon-reload sudo systemctl start grafana-server sudo systemctl status grafana-server 以下のような表示があれば成功です。 username@ubuntu:~$ sudo systemctl status grafana-server * grafana-server.service - Grafana instance Loaded: loaded (/usr/lib/systemd/system/grafana-server.service; disabled; ven Active: active (running) since Fri 2021-12-24 07:38:20 UTC; 14s ago Docs: http://docs.grafana.org Main PID: 2756 (grafana-server) Tasks: 12 (limit: 2366) CGroup: /system.slice/grafana-server.service `-2756 /usr/sbin/grafana-server --config=/etc/grafana/grafana.ini --p Dec 24 07:38:32 ubuntu grafana-server[2756]: t=2021-12-24T07:38:32+0000 lvl=info Dec 24 07:38:32 ubuntu grafana-server[2756]: t=2021-12-24T07:38:32+0000 lvl=info ... Note エッジデバイスがproxy環境下にある場合はproxy設定が必要です。 InfluxDBのインストール エッジデバイスにInfluxDBをインストールします。 PythonからInfluxDBを操作するためのパッケージをインストールします。 python3 -m pip install influxdb InfluxDBをインストールします3。 リポジトリを追加して、インストールを行います。 curl -s https://repos.influxdata.com/influxdb.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdb.gpg export DISTRIB_ID=$(lsb_release -si); export DISTRIB_CODENAME=$(lsb_release -sc) echo "deb [signed-by=/etc/apt/trusted.gpg.d/influxdb.gpg] https://repos.influxdata.com/${DISTRIB_ID,,} ${DISTRIB_CODENAME} stable" | sudo tee /etc/apt/sources.list.d/influxdb.list sudo apt update sudo apt install influxdb サービスを起動して状態を確認します。 sudo systemctl unmask influxdb.service sudo systemctl start influxdb sudo systemctl status influxdb 以下のような表示があれば成功です。 username@ubuntu:~$ sudo systemctl status influxdb * influxdb.service - InfluxDB is an open-source, distributed, time series databa Loaded: loaded (/lib/systemd/system/influxdb.service; enabled; vendor preset: Active: active (running) since Fri 2021-12-24 07:51:02 UTC; 6s ago Docs: https://docs.influxdata.com/influxdb/ Process: 4191 ExecStart=/usr/lib/influxdb/scripts/influxd-systemd-start.sh (co Main PID: 4192 (influxd) Tasks: 9 (limit: 2366) CGroup: /system.slice/influxdb.service `-4192 /usr/bin/influxd -config /etc/influxdb/influxdb.conf Dec 24 07:51:01 ubuntu influxd-systemd-start.sh[4191]: ts=2021-12-24T07:51:01.75 Dec 24 07:51:01 ubuntu influxd-systemd-start.sh[4191]: ts=2021-12-24T07:51:01.75 ... データ収集 PyModbus4を使用したPythonプログラムで装置からデータを取得します。 今回は装置の代わりにPC上でModbusサーバーを起動します。 Note PyModubsを利用したデータ収集の詳細については以前の記事をご覧ください。 Modbusサーバーの準備 PC上でModbusサーバーを起動します。 定期的に値を書き換えるために、updating_server.py(ライセンスはこちら)を一部変更したプログラムを使用します。 今回は保持レジスタに2つの時系列データを、コイルに1つの2値データを格納します。 関数updating_writerを以下のコードに変更します。 import random def updating_writer(a): context = a[0] slave_id = 0x00 # コイル1への書き込み fx = 1 address = 0x01 values = [bool(random.getrandbits(1))] log.debug("coil1: " + str(bool(random.getrandbits(1)))) context[slave_id].setValues(fx, address, values) # 保持レジスタ2への書き込み fx = 3 address = 0x02 values = context[slave_id].getValues(fx, address, count=1) values[0] += random.randint(-10, 10) if values[0] > 100: values[0] = 100 elif values[0] < 0: values[0] = 0 log.debug("holding2: " + str(values[0])) context[slave_id].setValues(fx, address, values) # 保持レジスタ3への書き込み fx = 3 address = 0x03 values = context[slave_id].getValues(fx, address, count=1) values[0] += random.randint(-5, 5) if values[0] > 50: values[0] = 50 elif values[0] < 0: values[0] = 0 log.debug("holding3: " + str(values[0])) context[slave_id].setValues(fx, address, values) プログラム下部の以下のコードを書き換えます。 # 変更前 StartTcpServer(context, identity=identity, address=("localhost", 5020)) # 変更後 StartTcpServer(context, identity=identity, address=("", 5020)) 必要なパッケージをインストールしてプログラムを起動します。 python -m pip install twisted pymodbus python updating_server.py エッジデバイスでのデータ収集 エッジデバイスで以下のプログラムclient.pyを実行してサーバーのデータを5秒周期で収集します。 <IP_ADDRESS_OF_SERVER_PC>はModbusサーバーを起動しているPCのIPアドレスで置き換えてください。 client.py #!/usr/bin/env python from pymodbus.client.sync import ModbusTcpClient as ModbusClient import time # Modbusサーバーに接続 mbclient = ModbusClient('<IP_ADDRESS_OF_SERVER_PC>', port=5020) mbclient.connect() while True: try: # コイル1の値読み取り rr = mbclient.read_coils(1, 1) co1 = rr.bits[0] # 保持レジスタ2,3の値読み取り rr = mbclient.read_holding_registers(2, 2) hr2 = rr.registers[0] hr3 = rr.registers[1] print("coil1: {0}, holding2: {1}, holding3: {2}".format(co1, hr2, hr3)) # 5秒スリープ time.sleep(5) except KeyboardInterrupt: # 切断 mbclient.close() break except Exception as e: print(e) プログラムを起動して、以下のような出力があれば成功です。 username@ubuntu:~$ python3 client.py coil1: False, holding2: 89, holding3: 12 coil1: True, holding2: 85, holding3: 13 coil1: True, holding2: 78, holding3: 14 ... # Ctrl+Cで終了 Note このプログラムによる周期的なデータ収集は簡易的なものであり、周期は正確ではありません。 より正確な周期でデータ収集を行いたい方は他の方法をご検討ください。 データ蓄積 エッジデバイスで収集したデータをInfluxDBに蓄積します。 データベースの準備 InfluxDBを起動してデータベースを作成します。 今回はsampleDBという名前でデータベースを作成します。 sudo systemctl start influxdb username@ubuntu:~$ influx Connected to http://localhost:8086 version 1.8.10 InfluxDB shell version: 1.8.10 > CREATE DATABASE sampleDB > exit データベースへの格納 収集したデータを作成したデータベースに格納します。 InfluxDBのサンプルプログラムを参考にしてエッジデバイスでのデータ収集で作成したclient.pyに、データを格納するコードを追加します。 追加後のコードは以下のようになります。 client.py #!/usr/bin/env python from pymodbus.client.sync import ModbusTcpClient as ModbusClient from influxdb import InfluxDBClient from datetime import datetime import time # Modbusサーバーに接続 mbclient = ModbusClient('<IP_ADDRESS_OF_SERVER_PC>', port=5020) mbclient.connect() # データベースに接続 dbclient = InfluxDBClient(host='localhost', port=8086, database='sampleDB') while True: try: # コイル1の値読み取り rr = mbclient.read_coils(1, 1) co1 = rr.bits[0] # 保持レジスタ2,3の値読み取り rr = mbclient.read_holding_registers(2, 2) hr2 = rr.registers[0] hr3 = rr.registers[1] print("coil1: {0}, holding2: {1}, holding3: {2}".format(co1, hr2, hr3)) # データベースへの書き込み json_body = [ { "measurement": "sample_measurement", "time": datetime.utcnow(), "fields": { "coil_1": co1, "holding_register_2": hr2, "holding_register_3": hr3 } } ] print("Write points: {0}".format(json_body)) dbclient.write_points(json_body) # 5秒スリープ time.sleep(5) except KeyboardInterrupt: # 切断 mbclient.close() dbclient.close() break except Exception as e: print(e) Note データベースへデータを格納する度にSDカードへの書き込みが発生します。 SDカードへの高頻度な書き込みを避けたい場合は、RAMディスクやネットワークディスクなど適当なストレージを用意し、InfluxDBの設定で格納先として指定してください。 プログラムを起動して数十秒待ちます。 python3 client.py # Ctrl+Cで終了 エッジデバイスで以下のコマンドを実行してテーブルの中身を表示します。以下のようにデータが格納されていたら成功です。 username@ubuntu:~/modbus$ influx Connected to http://localhost:8086 version 1.8.10 InfluxDB shell version: 1.8.10 > USE sampleDB Using database sampleDB > SELECT * FROM "sample_measurement" LIMIT 3 name: sample_measurement time coil_1 holding_register_2 holding_register_3 ---- ------ ------------------ ------------------ 1641876882000000000 false 23 13 1641876887000000000 false 16 14 1641876892000000000 true 15 13 可視化とモニタリング InfluxDBに蓄積したデータをGrafanaで可視化します。 エッジデバイスで以下のコマンドを実行してGrafanaを起動します。 sudo systemctl start grafana-server PCでhttp://<IP_ADDRESS_OF_EDGE_DEVICE>:3000にアクセスしてGrafanaを開きます。 ログイン画面が開きます。ユーザー名とパスワードの初期設定値はどちらもadminです。必要に応じて変更してください。 データベースの登録 Grafanaにデータベースの準備で作成したsampleDBを登録します。 左端の歯車のアイコンをクリックしてConfiguration > Data Sourcesと進み、「Add data source」をクリックします。 InfluxDBをクリックします。 設定画面が表示されます。以下の項目を入力し、「Save & test」をクリックします。 項目 設定値 URL http://localhost:8086 Database sampleDB 時系列データの可視化 ダッシュボードを作成し、収集した時系列データをグラフ化します。 今回は保持レジスタ2の値を生データの折れ線グラフ、保持レジスタ3の値を1分毎の平均値の棒グラフで可視化します。 左端の+アイコンをクリックしてCreate > Dashboardと進み、「Add a new panel」をクリックします。 保持レジスタ2(holding_register_2)のグラフの設定を以下の画面のように設定を行い、「Apply」をクリックします。 Note 鉛筆のマークをクリックし以下のクエリ文を入力することでクエリの設定を行うこともできます。 SELECT "holding_register_2" FROM "sample_measurement" WHERE $timeFilter 「Add panel」のアイコンをクリックして同様に保持レジスタ3(holding_register_3)の棒グラフを作成します。以下の画面のように設定を行います。 Note 以下のクエリ文でクエリの設定を行うこともできます。 SELECT mean("holding_register_3") FROM "sample_measurement" WHERE $timeFilter GROUP BY time(1m) 2値データの可視化 コイル1の最新の状態をtrue/falseで表示する可視化を行います。 「Add panel」のアイコンをクリックしてパネルを追加し、右側のメニューから「Stat」を選択して以下のように設定を行います。 Note 以下のクエリ文ででクエリの設定を行うこともできます。 SELECT "coil_1" FROM "sample_measurement" WHERE $timeFilter すべての作業が完了すると以下のような画面になります。「Save dashboard」をクリックしてダッシュボードを保存します。 まとめ 今回はPythonを使ったModbus通信によるデータ収集からInfluxDBでのデータ蓄積、Grafanaによる可視化までをエッジデバイス上で行いました。 クラウド等の外部のシステムに接続せずにローカルで可視化を行うことができるので、手軽に可視化を行いたいときに便利です。是非ご活用ください。 産業用AIプラットフォーム | 横河電機株式会社 ↩ Install on Debian/Ubuntu | Grafana Labs ↩ Install InfluxDB OSS | InfluxDB OSS 1.8 Documentation ↩ GitHub - riptideio/pymodbus: A full modbus protocol written in python ↩
- 投稿日:2022-01-31T15:29:31+09:00
Python のログを JSON で出力する
CloudWatch Logs にログを吐き出していたところ、Traceback の改行文字で分割されていてエラー内容が非常に追いづらいという体験をした。 どうやら CloudWatch Logs は JSON でログを吐けばよしなにパースしてくれるらしく、本番環境のログは JSON で吐き出したほうが良さそうということで、Python のログを Traceback つきで JSON 出力するようにしてみた。 初期設定 from os import getenv import logging import json class CustomLogFormatter(logging.Formatter): """ログを JSON で出力するフォーマッタ""" def format(self, record: logging.LogRecord) -> str: try: data = vars(record) exc_info = data.pop("exc_info") if exc_info: data["traceback"] = self.formatException(exc_info).splitlines() return json.dumps(data) except: return super().format(record) log_handler = logging.StreamHandler() if getenv("LOG_FORMAT", "default").lower() == "json": log_handler.setFormatter(CustomLogFormatter()) logging.basicConfig(handlers=[log_handler], level=getenv("LOG_LEVEL", logging.WARNING)) 環境変数で設定を切り替えれるようにしているが、そのへんは好みで Traceback を行ごとに区切っている (splitlines) が、これは CloudWatch Logs で見やすくするためで、区切らなくてもいいとは思う json.dumps が何らかの理由で失敗したときなどにログをロストしないように super().format(record) している 使い方 from logging import getLogger logger = getLogger(__name__) try: 1/0 # 例外を発生させる except: logger.exception("Unexpected error occurred") ルートロガーの設定を変更しているので、 getLogger で新しくロガーを作成した場合でも特に何も設定する必要はない 例外をキャッチした際に logger.exception を使っておけば LogRecord に exc_info が含まれるので、特に何もしなくても Traceback がログ出力されるので便利 logger.exception("...") は logger.error("...", exc_info=True) と同じ (たぶん) なので、ログレベルは ERROR になる extra に渡した追加データもそのままログに出力される 例: logger.exception("...", extra={"user_id": "..."}) 出力されるエラー ※ 本来は改行されていない1行の JSON 文字列が出力されるが、ここでは見やすくするために整形している { "name": "__main__", "msg": "Unexpected error occurred", "args": [], "levelname": "ERROR", "levelno": 40, "pathname": "main.py", "filename": "main.py", "module": "main", "exc_text": null, "stack_info": null, "lineno": 8, "funcName": "<module>", "created": 1643609361.1079879, "msecs": 107.98788070678711, "relativeCreated": 3.8950443267822266, "thread": 4410674688, "threadName": "MainThread", "processName": "MainProcess", "process": 17896, "traceback": [ "Traceback (most recent call last):", " File \"main.py\", line 6, in <module>", " 1/0", "ZeroDivisionError: division by zero" ] } 参考 https://docs.python.org/ja/3/library/logging.html https://docs.python.org/ja/3/library/traceback.html https://github.com/python/cpython/blob/main/Lib/logging/__init__.py https://dev.classmethod.jp/articles/lambda-python-log-output-json/
- 投稿日:2022-01-31T14:40:11+09:00
Pythonで特定ディレクトリ内の特定文字列を含むファイルを列挙する
表題の通り,pythonで特定ディレクトリ内の任意の深さにあるファイルを列挙するプログラムを書きました. 実行例は以下の通りです. # pathはホームディレクトリを起点として考えています. $ python --word "word to search!!!" --path this_directory コードは以下の通りです. import os from argparse import ArgumentParser from typing import Tuple def _is_word_in_file(word: str, file: str) -> bool: with open(file, 'r') as f: for line in f: if word in line: return True else: return False def _get_args() -> Tuple[str, str, str]: parser = ArgumentParser() parser.add_argument( '--path', required=True, type=str, help='The path of interest' ) parser.add_argument( '--word', required=True, type=str, help='The word to check whether each file has' ) parser.add_argument( '--ext', default='.py', type=str, help='The extension of files to check such as `.tex`, `.py`, `.csv`.\n' 'You can omit the period and just parse such as `tex`, `py`, `csv` as well' ) try: args = parser.parse_args() except: import sys parser.print_help() sys.exit() path, word, ext = args.path, args.word, args.ext if not ext.startswith('.'): ext = '.' + ext return path, word, ext if __name__ == '__main__': path, word, ext = _get_args() path = path if 'home' in path else os.path.join(os.environ['HOME'], path) for root, dirs, files in os.walk(path): if len(files) == 0: continue target_files = [os.path.join(root, fn) for fn in files if fn.endswith(ext)] passed_files = [fn for fn in target_files if _is_word_in_file(word, fn)] if len(passed_files) > 0: print('\n'.join(passed_files))
- 投稿日:2022-01-31T14:13:41+09:00
[py2rb] 抽象基底クラス
はじめに 移植やってます。 ( from python 3.7 to ruby 2.7 ) 抽象基底クラス (Python) from collections.abc import Sized print(isinstance([1, 2], Sized)) print(issubclass(list, Sized)) print(isinstance([], Sized)) print(isinstance(set({1, 2}), Sized)) print(isinstance(3, Sized)) print(isinstance('Hello, world!', Sized)) print(isinstance({'id': 1, 'name': 'John'}, Sized)) # output True True True True False True True こちらの記事が分かりやすい。 defined? (Ruby) require 'set' def sizeable(obj) if obj.is_a?(Integer) false else begin obj.size true rescue => exception obj.method_defined? :size, true end end end puts sizeable(3) puts sizeable([]) puts sizeable([1, 2]) puts sizeable(Set.new([1, 2])) puts sizeable({'id': 1, 'name': 'John'}) puts sizeable('Hello, world!') puts sizeable(Array) puts sizeable(Class) # output false true true true true true true false Integer.sizeってバイト数を返すんですね。 花より団子で。 メモ Python の 抽象基底クラス を学習した 百里を行く者は九十里を半ばとす
- 投稿日:2022-01-31T13:52:38+09:00
suumo賃貸物件検索サイトのスクレイピング
はじめに pythonのBeautifulSoupを使用して、押上駅周辺のsuumoの賃貸物件情報をスクレイピングしました。以下の内容はコードとその解説となります。 また、スクレイピングする際はそのサイトの利用規約に従う必要があります。 suumoについては個人利用であれば問題はないとのことでした。 (suumoの利用規約はこちらsuumo利用規約) その他、スクレイピングを行う際はいろいろと気をつけなければならないことがあるので注意してください。 目次 ・コード ・解説 1. 必要なモジュールのインポート 2. リストの作成 3. ユーザーエージェントの設定 4. スクレイピング 5. csvファイルで出力 ・まとめ コード 今回、私が作成したコードは以下になります。(jupyternotebookを使用しています) import requests from bs4 import BeautifulSoup from tqdm import tqdm as tqdm import pandas as pd import time # スクレイピングしたデータを一時的に格納するリスト name = [] station = [] price = [] sikikinreikin = [] room = [] age = [] address = [] # ユーザーエージェント名を設定 user_agent = "(ユーザーエージェント名)" header = {'user-agent':user_agent} # スクレイピング開始 for p in tqdm(range(1,539)): p=str(p) url="https://suumo.jp/jj/chintai/ichiran/FR301FC005/?shkr1=03&cb=0.0&shkr3=03&rn=0440&shkr2=03&mt=9999999&ar=030&bs=040&shkr4=03&ct=9999999&ra=013&ek=044006820&cn=9999999&mb=0&fw2=&et=9999999" + "&page=" + p urls=requests.get(url,headers=header) time.sleep(2) urls.encoding = urls.apparent_encoding soup=BeautifulSoup() soup=BeautifulSoup(urls.content,"html.parser") house_name = soup.find_all("a",class_="js-cassetLinkHref") station_name = soup.find_all("div",style="font-weight:bold") table_data = pd.read_html(urls.content) for h in house_name: name.append(h.text) for s in station_name: station.append(s.text) for i in range(0,30): table = table_data[i] price.append(table[0]) sikikinreikin.append(table[1]) room.append(table[2]) age.append(table[3]) address.append(table[4]) # リストをシリーズに変換 name_s=pd.Series(name) station_s=pd.Series(station) price_s=pd.Series(price) sikikinreikin_s=pd.Series(sikikinreikin) room_s=pd.Series(room) age_s = pd.Series(age) address_s=pd.Series(address) # シリーズを結合 df=pd.concat([name_s,station_s,price_s,sikikinreikin_s,room_s,age_s,address_s],axis=1) # カラム名を付け直す df.columns=["name","station","price","sikikinreikin","room","age","address"] # csvで保存 df.to_csv('data/suumo.csv',header=True, index=False) 解説 上から解説していきます。 1. 必要なモジュールのインポート import requests from bs4 import BeautifulSoup from tqdm import tqdm as tqdm import pandas as pd import time requestsはwebページを取得際に必要となります。 BeautifulSoupはHTMLを解析する際に必要となります。 tqdmはスクレイピングの進行状況を確認する際に使用します。 pandasは取得したデータをデータフレームにする際に使用します。 timeは連続してwebページにアクセスしないようにするため必要です。 2. リストの作成 name = [] station = [] price = [] sikikinreikin = [] room = [] age = [] address = [] スクレイピングして取得したデータを一時的に保存しておくリストです。 3. ユーザーエージェントの設定 user_agent = "(ユーザーエージェント名)" header = {'user-agent':user_agent} スクレイピングするにあたってユーザーエージェントを設定する必要があります。 ユーザーエージェントを設定しないと不自然なアクセスとみなされ、通信が切断される可能性があります。 使用しているブラウザでこのサイトにアクセスするとユーザーエージェントを確認できます。ページの中にブラウザのユーザーエージェントという項目があるので、その値をuser_agentに代入してください。 (この辺りはもう少し調べて学ぶ必要があると感じました。) 4. スクレイピング まずはスクレイピングしたいサイトについて確認します。 押上駅周辺で検索したところ物件数は16136件でした。(2022年1月現在) また、物件を部屋ごとに表示させたところ、538ページあることがわかりました。 つまり、538ページ分のwebページにアクセスする必要があることになります。 そこで、下記のようなコードで538ページ分のページにアクセスしました。 for p in tqdm(range(1,539)): p=str(p) url="https://suumo.jp/jj/chintai/ichiran/FR301FC005/?shkr1=03&cb=0.0&shkr3=03&rn=0440&shkr2=03&mt=9999999&ar=030&bs=040&shkr4=03&ct=9999999&ra=013&ek=044006820&cn=9999999&mb=0&fw2=&et=9999999" + "&page=" + p urls=requests.get(url,headers=header) time.sleep(2) urlの後ろに&page=pと指定するとそのページのpページ目にアクセスすることができます。 url + &page= + p でrangeを指定しfor文を回すことで範囲内のページ全てにアクセスすることができます。 requests.get(url,headers=header)で指定のurlのwebページを取得します。この時、引数にheaders=header (headerは先ほど設定したユーザーエージェント名)とすることでヘッダーを設定できます。 ここで一度webサイトにアクセスしたので、連続してwebサイトにアクセスするのを防ぐためにtime.sleep(2)で2秒待ちます。 urls.encoding = urls.apparent_encoding soup=BeautifulSoup() soup=BeautifulSoup(urls.content,"html.parser") house_name = soup.find_all("a",class_="js-cassetLinkHref") station_name = soup.find_all("div",style="font-weight:bold") table_data = pd.read_html(urls.content) urls.apparent_encodingは文字化け対策で必要です。 BeautifulSoup(urls.content,"html.parser")で取得したwebページのhtmlを解析します。 まずは家の名前を取得するのですが、その前にhtmlのどのタグに家の名前の要素があるかどうかを確認する必要があります。 要素の確認は、スクレイピングしたいwebページにアクセスし、取得したい要素の上で右クリック→検証で行うことができます。 確認したところ、物件の名前は"a"タグのclass="js-cassetLinkHref"にあることがわかりました。 soup.find_all("a",class_="js-cassetLinkHref")と書くことで、そのページ内の全ての"js-cassetLinkHref"クラスの要素を取得します。 classはpythonの予約語として設定されているので、コードを書くときはclass_のようにします。 最寄駅については、今回は太字になっているところのみを取得しました。 また、pd.read_html(urls.content)とすることで、そのwebページの中のtable属性のデータをリスト型で取得できます。今回はこれを利用し、敷金礼金や部屋の広さといったデータを取得しました。 今回のwebページでは0~29番目までのテーブルが物件のデータになっているのでfor文でリストに格納していきます。 for h in house_name: name.append(h.text) for s in station_name: station.append(s.text) for i in range(0,30): table = table_data[i] price.append(table[0]) sikikinreikin.append(table[1]) room.append(table[2]) age.append(table[3]) address.append(table[4]) 5. csvファイルで出力 # リストをシリーズに変換 name_s=pd.Series(name) station_s=pd.Series(station) price_s=pd.Series(price) sikikinreikin_s=pd.Series(sikikinreikin) room_s=pd.Series(room) age_s = pd.Series(age) address_s=pd.Series(address) # シリーズを結合 df=pd.concat([name_s,station_s,price_s,sikikinreikin_s,room_s,age_s,address_s],axis=1) # カラム名を付け直す df.columns=["name","station","price","sikikinreikin","room","age","address"] # csvで保存 df.to_csv('data/suumo.csv',header=True, index=False) 最後に、作成したリストをシリーズに変換&結合し、カラム名を付け直したらcsvファイルで保存します。 まとめ 上記のコードで、スクレイピングにかかった時間は約30分でした。 また、今回取得したデータはそのままでは汚すぎて使うことができません。そのため、前処理をして必要なデータを使える形にする必要があります。 リストに格納する際のコードを工夫すれば前処理にかかる手間が少し軽くなるかもしれません。
- 投稿日:2022-01-31T12:43:58+09:00
【Project Euler】Problem 63: べき乗の桁数
本記事はProjectEulerの「100番以下の問題の説明は記載可能」という規定に基づいて回答のヒントが書かれていますので、自分である程度考えてみてから読まれることをお勧めします。 問題 63. べき乗の桁数 原文 Problem 63: Powerful digit sum 問題の要約:$a^n$の桁数が$n$になるような$a^n$の個数を求めよ 例: * $16807=7^5, 134217728=8^9$ $a$の上限が9, $n$の上限が21であることが分かれば後は簡単です。 print(f"Answer: {sum([len(str(a**n)) == n for a in range(1,10) for n in range(1,22)])}") (開発環境:Google Colab)
- 投稿日:2022-01-31T12:22:17+09:00
QISkit ショアのアルゴリズムで素因数分解
1.はじめに 量子因数分解の方法としてショアのアルゴリズムが提案されている.参考文献1)ではN=21,m=5の場合の解法の手順が紹介されているが,QISkitによる実装は紹介されていない.本稿では,参考文献2)の手法を用いて,N=21,m=2などの場合についてQISkitで因数分解を行なえるプログラムを作成したので報告する. 2.問題設定 1)因数分解対象 N=21,m=2などを入力.現在,下記は確認している. 表1 確認ずみ因数分解組合わせ 2)解析システム RaspberryPi4+Python+QISkit 3.真理値表の作成 表2にN=21,m=2の場合の真理値表を示す.真理値表はExcelを用いて作成した.変数xの範囲は,y(1,2,4,8,16,11)が2周期以上現れるように,4ビットとした.実装したプログラムでは,自動的に真理値表を作成しているが,Nとmの組み合わせによっては位数rが大きくなりすぎて処理できない場合もあるので,事前に確認しておいた方が安全である. 表2 真理値表 N=21,m=2 4.QISkitプログラム実装 QISkitによる実装プログラムは下記の通り.オラクルの記述は,参考文献2)では試行錯誤により論理回路の簡略化を行っているが,一行ずつ自動的に変換する方法とした. 以下プログラムに従って処理内容を説明する. (1)ライブラリのインポート from qiskit import * from qiskit.circuit.library.standard_gates import C3XGate, C4XGate,MCXGate from qiskit.tools.visualization import * from qiskit import execute, Aer from fractions import gcd, Fraction # for shor import math import matplotlib.pyplot as plt from qiskit import IBMQ, QuantumCircuit, ClassicalRegister, QuantumRegister from qiskit import execute, Aer from qiskit.qasm import pi from qiskit.tools.visualization import plot_histogram from qiskit import IBMQ, transpile from qiskit.providers.ibmq.managed import IBMQJobManager from qiskit.quantum_info import Statevector from qiskit.circuit.library import RYGate from qiskit.circuit.add_control import add_control import numpy as np # np.dot(x, y) (2)IQFT関連プログラム 参考文献2)を使用させていただいた. def Swap(qci,s1,s2): qci.cx(s1,s2) qci.cx(s2,s1) qci.cx(s1,s2) def iqft(qci, q, n): for i in range(n): for j in range(i):qci.cu1(-math.pi/float(2**(i-j)), q[i], q[j]) qci.h(q[i]) (3)因数分解対象N, mの設定 N,mが互いに素であることは確認している. 現在下記の組み合わせは確認している.現在の設定では最大のNは60000(2**16)程度. 表1 確認済組み合わせ(再掲) while 1: N = int(input('Enter an integer N: ')) print('You entered : ', N) m = int(input('Enter an integer m: ')) print('You entered : ', m) if gcd(N,m) ==1: break (4)レジスタの設定 N=50000程度を想定し設定している. bx=9 print(bx) by=16 print(by) cn=by qx = QuantumRegister(bx) qy = QuantumRegister(by) c = ClassicalRegister(cn) qcx = QuantumCircuit(qx, c) qcy = QuantumCircuit (qy, c ) qc=qcx+qcy (5)オラクルの記述 一行ずつ自動的に変換している. for i in range(bx): qc.h(qx[i]) for i in range(2**bx): X=i for j in range(bx): if (X & 2**(bx-1))==0: qc.x(qx[j]) X=X<< Y=(m**i)%N for j in range(by): if (Y & 2**bx)!=0: if bx==3: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qy[j]]) elif bx==4: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qy[j]]) elif bx==5: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qy[j]]) elif bx==6: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qy[j]]) elif bx==7: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qy[j]]) elif bx==8: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qy[j]]) elif bx==9: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qx[8],qy[j]]) elif bx==10: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qx[8],qx[9],qy[j]]) elif bx==11: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qx[8],qx[9],qx[10],qy[j]]) else: print("bx is too large") Y=Y<<1 X=i for j in range(bx): if (X & 2**(bx-1))==0: qc.x(qx[j]) X=X<<1 (6)IQFTの実行 参考文献2)を使用させていただいた. iqft(qc, qx, bx) if bx==3: Swap(qc, qx[0], qx[2]) elif bx==4: Swap(qc, qx[0], qx[3]) Swap(qc, qx[1], qx[2]) elif bx==5: Swap(qc, qx[0], qx[4]) Swap(qc, qx[1], qx[3]) elif bx==6: Swap(qc, qx[0], qx[5]) Swap(qc, qx[1], qx[4]) Swap(qc, qx[2], qx[3]) elif bx==7: Swap(qc, qx[0], qx[6]) Swap(qc, qx[1], qx[5]) Swap(qc, qx[2], qx[3]) elif bx==8: Swap(qc, qx[0], qx[7]) Swap(qc, qx[1], qx[6]) Swap(qc, qx[2], qx[5]) Swap(qc, qx[3], qx[4]) elif bx==9: Swap(qc, qx[0], qx[8]) Swap(qc, qx[1], qx[7]) Swap(qc, qx[2], qx[6]) Swap(qc, qx[3], qx[5]) elif bx==10: Swap(qc, qx[0], qx[9]) Swap(qc, qx[1], qx[8]) Swap(qc, qx[2], qx[7]) Swap(qc, qx[3], qx[6]) Swap(qc, qx[4], qx[5]) elif bx==11: Swap(qc, qx[0], qx[10]) Swap(qc, qx[1], qx[9]) Swap(qc, qx[2], qx[8]) Swap(qc, qx[3], qx[7]) Swap(qc, qx[4], qx[6]) else: print("bx is too large") exit() fig=qc.draw(fold=-1) #print(fig) (7)シミュレーションの実行 for i in range(bx): qc.measure ( qx[bx-1-i] , c[i] ) backend_sim = Aer.get_backend('aer_simulator') r = execute(qc,backend_sim, shots=4096*4).result() rc = r.get_counts() print(rc) 図1にIQFT実行後のシミュレーション結果を示す. 図1 IQFT後の量子ビット|x>の観測結果 (8)位相推定と因数分解処理 以下の推定は主に参考文献1)を参考に行った. 1)IQFT後の量子ビット|x>のピーク値検出 IGFT後の出現割合が0.05程度以上の量子ビット|x>を検出. 図1の場合は下記. m=0,3,5,8,11,13 2)位数の推定 Mと位数rには下記の関係がある. 上記に位相推定結果を代入すると,下記のように位数r=6としたときの結果とほぼ一致する. したがって,位数r=6と推定できる. プログラムでは,n=0,1,2,3,4,5とm=0,3,5,8,11,13とを最小二乗法で直線補間し,その傾きaから位相 r=round((2**bx)/a)としている.図2に直線補間した結果を示す. 図2 n, m間の直線補間 x = [] y = [] j=0 for i in r.get_counts(): c=rc.get(i) if c > 600: x.append(j) y.append(int(i,2)) j=j+1 y.sort() print(x) print(y) xx = np.array(x) yy = np.array(y) def reg1dim(x, y): a = np.dot(x, y)/ (x**2).sum() return a a = reg1dim(xx, yy) r=round((2**bx)/a) print(r) 3)素因数の計算 参考文献2)を参考に作成した.手順は下記. p=m**(r/2)-1 q=m**(r/2)+1 pp=gcd(p,N) qq=gcd(q,N) print("N=",N," m=",m," r=",r) print("p=",int(pp)) print("q=",int(qq)) ・直線補間図出力 出力結果は図2. plt.scatter(xx, yy, color="k") plt.plot([0,xx.max()], [0, a * xx.max()]) # x.max() plt.show() ・IQFT実行後のシミュレーション結果出力 出力結果は図1. plt=plot_histogram (rc, figsize = (12,7)) plt.show() [参考] コメントのない実行ファイルを添付しておきます. from qiskit import * from qiskit.circuit.library.standard_gates import C3XGate, C4XGate,MCXGate from qiskit.tools.visualization import * from qiskit import execute, Aer from fractions import gcd, Fraction # for shor import math import matplotlib.pyplot as plt from qiskit import IBMQ, QuantumCircuit, ClassicalRegister, QuantumRegister from qiskit import execute, Aer from qiskit.qasm import pi from qiskit.tools.visualization import plot_histogram from qiskit import IBMQ, transpile from qiskit.providers.ibmq.managed import IBMQJobManager from qiskit.quantum_info import Statevector from qiskit.circuit.library import RYGate from qiskit.circuit.add_control import add_control import numpy as np def Swap(qci,s1,s2): qci.cx(s1,s2) qci.cx(s2,s1) qci.cx(s1,s2) def iqft(qci, q, n): for i in range(n): for j in range(i):qci.cu1(-math.pi/float(2**(i-j)), q[i], q[j]) qci.h(q[i]) while 1: N = int(input('Enter an integer N: ')) print('You entered : ', N) m = int(input('Enter an integer m: ')) print('You entered : ', m) if gcd(N,m) ==1: break bx=9 print("bx=",bx) by=16 print("by=",by) cn=by qx = QuantumRegister(bx) qy = QuantumRegister(by) c = ClassicalRegister(cn) qcx = QuantumCircuit(qx, c) qcy = QuantumCircuit (qy, c ) qc=qcx+qcy for i in range(bx): qc.h(qx[i]) for i in range(2**bx): X=i for j in range(bx): if (X & 2**(bx-1))==0: qc.x(qx[j]) X=X<<1 Y=(m**i)%N for j in range(by): if (Y & 2**bx)!=0: if bx==3: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qy[j]]) elif bx==4: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qy[j]]) elif bx==5: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qy[j]]) elif bx==6: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qy[j]]) elif bx==7: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qy[j]]) elif bx==8: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qy[j]]) elif bx==9: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qx[8],qy[j]]) elif bx==10: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qx[8],qx[9],qy[j]]) elif bx==11: qc.append(MCXGate(bx),[qx[0],qx[1],qx[2],qx[3],qx[4],qx[5],qx[6],qx[7],qx[8],qx[9],qx[10],qy[j]]) else: print("bx is too large") Y=Y<<1 X=i for j in range(bx): if (X & 2**(bx-1))==0: qc.x(qx[j]) X=X<<1 iqft(qc, qx, bx) if bx==3: Swap(qc, qx[0], qx[2]) elif bx==4: Swap(qc, qx[0], qx[3]) Swap(qc, qx[1], qx[2]) elif bx==5: Swap(qc, qx[0], qx[4]) Swap(qc, qx[1], qx[3]) elif bx==6: Swap(qc, qx[0], qx[5]) Swap(qc, qx[1], qx[4]) Swap(qc, qx[2], qx[3]) elif bx==7: Swap(qc, qx[0], qx[6]) Swap(qc, qx[1], qx[5]) Swap(qc, qx[2], qx[3]) elif bx==8: Swap(qc, qx[0], qx[7]) Swap(qc, qx[1], qx[6]) Swap(qc, qx[2], qx[5]) Swap(qc, qx[3], qx[4]) elif bx==9: Swap(qc, qx[0], qx[8]) Swap(qc, qx[1], qx[7]) Swap(qc, qx[2], qx[6]) Swap(qc, qx[3], qx[5]) elif bx==10: Swap(qc, qx[0], qx[9]) Swap(qc, qx[1], qx[8]) Swap(qc, qx[2], qx[7]) Swap(qc, qx[3], qx[6]) Swap(qc, qx[4], qx[5]) elif bx==11: Swap(qc, qx[0], qx[10]) Swap(qc, qx[1], qx[9]) Swap(qc, qx[2], qx[8]) Swap(qc, qx[3], qx[7]) Swap(qc, qx[4], qx[6]) else: print("bx is too large") exit() fig=qc.draw(fold=-1) #print(fig) for i in range(bx): qc.measure ( qx[bx-1-i] , c[i] ) backend_sim = Aer.get_backend('aer_simulator') r = execute(qc,backend_sim, shots=4096*4).result() rc = r.get_counts() print(rc) x = [] y = [] j=0 for i in r.get_counts(): c=rc.get(i) if c > 600: x.append(j) y.append(int(i,2)) j=j+1 y.sort() print(x) print(y) xx = np.array(x) yy = np.array(y) def reg1dim(x, y): a = np.dot(x, y)/ (x**2).sum() return a a = reg1dim(xx, yy) r=round((2**bx)/a) print("r=",r) p=m**(r/2)-1 q=m**(r/2)+1 pp=gcd(p,N) qq=gcd(q,N) print("N=",N," m=",m," r=",r) print("p=",int(pp)) print("q=",int(qq)) plt.scatter(xx, yy, color="k") plt.plot([0,xx.max()], [0, a * xx.max()]) #plt.show() plt=plot_histogram (rc, figsize = (12,7)) #plt.show() 5.おわりに ショアのアルゴリズムをある程度自動的に可能なプログラムを作成した.しかし,mは試行錯誤的に行う必要がある.mの自動的検出は今後の課題である(かなり難しいか?). 参考文献 (1)ショアのアルゴリズムで21を素因数分解!量子コンピューターの基礎から解説! (2)中山茂:Python 量子プログラミング入門(Gaia教育シリーズ8)
- 投稿日:2022-01-31T11:10:09+09:00
災害拠点病院一覧のPDFをCSVに変換
import pathlib import camelot import pandas as pd import requests def fetch_file(url, dir="."): p = pathlib.Path(dir, pathlib.PurePath(url).name) p.parent.mkdir(parents=True, exist_ok=True) r = requests.get(url) r.raise_for_status() with p.open(mode="wb") as fw: fw.write(r.content) return p # ダウンロード url = "https://www.mhlw.go.jp/content/10800000/000773371.pdf" p = fetch_file(url) # PDF変換 tables = camelot.read_pdf( str(p), pages="all", split_text=True, strip_text=" ", line_scale=40 ) dfs = [pd.DataFrame(table.data[1:], columns=table.data[0]) for table in tables] df0 = pd.concat(dfs).set_index("No") df1 = df0.copy() df1["都道府県"] = df1["都道府県"].str.extract("(.+[都道府県])").fillna(method="ffill") df1 df1.to_csv("災害拠点病院一覧.csv", encoding="utf_8_sig") # 基幹・地域 集計 df2 = pd.crosstab(df1["都道府県"], df1["区分"]) df2 # 基幹・地域 抽出 df3 = ( df0["都道府県"] .str.split(expand=True) .dropna(how="all") .rename(columns={0: "都道府県", 1: "基幹", 2: "地域"}) ) df3.set_index("都道府県", inplace=True) df3["基幹"] = ( pd.to_numeric(df3["基幹"].str.replace("基幹", ""), errors="coerce") .fillna(0) .astype(int) ) df3["地域"] = ( pd.to_numeric(df3["地域"].str.replace("地域", ""), errors="coerce") .fillna(0) .astype(int) ) df3 # 基幹・地域 確認 (df3 - df2).reindex(df3.index)