20220110のPythonに関する記事は30件です。

Python学習(入出力)(初心者)

はじめに この記事は、約1年主にc言語を学んできた私がPythonに少しでも触れてみようとアウトプット用に書いたものです。ほとんどの方には参考にならないとは思います。間違い等あればご指摘いただけると嬉しいです。 Pythonとは まず、Pythonとは Pythonとは1991年にオランダ人のグイド・ヴァンロッサムというプログラマーによって開発され、オープンソースで運営されているプログラミング言語。Pythonのプログラミング言語としての主な特徴は、少ないコードで簡潔にプログラムを書けること、専門的なライブラリが豊富にあることなどが挙げられる。 https://www.internetacademy.jp/it/programming/programming-basic/what-is-python.html インターネット・アカデミー IT業界まるわかりガイド より Pythonはコードを書きやすく、そして読みやすくするために生まれた言語らしく、だれがコードを書いても似たようなコードになるそうです。 入出力 hello worldと入力して出力する。 string = input("入力") print(string) c言語の場合 #include <stdio.h> int main(void) { printf("入力\n"); char string1[15],string2[15]; scanf("%s %s", &string1,&string2); printf("%s %s", string1,string2); return 0; } c言語との違い 変数宣言がない(変数の型は代入された値によって自動的に決まる) c言語のようなライブラリ読み込みが必要ない 文末にセミコロンをつける必要がない main関数が必要ない など 終わりに 今回この記事を通して、ほんの少しだけではありますがPythonの人気の理由が分かった気がしました。特に今までc言語、c#しかやってこなかった私にとってこんなに書きやすく、読みやすい言語があるのかと驚きました。今後も少しずつPythonについて学習していければよいと思います。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Python3】python-ulidで一意なIDを作成する

環境 Ubuntu20.04 Python3.8.10 python-ulid1.0.3 背景 前提事項 オブジェクト指向においてエンティティは「同一性によって識別されるオブジェクト」とされます。つまり、エンティティは一意なIDを持つ必要があります。(一意性が担保できればIDでなくても良いですが) 問題意識 今までIDの採番はRDBに任せていました。具体的にはidをautoincrementにしてinsertするときに自動採番するという方式です。 この方法には以下2つの問題があります。 DBにinsertするまでエンティティがIDを持たない。(つまり、エンティティの定義を満たしていない状況が生まれる) IDというドメイン層の関心事がインフラ層(DB)の責務になっている。 この問題を解決するには次の要件を満たす手段が必要です。 ドメイン層で実現する オブジェクト生成時にIDがセットされる 一意性が担保されている 一意である(DBの主キーとして利用するため) 順序が担保されている(ソートを考慮して) これらの条件を(ほぼ)満たすのがULID です。 ULIDとは Universally Unique Lexicographically Sortable Identifierの略 128bit長の文字列から成る 1ミリ秒ごとに1.21e + 24(約27)通りのIDを生成可能 辞書的にソート可能 特殊文字無し(URLセーフ) ULIDは上で挙げた条件を一通り満たしています。 特別な要件が無い場合とりあえずULIDを利用してよいでしょう。 実演 インストール手順 PythonでULIDを利用するのに必要な python-ulid を下記コマンドでインストールします。 $ pip install python-ulid インストールされたことを確認します。 $ pip list | grep python-ulid python-ulid 1.0.3 利用方法 ULIDクラスをインポート コンストラクタでULID()メソッドを呼び出し、id属性に値をセットする。(必要に応じて型をキャストする。 今回は文字列型にしています。) Hoge.py # ULIDクラスをインポート from ulid import ULID class Hoge(): def __init__(self, name: str) -> None:   # ULIDクラスのオブジェクトを生成してstr型にキャスト self.id = str(ULID()) 続いて実際にHogeクラスのオブジェクト生成してみます。 Hoge.py from ulid import ULID class Hoge(): def __init__(self, name: str) -> None: self.id = str(ULID()) # これより下が追記部分 hoge1 = Hoge() print(vars(hoge1)) hoge2 = Hoge() print(vars(hoge2)) print(hoge1.id is hoge2.id) $ python3 Hoge.py {'id': '01FS21597QRQQV7SVQSMJ3BMB3'} {'id': '01FS21597Q7PFPTZ9G6VNW0J4M'} False IDの一意性は確認することができました。 順序が担保されているかも確認します。 Hoge.py from ulid import ULID class Hoge(): def __init__(self, name: str) -> None: self.id = str(ULID()) hoge1 = Hoge() print(vars(hoge1)) hoge2 = Hoge() print(vars(hoge2)) # hoge2の方が後に作成されたので、hoge2.idの方が大きくなると想定 print(hoge1.id < hoge2.id) $ python3 Hoge.py {'id': '01FS21GB08J9G59JR0GD6VQTND'} {'id': '01FS21GB08F1PWB5N77AC0RYK8'} False hoge1のidの方が大きくなりました。 調べたところ、1ミリ秒以上の間隔をあけると確実に順序が担保されるが、それより感覚が短い場合は必ずしも順序が担保されないようです。 別の方が既に検証済みでしたので詳細は下記リンク参照。 実運用で1ミリ秒以内に同一クラスのオブジェクトが生成されることは起こらないと考えて、とりあえずは無視してしまっても良いかもしれません 課題・懸念点 テーブルのデータサイズが大きくなることによるパフォーマンス劣化 これは実際にパフォーマンスを計測してみないことには分かりません(レコード数が小さいならそれほど影響は無いと思いますが...) IDを含むエンドポイントが長くなりすぎる APIの動作確認が大変になりそうです。解決方法は今のところあてがありません まとめ 実開発でULIDを利用するとどのような弊害があるかまだ理解できていないので実際に使ってみたいと思います。 参考資料 shimojuboix ulid/spec ULID の順序性を確保するには ドメイン駆動設計入門 ソート可能なUUID互換のulidが便利そう 【Day 14】ID の採番問題【じゃんけんアドカレ】
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PythonでSlack のメッセージを取得するまで

アプリ作成 ここからCreate an app > From scratch を選択し、アプリ名とslackのどのワークスペースを使うか選択する。 スコープ App Manifestに oauth_config: scopes: bot: - channels:history - groups:history - im:history - mpim:history を追加して 以下に変える _metadata: major_version: 1 minor_version: 1 display_information: name: APP_NAME features: bot_user: display_name: APP_NAME always_online: false oauth_config: scopes: bot: - channels:history - groups:history - im:history - mpim:history settings: org_deploy_enabled: false socket_mode_enabled: false token_rotation_enabled: false ※ OAuth & Permissions > Scope > User Token Scopes > Add an OAuth Scopeから追加することもできる トークン作成 OAuth & Permissions > OAuth Tokens for Your Workspace から Install to Workspace を押して トークンを発行する。 コード import requests SLACK_CHANNEL_ID = 'SLACK_CHANNEL_ID' SLACK_URL = "https://slack.com/api/conversations.history" TOKEN = "TOKEN" def main(): payload = { "channel": SLACK_CHANNEL_ID } headersAuth = { 'Authorization': 'Bearer '+ str(TOKEN), } response = requests.get(SLACK_URL, headers=headersAuth, params=payload) json_data = response.json() msgs=[] for i in range(len(response.json()['messages'])): msgs.append(json_data['messages'][i]["text"]) msgs.reverse() return print(msgs) if __name__ == "__main__": main() slackの該当チャンネルまで行ってURLを確認すると https: //app.slack.com/client/T0123456789/C0123456789 といった感じになっており、 SLACK_CHANNEL_IDは 一番最後にある、ここで言う "C0123456789"である。 TOKENは先ほど発行したものであり、xoxbからスタートしている。 msgs.reverse()はmsgsでslackでのテキストメッセージを最新のものから取得するため、リスト内を時系列順に戻すためである。 メッセージの取得 ターミナルで このファイルを実行すると {'ok': False, 'error': 'not_in_channel'} とでた。 Slack api conversations.history returns error: not_in_channel によるとSlackの該当チャンネルにAPPを追加する必要があるようだったので、 slackの該当チャンネルの右上の人のマークを押し、インテグレーション>APPからアプリ追加を選択し、先ほど作ったアプリ名を追加し、再度ファイルを実行すると メッセージをリストで取得することができた。 展望 このコードでは返信をしたときのメッセージを取得することができないので、次は試みたい。 参考 Slackの特定チャンネルのメッセージをクロールする方法 Slack Api公式 Slack api conversations.history returns error: not_in_channel
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PythonでSlack の特定のチャンネルの全メッセージを取得するまで

アプリ作成 ここからCreate an app > From scratch を選択し、アプリ名とslackのどのワークスペースを使うか選択する。 スコープ App Manifestに oauth_config: scopes: bot: - channels:history - groups:history - im:history - mpim:history を追加して 以下に変える _metadata: major_version: 1 minor_version: 1 display_information: name: APP_NAME features: bot_user: display_name: APP_NAME always_online: false oauth_config: scopes: bot: - channels:history - groups:history - im:history - mpim:history settings: org_deploy_enabled: false socket_mode_enabled: false token_rotation_enabled: false ※ OAuth & Permissions > Scope > User Token Scopes > Add an OAuth Scopeから追加することもできる トークン作成 OAuth & Permissions > OAuth Tokens for Your Workspace から Install to Workspace を押して トークンを発行する。 コード import requests SLACK_CHANNEL_ID = 'SLACK_CHANNEL_ID' SLACK_URL = "https://slack.com/api/conversations.history" TOKEN = "TOKEN" def main(): payload = { "channel": SLACK_CHANNEL_ID } headersAuth = { 'Authorization': 'Bearer '+ str(TOKEN), } response = requests.get(SLACK_URL, headers=headersAuth, params=payload) json_data = response.json() msgs=[] for i in range(len(response.json()['messages'])): msgs.append(json_data['messages'][i]["text"]) msgs.reverse() return print(msgs) if __name__ == "__main__": main() slackの該当チャンネルまで行ってURLを確認すると https: //app.slack.com/client/T0123456789/C0123456789 といった感じになっており、 SLACK_CHANNEL_IDは 一番最後にある、ここで言う "C0123456789"である。 TOKENは先ほど発行したものであり、xoxbからスタートしている。 msgs.reverse()はmsgsでslackでのテキストメッセージを最新のものから取得するため、リスト内を時系列順に戻すためである。 メッセージの取得 ターミナルで このファイルを実行すると {'ok': False, 'error': 'not_in_channel'} とでた。 Slack api conversations.history returns error: not_in_channel によるとSlackの該当チャンネルにAPPを追加する必要があるようだったので、 slackの該当チャンネルの右上の人のマークを押し、インテグレーション>APPからアプリ追加を選択し、先ほど作ったアプリ名を追加し、再度ファイルを実行すると メッセージをリストで取得することができた。 展望 このコードでは返信をしたときのメッセージを取得することができないので、次は試みたい。 参考 Slackの特定チャンネルのメッセージをクロールする方法 Slack Api公式 Slack api conversations.history returns error: not_in_channel
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

# PySparkを使おう

0.PySparkとは? Apache Sparkは分散処理などを活用して、大容量データを分析するためのライブラリです。それをPythonのインターフェースから利用できるようにしたのがPySparkです。 さまざまなデータソースに適応でき、柔軟なデータ処理ができますので、まさにデータ処理を統一する際などに最適です。 用語 内容 Apache Spark 巨大なデータを高速に処理できる PySpark Apache SparkをPythonから使うことができるようにしたライブラリ 以下では主にWindows環境でのPySparkの利用をしてみます。 1.Apache Sparkのインストール 1.1. Apache Sparkのダウンロード まずはご本家からApache Sparkをインストールします。バージョンを選択して、tar.gzをダウンロードします。tar.gzファイルを解凍し手、適当な場所に置いておきます。 1.2. winutilsのインストール(Windows環境) Windows環境の場合、こちらのGitHubから該当するwinutils.exeファイルをダウンロードし、C:\xxx\spark-3.1.2-bin-hadoop3.2\binなどのbinフォルダにコピーします。 1.3.環境変数の設定 次に環境変数を設定します。 PATH 環境変数PATHに、C:\xxx\spark-3.1.2-bin-hadoop3.2\binのようにインストール先を追加 > set PATH=%PATH%;C:\xxx\spark-3.1.2-bin-hadoop3.2\bin HADOOP_HOME HADOOP_HOMEをC:\xxx\spark-3.1.2-bin-hadoop3.2のようにインストール先にします。 > set HADOOP_HOME=C:\xxx\spark-3.1.2-bin-hadoop3.2 SPARK_HOME 同様にSPARK_HOMEを設定します。 > set HADOOP_HOME=C:\xxx\spark-3.1.2-bin-hadoop3.2 1.4.動作確認 以上ができたら、コマンドラインからspark-shellを起動してみます。 >spark-shell 2.pyspark 続いてPython環境でSparkを使えるようにしていきましょう。 2.1. pysparkのインストール ご利用のPython環境で、pysparkを追加します。 pip install pyspark 2.2.環境変数の設定2 PYSPARK_PYTHON 利用するPython環境を環境変数PYSPARK_PYTHONに設定します。 set PYSPARK_PYTHON=C:\xxxx\bin\python.exe 2.3.動作確認 PySparkを起動してみましょう。 > pyspark 起動できたのであれば、プログラムを作っていきましょう。 3. PySparkを試す 基本的にはSparkContextオブジェクトを作り、データを操作します。ここでは二つのデータ形式について操作してみましょう。 3.1. RDDを使ってみる RDDとはRDD(Resilient Distributed Dataset)のことです。Apache Sparkのプログラミングでは、基本的にRDDにデータを保持して操作することが基本です。 PySparkではSparkContextオブジェクトを作成し、そこからさまざまなデータ処理を行います。 import pyspark from pyspark import SparkContext # SparkContextの作成 sc = SparkContext(appName='spark_sample') # RDDを作成する。 rdd = sc.parallelize([ (1, 'Foo'), (2, 'Bar'), (3, 'Baz'), ]) # RDDにかけるFilter関数(2以上の要素を取り出す) def filter_func(x): n,s = x return n >= 2 rdd = rdd.filter(filter_func) # 結果の表示 print(rdd.collect()) RDDにはfilter/mapなど関数型プログラミングで馴染みのメソッドが準備されていますので、自在な処理が可能です。 3.2. DataFrameを使ってみる DataFrameはデータベースのようなカラムを持ったテーブル構造です。RDDにデータのスキーマを与えて作成することができます。これによりSQLライクの検索も可能になります。 続いてJSONをDataFrame化してデータ処理を行ってみましょう。 sample.json sample.json {"name":"Alice","year":20} {"name":"Bob","year":25} さきほどはSparkContextをエントリとして使いましたが、__データセットやデータフレームのAPIを使用する際のエントリポイントは、SparkSessionです。_ SparkSessionは、builderを使用して以下のように作成します。 spark = SparkSession.builder \ .master("local") \ .appName("AppName") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() SparkSession作成し、データを処理してみます。JSONから取り込んだ場合、自動的にスキーマも推測されていますので、以下では複数行からなるJSONデータを取り込んでデータフレームを作成し、検索を行ってみます。 pythonコード import pyspark from pyspark.sql import SparkSession # SparkSessionの作成 spark = SparkSession.builder \ .master("local") \ .appName("JSON SQL") \ .getOrCreate() # DataFrameの作成 df = spark.read.json('sample.json') # JSONを読み込んだのでスキーマを確認 df.printSchema() # SparkSessionにテーブルとして登録 df.registerTempTable('people') # Spark SQLによる検索 selected = spark.sql('SELECT * FROM people WHERE name==\"Alice\"') selected.show() さまざまなDBへの接続 さまざまなデータベースに接続する場合、connectorをconfigに与えることで接続できます。たとえば、さまざまなデータベースの接続が、Spark Packageとして提供されています。 RDBMS(MySQL) まずはRDBMSであるMySQLに接続してみましょう。MySQL用のJDBCドライバのJARファイルを用意しておく必要があります。 以下から、JARファイルをダウンロードして、適当な位置にJARファイルを設置します。 https://jar-download.com/artifacts/mysql/mysql-connector-java こちらのJARファイルをSparkSessionで適用する必要があります。 from pyspark.sql import SparkSession spark = SparkSession.builder.config("spark.jars", "...jars\\mysql-connector-java-8.0.27.jar") \ .master("local").appName("PySpark_MySQL_test").getOrCreate() df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/my_schema") \ .option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "shema_name") \ .option("user", "xxxx").option("password", "xxxx").load() df.show() MongoDB(NoSQL) NoSQLであるMongoDBについても同様にアクセスできます。MongoDBのコネクタはSpark Packageで提供されていますので以下のようにアクセスできます。 MongoDBは以下のようにSparkSessionを作成できます。 from pyspark.sql import SparkSession spark = SparkSession.builder.appName("myApp") \ .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db_name.collection_name") \ .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db_name.collection_name") \ .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \ .getOrCreate() df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load() df.show()
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonで文字列を日付に変換

文字列から日付(datetime)に変換 from datetime import datetime str = '20210808134937' date = datetime.strptime(str, '%Y%m%d%H%M%S') print(date) # 2021-08-08 13:49:37 print(type(date)) # <class 'datetime.datetime'> 文字列から日付(date)に変換 import datetime str = '20210808134937' date = datetime.datetime.strptime(str, '%Y%m%d%H%M%S') tdate = datetime.date(date.year, date.month, date.day) print(tdate) # 2021-08-08 print(type(tdate)) # <class 'datetime.date'> 日付から文字列に変換 import datetime tdatetime = datetime.now() tstr = tdatetime.strftime('%Y/%m/%d %H:%M:%S') print(tstr) # 2021/08/11 17:18:51 print(type(tstr)) # <class 'str'> 日付フォーマット 書式 説明 %Y 年 西暦4桁(例、2021) %y 年 西暦2桁(例、21) %m 月2桁(例、12) %d 日2桁(例、31) %H 時 24時間表記(例、14) %I 時 12時間表記(例、02) %p AM、PM %M 分2桁 %S 秒2桁 %f マイクロ秒6桁
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

文章からの気象解析〜WordCloudで遊ぶ〜

0.はじめに テレビの天気予報では現在の気象状況、明日や明後日の気象の予測とそれに伴う現象に対する注意事項を、気象キャスターが要領よく簡潔に説明してくれます。 気象庁からも気象予報の解説、過去の気象図に関する概説といったように文章で気象状況を簡潔にまとめた資料が定期的に発表されています。 このような文章にはどのような特徴があるのでしょうか? 今回は気象予報解析に関する文章の傾向をWordCloudというアプリケーションを用いて可視化してみます。 可視化すると、このような絵が得られます。 頻出する単語が大きな文字で表示されます。 これによってある期間の気象状況の傾向を可視化することができるのではないでしょうか。 環境 Mac mini(2018) CPU:3.2GHz 6core Intel Core i7 MS :32GB 2667MHz DDR4 OS :macOS Big Sur 11.6 anaconda3-5.3.1 Python 3.7.2 matplotlib 3.1.0 wordcloud 1.8.1 1. 題材とする気象庁発表資料 1.1 気象実況に関わる気象庁発表資料 次の2種類の気象庁発表資料を用います。 (1) 日々の天気図  気象庁が過去3か月から以前の天気図を月ごとにまとめた資料です。 PDFで公開されており、日ごとの天気図と簡単な概況解説があります。 以下はテキスト例です(2022年1月1日分)。 引き続き強い冬型の気圧配置。沖縄・奄美は寒気の影響で雨。西〜北日本も日本海側中心に雪や雨。その他は概ね晴れて初日の出日和。岩手県一関の最深積雪38cmは1月1位。 なお、最新の「日々の天気図」は下記にあります。 気象庁ホームページ 日々の天気図 (2) 短期予報解説資料  気象庁が発表する、17時および05時の天気予報(短期)の考え方、防災事項の解説です。 1.節に「実況上の着目点」として現在状況を説明している箇所がありますので、これを用います。 以下は解説例です(2022年1月10日発表分) **・北海道地方では 3cm 前後/3h の降雪の所があるが、次第に弱まる見込み。 ・日本の東の発達中の低気圧の影響で、関東地方や伊豆諸島で波が高くなっている。 ・500hPa -42℃以下の寒気を伴う寒冷渦が中国東北区にあり南東進。また寒冷渦の南西側にはトラフがのびており、-33℃以下の寒気を伴っている。 最新版は下記ホームページから確認できます。 気象庁ホームページ(短期予報解説資料) 短期予報解説資料はどちらかというと予報専門家向けの文章です。 「トラフ」、「500hPa」といった専門的な用語が用いられています。一般にはそれぞれ、「気圧の谷」「上空5500m付近」といった言葉で説明されています。 2. WordCloudでの可視化とテキストの前処理 2.1 PDFからのテキストの抜き出し pdftotextというツールを用いてPDFからテキストデータを抜き出すことができます。 抜き出したばかりのテキストには、PDFでの書式に応じて、文章の途中で改行があったりしています。 そこで多少のawkやtrといったテキスト処理を行い、改ページ記号や改行記号を削除したりしておきます。 「日々の天気図」については、異なる日のテキストが1行おきに混じってしまうなどの箇所もありました。 これは目で見て整形してやる必要がありました。 こうして得られるテキストを1ヶ月分結合したもの、1年分結合したものを入力データとします。 2.2 日本語テキストの前処理 こうして収集したテキストをそのままwordcloudに入力することもできますが、そのままだと長めの文がそのまま使われてしまいますので、頻出単語の可視化とは少し違ったものになります。 それはそれで面白い絵ができますが、語句単位での頻出度にも注目したいので、字句解析をさせて「分かち書き」の状態にしておきます。 こういった日本語文章の処理の解説は下記のサイトを参考にさせて頂きました。 uepon日々の備忘録 Twitterのツイートをwordcloudで可視化したい【後編】 ここでは日本語形態要素解析の分野で有名なjanomeを用いて日本語テキストを分かち書きします。 さらに、ある程度不要な文字・字句を排除してしまいます。 wordcloud.py import os from wordcloud import WordCloud from janome.tokenizer import Tokenizer from janome.analyzer import Analyzer from janome.tokenfilter import * fpath = "/Library/Fonts/Arial Unicode.ttf" # Mac OSのフォントファイル docdir = "somewhere_your_favorite" #複合名詞を作成するフィルタによる字句解析インスタンス a = Analyzer( token_filters=[CompoundNounFilter()]) f_in = docdir + "alltext.2.txt" # 入力テキストファイル f_tm = docdir + "tmp2.txt" # 下処理済テキストファイル f_ru = docdir + "textcld_raw.png" # 出力するPNGファイル f_ou = docdir + "textcld.png" # 出力するPNGファイル # 入力テキストファイルから文字列を読み込む contents = open( f_in , encoding="utf-8" ).read() word_list = [] # 一部字句を排除する下処理を行う。 for token in a.analyze(contents) : word = token.surface word_base = token.base_form PartOfSpeech = token.part_of_speech.split(',')[0] # 不要な助詞類、頻出する不要な文字を除去 if token.base_form in \ ["なる" , "れる" , "する" , "S" , "s" , "S" , "s"]: continue #使用する品詞を制限 if PartOfSpeech in \ ["形容詞", "動詞", "名詞", "代名詞", "副詞", "接頭詞" ]: word_list.append(word_base) #スペースにより語句を結合 wakachi = " ".join(word_list) # 分かち書きされたテキストファイルを書き出す content_tmp = open( f_tm, encoding="utf-8" , mode='w' ) print( wakachi, file=content_tmp) content_tmp.close() この結果、解説文が下記のように変換されます。 【下処理前】 引き続き強い冬型の気圧配置。沖縄・奄美は寒気の影響で雨。 西~北日本も日本海側中心に雪や雨。その他は概ね晴れて初日の出日和。 岩手県一関の最深積雪38cmは1月1位。 冬型の気圧配置続く。西日本~北日本の日本海側は北陸地方を中心に概ね雪。 山形県肘折の日降雪量52cm。 冷え込みも続き、北海道大樹は最低気温が平年より13℃低い‐28.1℃。 【下処理後】 引き続き 強い 冬型 気圧配置 沖縄 奄美 寒気 影響 雨 西~北日本 日本海側中心 雪 雨 その他 概ね 晴れる 初日の出日和 岩手県一関 最深積雪38cm 1月1位 冬型 気圧配置 続く 西日本~北日本 日本海側 北陸地方 中心 概ね 雪 山形県肘折 日降雪量52cm 冷え込み 続く 北海道大樹 最低気温 平年 13℃ 低い 28.1℃。 品詞の間にスペースが入ったほか、助詞が削除されたり、「続き」が「続く」と基本形になったりしています。 2.4 WordCloudの実行 これら2種類のテキストをWordCloudに入力します。 日本語のwordcloudを実行する際には、フォントファイルを指定する必要があります。 これをしないと、日本語は表示されません。 wordcloud.py # フォントファイルを指定してWordCloudを実行する # 字句解析前のデータ wc = \ WordCloud(background_color="white",width=900,height=500,\ font_path=fpath).generate(contents) wc.to_file( f_ru ) # 画像をファイルに書き込む # 字句解析後のデータ wc = \ WordCloud(background_color="white",width=900,height=500,\ font_path=fpath).generate(contents_tmp) wc.to_file( f_ou ) # 画像をファイルに書き込む 2.5 データの期間 次の2種類で実施しました。 ・1ヶ月分の文字列を結合したもの ・1年分の文字列を結合したもの それぞれ、下記の期間のテキストを収集しました。 「日々の天気図」2002年から2021年9月まで 「短期予報解説」2020年から2021年9月まで 3. 結果 3.1 字句解析の下処理後と下処理前 同じ時期について、「日々の天気図」を用いて結果を比較してみます。 冬の例(2021年1月分) ・下処理前 ・下処理後 下処理後は「雪」「雨」が頻出するため、はっきりとこれらが表示されるようになります。 下処理前は予報文としておなじみの表現「引き続き強い冬型の気圧配置」「北日本の広い範囲で雨や雪」などの用語が見られるほか、「など」といった単独では意味を有しづらい語句も目立ちます。 夏の例(2021年8月分) ・下処理前 ・下処理後 「下処理後」の方が端的な語句が増えています。 冬の例と比べると、「前線」「猛暑日」「雷雨」といった夏らしい用語が出てきます。  3.2 四季の変化 下処理後のデータを用いて、四季の変化を見ていきます。 「日々の天気図」の2010年分です。 1月 「雨」か「雪」かが最大の関心事のようです。2月以降にも「最高気温」の表現がよく出てきます。 この年は暖冬だったのでしょうか。 2月  引き続き「雨」か「雪」です。  ほかの年の解析結果でもそうなっていて、どんだけって感じですが。 3月  この年は暖かかったようですね。「サクラ開花」など春の訪れを思わせる用語も出てきています。 4月  サクラの話題が続きます。「前線」という言葉も見えるように。 5月 6月  「梅雨前線」が出てきます。「雷雨」「激しい雨」「前線」など雨に関する単語が多いですね。 7月  引き続き雨の言葉。梅雨末期の前線活動を思わせる「不安定」「非常」「激しい雨」「雷雨」に混じって「梅雨明け」「猛暑日」といった言葉も。 8月  「猛暑日」が多かったようです。「台風」の言葉も現れます。 9月  まさに「台風」シーズン到来!というかんじですね。「猛暑日」も見られ残暑厳しかったかな。 10月  「晴れ」と「雨」と「曇る」。周期的に天気が変わったのでしょうか。 11月  「初霜」や「初氷」、「冷え込む」といった言葉です。冬の到来です。 12月  「雪」や「初雪」とか「冬型気圧配置」が出てきました。  という感じで想像以上に季節感が感じられる可視化結果でした。 3.3 1年分の文章全体の可視化  「日々の天気図」の1年分のテキストの可視化結果です。 2003年 2009年 2020年 こうしてみると「低気圧」というワードが目立ちます。 天気、気象予報というものがいかに低気圧の動きによって説明されているかをあらためて思い知ります。 そのほか地方の名称(北日本、西日本など)が目立ちます。「北海道」というワードも多いですね。 3.4 「日々の天気図」と「短期予報解説」の比較 専門家向けの「短期予報解説」と、「日々の天気図」の違いを見てみます。 日々の天気図(2020年2月) 「雪」「雨」が重要なのです。 短期予報解説(同上) 「雪」「雨」といった直接的な現象に関する表現は少なかったのでしょう。 さて短期予報解説資料の方にはいくつか専門用語が登場しています。 「トラフ」は「気圧の谷」に相当します。 「500hPa」は高層気象を説明するための等高度面を表していて、テレビの天気予報でも時折「上空5500m付近」のような表現で登場します。 このような表現の違いは、日々の天気図は最終的な「雨」「雪」という人が感じた事象に重きをおいた解説であるのに対して、短期予報解説資料では予報の根拠となる「現在の状況」を専門的に説明しているために現れるのでしょう。 短期予報解説資料の1年分の可視化 短期予報解説資料の1年分も見てみます。 2020年1年分 「低気圧」はよほど重要なのでしょうね。また、「寒気」も頻出するようです。寒気が上空に流入することで大気の状態が不安定となり、災害を引き起こすような雨が降る場合があります。 このほか「暖湿気」といった用語が目を引きます。これもテレビで「暖かく湿った空気」と説明されるのをよく聞きますが、上空の寒気とともに、大気の状態を不安定にして大雨を降らせる要因です。 4 まとめ  気象庁が発表した文章からなる解説資料について、テキストデータ化してwordcloudを用いた可視化を行いました。季節による違いや、資料種別による違いがはっきりと見えました。  私はこれまで趣味として気象データを画像として用いての機械学習を主に行ってきましたが、テキストから面白いことを引き出す機械学習も勉強していこうと思います。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Elastic Beanstalkの環境変数を一覧で表示する

経緯 クラスメソッドさんの記事のコマンドがうまく通らなくて、jqが良く分からんから作った。 コード import boto3 from pprint import pprint import sys args = sys.args session = boto3.Session(profile_name=args[1]) eb = session.client('elasticbeanstalk') hoge = eb.describe_environments()["Environments"] ebname = [i['EnvironmentName'] for i in hoge] apname = [i['ApplicationName'] for i in hoge] for i, j in zip(ebname, apname): ebconf = eb.describe_configuration_settings( ApplicationName=j, EnvironmentName=i ) for i in ebconf['ConfigurationSettings']: print("---EnvironmentName " + i['EnvironmentName']) for f in i["OptionSettings"]: if f["Namespace"] == "aws:elasticbeanstalk:application:environment": print(f["OptionName"] + "," + f["Value"]) 実行 Python ebenvname.py $ProfileName jqの記法嫌い jsonも嫌い
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Python WebフレームワークPlotly Dashで資産管理アプリを作成

Dashとは Dashは、データ可視化インターフェースを構築するためのオープンソースフレームワークです。 2017年にPythonのライブラリとしてリリースされ、RやJuliaの実装を含むまでに成長しました。 Flask、React.js、Plotly.jsの3つの技術を使って作成されています。 2020年にリリースされたStreamlitの人気がとても高くなっていますが、2018年からとても高い人気を誇ってます。 ライブラリをインストール pip install pandas pip install dash タイトルを作成 # -*- coding: utf-8 -*- import dash import dash_html_components as html app = dash.Dash(__name__) app.layout = html.Div(children=[ html.H1(children='資産管理'), ]) if __name__ == '__main__': app.run_server(debug=False, host = '127.0.0.1',port=8000) 実行動画 動画の実装結果
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PythonによるFFT

はじめに この記事では,Pythonを使ったフーリエ変換をまとめました.書籍を使ってフーリエ変換を学習した後に,プログラムに実装しようとするとハマるところが(個人的に)ありました.具体的には,以下の点を重点的にまとめています. サンプリング周波数 離散フーリエ変換のサンプルポイント数 配列のインデックスと周波数の関係 配列の構造 コンピュータで扱うフーリエ変換 コンピュータで数値を扱うには飛び飛びの離散値を用いる必要があります.以下の表に示すように,フーリエ変換にも離散型のものがあります.このうち,離散時間フーリエ変換はコンピュータで取り扱う際に,次の問題があります [資料1]. 変換対象の時間信号が無限長となる 周波数領域では連続値となる 上記の問題に対して利用されるのが,離散フーリエ変換です.これは,1)時間領域と周波数領域ともに有限の長さで,2)離散値なのでコンピュータで扱いやすいですね.この記事では,Scipyのfftパッケージを用いて,離散フーリエ変換を行うことにします. 時間領域 周波数領域 Python 離散時間フーリエ変換 離散,非周期(無限長) 連続,周期 Scipy.freqz 離散フーリエ変換 離散,周期 離散,周期 Scipy.fft, numpy.fft.fft 定義 時間領域の離散信号$x[n]$と周波数領域の離散信号$X[k]$は離散フーリエ変換と逆離散フーリエ変換により,次のように結ばれます [資料2]. \begin{eqnarray} \begin{cases} \displaystyle X[k] = \frac{1}{N} \sum_{n=0}^{N-1} x[n] \exp \left( -j \frac{2 \pi kn}{N} \right) \\ \displaystyle x[n] = \sum_{k=0}^{N-1} X[k] \exp \left( j \frac{2 \pi kn}{N} \right) \end{cases} \tag{1} \end{eqnarray} ここに,$N$はフーリエ変換・逆フーリエ変換を施す際のサンプル点数です.なお,参考書では$N$点フーリエ変換とか言います. 実装上の留意点 Pythonを使って実装する際の留意点をまとめます. サンプリング周波数と時間配列 サンプリング周波数$f_s=1/T_s$ Hzは,(サンプリング定理から)信号周波数$f_c$の2倍以上を設定するとよいです.このとき,時間領域の信号$x[n]$は,$1/f_s=T_s$秒間隔でサンプリングされることになりますね. Pythonで時間配列を生成するには,例えば,t=numpy.arange(start=t1,stop=t2,step=1/Ts)とすればよいですね (t1・t2:開始・終了時刻[s],step:サンプリング周期[s]). ■補足: 評価時間の中に存在するサンプル点数(=時間配列の長さ)は次のようになります. サンプル点数=\frac{評価時間}{(1/f_s)}=\frac{評価時間}{T_s} 例えば,信号周波数を$f_c=10$ Hz,サンプリング周波数は十分に大きくとった$f_s= 32 \times f_c$とします.このとき,評価時間を2秒とすれば,その中のサンプル点は,評価時間$/(1/f_s)=2 \cdot 32 \cdot 10=640$点となります. ここまでの実装例(1) 周波数$f_c=10~$Hzの正弦波信号(時刻0~2s)を,サンプリング周波数$f_s=32\cdot f_c$でサンプリングするコードを以下に示します. import numpy as np # 信号周波数 fc = 10 # サンプリング周波数 fs = 32 * fc # 時間ポイント( 長さ=2[s]/(1/fs)=2*32*10=640点 ) t = np.arange(start=0, stop=2, step=1/fs) # 信号 x = np.cos(2*np.pi*fc*t) 離散フーリエ変換を行うサンプリング点N 前節で,$評価時間/(1/f_s)$[点]でサンプルされた信号を作りましたが,このサンプル点から$N$点を選択してフーリエ変換を施します.$N$は,信号の1周期分が含まれるように設定すればよいです.なお,離散フーリエ変換の結果$X[k]$が配列長に一致します. 例えば,信号周波数$f_c=10$ Hz,サンプリング周波数$f_s= 32 \times f_c$とします.この時,1周期分$1/f_c=1/10=0.1 $秒間に存在するサンプルは,$0.1/f_s=32$点です.したがって,FFTを行うサンプリング点は$N > 32$とすればよいことになります. 周波数と配列のインデックスの関係 Scipyのfft()結果は配列として返却されますが,配列のインデックスと周波数の関係を述べます.周波数間隔$df$は,サンプリング周波数$f_s$とFFTサンプル点数$N$の比により与えられます. df = \frac{f_s}{N} ~~\therefore~ k番目 \leftrightarrow k df = k \frac{f_s}{N} このため,下図に示すような配列インデックスと周波数の関係があることが分かりますね. ここまでの実装例(2) # 実装例(1)の途中より追記 from scipy.fftpack import fft # FFT実行(N点) X = fft(x, N) # 周波数軸の生成 # 1[周波数index]あたりの周波数間隔df=fs/N df = fs/N # [周波数index]生成 sampleIndex = np.arange(start=0, stop=N) # [周波数index] --> 周波数間隔を反映 f = sampleIndex*df 離散フーリエ変換の結果(配列の構造) ScipyのFFT結果は,配列として返却されますが,どのような構造になっているのでしょうか.フーリエ変換結果$X[k]$のインデックス$k$は次のようになっています.(なお,以下の表は,$N$が偶数の時のもので,奇数の時は$N/2$を$(N+1)/2$と置き換えてください).   配列のインデックス$k$   中身 $0$ 直流(0 Hz) $1 ~ (N/2-1)$ 正の周波数成分 $N/2$ ナイキスト周波数(=サンプリング周波数/2) $(N/2+1) ~ (N-1)$ 負の周波数成分 ところで,Scipyには,上図に示したFFT結果を並び替えるfftshift()関数が用意されていて,下図のような並び替えを行ってくれます.その際,変換結果の配列の中央に位置するナイキスト周波数(=サンプリング周波数/2)成分は,負の周波数とまとめて移動される点にご注意ください. ここまでの実装例(3) ffshiftを考慮したFFTを行うコードを以下に示します. # 実装例(3)より追記 from scipy.fftpack import fftshift # FFTシフト検証( X=fft(x,N) ) Shifted_X = fftshift(X) # FFTシフト後の[周波数index]生成(//は整数割り) Shifted_sampleIndex = np.arange(-N//2, N//2) # FFTシフト後の[周波数index] --> 周波数間隔dfへ Shifted_f = Shifted_sampleIndex*df 振幅スペクトル fftの結果$X[k]$は,一般に複素数となります.この時,振幅スペクトルは実部$X_{re}$と虚部$X_{im}$を用いて次のように与えられます. |X[k]| = \sqrt{ X_{re}^2 + X_{im}^2 } 振幅スペクトルをプロットする際は例えばnumpy.abs()関数を使います.また,Scipyのfft()結果$X[k]$は,FFTポイント数$N$倍となっているため,X/Nとする必要があります. まとめ 以上の留意点を考慮して,時刻0~2sの正弦波信号(周波数$10$ Hz,サンプリング周波数$f_s=32f_c$)を離散フーリエ変換してみます.以下にソースコードと実行結果を示します.実行結果より,$\pm10$ Hzにスペクトルが立っていますね.これは,$\cos()$のフーリエ変換が \begin{eqnarray} \begin{cases} \displaystyle x(t)=\cos(2 \pi f_c t)=\frac{1}{2} (\exp(j2\pi f_c t) + \exp(-j2\pi f_c t) ) \\ \displaystyle X(f) = \frac{1}{2} ( \delta(f-f_c)+\delta(f+f_c) ) \end{cases} \end{eqnarray} となるので,$\pm f_c$ Hzに(信号振幅の1/2の)インパルス$\delta(t)$が立つ(今回は$\pm 10$ Hzに大きさ0.5のスペクトル)ことに対応しています. import numpy as np import matplotlib.pyplot as plt from scipy.fftpack import fft, fftshift # ------------------------------------------------------ # 信号周波数 fc = 10 # サンプリング周波数 fs = 32 * fc # 時間ポイント( 長さ=2[s]/(1/fs)=2*32*10=640点 ) t = np.arange(start=0, stop=2, step=1/fs) # 信号 x = np.cos(2*np.pi*fc*t) # ------------------------------------------------------ # FFTサンプル点数(>1周期の時間ポイント数) N = 256 # FFT実行(N点) X = fft(x, N) # 周波数軸の生成 # 1[周波数index]あたりの周波数間隔df=fs/N df = fs/N # ------------------------------------------------------ # FFTシフト Shifted_X = fftshift(X) # FFTシフト後の[周波数index]生成(//は整数割り) Shifted_sampleIndex = np.arange(-N//2, N//2) # FFTシフト後の[周波数index] --> 周波数間隔dfへ Shifted_f = Shifted_sampleIndex*df # ------------------------------------------------------ # プロット(ax[0]:時間波形,ax[1]:周波数波形) fig, ax = plt.subplots(2) ax[0].plot(t, x, ".-") ax[0].set_xlabel("time"), ax[0].set_ylabel("amplitude") ax[1].stem(Shifted_f, np.abs(Shifted_X)/N, use_line_collection=True) ax[1].set_xlabel("frequency"), ax[1].set_ylabel("amplitude") plt.tight_layout() plt.show() 資料 [資料1] 川又ほか,Python対応 ディジタル信号処理,森北出版,2021年. [資料2] M. Viswanathan et al., Digital Modulations using Python, Independently published, 2019.
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

motpyやーる(Python3.6、Windows10)

はじめに motpyやっていくー 開発環境 Windows 10 PC Python 3.6 実装 1.motpyをクローン 2.ライブラリをインストール pip install motpy pip install -r requirements_dev.txt 3.実行 python examples/detect_and_track_in_video.py --video_path=./assets/video.mp4 --detect_labels=['car','truck'] --tracker_min_iou=0.15 --device=cuda --track_text_verbose=1 motpypython examples/detect_and_track_in_video.py --video_path=./assets/video.mp4 --detect_labels=['car','truck'] --tracker_min_iou=0.15 --device=cuda --track_text_verbose=1#Python #DeepLearning #AI #機械学習 pic.twitter.com/f3QqVCBuwk— がちもとさん@メタバース熊本 (@sotongshi) January 10, 2022 追記 YOLOXと組み合わせてみた YOLOX + motpy trajectory pic.twitter.com/N8dvPpAnnS— がちもとさん@メタバース熊本 (@sotongshi) January 10, 2022 参考文献 YOLOX+motpyで始めるMultiple Object Tracking(MOT) YOLOXやーる(Python3.6、Windows10)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

日経平均株価の予想

日経平均株価を予想するためにすること  まず始めに日経平均株価をPythonで予想して、投資に役立てたいという熱意がありました。  そのためにPPDACサイクル(Problem->Plan->Data->Analysis->Conclusion)を回すことを考えてみました。 今回のPPDACサイクル  Problem:日経平均株価の動きを予想したい!  Plan:日経平均株価、出来高、空売り比率から予想する(ことができるのでは?という仮定)  Data:日経平均株価と出来高は、pandas-datareaderから取得する。空売り比率は、下記サイトからPDFファイルを集め、Pythonでcsv出力する。 JPXサイト https://www.jpx.co.jp/markets/statistics-equities/short-selling/index.html  Analysis:それぞれの相関関係を調べる。(前編では扱わないです。というか後編があるかも分からないです。)  Conclusion:未定。。。相関関係があれば良いのだけど。 Data取得 (1)pandas-datareaderを用いた日経平均株価と出来高の取得+csv出力 株価取得のソースコード  終値の値のみをcsv出力しています。 import pandas_datareader.data as web import datetime import csv year = '22' month = '01' start = datetime.datetime(2022, 1, 1) end = datetime.datetime(2022, 1, 31) tsd = web.DataReader("^N225", "yahoo", start, end) print(tsd) with open(f'./20220110_NIKKEI/csv/{year}{month}Close_price.csv', 'w') as f: writer = csv.writer(f) writer.writerow(tsd['Close']) 出来高取得のソースコード  出来高をcsv出力しています。 import pandas_datareader.data as web import datetime import csv year = '22' month = '01' start = datetime.datetime(2022, 1, 1) end = datetime.datetime(2022, 1, 31) tsd = web.DataReader("^N225", "yahoo", start, end) print(tsd) with open(f'./20220110_NIKKEI/csv/{year}{month}Volume.csv', 'w') as f: writer = csv.writer(f) writer.writerow(tsd['Volume']) (2)JPXサイトから取得したpdfファイルをcsv出力  まずはJPXサイトからpdfファイルをダウンロードします。 これにはchromeの拡張機能の"DownThemAll!"を使用しました。(ファイルを1回しかダウンロードしないため、あまりソースコード化する必要を感じませんでした。。。) https://chrome.google.com/webstore/detail/downthemall/nljkibfhlpcnanjgbnlnbjecgicbjkge?hl=en  ダウンロードしたPDFファイルのテキスト化 ダウンロードしたPDFファイルをテキストファイル化しています。 from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter from pdfminer.converter import TextConverter from pdfminer.layout import LAParams from pdfminer.pdfpage import PDFPage import os input = './20220110_NIKKEI/pdf/' def pdf2text(yearMonth, yearMonthDay): input_path = f'./20220110_NIKKEI/pdf/{yearMonth}/{yearMonthDay}-m.pdf' output_path = f'./20220110_NIKKEI/pdf/{yearMonth}/{yearMonthDay}-m.txt' manager = PDFResourceManager() with open(output_path, "wb") as output: with open(input_path, 'rb') as input: with TextConverter(manager, output, codec='utf-8', laparams=LAParams()) as conv: interpreter = PDFPageInterpreter(manager, conv) for page in PDFPage.get_pages(input): interpreter.process_page(page) for i in range(32): if i < 10: ym = '202110' ymd = f'21100{i}' input_file = f'{input}{ym}/{ymd}-m.pdf' if(os.path.exists(input_file)): pdf2text(ym, ymd) else: ym = '202110' ymd = f'2110{i}' input_file = f'{input}{ym}/{ymd}-m.pdf' if(os.path.exists(input_file)): pdf2text(ym, ymd) else: print('FINISH!') 次にテキスト化ファイルのcsv出力します。 色々、ハマる所はありましたが、csv出力する箇所は、「価格規制ありの空売り比率」に絞りました。 import os import csv year = '21' month = '10' pathOrigin = './20220110_NIKKEI/text/' new_list = [] final_list = [] def short_ratio(nameNum): new_path = f'{pathOrigin}{year}{month}{nameNum}-m.txt' with open(new_path) as f: contents = f.readlines()[44] new_list.append(contents) #print(new_list) for i in range(32): if i < 10: num = f'0{i}' path_Under = f'{pathOrigin}{year}{month}{num}-m.txt' if(os.path.exists(path_Under)): short_ratio(num) else: num = i path_Over = f'{pathOrigin}{year}{month}{num}-m.txt' if(os.path.exists(path_Over)): short_ratio(i) #print(new_list) for list in new_list: list = list.replace('%\n', '') final_list.append(list) print(final_list) path_final = f'./20220110_NIKKEI/csv/{year}{month}short.csv' with open(path_final, 'w') as f: writer = csv.writer(f) writer.writerow(final_list) 本記事は、私の備忘録的な扱いです。。。  結構、苦労してここまで来ました(祝日1日使いました)が、分析まで至らず残念でした。。。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Django REST Framework】外部APIのデータをお気に入りに登録したい

外部APIのデータをお気に入りしていく機能を実装したいと思います。 一応実装できたかなというくらいで、もっと良い書き方があるかもしれませんが、そこは勉強中なのでご容赦ください。。。 前回書いた記事同様、Finaicial Modeling Prepを使用していきます。 【Django REST Framework】外部APIをAPIViewで取得する 1.モデルの作成 models.py from django.conf import settings class FavoStock(TimeStampModel): profile = models.ForeignKey(Profile, on_delete=models.CASCADE, related_name='favorites') symbol = models.CharField(max_length=20, blank=False) isin = models.CharField(max_length=12, blank=True) vender = models.CharField(max_length=10, blank=True) description = models.CharField(max_length=100, blank=False) def __str__(self): return f"{self.profile.user.username} likes {self.symbol}" 本来のお気に入り機能であれば、ForeignKeyFieldやManyToManyFieldで対象のモデルと接続するんでしょうが、今回はデータベースに銘柄リストを登録していないため、普通の登録・削除の機能に近いです。 2.シリアライザの作成 models.py from rest_framework import serializers from profiles.models import Profile, FavoStock class FavoStockSerializer(serializers.ModelSerializer): profile = serializers.StringRelatedField(read_only=True) class Meta: model = FavoStock exclude = ('updated_at', ) 3.ビューの作成 views.py class FavoStockListAPIView(mixins.ListModelMixin, generics.GenericAPIView): serializer_class = FavoStockSerializer permission_classes = [IsAuthenticated, ] def get_queryset(self): queryset = FavoStock.objects.all() request_uuid = self.kwargs.get('uuid') return queryset.filter(profile__uuid=request_uuid).order_by('-created_at') def get(self, request, uuid): return self.list(request) class FavoStockCreateAPIView(generics.CreateAPIView): queryset = FavoStock.objects.all() serializer_class = FavoStockSerializer permission_classes = [IsAuthenticated, IsOwnFavoStockOrReadOnly] def perform_create(self, serializer): symbol = self.request.data.get('symbol') user_profile = self.request.user.profile queryset = self.get_queryset() has_user_added = self.queryset.filter(symbol=symbol, profile=user_profile).exists() if has_user_added: raise ValidationError('already added.') serializer.save(profile=user_profile) class FavoStockDestroyAPIView(generics.DestroyAPIView): queryset = FavoStock.objects.all() serializer_class = FavoStockSerializer permission_classes = [IsAuthenticated, IsOwnFavoStockOrReadOnly] lookup_field ='symbol' def perform_destroy(self, instance): symbol = self.kwargs.get('symbol') user_profile = self.request.user.profile queryset = self.get_queryset() has_user_added = self.queryset.filter(symbol=symbol, profile=user_profile).exists() if not has_user_added: raise ValidationError('not exists.') instance.delete() ここはもう少し短くできるだろうなとモヤモヤしています。 ListViewではプロファイルのuuidをパラメータとして渡すことで、そこ人のお気に入り一覧を見ることができます。 Create、Destroyでは自分が登録したレコード以外は操作できないようにしています。 重複などを避けるためにhas_user_addedでレコードのチェックを行っています。 4.URLの設定 urls.py path('addfavorite/', FavoStockCreateAPIView.as_view(), name="add-favorite"), path('removefavorite/<str:symbol>/', FavoStockDestroyAPIView.as_view(), name='remove-favorite'), 上記をurlpatternsに追加しました。 (メモ:permissions.py) permissions.py class IsOwnFavoStockOrReadOnly(permissions.BasePermission): def has_object_permission(self, request, view, obj): if request.method in permissions.SAFE_METHODS: return True return obj.profile == request.user.profile 結果 ↓リスト ↓登録成功 ↓登録失敗(すでに登録済み) まとめ とりあえず実装できました。 FavoStockモデルはProfileアプリの中にある一方で、株式の情報取得は別のアプリで行っているなど、まだまだ理解ができていないところが多いですが、少しづつ勉強していきます。 (なんでFavoStockって中途半端な名前にしたんだろう…)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【競プロ典型90問】006の解説(python)

概要 競プロ典型90問の解説記事です。 解説の画像を見ても分からない(理解力が足りない)ことが多々あったので、後で解き直した際に確認できるようまとめました。 ※順次、全ての問題の解説記事を挙げていく予定です。 ※★6以上の問題は難易度的に後回しにしているため、投稿時期が遅くなる可能性があります。(代わりに丁寧に解説してくれる方いたらぜひお願いします) 問題006-Smallest Subsequence 問題 英小文字のみからなる、長さ N の文字列 S が与えられます。 長さが K である S の部分列のうち、辞書順で最小であるものを出力してください。 解き方 まず初めに、部分列を全列挙してから辞書順最小を得る方法が考えられますが、この考えではn文字からk文字を選んだ時の取り出し方 $nCk$ の全列挙になるため、k = 1, n 以外の時は、$O(N^2)$以上となりTLEになってしまいます。 ここで辞書順というのは、先頭の文字から順に順番が決まっていくため、先頭の文字が決まっているときにその後の文字はどうでも良いことが分かります。 (1文字が"a"の文字列Aと"b"の文字列BがあったらAの2文字目以降が全部"z"だろうが関係ない。) そのため、この問題は前側から貪欲に辞書順最小の文字を採用していけば良い問題ということが分かりました。 実装の流れとしては、 1. アルファベットの出現箇所をメモする 2. aから順番に出現箇所と残りの文字数が妥当か※を確認し、問題なければ採用する。また、i回目で採用した文字の出現箇所 < i+1回目の出現箇所にならなければいけないため、その確認に今回は二分探索を用いた。 ※例えば、"bbbbab"という文字列から3文字選択するときに、aを先頭にすることは出来ないため。 3. 2をK文字になるまで繰り返す 引用元:競プロ典型90問 Github 実装 answer.py import string import bisect # 入力の受け取り N, K = map(int, input().split()) S = input() # アルファベットのリスト alphs = list(string.ascii_lowercase) # アルファベットの位置をs_idxに格納 s_idx = {} for a in alphs: s_idx[a] = [] for i in range(len(S)): s_idx[S[i]].append(i) # 答えとなる文字列をans, 出現箇所をメモするmemoを定義 ans = "" memo = 0 while K > 0: for a in alphs: # アルファベットの存在の確認 if len(s_idx[a]) == 0: continue # 二分探索でmemoより後に出現箇所を探索 j = bisect.bisect_left(s_idx[a], memo) # jが範囲外の場合、除外 if j >= len(s_idx[a]): continue # 出現箇所と残りの文字数の確認 if K + s_idx[a][j] <= N: memo = s_idx[a][j]+1 ans += a K -= 1 break print(ans) 最後に 問題の解説を読んでも他の人のコードを見てもさっぱり分からないという方の 力に少しでもなれれば幸いです。 コードの改善点やその他、ご意見などあれば、気軽にコメントください。 また、この記事を読んで理解できた方はぜひLGTMを押していただけると嬉しいです。 (記事投稿のモチベになります。) ここまでお読みいただきありがとうございました。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Deep Learning に関する私の記事まとめ

Deep Learningに関する記事をまとめています。 目的 今後記事を掲載していく中で自分ですぐに見返せることを目的として作成します。 資格 AI実装検定A級 G検定 E検定 ラビットチャレンジレポート ラビットチャレンジ【E資格】 応用数学 ラビットチャレンジ【E資格】 機械学習 ラビットチャレンジ【E資格】 深層学習day1 ラビットチャレンジ【E資格】 深層学習day2 ラビットチャレンジ【E資格】 深層学習day3 ラビットチャレンジ【E資格】 深層学習day4
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

python:requestsでニコニコ動画APIを使う

目的・理由 ・APIを使ってみたかった。 ・本APIは登録など不要ですぐ使用可能だった。 ・久しぶりにニコニコ動画を見たら楽しかった。 参照リンク先 ・APIのURL  ※使い方などの詳細 ・requestsとcurlコマンド  ※curlコマンドをpythonのrequestsで実行 コード code import requests import json import pandas as pd #APIのエンドポイント url = "https://api.search.nicovideo.jp/api/v2/snapshot/video/contents/search" #検索結果をまとめる辞書(普段はリストでやりますがなんとなく辞書型でやってみた) results = {} #関数-------------------------------------------- def nico(year,start,end): global results #検索結果の取得可能な最大数は100件なので、日付期間で分けてデータを取得する #(fromとtoで期間を指定) filter = json.dumps( { "type": "or", "filters": [ { "type": "range", "field": "startTime", "from": f"{year}-{start}T00:00:00+09:00", "to": f"{year}-{end}T00:00:00+09:00", "include_lower": True } ] } ) #qは検索キーワード、タグを検索対象とし、動画タイトルとタグを取得する param = { "q":"検索したい単語を入力、検索のORやAND条件などは本APIサイトを参照", "targets":"tags", "_sort":"-viewCounter", "fields":"title,tags", "_limit":100, "jsonFilter":filter } #上記のfilterとparamsでAPIを使って検索実施 data = requests.get(url,params=param).json()["data"] for i in data: result= {i["title"]:i["tags"]} results.update(result) #---------------------------------------------- #実際に関数を使って、API検索開始 #日付の分け方は適当です、検索結果を見ながらお好みで調整 dates = {"01-01":"04-01","04-02":"07-01","07-01":"10-01","10-02":"12-31"} years = range(2012,2023) for y in years: for d1,d2 in dates.items(): nico(y,d1,d2) #結果件数の出力 print("件数は : " + str(len(results))) #pandasを使ってデータ保存 df = pd.DataFrame(results.values(),index=results.keys()) df.to_excel("nico_api.xlsx") 所感 ・APIというかプログラミングでのデータ収集能力はやはり凄いと実感。(早い!) ・pythonのrequestsがcurlと同等というのを知った。 ・curlコマンドを食わず嫌いしていたが、ちょっと親身に感じた。 ・filterや検索結果の取得の際、jsonデータの勉強になった。 ・なんだかんだでjsonデータ不慣れでしたのでいい勉強になりました。 ・検索結果のタグを見てたらやっぱりタグの秀逸さには笑わせていただきました。 以上。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

YOLOXやーる(Python3.6、Windows10)

はじめに YOLOXやっていくー 開発環境 Windows 10 PC Python 3.6 実装 1.YOLOXをクローンする 2.ライブラリのインストール conda activate py36 pip install -r requirements.txt pip install cython pip install "git+https://github.com/philferriere/cocoapi.git#egg=pycocotools&subdirectory=PythonAPI" 3.yolox_x.pthをダウンロード 4.tools/demo.pyの上部に下記を追記し、yoloxフォルダへのパスを追加する import sys import os sys.path.append(os.path.join(os.path.dirname(__file__), '..')) 5.実行 python tools/demo.py image -n yolox-x -c yolox_x.pth --path assets/dog.jpg --conf 0.25 --nms 0.45 --tsize 640 --save_result --device gpu 追記 Webカメラから実行した場合、6fpsほど YOLOX webcam test#Python #OpenCV #AI #DeepLearning #機械学習 pic.twitter.com/Vmi1u7M6KZ— がちもとさん@メタバース熊本 (@sotongshi) January 10, 2022 参考文献 【物体検出】YOLOシリーズ最新版のYOLOXを試してみる 〜デモから学習まで〜
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

エンジニアじゃない人にも認識器を使ってもらいたい!!!そんな人向けに、Flask+Docker+pytorchで、認識webアプリの作り方!【簡単、webで単眼Depth推定】

なにこれ? 単眼Depth推定をWebアプリで認識できる簡単なツールです。 エンジニアじゃない人でも簡単に認識器を試してもらえるようにエイヤで作ってみました! 改造すればいろいろ拡張できるかな~と思います。 どうやってるの? dockerでサーバーを立ててます。 フロントエンドはflask、バックエンドはpytorchです。 動作環境 OS: Ubuntu PC 18 GPU: GTX 3090 ソースコード ツリー構成 こんな感じのツリーで作成していきます。 各ソースコードは下の方にすべて記載してます。 ├── app.py ├── docker │   ├── Dockerfile │   └── build.sh ├── results │   └── <ここに認識結果のデータを格納します。> ├── run_docker.sh ├── run_server.sh ├── src │   ├── configs.py │   ├── dnn │   │   └── MiDaS.py │   └── recognizer.py ├── static │   └── <ここにアップロードされた画像が格納されます> └── templates └── index.html 使い方はこんな感じになります。 ./run_docker.shでdocker起動。 ./run_server.sh [GPU No]で起動。 ブラウザで44444にアクセス。 参考にしたHP https://qiita.com/oyngtmhr/items/433974829015abac753c https://gray-code.com/blog/flask-on-docker/ ソースコードすべて(gitにアップしなくてごめんなさい。。) こだわり: いろんな認識器を差し替えられるように拡張性を持たせてます。 configの値を変えると、モデルを差し替えられるようにしています。 recognizer classで、MiDaSのモデルをロードしています。 MODELをtestにすると、ただのグレースケール化する処理が走ります。 ラジオボタンでセグメンテーションに切り替えられるようにとかしたいなぁ app.py from flask import Flask, request, redirect, url_for,flash, render_template from flask import send_from_directory from werkzeug.utils import secure_filename import os import argparse from src.configs import configs from src.recognizer import recognizer_class ###### app = Flask(__name__) app.config['UPLOAD_FOLDER'] = configs.UPLOAD_FOLDER recognizerClass = recognizer_class(configs.MODEL , configs) ###### def allwed_file(filename): # .があるかどうかのチェックと、拡張子の確認 # OKなら1、だめなら0 return '.' in filename and filename.rsplit('.', 1)[1].lower() in configs.ALLOWED_EXTENSIONS ##### @app.route('/', methods=['GET', 'POST']) def uploads_file(): # リクエストがポストかどうかの判別 if request.method == 'GET': return render_template("index.html") elif request.method == 'POST': # ファイルがなかった場合の処理 if 'file' not in request.files: flash('ファイルがありません') return redirect(request.url) # データの取り出し file = request.files['file'] # ファイル名がなかった時の処理 if file.filename == '': flash('ファイルがありません') return redirect(request.url) # ファイルのチェック if file and allwed_file(file.filename): # 危険な文字を削除(サニタイズ処理) filename = secure_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) # ファイルの保存 file.save(filepath) # 認識 resultpath = recognizerClass.recognize(filepath) # アップロード後のページに転送 return render_template("index.html", filepath=filepath, resultpath=resultpath, result =True) #### @app.route('/results/<filename>') # ファイルを表示する def uploaded_file(filename): return send_from_directory(configs.RESULTS_FOLDER, filename) #### if __name__ == "__main__": if not os.path.exists("static"): os.makedirs("static") if not os.path.exists("results"): os.makedirs("results") parser = argparse.ArgumentParser() parser.add_argument('--gpu', '-g', default=0, type=int, help='GPU ID (negative value indicates CPU)') args = parser.parse_args() if args.gpu==-2: multigpuFlag=True else: os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu) app.run(host="0.0.0.0", port=5000, debug=True) templates/index.html <html> <head> <meta charset="utf-8"> <title>flask recognizer</title> </head> <body> <h1> ファイルをアップロードして認識しよう(MiDaS:DPT_Hybrid) </h1> Created by sugupoko <BR> {% if result %} 入力画像 <BR> <IMG SRC="{{filepath}} " BORDER="1"> <BR> <HR> 出力結果 <BR> 認識結果を0-1に線形で正規化、疑似カラーの処理をして表示しています。<BR> <IMG SRC="{{resultpath}} " BORDER="1"> <BR> <HR> {% endif %} ファイルを選択して送信してください<BR> <form action = "./" method = "POST" enctype = "multipart/form-data"> <input type = "file" name = "file" /> <input type = "submit"/> </form> </body> </html> src/config.py class configs(): #PATH =============== ROOT="./" MODEL = "MiDaS" # 画像のアップロード先のディレクトリ UPLOAD_FOLDER = ROOT + 'static/' # 結果が収納される場所 RESULTS_FOLDER = ROOT + "results/" # HTMLの場所(デフォルト変更するには、要調査) HTML_FOLDER = ROOT + 'templates/' # アップロードされる拡張子の制限 ALLOWED_EXTENSIONS = set(['png', 'jpg', 'gif']) src/recognizer.py import os import cv2 from src.dnn.MiDaS import MiDaS class recognizer_class: def __init__(self,mode, configs): print("init ==================================") self.mode = mode self.configs = configs if self.mode == "MiDaS": print("run MiDaS") self.model_class = MiDaS() self.func = self.model_class.run elif self.mode == "test": print("run test") self.func = eval("rgb2gray") def __del__(self): print("del class ==================================") def some_function(self): print(self.mode) def recognize(self, filepath): filename = os.path.basename(filepath) resultpath = os.path.join(self.configs.RESULTS_FOLDER, filename) img = cv2.imread(filepath) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img_out = self.func(img) cv2.imwrite(resultpath, img_out) return resultpath def rgb2gray(image): img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) return img_gray src/dnn/MiDaS.py from glob import glob from PIL import Image import numpy as np from torchvision import transforms import torch import timm import cv2 class MiDaS: def __init__(self): self.model_type = "DPT_Hybrid" self.model = torch.hub.load("intel-isl/MiDaS", self.model_type) self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") self.model.to(self.device) self.model.eval() midas_transforms = torch.hub.load("intel-isl/MiDaS", "transforms") if self.model_type == "DPT_Large" or self.model_type == "DPT_Hybrid": self.transform = midas_transforms.dpt_transform else: self.transform = midas_transforms.small_transform def __del__(self): print("del") def run(self, input_image): input_batch = self.transform(input_image).to(self.device) # move the input and model to GPU for speed if available if torch.cuda.is_available(): input_batch = input_batch.to('cuda') self.model.to('cuda') with torch.no_grad(): prediction = self.model(input_batch) prediction = torch.nn.functional.interpolate( prediction.unsqueeze(1), size=input_image.shape[:2], mode="bicubic", align_corners=False, ).squeeze() result = prediction.cpu().numpy() result = normalize_depth(result, bits=1) result = cv2.applyColorMap(result, cv2.COLORMAP_JET) del prediction, input_batch return result def normalize_depth(depth, bits): depth_min = depth.min() depth_max = depth.max() max_val = (2**(8*bits))-1 if depth_max - depth_min > np.finfo("float").eps: out = max_val * (depth - depth_min) / (depth_max - depth_min) else: out = np.zeros(depth.shape, dtype=depth.type) if bits == 1: return out.astype("uint8") elif bits == 2: return out.astype("uint16") docker/Dockerfile FROM nvidia/cuda:11.4.2-cudnn8-devel-ubuntu20.04 ARG use_cudnn=1 ENV DEBIAN_FRONTEND=noninteractive ENV HOME=/root \ DEBIAN_FRONTEND=noninteractive \ LANG=ja_JP.UTF-8 \ LC_ALL=${LANG} \ LANGUAGE=${LANG} \ TZ=Asia/Tokyo RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \ echo $TZ > /etc/timezone RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ sudo \ cmake \ git \ wget \ libatlas-base-dev \ libboost-all-dev \ libgflags-dev \ libgoogle-glog-dev \ libhdf5-serial-dev \ libleveldb-dev \ liblmdb-dev \ libprotobuf-dev \ libsnappy-dev \ protobuf-compiler \ python3-dev \ python3-pip \ python3-setuptools \ python3-tk \ python3-matplotlib \ less \ aptitude \ software-properties-common \ ssh \ unzip \ qt5-default \ qttools5-dev-tools \ libqt5widgets5 \ glew-utils \ libglew-dev \ libglm-dev \ tcsh \ aptitude \ freeglut3-dev \ libqt5opengl5-dev \ libcanberra-gtk-dev \ libcanberra-gtk-module \ libgtest-dev \ emacs \ spyder \ valgrind && \ rm -rf /var/lib/apt/lists/* # cmake RUN pip3 install numpy==1.18.2 RUN pip3 install pillow==7.1.1 ipython==7.13.0 matplotlib==1.5.1 WORKDIR /workspace # for python profiling RUN pip3 install line_profiler==3.0.2 && \ pip3 install pyprof2calltree==1.4.4 RUN pip3 install scipy==1.4.1 RUN pip3 install --upgrade pip==20.0.2 setuptools==46.1.3 RUN pip3 install tensorflow-gpu==2.7.0 RUN pip3 install keras==2.7.0 RUN pip3 install opencv-python RUN pip3 install opencv-contrib-python RUN pip install torch==1.10.0+cu113 torchvision==0.11.1+cu113 torchaudio===0.10.0+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html RUN pip install pandas RUN pip install tqdm RUN pip install albumentations RUN pip install sklearn RUN pip install seaborn RUN pip install tensorboard RUN pip install hydra-core RUN pip install tensorflow-model-optimization RUN pip install timm #=============================================================== WORKDIR /app RUN pip install Flask docker/build.sh nvidia-docker build --no-cache -t flask_dnn . run_docker.sh #!/bin/bash SCRIPT_DIR=$(cd $(dirname $0); pwd) DOCKSHARE=${SCRIPT_DIR} DOCKIMG="flask_dnn:latest" INITDIR="/home/dockshare" docker run -it --runtime=nvidia \ -p 44444:5000 \ -w ${INITDIR} --rm - \ -v ${DOCKSHARE}:${INITDIR} ${DOCKIMG} bash run_server.sh if [ $# -ne 1 ]; then echo "usage $0 [GPU_ID]" exit 1 fi GPU_ID=$1 if [ $GPU_ID != -2 ]; then echo "GPU : $GPU_ID" export CUDA_VISIBLE_DEVICES=$GPU_ID else echo "multi GPU train" fi python3 ./app.py -g $GPU_ID
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Project Euler】Problem 19: 日曜日を数える

本記事はProjectEulerの「100番以下の問題の説明は記載可能」という規定に基づいて回答のヒントが書かれていますので、自分である程度考えてみてから読まれることをお勧めします。 問題 19. 日曜日を数える 原文 Problem 19: Counting Sundays 問題の要約:20世紀の日曜日の数を数えよ 問題には親切に、2月は普通28日、うるう年は29日、ただし100年に一度は違うけど400年に一度はうるう年、等の説明があるので自力で書けと言うことだと思いますが、、ここはpythonのdatetimeを使って簡単に、、、weekday()メソッドの出力は6が日曜日のようです。 import datetime print([datetime.date(year,month,1).weekday() for month in range(1,12+1) for year in range(1901,2000+1)].count(6)) (開発環境:Google Colab)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Project Euler】Problem 18: 経路の合計の最大値 (その1)

本記事はProjectEulerの「100番以下の問題の説明は記載可能」という規定に基づいて回答のヒントが書かれていますので、自分である程度考えてみてから読まれることをお勧めします。 問題 18. 経路の合計の最大値 (その1) 原文 Problem 18: Maximum path sum 問題の要約:上から下まで数字をたどった経路の合計の最大値を求めよ 一番下の行から順に値の大きい方を足して行くことによって最後に最大値が求まります。この例題の場合の推移を表のステップ#1-3で示します。 ステップ1 ステップ2 ステップ3 3 3 3 23 7  4 7  4 20 19 20 19 2  4  6 10 13 15 10 13 15 10 13 15 8  5  9  3 8  5  9  3 8  5  9  3 8  5  9  3 プログラムにすると以下のようになります。三角形の配列triarrを作るコードは長いので下に載せました。 for y in range(len(triarr)-2,-1,-1): for x in range(y+1): triarr[y][x] += max(triarr[y+1][x],triarr[y+1][x+1]) print(f"Answer: {triarr[0][0]}") import numpy as np s = "75 "\ "95 64 "\ "17 47 82 "\ "18 35 87 10 "\ "20 04 82 47 65 "\ "19 01 23 75 03 34 "\ "88 02 77 73 07 63 67 "\ "99 65 04 28 06 16 70 92 "\ "41 41 26 56 83 40 80 70 33 "\ "41 48 72 33 47 32 37 16 94 29 "\ "53 71 44 65 25 43 91 52 97 51 14 "\ "70 11 33 28 77 73 17 78 39 68 17 57 "\ "91 71 52 38 17 14 91 43 58 50 27 29 48 "\ "63 66 04 68 89 53 67 30 73 16 69 87 40 31 "\ "04 62 98 27 23 09 70 98 73 93 38 53 60 04 23 " N = 15 nums = s.split() triarr, top = [], 0 for i in range(N): top += i triarr.append(list(map(int,nums[top:top+i+1])))
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Project Euler】Problem 17: 数字を英語で書いたときの文字数

本記事はProjectEulerの「100番以下の問題の説明は記載可能」という規定に基づいて回答のヒントが書かれていますので、自分である程度考えてみてから読まれることをお勧めします。 問題 17. 数字を英語で書いたときの文字数 原文 Problem 17: Number letter counts 問題の要約:1から1000までの数字を英語で書いたときの文字数の合計を求めよ(スペースやハイフォンは除く) 結構面倒くさいですが、地道に各桁ごとに英語に直すプログラムを作りました。 nname1 = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine",\ "ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen", "sixteen", "seventeen", "eithteen", "nineteen"] nname10 = ["", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy", "eighty", "ninety"] def digit19(n): return nname1[n] def digit99(n): if n < 20: return digit19(n) return nname10[n//10]+" "+nname1[n%10] def digit999(n): if n < 100: return digit99(n) if n%100 == 0: return nname1[n//100]+" hundred" return nname1[n//100]+" hundred and "+digit99(n%100) def digit9999(n): if n < 1000: return digit999(n) if n%1000 == 0: return nname1[n//1000]+" thousand" return nname1[n//1000]+" thousand and "+digit999(n%1000) これらを使って1000までの文字数(スペースは削除)の合計をだします。 print(sum([len( digit9999(i).replace(" ","")) for i in range(1,1001)])) しかしその後いろいろ調べたらpythonにnum2wordsというモジュールがあることが分かりました。Google Colabには標準で入っていないのでインストールする必要があります。 !pip install num2words from num2words import num2words print([num2words(i) for i in [14, 21, 99, 115, 299]]) ['fourteen', 'twenty-one', 'ninety-nine', 'one hundred and fifteen', 'two hundred and ninety-nine'] これを使えば1行で完成。ハイフォンもあるので正規表現モジュールのre.subを使って削除しました。 import re print(sum([len(re.sub('[ -]','', num2words(i))) for i in range(1,1001)])) (開発環境:Google Colab)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Project Euler】Problem 16: 2のべき乗の数字和

本記事はProjectEulerの「100番以下の問題の説明は記載可能」という規定に基づいて回答のヒントが書かれていますので、自分である程度考えてみてから読まれることをお勧めします。 問題 16. 2のべき乗の数字和 原文 Problem 16: Power digit sum 問題の要約:2の1000乗の数字和を求めよ この問題も桁数制限のないPythonでは簡単です。数字和は今後もよく使うのでdigitsumという関数にしておきます。 def digitsum(n): return sum(map(int,str(n))) print(f"Answer : {digitsum(2**1000)}") (開発環境:Google Colab)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

そのオープンソースプロジェクト、公開前に大事な情報コミットしてない?

通常、秘密鍵やAPI Tokenはソースコードとは分離させ、.envファイルなどに保存しておいて、Gitのコミットには含めない。しかし、実装の初期には.envを一時的にコミットに含めている事例も珍しくない。それを後で削除してコミットしても、履歴としては残っている。 この記事では、スマートコントラクト用の秘密鍵や、APIトークンなどの情報をGitのコミット履歴から検索し、リポジトリを公開する前に安全を確認できるツールを紹介する。 コミット履歴を展開する 私の作ったesightというツールを使う。 git logを使って確認するのが一般的だが、一度全て展開した方が、後の編集や確認が楽になる。 インストール pip3 install git+https://github.com/TakutoYoshikai/elemental-sight.git コミット履歴の展開 出力先ディレクトリに、全てのコミットの差分ファイルが展開される。そこからターゲットとなる文字列を検索し、それを修正すればよい。 esight /path/to/repo {BRANCH} -o {OUTPUT DIR} 展開されたディレクトリはこうなっている コミットが順番に番号のディレクトリに出力されて、差分のファイルが保存されている。 APIトークンや秘密鍵などの文字列を検索するツール このesightを使用して、gitのコミット履歴を展開した後、正規表現でランダムな文字列を検索するツールshibaを開発した。 このshibaは、正規表現によってhex文字列や、普通の文字列を検索した後に、その文字列がランダムに生成されたかどうか、文字の出現頻度の分散を計算し、閾値以上のものを表示する。 インストール git clone https://github.com/TakutoYoshikai/shiba cd shiba ./install.sh #パスを通す echo "export PATH=\$PATH:/path/to/shiba/bin" >> ~/.bash_profile 使い方 # APIトークンを探す shiba /path/to/repo {BRANCH} {TOKEN LENGTH} # ./SHIBA_RESULT にコミット履歴が展開されている # hex文字列を探す shiba /path/to/repo {BRANCH} hex {TOKEN LENGTH} #例: ローカルにcloneしたreactのリポジトリの中から32文字のhex文字列を検索する shiba ./react master hex 32 結果は以下のように表示される ./0/message.json 4hQ0v2z7fNRvvOji0jShV5SpNqYEgajJ ./1/message.json zkjhJgeY336mqEADvkvE494xxi2XyB8u ./2/env oe7tGfeBW1YJga7DieAjtrxCgNhs1c5X コミット履歴を確認した後 公開してはいけないデータを見つけた場合の対処法が、以下のメルカリの記事に修正の方法がまとめられている。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Blenderで球面上の頂点から凸包を作り色を割り当てよう

やること 球面上にランダムに頂点を作成します。 Blenderで凸包を作ろうを使って、頂点から凸包を作ります。 Blenderで隣り合う面に異なる色を割当(四色問題)を使って、凸包に色を割り当てます。 実行例 Pythonのコード 30個の頂点を球面上にランダムに作成し、凸包を作って色を割り当てます。 Scriptingワークスペースで新規作成してコピペして実行してください。 import bmesh import bpy import numpy as np from mathutils import Vector def rand_sphere(radius): while True: x = np.random.randn(3) r = np.linalg.norm(x) if r: return x / r * radius def add_vertex_on_sphere(n: int, radius: float = 1, name: str = ""): """球面上にランダムに頂点を作成し凸包にする :param n: 頂点数 :param radius: 半径 :param name: 名前 """ pts = [rand_sphere(radius) for _ in range(n)] mesh = bpy.data.meshes.new(name=name or "Sphere") mesh.from_pydata([Vector(pt) for pt in pts], [], []) obj = bpy.data.objects.new(mesh.name, mesh) bpy.context.layer_collection.collection.objects.link(obj) bpy.context.view_layer.objects.active = obj bpy.ops.object.mode_set(mode="EDIT") bpy.ops.mesh.convex_hull() bpy.ops.object.mode_set(mode="OBJECT") return obj def set_five_color(obj): """隣り合う面に異なる色を割当""" colors = [ (1, 0.1, 0.1, 1), (0.1, 0.2, 1, 1), (0.8, 0.8, 0, 1), (0, 0.8, 0.1, 1), (0.8, 0.2, 0, 1), ] bpy.context.view_layer.objects.active = obj # 編集モード bpy.ops.object.mode_set(mode="EDIT") bm = bmesh.from_edit_mesh(obj.data) # マテリアルのスロットを5つ用意 for _ in range(len(colors) - len(obj.material_slots)): bpy.ops.object.material_slot_add() # 5つのマテリアルを作成 for i, color in enumerate(colors): material = f"M{i}" mat = bpy.data.materials.get(material) or bpy.data.materials.new(name=material) mat.use_nodes = True mat.node_tree.nodes["Principled BSDF"].inputs["Base Color"].default_value = color obj.active_material_index = i obj.active_material = mat # 面ごとに隣り合う面と異なるように、色を若番から割当 n = len(bm.faces) res = [0] * n dj = [[] for _ in range(n)] # 面ごとの禁止領域リスト for edge in bm.edges: if len(edge.link_faces) == 2: i = edge.link_faces[0].index j = edge.link_faces[1].index dj[max(i, j)].append(min(i, j)) for i in range(n): res[i] = ({1, 2, 3, 4, 5} - {res[j] for j in dj[i]}).pop() # 色のマテリアルを面に設定 for face, i in zip(bm.faces, res): if i: obj.active_material_index = i - 1 bpy.ops.mesh.select_all(action="DESELECT") face.select = True bpy.ops.object.material_slot_assign() bm.free() # オブジェクトモード bpy.ops.object.mode_set(mode="OBJECT") if __name__ == "__main__": obj = add_vertex_on_sphere(30) set_five_color(obj) 以上
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

AWS上にマイニングリグの監視/自動復旧システム作ってみた(①監視/自動復旧編)

目次 1.背景 2.構成 2-1.監視ロジック 2-2.Lambda 2-2-1.Lambda関数の作成 2-2-2.プログラムソース 2-2-3.コンフィグ 2-2-4.モジュールの導入 2-2-5.外部APIキーの取得 2-2-5-1.NiceHash 2-2-5-2.LINE 2-2-5-3.SwitchBot 2-2-6.EventBridgeによるトリガー定義 2-3.RDB 2-3-1.DB作成 2-3-2.テーブル作成 2-4.電源スイッチとBOTの設置 3.実行結果 4.終わりに 5.更新履歴 1.背景  Windows10にNiceHash入れて稼働させているリグがBSODで死ぬことがあり自動復旧できるようにしたいと思った。OS側が死ぬとquickminer側に実装されているErrorHanding(プロセス再起動)でもどうしようもできないから対策したかった。 2.構成  NiceHash APIで定期的にリグステータスを取得して、ステータスに応じてOSハング有無を判断、ハングしていた場合には、swichbotで物理的にリグをリブートする構成 ※ Win10をホストOSとしてHyper-V上でNiceHashを動かすことも考えたが、仮想化するとGPU性能をフルに活用できなそうだったので、泣く泣く物理的に落としにいく構成とした... 処理の流れ  ①10分に1度、リグ監視用のLambdaがキックされる  ②LambdaからDBサーバへアクセスして監視フラグが有効な場合には、   NiceHash APIによりリグのマイニングステータスを取得する  ③マイニングステータスに変化があった場合には、DBを更新してLINE通知する  ④マイニングステータスが異常の場合には、switchbot APIで対象リグを強制リブートする 2-1.監視ロジック フローチャート 監視状態(監視フラグ/障害フラグ/異常カウンタ)をDBサーバ上で管理して フラグと取得マイニングステータスの状態に応じて実行する処理を制御する 2-2.Lambda 2-2-1.Lambda関数の作成 呼び出されるLambda関数本体を作成する 【Lambda】  関数名:「nicehash-surveillance」  ランタイム:「Python 3.6」  ※DBサーバへアクセスできるようにVPC設定も必要 2-2-2.プログラムソース Lambdaにデプロイするプログラム nicehash-surveillance nicehash-surveillance/ ├ lambda_function.py ├ db_data_deal.py ├ nicehash.py ├ rig_healthcheck.py ├ line_notify.py ├ switchbot.py ├ line_config.py ├ mysql_config.py ├ nicehash_config.py └ switchbot_config.py Lambdaメインプログラム lambda_function.py import json import requests import os import datetime import boto3 import db_data_deal import nicehash import rig_healthcheck import line_notify import switchbot #Function kicked by AWS Lambda def lambda_handler(event, context): Sqldealer = db_data_deal.sqldealer() Sqldealer.get_rig_db_info() # 監視対象Rigが少なくとも一つある場合 if 1 in Sqldealer.db_data_dict['surveillance_FLG']: # NiceHach API 情報取得 Nicehash = nicehash.private_api() rig_miner_status_dict = Nicehash.get_miner_status(Sqldealer.db_data_dict) # Rigの正常性確認 hang_up_rig_name_list = rig_healthcheck.get_hang_up_rig_name_list(rig_miner_status_dict) # Rigで既に障害が発生していた場合 if 1 in Sqldealer.db_data_dict['incident_FLG']: # Rigが正常の場合 if len(hang_up_rig_name_list) == 0: line_notify.send_restore_msg() {Sqldealer.update_incident_flg_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']} {Sqldealer.update_error_cnt_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']} print("return3 監視対象Rigの復旧を確認しました...") return 3 # Rigが復旧できなかった場合 else: # 復旧試行回数が5回以上の場合 if 5 in Sqldealer.db_data_dict['error_CNT']: line_notify.send_surveillance_stop_msg(hang_up_rig_name_list) {Sqldealer.update_surveillance_flg_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']} print("return4 監視対象Rigを復旧できませんでした、監視を停止します...") return 4 # 復旧試行回数が5回未満の場合 else: line_notify.send_reboot_retly_msg(hang_up_rig_name_list) Sqldealer.db_data_dict = rig_healthcheck.add_reboot_req_flg(Sqldealer.db_data_dict, hang_up_rig_name_list) # Rigの再起動 switchbot.hang_up_rig_reboot(Sqldealer.db_data_dict) {Sqldealer.update_error_cnt_increment(rig_name) for rig_name in hang_up_rig_name_list} print("return5 監視対象Rigの復旧を確認できませんでした、再度再起動します...") return 5 else: # Rigが正常の場合 if len(hang_up_rig_name_list) == 0: print("return1 監視対象Rigは正常です...") return 1 # Rigで新たに障害を検知した場合 else: line_notify.send_incident_msg(hang_up_rig_name_list) Sqldealer.db_data_dict = rig_healthcheck.add_reboot_req_flg(Sqldealer.db_data_dict, hang_up_rig_name_list) # Rigの再起動 switchbot.hang_up_rig_reboot(Sqldealer.db_data_dict) {Sqldealer.update_incident_flg_to_1(rig_name) for rig_name in hang_up_rig_name_list} print("return2 監視対象Rigで障害を検知、復旧を試みました...") return 2 else: print("return0 監視対象Rigがないため処理を終了します...") return 0 DBからの情報取得/DB更新処理を行うクラス db_data_deal.py ### Module ### pip install -t ./ mysql-connector-python import os import json from json import JSONEncoder import mysql.connector import boto3 import datetime import mysql_config as SQLconfig class sqldealer: def __init__(self): self.connection = mysql.connector.connect(user=SQLconfig.user, password=SQLconfig.password, host=SQLconfig.host, database=SQLconfig.database) self.db_data_dict = dict() def road_data(self,sql): with self.connection.cursor() as cur: select_sql = sql cur.execute(select_sql) row_db_data = cur.fetchall() return row_db_data def commit_data(self,sql): with self.connection.cursor() as cur: cur.execute(sql) cur.execute('commit;') def get_rig_db_info(self): sql = 'SELECT * FROM nicehash_surveillance_info;' columns = ["rig_no","rig_id","rig_name","surveillance_FLG","incident_FLG","error_CNT","switchbot_dev_id"] self.db_data_dict = {key:[] for key in columns} row_db_data = self.road_data(sql) print("DB-info01:success road_data") for j, col in enumerate(columns): for i in range(len(row_db_data)): self.db_data_dict[col].append(row_db_data[i][j]) print("DB-info02:success get_rig_db_info") def update_incident_flg_to_0(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET incident_FLG=0 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info03:success update_incident_flg_to_0") def update_incident_flg_to_1(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET incident_FLG=1 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info04:success update_incident_flg_to_1") def update_surveillance_flg_to_0(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET surveillance_FLG=0 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info05:success update_surveillance_flg_to_0") def update_surveillance_flg_to_1(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET surveillance_FLG=1 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info06:success update_surveillance_flg_to_1") def update_error_cnt_increment(self,rig_name): idx = self.db_data_dict['rig_name'].index(rig_name) error_CNT = self.db_data_dict['error_CNT'][idx] + 1 sql = 'UPDATE nicehash_surveillance_info SET error_CNT='+str(error_CNT)+' WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info07:success update_error_cnt_increment") def update_error_cnt_to_0(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET error_CNT=0 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info08:success update_error_cnt_to_0") class DateTimeEncoder(JSONEncoder): #Override the default method def default(self, obj): if isinstance(obj, (datetime.date, datetime.datetime)): return obj.isoformat() NiceHashからリグステータスを取得するためのクラス NiceHash APIのリクエスト/引数については、公式のdocsを参照。 nicehash.py from datetime import datetime from time import mktime import uuid import hmac import requests import json from hashlib import sha256 import optparse import sys import nicehash_config as NICEHASHconfig class private_api: def __init__(self, verbose=False): self.key = NICEHASHconfig.key self.secret = NICEHASHconfig.secret self.organisation_id = NICEHASHconfig.organisation_id self.host = NICEHASHconfig.host self.verbose = verbose def request(self, method, path, query, body): xtime = self.get_epoch_ms_from_now() xnonce = str(uuid.uuid4()) message = bytearray(self.key, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(str(xtime), 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(xnonce, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(self.organisation_id, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(method, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(path, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(query, 'utf-8') if body: body_json = json.dumps(body) message += bytearray('\x00', 'utf-8') message += bytearray(body_json, 'utf-8') digest = hmac.new(bytearray(self.secret, 'utf-8'), message, sha256).hexdigest() xauth = self.key + ":" + digest headers = { 'X-Time': str(xtime), 'X-Nonce': xnonce, 'X-Auth': xauth, 'Content-Type': 'application/json', 'X-Organization-Id': self.organisation_id, 'X-Request-Id': str(uuid.uuid4()) } s = requests.Session() s.headers = headers url = self.host + path if query: url += '?' + query if self.verbose: print(method, url) if body: response = s.request(method, url, data=body_json) else: response = s.request(method, url) if response.status_code == 200: print("NiceHash-info01:success request") return response.json() elif response.content: raise Exception(str(response.status_code) + ": " + response.reason + ": " + str(response.content)) else: raise Exception(str(response.status_code) + ": " + response.reason) def get_epoch_ms_from_now(self): now = datetime.now() now_ec_since_epoch = mktime(now.timetuple()) + now.microsecond / 1000000.0 return int(now_ec_since_epoch * 1000) def get_miner_status(self, rig_db_info): rig_miner_status_dict = dict() for rig_id,rig_name in zip(rig_db_info['rig_id'],rig_db_info['rig_name']): rig_info = self.request("GET", "/main/api/v2/mining/rig2/" + rig_id, "", None) rig_miner_status_dict[rig_name] = rig_info['minerStatus'] print("NiceHash-info02:success get_miner_status") return rig_miner_status_dict リグステータスからヘルスチェックする関数リスト rig_healthcheck.py def get_hang_up_rig_name_list(rig_miner_status_dict): # BENCHMARKING/MINING/STOPPEDであれば正常と判定 ACTIVE_STATUS_LIST = ["BENCHMARKING","MINING","STOPPED"] hang_up_rig_name_list = list() for rig_name in rig_miner_status_dict.keys(): if rig_miner_status_dict[rig_name] not in ACTIVE_STATUS_LIST: hang_up_rig_name_list.append(rig_name) print("HC-info01:success get_hang_up_rig_name_list") return hang_up_rig_name_list def add_reboot_req_flg(db_data_dict, hang_up_rig_name_list): db_data_dict['reboot_req_flg'] = [0]*len(db_data_dict['rig_name']) for err_rig_name in hang_up_rig_name_list: idx = db_data_dict['rig_name'].index(err_rig_name) db_data_dict['reboot_req_flg'][idx] = 1 return db_data_dict switchbot経由でリグを再起動する関数 SwitchBot APIのリクエストや引数については、公式Githubを参照。 switchbot.py import json import requests import os import datetime import boto3 import switchbot_config as sb_cnf def hang_up_rig_reboot(db_data_dict): headers = { 'Content-Type': 'application/json; charset: utf8', 'Authorization': sb_cnf.access_token } body = { "command":"turnOn", "parameter":"default", "commandType":"command" } input_action = json.dumps(body) print(input_action) for i,dev_id in enumerate(db_data_dict['switchbot_dev_id']): if db_data_dict['reboot_req_flg'][i] == 1: url = sb_cnf.api_url + "/v1.0/devices/" + dev_id + "/commands" result = requests.post(url, data=input_action, headers=headers) print(result) リグ監視状況をLINE通知する関数 line_notify.py import json import requests import line_config as LINEconfig def send_msg(msg): headers = {"Authorization": "Bearer %s" % LINEconfig.LINE_NOTIFY_ACCESS_TOKEN} url = LINEconfig.NOTIFICATION_URL payload = {'message': msg} requests.post(url, data=payload, headers=headers) def send_incident_msg(hang_up_rig_name_list): err_rigs = ','.join(hang_up_rig_name_list) msg = err_rigs+' で障害発生。'+'\n'+'対象リグを再起動します。' send_msg(msg) def send_restore_msg(): msg = '監視対象リグの復旧を確認。'+'\n'+'監視を継続します。' send_msg(msg) def send_reboot_retly_msg(hang_up_rig_name_list): err_rigs = ','.join(hang_up_rig_name_list) msg = err_rigs+' の復旧を確認できません。'+'\n'+'再度リブートします。' send_msg(msg) def send_surveillance_stop_msg(hang_up_rig_name_list): err_rigs = ','.join(hang_up_rig_name_list) msg = err_rigs+' を復旧できませんでした。'+'\n'+'リグの監視を中止します。' send_msg(msg) 2-2-3.コンフィグ 各サービス/外部APIと連携するためにコンフィグに必要な設定値を指定する 下記コンフィグの設定値詳細については、2-2-5項を参照。 line_config.py NOTIFICATION_URL = "https://notify-api.line.me/api/notify" LINE_NOTIFY_ACCESS_TOKEN = "[LINEアクセストークン]" mysql_config.py user = "[MySQLアクセスユーザ]" password = "[MySQLアクセスユーザpw]" host = "[DBサーバの静的IP]" database = "[MySQLで構築したDatabase名]" nicehash_config.py host = "https://api2.nicehash.com" organisation_id = "[NiceHash組織ID]" key = "[NiceHash APIアクセスキー]" secret = "[NiceHash APIシークレットアクセスキー]" switchbot_config.py api_url = "https://api.switch-bot.com" access_token = "[switchbotアクセストークン]" 2-2-4.モジュールの導入 Lambdaの実行に必要なパッケージを取り込む ・Lambda:nicehash-surveillanceには「mysql-connector-python」が必要なので、AWS Cloud9上でディレクトリを切って、下記コマンドを実行して環境を整備する。LambdaへのデプロイもCloud9上で行うと楽なのでおすすめ。 nicehash-surveillance ec2-user:~/environment (master) $ mkdir nicehash-surveillance ec2-user:~/environment (master) $ cd nicehash-surveillance ec2-user:~/environment/nicehash-surveillance (master) $ pip install -t ./ mysql-connector-python 2-2-5.外部APIキーの取得 外部APIの認証に必要な鍵情報/トークンを取得する 2-2-5-1.NiceHash APIキー(コンフィグに記載) ・こちら記載の手順を元にAPIキーを取得する。 リグID(DBに登録) ・NiceHashのダッシュボード or アプリから確認する。  ※ダッシュボードの場合、以下の黄色部分に記載されている。 2-2-5-2.LINE アクセストークン(コンフィグに記載) ・こちら記載の手順を元に取得する。 2-2-5-3.SwitchBot アクセストークン(コンフィグに記載) ・アクセストークンの取得方法はこちらの記事を参照。 ・外部サービスと連携するために、SwitchBotアプリ側でも「クラウドサービス」をONにしておく。 deviceId(DBに登録) ・下記curlコマンドで各botのdeviceIdを取得する。 /usr/bin/curl -X GET "https://api.switch-bot.com/v1.0/devices" -H "Authorization:[Access_token]" 2-2-6.EventBridgeによるトリガー定義 定期ジョブとしてLambdaをキックするためのトリガーを定義する ・下記トリガーを作成して、Lambda:nicehash-surveillanceにアタッチする ルール:「新規ルールの作成」 ルール名:DailyTrigger ルールタイプ:スケジュール式 固定速度ごと:10分 2-3.RDB リグ情報、監視ステータスを管理するDBを用意する 2-3-1.DB作成 節約のためRDSは使用せずに、UNIX OSのEC2インスタンスにMySQLを直接インストールしてDBを構築する ※MySQLのインストールはこの辺を参照 2-3-2.テーブル作成 nicehash_surveillance_infoテーブルをDB上に定義する、最低限以下定義があればOK。 ※以下はリグ3個で運用している場合の例 mysql> SHOW COLUMNS FROM nicehash_surveillance_info; +------------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +------------------+--------------+------+-----+---------+-------+ | rig_no | int(11) | YES | | NULL | | | rig_id | varchar(100) | YES | | NULL | | | rig_name | varchar(100) | YES | | NULL | | | surveillance_FLG | int(11) | YES | | NULL | | | incident_FLG | int(11) | YES | | NULL | | | error_CNT | int(11) | YES | | NULL | | | switchbot_dev_id | varchar(100) | YES | | NULL | | +------------------+--------------+------+-----+---------+-------+ 7 rows in set (0.00 sec) mysql> select * from nicehash_surveillance_info; +--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+ | rig_no | rig_id | rig_name | surveillance_FLG | incident_FLG | error_CNT | switchbot_dev_id | +--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+ | 1 | [myExcavator1のリグID] | myExcavator1 | 1 | 0 | 0 | [myExcavator1用BOTのdeviceId] | | 2 | [myExcavator2のリグID] | myExcavator2 | 1 | 0 | 0 | [myExcavator2用BOTのdeviceId] | | 3 | [MainPCのリグID] | MainPC | 1 | 0 | 0 | [MainPC用BOTのdeviceId] | +--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+ 3 rows in set (0.00 sec) rig_no:リグ番号 rig_id:NiceHash上のリグID rig_name:NiceHash上のリグ名称 surveillance_FLG:監視状態を制御するフラグ(1:監視有効/0:監視無効) incident_FLG:障害発生状態管理するフラグ(1:障害発生中/0:正常稼働中) error_CNT:再起動処理のエラーカウンタ switchbot_dev_id:SwitchBot上のdeviceId 2-4.電源スイッチとBOTの設置 SwitchBotでリグを再起動できるように電源スイッチにBOTを設置する でかでか電源ボタンをリグにつないで、BOTがResetボタンを押せるよう固定する。 リグが複数ある場合には、それぞれでかでか電源ボタンとBOTのセットを設置する 3. 実行結果 リグがBSODで死にっぱなしになることがなくなった...! LINEで通知されるメッセージは以下の通り。 4. 終わりに HiveOSなら問題にならないかもですが、NiceHashではBSODで悩まされされることがあるので、物理的に無理やりOS立ち上げるゴリ押し運用でどうにかできるようにしました...(笑) (それにしても、BSODの原因は何なのだろうか。) (マザボ/ライザー/電源/複数GPU間のコンパチが良くないのだろうか...) (そもそも、1万程度のマザボに品質を求めるのはおかしいのか...) 5. 更新履歴 ver. 1.0 初版投稿 2022/01/10
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

AWS上にリグの監視/自動復旧システムを作ってみた(①監視/自動復旧編)

目次 1.背景 2.構成 2-1.監視ロジック 2-2.Lambda 2-2-1.Lambda関数の作成 2-2-2.プログラムソース 2-2-3.コンフィグ 2-2-4.モジュールの導入 2-2-5.外部APIキーの取得 2-2-5-1.NiceHash 2-2-5-2.LINE 2-2-5-3.SwitchBot 2-2-6.EventBridgeによるトリガー定義 2-3.RDB 2-3-1.DB作成 2-3-2.テーブル作成 2-4.電源スイッチとBOTの設置 3.実行結果 4.終わりに 5.更新履歴 1.背景  Windows10×NiceHashのリグが謎のBSODで死ぬことが何度かあり収益が落ち込むことがあったのでどうにかしたかった。OS側が死ぬとquickminer側のErrorHanding(プロセス再起動)ではどうしようもできないので対策が必要だった。 2.構成  NiceHash APIで定期的にリグステータスを取得、ステータスに応じてOSハングを検知する。ハングしていた場合には、swichbot経由で物理的にリグをリブートとするというゴリ押し構成。 ※ Win10をホストOSとしてHyper-V上でNiceHashを動かすことも考えたが、仮想化するとGPU性能をフルに活用できなそうだったので、泣く泣く物理的に落としにいく構成に... 処理の流れ  ①10分に1度、リグ監視用のLambdaがキックされる  ②LambdaからDBサーバへアクセスして監視フラグが有効な場合には、   NiceHash APIによりリグのマイニングステータスを取得する  ③マイニングステータスに変化があった場合には、DBを更新してLINE通知する  ④マイニングステータスが異常の場合には、switchbot APIで対象リグを強制リブートする 2-1.監視ロジック 監視状態(監視フラグ/障害フラグ/異常カウンタ)をDBサーバ上で管理して フラグと取得マイニングステータスの状態に応じて実行する処理を制御する 2-2.Lambda 2-2-1.Lambda関数の作成 呼び出されるLambda関数本体を作成する 【Lambda】  関数名:「nicehash-surveillance」  ランタイム:「Python 3.6」  ※DBサーバへアクセスできるようにVPC設定も必要 2-2-2.プログラムソース Lambdaにデプロイするプログラム nicehash-surveillance nicehash-surveillance/ ├ lambda_function.py ├ db_data_deal.py ├ nicehash.py ├ rig_healthcheck.py ├ line_notify.py ├ switchbot.py ├ line_config.py ├ mysql_config.py ├ nicehash_config.py └ switchbot_config.py Lambdaメインプログラム lambda_function.py import json import requests import os import datetime import boto3 import db_data_deal import nicehash import rig_healthcheck import line_notify import switchbot #Function kicked by AWS Lambda def lambda_handler(event, context): Sqldealer = db_data_deal.sqldealer() Sqldealer.get_rig_db_info() # 監視対象Rigが少なくとも一つある場合 if 1 in Sqldealer.db_data_dict['surveillance_FLG']: # NiceHach API 情報取得 Nicehash = nicehash.private_api() rig_miner_status_dict = Nicehash.get_miner_status(Sqldealer.db_data_dict) # Rigの正常性確認 hang_up_rig_name_list = rig_healthcheck.get_hang_up_rig_name_list(rig_miner_status_dict) # Rigで既に障害が発生していた場合 if 1 in Sqldealer.db_data_dict['incident_FLG']: # Rigが正常の場合 if len(hang_up_rig_name_list) == 0: line_notify.send_restore_msg() {Sqldealer.update_incident_flg_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']} {Sqldealer.update_error_cnt_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']} print("return3 監視対象Rigの復旧を確認しました...") return 3 # Rigが復旧できなかった場合 else: # 復旧試行回数が5回以上の場合 if 5 in Sqldealer.db_data_dict['error_CNT']: line_notify.send_surveillance_stop_msg(hang_up_rig_name_list) {Sqldealer.update_surveillance_flg_to_0(rig_name) for rig_name in Sqldealer.db_data_dict['rig_name']} print("return4 監視対象Rigを復旧できませんでした、監視を停止します...") return 4 # 復旧試行回数が5回未満の場合 else: line_notify.send_reboot_retly_msg(hang_up_rig_name_list) Sqldealer.db_data_dict = rig_healthcheck.add_reboot_req_flg(Sqldealer.db_data_dict, hang_up_rig_name_list) # Rigの再起動 switchbot.hang_up_rig_reboot(Sqldealer.db_data_dict) {Sqldealer.update_error_cnt_increment(rig_name) for rig_name in hang_up_rig_name_list} print("return5 監視対象Rigの復旧を確認できませんでした、再度再起動します...") return 5 else: # Rigが正常の場合 if len(hang_up_rig_name_list) == 0: print("return1 監視対象Rigは正常です...") return 1 # Rigで新たに障害を検知した場合 else: line_notify.send_incident_msg(hang_up_rig_name_list) Sqldealer.db_data_dict = rig_healthcheck.add_reboot_req_flg(Sqldealer.db_data_dict, hang_up_rig_name_list) # Rigの再起動 switchbot.hang_up_rig_reboot(Sqldealer.db_data_dict) {Sqldealer.update_incident_flg_to_1(rig_name) for rig_name in hang_up_rig_name_list} print("return2 監視対象Rigで障害を検知、復旧を試みました...") return 2 else: print("return0 監視対象Rigがないため処理を終了します...") return 0 DBからの情報取得/DB更新処理を行うクラス db_data_deal.py ### Module ### pip install -t ./ mysql-connector-python import os import json from json import JSONEncoder import mysql.connector import boto3 import datetime import mysql_config as SQLconfig class sqldealer: def __init__(self): self.connection = mysql.connector.connect(user=SQLconfig.user, password=SQLconfig.password, host=SQLconfig.host, database=SQLconfig.database) self.db_data_dict = dict() def road_data(self,sql): with self.connection.cursor() as cur: select_sql = sql cur.execute(select_sql) row_db_data = cur.fetchall() return row_db_data def commit_data(self,sql): with self.connection.cursor() as cur: cur.execute(sql) cur.execute('commit;') def get_rig_db_info(self): sql = 'SELECT * FROM nicehash_surveillance_info;' columns = ["rig_no","rig_id","rig_name","surveillance_FLG","incident_FLG","error_CNT","switchbot_dev_id"] self.db_data_dict = {key:[] for key in columns} row_db_data = self.road_data(sql) print("DB-info01:success road_data") for j, col in enumerate(columns): for i in range(len(row_db_data)): self.db_data_dict[col].append(row_db_data[i][j]) print("DB-info02:success get_rig_db_info") def update_incident_flg_to_0(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET incident_FLG=0 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info03:success update_incident_flg_to_0") def update_incident_flg_to_1(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET incident_FLG=1 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info04:success update_incident_flg_to_1") def update_surveillance_flg_to_0(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET surveillance_FLG=0 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info05:success update_surveillance_flg_to_0") def update_surveillance_flg_to_1(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET surveillance_FLG=1 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info06:success update_surveillance_flg_to_1") def update_error_cnt_increment(self,rig_name): idx = self.db_data_dict['rig_name'].index(rig_name) error_CNT = self.db_data_dict['error_CNT'][idx] + 1 sql = 'UPDATE nicehash_surveillance_info SET error_CNT='+str(error_CNT)+' WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info07:success update_error_cnt_increment") def update_error_cnt_to_0(self,rig_name): sql = 'UPDATE nicehash_surveillance_info SET error_CNT=0 WHERE rig_name="'+rig_name+'";' self.commit_data(sql) print("DB-info08:success update_error_cnt_to_0") class DateTimeEncoder(JSONEncoder): #Override the default method def default(self, obj): if isinstance(obj, (datetime.date, datetime.datetime)): return obj.isoformat() NiceHashからリグステータスを取得するためのクラス NiceHash APIのリクエスト/引数については、公式のdocsを参照。 nicehash.py from datetime import datetime from time import mktime import uuid import hmac import requests import json from hashlib import sha256 import optparse import sys import nicehash_config as NICEHASHconfig class private_api: def __init__(self, verbose=False): self.key = NICEHASHconfig.key self.secret = NICEHASHconfig.secret self.organisation_id = NICEHASHconfig.organisation_id self.host = NICEHASHconfig.host self.verbose = verbose def request(self, method, path, query, body): xtime = self.get_epoch_ms_from_now() xnonce = str(uuid.uuid4()) message = bytearray(self.key, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(str(xtime), 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(xnonce, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(self.organisation_id, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(method, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(path, 'utf-8') message += bytearray('\x00', 'utf-8') message += bytearray(query, 'utf-8') if body: body_json = json.dumps(body) message += bytearray('\x00', 'utf-8') message += bytearray(body_json, 'utf-8') digest = hmac.new(bytearray(self.secret, 'utf-8'), message, sha256).hexdigest() xauth = self.key + ":" + digest headers = { 'X-Time': str(xtime), 'X-Nonce': xnonce, 'X-Auth': xauth, 'Content-Type': 'application/json', 'X-Organization-Id': self.organisation_id, 'X-Request-Id': str(uuid.uuid4()) } s = requests.Session() s.headers = headers url = self.host + path if query: url += '?' + query if self.verbose: print(method, url) if body: response = s.request(method, url, data=body_json) else: response = s.request(method, url) if response.status_code == 200: print("NiceHash-info01:success request") return response.json() elif response.content: raise Exception(str(response.status_code) + ": " + response.reason + ": " + str(response.content)) else: raise Exception(str(response.status_code) + ": " + response.reason) def get_epoch_ms_from_now(self): now = datetime.now() now_ec_since_epoch = mktime(now.timetuple()) + now.microsecond / 1000000.0 return int(now_ec_since_epoch * 1000) def get_miner_status(self, rig_db_info): rig_miner_status_dict = dict() for rig_id,rig_name in zip(rig_db_info['rig_id'],rig_db_info['rig_name']): rig_info = self.request("GET", "/main/api/v2/mining/rig2/" + rig_id, "", None) rig_miner_status_dict[rig_name] = rig_info['minerStatus'] print("NiceHash-info02:success get_miner_status") return rig_miner_status_dict リグステータスからヘルスチェックする関数リスト rig_healthcheck.py def get_hang_up_rig_name_list(rig_miner_status_dict): # BENCHMARKING/MINING/STOPPEDであれば正常と判定 ACTIVE_STATUS_LIST = ["BENCHMARKING","MINING","STOPPED"] hang_up_rig_name_list = list() for rig_name in rig_miner_status_dict.keys(): if rig_miner_status_dict[rig_name] not in ACTIVE_STATUS_LIST: hang_up_rig_name_list.append(rig_name) print("HC-info01:success get_hang_up_rig_name_list") return hang_up_rig_name_list def add_reboot_req_flg(db_data_dict, hang_up_rig_name_list): db_data_dict['reboot_req_flg'] = [0]*len(db_data_dict['rig_name']) for err_rig_name in hang_up_rig_name_list: idx = db_data_dict['rig_name'].index(err_rig_name) db_data_dict['reboot_req_flg'][idx] = 1 return db_data_dict switchbot経由でリグを再起動する関数 SwitchBot APIのリクエストや引数については、公式Githubを参照。 switchbot.py import json import requests import os import datetime import boto3 import switchbot_config as sb_cnf def hang_up_rig_reboot(db_data_dict): headers = { 'Content-Type': 'application/json; charset: utf8', 'Authorization': sb_cnf.access_token } body = { "command":"turnOn", "parameter":"default", "commandType":"command" } input_action = json.dumps(body) print(input_action) for i,dev_id in enumerate(db_data_dict['switchbot_dev_id']): if db_data_dict['reboot_req_flg'][i] == 1: url = sb_cnf.api_url + "/v1.0/devices/" + dev_id + "/commands" result = requests.post(url, data=input_action, headers=headers) print(result) リグ監視状況をLINE通知する関数 line_notify.py import json import requests import line_config as LINEconfig def send_msg(msg): headers = {"Authorization": "Bearer %s" % LINEconfig.LINE_NOTIFY_ACCESS_TOKEN} url = LINEconfig.NOTIFICATION_URL payload = {'message': msg} requests.post(url, data=payload, headers=headers) def send_incident_msg(hang_up_rig_name_list): err_rigs = ','.join(hang_up_rig_name_list) msg = err_rigs+' で障害発生。'+'\n'+'対象リグを再起動します。' send_msg(msg) def send_restore_msg(): msg = '監視対象リグの復旧を確認。'+'\n'+'監視を継続します。' send_msg(msg) def send_reboot_retly_msg(hang_up_rig_name_list): err_rigs = ','.join(hang_up_rig_name_list) msg = err_rigs+' の復旧を確認できません。'+'\n'+'再度リブートします。' send_msg(msg) def send_surveillance_stop_msg(hang_up_rig_name_list): err_rigs = ','.join(hang_up_rig_name_list) msg = err_rigs+' を復旧できませんでした。'+'\n'+'リグの監視を中止します。' send_msg(msg) 2-2-3.コンフィグ 各サービス/外部APIと連携するためにコンフィグに必要な設定値を指定する 下記コンフィグの設定値詳細については、2-2-5項を参照。 line_config.py NOTIFICATION_URL = "https://notify-api.line.me/api/notify" LINE_NOTIFY_ACCESS_TOKEN = "[LINEアクセストークン]" mysql_config.py user = "[MySQLアクセスユーザ]" password = "[MySQLアクセスユーザpw]" host = "[DBサーバの静的IP]" database = "[MySQLで構築したDatabase名]" nicehash_config.py host = "https://api2.nicehash.com" organisation_id = "[NiceHash組織ID]" key = "[NiceHash APIアクセスキー]" secret = "[NiceHash APIシークレットアクセスキー]" switchbot_config.py api_url = "https://api.switch-bot.com" access_token = "[switchbotアクセストークン]" 2-2-4.モジュールの導入 Lambdaの実行に必要なパッケージを取り込む ・Lambda:nicehash-surveillanceには「mysql-connector-python」が必要なので、AWS Cloud9上でディレクトリを切って、下記コマンドを実行して環境を整備する。LambdaへのデプロイもCloud9上で行うと楽なのでおすすめ。 nicehash-surveillance ec2-user:~/environment (master) $ mkdir nicehash-surveillance ec2-user:~/environment (master) $ cd nicehash-surveillance ec2-user:~/environment/nicehash-surveillance (master) $ pip install -t ./ mysql-connector-python 2-2-5.外部APIキーの取得 外部APIの認証に必要な鍵情報/トークンを取得する 2-2-5-1.NiceHash APIキー(コンフィグに記載) ・こちら記載の手順を元にAPIキーを取得する。 リグID(DBに登録) ・NiceHashのダッシュボード or アプリから確認する。  ※ダッシュボードの場合、以下の黄色部分に記載されている。 2-2-5-2.LINE アクセストークン(コンフィグに記載) ・こちら記載の手順を元に取得する。 2-2-5-3.SwitchBot アクセストークン(コンフィグに記載) ・アクセストークンの取得方法はこちらの記事を参照。 ・外部サービスと連携するために、SwitchBotアプリ側でも「クラウドサービス」をONにしておく。 deviceId(DBに登録) ・下記curlコマンドで各botのdeviceIdを取得する。 /usr/bin/curl -X GET "https://api.switch-bot.com/v1.0/devices" -H "Authorization:[Access_token]" 2-2-6.EventBridgeによるトリガー定義 定期ジョブとしてLambdaをキックするためのトリガーを定義する ・下記トリガーを作成して、Lambda:nicehash-surveillanceにアタッチする ルール:「新規ルールの作成」 ルール名:DailyTrigger ルールタイプ:スケジュール式 固定速度ごと:10分 2-3.RDB リグ情報、監視ステータスを管理するDBを用意する 2-3-1.DB作成 節約のためRDSは使用せずに、UNIX OSのEC2インスタンスにMySQLを直接インストールしてDBを構築する ※MySQLのインストールはこの辺を参照 2-3-2.テーブル作成 nicehash_surveillance_infoテーブルをDB上に定義する、最低限以下定義があればOK。 ※以下はリグ3個で運用している場合の例 mysql> SHOW COLUMNS FROM nicehash_surveillance_info; +------------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +------------------+--------------+------+-----+---------+-------+ | rig_no | int(11) | YES | | NULL | | | rig_id | varchar(100) | YES | | NULL | | | rig_name | varchar(100) | YES | | NULL | | | surveillance_FLG | int(11) | YES | | NULL | | | incident_FLG | int(11) | YES | | NULL | | | error_CNT | int(11) | YES | | NULL | | | switchbot_dev_id | varchar(100) | YES | | NULL | | +------------------+--------------+------+-----+---------+-------+ 7 rows in set (0.00 sec) mysql> select * from nicehash_surveillance_info; +--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+ | rig_no | rig_id | rig_name | surveillance_FLG | incident_FLG | error_CNT | switchbot_dev_id | +--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+ | 1 | [myExcavator1のリグID] | myExcavator1 | 1 | 0 | 0 | [myExcavator1用BOTのdeviceId] | | 2 | [myExcavator2のリグID] | myExcavator2 | 1 | 0 | 0 | [myExcavator2用BOTのdeviceId] | | 3 | [MainPCのリグID] | MainPC | 1 | 0 | 0 | [MainPC用BOTのdeviceId] | +--------+--------------------------+--------------+------------------+--------------+-----------+-------------------------------+ 3 rows in set (0.00 sec) rig_no:リグ番号 rig_id:NiceHash上のリグID rig_name:NiceHash上のリグ名称 surveillance_FLG:監視状態を制御するフラグ(1:監視有効/0:監視無効) incident_FLG:障害発生状態管理するフラグ(1:障害発生中/0:正常稼働中) error_CNT:再起動処理のエラーカウンタ switchbot_dev_id:SwitchBot上のdeviceId 2-4.電源スイッチとBOTの設置 SwitchBotでリグを再起動できるように電源スイッチにBOTを設置する でかでか電源ボタンをリグにつないで、BOTがResetボタンを押せるよう固定する。 リグが複数ある場合には、それぞれでかでか電源ボタンとBOTのセットを設置する 3. 実行結果 リグがBSODで死にっぱなしになることがなくなった...! LINEで通知されるメッセージは以下の通り。 4. 終わりに HiveOSなら問題にならないかもですが、NiceHashではBSODで悩まされされることがあるので、物理的に無理やりOS立ち上げるゴリ押し運用でどうにかできるようにしました...(笑) (それにしても、BSODの原因は何なのだろうか。) (マザボ/ライザー/電源/複数GPU間のコンパチが良くないのだろうか...) (そもそも、1万程度のマザボに品質を求めるのはおかしいのか...) また、常時リグが監視がされている状態だとメンテなどで止める際も、障害と誤検知されてしまい不便なので、LINEから監視を制御できるようにもしてます。詳細は次回の「AWS上にリグの監視/自動復旧システムを作ってみた(②監視抑止操作編)」にまとめます。 5. 更新履歴 ver. 1.0 初版投稿 2022/01/10
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【微小領域群】「地球平面説」から、いかに逃れるか?

「この世の一般人には微積分の知識はいらない」という意見もあります。 『麻生財務相「微分積分いらない」発言まさかの拍手喝さい?支持の裏にアキバの“あの演説”』への皆さんの反応まとめ きっかけは麻生氏が昨年9月、ネットと通信制を活用した私立N高校(角川ドワンゴ学園)の政治部の特別授業(高校生のための主催者教育)に講師として参加した時の発言内容が最近、一部メディアで取り上げられたことだ。 麻生氏はこの時、日本の義務教育に触れ、「きちんとした教育はもう小学校までで十分じゃないかと。中学まで義務にする必要があるのかと」などと持論を展開。さらに「例えば、微分積分・因数分解とかやらされますけども、大人になってこのドワンゴの人だって因数分解を使った人だって居ないですよ。それが必要かね? 義務として」などと語った。 まぁ、シャーロックホームズも第一作「緋色の研究(A Study in Scarlet,1886年)」の中で「太陽が地球の周りを回っているのではなく、地球が太陽の周りを回ってるなんて知識は私には余計」と断言してますし。「地球平面説」の流行には確実にそういう考え方が潜んでいる様に思われます。 世界に広がる「地球平面説」 その背景にあるものは? どうしてこの二つの考え方を結びつけて語ろうとするかというと、実際関わり合ってくる話だからです。それでは皆さん、実際にはどうやって「地球が丸い」事を納得してらっしゃるんでしょうか? シャーロック・ホームズは天動説を知らなかったのか、という驚きからフッサールを経由してどこかに行く話 「地球は丸い」が納得出来たり、出来なかったりする理由。 以下の投稿で提言した「微小領域環(Microregion Ring)」の概念から出発します。話を単純化する為に二次元で考えると文字通り「微小領域αの輪」、すなわち円形に一直線に並んで連続分布する微小領域aの構成する環A($a \in A$)が考察の対象となる訳ですね。 【微小領域環】直交座標系や極座標系との対応。 特定の演算結果に合致する範囲を近傍(Neighbourhood)と呼びますが、ここで微小領域の特徴を把握してないと問題が生じます。円の性質上、それぞれの微小領域は中心から伸ばされた垂線と直交し、かつ連続する微小領域-α,0(α-α),αが直線を構成するとされますが、この定義自体が人間の直感に反しているからですね。 Pythonでの実装例 %matplotlib nbagg import math as m import cmath as c import numpy as np import matplotlib.pyplot as plt import matplotlib.animation as animation #単位円データ作成 c0=np.linspace(-m.pi,m.pi,61,endpoint = True) s0=[] for num in range(len(c0)): s0.append(complex(m.cos(c0[num]),m.sin(c0[num]))) s1=np.array(s0) #垂直線 Vert_st0=complex(1,-5) print(c.polar(Vert_st0)) print(abs(Vert_st0)) print(c.phase(Vert_st0)) Vert_ed0=complex(1,5) plt.style.use('default') fig = plt.figure() #関数定義 def unit_circle(n): plt.cla() #スポーク線描画 for num in range(len(s1)): plt.plot([0,s1[num].real],[0,s1[num].imag],color="gray",lw=0.5); #円周描画 plt.plot(s1.real,s1.imag,color="blue", label="Unit Cylinder") plt.ylim([-1.1,1.1]) plt.xlim([-1.1,1.1]) plt.title("Unit Circle") plt.xlabel("Real") plt.ylabel("Imaginal") ax = fig.add_subplot(111) ax.set_aspect('equal', adjustable='box') ax.legend(loc='upper right') #補助線描画 plt.axvline(0, 0, 1,color="red") plt.axhline(0, 0, 1,color="red") #移動線描画 plt.plot([0,s1[n].real],[0,s1[n].imag],color="green",lw=1) #垂直線 Vert_st=c.rect(abs(Vert_st0),c.phase(Vert_st0)+c0[n]) Vert_ed=c.rect(abs(Vert_ed0),c.phase(Vert_ed0)+c0[n]) plt.plot([Vert_st.real,Vert_ed.real],[Vert_st.imag,Vert_ed.imag],color="green",lw=1) ani = animation.FuncAnimation(fig, unit_circle, interval=50,frames=len(s1)) ani.save("orth01.gif", writer="pillow") 何しろ連続微小領域側の視界には「曲率0」と写るので近傍が直線としか観測されないのに対し、全体観測者の視界には「曲率1」と写るのでこの近傍が全く存在しない様にしか観測されないのです。フーリエ変換やラプラス変換ではさらに規模の大きな「任意の地点からの観測結果の下限が-∞で、上限が+∞である様な円環(周期)」なんて概念まで登場しますが、この辺りのイメージを矛盾を感じずハンドリングするにはどうしても解析学的教養(Mathematical Analytic Education)が必要不可欠となってくる訳です。 この考え方こそ(統計学をも含む)本来の社会科学(Social Science)の出発点ではなかったか? ここでいう微小領域環なる数学的構造=抽象概念は「個人と社会(王国と臣民、国民と国家…)」といった諸概念へも射影可能で、世の中がそんな具合になってくれば、さらに多くの陰謀論への耐性がつくんじゃないでしょうか。ふとそんな考えを思いついたので、メモがてら…
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

python ロギング dictConfigの使い方 

pythonでLOGしたことなかったのでやってみた。 ログの設定内容はsettings/config.pyにまとめて、main.py(root)からimportして使うかたちを取ることにした。 infoは頻繁にログ出力することからログローテーションをつかう。これにより、ログサイズが指定サイズ(maxByte)を超えたら自動的に新しいログファイルが作成されることになる。backupCount回まで作成したら、後ろから上書き(厳密には違うが)されていくので1ファイルがデカくなりすぎることはない。 settings/config.py import os """ 参考にした書き方 https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig """ import os """ 参考にした書き方 https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig """ """ ログ設計指針 https://qiita.com/nanasess/items/350e59b29cceb2f122b3 ◆バッチ処理の例◆ INFO 処理開始時 INFO 途中経過 INFO 処理終了時 WARN イベント発生時 ERROR 例外発生時 INFO その他、必要に応じて ◆WEBアプリケーションの例◆ INFO リクエスト開始時 - 処理概要、実行クラス名、メソッド名 INFO 途中経過 - 実行条件、処理対象オブジェクトのキーとなる値等(customer_id, order_id 等) INFO 処理終了時 - 実行結果(OK/NG 等)、リダイレクト先 WARN イベント発生時 - 画面に表示したエラーメッセージ等 ERROR 例外発生時 - 例外クラス、例外メッセージ INFO その他、必要に応じて """ """ ★ Python ログレベルについて ★ DEBUG 10 問題探求に必要な詳細な情報を出力したい場合 INFO 20 想定内の処理が行われた場合 WARNING 30 想定外の処理やそれが起こりそうな場合 ERROR 40 重大な問題により、機能を実行出来ない場合 CRITICAL 50 プログラムが実行不可となるような重大なエラーが発生した場合 """ # PATH SETTINGS_DIR = __file__ # ここのパス ROOT_DIR = '/'.join(SETTINGS_DIR.split('/')[:-2]) # rootは1つ上の階層 LOG_DIR = os.path.join(ROOT_DIR, 'logs') MAIN_LOG_DIR = os.path.join(LOG_DIR, 'main') # main.py用 NB_LOG_DIR = os.path.join(LOG_DIR, 'notebook') # jupytenotebook用 # ディレクトリ作成 if not os.path.exists(LOG_DIR): os.mkdir(LOG_DIR) if not os.path.exists(MAIN_LOG_DIR): os.mkdir(MAIN_LOG_DIR) if not os.path.exists(NB_LOG_DIR): os.mkdir(NB_LOG_DIR) LOGGING_CONFIG = { 'version': 1, # loggersで''として指定してもOK # 'root': { # 'level': 'NOTSET', # 'handlers': ['debug_console_handler', 'info_rotating_file_handler', 'error_file_handler'], # }, 'loggers': { '': { # root 'level': 'NOTSET', 'handlers': ['debug_console_handler', 'info_rotating_file_handler', 'error_file_handler'], }, 'notebook': { 'level': 'NOTSET', 'handlers': ['info_notebook', 'error_notebook'], }, }, 'handlers': { 'debug_console_handler': { 'level': 'DEBUG', 'formatter': 'info', 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout', }, # ログ・ファイルのサイズがmaxBytes以上になったらinfo1.2.3..nと # 新しいログファイルを作成(rotating)する # backupCount回まで作成したら、後ろから上書きされていく(厳密には違うが) 'info_rotating_file_handler': { 'level': 'INFO', 'formatter': 'info', 'class': 'logging.handlers.RotatingFileHandler', 'filename': os.path.join(MAIN_LOG_DIR, 'info.log'), 'mode': 'a', 'maxBytes': 1048576, # 1MBまで 'backupCount': 10 # 10ローテ }, 'error_file_handler': { 'level': 'ERROR', 'formatter': 'error', 'class': 'logging.FileHandler', 'filename': os.path.join(MAIN_LOG_DIR, 'error.log'), 'mode': 'a', }, 'info_notebook': { 'level': 'INFO', 'formatter': 'info', 'class': 'logging.handlers.RotatingFileHandler', 'filename': os.path.join(NB_LOG_DIR, 'info.log'), 'mode': 'a', 'maxBytes': 1048576, # 1MBまで 'backupCount': 10 # 10ローテ }, 'error_notebook': { 'level': 'ERROR', 'formatter': 'error', 'class': 'logging.FileHandler', 'filename': os.path.join(NB_LOG_DIR, 'error.log'), 'mode': 'a', }, }, 'formatters': { 'info': { 'format': '%(asctime)s-%(levelname)s-%(name)s::%(module)s|%(lineno)s:: %(message)s' }, 'error': { 'format': '%(asctime)s-%(levelname)s-%(name)s-%(process)d::%(module)s|%(lineno)s:: %(message)s' }, }, } https://blog.hiros-dot.net/wp-content/uploads/2021/01/ScreenShot_55.jpg main.py from logging import getLogger,config from settings.config import LOGGING_CONFIG config.dictConfig(LOGGING_CONFIG) logger = getLogger() for i in range(1000): if i % 2 ==0: logger.info(i) if i % 3 == 0: logger.error(i) 出力 2022-01-10 12:09:55,249-INFO-root::899213607|14:: 0 2022-01-10 12:09:55,250-ERROR-root::899213607|16:: 0 2022-01-10 12:09:55,251-INFO-root::899213607|14:: 2 2022-01-10 12:09:55,252-ERROR-root::899213607|16:: 3 2022-01-10 12:09:55,253-INFO-root::899213607|14:: 4 2022-01-10 12:09:55,253-INFO-root::899213607|14:: 6 2022-01-10 12:09:55,254-ERROR-root::899213607|16:: 6 2022-01-10 12:09:55,256-INFO-root::899213607|14:: 8 2022-01-10 12:09:55,256-ERROR-root::899213607|16:: 9 2022-01-10 12:09:55,257-INFO-root::899213607|14:: 10 2022-01-10 12:09:55,258-INFO-root::899213607|14:: 12 2022-01-10 12:09:55,258-ERROR-root::899213607|16:: 12 2022-01-10 12:09:55,260-INFO-root::899213607|14:: 14 2022-01-10 12:09:55,260-ERROR-root::899213607|16:: 15 2022-01-10 12:09:55,262-INFO-root::899213607|14:: 16 2022-01-10 12:09:55,262-INFO-root::899213607|14:: 18 2022-01-10 12:09:55,263-ERROR-root::899213607|16:: 18 2022-01-10 12:09:55,264-INFO-root::899213607|14:: 20 2022-01-10 12:09:55,265-ERROR-root::899213607|16:: 21 2022-01-10 12:09:55,266-INFO-root::899213607|14:: 22 2022-01-10 12:09:55,267-INFO-root::899213607|14:: 24 2022-01-10 12:09:55,268-ERROR-root::899213607|16:: 24 2022-01-10 12:09:55,269-INFO-root::899213607|14:: 26 2022-01-10 12:09:55,270-ERROR-root::899213607|16:: 27 2022-01-10 12:09:55,271-INFO-root::899213607|14:: 28 2022-01-10 12:09:55,272-INFO-root::899213607|14:: 30 2022-01-10 12:09:55,272-ERROR-root::899213607|16:: 30 2022-01-10 12:09:55,274-INFO-root::899213607|14:: 32 2022-01-10 12:09:55,275-ERROR-root::899213607|16:: 33 2022-01-10 12:09:55,275-INFO-root::899213607|14:: 34 2022-01-10 12:09:55,277-INFO-root::899213607|14:: 36 2022-01-10 12:09:55,278-ERROR-root::899213607|16:: 36 2022-01-10 12:09:55,279-INFO-root::899213607|14:: 38 2022-01-10 12:09:55,280-ERROR-root::899213607|16:: 39 2022-01-10 12:09:55,281-INFO-root::899213607|14:: 40 2022-01-10 12:09:55,281-INFO-root::899213607|14:: 42 2022-01-10 12:09:55,282-ERROR-root::899213607|16:: 42 2022-01-10 12:09:55,284-INFO-root::899213607|14:: 44 2022-01-10 12:09:55,285-ERROR-root::899213607|16:: 45 2022-01-10 12:09:55,286-INFO-root::899213607|14:: 46 2022-01-10 12:09:55,287-INFO-root::899213607|14:: 48 2022-01-10 12:09:55,288-ERROR-root::899213607|16:: 48 2022-01-10 12:09:55,288-INFO-root::899213607|14:: 50 2022-01-10 12:09:56,121-INFO-root::899213607|14:: 994 2022-01-10 12:09:56,122-INFO-root::899213607|14:: 996 2022-01-10 12:09:56,124-ERROR-root::899213607|16:: 996 2022-01-10 12:09:56,125-INFO-root::899213607|14:: 998 2022-01-10 12:09:56,126-ERROR-root::899213607|16:: 999
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Tkinter CookBook ~文字列置換アプリケーション~

概要 単純な文字列置換アプリケーションをTkinterで作成します。 最終的な外観や操作は以下のような感じになります。 前提 実行環境 本ページで記載しているコードはWindows10, python3.7で実行できることを確認しております。 必要なpythonモジュール デフォルトでインストールされているモジュールのみを使用します。 ソースコードについて GitHubで公開しております。 https://github.com/The-town/ReplaceString 記事内のコードは若干手を加えているため、GitHub上のコードと差異があります。 それでは、はじめましょう。 STEP1 GUIオブジェクトの設定 gui_component.py import tkinter.scrolledtext as scrolled_text import tkinter as tk class Frame(tk.Frame): def __init__(self, master=None): tk.Frame.__init__(self, master) self["width"] = 100 self["height"] = 100 self["bg"] = "white" class InputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 class RuleReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 25 self["height"] = 20 class OutputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 class Button(tk.Button): def __init__(self, master=None): tk.Button.__init__(self, master) self["height"] = 1 self["width"] = 10 class ReplaceButton(Button): def __init__(self, master=None): Button.__init__(self, master) self["text"] = "置換" class CheckButton(tk.Checkbutton): def __init__(self, master=None): tk.Checkbutton.__init__(self, master) self["bg"] = "white" self["onvalue"] = "yes" self["offvalue"] = "no" class StringTypeCheckButton(CheckButton): def __init__(self, master=None): CheckButton.__init__(self, master) self["text"] = "改行文字列\\nを使い、1行の文字列として置換する" ここでは、基本的なウィジェット(Widget)を定義しています。 各ウィジェットがGUIのどこで使用されるかについては、以下の画像で表されます。 tk.Frameやtk.ButtonなどのTkinterオブジェクトを、自分が作ったClassオブジェクトで継承しています。 これには以下のような理由があります。 Tkinterオブジェクト側の仕様を 自分のClassオブジェクトで覆う ことで、Tkinter側の仕様変更に備える 各ウィジェットに共通する背景色などを設定し統一化を図る STEP2 GUIオブジェクトの配置 gui_component.py import tkinter.scrolledtext as scrolled_text import tkinter as tk class Main: def __init__(self) -> None: root: tk.Tk = tk.Tk() root.title("文字列置換") text_frame: Frame = Frame(root) text_frame.grid(column=0, row=1) self.input_replace_text: InputReplaceText = InputReplaceText(master=text_frame) self.input_replace_text.grid(column=0, row=0) self.rule_replace_text: RuleReplaceText = RuleReplaceText(master=text_frame) self.rule_replace_text.grid(column=1, row=0) self.output_replace_text: OutputReplaceText = OutputReplaceText(master=text_frame) self.output_replace_text.grid(column=2, row=0) function_frame: Frame = Frame(root) function_frame.grid(column=0, row=0) replace_button: ReplaceButton = ReplaceButton(function_frame) replace_button.grid(column=0, row=1) self.string_type_check_button: StringTypeCheckButton = StringTypeCheckButton(function_frame) self.string_type_check_button.grid(column=0, row=0) root.mainloop() class Frame ... ここではMainクラスを追加して、各ウィジェットを配置しています。 注目したいのは、各ウィジェットのmasterです。整理すると以下の表のとおりになります。 ウィジェット master text_frame root input_replace_text text_frame rule_replace_text text_frame output_replace_text text_frame function_frame root replace_button function_frame string_type_check_button function_frame ここでmasterとなっているウィジェットは3つあります。 rootとtext_frameとfunction_frameです。 rootはTkinterを実行した際に出力されるウィンドウ全体のことを表します。 そのため、必ずrootがあらゆるウィジェットの最終的なmasterになります。 このコードでは、それ以外に2つのFrameウィジェットをmasterとして間に挟んでいます。 先程お見せした画像へFrameウィジェットの情報を加えてみます。 このように、rootと各ウィジェット(例えばButtonやText)の間に、Frameウィジェットを挟む目的は、各ウィジェットを配置しやすくするためです。 また、後で変更しやすくするためでもあります。 過去の自分の記事から引用します。 「root」つまり画面のオブジェクト上に直接「Label」オブジェクトや「Entry」オブジェクトを配置しない理由は様々でしょう。このコードでそのようにしない理由は後々の配置換えを容易にするためです。 直接「root」上にオブジェクトを乗せ、「grid()」で配置する方法はオブジェクト数が少なければ非常に有効です。 ただし、オブジェクトの数が多くなると相対的な位置関係は複雑になります。また、あるオブジェクトAをあるオブジェクトBのとなりに配置したくなった場合、オブジェクト数が多いとほとんどすべてのオブジェクトの配置を見直す必要が出て来ます。 これは面倒なので、関連するオブジェクトを「Frame」オブジェクト上にまとめてしまい、「root」の上に乗るオブジェクトは「Frame」オブジェクトのみとするのです。このようにすることで、後々オブジェクトを追加したい場合はそのオブジェクトを配置するのに適切な「Frame」オブジェクトを見つけ、その「Frame」オブジェクト上に乗っているオブジェクトのみ配置を変えればよいのです。 この考え方は私たちが普段から何気なく行っていることなのです。WindowsでもLinuxでもファイルを作成した後は、適切なフォルダへと配置するでしょう。あらゆるファイルを一つのフォルダへ放り込んでいる人は少ないのではないでしょうか。 Tkinter CookBook 1. 新しい「Frame」オブジェクトを作成 STEP3 置換関数の作成 gui_component.py class Main: ... def execute_replace_rule(self, event=None) -> None: input_text: str = self.input_replace_text.get(1.0, tk.END)[:-1] rule_replace_text: str = self.rule_replace_text.get(1.0, tk.END)[:-1] strings_before_change, strings_after_change = analyze_replace_rules(rule_replace_text) self.output_replace_text.delete(1.0, tk.END) output_text = change_string(input_text, tuple(strings_before_change), tuple(strings_after_change)) self.output_replace_text.insert(tk.END, output_text) ここではinput_replace_textに入力された文字列を、rule_replace_textに入力された規則に従って変換し、output_replace_textへ出力するという処理を行っています。 入力された置換規則を解析するanalyze_replace_rules関数、入力された文字列を置換するchange_string関数は後ほど解説します。 途中でself.output_replace_text.delete(1.0, tk.END)をしているのは、出力された文字列を消すためです。 以前に置換した文字列を削除した上で出力することで、何度も置換ボタンを押した際に同一の挙動をするようにしています。 # deleteなしの場合 1回目の置換: 置換後の文字列です。 2回目の置換: 置換後の文字列です。 置換後の文字列です。 ... # deleteありの場合 1回目の置換: 置換後の文字列です。 2回目の置換: 置換後の文字列です。 ... analyze_replace_rules replace_rule.py from typing import Tuple, List def analyze_replace_rules(rule_replace_text: str) -> Tuple[List, List]: """ 入力された置換ルールを解析して、置換前の文字列と置換後の文字列を分ける関数 Parameters ---------- rule_replace_text: str 置換ルール Returns ------- strings_before_change, strings_after_change: tuple 置換前の文字列と置換後の文字列のリスト """ split_string: str = " -> " strings_before_change: List[str] = [] strings_after_change: List[str] = [] for rule in rule_replace_text.split("\n"): if split_string in rule: strings_before_change.append(rule.split(split_string)[0]) strings_after_change.append(rule.split(split_string)[1]) return strings_before_change, strings_after_change 置換ルールは置換前の文字列と置換後の文字列、そして->という区切り文字列で構成されます。 置換ルール hogehoge -> hugahuga hogehogeという文字列をhugahugaという文字列へ置換 また、置換ルールは複数記入することが可能です。 そのため、置換前の文字列を改行文字列\nで各行に分けた後、->で置換前と置換後の文字列に分割します。 change_string replace_rule.py import re from typing import Tuple, List def analyze_replace_rules(rule_replace_text: str) -> Tuple[List, List]: ... def change_string(text: str, strings_before_change: Tuple[str], strings_after_change: Tuple[str]) -> str: """ 文章中にある、複数の指定した文字列を、複数の指定した文字列へ変換する関数 Parameters ---------- text: str 置換対象の文章 strings_before_change: Tuple[str] 置換する文字列のタプル strings_after_change: Tuple[str] 置換後の文字列のタプル Returns -------- text: str 置換後の文章 """ for i in range(len(strings_after_change)): re_obj_change_string: re.Pattern = re.compile(rf"{strings_before_change[i]}") text = re.sub(re_obj_change_string, strings_after_change[i], text) return text 正規表現に対応した置換としたいためreモジュールを使用します。 参考:re --- 正規表現操作 STEP4 置換関数とボタンの紐付け gui_component.py class Main: def __init__(self) -> None: ... replace_button: ReplaceButton = ReplaceButton(function_frame) replace_button.grid(column=0, row=1) replace_button["command"] = self.execute_replace_rule # 追加 ... replace_buttonのcommand属性へ、置換関数execute_replace_ruleを設定します。 これでボタンが押された際に置換関数が実行されます。 STEP5 実行 以下のmain.pyよりここまでのコードを実行してみます。 main.py from gui_component import Main if __name__ == '__main__': gui_main: Main = Main() フォルダ構成は以下のようになっているかと思います。 main.py gui_component.py replace_rule.py 置換ボタンを押せば文字列の置換を行います。 【追加STEP】STEP6 見た目を整える gui_component.py import tkinter.scrolledtext as scrolled_text import tkinter as tk from replaceRule import analyze_replace_rules, change_string, change_string_list class Main: def __init__(self) -> None: root.configure(background='white') ... class Frame(tk.Frame): def __init__(self, master=None): tk.Frame.__init__(self, master) self.grid(column=0, row=0) self["width"] = 100 self["height"] = 100 self["padx"] = 10 self["pady"] = 10 self["bg"] = "white" class InputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 self["font"] = ("メイリオ", 12) class RuleReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 25 self["height"] = 20 self["font"] = ("メイリオ", 12) class OutputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 self["font"] = ("メイリオ", 12) class Button(tk.Button): def __init__(self, master=None): tk.Button.__init__(self, master) self["height"] = 1 self["width"] = 10 class ReplaceButton(Button): def __init__(self, master=None): Button.__init__(self, master) self["text"] = "置換" self["font"] = ("メイリオ", 15) class CheckButton(tk.Checkbutton): def __init__(self, master=None): tk.Checkbutton.__init__(self, master) self["bg"] = "white" self["text"] = "test" self["font"] = ("メイリオ", 15) class StringTypeCheckButton(CheckButton): def __init__(self, master=None): CheckButton.__init__(self, master) self["text"] = "改行文字列\\nを使い、1行の文字列として置換する" 背景色、フォントを整えました。 ここは自由にカスタマイズしましょう。 【追加STEP】STEP7 チェックボタンを使う gui_component.py class Main: def execute_replace_rule(self, event=None) -> None: ... output_text: str = "" if self.string_type_check_button.state.get() == "one_line": output_text = change_string(input_text, tuple(strings_before_change), tuple(strings_after_change)) elif self.string_type_check_button.state.get() == "many_lines": output_text = change_string_list(tuple(input_text.split("\n")), tuple(strings_before_change), tuple(strings_after_change)) self.output_replace_text.insert(tk.END, output_text) class CheckButton(tk.Checkbutton): def __init__(self, master=None): tk.Checkbutton.__init__(self, master) self["bg"] = "white" self["text"] = "test" self["font"] = ("メイリオ", 15) self.state: tk.StringVar = tk.StringVar() self.state.set("no") self["variable"] = self.state self["onvalue"] = "yes" self["offvalue"] = "no" class StringTypeCheckButton(CheckButton): def __init__(self, master=None): CheckButton.__init__(self, master) self["text"] = "改行文字列\\nを使い、1行の文字列として置換する" self.state.set("many_lines") self["onvalue"] = "one_line" self["offvalue"] = "many_lines" replaceRule.py def change_string_list(texts: Tuple[str, ...], strings_before_change: Tuple[str, ...], strings_after_change: Tuple[str, ...]) -> str: """ リストとして受け取った文章中にある、複数の指定した文字列を、複数の指定した文字列へ変換する関数 正規表現の「^」や「$」をリストの要素ごとに実行するために作成した。 例; texts = [123, abc, 145c] ^1 -> a [a23, abc, a45c] Parameters ---------- texts: Tuple[str, ...] 置換対象の文章 strings_before_change: Tuple[str, ...] 置換する文字列のタプル strings_after_change: Tuple[str, ...] 置換後の文字列のタプル Returns -------- text: str 置換後の文章 """ tmp_texts: List[str] = list(texts) for i in range(len(strings_after_change)): re_obj_change_string: re.Pattern = re.compile(rf"{strings_before_change[i]}") tmp_texts = [re.sub(re_obj_change_string, strings_after_change[i], text) for text in tmp_texts] return "\n".join(tmp_texts) チェックボタンによって処理を切り替えられるようにします。 StringTypeCheckButtonのonvalue・offvalueは、それぞれチェックボタンにチェックが入っているときと入っていないときの状態を表す文字列です。 状態を取り出すためにstring_type_check_button.state.get()を使います。 参考:30. ttk.Checkbutton
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Tkinter CookBook ~文字列置換アプリケーション~

概要 単純な文字列置換アプリケーションをTkinterで作成します。 最終的な外観や操作は以下のような感じになります。 前提 実行環境 本ページで記載しているコードはWindows10, python3.7で実行できることを確認しております。 必要なpythonモジュール デフォルトでインストールされているモジュールのみを使用します。 ソースコードについて GitHubで公開しております。 https://github.com/The-town/ReplaceString 記事内のコードは若干手を加えているため、GitHub上のコードと差異があります。 それでは、はじめましょう。 STEP1 GUIオブジェクトの設定 gui_component.py import tkinter.scrolledtext as scrolled_text import tkinter as tk class Frame(tk.Frame): def __init__(self, master=None): tk.Frame.__init__(self, master) self["width"] = 100 self["height"] = 100 self["bg"] = "white" class InputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 class RuleReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 25 self["height"] = 20 class OutputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 class Button(tk.Button): def __init__(self, master=None): tk.Button.__init__(self, master) self["height"] = 1 self["width"] = 10 class ReplaceButton(Button): def __init__(self, master=None): Button.__init__(self, master) self["text"] = "置換" class CheckButton(tk.Checkbutton): def __init__(self, master=None): tk.Checkbutton.__init__(self, master) self["bg"] = "white" self["onvalue"] = "yes" self["offvalue"] = "no" class StringTypeCheckButton(CheckButton): def __init__(self, master=None): CheckButton.__init__(self, master) self["text"] = "改行文字列\\nを使い、1行の文字列として置換する" ここでは、基本的なウィジェット(Widget)を定義しています。 各ウィジェットがGUIのどこで使用されるかについては、以下の画像で表されます。 tk.Frameやtk.ButtonなどのTkinterオブジェクトを、自分が作ったClassオブジェクトで継承しています。 これには以下のような理由があります。 Tkinterオブジェクト側の仕様を 自分のClassオブジェクトで覆う ことで、Tkinter側の仕様変更に備える 各ウィジェットに共通する背景色などを設定し統一化を図る STEP2 GUIオブジェクトの配置 gui_component.py import tkinter.scrolledtext as scrolled_text import tkinter as tk class Main: def __init__(self) -> None: root: tk.Tk = tk.Tk() root.title("文字列置換") text_frame: Frame = Frame(root) text_frame.grid(column=0, row=1) self.input_replace_text: InputReplaceText = InputReplaceText(master=text_frame) self.input_replace_text.grid(column=0, row=0) self.rule_replace_text: RuleReplaceText = RuleReplaceText(master=text_frame) self.rule_replace_text.grid(column=1, row=0) self.output_replace_text: OutputReplaceText = OutputReplaceText(master=text_frame) self.output_replace_text.grid(column=2, row=0) function_frame: Frame = Frame(root) function_frame.grid(column=0, row=0) replace_button: ReplaceButton = ReplaceButton(function_frame) replace_button.grid(column=0, row=1) self.string_type_check_button: StringTypeCheckButton = StringTypeCheckButton(function_frame) self.string_type_check_button.grid(column=0, row=0) root.mainloop() class Frame ... ここではMainクラスを追加して、各ウィジェットを配置しています。 注目したいのは、各ウィジェットのmasterです。整理すると以下の表のとおりになります。 ウィジェット master text_frame root input_replace_text text_frame rule_replace_text text_frame output_replace_text text_frame function_frame root replace_button function_frame string_type_check_button function_frame ここでmasterとなっているウィジェットは3つあります。 rootとtext_frameとfunction_frameです。 rootはTkinterを実行した際に出力されるウィンドウ全体のことを表します。 そのため、必ずrootがあらゆるウィジェットの最終的なmasterになります。 このコードでは、それ以外に2つのFrameウィジェットをmasterとして間に挟んでいます。 先程お見せした画像へFrameウィジェットの情報を加えてみます。 このように、rootと各ウィジェット(例えばButtonやText)の間に、Frameウィジェットを挟む目的は、各ウィジェットを配置しやすくするためです。 また、後で変更しやすくするためでもあります。 過去の自分の記事から引用します。 「root」つまり画面のオブジェクト上に直接「Label」オブジェクトや「Entry」オブジェクトを配置しない理由は様々でしょう。このコードでそのようにしない理由は後々の配置換えを容易にするためです。 直接「root」上にオブジェクトを乗せ、「grid()」で配置する方法はオブジェクト数が少なければ非常に有効です。 ただし、オブジェクトの数が多くなると相対的な位置関係は複雑になります。また、あるオブジェクトAをあるオブジェクトBのとなりに配置したくなった場合、オブジェクト数が多いとほとんどすべてのオブジェクトの配置を見直す必要が出て来ます。 これは面倒なので、関連するオブジェクトを「Frame」オブジェクト上にまとめてしまい、「root」の上に乗るオブジェクトは「Frame」オブジェクトのみとするのです。このようにすることで、後々オブジェクトを追加したい場合はそのオブジェクトを配置するのに適切な「Frame」オブジェクトを見つけ、その「Frame」オブジェクト上に乗っているオブジェクトのみ配置を変えればよいのです。 この考え方は私たちが普段から何気なく行っていることなのです。WindowsでもLinuxでもファイルを作成した後は、適切なフォルダへと配置するでしょう。あらゆるファイルを一つのフォルダへ放り込んでいる人は少ないのではないでしょうか。 Tkinter CookBook 1. 新しい「Frame」オブジェクトを作成 STEP3 置換関数の作成 gui_component.py class Main: ... def execute_replace_rule(self, event=None) -> None: input_text: str = self.input_replace_text.get(1.0, tk.END)[:-1] rule_replace_text: str = self.rule_replace_text.get(1.0, tk.END)[:-1] strings_before_change, strings_after_change = analyze_replace_rules(rule_replace_text) self.output_replace_text.delete(1.0, tk.END) output_text = change_string(input_text, tuple(strings_before_change), tuple(strings_after_change)) self.output_replace_text.insert(tk.END, output_text) ここではinput_replace_textに入力された文字列を、rule_replace_textに入力された規則に従って変換し、output_replace_textへ出力するという処理を行っています。 入力された置換規則を解析するanalyze_replace_rules関数、入力された文字列を置換するchange_string関数は後ほど解説します。 途中でself.output_replace_text.delete(1.0, tk.END)をしているのは、出力された文字列を消すためです。 以前に置換した文字列を削除した上で出力することで、何度も置換ボタンを押した際に同一の挙動をするようにしています。 # deleteなしの場合 1回目の置換: 置換後の文字列です。 2回目の置換: 置換後の文字列です。 置換後の文字列です。 ... # deleteありの場合 1回目の置換: 置換後の文字列です。 2回目の置換: 置換後の文字列です。 ... analyze_replace_rules replace_rule.py from typing import Tuple, List def analyze_replace_rules(rule_replace_text: str) -> Tuple[List, List]: """ 入力された置換ルールを解析して、置換前の文字列と置換後の文字列を分ける関数 Parameters ---------- rule_replace_text: str 置換ルール Returns ------- strings_before_change, strings_after_change: tuple 置換前の文字列と置換後の文字列のリスト """ split_string: str = " -> " strings_before_change: List[str] = [] strings_after_change: List[str] = [] for rule in rule_replace_text.split("\n"): if split_string in rule: strings_before_change.append(rule.split(split_string)[0]) strings_after_change.append(rule.split(split_string)[1]) return strings_before_change, strings_after_change 置換ルールは置換前の文字列と置換後の文字列、そして->という区切り文字列で構成されます。 置換ルール hogehoge -> hugahuga hogehogeという文字列をhugahugaという文字列へ置換 また、置換ルールは複数記入することが可能です。 そのため、置換前の文字列を改行文字列\nで各行に分けた後、->で置換前と置換後の文字列に分割します。 change_string replace_rule.py import re from typing import Tuple, List def analyze_replace_rules(rule_replace_text: str) -> Tuple[List, List]: ... def change_string(text: str, strings_before_change: Tuple[str], strings_after_change: Tuple[str]) -> str: """ 文章中にある、複数の指定した文字列を、複数の指定した文字列へ変換する関数 Parameters ---------- text: str 置換対象の文章 strings_before_change: Tuple[str] 置換する文字列のタプル strings_after_change: Tuple[str] 置換後の文字列のタプル Returns -------- text: str 置換後の文章 """ for i in range(len(strings_after_change)): re_obj_change_string: re.Pattern = re.compile(rf"{strings_before_change[i]}") text = re.sub(re_obj_change_string, strings_after_change[i], text) return text 正規表現に対応した置換としたいためreモジュールを使用します。 参考:re --- 正規表現操作 STEP4 置換関数とボタンの紐付け gui_component.py class Main: def __init__(self) -> None: ... replace_button: ReplaceButton = ReplaceButton(function_frame) replace_button.grid(column=0, row=1) replace_button["command"] = self.execute_replace_rule # 追加 ... replace_buttonのcommand属性へ、置換関数execute_replace_ruleを設定します。 これでボタンが押された際に置換関数が実行されます。 STEP5 実行 以下のmain.pyよりここまでのコードを実行してみます。 main.py from gui_component import Main if __name__ == '__main__': gui_main: Main = Main() フォルダ構成は以下のようになっているかと思います。 main.py gui_component.py replace_rule.py 置換ボタンを押せば文字列の置換を行います。 【追加STEP】STEP6 見た目を整える gui_component.py import tkinter.scrolledtext as scrolled_text import tkinter as tk from replaceRule import analyze_replace_rules, change_string, change_string_list class Main: def __init__(self) -> None: root.configure(background='white') ... class Frame(tk.Frame): def __init__(self, master=None): tk.Frame.__init__(self, master) self.grid(column=0, row=0) self["width"] = 100 self["height"] = 100 self["padx"] = 10 self["pady"] = 10 self["bg"] = "white" class InputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 self["font"] = ("メイリオ", 12) class RuleReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 25 self["height"] = 20 self["font"] = ("メイリオ", 12) class OutputReplaceText(scrolled_text.ScrolledText): def __init__(self, master=None): scrolled_text.ScrolledText.__init__(self, master) self["width"] = 50 self["height"] = 20 self["font"] = ("メイリオ", 12) class Button(tk.Button): def __init__(self, master=None): tk.Button.__init__(self, master) self["height"] = 1 self["width"] = 10 class ReplaceButton(Button): def __init__(self, master=None): Button.__init__(self, master) self["text"] = "置換" self["font"] = ("メイリオ", 15) class CheckButton(tk.Checkbutton): def __init__(self, master=None): tk.Checkbutton.__init__(self, master) self["bg"] = "white" self["text"] = "test" self["font"] = ("メイリオ", 15) class StringTypeCheckButton(CheckButton): def __init__(self, master=None): CheckButton.__init__(self, master) self["text"] = "改行文字列\\nを使い、1行の文字列として置換する" 背景色、フォントを整えました。 ここは自由にカスタマイズしましょう。 【追加STEP】STEP7 チェックボタンを使う gui_component.py class Main: def execute_replace_rule(self, event=None) -> None: ... output_text: str = "" if self.string_type_check_button.state.get() == "one_line": output_text = change_string(input_text, tuple(strings_before_change), tuple(strings_after_change)) elif self.string_type_check_button.state.get() == "many_lines": output_text = change_string_list(tuple(input_text.split("\n")), tuple(strings_before_change), tuple(strings_after_change)) self.output_replace_text.insert(tk.END, output_text) class CheckButton(tk.Checkbutton): def __init__(self, master=None): tk.Checkbutton.__init__(self, master) self["bg"] = "white" self["text"] = "test" self["font"] = ("メイリオ", 15) self.state: tk.StringVar = tk.StringVar() self.state.set("no") self["variable"] = self.state self["onvalue"] = "yes" self["offvalue"] = "no" class StringTypeCheckButton(CheckButton): def __init__(self, master=None): CheckButton.__init__(self, master) self["text"] = "改行文字列\\nを使い、1行の文字列として置換する" self.state.set("many_lines") self["onvalue"] = "one_line" self["offvalue"] = "many_lines" replaceRule.py def change_string_list(texts: Tuple[str, ...], strings_before_change: Tuple[str, ...], strings_after_change: Tuple[str, ...]) -> str: """ リストとして受け取った文章中にある、複数の指定した文字列を、複数の指定した文字列へ変換する関数 正規表現の「^」や「$」をリストの要素ごとに実行するために作成した。 例; texts = [123, abc, 145c] ^1 -> a [a23, abc, a45c] Parameters ---------- texts: Tuple[str, ...] 置換対象の文章 strings_before_change: Tuple[str, ...] 置換する文字列のタプル strings_after_change: Tuple[str, ...] 置換後の文字列のタプル Returns -------- text: str 置換後の文章 """ tmp_texts: List[str] = list(texts) for i in range(len(strings_after_change)): re_obj_change_string: re.Pattern = re.compile(rf"{strings_before_change[i]}") tmp_texts = [re.sub(re_obj_change_string, strings_after_change[i], text) for text in tmp_texts] return "\n".join(tmp_texts) チェックボタンによって処理を切り替えられるようにします。 StringTypeCheckButtonのonvalue・offvalueは、それぞれチェックボタンにチェックが入っているときと入っていないときの状態を表す文字列です。 状態を取り出すためにstring_type_check_button.state.get()を使います。 参考:30. ttk.Checkbutton
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む