20211130のPythonに関する記事は30件です。

キャリブレーションパターンを Python で作って見よう

TL;DR はじめに 本記事はOpenCV Advent Calendar 2021 の6日目の記事です。 5日目の記事はdandelion1124先生による最新のOpenCVで導入されたプラグイン制度の紹介です。是非お読み下さい。 その他の記事は目次をご覧ください。 キャリブレーションパターンというのは以下の写真のような、既知のサイズの円や正方形が描かれてるものを指します このパターンを画像で撮影することで空間中のカメラの位置姿勢、レンズの画角などの特性を計算から求めることができます。 パターンの特徴としては簡単なものながら、こいつをきっちり作らないとキャリブレーションがちゃんと収束しないため、丁寧に作る必要があります 4年前に公開したQiita記事では、パターンの大きさを測定する誤差をなくすために「パワーポイント」を使って、指定した長さ/大きさピッタリに印刷できるスクリプトを自作/公開/紹介しました そのときはVisual Studio + CMakeというヘンテコな組み合わせでパワーポイントを作ったのですが、今回はPythonスクリプトの紹介です 使い方 $ ls generator.py $ python3 generator.py $ ls calibration_pattern.pptx generator.py 工夫した点 基本的には4年前に公開したスクリプトと同じで、以下の3種類のパターンを生成できます チェッカーボード 円形パターン(格子) 円形パターン(互い違いに配置) 内部的にはpython-pptxというライブラリを使用しており、これでパワーポイントが作れます。 前回みたいなxmlのdirtyハックは不要です。 Pythonは個人的には嫌いで、できれば避けて通りたかったのですが業務でpython-pptxに触れざるを得ず、結果4年前の宿題を終わらせることに。 ベクタ形式で作れるので、変なピクセルのアンチエイリアスに悩む必要はありません プリンタで出力される際に変なサイズに縮小/拡大される問題も、「等倍で印刷」すれば1mm以下の誤差で印刷ができます。 印刷したチェッカーボードの位置やサイズを定規で測る必要もありません 工夫しなかった点 どうやらPythonには引数を受け取ったり、スクリプトを実行バイナリに落とし込む方法とかもあるみたいですが、そんな高等なことは一切しておりません。 サイズや個数を変える場合はハードコードされている定数やパラメータを直接書き換えて下さい。 Pull Request、いつでも受け付けております 4年前のツールではヘッダファイルを生成することで、キャリブレーションパターンに関する情報をスムーズにC++のコードに読み込めるようにしましたが、今回はPython版なので、このあたりは割愛しました。 Pull Request、まじでいつでも受け付けております スクリーンショット チェッカーボード 円形パターン(格子) 円形パターン(互い違いに配置) その他 本記事は以下のバージョンで確認しました Python 3.6.9 python-pptx 0.6.21 Ubuntu 18.04 (x86_64) ベクタ形式で作るだけならば、OpenCV本体内にsvgファイルを生成するPythonスクリプトが同梱されており、ドキュメントも公開されている 本記事が気に入った方は、記事のLGTMの他、Githubのスター評価をしていただけると幸いです。 キャリブレーションはパターンをちゃんと平面にする必要があるし、撮影時の手ブレやカメラがずれることに神経を尖らせるのが常です。 本スクリプトを使えば、少なくとも、「長さを定規で丁寧に測定する」手間が1ステップ省けます。 Happy Calibration Lifeを! 明日はhon_no_mushiさんの担当で、本記事執筆時点では「アイエエエエ! ニンジャ!? ニンジャナンデ!?」となってます。全OpenCVユーザ注目っていうか何てタイトルなんだぁ!!
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Introduction to Spot programming (with ROS)

Introduction to Spot programming (with ROS) こんにちは、東大のJSKでBoston Dynamics社のSpotというロボットをベースにシステムを作ってるsktometometoです。 この記事では、開発者がSpotを使ってどんなことができるのか、開発者はどのようなプログラミングインターフェースが用意されているのかについて、僕個人の見解を交えながら解説していきます。 (追記予定) 参考ページ類 Spot SDK spot-sdk (github) Knowledge Center Community Forum spot_ros Spotができること 皆さんご存知の通り、Spotは4脚の移動ロボットです。4脚移動ロボットはこの世に多くあれど、Spotほど一般のユーザーが使用することを意識し開発されたロボットはまだ珍しいです。 一般的なユーザーはSpotをコントローラ(専用APPのインストールされたAndroidタブレット)で操作することを意図されており、コントローラを用いてSpotを移動させたり、自律移動のための地図を作ったり、自律移動のタスクを編集したりすることができます。 基本的な使い方については、以下の公式のTutorialsを見るとわかりやすいです。 Getting started Mobility Autowalk Sensor integration Application ユーザーはそれと意識することなく、環境認識結果に基づく歩行をロボットにさせることができるので、段差の昇り降りや障害物の回避についてはロボットに自動で行わせつつ大体の移動方向を指示するのみでロボットに移動させることができます。 このように、環境認識・コントロール・ナビゲーションなどのロボットを動作させるために必要な各要素についてユーザーが注意することなく、監視などの必要な仕事を行わせることができるようなシステムになっているのですが、一方で開発者が利用できるSpotの機能には制限があります。 もっとも大きい制約のうちの一つは脚の関節へ関節角度指令を送ることができない点(参考:Separate movement of Spot's limbs )で、開発者はSpotの関節角度・トルクの情報を取得することはできるもの、関節角度・トルクなど低レベルの指令を送ることができません。開発者が送ることのできる最も低レベルな指令は、2次元平面内での速度指令、bodyリンクの高さ・姿勢の指令、足の配置を微調整する指令、与えられた位置まで移動する指令あたりになります。 また、より上位の指令についても、自律移動機能であるGraphnavを用いて地図上の目的のノードまで移動する指令などであり、Spot内部で使用されている歩行動作生成機能やナビゲーションのプランニング・コントロール機能、障害物認識やHeightMap生成などの認識機能、自律移動機能における経路計画や地図についても、用意されたいくつかのパラメータをいじることはできるものの、開発者はある機能を単体で利用したり、部分的に機能を自前のものに置き換えて動かしたりすることはできません。 これは、歩行制御だったり、動作の学習だったりの研究にSpotを使用することはできないことを意味していて、そのためかシミュレータ等でもSpotのモデルを使って遊んでいるのはあまり見かけない気がします。お金のある大富豪がハックして好き勝手いじれるようになるのが待たれます。 このため、脚で移動するため階段や多少の不整地でも移動することのできるという点では特殊ですが、開発者側からすると一般的な台車ロボット同様、移動する機能そのものはすでにあるとして、それを利用してどのようなアプリケーションを構築するかという点に着目するべきものだと思います。 Spot Programming 上記のリンク先にあるように、タブレットからの操作により、平地を移動させたり、階段を昇り降りしたり、自律移動用の地図を構築したりすることができます。タブレットから実行可能なものは開発者用のインターフェースとしても用意されており、開発者はPythonのクライアントライブラリであるspot-sdkを用いてSpotへ指令を送ったり、Spotから情報を取得するようになっています。 Concepts より転載 このクライアントライブラリを用いる際には、Spot本体と外部制御用PCとをWi-FiやEthernet経由のIPネットワークで接続し、gRPCにより通信することでロボットとの情報をやり取りします。 一般的には、Spot本体背面にSpot CORE等の開発用PCを取り付け、PCとSpot本体とをEthernetで接続し、開発用PC上でクライアントライブラリを使用するPythonスクリプトを実行することによりロボットを動作させることが想定されているようです。 hello_spot.pyはロボットを動作させるプログラムの一例です。 この中では、ロボットをAPIを通じて動作させるための認証・時刻合わせ・権限の獲得の他、RobotCommandを用いてパラメータを変更させつつstandさせたり、カメラのセンサ情報を取得させたりしています。 APIを通じてロボットを動作させる際に、ロボットに動作指令を送る権限を取得できるPythonプロセスは一つのみなので、ロボットにある動作を行わせる際には、Spot Detection and FollowのサンプルのようにあるPythonプロセスでデータの取得・認識の計算・ロボットへの指令を送るなどやや面倒なPythonスクリプトを書く必要があるようです。 ただ、Spot内部の機能と連携して動作させるにはこのAPIを利用する必要があり、例えばSpotのもともとの機能として、自律移動タスク中にObject Detectionを利用するには、外部からAPIを用いて通信する認識サーバーを立てる必要があります。Network Compute Bridge Spot Programming with ROS 上記のように、Boston Dynamicsにより用意されている開発者用プログラミングインターフェースは、pythonクライアントライブラリを用いたスクリプトにより、gRPCでSpot本体と通信してロボットを動かすというものですが、各種センサーのインテグレーションだったり、既存の認識機能の流用だったり、RVizを用いた可視化だったりを考えると、開発者としてはROSを利用したくなります。 たとえば、JSKではcoral_usb_rosパッケージを用いた認識機能を組み合わせることで、人を見つけてインタラクションするなどしています。 spot-sdkのROSドライバーはいくつか実装がありますが、JSKではclearpathroboticsのspot_rosを利用しています。 spot_ros は spot-sdk のROSインターフェースを提供する形になるので、利用できる機能は spot-sdk + ROSの域を出ません。2021/12/1現在のところ、以下のような機能があります。 camera, joint, battery などのロボット体内・センサ情報の取得 ロボットのodometry情報 速度指令・位置指令・姿勢指令などの移動に関する指令 階段モードON/OFFなどの移動パラメータの変更 Docking Station への Dock/Undock 指令 Autowalk の記録・再生指令 /cmd_velによる速度指令インターフェースや/odomのオドメトリトピック、ロボットの/odom -> /base_link のTFなどはあるため、move_baseのような形式でロボットを移動させるのに十分なインターフェースは用意されています。 まだROSのインターフェースが用意されていない機能もあり、 Armや各種Payload特有の機能 choreography機能 などがあります。spotをROSで利用する際、欲しい機能があれば是非contributionしてみてください。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Scrapyを使ってスクレイピング覚書

PythonのScrapyを使ってQiitaトップページからトレンドTOP30をスクレイピングする方法を雑に残しておきます。 Scrapyのインストール ※環境によっては他にもいろいろインストールが必要かもしれません。 pip install scrapy プロジェクト作成 scrapy startproject qiita spiderの作成 プロジェクトを作成するとディレクトリが作成されるので、そこに移動してからspiderを作成します。 cd qiita scrapy genspider qiita_trend qiita.com ソースコードの修正 setting.pyに以下を追加 setting.py FEED_EXPORT_ENCODING = 'utf-8' setting.pyに以下を有効化し'Accept-Language': 'en'を'ja'に修正 setting.py DEFAULT_REQUEST_HEADERS = { # 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'ja', } ディレクトリqiita配下のqiita_trend.pyを修正(※1タイトルだけならコメントアウト行を有効化して下の行をコメントアウト) qiita_trend.py import scrapy class QiitaTrend1dSpider(scrapy.Spider): name = 'qiita_trend' allowed_domains = ['qiita.com'] start_urls = ['https://qiita.com/'] def parse(self, response): # title = response.xpath('//article[1]/h2/a/text()').get() titles = response.xpath('//article/h2/a/text()').getall() yield{ 'titles': titles, } spiderの実行 spiderを実行しqiita_trend.jsonに出力 scrapy crawl qiita_trend -o qiita_trend.json 出力結果 qiita_trend.json [ { "titles": [ "転職活動の面接でいただいた質問集", "AWS re:Invent 2021で発表された新サービス/アップデートまとめ", "Nikita Popov、PHP開発から離れる & PHP財団設立のお知らせ", "エンジニアとして普段よく使うサービスをまとめてみた", "【CTF】OSINT問題で個人的に使用するツール・サイト・テクニックまとめ", "地味に便利なdisplay: contents;", "5行のHTMLで15パズル作ってみた", "【しくじり先生】失敗続きの未経験からのエンジニア転職1年間を振り返って最短で満足のいく転職をするにはどうすれば良いかを考えてみた", "【React】【Vue】 など流行りのフレームワークを使うもののためのJS基礎", "リモートワークの目次 2021.11", "個人的「Java Bronze試験つまずいた項目」5選", "同人プロジェクトマネジメント入門:目次", "Azureの資格取得に向けた勉強法", "Pythonで人検知の通知機能プログラムを作成してみる", "launchWhen〇〇〇は止めてrepeatOnLifecycleを使おう!", "Clean Architecture 輪読会まとめ", "【Android】Modifier.requiredXXXについて、モヤモヤしなくなるくらいに理解する【Jetpack Compose】", "TypeScript のインターセクション型は2つの型のプロパティを持った型ではない", "ffmpeg で low latency DASH server 作ってみた", "Denoに今後導入される新機能(2021秋ver)", "CircleCIを使ってAutifyのテスト結果をTeamsに通知する", "エンジニアコミュニティのやり方&よくある失敗例", "ALL Pair生成、手作業 vs PictMaster vs GIHOZ", "アドベントカレンダーに参加しようかまだ迷っている人に", "転職活動ログ(2021)", "Rapberry Pi 初期設定の半自動化", "Pythonのオブジェクトへの参照とガベージコレクション", "【Jetpack Compose】Composeをオーバーレイ表示する", "C# の Json.NET を理解する", "AtCoderをRustでやるのにcargo-compete導入してみた" ] } ]
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Cython を含む Python パッケージを poetry で作成・公開する

Cython を含む Python パッケージを poetry で作成・公開する 背景 Cython を用いた高速な Pythonライブラリを書いたものの、Poetry を使って PyPI へ publish する手順をよく知らなかったので調べてみました。 前提 例として、以下の構造のプロジェクトを用いて説明します。便宜上、Cython公式チュートリアルに記されている手順を実行したときに生成される *.so や *.cpp 等のファイルは省略しています。 cy_test/ ├── cy_test/ │ ├── __init__.py │ ├── test.pyx │ ... ├── pyproject.toml ... 以下は test.pyx の中身です。生の Python が特に苦手とするような、なんか大量の逐次計算をしてクソデカ配列を作成してくれる関数を提供することにします。 cimport cython import numpy as np cimport numpy as cnp cdef cnp.ndarray[cnp.int64_t, ndim=2] create(int n): cdef cnp.ndarray[cnp.int64_t, ndim=1] a = np.empty([n**2], dtype=np.int64) cdef int i for i in range(n**2): a[i] = int((i-i/2)**2) return a.reshape([n,n]) cpdef cnp.ndarray get_100m_array(int n): return create(n) やりかた pyproject.toml に以下の項目を追記・変更します。 [tool.poetry] ... build = "build.py" ... [build-system] requires = ["poetry-core>=1.0.0", "setuptools", "cython", "numpy"] ... 以下のような内容で build.py を作成します。置く場所は pyproject.toml と同じ階層です。 import numpy as np try: from Cython.Build import cythonize except ImportError: def build(setup_kwargs): pass else: from setuptools import Extension from setuptools.dist import Distribution from distutils.command.build_ext import build_ext def build(setup_kwargs): extensions = [ "cy_test/test.pyx" ] setup_kwargs.update({ 'ext_modules': cythonize( extensions, language_level=3, compiler_directives={'linetrace': True}, ), 'cmdclass': {'build_ext': build_ext}, 'include_dirs': [np.get_include()] }) あとは以下のとおりに Poetry コマンドを実行するだけです。ラクですね。 $ poetry build $ poetry publish 別の仮想 Python 環境を作成し、その中で先ほど公開した cy_test パッケージをインストールして試してみると、ちゃんと test.pyx がビルドされた上で動作していることを確認できます。 $ poetry new ~/example $ cd ~/example $ poetry add cy_test $ poetry install $ poetry run python >>> from cy_test import test >>> test.get_100m_array(100) array([[ 0, 0, 1, ..., 2352, 2401, 2450], [ 2500, 2550, 2601, ..., 9702, 9801, 9900], [ 10000, 10100, 10201, ..., 22052, 22201, 22350], ..., [23522500, 23527350, 23532201, ..., 23995302, 24000201, 24005100], [24010000, 24014900, 24019801, ..., 24487652, 24492601, 24497550], [24502500, 24507450, 24512401, ..., 24985002, 24990001, 24995000]]) 参考 Basic Tutorial — Cython 3.0.0a9 documentation python - How to use Cython with Poetry? - Stack Overflow
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Cython を含む Python パッケージを poetry で作成・公開

はじめに これは「Cython を利用して高速な Python ライブラリを書いたものの、PyPI へ publish する手順知らね〜!」という poetry ユーザ へ向けたシンプルな記事です。 前提 例として、以下の構造のプロジェクトを用いて説明します。便宜上、Cython公式チュートリアルに記されている手順を実行したときに生成される *.so や *.cpp 等のファイルは省略しています。 cy_test/ ├── cy_test/ │ ├── __init__.py │ ├── test.pyx │ ... ├── pyproject.toml ... 以下は test.pyx の中身です。生の Python が特に苦手とするような、なんか大量の逐次計算をしてクソデカ配列を作成してくれる関数を提供することにします。 cimport cython import numpy as np cimport numpy as cnp cdef cnp.ndarray[cnp.int64_t, ndim=2] create(int n): cdef cnp.ndarray[cnp.int64_t, ndim=1] a = np.empty([n**2], dtype=np.int64) cdef int i for i in range(n**2): a[i] = int((i-i/2)**2) return a.reshape([n,n]) cpdef cnp.ndarray get_100m_array(int n): return create(n) やりかた pyproject.toml を以下のように追記・変更します。 [tool.poetry] ... build = "build.py" ... [build-system] requires = ["poetry-core>=1.0.0", "setuptools", "cython", "numpy"] ... 続いて、以下のような内容で build.py を作成し、 pyproject.toml と同じ階層に配置します。 import numpy as np try: from Cython.Build import cythonize except ImportError: def build(setup_kwargs): pass else: from setuptools import Extension from setuptools.dist import Distribution from distutils.command.build_ext import build_ext def build(setup_kwargs): extensions = [ "cy_test/test.pyx" ] setup_kwargs.update({ 'ext_modules': cythonize( extensions, language_level=3, compiler_directives={'linetrace': True}, ), 'cmdclass': {'build_ext': build_ext}, 'include_dirs': [np.get_include()] }) 最後に以下の Poetry コマンドを実行すると、PyPI に公開されます。 $ poetry build $ poetry publish 結果 別の仮想 Python 環境を作成し、その中で先ほど公開した cy_test パッケージをインストールして試してみると、test.pyx がきちんと動作していることが確認できます。 $ poetry new ~/example $ cd ~/example $ poetry add cy_test $ poetry install $ poetry run python >>> from cy_test import test >>> test.get_100m_array(100) array([[ 0, 0, 1, ..., 2352, 2401, 2450], [ 2500, 2550, 2601, ..., 9702, 9801, 9900], [ 10000, 10100, 10201, ..., 22052, 22201, 22350], ..., [23522500, 23527350, 23532201, ..., 23995302, 24000201, 24005100], [24010000, 24014900, 24019801, ..., 24487652, 24492601, 24497550], [24502500, 24507450, 24512401, ..., 24985002, 24990001, 24995000]]) 以下のように pip で自作 Cython モジュールをインストールできます。 $ pip install cy_test 参考 Basic Tutorial — Cython 3.0.0a9 documentation python - How to use Cython with Poetry? - Stack Overflow
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonの弱い文法を補う記事

はじめに Pythonをかけると言っても,所々にわからない文法があるかと思います。 そんなものを一つずつ公式ドキュメントを調べつつ,実験したいと思います。 argparse decorator class method v.s. static method property , setter Union yield 次回 : torch/autograd/grad_mode.pyのgithubを理解する. 1. argparse プログラムの仕様を記述できます。サンプルは以下です。 import argparse def func(args): return sum(args) parser = argparse.ArgumentParser(description='Process some integers.') # 複数の種類なら nargsは固定に, 任意の長さのものを複数取ると,前に集約される parser.add_argument('dorararara', metavar='Crazy Diamond', type=int, nargs='+', help='an integer for the accumulator') parser.add_argument('--DIO', dest='UNKNOWN', action='store_const', const=func, default=max, help='sum the integers (default: find the max)') args = parser.parse_args() # 受け取り parser.print_help() # helpのprint print(args.UNKNOWN(args.dorararara)) # 計算結果 # tmp.py 1 2 3 4 --DIO (https://docs.python.org/ja/3/library/argparse.html#the-add-argument-method より) ここで,私個人的に思ったのが,const(ant)とdefaultが同じような意味合いを持っている感触だったので見分けが多少覚えづらいなと思いました。 2. Decorator 公式documentにある通り,wrapper構文で書かれるのが一般的です。 試しにwrapper構文以外で書くと,定義をするだけで下記のコードはprintされてしましました。 def Vs_Dio(who): for _ in range(2): print("Mudamudamudamuda") who() @Vs_Dio def Jotaro(*args, **kwargs): print("Oraoraoraora") print("-"*50) def vs_dio(who): def wrapper(*args, **kwargs): for _ in range(2): print("muda"*3) who() return wrapper @vs_dio def jotaro(*args, **kwargs): print("ora"*3) jotaro() 3. Method Static v.s. Class 簡単に言えば,classは引数にclassを, staticは引数を必要としません。 class Nen: def __init__(self,type_p) -> None: self.type_p=type_p @classmethod def syugyo(cls): GON=cls("POWER") return GON @staticmethod def mizumishiki(s): if s=="POWER": return "OVER" else: return "NO-OVER" gon=Nen.syugyo() kirua=Nen("NO-POWER") print(Nen.mizumishiki(gon.type_p)) print(getattr(gon,"type_p")) print(Nen.mizumishiki(kirua.type_p)) print(getattr(kirua,"type_p")) setattr(kirua,"type_p","POWER") print(Nen.mizumishiki(kirua.type_p)) 4. property , setter これは,クラスの値のsetやgetを明示的に定義できるようになります。 from sys import argv, flags flag=argv[1]=="t" class Git(): def __init__(self, name, password) -> None: self.name=name self.__password=password # name, mangling @property def password(self): return self.__password if flag: @password.setter def password(self, value): print("setter") self.__password=value boku=Git("ma", "1234password") print(boku._Git__password) if flag: boku.password="chon" else: boku._Git__password="chon" print(boku.password) 5. Union unionは公式docsそのままです from typing import Union, List import numpy as np def get_len_1(lst:Union[List, np.ndarray]) -> int: print(len(list(lst))) get_len_1([1,2,3]) get_len_1(np.array([[1,2,3]])) 6. yield generatorを作成するために使用します。 from typing import Generator def my_gene(a: int) -> Generator: c=0 while(c<a): yield (c+10)*2**c c+=1 for x in my_gene(10): print(x) 7. with ブロックの実行を、コンテキストマネージャによって定義されたメソッドでラップするために使われる。 そのため,デコレータのように扱うことができようなケースも多い。 import contextlib class A: def __init__(self,value) -> None: self.value=value tmp=A(3) @contextlib.contextmanager def func(a:int) -> None: old_value=getattr(tmp, "value")# 前処理 setattr(tmp,"value",old_value+a) try: yield finally: setattr(tmp,"value",old_value)# 終了後 with func(100): print(tmp.value) print(tmp.value) print("-"*20) @func(200) def ora(): print(tmp.value) ora() print(tmp.value) 一般的にopenなどでよく使われる. 参考記事 - 【Python】with構文を1分で解説―with open/ファイル操作― - Pythonの関数デコレータでopenしてcloseする 終わりに 他にもたくさん知らないことがあるため,都度都度キャッチアップをしたいと感じた。また,自作でライブラリを作る場合はこの辺りの知識は非常に重要であると同時に,正しくライブラリを理解するためにもこの辺りの知識が非常に重要であると感じた。 次回の予定 : torch/autograd/grad_mode.pyのgithubを理解する.
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonの理解の乏しい文法を補う記事

はじめに Pythonをかけると言っても,所々にわからない文法があるかと思います。 そんなものを一つずつ公式ドキュメントを調べつつ,実験したいと思います。 argparse decorator class method v.s. static method property , setter Union yield 次回 : torch/autograd/grad_mode.pyのgithubを理解する. 1. argparse プログラムの仕様を記述できます。サンプルは以下です。 import argparse def func(args): return sum(args) parser = argparse.ArgumentParser(description='Process some integers.') # 複数の種類なら nargsは固定に, 任意の長さのものを複数取ると,前に集約される parser.add_argument('dorararara', metavar='Crazy Diamond', type=int, nargs='+', help='an integer for the accumulator') parser.add_argument('--DIO', dest='UNKNOWN', action='store_const', const=func, default=max, help='sum the integers (default: find the max)') args = parser.parse_args() # 受け取り parser.print_help() # helpのprint print(args.UNKNOWN(args.dorararara)) # 計算結果 # tmp.py 1 2 3 4 --DIO (https://docs.python.org/ja/3/library/argparse.html#the-add-argument-method より) ここで,私個人的に思ったのが,const(ant)とdefaultが同じような意味合いを持っている感触だったので見分けが多少覚えづらいなと思いました。 2. Decorator 公式documentにある通り,wrapper構文で書かれるのが一般的です。 試しにwrapper構文以外で書くと,定義をするだけで下記のコードはprintされてしましました。 def Vs_Dio(who): for _ in range(2): print("Mudamudamudamuda") who() @Vs_Dio def Jotaro(*args, **kwargs): print("Oraoraoraora") print("-"*50) def vs_dio(who): def wrapper(*args, **kwargs): for _ in range(2): print("muda"*3) who() return wrapper @vs_dio def jotaro(*args, **kwargs): print("ora"*3) jotaro() 3. Method Static v.s. Class 簡単に言えば,classは引数にclassを, staticは引数を必要としません。 class Nen: def __init__(self,type_p) -> None: self.type_p=type_p @classmethod def syugyo(cls): GON=cls("POWER") return GON @staticmethod def mizumishiki(s): if s=="POWER": return "OVER" else: return "NO-OVER" gon=Nen.syugyo() kirua=Nen("NO-POWER") print(Nen.mizumishiki(gon.type_p)) print(getattr(gon,"type_p")) print(Nen.mizumishiki(kirua.type_p)) print(getattr(kirua,"type_p")) setattr(kirua,"type_p","POWER") print(Nen.mizumishiki(kirua.type_p)) 4. property , setter これは,クラスの値のsetやgetを明示的に定義できるようになります。 from sys import argv, flags flag=argv[1]=="t" class Git(): def __init__(self, name, password) -> None: self.name=name self.__password=password # name, mangling @property def password(self): return self.__password if flag: @password.setter def password(self, value): print("setter") self.__password=value boku=Git("ma", "1234password") print(boku._Git__password) if flag: boku.password="chon" else: boku._Git__password="chon" print(boku.password) 5. Union unionは公式docsそのままです from typing import Union, List import numpy as np def get_len_1(lst:Union[List, np.ndarray]) -> int: print(len(list(lst))) get_len_1([1,2,3]) get_len_1(np.array([[1,2,3]])) 6. yield generatorを作成するために使用します。 from typing import Generator def my_gene(a: int) -> Generator: c=0 while(c<a): yield (c+10)*2**c c+=1 for x in my_gene(10): print(x) 7. with ブロックの実行を、コンテキストマネージャによって定義されたメソッドでラップするために使われる。 そのため,デコレータのように扱うことができようなケースも多い。 import contextlib class A: def __init__(self,value) -> None: self.value=value tmp=A(3) @contextlib.contextmanager def func(a:int) -> None: old_value=getattr(tmp, "value")# 前処理 setattr(tmp,"value",old_value+a) try: yield finally: setattr(tmp,"value",old_value)# 終了後 with func(100): print(tmp.value) print(tmp.value) print("-"*20) @func(200) def ora(): print(tmp.value) ora() print(tmp.value) 一般的にopenなどでよく使われる. 参考記事 - 【Python】with構文を1分で解説―with open/ファイル操作― - Pythonの関数デコレータでopenしてcloseする 終わりに 他にもたくさん知らないことがあるため,都度都度キャッチアップをしたいと感じた。また,自作でライブラリを作る場合はこの辺りの知識は非常に重要であると同時に,正しくライブラリを理解するためにもこの辺りの知識が非常に重要であると感じた。 次回の予定 : torch/autograd/grad_mode.pyのgithubを理解する.
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonを使ってcsvから特定期間内のデータを集計する

作成背景 初投稿! 早いもので2021年も残り1ヶ月ですねえ。 うちの会社では、顧客からの問合せを受けた際に、内容の履歴を残します(一般的)。 一応webベースかつオンプレで存在する問合せ管理システムなんですが、これがいかんせん使いづらい。。 「担当者別に集計して、この期間はどれくらい問合せがあるの見れたらいいのに」なんて思ってたところ、 Pythonでcsvからデータ集計すればええやん!てことで調べまくったのが最初。 (csvのデータは吐き出せるけどExcelではやりたくなかったのが本音) 最近プログラミングが好きになってPythonとかGOとかPHPとかRubyとか触ってますが、 Python以外はProgateちょこちょこやってるくらいなので初心者に毛の生えたレベルやと思ってください。 調べまくってツギハギだらけのコードなので、もしかすると無駄な部分もあるかも・・・? 何卒おてやわらかに(いつかちっちゃい船じゃなくてメリー号くらい作れるようになりたい) 環境 Windows10。Microsoft Officeが入っていると良き。 Python 3.10.0 全体像 resample_enquiries.py import pandas as pd import numpy as np from datetime import datetime as dt from datetime import time from dateutil.parser import parse as ps import numpy as np import collections import itertools import os import sys from time import sleep df = pd.read_csv(r'C:\Users\user\Desktop\<ファイル名>.csv', usecols=['datetime', 'person_in_charge'], encoding='cp932' ) df_date = df.set_index('datetime') #エラーチェック用 def check(x): #print(x) pd.to_datetime(x) df_date.index.map(check) df_date.index=pd.to_datetime(df_date.index,format='%Y%m%d %H:%M') df_date.sort_index(inplace=True) df_date['count'] = 1 df_multi = df_date.set_index([df_date.index.year, df_date.index.month, df_date.index.weekday, df_date.index.hour, df_date.index]) df_multi.index.names = ['year', 'month', 'weekday', 'hour', 'date'] df_date['person_in_charge'] = df_date['person_in_charge'].str.split(',') all_tag_list = list(itertools.chain.from_iterable(df_date['person_in_charge'])) c = collections.Counter(itertools.chain.from_iterable(df_date['person_in_charge'])) tags = pd.Series(c) df_tag_list = [] top_tag_list = tags.sort_values(ascending=False).index[:11].tolist() for t in top_tag_list: df_tag = df_date[df_date['person_in_charge'].apply(lambda x: t in x)] df_tag_list.append(df_tag[['count']].resample('1M').sum()) df_tags = pd.concat(df_tag_list, axis=1) df_tags.columns = top_tag_list df_tags.to_csv(r'C:\Users\user\Desktop\2021_result.csv', encoding='cp932') print("done") 使用ライブラリ ライブラリとかはこのあたり⬇を今回使ってます。適宜 pip でインストールしてください。 ※matplotlibも組み合わせた場合のもまたアップ予定 resample_enquiries.py import pandas as pd import numpy as np from datetime import datetime as dt from datetime import time from dateutil.parser import parse as ps import numpy as np import collections import itertools import os import sys from time import sleep CSVファイルの前処理について 今回は2021年分を月単位、担当者ごとに集計したいと思います。 (後述しますが、週単位や日単位の集計もできる柔軟仕様です) 必要なcsv上のカラム名は「datetime」と「person_in_charge」 それぞれ問合せを受けた日、担当者カラムにこれらの名前をつけます。 エラーがこわいのでdatetimeカラムの空欄とかは前もってサクッておく。 コード ファイルをpandasで読み込ませます。 resample_enquiries.py df = pd.read_csv(r'C:\Users\user\Desktop\<ファイル名>.csv', usecols=['datetime', 'person_in_charge'], encoding='cp932' ) df.set_index で「datetime」カラムをインデックスに割り当て。 ※後半の箇所は空欄セルがないかのチェック用なので省いてもok resample_enquiries.py df_date = df.set_index('datetime') def check(x): #print(x) pd.to_datetime(x) df_date.index.map(check) データ型をdatetime用フォーマットに変換+ソート resample_enquiries.py df_date.index=pd.to_datetime(df_date.index,format='%Y%m%d %H:%M') df_date.sort_index(inplace=True) #resamplingのためにカウント列を追加 df_date['count'] = 1 日や時間ごとにグルーピングとか resample_enquiries.py df_multi = df_date.set_index([df_date.index.year, df_date.index.month, df_date.index.weekday, df_date.index.hour, df_date.index]) df_multi.index.names = ['year', 'month', 'weekday', 'hour', 'date'] df_date['person_in_charge'] = df_date['person_in_charge'].str.split(',') all_tag_list = list(itertools.chain.from_iterable(df_date['person_in_charge'])) #iiterateをcollectionに渡す。各タグの出現回数をカウント c = collections.Counter(itertools.chain.from_iterable(df_date['person_in_charge'])) #seriesに変換 tags = pd.Series(c) タグをDataFrameに格納。 resample_enquiries.py df_tag_list = [] top_tag_list = tags.sort_values(ascending=False).index[:11].tolist() ここが肝。 df_tag_list.append(df_tag[['count']].resample('1M').sum()) の「'1M'」部分を例えば「'2W'」にすれば2週間ごと、「'7D'」にすれば7日ごとの集計が可能。 resample_enquiries.py for t in top_tag_list: df_tag = df_date[df_date['person_in_charge'].apply(lambda x: t in x)] df_tag_list.append(df_tag[['count']].resample('1M').sum()) df_tags = pd.concat(df_tag_list, axis=1) df_tags.columns = top_tag_list 集計データを新規ファイルに出力 resample_enquiries.py df_tags.to_csv(r'C:\Users\user\Desktop\2021_result.csv', encoding='cp932') #終わったら「done」がでてくる print("done") 出力結果 こんな感じで月集計のcsvファイルが出ます。 うまーく装飾すれば見やすくなりますね。 みなさん良いお年を
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

G空間情報センター 人流オープンデータを使ってみた

G空間情報センターの人流オープンデータ G空間情報センターは、一般社団法人 社会基盤情報流通推進協議会が2016年から運営を開始しています。曰く、「産官学の様々な機関が保有する地理空間情報を円滑に流通し、社会的な価値を生み出すことを支援する機関」とのこと。ここから人流オープンデータがあったので、pythonで読み込み、滞在人口をfoliumのHeatMapで表示させるまでをやってみました。既に人流オープンデータで表示できるので、ほとんど意味はないんだけど。 人流オープンデータ仕様 -提供エリア:全国 -集計期間 :2019年1月~2020年12月の各月 -集計単位 :(平休日)全日/平日/休日        (時間帯)終日/昼 /夜        (居住地)同市区町村/同都道府県/同地方/それ以外 ※市町村単位発地別データのみ。 -データ形式:CSV形式 使ったデータ -1kmメッシュ別の滞在人口データ(monthly_mdp_mesh1km)        1kmメッシュ別に、いつ、何人が滞在したのかを収録したデータ  -[補足データ]1kmメッシュ属性(attribute)        1kmメッシュデータにおけるメッシュIDの座標を示すデータ データの読み込み データは基本的にCSV形式ですが、ダウンロードファイルはZIP形式で圧縮されているので解凍するか、zipfileで読み込みます。特に1kmメッシュ別の滞在人口データ(monthly_mdp_mesh1km)は、月毎に圧縮されているのでいちいち全部解凍するのは面倒だと思います。 attributeデータの読み込み 1kmメッシュ属性データを読み込みます。 import pandas as pd from zipfile import ZipFile # read attribute attribute_zipfile = './data/attribute.zip' zf_attribute = ZipFile(attribute_zipfile) attribute_files = zf_attribute.infolist() attribute_files # [<ZipInfo filename='attribute/attribute_mesh1km_2019.csv.zip' external_attr=0x20 file_size=3965732>, # <ZipInfo filename='attribute/attribute_mesh1km_2020.csv.zip' external_attr=0x20 file_size=3965724>] 2019年と2020年の1kmメッシュの緯度・経度データが含まれます。中身はこんな感じ(2019年分)。 # 2019 attribute df_attribute[0] mesh1kmid lon_center lat_center lon_max lat_max lon_min lat_min prefcode citycode 0 30365006 136.081253 20.420834 136.087494 20.424999 136.074997 20.416666 13 13421 1 30365015 136.068756 20.429167 136.074997 20.433332 136.062500 20.424999 13 13421 … 1kmのメッシュの4方と中央の緯度・経度座標があります。prefcodeは都道府県のコードで13は東京都です。citycodeは市区町町村のコードで、別のファイル(prefcodecitycodemaster)で対応づけられています。今回は使いません。 monthly_mdp_mesh1kmデータの読み込み これは、都道府県ごとに提供されているので、今回は東京のみを対象としてみました。東京のprefcodeは13なので、monthlymdpmesh1km13.zipだけを読み込みます。中にディレクトリ構造(年ディレクトリの下に月ディレクトリ)があるため、次のようにしてみました。 # 東京1㎞メッシュデータ mesh13_data = './data/monthlymdpmesh1km13.zip' zf = ZipFile(mesh13_data) csv_files = zf.infolist() df_mesh1km_tokyo=[] for csv_file in csv_files: if csv_file.filename.endswith('.zip') : csv_file_name=csv_file.filename d=zf.open(csv_file_name) df = pd.read_csv(d,compression='zip') df_mesh1km_tokyo.append(df) 例えば、2019年4月のデータはこんな感じ。 df_mesh1km_tokyo[3] mesh1kmid prefcode citycode year month dayflag timezone population 0 53394519 13 13101 2019 4 0 0 14417 1 53394519 13 13101 2019 4 0 1 6053 2 53394519 13 13101 2019 4 0 2 9667 3 53394519 13 13101 2019 4 1 0 70932 4 53394519 13 13101 2019 4 1 1 9786 … dayflagは、0:休日、1:平日、2:全日、timezoneは、0:昼、1:深夜、2:終日。populationが各メッシュでの対応する年・月・デイ・タイムゾーンでの平均滞在人数です。 データを結合して、foliumでHeatMap表示 attributeデータとmonthly_mdp_mesh1kmデータの結合 緯度、経度と平均滞在人数をmesh1kmidをキーにして結びつけます。後はデイフラグとタイムゾーンを適当に指定して描画へ。 #attributeとmonthly_mdp_mesh1kmデータを結合 df_tokyo_201904 = pd.merge(df_attribute[0],df_mesh1km_tokyo[3],on='mesh1kmid',how='inner') #必要なカラム(中心緯度、中心経度、デイフラグ、タイムゾーン、平均滞在人数)を抽出 df_tokyo_201904_dt=df_tokyo_201904[['mesh1kmid','lon_center','lat_center','dayflag','timezone','population']] #dayflagを1(平日)、timezoneを0(昼)に指定 df_tokyo_201904_plot=df_tokyo_201904_dt[(df_tokyo_201904_dt['dayflag']==1) & (df_tokyo_201904_dt['timezone']==0)] foliumでHeatMap表示 import folium import matplotlib.pyplot as plt from folium.plugins import HeatMap #空白地図の作成 center = [35.67083, 139.74375] m = folium.Map(center, zoom_start = 12) #メッシュの緯度、経度、population列を抽出し、重みのリストを作成 weight_list = df_tokyo_201904_plot[['lat_center','lon_center','population']].values # print(weight_list) # ヒートマップの表示 HeatMap(weight_list, radius=10, blur=8).add_to(m) m ”平均”滞在人数だからでしょうか?あまり地域差がないように見えます。別の月、時間帯で比較するといいかもしれません。後はHeatMapのパラメータを変えるとか? とりあえず、ここまでで。お疲れさでした。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

str.containsで複数の式を利用

概要 str.containsで「or」といった複数の条件を設定する際には正規表現を利用 実装 データフレームの作成 import pandas as pd dict1=dict(date=[20151204,20151204,20151204],prefecture=["東京都","神奈川県","和歌山県"]) list1=pd.DataFrame(data=dict1) list1 date prefecture 20151204 東京都 20151204 神奈川県 20151204 大阪府 都道府県から任意の地方に該当するかについてstr.containsの複数の式を利用 list1['関東']= list1['prefecture'].str.contains('神奈川県|東京都') list1 date prefecture 関東 20151204 東京都 True 20151204 神奈川県 True 20151204 大阪府 False 正規表現を使わないと以下のようになってしまいます。 list1['関東']= list1['prefecture'].str.contains('東京都' or '神奈川県') list1 date prefecture 関東 20151204 東京都 True 20151204 神奈川県 False 20151204 大阪府 False
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

画像をGoogle PhotosにアップロードするPythonスクリプトと基本的な考え方

Google Photos APIを使って画像を一枚だけアップロードする簡単なPythonスクリプトを作ってみました(成果物@gist)。 スクリプトはHeadlessで動いているRaspberry Piで動かしたかったので、認証モデルとしてはブラウザがないデバイス向けのもの(TV and Limited-input devices)を利用しています。 その際に分かりづらかったところについて備忘録を残します。 認証〜API呼び出しの基本的な考え方 組み込み機器などで動かす場合(Limited input device)の場合、認証(OAuth)については基本的に、以下の手順になります: 事前準備: Googleの解説 に従って、APIを呼び出すために必要なClient IDを作成し、それに対応するJSONファイルをダウンロードしておいてください。 この際、クライアントの種類は「TV and Limited-input device」を選びます。 基本的に、Googleがメンテナンスしている以下のライブラリ群を使いましょう: from google_auth_oauthlib.flow InstalledAppFlow -- OAuthのワークフローを処理するためのクラス from google.auth.transport.requests AuthorizedSession -- Authorization の結果得られる Credential を利用して、Authorization の必要なREST APIを呼び出すためのクラス from google.auth.transport.requests Request -- Credential のリフレッシュの処理の際に利用している、REST APIの処理を行うためのクラス InstalledAppFlow.from_client_secrets_file に前述のファイルを与えて、OAuth ワークフローを管理するオブジェクト(以下 flow と表記)を作成します。 flow.authorization_url() メソッドで認証を実施するためのURLを取得し、ユーザに表示します。 外部の作業: PCやスマホのブラウザを使って前のステップで取得したURLにアクセスして認証を実施します。 認証の処理をブラウザで行った際に表示される Authorization Code を使って、flow.fetch_token() メソッドによりCredentialを取得します。 取得したCredentialは flow.credentials としてアクセスできます。 取得したCredentialを利用して、AuthotizedSession オブジェクトを生成し、このオブジェクトを利用して各種のREST API呼び出しを行います。 また、実際に使う場合には以下のような点も考えると良いでしょう 生成したCredentialは pickle などで保存すると再利用が楽です。 Credentialには利用期限があるので、期限を超えていたら Refresh する必要があります。 Google Photos APIを利用した画像のアップロードの仕方 Google Photos APIに関しては Python 向けの(公式な)ライブラリが見つからなかったので、AuthorizedSession のオブジェクトを使って一つ一つREST APIを叩くことで動かします。 この際、画像は基本的に2ステップでアップロードする必要があります。 画像データのアップロード https://photoslibrary.googleapis.com/v1/uploads (参考: Uploading bytes) アップロードに成功すると、HTTPSのResponseとして uploadToken が得られます。Responseには uploadToken がプレーンテキストで記載されています(※JSONではないので注意) アップロードしたデータを mediaItem として登録するhttps://photoslibrary.googleapis.com/v1/mediaItems:batchCreate (参考: Creating media item) 前述の uploadToken を使って、mediaItem を作成します。これにより Google Photos から画像が見えるようになります。 補足(試していないこと) mediaItems:batchCreate は複数の mediaItem を一気に作ることができます。 mediaItems:batchCreate の際にはオプションとしてアルバムの指定などを行うことも可能です。 環境に関して 以下の環境で検証しました。検証は2021/11/30日に実施。 $ uname -a Darwin Arigas-MBA-M1.local 21.1.0 Darwin Kernel Version 21.1.0: Wed Oct 13 17:33:24 PDT 2021; root:xnu-8019.41.5~1/RELEASE_ARM64_T8101 x86_64 $ python --version Python 3.7.9 $ pip list | grep google google-api-core 2.0.1 google-api-python-client 2.31.0 google-auth 2.2.1 google-auth-httplib2 0.1.0 google-auth-oauthlib 0.4.6 google-cloud-core 2.0.0 google-crc32c 1.2.0 google-resumable-media 2.0.3 googleapis-common-protos 1.53.0
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Cloud Composer(airflow)でGoogle WorkspaceのAPIを使う

はじめに これは Kyash Advent Calendar 2021 の7日目の記事です Kyashで、経理・会計・資金決済法の担当エンジニアをしているPrinceチームの清水(@thimi0412)です。 Princeチームについてはこちら → https://foxtoy.hateblo.jp/entry/2021/07/19/100223 KyashではGoogle Cloud Composerを使用して会計業務に使うViewテーブルの作成などをおこなっています。 その中でBigQueryに対してクエリを実行し、結果をスプレッドシートに書き込み指定された共有ドライブのフォルダ内に入れる等の操作が必要になりCloudComposerでGoogle Workspace系のAPIを使う機会があったのでハマった点やどう使っているかを書きます。 Cloud Composerとは Apache Airflow(ワークフロー管理ツール) で構築された、フルマネージドのワークフロー オーケストレーション サービスです。 https://cloud.google.com/composer Google Workspace系のOperatorは共有ドライブに対応していないので注意 スプレッドシートを作成したのちそのシートに対してBigQueryのクエリ結果を書き込む処理をしたいと思ったのでGoogleSheetsCreateSpreadsheetOperatorを使用してスプレッドシートを作成したのですがフォルダIDが見つからないというエラーが出ました。 Operatorの使用の例 SPREADSHEET = { "name": sheet_name, # シート名 "parents": [parent_folder_id], # シートを作成するフォルダID "mimeType": "application/vnd.google-apps.spreadsheet", } create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator( task_id="create_spreadsheet", spreadsheet=SPREADSHEET ) ググっても情報が出てこなかったのでairflowのソースコードを調べました。 分かったことはcreate()の引数としてsupportsAllDrives=trueのパラメータがセットされていないことが問題でした。 共有ドライブに作成する際にはこのパラメータをセットする必要があります(他のGoogle Workspace系のOperatorも同様)。なんで対応してないんだ? ref Implement shared drive support def create_spreadsheet(self, spreadsheet: Dict[str, Any]) -> Dict[str, Any]: """ Creates a spreadsheet, returning the newly created spreadsheet. :param spreadsheet: an instance of Spreadsheet https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets#Spreadsheet :type spreadsheet: Dict[str, Any] :return: An spreadsheet object. """ self.log.info("Creating spreadsheet: %s", spreadsheet['properties']['title']) response = ( self.get_conn().spreadsheets().create(body=spreadsheet).execute(num_retries=self.num_retries) ) self.log.info("Spreadsheet: %s created", spreadsheet['properties']['title']) return response GoogleSheetsCreateSpreadsheetOperatorは共有ドライブには対応していないのでスプレッドシート作成の関数を自作しました。 この関数をPythonOperator内から呼び出してスプレッドシートを作成しています。フォルダを作成する場合はmimeTypeをapplication/vnd.google-apps.folderにすればフォルダも作成できます。 from airflow.providers.google.suite.hooks.sheets import GSheetsHook def create_spread_sheet(sheet_name, parent_folder_id): """スプレッドシートを作成する Args: sheet_name (string): シート名 parent_folder_id (string): シートを作成する親フォルダのID Returns: [type]: [description] """ hook = GoogleDriveHook() drive = hook.get_conn() file_metadata = { "name": sheet_name, "parents": [parent_folder_id], "mimeType": "application/vnd.google-apps.spreadsheet", # "mimeType": "application/vnd.google-apps.folder", フォルダの場合 } # supportsAllDrives=Trueがないと共有ドライブに作成できない res = drive.files().create(body=file_metadata, supportsAllDrives=True).execute() return res フォルダIDとシートIDはXComで受け取ると便利 XComは異なるタスク間で値をやりとりする手段です。 フォルダ作成→スプレッドシート作成→データの書きこみの依存関係はこのように作成していて、フォルダ、スプレッドシートの作成はそれぞれPythonOperator内でおこなっているのでPythonOperatorも戻り値としてレスポンスのid(フォルダやスプレッドシートのID)を返すことで別のタスクからXComを使用してその値を受け取れます。 def create_folder(**kwargs): """フォルダを作成する Returns: [str]: 作成されたフォルダのID """ folder_name = '<yyyy-mm-dd>' folder_id = '<FOLDER_ID>' res = create_drive_folder(folder_name, folder_id) return res["id"] def create_spreadsheet(**context): """スプレッドシートを作成する Returns: [str]: 作成したスプレッドシートのID """ # 前taskのcreate_folderから作成したフォルダのIDを受け取る folder_id = context["task_instance"].xcom_pull(task_ids="create_folder") sheet_name = "sheet_A" res = create_spread_sheet(sheet_name, folder_id) return res["id"] with DAG( DAG_NAME, description="spreadsheet_test", schedule_interval="0 23 * * *", # 毎日8:00(JST)に実行 catchup=False, default_args=default_args, ) as dag: task_create_folder = PythonOperator( task_id="create_folder", python_callable=create_folder, provide_context=True, dag=dag, ) task_create_spreadsheet_A = PythonOperator( task_id="create_spreadsheet_A", python_callable=create_spreadsheet, provide_context=True, dag=dag, ) task_create_spreadsheet_B = PythonOperator( task_id="create_spreadsheet_B", python_callable=create_spreadsheet, provide_context=True, dag=dag, ) task_create_spreadsheet_C = PythonOperator( task_id="create_spreadsheet_C", python_callable=create_spreadsheet, provide_context=True, dag=dag, ) task_create_folder >> task_create_spreadsheet_A task_create_folder >> task_create_spreadsheet_B task_create_folder >> task_create_spreadsheet_C etc task失敗時のslack通知 各taskが失敗した際にon_failure_callbackに関数を登録しておくとslackの通知するようにしています。 logのurlの取得ができるのでslackの通知から直接logに飛べるのでとても便利。 ↓のコードではslackwebを使用して通知していますがairflowのproviderにもslackのhookが用意されているのでそっちを使ってもよかったと作ってから思いました。 - 参考にしたもの https://www.mikulskibartosz.name/send-cusomized-slack-notification-when-airflow-task-fails/ - slack provider https://github.com/apache/airflow/tree/main/airflow/providers/slack def failure_notification(context): task_instance = context["task_instance"] dag_name = context["dag"] task_name = task_instance.task_id log_link = f"<{task_instance.log_url}|{task_name}>" error_message = str(context.get("exception") or context.get("reason")) slack = slackweb.Slack(url=SLACK_URL) attachments = [ { "username": "Cloud Composer(Airflow)", "icon_emoji": ":airflow:", "color": "danger", "title": "エラーが発生しました", "fields": [ {"title": "Project", "value": f"{PROJECT_ID}"}, {"title": "<DAG name> Task name", "value": f"{dag_name} {task_name}"}, {"title": "Log URL", "value": log_link}, {"title": "Error Message", "value": error_message}, ], } ] slack.notify(attachments=attachments) 最後に CloudComposer(airflow)のprovider周りのOperatorはあんまりドキュメントが整理されていないので直接airflowのソースコードを読んでどういった仕様になっているのか調べたほうがいいなと思いました。 今回はGoogle WorkspaceのAPI周りを調べましたがairflowのソースコードとGoogle WorkspaceのAPIのドキュメントを繰り返し見て作業してました。 providerはgoogleやslack以外にも多くあるので他のサービスと連携する際にproviderを見てみて連携するサービスがあれば一から実装しなくても良くなるかもしれないのでチェックしておくといいかもと思いました。PR送れるくらいGoogleのproviderのコードを読んだので共有ドライブ対応のPRを送ろうと思っています。 Kyash Advent Calendar 2021の他の記事も是非ご覧ください〜!
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Databricks】モデルの作成、確認を行ってみる

こんにちは。 前回の記事ではデータをimportしたり、データの中身を確認してみたりしました。 今回は実際にモデルを作成し、その精度等を確認してみたいと思います。 今回使うデータはStore Item Demand Forecasting Challengeを使いたいと思います。 データの詳細について今回は省略します。 時系列データなので、FBprophetというライブラリを利用したいと思います。 データの準備 まずは上記ページからCSVデータをダウンロードし、DataBricks上にアップロードします。 これらの方法は前回の記事を参照してください。 ファイルはData内の/FileStore/testdata/train.csvとしてアップロードしました。 fbprophetのインストール 今回利用するライブラリをインストールします。 自分はここで詰まってしまったのですが、前提ライブラリの最新版に対応していないことが問題だったので、そこのダウングレードを一緒に行っています。 notebook !echo y | %pip uninstall pystan %pip install pystan==2.19.1.1 %pip install fbprophet これで必要な準備はできました。 モデルの作成 まずモデルを作成するために使うデータをimportします。 notebook from pyspark.sql.types import * # 今回利用するデータのスキーマを指定 (型修正オプションで自動読み取りも可能) schema = StructType([ StructField("date", DateType(), False), StructField("store", IntegerType(), False), StructField("item", IntegerType(), False), StructField("sales", IntegerType(), False) ]) # FileStore に格納されているCSVを読み込み inputDF = spark.read.format("csv")\ .options(header="true", inferSchema="true")\ .load("/FileStore/testdata/train.csv", schema=schema) # クエリを発行可能な状態にするために、一時ビューを作成 inputDF.createOrReplaceTempView('history') history = inputDF # Spark3.0以降だとDATE_FORMATがうまく動作しないので、timeParserPolicyをLEGACYにすることで対応する spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY") 続いて、作成するモデルの定義を行います。 notebook # モデルの定義 def define_prophet_model(params): model = Prophet( interval_width=params["interval_width"], # 信頼区間 growth=params["growth"], # モデルの種類。線形回帰(linear)またはロジスティック回帰(logistic) daily_seasonality=params["daily_seasonality"], # 日ごとの周期性の有無 weekly_seasonality=params["weekly_seasonality"], # 月ごとの周期性の有無 yearly_seasonality=params["yearly_seasonality"], # 年ごとの周期性の有無 seasonality_mode=params["seasonality_mode"] # 周期性の傾向 ) return model # 予測 def make_predictions(model, number_of_days): return model.make_future_dataframe(periods=number_of_days, freq='d', include_history=True) 実際にモデルを作成します。 notebook from pyspark.sql.functions import to_date, col from pyspark.sql.types import IntegerType # 学習用の pandas df を用意 history_sample = history.where(col("date") >= "2015-01-01").sample(fraction=0.01, seed=123) history_pd = history_sample.toPandas().rename(columns={'date':'ds', 'sales':'y'})[['ds','y']] # fbprophetを利用した場合のメッセージを非表示に import logging logging.getLogger('py4j').setLevel(logging.ERROR) # モデルの定義 from fbprophet import Prophet params = { "interval_width": 0.95, # 信頼区間95% "growth": "linear", # 線形回帰 "daily_seasonality": False, # 日ごとの周期性なし "weekly_seasonality": True, # 月ごとの周期性あり "yearly_seasonality": True, # 年ごとの周期性あり "seasonality_mode": "multiplicative" # 周期性を増加させていく } model = define_prophet_model(params) # 過去のデータをモデルに学習させる model.fit(history_pd) これでモデルを作成することができました。 実際にモデルの精度を確認してみたいと思います。 モデルの確認 まずは実際に予測してみたいと思います。 notebook # 過去のデータと先90日間を含むデータフレームを定義 future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) # データセット全体に対して予測実行 forecast_pd = model.predict(future_pd) display(forecast_pd) 実際に予測を行い、その結果がforecast_pd内に格納されます。 上記では表形式で表示されていると思うので、グラフにして視覚化したいと思います。 notebook trends_fig = model.plot_components(forecast_pd) display(trends_fig) 1つ目のグラフは販売数の推移を表しています。16年8月以降上昇が鈍化しています。 2つ目のグラフは1週間の周期性を表しています。月曜に販売数が落ち、土日に向けて上昇していることがわかります。 3つ目のグラフは年単位の周期性を表しています。夏に向かい上昇し、冬に向かい減少するという大まかな傾向がわかります。 傾向については上記で確認できました。 続いて、実際の予測値がどのようになっているかを視覚化してみたいと思います。 わかりやすいように過去一年の予測結果のみ表示してみます。 notebook predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales') # 出力されるデータを過去1年と予測機関のみに絞る xlim = predict_fig.axes[0].get_xlim() new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0) predict_fig.axes[0].set_xlim(new_xlim) display(predict_fig) 黒い点が実測値、青い線が予測値になります。 また、薄い青の範囲が95%の信頼区間を表しています。 このグラフから良いモデルなのかを判断することは難しいので、よく使われるモデルの指標を算出してみたいと思います。 今回利用する指標として以下の3つを算出したいと思います。 平均二乗誤差 (MSE: Mean Squared Error) 各データにおいて実測値と予測値の誤差を二乗し、平均をとったもの 二乗平均平方根誤差 (RMSE: Root Mean Squared Error) MSE の平方根をとったもの。MAEに比べて大きな誤差を厳しく評価する特徴がある。 平均絶対誤差 (MAE: Mean Absolute Error) 誤差の絶対値を取り、平均をとったもの。 RMSE に比べて外れ値の影響を受けにくいと言われる。 それぞれ0に近いほど誤差が小さく、精度が高いことがわかります。 notebook import pandas as pd from sklearn.metrics import mean_squared_error, mean_absolute_error from math import sqrt from datetime import date # 比較のために過去の実績と予測を取得 predicted_pd = forecast_pd[ pd.to_datetime(forecast_pd['ds']).dt.date < date(2018, 1, 1) ]['yhat'] actuals_pd = history_pd[ pd.to_datetime(history_pd['ds']).dt.date < date(2018, 1, 1) ]['y'].sample(len(predicted_pd)) # 制度指標の計算 mae = mean_absolute_error(actuals_pd, predicted_pd) mse = mean_squared_error(actuals_pd, predicted_pd) rmse = sqrt(mse) print('-----------------------------') print( '\n'.join(['MSE: {0}', 'RMSE: {1}', 'MAE: {2}']).format(mae, mse, rmse) ) 算出してみた結果が以下になります。 実際にはここからさらにチューニングしていき、モデルの精度を上げていくことになると思います。 ----------------------------- MSE: 1040.242690489999 RMSE: 32.25279353001844 MAE: 25.966749442943733 まとめ 以上でモデルの作成からモデルの確認までを行いました。 実際は最後に出した値やグラフなどを確認しながらより精度が上がるようにチューニングしていくことになります。 今回はモデルの作成、確認についての理解ということでこちらは省略します。 次回は、モデルの大規模データに対するアプローチやリモデルについてをまとめられたらいいなと思います。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ヤコビの二平方定理を使ってみました

二平方の和の個数 今回はヤコビの二平方定理 (Wiki)という面白い定理を見つけたので、これを使うプログラムを書いてみました。以下のような問題を考えてみます。 問題: $N=10^{12}$、$a^2+b^2=c^f (a,b,cは自然数,0<a<b, f\geq 3,\space c^f<N)$となる(a,b,c)の数を数えよ これを素直にインプリすると、以下のようになりますが、Python(Google Colab)では約45分かかります。 from time import time def count2sq(f,N): ret = 0 for c in itertools.count(2): c_f = c**f if c_f > N: break for b in itertools.count(2): b2 = b**2 if (1+b2) > c_f: break a2 = (c_f - b2) if a2 >= b2: continue a = int(a2**(1/2)+0.00001) if (a**2 == a2): ret += 1 return ret N, count = 10**12, 0 start = time() for f in itertools.count(3): if 2**f > N: break count += count2sq(f,N) print(f"**** count = {count}, {time() - start:.2f}sec ****") # **** count = 15992, 2697.05sec **** ヤコビの二平方定理 日本語版のヤコビの二平方定理 (Wiki)の説明だと分かりづらいので英語版のSum of squares functionの$k=2$の説明を引用します。 $a^2+b^2=n$を満たす整数のペア$(a,b)$の個数$r_2(n)$は $n$の素因数分解が以下のように表されるとすると、 $\space n=2^gp^{f_1}_1p^{f_2}_2\cdots q^{h_1}_1q^{h_2}_2\cdots$ ($p_i\equiv 1\space (mod 4)$、$q_i\equiv 3\space (mod 4)$ ) $r_2(n)$は以下の式で求められます $r_2(n)=4(f_1+1)(f_2+1)\cdots$ ($h_1,h_2,\cdots$がすべて偶数の時) $r_2(n)=0$ ($h_i$の一つでも奇数の時) 要は$n$の素因数を求めてmod 4が1になるものの数を数えればいいということになります。元の問題は$n=c^f$なので、$c$の素因数分解をして各々$f$倍すれば良いということでヤコビの二平方定理が効率よく利用できます。 ただしこの$r_2(n)$は$a,b$の各々の$\pm$と$a,b$の入れ替えを含んでいるので問題の数を求めるには$4\times 2=8$で割る必要があります。 Pythonでインプリメントすると以下のようになり、無事、同じ答えが0.10secで求まりました。 from sympy import factorint import itertools from time import time def sumE2f(N, f): sum = 0 for c in itertools.count(2): c_f = c**f if c_f > N: break cfct = factorint(c) rn, p3 = 4, 0 for p in cfct.keys(): cfp = cfct[p]*f if p%4==3 and cfp % 2 == 1: rn = 0 if p%4==1: rn *= (cfp+1) if rn > 4: sum += rn // 8 return sum start = time() np, sum2 = 12, 0 N = 10**np for f in itertools.count(3): if 2**f > N: break sm = sumE2f(N,f) if sm > 0: sum2 += sm print(f"**** count = {sum2}, {time() - start:.2f}sec ****") #**** count = 15992, 0.10sec **** このアルゴリズムはEuler Project: Problem 678等を解くのに役に立ちます。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【?初心者向け】競技プロ頻出Python基礎文法(全項目リンク付き)【随時更新】

競プロを戦う上で必須となるPythonの基礎文法を紹介します。 具体的な適用例を省略している場合があるため、その際はリンクをご参照ください。 必要だと感じた文法は随時更新していきます。 1.基礎文法 ・if – break文 ・タプルの基礎 要素の取り出しはリストと同じ。 ・辞書の基礎 list(辞書.keys()) #list()としないとdict_keys(リスト)で出力される。 ・文字列のスライス val = "いろはにほへと散りぬるを"、val[0] 'い'、val[3:7] 'にほへと' ・リストのスライス リスト[-4:]  # -4番目から末尾までの全要素を取り出す ・リストから要素を削除する方法 1.del リスト[インデックス] / del リスト[開始インデックス:終了インデックス] 2.リスト[開始インデックス:終了インデックス] = [] 3.リスト.pop() / リスト.pop(インデックス) 4.リスト.remove(値)  5.リスト.clear()  ・リストに要素を追加する方法 1.リスト.append([6, 7])  # 要素[6, 7]を末尾に追加 2.リスト.extend([4, 5])  # リストの要素を末尾に追加 3.リスト.extend(6)  # TypeError 4.リスト.extend((3, 4))  # タプルの要素がリスト末尾に追加される 5.リスト.insert(5, [3.25, 3.5]) # 第2引数に指定した値は単一の要素として挿入される。 print(リスト) # [0, 1, 2, 2.5, 3, [3.25, 3.5], 4] ・文字列や数値の連結 ','.join(リスト)  #リストの要素をカンマ区切りで出力。数値は文字列に変換 ・小数点以下を切り捨て/切り上げ 切り捨て: math.floor() 切り上げ: math.ceil() ・区切り出力sep() print(5, 4, sep=' ') #5 4 ・zip関数と辞書の内包表記 dict = {key:value for key, value in zip(keys, values)} ・特定要素の数 ['aa', 'bb', 'cc', 'aa'].count('aa') → 2 from collections import Counter Counter(['aa', 'bb', 'cc', 'aa']) → Counter({'aa': 2, 'bb': 1, 'cc': 1}) ・リストの要素を昇順または降順に並び替える sorted(リスト)は元のリストの順番は変わらない。 リスト.sort()は元のリストは参照できなくなる。 ・組み合わせ  順列 lis = [1,2,3,4] a = [pair for pair in itertools.combinations(lis, 2)] [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4)] ・逆順ループ for x in reversed(range(0, 5)): ・無名関数 lambda 関数の引数に関数を渡したい時に有用 リスト = [0, -1, 2, -3, 4, -5] list(map(lambda n : n**2 if n>0 else n**2*-1, リスト)) #[0, -1, 4, -9, 16, -25] もう一つのリンク ・filter関数 - 配列から関数の条件にあう要素を抽出 words = ['Python', 'CSS', 'HTML', 'JavaScript'] x = filter(lambda word:True if len(word) > 5 else False, words) print(list(x))  #['Python', 'JavaScript'] ・プログラム実行時間計測 timeモジュール timeitモジュール、jupyterなら%%timeit ・numpy行列  1.行列作成:np.array(リスト/タプル) 2.列の取り出し:行列[:, 0]  #1列目の取り出し 3.対角要素:np.diag(行列)  4.転置:行列.T numpy.transpose(行列) 5.配列の1次元化:行列.flatten() 6.二次元配列のかっこをはずして出力:for i in 二次元配列: print(*i) 7.条件を満たす要素数をカウント : np.count_nonzero(条件式) ・テクニック if~else → リスト.append(max()):実行時間はif~elseのほうが短いことに注意。 2.ご意見・ご感想をお待ちしております 当方、未熟なプログラマーのため、よりよいコード等ありましたら教えていただけると幸いです。 皆様のメッセージをお待ちしております。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Slack API のトークンローテーション完全ガイド

こんにちは、Slack で公式 SDK 開発と日本の DevRel を担当しております @seratch と申します。 こちらの記事では、今年 Slack がリリースしたトークンローテーションに関する詳細な説明を日本語でチュートリアル形式でやっていきたいと思います。 トークンローテーションとは Slack の OAuth アクセストークンは、長らく refresh token がなく、一度発行した access token は明に無効化(auth.revoke API やワークスペースのアプリ管理画面から revoke できます)しない限り、無期限で使える仕様でした。 しかし、2021 年に refresh token とともに access token を発行する機能が提供されるようになりました。 この記事ではこれを「トークンローテーション(Token Rotation)」と呼びます。なお、このトークンローテーションはデフォルトでは有効ではなく Slack アプリの管理画面から(または App Manifest の設定によって) opt-in する必要があります。この記事では、この辺の手順・注意点も改めて日本語で紹介していきます。 通常の Slack アプリのトークンローテーションを試してみる 通常の bot token / user token を持つアプリのトークンローテーションの利用法を説明していきます。 Slack アプリを作成〜設定する こちらの URL にアクセスして、新しいアプリ設定をつくります。以下の動画のように、開発用に使用するホームのワークスペースを選んだら、自動的に設定が読み込まれているはずです。まずはそのままアプリを作ってみてください。 上のリンクには App Manifest という YAML 形式の設定がクエリストリングとして仕込んでありました。その内容を以下に貼っておきます。手動で設定するときは、これをベースに設定してみてください。各項目の説明はこちらを参考にしてください。 _metadata: major_version: 1 minor_version: 1 display_information: name: token-rotation-test-app features: bot_user: display_name: token-rotation-test-app oauth_config: redirect_urls: - https://TOBEUPDATED.ngrok.io/slack/oauth_redirect scopes: user: - chat:write bot: - app_mentions:read - chat:write settings: event_subscriptions: request_url: https://TOBEUPDATED.ngrok.io/slack/events bot_events: - app_mention token_rotation_enabled: true 画面から設定する場合は Settings > OAuth & Permissions のページで Opt in します。英語で書かれている通り OAuth の Redirect URL の設定が必須となります(つまり、この管理画面からのインストールではなく OAuth フローでのみ refresh token は発行されます)。 これでアプリの設定はできましたので、実際に OAuth フローによる Slack ワークスペースへのインストールを実行してローテーション可能なアクセストークンを取得してみましょう。ここからは Python と Node.js でそれぞれサンプルアプリを動かしていきます。 なお、ここでの例ではソケットモードを使っていませんが、OAuth フロー以外のイベントを処理する部分などにはソケットモードを使用することも可能です。ソケットモードを使いたい場合は、以下のサンプルや記事を参考にしてみてください。 Slack ソケットモードの最も簡単な始め方 Python での OAuth とソケットーモードの併用コード例 JavaScript での OAuth とソケットモードの併用コード例 Python でアプリを実装する Python で Slack の OAuth フロー(アクセストークンを発行して Slack ワークスペースでアプリを有事公にする手順)と Slack から来たイベントに対してトークンローテーションをしながら応答するアプリを動かしてみましょう。 プロジェクトの新規作成 まず使用する Python のバージョンが 3.6 以上であるかを確認してください。 最近では、システム標準の python3 や pip3 などのコマンドも 3.6 以上のバージョンだとは思いますが、常に最新のバージョン(この記事投稿時点で 3.10 です)を使用するために pyenv などのツールを使って Python のランタイムを管理することをお勧めします。 その 3.6 以上の Python で、以下のような依存ライブラリを解決したまっさらな環境を作ります。 echo 'slack-bolt>=1.10,<2' > requirements.txt python3 -m venv .venv source .venv/bin/activate pip install -r requirements.txt Poetry を使うなら、以下のコマンドで同じことができます。 poetry init -n poetry shell poetry add slack-bolt 仮想環境を準備できたので、ここでアプリを実装して起動していきます。 環境変数を設定して、アプリケーションを起動 以下のような Python コードを app.py という名前で保存してください。 # より詳細なログを出力するためにログレベルを DEBUG に変更します import logging logging.basicConfig(level=logging.DEBUG) import os from slack_bolt import App from slack_bolt.oauth.oauth_settings import OAuthSettings from slack_sdk.oauth.installation_store import FileInstallationStore from slack_sdk.oauth.state_store import FileOAuthStateStore oauth_settings = OAuthSettings( # ここの二つの環境変数は必須で、正しく対象のアプリのものが設定されている必要があります client_id=os.environ["SLACK_CLIENT_ID"], client_secret=os.environ["SLACK_CLIENT_SECRET"], # これは管理画面における Bot Scopes です scopes=["app_mentions:read", "chat:write"], # これは管理画面における User Scopes です user_scopes=["chat:write"], # Slack ワークスペースへのインストール情報を管理する実装、ここではローカルファイルに保存します installation_store=FileInstallationStore(base_dir="./data/installations"), # OAuth フローの state パラメーターの永続化の実装、ローカルファイルに保存します state_store=FileOAuthStateStore(expiration_seconds=600, base_dir="./data/states"), # 以下の設定は後ほど説明します # token_rotation_expiration_minutes=60 * 24, ) app = App( # これは OAuth フローでは使いません、Slack からのイベントリクエストの検証に使います signing_secret=os.environ["SLACK_SIGNING_SECRET"], # OAuth 関連の設定をここで渡します oauth_settings=oauth_settings ) # これはこのアプリの bot user をメンションしたときのイベントに対して応答するリスナーです @app.event("app_mention") def handle_app_mention_events(event, client, context, say): if context.user_token is not None: # インストールしたユーザー自身からのメンションだったとき client.chat_postMessage( token=context.user_token, channel=context.channel_id, text=f"<@{event['user']}> のユーザートークンをお預かりしているので、こんなことができます :nerd_face:", ) else: say(f"<@{event['user']}> こんにちは!") if __name__ == "__main__": # アプリを http://localhost:3000/ で起動します app.start(port=int(os.environ.get("PORT", 3000))) # このアプリは 3 つの URL をサーブします # - http://localhost:3000/slack/install # - http://localhost:3000/slack/oauth_redirect # - http://localhost:3000/slack/events まずは、そのままの状態で動作させてみましょう。この状態で以下の環境変数を設定します。 Settings > Basic Information のページに App Credentials というセクションがありますので、 そこから Client ID、Client Secret、Signing Secret の値を、それぞれ環境変数 SLACK_CLIENT_ID、SLACK_CLIENT_SECRET、SLACK_SIGNING_SECRET に設定します。 # OAuth フローのために必要 export SLACK_CLIENT_ID=1234567890.1234567890123 export SLACK_CLIENT_SECRET=XXXXXXXXXXXXXXXXXXXXXXXXX # Slack からのリクエストか検証するために必要 export SLACK_SIGNING_SECRET=XXXXXXXXXXXXXXXXXXXXXXXXX そして、アプリを以下のコマンドで起動してみてください。 python app.py 以下のようなログがコンソールに出力されていれば、とりあえずは OK です! INFO:slack_bolt.App:⚡️ Bolt app is running! (development server) ngrok で公開された URL を用意して設定に反映 アプリ起動の最後のステップとして公開された URL を準備するために ngrok というツールを使います。使ったことがない方は https://ngrok.com/ でダウンロードして設定してみてください。 ngrok を使えるようになったら、以下のコマンドを実行してください。これによってインターネットに公開された URL に来たリクエストを http://localhost:3000/ で起動している上の Python アプリにフォーワードすることができるようになります。 ngrok http 3000 ngrok の有料プランのアカウントをお持ちの場合は ngrok http 3000 --subdomain my-token-rotation-app のように固定されたサブドメインを使うことが可能ですが、無料利用の場合は毎回ランダムに決まります。いずれにせよ、ngrok を起動して表示された https://{あなたのサブドメイン}.ngrok.io のドメイン名で App Manifest に埋め込まれている TOBEUPDATED.ngrok.io を差し替えてください。 YAML を直接編集するモードの場合は、以下のように Edit Manifest ボタンから編集モードに移動して oauth_config.redirect_urls[0] と settings.event_subscriptions.request_url の二箇所を編集します。 settings.event_subscriptions.request_url の方は以下のようなワーニングメッセージが出てくるかと思います。 python app.py でアプリが起動していて、かつ ngrok も手元で動いている前提で Click here to verify というリンクを押すと、検証リクエストが送信されます。疎通ができたら設定完了です。 なお、この YAML 設定のエディターのやり方はちょっと慣れない・やりづらいという場合、左の方に「Revert to the old design」というリンクがありますので、ここから旧来の画面に戻すこともできます。 旧画面から設定する場合は、それぞれ Features > OAuth & Permissions > Redirect URLs と Features > Event Subscriptions > Request URL を適切に設定してください。その際、URL の末尾が OAuth の Redirect URL は /slack/oauth_redirect で Event Subscriptions の Request URL は /slack/events であることに注意してください。 Slack アプリのインストールを実行 ここまでできたら、ブラウザーを開いて https://{あなたのサブドメイン}.ngrok.io/slack/install にアクセスしてみてください。 以下のようなシンプルなボタンだけの画面が表示されれば OK です。 「Add to Slack」ボタンからインストールを実行していきます。 「Allow」ボタンをクリックして、以下のような画面に遷移すれば成功です。 これでブラウザを使ったインストール作業は完了です。 では、実際インストールによってどのようなトークンを得られたかを見てみましょう。このサンプルアプリでは、インストール結果をローカルファイルに保存するようになっていますので、以下のようなファイルがつくられているはずです。 $ tree data data ├── installations │   └── none-T12345678 │   ├── bot-1637909589.684739 │   ├── bot-latest │   ├── installer-1637909589.684739 │   ├── installer-U12345678-1637909589.684739 │   ├── installer-U12345678-latest │   └── installer-latest └── states これらのファイルについて軽く説明しておきます。none-T12345678 はワークスペースの階層(none のところは Enterprise Grid の場合に enterprise_id が入ります)で、その下のファイルは installer は一度のインストールに含まれる bot やそのユーザー固有の設定、Incoming Webhooks など全てが含まれるファイルで bot は bot に関するものに絞ったものです。履歴の保持がデフォルトでオンになっているので -latest とタイムスタンプごとのファイルがそれぞれ同じ内容で保存しされています。もう一度インストールすると -latest の方は上書きとなります。 インストール情報の中身を見てみてましょう。 cat data/installations/none-T12345678/installer-latest | jq のようにして表示してみてください。ここでの説明に関係ない null の値は削っていますが、以下のようなものが表示されるはずです。 { "app_id": "A1234567890", "team_id": "T1234567890", "team_name": "Acme Corp", "bot_token": "xoxe.xoxb-1-xxx", "bot_id": "B1234567890", "bot_user_id": "U1234567890", "bot_scopes": [ "app_mentions:read", "chat:write" ], "bot_refresh_token": "xoxe-1-xxx", "bot_token_expires_at": 1637952233, "user_id": "U2222222222", "user_token": "xoxe.xoxp-1-xxx", "user_scopes": [ "chat:write" ], "user_refresh_token": "xoxe-1-xxx", "user_token_expires_at": 1637952232, "token_type": "bot", "installed_at": 1637909589.684739 } 以下のテーブルは、トークンローテーションにおいて知っておくべき項目について簡単に解説しています。 項目 説明 bot_token bot のアクセストークンです。一定時間が経過すると(デフォルトでは 12 時間)期限切れになります bot_refresh_token bot のアクセストークンを更新するための refresh token です bot_token_expires_at token が発行されたときの Unix time に oauth.v2.access API から返された expires_in (秒) を加算したものを保持しています user_token アプリのインストールを実行したユーザー個人のアクセストークンです。一定時間が経過すると(デフォルトでは 12 時間)期限切れになります user_refresh_token ユーザー個人のアクセストークンを更新するための refresh token です user_token_expires_at token が発行されたときの Unix time に oauth.v2.access API から返された expires_in (秒) を加算したものを保持しています なお、コマンドラインで現在の Unix time (秒)を手軽に知りたい場合、以下のコマンドで値を取得することができます。 $ python3 -c 'import time; print(int(time.time()))' 1637916854 トークンローテーションの様子を確認 それでは、このアプリをインストールした Slack ワークスペースの画面を開いてください。テスト用のチャンネルにこのアプリの bot user を招待してください。@token-rotation-test-app をメンションしたら、「招待しますか?」と聞かれますので、そのまま招待してあげてください。 招待したら、そのボットユーザーを再度メンションしてみてください。すると、以下のように自分自身から返信が来るはずです。これは先程発行したトークンのうち、ユーザートークン側を使っているためです。 コードを以下のようにシンプルなものにして起動し直してみてください。 # これはこのアプリの bot user をメンションしたときのイベントに対して応答するリスナーです @app.event("app_mention") def handle_app_mention_events(event, say): say(f"<@{event['user']}> こんにちは!") 今度は bot からの返事に変わります。 ともあれ、アプリは正常に動いているようです。ここまでのアプリのコンソールログを見てみてください。トークンはローテーションされているのでしょうか? いえ、デフォルトでは、毎回ローテーションはしない挙動になっています。上の各項目を説明するテーブルでも書いた通り、発行されたトークンは 12 時間程度有効なので、それよりは少し短い時間の間、リフレッシュせずにそのまま使う挙動になっています。これはアプリの実行パフォーマンスにオーバーヘッドを与えないための配慮です。 しかし、(ローテーションの挙動を確認したいときなどのために)この設定をカスタマイズできるようになっています。最初に貼ったコードの以下の部分のコメントアウトを外して起動し直してみてください。 # 以下の設定は後ほど説明します token_rotation_expiration_minutes=60 * 24, この設定にするとアクセストークンの期限切れ 24 時間前になったら、リフレッシュするようになります。つまり、常にトークンをリフレッシュするようになるということです。 この状態でまた app_mention のイベントを送信すると、先ほどのログとは違う点として、以下の二つの API コールが追加されていることに気づくはずです。 これらがやっていることは、このワークスペースの bot token とこのアクセスユーザーに紐づく user token 両方を oauth.v2.access API を使って refresh しています。その結果は Bolt の内部実装によって自動的に InstallationStore が管理するデータに反映されます。 DEBUG:slack_sdk.web.base_client:Sending a request - url: https://www.slack.com/api/oauth.v2.access, query_params: {}, body_params: {'grant_type': 'refresh_token', 'refresh_token': 'xoxe-1-xxx'}, files: {}, json_body: None, headers: {'Authorization': '(redacted)'} DEBUG:slack_sdk.web.base_client:Received the following response - status: 200, headers: {}, body: {"ok":true,"access_token":"xoxe.xoxb-1-xxx","expires_in":43200,"refresh_token":"xoxe-1-xxx","token_type":"bot","app_id":"A1234567890","scope":"app_mentions:read,chat:write","bot_user_id":"U1234567890","team":{"id":"T1234567890","name":"Acme Corp"},"enterprise":null,"is_enterprise_install":false} DEBUG:slack_sdk.web.base_client:Sending a request - url: https://www.slack.com/api/oauth.v2.access, query_params: {}, body_params: {'grant_type': 'refresh_token', 'refresh_token': 'xoxe-1-xxx'}, files: {}, json_body: None, headers: {'Authorization': '(redacted)'} DEBUG:slack_sdk.web.base_client:Received the following response - status: 200, headers: {}, body: {"ok":true,"access_token":"xoxe.xoxp-1-xxx","expires_in":43200,"refresh_token":"xoxe-1-xxx","token_type":"user","app_id":"A1234567890","scope":"identify,chat:write","user_id":"U2222222222","team":{"id":"T1234567890","name":"Acme Corp"},"enterprise":null,"is_enterprise_install":false} 先程の data/installations が管理するデータを見ると refresh される度に -latest が更新され、また履歴データが増えていくことがわかります(もしこの挙動を変えたい場合は、履歴を全て取る実装をオフにする設定にしてください)。 実装に興味がある方は以下のコードを見てみてください。 この Bolt のコードで self.token_rotator がやっていること TokenRotator とそのテストコード 今回の例では、一度にデモするために bot token と user token を両方使っていますが、もちろん bot token だけ、user token だけを利用する場合にも TokenRotator はそのまま使用することができます。 また、実際に運用するアプリであればデータベースやより安全な場所にトークンを保持することになるかと思います。Python SDK では、組み込みでファイルに加えて SQLAlchemy、Amazon S3、SQLite3 に対応しています。これらのモジュールのソースコードはこちらを参照してください。Django のアプリケーションでトークンローテーションしたい場合は、こちらの Bolt for Python を使ったサンプルを参考にしてみてください。 Node.js でアプリを実装する 繰り返しを避けるために、上記の Python との差分のところのみ紹介しておきます。 Node.js の場合、プロジェクト自体は npm init -y npm install @slack/bolt でつくります。Node.js の利用可能バージョンは、こちらのバージョン指定で確認してください。 そして index.js として以下のコードを保存します。 const { LogLevel } = require("@slack/logger"); const { App, FileInstallationStore } = require("@slack/bolt"); const app = new App({ // Basic Information のページから設定してください clientId: process.env.SLACK_CLIENT_ID, // Basic Information のページから設定してください clientSecret: process.env.SLACK_CLIENT_SECRET, // これは管理画面における Bot Scopes です scopes: ['commands', 'chat:write'], // これは管理画面における User Scopes です userScopes: ['chat:write'], // Slack ワークスペースへのインストール情報を管理する実装、ここではローカルファイルに保存します installationStore: new FileInstallationStore({ baseDir: './data/installations', clientId: process.env.SLACK_CLIENT_ID, }), // これは OAuth フローの state パラメーターの値を生成する際に使われます stateSecret: 'my-state-secret', // これは OAuth フローでは使いません、Slack からのイベントリクエストの検証に使います signingSecret: process.env.SLACK_SIGNING_SECRET, // ログレベルを変更しています、指定しない場合はより詳細なログを出力するために DEBUG logLevel: process.env.SLACK_LOG_LEVEL || LogLevel.DEBUG, }); // これはこのアプリの bot user をメンションしたときのイベントに対して応答するリスナーです app.event("app_mention", async ({ logger, event, say }) => { logger.debug("app_mention event payload:\n\n" + JSON.stringify(event, null, 2) + "\n"); }); (async () => { // アプリを http://localhost:3000/ で起動します await app.start(process.env.PORT || 3000); console.log("⚡️ Bolt app is running!"); })(); // このアプリは 3 つの URL をサーブします // - http://localhost:3000/slack/install // - http://localhost:3000/slack/oauth_redirect // - http://localhost:3000/slack/events 上の Python と同様に環境変数を設定した上で # OAuth フローのために必要 export SLACK_CLIENT_ID=1234567890.1234567890123 export SLACK_CLIENT_SECRET=XXXXXXXXXXXXXXXXXXXXXXXXX # Slack からのリクエストか検証するために必要 export SLACK_SIGNING_SECRET=XXXXXXXXXXXXXXXXXXXXXXXXX 以下のコマンドでアプリを起動したら npx node index.js Python と同様に ngrok で公開 URL からリクエストがフォーワードされるようにします。インストールが完了すると、若干命名規則は異なるものの、同様にインストール情報が保存されます。 $ tree data data └── installations └── 3485157640.2764408644502 └── T1234567890 ├── app-1637925605512 ├── app-latest ├── user-U1234567890-1637925605512 └── user-U1234567890-latest 3 directories, 4 files Node の SDK での rotation の実装は、こちらのコードを参考にしてみてください。 Sign in with Slack のトークンローテーションを試してみる OpenID Connect 互換の Sign in with Slack のフローで発行されたトークンもトークンローテーションに対応しています。こちらについてもどのように処理すべきか、簡単に紹介します。 以下のような App Manifest で Slack アプリの設定を作ってください。 _metadata: major_version: 1 minor_version: 1 display_information: name: token-rotation-siws-test-app features: bot_user: display_name: token-rotation-siws-test-app oauth_config: redirect_urls: - https://TOBEUPDATED.ngrok.io/slack/oauth_redirect scopes: user: - openid - email - profile bot: - commands settings: token_rotation_enabled: true 以下は、Sing in with Slack をハンドリングする Flask の Web アプリケーション例です。必要な依存ライブラリは以下の 2 つです。 pip install flask slack-sdk 大きな違いとして、リフレッシュするための API が openid.connect.token であること、auth.test などのメソッドは使えず openid.connect.userInfo だけを使える点が異なります。ですが、基本的には同じようなフローでトークンを管理するだけで OK です。 なお、TokenRotator は Sign in with Slack (OpenID Connect) には対応していないので、以下のコード例のように直接 API を呼び出すコードを必要に応じて書いてください。 # より詳細なログを出力するためにログレベルを DEBUG に変更します import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) import json import os # 必要な設定をあらかじめ環境変数に設定しておきます client_id = os.environ["SLACK_CLIENT_ID"] client_secret = os.environ["SLACK_CLIENT_SECRET"] redirect_uri = os.environ["SLACK_REDIRECT_URI"] # 最低限必要なのは "openid" です scopes = ["openid", "email", "profile"] from slack_sdk.web import WebClient from slack_sdk.oauth import OpenIDConnectAuthorizeUrlGenerator, RedirectUriPageRenderer from slack_sdk.oauth.state_store import FileOAuthStateStore state_store = FileOAuthStateStore(expiration_seconds=300) # https://slack.com/openid/connect/authorize?... な URL を生成します authorization_url_generator = OpenIDConnectAuthorizeUrlGenerator( client_id=client_id, scopes=scopes, redirect_uri=redirect_uri, ) # Flask のアプリケーションとして実装していますが、他のフレームワークでももちろん OK です from flask import Flask, request, make_response app = Flask(__name__) app.debug = True # Sign in with Slack の画面を表示します @app.route("/slack/install", methods=["GET"]) def oauth_start(): state = state_store.issue() url = authorization_url_generator.generate(state=state) return ( '<html><head><link rel="icon" href="data:,"></head><body>' f'<a href="{url}" style="align-items:center;color:#000;background-color:#fff;border:1px solid #ddd;border-radius:4px;display:inline-flex;font-family:Lato, sans-serif;font-size:16px;font-weight:600;height:48px;justify-content:center;text-decoration:none;width:256px"><svg xmlns="http://www.w3.org/2000/svg" style="height:20px;width:20px;margin-right:12px" viewBox="0 0 122.8 122.8"><path d="M25.8 77.6c0 7.1-5.8 12.9-12.9 12.9S0 84.7 0 77.6s5.8-12.9 12.9-12.9h12.9v12.9zm6.5 0c0-7.1 5.8-12.9 12.9-12.9s12.9 5.8 12.9 12.9v32.3c0 7.1-5.8 12.9-12.9 12.9s-12.9-5.8-12.9-12.9V77.6z" fill="#e01e5a"></path><path d="M45.2 25.8c-7.1 0-12.9-5.8-12.9-12.9S38.1 0 45.2 0s12.9 5.8 12.9 12.9v12.9H45.2zm0 6.5c7.1 0 12.9 5.8 12.9 12.9s-5.8 12.9-12.9 12.9H12.9C5.8 58.1 0 52.3 0 45.2s5.8-12.9 12.9-12.9h32.3z" fill="#36c5f0"></path><path d="M97 45.2c0-7.1 5.8-12.9 12.9-12.9s12.9 5.8 12.9 12.9-5.8 12.9-12.9 12.9H97V45.2zm-6.5 0c0 7.1-5.8 12.9-12.9 12.9s-12.9-5.8-12.9-12.9V12.9C64.7 5.8 70.5 0 77.6 0s12.9 5.8 12.9 12.9v32.3z" fill="#2eb67d"></path><path d="M77.6 97c7.1 0 12.9 5.8 12.9 12.9s-5.8 12.9-12.9 12.9-12.9-5.8-12.9-12.9V97h12.9zm0-6.5c-7.1 0-12.9-5.8-12.9-12.9s5.8-12.9 12.9-12.9h32.3c7.1 0 12.9 5.8 12.9 12.9s-5.8 12.9-12.9 12.9H77.6z" fill="#ecb22e"></path></svg>Sign in with Slack</a>' "</body></html>" ) # デフォルトのエラー画面を表示するために使います redirect_page_renderer = RedirectUriPageRenderer( install_path="/slack/install", redirect_uri_path="/slack/oauth_redirect", ) # Slack の確認画面から遷移してきたときの URL です @app.route("/slack/oauth_redirect", methods=["GET"]) def oauth_callback(): # クエリストリングから code, state パラメーターを取得してチェックします if "code" in request.args: state = request.args["state"] if state_store.consume(state): code = request.args["code"] try: client = WebClient() # code パラメーターを使って access_token / refresh_token を取得します token_response = client.openid_connect_token( client_id=client_id, client_secret=client_secret, code=code, ) # refresh token を使ってリフレッシュを実施します refreshed_token_response = client.openid_connect_token( client_id=client_id, client_secret=client_secret, token=token_response.get("access_token"), refresh_token=token_response.get("refresh_token"), grant_type="refresh_token", ) # リフレッシュされた token が問題ないか openid.connect.userInfo API を呼び出してテスト refreshed_user_token = refreshed_token_response.get("access_token") user_info_response = client.openid_connect_userInfo(token=refreshed_user_token) logger.info(f"openid.connect.userInfo response: {user_info_response}") # 実際にはここで access_token / refresh_token を何らかの形で保存することになります # あくまでデモとして結果を Web ページに表示しています return f""" <html> <head> <style> body h2 {{ padding: 10px 15px; font-family: verdana; text-align: center; }} </style> </head> <body> <h2>openid.connect.userInfo response</h2> <pre>{json.dumps(user_info_response.data, indent=2)}</pre> </body> </html> """ except Exception: logger.exception("Failed to perform openid.connect.token API call") return redirect_page_renderer.render_failure_page( "Failed to perform openid.connect.token API call" ) else: return redirect_page_renderer.render_failure_page( "The state value is already expired" ) error = request.args["error"] if "error" in request.args else "" return make_response( f"Something is wrong with the installation (error: {error})", 400 ) if __name__ == "__main__": # export SLACK_CLIENT_ID=111.222 # export SLACK_CLIENT_SECRET=xxx # export FLASK_ENV=development # export SLACK_REDIRECT_URI=https://{your-domain}/slack/oauth_redirect # python3 app.py app.run("localhost", 3000) # このアプリは 2 つの URL をサーブします # - http://localhost:3000/slack/install # - http://localhost:3000/slack/oauth_redirect Node.js の実装についてはこちらのコードなどを同様に参考にしてみてください。 その他の FAQ Java のサンプルもありますか? はい、あります!以下のサンプルコードを確認してみてください。 通常のアプリのトークンローテーション Sign in with Slack のトークンローテーション Bolt を使っていないアプリやスクリプトではどうすればよいですか? なお、Bolt を使っていないアプリやスクリプト内での利用の場合は、この TokenRotator だけをアプリケーション内で利用することも可能です。 その場合は何らかの InstallationStore の実装で find_bot() / find_installation() でトークンを取得し、それを使う前に TokenRotator を呼び出して InstallationStore#save() でリフレッシュしたものを保存しておくという流れになります。 既存のアプリを切り替えるにはどうすればよいですか? 英語の方のドキュメントに詳細がありますが、refresh token とペアになっていない access token を oauth.v2.exchange という API を使って切り替えるという流れになります。この API に client_id、client_secret、token をパラメーターとして渡すと access token + refresh token がレスポンスで返ってきます。 上の Python のコード例で言うと InstallationStore から存在する access token を全部持ってきて、一つずつ切り替えて、それを保存するということになります。 トークンローテーションに opt-in したらすぐに一気にやってしまえるよう、あらかじめスクリプトを用意しておくとよいでしょう。 最後に トークンローテーションは Slack の OAuth フローを実装しないと試せない機能なので、実際に検証したことがない方も多かったのではないかと思います。 こちらの記事の手順通りにやれば簡単に試せるかと思いますので、ぜひやってみてください
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

SQLModel×FastAPIのチュートリアルのメモ③ SQLModelを使ったFastAPIのレスポンスモデル

今回はこのセクションの和訳です。 機械翻訳をガッツリ使って、一部意訳しているので間違いがあるかもしれません。 SQLModelを使ったFastAPIのレスポンスモデル それでは、FastAPIのresponse_modelをSQLModelで使用する方法をご紹介します。 インタラクティブなAPIドキュメント¶。 これまで使用してきたコードでは、APIドキュメントはクライアントが送信しなければならないデータを知っています。 インタラクティブな API ドキュメント UI Swagger UIは、標準のOpenAPIを使用してすべてのデータスキーマ(データ形状)を持つAPIを定義した大きなJSONコンテンツを読み込み、それを美しいUIで表示します。 FastAPIは、Swagger UIがそれを読むためのOpenAPIを自動的に生成します。 そして、APIが扱うデータのスキーマを知るためにPydanticモデル(ここではSQLModelモデル)とタイプアノテーションを使用して、あなたが書いたコードに基づいて生成されます。 レスポンスデータ¶について しかし、これまでのところ、APIドキュメントのUIは、アプリが送り返すレスポンスのスキーマを知りません。 コード200の「Successful Response」の可能性があることはわかりますが、レスポンスデータがどのようになっているのかはわかりません。 レスポンスデータのスキーマがないAPIドキュメントのUI 今、私たちはFastAPIに受信したいデータを伝えるだけで、送り返したいデータはまだ伝えていません。 今、それをやってみましょう。? response_modelの使用 response_modelを使って、送り返したいデータのスキーマをFastAPIに伝えることができます。 例えば、同じHero SQLModelクラスを渡すことができます(Pydanticモデルでもあるため)。 # Code above omitted ? @app.post("/heroes/", response_model=Hero) def create_hero(hero: Hero): with Session(engine) as session: session.add(hero) session.commit() session.refresh(hero) return hero # Code below omitted ? response_model¶のヒーローのリスト Pydanticのフィールドと同じように、他のタイプアノテーションを使用することもできます。例えば、Herosのリストを渡すことができます。 まず、typingからListをインポートし、List[Hero]でresponse_modelを宣言します。 from typing import List, Optional # Code here omitted ? @app.get("/heroes/", response_model=List[Hero]) def create_hero(hero: Hero): with Session(engine) as session: session.add(hero) session.commit() session.refresh(hero) return hero # Code below omitted ? FastAPIとレスポンスモデル¶について FastAPIは、このresponse_modelを使って、レスポンスのデータ検証とフィルタリングを行います。つまり、これは我々のアプリケーションとクライアントの間の契約のように機能します。 これについては、FastAPIのドキュメントであるresponse_modelを参照してください。 新しいAPIドキュメントのUI ここでdocs UIに戻ると、受信するレスポンスのスキーマが表示されていることがわかります。 レスポンスデータのスキーマがないAPI Docs UI クライアントは期待すべきデータを知ることができます。 自動クライアント¶。 response_modelを使うことの最も目に見える利点は、API docs UIに表示されることです。 しかし、FastAPIがこのモデルを使用して応答データの自動データ検証とフィルタリングを行うなど、他の利点もあります。 さらに、スキーマは標準を使用して定義されているため、これを利用できるツールがたくさんあります。例えば、クライアントジェネレータは、多くの言語でAPIと対話するために必要なコードを自動的に作成することができます。 infomation 標準について興味がある場合、FastAPIは内部的にJSON Schemaを使用するOpenAPIを生成します。 これについては、FastAPIドキュメント - ファーストステップですべて読むことができます。 Recap response_modelを使用して、送り返したいデータのスキーマをFastAPIに伝え、素晴らしいデータAPIを手に入れましょう。?
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

SQLModel × FastAPIのチュートリアルのメモ②

今回はFastAPIとSQLModelを用いて簡単なヒーローのデータベースとAPIを作成するチュートリアルの和訳です。 引用元は上記。 機械翻訳をガッツリ使って、一部意訳しているので間違いがあるかもしれません。 Simple Hero API with FastAPI¶ まずは、FastAPIを使って、シンプルなヒーローWeb APIを作ってみましょう。✨ Install FastAPI¶ まず最初にFastAPIをインストールします。 FastAPIはweb APIを作るためのフレームワークです。 しかし、WebAPIの実行にはサーバーと呼ばれる別のアプリも必要です。FastAPIではUvicorn を使います。Uvicornを標準的な依存関係とともにインストールしていきます。 仮想環境が有効になっていることを確認してください。 確認できたら、FastAPIとUvicornをインストールします。 $ python -m pip install fastapi "uvicorn[standard]" SQLModel Code - Models, Engine¶ それでは、SQLModelのコードを見ていきましょう。 ここでは、ヒーローだけの 最もシンプルなバージョン から始めます(まだチームはありません)。 他のSQLModelのチュートリアルで見てきたものとほとんど同じコードです。 from typing import Optional # One line of FastAPI imports here later ? from sqlmodel import Field, Session, SQLModel, create_engine, select class Hero(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str secret_name: str age: Optional[int] = None sqlite_file_name = "database.db" sqlite_url = f"sqlite:///{sqlite_file_name}" connect_args = {"check_same_thread": False} engine = create_engine(sqlite_url, echo=True, connect_args=connect_args) def create_db_and_tables(): SQLModel.metadata.create_all(engine) # Code below omitted ? ここで以前使っていたコードからの変更点は1つだけで、connect_argsのcheck_same_threadです。これは、SQLAlchemyがデータベースとの通信を担当する低レベルライブラリに渡す設定です。check_same_thread はデフォルトでは True に設定されていますが、これは単純なケースで誤用することを防ぐためです。しかし、ここでは複数のリクエストで同じsessionを共有しないようにします。これが、この設定が必要とする問題を防ぐための、実際の最も安全な方法です。 また、FastAPIでは各リクエストが複数のインタラクティブなスレッドで処理される可能性があるため、これを無効にする必要があります。 情報 詳細はFastAPI docs for async and awaitを参照してください。 重要なのは、複数のリクエストで同じ セッション を共有しないようにすることで、安全にコードを扱えます。 FastAPI App¶ 次のステップでは、FastAPIアプリを作成します。 まず、fastapiからFastAPIクラスをインポートします。 そして、そのFastAPIクラスのインスタンスであるappオブジェクトを作成します。 from typing import Optional from fastapi import FastAPI from sqlmodel import Field, Session, SQLModel, create_engine, select # SQLModel code here omitted ? app = FastAPI() # Code below omitted ? Create Database and Tables on startup¶ アプリの実行が始まったら、関数create_tablesが呼ばれるようにしたいと思います。データベースとテーブルを作成するためです。 これは起動時に一度だけ呼ばれるべきもので、毎回のリクエストの前に呼ばれるものではありませんので、startupイベントを処理する関数の中に入れています。 # 上のコードは省略しています?。 app = FastAPI() @app.on_event("startup") def on_startup(): create_db_and_tables() # 以下のコードは省略? Create Heroes Path Operation¶ infomation Path Operationとは何か(特定のHTTP Operationを持つエンドポイント)、そしてFastAPIでどのように動作するかについて再確認する必要がある場合は、FastAPI First Steps docsをご覧ください。 新しいヒーローを作成するための Path Operation コードを作成してみましょう。 これは、ユーザーが /heroes/ path に POST operation でリクエストを送ったときに呼び出されます。 # Code above omitted ? app = FastAPI() @app.on_event("startup") def on_startup(): create_db_and_tables() @app.post("/heroes/") def create_hero(hero: Hero): with Session(engine) as session: session.add(hero) session.commit() session.refresh(hero) return hero # Code below omitted ? 情報 これらのコンセプトについて再確認する必要がある場合は、FastAPIのドキュメントをチェックしてください。 ファーストステップ パスパラメータ - データ検証とデータ変換](https://fastapi.tiangolo.com/tutorial/path-params/) リクエストボディ The SQLModel Advantage¶ ここで、SQLModelクラスのモデルが、SQLAlchemyモデルとPydanticモデルの両方を同時に持っていることが光ります。✨ ここでは、同じクラスモデルを使って、APIが受け取るリクエストボディ**を定義しています。 FastAPI**はPydanticをベースにしているので、同じモデル(Pydanticの部分)を使って自動的にデータを検証し、JSONリクエストから実際のHeroクラスのインスタンスであるオブジェクトに変換します。 そして、この同じSQLModelオブジェクトはPydanticモデルのインスタンスであるだけでなく、SQLAlchemyモデルのインスタンスでもあるので、データベースに行を作成するためにsessionで直接使用することができます。 直感的にPythonの標準的なtypeアノテーションを使用することができ、データベースモデルとAPIデータモデルのための多くのコードを重複させる必要はありません。? ヒント このコードは後でさらに改良する予定ですが、今のところ、SQLModelクラスがSQLAlchemyモデルとPydanticモデルの両方を同時に持つことの威力をすでに示しています。 heroを読む Path Operation¶ では、すべてのヒーローを読むために、別のパス操作を追加してみましょう。 # Code above omitted ? app = FastAPI() @app.on_event("startup") def on_startup(): create_db_and_tables() @app.post("/heroes/") def create_hero(hero: Hero): with Session(engine) as session: session.add(hero) session.commit() session.refresh(hero) return hero @app.get("/heroes/") def read_heroes(): with Session(engine) as session: heroes = session.exec(select(Hero)).all() return heroes クライアントが パス /heroes/ に GET HTTP 操作 でリクエストを送ると、データベースからヒーローを取得して返すこの関数を実行します。 One Session per Request¶ 操作のグループごとにSQLModelのsessionを使用し、他の関係のない操作が必要な場合は別のセッションを使用するべきだということを覚えていますか? ここではそれがより明確になります。 通常、ほとんどの場合、リクエストごとに1つのセッションを使用するべきです。 いくつかの孤立したケースでは、内部に新しいセッションを持ちたいと思うかもしれません、つまり、1つのリクエストに対して1つ以上のセッションを持つことになります。 しかし、異なるリクエスト間で同じセッションを共有することは ありません 。 この単純な例では、パス操作関数で新しいセッションを手動で作成するだけです。 今後の例では、FastAPI Dependencyを使ってsessionを取得し、他の依存関係と共有したり、テスト中に置き換えることができるようにします。? Run the FastAPI Application¶ これでFastAPIアプリケーションを実行する準備が整いました。 すべてのコードを main.py というファイルに入れてください。 そして、それを Uvicorn で実行します。 uvicorn main:app INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) INFO: Started reloader process [28720] INFO: Started server process [28722] INFO: Waiting for application startup. INFO: Application startup complete. infomation コマンド uvicorn main:app は以下を参照しています。 - - main: ファイル main.py (Python の「モジュール」). - - app: main.py の中で app = FastAPI() という行で作成されたオブジェクトです。 Uvicorn --reload¶ 開発中に(開発中に限り)、Uvicorn に --reload というオプションを追加することもできます。これはコードに変更を加えるたびにサーバーを再起動するもので、こうすることでより早く開発を進めることができるようになります。? uvicorn main:app --reload INFO: Will watch for changes in these directories: ['/home/user/code/sqlmodel-tutorial'] INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) INFO: Started reloader process [28720] INFO: Started server process [28722] INFO: Waiting for application startup. INFO: Application startup complete. 必要以上にリソースを消費したり、エラーが発生しやすくなるため、運用時には絶対に--reloadを使用しないようにしてください。 Check the API docs UI¶ これで、ブラウザでそのURLに行くことができます http://127.0.0.1:8000。ルートパス / に対して path operation を作成していないので、その URL だけでは "Not Found" エラーが表示されるだけです... その "Not Found" エラーは、FastAPI アプリケーションによって生成されます。 しかし、パス /docs にある 自動的に生成されたインタラクティブ API ドキュメント に行くことができます。http://127.0.0.1:8000/docs. ✨ この 自動 API ドキュメント UI は、上で定義した path をその operations と共に持っており、path 操作 が受け取るデータの形状を既に知っていることがわかります。 Play with the API¶ 実際にボタンをクリックすることができます 試しに、Create Hero path operationでいくつかのヒーローを作成するリクエストを送ってみましょう。 そして、Read Heroes path operationでそれらを返してもらうことができます。 Check the Database¶ これで、ターミナルに戻ってCtrl+Cを押すことで、そのUvicornサーバーを終了させることができます。 そして、DB Browser for SQLiteを開いてデータベースをチェックし、データを調べて、確かにヒーローを保存したことを確認してください。? Recap¶ これはもう、ヒーローのデータベースと対話するためのFastAPI web API アプリケーションです。? 改良や拡張できる点がいくつかあります。例えば、私たちはデータベースに各新しいヒーローのIDを決定してもらいたいのですが、ユーザーがそれを送信することを許可したくありません。 これらの改善はすべて次の章で行います。?
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

SQLModel×FastAPIのチュートリアルメモ① 

はじめに FastAPIの作者がSQLmodelというSQLAlchemyをベースとしたORマッパーを作成したらしい。 FastAPIを用いる際にはスキーマの作成が省略でき、より簡素にコーディングできるとのこと。 この界隈は全くの初心者ですが、気になるのでチュートリアルをみてみました。 FastAPIのチュートリアルは非常に丁寧で、一部和訳され、言及しているところも多い印象です。しかしSQLmodelの記事はまだ少ないようです。ですのでFastAPIとの連携の部分を和訳してみました。 以下、下記からの引用です。 機械翻訳をガッツリ使って、一部意訳しているので間違いがあるかもしれません。 FastAPIとPydantic - Intro¶ SQLModelが最も輝くユースケースの一つであり、それが作られた主な理由は、FastAPIと組み合わせることでした。✨ FastAPIは、SQLModelの同じ作者によって作成された、Web APIを構築するためのPython Webフレームワークです。FastAPIもPydanticの上に構築されています。 この章では、これまで見てきたようなSQLデータベースのテーブルを表すSQLModelのtableモデルと、データのみを表すdataモデル(実際には裏ではPydanticモデルになっている)を組み合わせる方法を見ていきます。 SQLModelのtableモデルと純粋なdataモデルを組み合わせることができるのは、それだけでも便利ですが、すべての例をより具体的にするために、FastAPIを使ってみましょう。 最終的には、データベース内のデータを操作するためのシンプルだが完全なWeb APIを手に入れることができるでしょう。? Learning FastAPI¶ FastAPI を使用したことがない場合は、続行する前に少し勉強してみるのが良いでしょう。 FastAPI メインページの例を読んで試すだけで十分で、10 分もかからないはずです。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

趣味と機械学習 アイデア

したいこと、すべきこと、できること 何をやるにもモチベーションがないと進まない。無理なく、前向きにアウトプットできるように上記が重なる範囲で取り組んでみたいアイデアを書き連ねようと思う。 したいこと 趣味の範疇:お酒(ワイン、ビール、日本酒)、アイドル・日本語ラップ鑑賞、オンラインゲーム(FORTNITE、APEX) すべきこと 新規性があること、少なからずQiitaやgoogleで調べて既報でなさそうこと 誰かしらに面白いと思ってもらえそうなこと(感性) できること pythonコード初心者 kaggleやNishikaでサブミットはできる スクレイピング勉強中 アイデア目次 日本のワイン生産地とボーリング調査の紐づけ解析 好きなアイドル・ラッパーのメッセージを読み解く フォートナイトの戦績を自己管理するデータ収集 基本的に人がやってきたコードを流用して、自分の興味に対して適応させながら勉強してできることをどんどん増やしていきたい。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

趣味と~機械学習~プログラミング アイデア

したいこと、すべきこと、できること 何をやるにもモチベーションがないと進まない。無理なく、前向きにアウトプットできるように上記が重なる範囲で取り組んでみたいアイデアを書き連ねようと思う。 したいこと 趣味の範疇:お酒(ワイン、ビール、日本酒)、アイドル・日本語ラップ鑑賞、オンラインゲーム(FORTNITE、APEX) すべきこと 新規性があること、少なからずQiitaやgoogleで調べて既報でなさそうこと 誰かしらに面白いと思ってもらえそうなこと(感性) できること pythonコード初心者 kaggleやNishikaでサブミットはできる スクレイピング勉強中 アイデア目次 日本のワイン生産地とボーリング調査の紐づけ解析 好きなアイドル・ラッパーのメッセージを読み解く フォートナイトの戦績を自己管理するデータ収集 基本的に人がやってきたコードを流用して、自分の興味に対して適応させながら勉強してできることをどんどん増やしていきたい。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

機械学習ことはじめにscikit-learnを触ってみた(scikit-learnのサンプルデータセットをmatplotlibで表示するところまで)

はじめに 初めまして、株式会社じげんの毛援援です。日本に来て3年目で、社会人も3年目です。主にrailsアプリケーションの開発をしてます。よろしくお願い致します。 機械学習を勉強しようと思い、今回初めてscikit-learn(sklearn)に挑戦してみました。 この記事では、インストール手順を含めて、一連の工程をお伝えします。 インストールする sklearnの例(The Digit Dataset)を触ってみる sklearnを使ってデータセットをダウンロードする matplotlibで表示する sklearnや機械学習に興味がある方や、私と同じく機械学習ビギナーの方は、是非コメントをお願いします。 機械学習:機械学習(きかいがくしゅう、英: machine learning)とは、経験からの学習により自動で改善するコンピューターアルゴリズムもしくはその研究領域で、人工知能の一種であるとみなされている。「訓練データ」もしくは「学習データ」と呼ばれるデータを使って学習し、学習結果を使って何らかのタスクをこなす。例えば過去のスパムメールを訓練データとして用いて学習し、スパムフィルタリングというタスクをこなす、といった事が可能となる。 (https://ja.wikipedia.org/wiki/機械学習 より引用) scikit-learn(sklearn):Pythonに基づく機械学習ツールです。 予測データ分析のためのシンプルで効率的なツール NumPy、SciPy、およびmatplotlib上に構築 オープンソース、商用利用可能-BSDライセンス (https://scikit-learn.org/stable より引用) 環境 MacBook Pro 11.6 (2.6 GHz 6コアIntel Core i7) python 3.9 pip 21.3.1 sklearn 1.0.1 matplotlib 3.5.0 インストール 1. pipのインストール(pipがインストールされている場合は、ここを無視していいです) curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 2108k 100 2108k 0 0 7052k 0 --:--:-- --:--:-- --:--:-- 7076k sudo python3 get-pip.py WARNING: The directory '/Users/mao/Library/Caches/pip' or its parent directory is not owned or is not writable by the current user. The cache has been disabled. Check the permissions and owner of that directory. If executing pip with sudo, you should use sudo's -H flag. DEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621 Collecting pip Downloading pip-21.3.1-py3-none-any.whl (1.7 MB) |████████████████████████████████| 1.7 MB 3.9 MB/s Installing collected packages: pip Attempting uninstall: pip Found existing installation: pip 21.3.1 Uninstalling pip-21.3.1: Successfully uninstalled pip-21.3.1 DEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621 Successfully installed pip-21.3.1 WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv pip -V or pip3 -V で確認 pip 21.3.1 from /usr/local/lib/python3.9/site-packages/pip (python 3.9) 2. sklearnをインストール pip install -U scikit-learn 注意: scikit-learn には SciPy と NumPy が必須のパッケージとなります。まだインストールしていない方は SciPy と NumPy を事前にインストールしておきましょう。 DEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621 Collecting scikit-learn Using cached scikit_learn-1.0.1-cp39-cp39-macosx_10_13_x86_64.whl (8.0 MB) Requirement already satisfied: scipy>=1.1.0 in /usr/local/lib/python3.9/site-packages (from scikit-learn) (1.7.2) Requirement already satisfied: threadpoolctl>=2.0.0 in /usr/local/lib/python3.9/site-packages (from scikit-learn) (3.0.0) Requirement already satisfied: joblib>=0.11 in /usr/local/lib/python3.9/site-packages (from scikit-learn) (1.1.0) Requirement already satisfied: numpy>=1.14.6 in /usr/local/lib/python3.9/site-packages (from scikit-learn) (1.21.4) Installing collected packages: scikit-learn DEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621 Successfully installed scikit-learn-1.0.1 インストールしたsklearnを確認 python3 -m pip show scikit-learn: scikit-learnでインストールされているバージョンと場所を確認します。 Name: scikit-learn Version: 1.0.1 Summary: A set of python modules for machine learning and data mining Home-page: http://scikit-learn.org Author: Author-email: License: new BSD Location: /usr/local/lib/python3.9/site-packages Requires: joblib, numpy, scipy, threadpoolctl Required-by: python3 -m pip freeze: 実行中の環境でインストールしたパッケージを表示させます。 fonttools==4.28.2 joblib==1.1.0 kiwisolver==1.3.2 matplotlib==3.5.0 numpy==1.21.4 packaging==21.3 pandas==1.3.4 Pillow==8.4.0 pyparsing==3.0.6 python-dateutil==2.8.2 pytz==2021.3 scikit-learn==1.0.1 scipy==1.7.2 setuptools-scm==6.3.2 six==1.16.0 threadpoolctl==3.0.0 tomli==1.2.2 python3 -c "import sklearn; sklearn.show_versions()": ちゃんと動いているかどうかを確認します。 System: python: 3.9.2 (default, Mar 15 2021, 17:37:51) [Clang 12.0.0 (clang-1200.0.32.29)] executable: /usr/local/opt/python@3.9/bin/python3.9 machine: macOS-11.6-x86_64-i386-64bit Python dependencies: pip: 21.3.1 setuptools: 54.1.2 sklearn: 1.0.1 numpy: 1.21.4 scipy: 1.7.2 Cython: None pandas: 1.3.4 matplotlib: 3.5.0 joblib: 1.1.0 threadpoolctl: 3.0.0 Built with OpenMP: True sklearnを触ってみる 今回はThe Digit Datasetをやってみます。 このデータセットは、1797枚8x8の0から9までの手書き数字の画像で構成されています。 下のソースコードは公式ページからの引用です。 数字画像をmatplotlibで表示しています。 # Code source: Gaël Varoquaux # Modified for documentation by Jaques Grobler # License: BSD 3 clause from sklearn import datasets import matplotlib.pyplot as plt # Load the digits dataset digits = datasets.load_digits() # Display the first digit plt.figure(1, figsize=(3, 3)) plt.imshow(digits.images[-1], cmap=plt.cm.gray_r, interpolation="nearest") plt.show() 説明 先ほどのソースコードを詳しく見てみます。 from sklearn import datasets: sklearnからThe Digit Datasetを導入します。 import matplotlib.pyplot as plt: matplotlib.pyplotを導入して、pltという別名が使えます。 matplotlib.pyplot: matplotlibのインターフェースです。 これはMATLABのような暗黙的なプロット方法を提供します。 画面上にフィギュアを開き、フィギュアGUIマネージャーとして機能します。 datasets.load_digits(): 数字のデータセットをロードして、Dictionary-likeオブジェクト(Bunch)を返します。 plt.figure(1, figsize=(3, 3)): idは1で、3x3インチの新しいフィギュアを作成します。 plt.figure(num, figsize): 新しいフィギュアを作成するか、既存のフィギュアをアクティブにします。 num(int or str or Figure, optional): フィギュアの唯一的な識別子です。1ならフィギュア名はFigure 1です。もし'test'にしたら、識別子とフィギュア名はtestになります。 figsize((float, float), default: rcParams["figure.figsize"] (default: [6.4, 4.8])): フィギュアの幅、高さ(インチ)です。 num=1 num='test' plt.imshow(digits.images[-1], cmap=plt.cm.gray_r, interpolation="nearest"): データセット最後の項目を最近傍補間(nearest)して、逆グレースケール画像を作ります。 plt.imshow(X, cmap, interpolation): データを画像として、つまり2Dの通常のラスターに表示します。 X(array-like or PIL image): 画像のデータです。digits.images[-1]はデータセットの最後の項目です。 cmap(str or Colormap, default: rcParams["image.cmap"] (default: 'viridis')): Colormapインスタンスまたは登録済みカラーマップ名が入ります。これにより、スカラーデータと色を対応させます。plt.cm.gray_rはplt.cm.grayと逆です。値が0の時は白で、最大値(8ビットでは255、16ビットでは65,535)では黒です。 interpolation(str, default: rcParams["image.interpolation"] (default: 'antialiased')): 補間を設定します。'nearest'とは最近傍補間(ニアレストネイバー Nearest neighbor)で、最も近い位置にある画素の輝度値を参照します。 plt.cm.gray_r plt.cm.gray default: 'antialiased' 'nearest' default: 'antialiased' 'bilinear' 'bicubic' 終わりに 今回は初めて簡単なsklearnの例を実行しました。Railsなどと全然違って、新鮮な体験でした。 今後も引き続き、機械学習の勉強を頑張ります。 記事は会社の皆さんが手伝ってくれたおかげで、完了しました。ありがとうございました!!! 参考 matplotlib.pyplot: matplotlib.pyplotライブラリの説明です。 sklearn.datasets.load_digits: sklearn.datasets.load_digits メソッドの説明です。 sklearn.utils.Bunch: Bunchの説明です。 matplotlib.pyplot.figure: matplotlib.pyplot.figure メソッドの説明です。 Matplotlib plt.figure()を使う理由|FigureとAxesの関係を把握しよう: Figureの説明で、FigureとAxesの関係の説明です。 matplotlib.pyplot.imshow: matplotlib.pyplot.imshow メソッドの説明です。 Interpolations for imshow: matplotlib.pyplot.imshow の内挿の説明です。 画素の補間(Nearest neighbor,Bilinear,Bicubic)の計算方法: 最近傍補間などの説明です。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Slack Connectでアプリを利用する時にハマったこと

Slackアプリの開発が楽しい最近。 こんなアプリを開発してみました。 TASUKARU-TaskAll- Task Manager for Slack Slack用のタスクマネージャーアプリです。 私自身がスレッドやメンション画面で依頼されたタスクを探すことが多いのですが、既存の画面では処理が終わったスレッドをアーカイブすることは出来ません。 加えて、処理したあとにコメントをスレッドに追加すると、スレッドタブの上位にソートされるので、未処理タスクが下へ下へと埋もれていってしまいます。 このアプリでは、やり取りのあったスレッドをリストするに加えて、アーカイブ機能を付けました。アプリの画面だけみていれば必要なやり取り全てが把握できるという状態を目指して作られたアプリです。 細かい仕様については上記のリンク先に譲るのですが、こちらのアプリの開発中に困った出来事がありました。 TASUKARUでは、発言者とメンション先の人との間の情報を蓄積するアプリです。 一つのワークスペースで使っている段階では気付かなかった難点が、Slack Connect時に発生しました。 具体的な内容の前に、処理方法についての簡単な説明をします。 TASUKARUでのデータ処理方法について TASUKARUでは、上述の通り、メンション付きpost及び、それに付随したスレッド内容をトラッキングするアプリになっています。 history系SCOPEを取得することで、message eventを取得するようにしています。 bolt.py @bolt_app.event("message") def handle_message_events(client, event, body, logger, view, payload): print('---------------------------') print('event message--------------') print('---------------------------') SCOPEが正しく設定されていれば、こんな風に書いておくことでメッセージイベントを取得して処理することができるようになります。 問題になった挙動は、Slack Connect時の挙動です。 SlackConnectで共有されたチャンネルの双方のワークスペースでTASUKARUをインストールしていると、こんな状態になります。 TASUKARUが2つはいっていることが分かります。 bot_id としては別物ですので当然そのようになります。 次に、課題がなんだったのかについて。 2つアプリが入っていた場合に想定する挙動 開発者として想定していた挙動は、botが2つはいっているので何かしらのpostがあった場合にはmessageイベントが2回発火するという挙動です。 しかし、どうやらSlackの仕様で、あとに入れたアプリ側でしか反応してくれないようなのです。 そうなると、情報の取得という意味では、片方のワークスペースにしか対応しないという状態になってしまいます。 この仕様に気付くまでにかなり時間を要しました。そうだったのか! 解決策 TASUKARUで必要な情報は、そのチャンネルにコネクトしているワークスペースのIDです。これさえあれば、各ワークスペース用のレコードとして保存が可能になります。 そこで利用できるAPI methodがapps.event.authorizations.listです。 今まで知らなかったのですが、App-level tokenというものを使う必要があるAPIです。 SCOPEもauthorizations:readというものを使います。 設定画面はアプリ画面のBasic Information画面の中です。 from slack_sdk import WebClient response = client.apps_event_authorizations_list( token="xapp-1-A111-111-xxx", event_context="4-eyJldCI6Im1lc3NhZ2UiLCJ0aWQiOiJUSktSOEg5TDIiLCJhaWQiOiJBMDJQMlNRSzRLQiIsImNpZCI6IkMwMTYwSFI4TTZGIn0", ) こんな形で呼び出すと、authorizationsが返ってきます。詳細については12/2の @seratch さんの記事を参照ください。 該当部分はこちらです event_contextはmessage eventのどこにあるかと言うとbody["event_context"]で取得できます。 こんな感じの処理で、Slackコネクトで接続しているワークスペースIDをリストで取得できます。 環境変数にTOKENを入れておいてください export APP_LEVEL_TOKEN=xxxxxxxxxxxxx bolt.py from slack_sdk import WebClient @bolt_app.event("message") def handle_message_events(client, event, body, logger, view, payload): print('---------------------------') print('event message--------------') print('---------------------------') slack_ws_id_list = get_authorization_ws(body['event_context']) def get_authorization_ws(event_context): APP_LEVEL_TOKEN = os.environ["APP_LEVEL_TOKEN"] app_client = WebClient(token=APP_LEVEL_TOKEN) authorizations_list = [] try: authorizations_list = app_client.apps_event_authorizations_list( event_context=event_context ) except SlackApiError as e: print("Error fetching authorizations_list: {}".format(e)) authorizations = authorizations_list.get("authorizations") slack_ws_id_list = [] for auth in authorizations: slack_ws_id_list.append(auth.get("team_id")) return slack_ws_id_list これを利用することで、課題を解決することができました???? 苦心作のTASUKARU。是非これを期にお試しください。 TASUKARU-TaskAll- Task Manager for Slack
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Python】Crank-Nicholson法を用いた1次元拡散方程式の数値計算

はじめに  今回は、非定常1次元拡散方程式を時間陰解法の一種であるCrank-Nicholson法を用いて数値計算していこうと思う。  Python3でのシミュレーションを行っている。 問題  厚さを無視できる長さ1.0mmの薄い板を考える。初め、板は21℃で一定温度であった。片側の端を断熱し、もう一方の端を30℃に加熱した時の温度分布変化を描画する。 離散化方法  非定常1次元拡散方程式は、 $$\frac{\partial{q}}{\partial{t}}=\alpha\frac{\partial^2{q}}{\partial{x^2}}\tag{1}$$ 流体解析に対する時間陰解法として有名なBeam-Warming法は、 \frac{q^{n+1}_{j}-q^{n}_{j}}{\Delta{t}}=\alpha\Bigl\{(1-\lambda)\frac{q^{n+1}_{j+1}-2q^{n+1}_{j}+q^{n+1}_{j-1}}{2\Delta{x}}-\lambda\frac{q^{n}_{j+1}-2q^{n}_{j}+q^{n}_{j-1}}{2\Delta{x}}\Bigr\}\tag{2} 特に、式(2)において、$\lambda=0.5$の時がCrank-Nicholson法として知られている。ちなみに、$\lambda=0$の時が時間陽解法であり、$\lambda=1$の時がオイラー陰解法である。  さて、今回はCrank-Nicholson法を利用する。 \frac{q^{n+1}_{j}-q^{n}_{j}}{\Delta{t}}=\frac{1}{2}\alpha\Bigl\{\frac{q^{n+1}_{j+1}-2q^{n+1}_{j}+q^{n+1}_{j-1}}{2\Delta{x}}-\frac{q^{n}_{j+1}-2q^{n}_{j}+q^{n}_{j-1}}{2\Delta{x}}\Bigr\}\tag{3} 変形をすると、 (\frac{2}{\nu}+2)q^{n+1}_{j}-(q^{n+1}_{j+1}+q^{n+1}_{j-1})=(\frac{2}{\nu}-2)q^{n}_{j}+(q^{n}_{j+1}+q^{n}_{j-1})\tag{4} ここで、 $$\nu=\alpha\frac{\Delta{t}}{\Delta{x^2}}\tag{5}$$ である。式(4)の左辺はn+1時間ステップの状態量、右辺はn時間ステップの状態量を表す。n+1時間ステップの各点jにおけるqを計算するには、以下のような三項方程式を解く必要がある。 \left[ \begin{array}{ccc} \frac{2}{\nu}+2 & 1 & \cdots & \cdots & 0 \\ 1 & \ddots & & & \vdots \\ \vdots & &\ddots & &\vdots \\ \vdots & & &\ddots & 1\\ 0 & \cdots &\cdots & 1 & \frac{2}{\nu}+2 \end{array} \right] \left[ \begin{array}{ccc} \vdots \\ q^{n+1}_{j-1} \\ q^{n+1}_{j} \\ q^{n+1}_{j+1} \\ \vdots \end{array} \right] = \left[ \begin{array}{ccc} \\ \\ \\ RHS \\ \\ \\ \\ \end{array} \right] \tag{6} RHSはnステップで評価される式(4)の右辺をまとめたものである。Pythonプログラムでは、np.linalg.solveメソッドを使うことで解くことができる。 Crank-Nicholson法の特性  Crank-Nicholson法の特性について、陽解法と比較してまとめておく。 プログラム プログラムに関しては、こちらの記事を参考にさせていただきました。 https://qiita.com/sci_Haru/items/960687f13962d63b64a0 import numpy as np import matplotlib.pyplot as plt import matplotlib.animation as animation from mpl_toolkits.mplot3d import Axes3D Nx = 100 Nt = 5000 Lx = 1.0 Lt = 1.5 delta_x = Lx/Nx delta_t = Lt/Nt x = list(range(Nx)) y = list(range(Nt)) im = [] ims = [] alpha = 0.1 TT = np.zeros([Nx, Nt]) #初期条件の設定 TT[:,0] = 21 #境界条件の設定 for i in range(Nt): TT[0, i] = 21 TT[-1, i] = 30 #熱伝達率の行列を作成 coe = np.ones([Nx, Nt]) for i in range(Nx): coe[i,:] = alpha #安定性を決める数 alpha*dt/dx^2 をもつ行列の作成 q = np.zeros([Nx, Nt]) vT = np.ones([Nx, Nt]) for i in range(Nx): vT[i,:] = delta_t * coe[i,:] / (delta_x ** 2) #クランクニコルソン法の適用 for t in range(Nt-1): #連立方程式の係数行列Aの作成 A = np.zeros([Nx-2,Nx-2]) for i in range(Nx-2): A[i,i] = 2/vT[i,t] +2 if i >=1 : A[i-1,i] = -1 if i <= Nx-4 : A[i+1,i] = -1 #係数行列bの作成 b = np.zeros([Nx-2]) for i in range(Nx-2): b[i] = TT[i,t]+(2/vT[i+1,t]-2) * TT[i+1,t] + TT[i+2,t] +q[i+1,t] b[0] += TT[0,t+1] b[Nx-3] += TT[-1,t+1] #Aq=bをqについて解く T = np.linalg.solve(A,b) for i in range(Nx-2): TT[i+1,t+1] = T[i] X, Y = np.meshgrid(x,y) def functz(T): z=T[X,Y] return z Z = functz(TT) fig = plt.figure() ax = Axes3D(fig) ax.plot_wireframe(X,Y,Z, color='r') ax.set_xlabel('t') ax.set_ylabel('x') ax.set_zlabel('T') plt.show() 参考文献 [1] 藤井孝蔵、立川智章、Pythonで学ぶ流体力学の数値計算法、2020/10/20 [2] [Pythonによる科学・技術計算] クランク-ニコルソン法(陰解法)とFTCS法(陽解法)による1次元非定常熱伝導方程式の数値解法,放物型偏微分方程式、https://qiita.com/sci_Haru/items/960687f13962d63b64a0、 最終閲覧日2021/11/30
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

FastAPI シンプル チャット - Websocket

以下の過去記事でも述べていますが、FastAPIはStarletteを基盤として構築されており、WebSocketのサポートも継承しています。 Starletteで作る Simple Web Server - Qiita WebSocket - FastAPI公式サイト 今回は公式サイトにあるチャットプログラムを少し修正し、特にクライアントは簡単なVueプログラムに置き替えています。 1. クライアント - Vue.js 過去記事のFastAPI OAuth2 クライアント - Qiitaと同じディレクトリ構成で、FastAPIのディレクトリのサブディレクトリstaticにindex.htmlとindex.jsを置きます。 index.htmlは2つのフィールドから構成されています。ログイン画面とメッセージ送信画面です。メッセージ送信ボタンはログインするまでは無効化されています。 static/index.html <html> <head> <link rel="stylesheet" href="index.css"> <script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script> </head> <body> <div id="app"> <div id="login"> <h1>Login</h1> <p><label for="username">ユーザ名</label> <input id="username" v-model="username"></p> <p><label for="password">パスワード</label> <input id="password" v-model="password"></p> <button v-on:click="doLogin">ログイン</button> <hr> <h1>Chat message</h1> <p><input v-model="message"></p> <button v-bind:disabled="isButtonDisabled" v-on:click="sendMessage">メッセージ送信</button> </div> <ul id='messages'> <li v-for="(message,index) in logs"> {{index}} {{message}} </li> </ul> </div> <script src="index.js"></script> </body> </html> Vueのメッソドは2つあります。ログインを行うdoLoginと、メッセージを送信するsendMessageです。ログイン時の処理として、WebSocketのエンドポイントにはクエリパラメータでユーザ名とパスワードを渡します。これはWebSocketのインスタンスを作成するときに行われます。 index.js var app = new Vue({ el: '#app', data: { ws: null, username: 'yamada', password: 'pass123', message: '', logs: [], isButtonDisabled: true }, methods: { doLogin: function(event) { this.ws = new WebSocket("ws://localhost:8000/ws?username=" + this.username + "&password=" + this.password); this.isButtonDisabled = false that = this this.ws.onmessage = function(event) { that.logs.push(event.data) }; event.preventDefault() }, sendMessage: function(event) { this.ws.send(this.message) this.message = '' event.preventDefault() } } }) 2. サーバプログラム - FastAPI サーバ側で注意すべき点は、path operation function ( websocket_endpoint ) の第一引数にWebSocketインスタンスが渡されることです。その後にクエリパラメータのusernameとpasswordが続きます。ここではナンチャッテ認証としてusernameとpasswordを検証していますが、もう少しそれらしくしたいのなら「FastAPI OAuth2 クライアント」でのコードをそのまま適用できるでしょう。 またブロードキャストを実現したい目的で、クラスConnectionManagerを定義し、接続クライアント( connection )を管理するようにしています。 main.py from typing import List from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles app = FastAPI() app.mount("/static", StaticFiles(directory="static"), name="static") class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def send_personal_message(self, message: str, websocket: WebSocket): await websocket.send_text(message) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.websocket("/ws") async def websocket_endpoint( websocket: WebSocket, username: str = Query(..., max_length=50), password: str = Query(..., max_length=50) ): print(f'username={username} password={password}') if username not in ['yamada','tanaka'] or password != 'pass123': return None await manager.connect(websocket) try: while True: data = await websocket.receive_text() await manager.send_personal_message(f"You wrote: {data}", websocket) await manager.broadcast(f"Client #{username} says: {data}") except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(f"Client #{username} left the chat") Starletteの功績といえるのでしょうが、結構簡単にSocketプログラミングが可能です。まあ、ただしElixier/Phoenixのフレームワークにはまだ届いていない気はしますが。 3. プログラムの動作 3-1.初期画面 以下のURLで初期画面にアクセスできます。メッセージ送信ボタンが無効化されています。 http://localhost:8000/static/index.html 3-2.ログイン ユーザ名とパスワードの初期値そのままにログインします。ログインするとメッセージ送信ボタンが有効化されます。 3-3.メッセージ送信 まず「こんにちは、山田です」と入力します 次に送信ボタンを押します。入力欄がクリアーされて、下部にメッセージログが表示されます。 3-4.別ブラウザからのログイン 別ブラウザを立ち上げ、tanakaでログインします。 3-5.別ブラウザからメッセージ送信 左が山田さんのブラウザ、右が田中さんのブラウザです。 今回は以上です。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

python-ternaryで三角図(ヒートマップ)を描いてみた

三角図(三角グラフ)は、 $ x + y + z = 1 , x ≧ 0, y ≧ 0, z ≧ 0 $ を満たすような $ x, y , z $ を2次元の三角形で表す図である。 三角図の例や見方については一旦おいておき、python-ternary を使って、関数を用いた三角図のヒートマップを実装してみようと思う。 !pip install python-ternary ここで、今回取り上げるディリクレ分布についての紹介を。 ディリクレ分布は、各次元が正であり総和が1となるような $ K $ 次元ベクトルが従う分布である。 数式は以下。 $$ Dir(\textbf{π}|\textbf{α}) = C_D(\textbf{α}) \prod_{k=1}^{K} {\pi_k}^{\alpha_k - 1} $$ ここで、$ \bf{α} $ は $ K $ 次元ベクトルのパラメータであり、各要素 $ \alpha_k $ は正の実数値である。 定数項は、 $$ C_D(\textbf{α}) = \frac{Γ(\sum_{k=1}^{K} \alpha_k)}{\prod_{k=1}^{K} Γ(\alpha_k)} $$ $ \pi_k $ ($ \textbf{π} $ のk番目の要素)の期待値は、 $$ \frac{\alpha_k}{\sum_{i=1}^{K} \alpha_i} $$ である。 $ K = 3 $ のとき、ディリクレ分布は3つの次元それぞれが正であり総和が1となる3次元ベクトルを生成してくれる。 python-ternaryを使う上ではディリクレ分布についてはあまり深く知る必要はない。 とりあえずディリクレ分布を実装してみよう。 先に言っておくと、下記の関数は引数を二つとることに注意。 import numpy as np from scipy.special import gamma def Dirichlet(pi_vec, alpha_vec=[1, 2, 3]): # πベクトル、αベクトルをnumpyのndarrayとして扱う pi_vec = np.array(pi_vec) alpha_vec = np.array(alpha_vec) # Πは、各インデックスに対する値を格納したnp.ndarrayにnp.prod(・)を適用することで実装 main = np.prod(pi_vec ** (alpha_vec - 1)) C_D = gamma(np.sum(alpha_vec)) / np.prod(gamma(alpha_vec)) return main * C_D 上記のコードでは、$ K = 3 $ を想定して引数のデフォルト値を用意した。 早速やってみよう。 全体のコードから断片を拾い上げ、最後にまとめたコードを紹介する。 まずは必要なライブラリのインポート。python-ternaryのインポートは import ternary でいい。 $ \textbf{α} $ は固定で決める。 from functools import partial import matplotlib.pyplot as plt import matplotlib # 三角グラフ描画用のライブラリをインポート import ternary alpha = (2, 4, 6) ternary.figureで土台をつくる。これはなんだかMatplotlibと似たような感じ。 ternary.figureの引数にscaleがあるが、これは分割数を意味している。 戻り値でfigureとtaxを取得しているが、このtaxは、Matplotlibのaxと似たような感じで使える。 # 分割数。 scale = 100 figure, tax = ternary.figure(scale=scale) # 表示させる画像のサイズを指定 figure.set_size_inches(10, 8) 今回はtaxのheatmapfというメソッドを利用する。関数を使ってヒートマップを描画しよう、という感じ。 このheatmapfのfunc引数に渡された関数には、順次三つ組みのタプル ―例えば、(0.15, 0.28, 0.57)ー が引き渡され、各座標における値が計算される。 今回は自作のDirichlet関数を使用しているが、Dirichlet関数には引数が二つある。引数alpha_vecは固定したいので、Python標準ライブラリfunctoolsのpartialを用いて、部分適用された関数を作成する。 heatmapfのboundary引数は、境界での値を計算するかどうか。今回取り上げたディリクレ分布が返すベクトル $ \textbf{π} $ は各次元共に正である条件があるので、境界の座標 ―例えば、(0.2, 0.8, 0.0)― では計算しないでほしいのでFalseにした。 heatmapfのstyle引数は、"triangular" "dual-triangular" "hexagonal" のいずれかである。試してみてほしい。 gridlinesは三角図の内部に描かれる直線のことで、各辺に平行なこれが何本も描かれることで、特定変数についての等高線を得ることもできる。 # funcには、総和1のベクトルがscaleの分割数に合わせて渡される。 # funcには関数を渡す必要がある。今回は引数pi_vec, alpha_vecのうち、alpha_vecのみ先に指定したうえで関数として渡したいので、Python標準ライブラリfunctoolsのpartialを用いて、部分適用された関数を作り、func引数 tax.heatmapf(func=partial(Dirichlet, alpha_vec=alpha), boundary=False, style="triangular", cmap='rainbow') tax.boundary(linewidth=2.0) tax.gridlines(color="black", multiple=10, linewidth=1) ここは、ディリクレ分布の期待値を計算しているところ。 本編とは少しずれるかも。 alpha = np.array(alpha) alpha_sum = alpha.sum() # tupleに戻すのは、このまま表示させるとそれっぽく見えるから。 Exp = tuple([np.around((alpha_i / alpha_sum), decimals=2) for alpha_i in alpha]) ここは割と見たまんま。 下、右、左に軸のラベルを表示させる。 offsetは、三角図の辺からの距離のことをいっているみたい。 # Set Axis labels and Title fontsize = 25 offset = 0.14 tax.set_title(title=f'α = {tuple(alpha)}, Exp = {Exp}', loc='center', fontsize=fontsize) tax.bottom_axis_label("$\\alpha_1$", fontsize=fontsize, offset=offset) tax.right_axis_label("$\\alpha_2$", fontsize=fontsize, offset=offset) tax.left_axis_label("$ \\alpha_3 $", fontsize=fontsize, offset=offset) tax.ticksで設定しているのは、数字の表示。 今回は、実装の初めにscale=100とした。 ここで初めて言う事実がひとつある。 heatmapfで三角図を表示させた場合、各辺ごとに1~scaleまでの整数が表示される。今回であれば、1~100の整数だ。 だが、あくまでもこれはscale(100)分割したうちの何番目かを示しているだけで、あくまでも座標値の総和は1である。 辺ごとに表示される整数を1/scaleすれば座標値が得られるという認識でいてくれればいい。 tax.ticksのmultiple引数を設定することで、値をとびとびにできる。 1~100までの100個の数字を表示させるのはだれしも嫌だろう。 multipleに設定した数だけ値がとびとびになる。 tax.clear_matplotlib_ticks() は公式 (https://github.com/marcharper/python-ternary) にて、MatplotlibのデフォルトのAxexを取り除く、との目的で使われている。今回は外しても何も変わらなかった。 tax.get_axes().axis('off') をしないと、三角図を囲う四角い枠が表示されることになり、ダサい。 # multiple、デフォルトでは1みたい。 tax.ticks(axis='lbr', linewidth=1, multiple=10) tax.clear_matplotlib_ticks() tax.get_axes().axis('off') tax.show() というわけで、上記のコードをまとめたのが以下である。 from functools import partial import matplotlib.pyplot as plt import matplotlib # 三角グラフ描画用のライブラリをインポート import ternary alpha = (2, 4, 6) # 分割数。 scale = 100 # taxはmatplotlibのaxのようなものだと思ってもらえればたぶん大丈夫 figure, tax = ternary.figure(scale=scale) # 表示させる画像のサイズを指定 figure.set_size_inches(10, 8) # funcには、総和1のベクトルがscaleの分割数に合わせて渡される。 # funcには関数を渡す必要がある。今回は引数pi_vec, alpha_vecのうち、alpha_vecのみ先に指定したうえで関数として渡したいので、Python標準ライブラリfunctoolsのpartialを用いて、部分適用された関数を作り、func引数 tax.heatmapf(func=partial(Dirichlet, alpha_vec=alpha), boundary=False, style="triangular", cmap='rainbow') tax.boundary(linewidth=2.0) tax.gridlines(color="black", multiple=10, linewidth=1) #tax.gridlines(color="black", multiple=100, linewidth=1) # (r,p)の組と、期待値('Exp')を表示 alpha = np.array(alpha) # np.array()に対しても、内法表記使えるみたいだね。便利。 alpha_sum = alpha.sum() Exp = tuple([np.around((alpha_i / alpha_sum), decimals=2) for alpha_i in alpha]) # 結局αはtupleに戻すんだけどね。表示用に。 # Set Axis labels and Title fontsize = 25 offset = 0.14 tax.set_title(title=f'α = {tuple(alpha)}, Exp = {Exp}', loc='center', fontsize=fontsize) tax.bottom_axis_label("$\\alpha_1$", fontsize=fontsize, offset=offset) tax.right_axis_label("$\\alpha_2$", fontsize=fontsize, offset=offset) tax.left_axis_label("$ \\alpha_3 $", fontsize=fontsize, offset=offset) # multiple、デフォルトでは1みたい。 tax.ticks(axis='lbr', linewidth=1, multiple=10) #tax.clear_matplotlib_ticks() tax.get_axes().axis('off') tax.show() python-ternaryでは、3次元の座標データから三角図の散布図を作成することもできるし、色々なメソッドが用意されているらしい。 興味のある方はぜひ使ってみてね。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

boto3からの解放。python3の標準ライブラリのみでAWSサービスを取り扱うには

概要 こんにちは! KDDIアジャイル開発センターの小板橋です。 今回の記事は、KDDI Engineer & Designer Advent Calendar 2021の3日目の記事となります。 ある日、こんな要望が舞い降りてきました。 「python3の標準ライブラリのみでAWSサービスに対して操作(今回は、S3に対するCRUD操作-GetとPostに限定)できるようにして欲しいな。」 そんな時に、どうやってあの便利なboto3を使用せずにこの要件を満たせるかを検証してみたというものです。 boto3のDEBUGログ boto3依存をやめるとなると、まずやらなければならないのはboto3が何を隠蔽してくれているのかを確認する必要があります。 まあ、単純に言えばBoto3がどんなREST APIのリクエストを投げているのかを確認すればよいのです。 そこで、Boto3でDEBUGログを出力してあげると簡単にREST APIのリクエストの内容が確認できます。 DEBUGログの設定 debug.py import boto3 # boto3が対象で、ログレベルはDEBUG boto3.set_stream_logger() # boto3/botocoreの詳細指定まで可能で、ログレベルの変更も可能 boto3.set_stream_logger('boto3.resources', logging.WARN) # パッケージの指定を''にすると # boto3/botocore全てのログが出力される。 boto3.set_stream_logger('') S3に対するGet処理 まず、AWSへのAPIリクエストを行う場合(AWS SDKや、AWS CLI、Boto3などのAWSツールを使わない場合)、リクエストの署名するためのコードを含める必要があります。 基本的には、この署名を気にすることはありません。(AWS SDK、AWS CLI、Boto3などのAWSツールは、ツールの設定時に指定するアクセスキーを使用してAPIリクエストに署名します。) そもそも何で署名が必要なの?? 簡単に言えば、署名によってリクエストのセキュリティ確保をしたい為です。 AWSのリクエストに対する署名では以下の点でセキュリティの確保を行なっています。 リクエスタのIDの確認 署名により、有効なアクセスキーを持っている人がリクエストを送信したことを確認できます。 送信中のデータの保護 送信中のリクエストの改ざんを防ぐために、リクエストの要素からハッシュ値を計算し、得られたハッシュ値をリクエストの一部として含めます。 AWSがリクエストを受け取ると、同じ情報を使用してハッシュを計算し、リクエストに含まれているハッシュ値と比較します。ハッシュ値が一致しない場合、AWSはそのリクエストを拒否します。 => Canonical Request この時、HTTP Authorization ヘッダーを使用します。 潜在的なリプレイ攻撃の防止 リクエストに含まれるタイムスタンプの5分以内にAWSに到達する必要があります。その条件を満たさない場合、AWSはリクエストを拒否します。 署名のバージョン AWSでは、署名バージョン4と署名バージョン2がサポートされています。AWS CLI, AWS SDKは、署名バージョン4をサポートするすべてのサービスに対して自動で署名バージョン4を使用します。 今回も、使用する署名のバージョンは署名バージョン4にします。 S3におけるCanonical Requestでは何が必要?? Canonical Requestでは、次のStepで署名の検証が行われます。 ①: 署名するための文字列を決めます。 ②: 署名キーを使用して、署名する文字列のHMAC-SHA256ハッシュを計算します。 ③: s3は認証されたリクエストを受信すると、署名を計算しリクエストで指定した署名と比較します。 (そのため、s3と同じ方法で署名を計算する必要があります。=> ここで、署名のために合意された形式でリクエストを送信するプロセスは、正規化と呼ばれます。 ①:Canonical Requestの作成 まず、下記にあるのがS3におけるcanonical requestのフォーマットになります。 <HTTPMethod>\n <CanonicalURI>\n <CanonicalQueryString>\n <CanonicalHeaders>\n <SignedHeaders>\n <HashedPayload> HTTPMethodは、GET/PUT/HEAD/DELETE等のHTTPメソッドの1つです。 CanonicalURIは、URIのURIエンコードバージョンです。ドメイン名に続く「/」で始まり、文字列末尾まで、または疑問符文字( '?')までのすべてを指定します。 CanonicalQueryStringは、URIエンコーディングされたクエリパラメータを指定します。 CanonicalHeadersは、リクエストヘッダーとその値のリストです。個々のヘッダー名と値のペアは、改行文字( "\ n")で区切られます。 ヘッダー名は小文字にする必要があります。また、CanonicalHeadersは下記のものを必ず含めなければなりません。 HTTPホストヘッダー Content-Typeヘッダーがリクエストに存在する場合は、追加 リクエストに含める予定のx-amz-*ヘッダーも追加。たとえば、一時的なセキュリティクレデンシャルを使用している場合は、リクエストにx-amz-security-tokenを含める必要があります。 下記、CanonicalHeadersのサンプル。 host:s3.amazonaws.com x-amz-content-sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b785 2b855 x-amz-date:20130708T220855Z SignedHeadersは、アルファベット順にソートされたセミコロンで区切られた小文字のリクエストヘッダー名のリストです。 リスト内のリクエストヘッダーは、CanonicalHeaders文字列に含めたものと同じヘッダーです。 HashedPayloadは、リクエストペイロードのSHA256ハッシュの16進値です。 ちなみに、GETリクエストを使用してオブジェクトを取得する場合、空の文字列ハッシュを計算します。 ②: 署名する文字列を作成 下記が署名する文字列の例になります。 "AWS4-HMAC-SHA256" + "\n" + timeStampISO8601Format + "\n" + <Scope> + "\n" + Hex(SHA256Hash(<CanonicalRequest>)) AWS4-HMAC-SHA256は、ハッシュアルゴリズムHMAC-SHA256を使用していることを示します。 timeStampは、ISO8601形式の現在のUTC時刻を入れます。 Scopeは、結果の署名を特定の日付、AWSリージョン、およびサービス名を連結したものを入れます。 結果の署名は、特定の地域および特定のサービスでのみ機能し、署名は指定された日付から7日間有効です。 ちなみに、Scopeで連結したものの例が下記になります。 date.Format(<YYYYMMDD>) + "/" + <region> + "/" + <service> + "/aws4_request" ③: 署名を計算 AWS署名バージョン4では、AWSアクセスキーを使用してリクエストに署名する代わりに、最初に特定のリージョンとサービスを対象とする署名キーを作成します。 下記が作成する署名キーの例になります。 DateKey = HMAC-SHA256("AWS4"+"<SecretAccessKey>", "<YYYYMMDD>") DateRegionKey = HMAC-SHA256(<DateKey>, "<aws-region>") DateRegionServiceKey = HMAC-SHA256(<DateRegionKey>, "<aws-service>") SigningKey = HMAC-SHA256(<DateRegionServiceKey>, "aws4_request") 実装コード(Python3) それでは、上記の内容を元にPython3の標準ライブラリのみでS3の特定のbucketに対してGetのリクエストを投げてみましょう。 下記がその時のコードになります。 sample.py import sys, os, base64, datetime, hashlib, hmac import urllib.request, urllib.response import urllib.parse method = 'GET' service = 's3' host = 'xxxxbucket.s3.xxxregion.amazonaws.com' region = 'us-east-1' request_parameters = '' # Key derivation functions. See: # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python def sign(key, msg): return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() def getSignatureKey(key, dateStamp, regionName, serviceName): kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp) kRegion = sign(kDate, regionName) kService = sign(kRegion, serviceName) kSigning = sign(kService, 'aws4_request') return kSigning access_key = os.environ.get('AWS_ACCESS_KEY_ID') secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') if access_key is None or secret_key is None: print('No access key is available.') sys.exit() # Create a date for headers and the credential string t = datetime.datetime.utcnow() amzdate = t.strftime('%Y%m%dT%H%M%SZ') datestamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope # ************* TASK 1: CREATE A CANONICAL REQUEST ************* canonical_uri = "https://%s%s" % (host, "s3_bucketのkey") canonical_querystring = request_parameters payload_hash = hashlib.sha256(("").encode("utf-8")).hexdigest() canonical_headers = 'host:' + host + '\n' + 'x-amz-content-sha256:' + payload_hash + '\n' + 'x-amz-date:' + amzdate + '\n' signed_headers = 'host;x-amz-content-sha256;x-amz-date' canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash # ************* TASK 2: CREATE THE STRING TO SIGN************* algorithm = 'AWS4-HMAC-SHA256' credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request' string_to_sign = algorithm + '\n' + amzdate + '\n' + credential_scope + '\n' + hashlib.sha256(canonical_request.encode('utf-8')).hexdigest() # ************* TASK 3: CALCULATE THE SIGNATURE ************* signing_key = getSignatureKey(secret_key, datestamp, region, service) signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest() # ************* TASK 4: ADD SIGNING INFORMATION TO THE REQUEST ************* authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' + 'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature headers = {'x-amz-date':amzdate, 'x-amz-content-sha256': payload_hash, 'Authorization':authorization_header} request = urllib.request.Request( canonical_uri, headers=headers, method="GET", ) try: with urllib.request.urlopen(request) as response: print("リクエスト送信に成功" + response) except (ValueError, Exception): print("リクエスト送信に失敗") raise (参考先コード) S3に対するPost処理 S3に対するアップロードの処理で気をつけなければならないのは、アップロードするデータの種類を固定させるわけにはいかないので、複合データ型(=multipart)を扱える、multipart/form-dataという形式でアップロードしなければなりません。 multipart/form-dataでの送信 multipart/form-dataは、複数の種類のデータを一度に扱える形式です。 気をつけなければならないのは、下記のようにboundaryをコンテンツの境界を示す文字列として入れてあげなければなりません。 Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryO5quBRiT4G7Vm3R7 今回は、Python3の標準ライブラリのみを使用して、s3に対してアップロードしたいと考えています。 外部ライブラリで有名なrequestsというものがありますが、こちらを使うと簡単にmultipart/form-data形式の送信ができますが、今回はurllibで頑張って実装していきます。 requestsを使わずにurllibでmultipart/form-data送信する multipart/form-data送信するクラスが下記になります。 multipart_post_handler.py #!/usr/bin/env python3 __all__ = ["MultipartPostHandler"] from email.generator import _make_boundary from os.path import basename from io import IOBase as FILE_TYPE from urllib.parse import urlencode from urllib.request import BaseHandler def b(str_or_bytes): if not isinstance(str_or_bytes, bytes): return str_or_bytes.encode("utf-8") else: return str_or_bytes NEWLINE = "\r\n" class MultipartPostHandler(BaseHandler): handler_order = BaseHandler.handler_order - 10 def _encode_form_data(self, fields, files): boundary = _make_boundary() parts = [] for name, value in fields: parts.append(b("--%s" % boundary)) parts.append(b("Content-Disposition: form-data; name=\"%s\"" % name)) parts.append(b("")) parts.append(b(value)) for name, fp in files: filename = basename(fp.name) fp.seek(0) parts.append(b("--%s" % boundary)) parts.append(b("Content-Disposition: form-data; name=\"%s\"; " \ "filename=\"%s\"" % (name, filename))) parts.append(b("")) parts.append(fp.read()) parts.append(b("--%s--" % boundary)) data = b(NEWLINE).join(parts) return boundary, data def http_request(self, req): data = req.data if data and isinstance(data, dict): fields = [] files = [] for key, value in data.items(): if isinstance(value, FILE_TYPE): files.append((key, value)) else: fields.append((key, value)) if files: boundary, data = self._encode_form_data(fields, files) req.add_header("Content-Type", "multipart/form-data; " \ "boundary=\"%s\"" % boundary) req.add_header("Content-Length", len(data)) else: data = urlencode(fields, doseq=True) req.data = data return req https_request = http_request あとは、このクラスを使用しS3に対してmultipart/form-dataでアップロードを実行してあげれば完成です。 sample_post.py #!/usr/bin/env python3 from typing import Any, Dict, Optional, Tuple, BinaryIO from module.multipart_post_handler import MultipartPostHandler import urllib.request, urllib.response import urllib.parse def main(): response = s3_presigned_post("urlを入れてください", "formを入れてください", "file(バイナリで入れてください。)") logging.debug(response) def s3_presigned_post( url: str, form: Dict[str, str], file: BinaryIO, verify: bool = True ): opener = urllib.request.build_opener(MultipartPostHandler()) params = form.copy() params["key"] = params["key"].encode("utf-8") params["file"] = file return opener.open(url, params) if __name__ == "__main__": main() まとめ いかがだったでしょうか。 もし、何かの参考になれば幸いです。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

WebTransportでも4K配信がしたい! 〜序章 : まずはストリーム編〜

ストリームだと4K配信には、、、もう一工夫必要!? 今を生きている皆さんこんにちは。 そろそろアドベントカレンダーの季節ですが、そんなことは気にせずガンガン書いていきます! さて、WebRTCで出来なくてWebTransportに期待されていることの一つに4K配信などの高画質配信があるのではないでしょうか? WebRTCではブラウザの制限により1.5Mbps - 5Mbpsという制限があるため、フルHDでさえ"もやっと"する画質でした。 データグラムは次回試してみます WebTransport(が使っているQUIC)にはストリームとデータグラムがあります。 リアルタイム性を追求するならデータグラムを使うのが良いのですが、 ~正直面倒なのとマスタリングTCP/IP RTP編を半分しか読めていないのと~すでに去年QuicTransportでの実装記事を書かれている方がいらっしゃるので、ストリームでやってみました。 まあ、道のりは険しそうです。 動くには動いたが、、、(数秒程度でカックカク) とりあえずスクショです。左が配信用画面で右が視聴用画面です。(数秒遅れで数秒だけ再生されて、取得できないフレームがあるためデコーダがエラーを吐く) Chrome側でバッファリングしているような挙動になるので、タイムスタンプを比較してオンタイムにMediaStreamに流してやらないといけないのかもしれません。 改善点や正しく実装できているか怪しいところも多いので、今回はWebCodecsとの繋ぎ込みとして記事を書いておきます。 今回作ったもの 家に4Kのウェブカメラがなかったので、動画をVideoタグに埋め込み、配信ページと視聴ページに分けて作りました。 ソースコードをgithubにアップしておきました。 (簡単に図にするとただこれだけのことです) おさらい : WebTransportって何? WebTransportとはブラウザで低レベルな通信をするためのAPIです。 HTTP/3を利用し、それはQUICの上で動きます。 UDPのように高速で信頼性の低いデータグラムと、TCPとUDPのいいとこ取りをしたストリームがあり、今回はいいとこ取りのストリームを使います。 WebRTCがP2Pなのに対してこっちはサーバー・クライアントモデルです。 今まで書いた記事 プログラム解説 さて、まだまだ課題は多そうですがWebTransport + WebCodecsという一番ホットな組み合わせを作れたということでまとめていきます。 QuicTransportからの差分 QuicTransportでビデオチャットネタは2,3記事くらいあったので、そこから新しくなったところをまず書いておきます。 Videoの読み取りは、VideoTrackReader から MediaTrackProcessor になった Audioのエンコーダ・デコーダが実装されいてる 出力するところは MediaTrackGenerator を使う WebCodecsとはなんぞや? WebCodescとはブラウザにおいて動画・音声フレームをエンコードしたりデコードしたりするためのAPIです。 Chrome M94にてリリースされました。 WebCodecs APIにはエンコード・デコードのためのインターフェースと、データモデルが定義されています。 入出力のためのAPIはまた別のようです。 以下はWebCodecsとデバイスとデータの流れを示したものです。 ウェブカメラや動画ファイルからは一度デコードされてしまいます。 また動画フォーマットは今はvp8しか実装されていません。 今後のハードウェアサポートやUVC(ウェブカメラのストリーム)周りの改良が期待されるところです。 全体像 用途に応じてWebTransportのコネクションを使い分けます。 具体的にはHTTP/3のパスを用途ごとに分けて接続し、サーバー側で処理を分けます。 こうすることでコネクションの種類ごとに何のデータが通信されるか、処理を分けることができます。 データの取り扱いですが、動画は1フレーム、音声は1dataごとにWebTransportのストリームを一つ消費して送信します。 ストリームを使えばQUIC側でパケットの並び替えや再送などをやってくれるためです。(データグラムより楽) フロントエンド側 フロントエンド側はエンコード処理を行うためにWebWorkerを使います。 これにより重たいエンコード処理を別スレッドに分けてパフォーマンスを良くすることができます。(わけないとUIが反応しづらくなります) WebWorkerは一つのjsファイルを別スレッドで起動し、postMessage()でデータをやり取りします。 WebWorkerを起動するには次のようにします。 // workerを読み込む streamWorker = new Worker("./stream_worker.js"); // worker からのメッセージはログに出力する streamWorker.addEventListener('message', function(e) { addToEventLog(e.data); }, false); // エンコード・デコード・ストップなどのコマンドを送る streamWorker.postMessage({ type: "connect", url, /* ストリーム情報など */}, [/* 所有権を渡すパラメータ */]); streamWorker.postMessage({ type: "stop" }); WorkerでのWebTransportの処理 コネクションを張り、受信ストリームを設定し、ストリームごとの受信処理を書きます。 WebTransport 配信側概要 まずは配信側です。 接続する -> 1フレーム読み込む -> エンコードする -> 送信する という順番です。 非同期処理だらけですが、基本的にはエンコードされたデータの順番がおかしくなることはなさそうです。(保証されているかは不明) type: "connect"でコネクションを開始させ、エンコード処理と配信を開始します。 type: "stop"で配信を停止してコネクションをクローズします。 let wt_video = null; let wt_audio = null; self.addEventListener('message', async (e) => { const type = e.data.type; if (type === "connect") { stopped = false; const {media: {video, audio}, url} = e.data; wt_video = new WebTransport(url + '/video/stream'); wt_audio = new WebTransport(url + '/audio/stream'); await wt_video.ready; await wt_audio.ready; wt_video.closed.then(() => { self.postMessage('video Connection closed normally.'); }) .catch(() => { self.postMessage('video Connection closed abruptl.'); }); wt_audio.closed.then(() => { self.postMessage('audio Connection closed normally.'); }) .catch(() => { self.postMessage('audio Connection closed abruptl.'); }); // 送信のみなのでストリームの受け入れは不要 // エンコード処理と送信する処理 streamVideo(video); streamAudio(audio); return; } // コネクションを閉じる if (type === "stop") { stopped = true; wt_video.close(); wt_audio.close(); } }, false) // 動画をフレームごとに送信する。 async function streamVideo(video) { // エンコーダを準備する ... enoder = new VideoEncoder({ output: (chunk) => { // エンコードされた時のコールバック ... // 圧縮されたデータに必要なメタデータをつける // フレームを送信する sendBinaryData(wt_video, payload); ... }); // 動画データを読み込む while(true) { // stopしたら抜ける if (stopped) { ... return; ] // 1フレーム読み込む ... // エンコードする encoder.encode(...); // 上記の VideoEncoder()に渡した outputコールバックが呼ばれる ... } } // 音声も同様 ... // バイナリデータを送信する async function sendBinaryData(transport, data) { let stream = await transport.createUnidirectionalStream(); let writer = stream.getWriter(); await writer.write(data); await writer.close(); } WebTransport 視聴側概要 次に視聴側です。 ストリームで動画・音声データを受信するようにします。 接続する -> ストリームを待ち受ける -> ストリームから受信する -> デコードする -> VideoTrackに追加する という順番です。 後ほど詳細は述べますが、受信にタイムラグがあったり、0バイトしか読み出せないことがあったり、Chromeが大量のエラーをプロンプトのほうに吐き出したりと不安定です。 実装上の問題なのかこちらの使い方が悪いのかは追って検証していきます。 // コネクションなどは(このjs内の)グローバル変数に持たせておく let stopped = false; let wait_keyframe = true; let wt_video = null, frameWriter = null; let wt_audio = null, audioWriter = null; self.addEventListener('message', async (e) => { if (type === "connect") { // WebTransportを接続する処理は上記と同じ ... streamVideo(video); // 動画を受信してデコードする streamAudio(audio); // 音声を受信してデコードする return; } ... }) // ビデオを取得してデコードする async function streamVideo(video) { // デコーダの準備 let decoder = new VideoDecoder({ output: (frame) => { // デコードされた時のコールバック // フレームをvideoタグのVideoTrackに書き込む (videoタグに表示される) frameWriter.write(frame); } }) ... // ストリームを受け付ける acceptUnidirectionalStreams(wt_video, async (payload) => { // 受信したデータのコールバック // データからメタデータを取り出す ... // key_frameが来るまで待つ ... // デコードする if (!wait_keyframe) { decoder.decode(chunk); // デコーダーの output コールバックが呼ばれる } }) } WebTransport ストリーム受信処理 データ受信の処理です。 ログを見る限りでは50Bから10KBほどのデータとして取得できますが、(ChromeのQUIC層である程度結合される) 一度に全データを取得できるわけではないので自分でデータを全て結合します。(その割に数秒程度バッファリングされてまとめて処理が走るのが謎) ストリームを受け付ける処理はリファレンスとほぼ同じです。 echoサンプルではリアルタイムにストリーム入力を受け付けてくれますが、 今回の動画配信だと、サーバーからデータを受け取っても1から10秒くらいは処理を開始してくれません。 // ストリームを受け付ける async function acceptUnidirectionalStreams(transport, onstream) { let reader = transport.incomingUnidirectionalStreams.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) { self.postMessage('Done accepting unidirectional streams!'); return; } let stream = value; readFromIncomingStream(stream, onstream); } } catch (e) { self.postMessage('Error while accepting streams: ' + e); } } データを読み込む処理です。 ここでも1/30の確率で0バイトのデータ読み取りになってしまいます。 逆に受信できたデータは、サイズを照合する限りでは送信側やサーバーのサイズと一致しています。 なので上記の問題と合わせてChromeの実装上の問題なのかもしれません。(アクティブでないタブの受信処理は後回しにされる??) // データを読み込む async function readFromIncomingStream(stream, onstream) { let reader = stream.getReader(); let payload = new Uint8Array(); while (true) { const { value, done } = await reader.read(); if (done) { // ここではvalueはundefinedになる // 1/30くらいの確率で0バイトのデータになることがある? if (payload.byteLength == 0) { console.log("invalud payload"); return; } // 細かい処理はコールバックでやる onstream(payload.buffer); return; } // データを結合する buffer = new Uint8Array(payload.byteLength + value.byteLength); buffer.set(payload, 0); buffer.set(new Uint8Array(value), payload.byteLength); payload = buffer; } } エンコードされたデータ構造について WebTransportで転送するのはEncodedVideoChunkとEncodedAudioChunkというデータになります。 動画なら1フレーム、音声なら1ブロックずつエンコードされてきます。 そのため、これらのデータをリアルタイムに送受信できれば良さそうです。 ちなみに、動画にはキーフレームかデルタ(差分)というのもがあり、データ容量を抑えるためにキーフレームからの差分をデータとして扱います。 最近ではあまりみなくなりましたが、破損した動画データを再生すると、動きのある部分がブロックノイズのようになるのをみたことがあると思います。 それはこのキーフレームと差分データによるものです。(キーフレームは一定間隔で送られてくるので、それを受信すると直ります。データ量は10倍くらい差があります) 付随するデータとして (動画のみ)type (key or delta)、timestampとdurationが必要になります。 そのため、これらのデータを一緒に送信する必要があります。 今回は単純にフレームデータの先頭にくっつけて送ることにします。 key size type 1byte timestamp 8byte duration 8byte data data.byteLength 配信側 async function streamVideo(video) { let encoder = new VideoEncoder({ output: (chunk) => { // データがエンコードされた時 ... // header(17) = type(1byte) + timestamp(8) + duration(8) let payload = new ArrayBuffer(17 + chunk.byteLength); const view = new DataView(payload); view.setUint8(0, (chunk.type === "key" ? 1 : 2)); view.setBigInt64(1, BigInt(chunk.timestamp)); // 仕様では long long だが実際はNumber view.setBigUint64(9, BigInt(chunk.duration)); // 仕様では unsigned long long だが実際はNumber chunk.copyTo(new DataView(payload, 17)); // フレームを送信する sendBinaryData(wt_video, payload); }); ... } 受信する側では逆に必要な情報を取り出します。 受信側 // ビデオを取得してデコードする async function streamVideo(video) { // デコーダーの準備 ... // 受信した処理のコールバック acceptUnidirectionalStreams(wt_video, async (payload) => { // payloadからデータを復元する // header(17) = type(1byte) + timestamp(8) + duration(8) let view = new DataView(payload, 0); const type = view.getUint8(0); const chunk = new EncodedVideoChunk({ type: (type === 1 ? 'key' : 'delta'), timestamp: Number(view.getBigInt64(1)), // 仕様では long long だが実際はNumber duration: Number(view.getBigInt64(9)), // 仕様では unsigned long long だが実際はNumber data: new DataView(payload, 17), }); ... // デコーダにデータを渡す ... }); ... } エンコード処理 さて、エンコード処理です。 動画・音声データの読み取りとエンコード処理に分かれます。 ウェブカメラを使う場合であっても getUserMedia()からMediaStreamを取得するのでほぼ同じです。 動画・音声フレーム取得 MediaStreamからVideoTrackもしくはAudioTrackを取得し、MediaStreamTrackProcessorでデータを読み取るためのオブジェクトを生成します。 stream.js // 動画を再生したらストリームを開始する。停止したらストリーム配信も止める video.onplay = () => { const [videoTrack] = document.getElementById('video').captureStream().getVideoTracks(); const [audioTrack] = document.getElementById('video').captureStream().getAudioTracks(); const videoProcessor = new MediaStreamTrackProcessor(videoTrack); const audioProcessor = new MediaStreamTrackProcessor(audioTrack); const frameStream = videoProcessor.readable; const audioStream = audioProcessor.readable; ... // workerに処理を投げる streamWorker.postMessage({ type: "connect", url, {video: {stream: frameStrea}, audio: {stream: audioStream}}}, [frameStream, audioStream]); }; 動画エンコード この読み取り用オブジェクトからデータを読み取り、エンコーダに渡していきます。 const frameReader = video.stream.getReader(); ... let encoder = new VideoEncoder({ output: (chunk) => { // 1フレーム送信する (分割・結合はQUICにお任せする) ... // データにtypeやtimestamp, durationをつける // フレームを送信する sendBinaryData(wt_video, payload); }, error: (e) => { // エラー処理 } }); encoder.configure({ codec: 'vp8', // これしか使えない width: video.width, height: video.height, framerate: 1, // 指定できない latencyMode: "realtime", // 特に変わらない }); // データを読み取る while(true) { // 動画を停止したら処理をやめる if (stopped) { frameReader.close(); encoder.close(); self.postMessage("frame stream stopped."); break; } // WebTransportのストリームの受信と似たような感じ const {value, done} = await frameReader.read(); if (done) { self.postMessage("frame stream ended."); break; } var frame = value; // 30フレームに一度キーフレームにする encoder.encode(frame, {keyFrame: (frameCount % 30 == 0 ? true : false)}); frame.close(); } 音声エンコード 音声データの場合もほぼ同様です。 async function streamAudio(audio) { const frameReader = audio.stream.getReader(); self.postMessage('Start audio frame encode.'); let encoder = new AudioEncoder({ output: (chunk) => { ... // timestamp, durationをバイナリデータに含める // フレームを送信する sendBinaryData(wt_audio, payload); }, error: (e) => { self.postMessage("encoding error. " + e.message) } }); encoder.configure({ codec: 'opus', numberOfChannels: 2, sampleRate: 48000, // AudioContextからサンプルレートを取得したいがここでは使えない }); // データを取得する while(true) { // 動画を停止した場合は止める if (stopped) { frameReader.close(); encoder.close(); self.postMessage("frame stream stopped."); break; } // 読み取る const {value, done} = await frameReader.read(); if (done) { self.postMessage("frame stream ended."); break; } var frame = value; encoder.encode(frame); // エンコードする frame.close(); } wt_audio.close(); } デコードする デコードの場合はデータ受信 -> デコード ->データ書き込み となるのでやや処理の流れが分かりにくいです。 まずは接続する際にMediaTrackGeneratorでトラックを作成し、これをvideoタグに紐付けます。 動画・音声ストリーム作成 viewer.js // WebTransportで接続する ... // デコードされた動画を受け取るためのストリームを作る const videoTrack = new MediaStreamTrackGenerator({ kind: 'video' }); const audioTrack = new MediaStreamTrackGenerator({ kind: 'audio' }); const frameStream = videoTrack.writable; const audioStream = audioTrack.writable; const media = { video: { stream: frameStream, }, audio: { stream: audioStream, }, }; // workerに受信とデコード処理を投げる viewerWorker.postMessage({type: "connect", url, media}, [frameStream, audioStream]); // ストリームをビデオタグに設定する const stream = new MediaStream(); stream.addTrack(videoTrack); stream.addTrack(audioTrack); document.getElementById('video').srcObject = stream; 動画デコード 受信したデータをデコーダに渡し、frameWriterに書き込みます。 // デコーダーの準備 frameWriter = video.stream.getWriter(); let decoder = new VideoDecoder({ output: (frame) => { frameWriter.write(frame); // ここで書き込む }, error: (e) => { console.log(e); } }); decoder.configure({ codec: 'vp8', // これしか使えない optimizeForLatency: true, // 有効かどうか不明 }); // ストリームを受信した時のコールバック acceptUnidirectionalStreams(wt_video, async (payload) => { ... decoder.decode(chunk); // デコードする }) 音声デコード 音声もほぼ同じです。 async function streamAudio(audio) { // デコーダーの準備 audioWriter = audio.stream.getWriter(); let frameCount = 0; let decodedFrameCount = 0; let decoder = new AudioDecoder({ output: (frame) => { audioWriter.write(frame); decodedFrameCount++ }, error: (e) => { console.log(e); self.postMessage(e) } }); decoder.configure({ codec: 'opus', numberOfChannels: 2, sampleRate: 48000, // audioCtx.sampleRate, }); // ストリームを受信した時のコールバック acceptUnidirectionalStreams(wt_audio, async (payload) => { ... // payloadからchunkを取り出す decoder.decode(chunk); // デコードする }); } なお、WebCodecsは8000ピクセルまでのwidthに対応しているようなので、8kも視野に入れているものかと思います。 WebTransportを使わずに別のvideoタグに書き出してみたところ、フルHDの動画ならば問題なくエンコード・デコードできました。 サーバー側 WebTransportが接続されるたびに class WebTransportProtocol(QuicConnectionProtocol): インスタンスが作成されます。 ハンドラの追加 HTTP/3のハンドラに動画・音声のハンドラを追加し、データ受信と配信のためのクラスを追加します。 class WebTransportProtocol(QuicConnectionProtocol): def __init__(self, *args, **kwargs) -> None: ... self._handler = None # pathに応じて ChatHandler, VideoReceiver, VideoSubscriber を使い分ける def _handshake_webtransport(self, if path == b"/chat": assert(self._handler is None) self._handler = ChatHandler(stream_id, self._http) self._send_response(stream_id, 200) elif path == b"/audio/stream": # 音声配信するときにアクセスするパス(サーバーから見れば、音声を受信する) assert(self._handler is None) self._handler = AudioReceiver(stream_id, self._http) self._send_response(stream_id, 200) elif path == b"/audio/view":# 音声受信するときにアクセスするパス(サーバーから見れば、音声を配信する) assert(self._handler is None) self._handler = AudioSubscriber(stream_id, self._http) self._send_response(stream_id, 200) elif path == b"/video/stream": # 動画配信するときにアクセスするパス assert(self._handler is None) self._handler = VideoReceiver(stream_id, self._http) self._send_response(stream_id, 200) elif path == b"/video/view": # 動画視聴するときにアクセスするパス assert(self._handler is None) self._handler = VideoSubscriber(stream_id, self._http) self._send_response(stream_id, 200) 動画・音声を受け取る処理 配信者が動画と音声を配信するための処理を書きます。 サーバーから見ればデータ受信処理になります。 受け取ったデータをそのまま流すだけなので特に難しいことはありません。 # 動画 class VideoReceiver: def __init__(self, session_id, http: H3ConnectionWithDatagram) -> None: self._session_id = session_id self._http = http self._buf = defaultdict(bytes) def h3_event_received(self, event: H3Event) -> None: if isinstance(event, WebTransportStreamDataReceived): # 1フレームを1ストリームで送る self._buf[event.stream_id] += event.data if event.stream_ended: broadcast_video(self._buf[event.stream_id]) self.stream_closed(event.stream_id) def stream_closed(self, stream_id) -> None: try: del self._buf[stream_id] except KeyError: pass def session_closed(self) -> None: # ビデオ送信がストップされた # 特に何もする必要はない return # 音声 class AudioReceiver: def __init__(self, session_id, http: H3ConnectionWithDatagram) -> None: self._session_id = session_id self._http = http self._buf = defaultdict(bytes) def h3_event_received(self, event: H3Event) -> None: if isinstance(event, WebTransportStreamDataReceived): # 1フレームを1ストリームで送る self._buf[event.stream_id] += event.data if event.stream_ended: broadcast_audio(self._buf[event.stream_id]) self.stream_closed(event.stream_id) def stream_closed(self, stream_id) -> None: try: del self._buf[stream_id] except KeyError: pass def session_closed(self) -> None: # 音声送信がストップされた # 特に何もする必要はない return 動画・音声を配信する処理 サーバーにコネクションリストを持ち、視聴用のパスに接続されたらリストに追加します。 あとは上記の動画・音声データを受信し終わったときに全員に配信します。 # 動画 # video_member = {"connection_id": {'connection': h3_connection, 'session_id': session_id}} viewers = defaultdict(Any) def broadcast_video(payload): print("send video " + str(len(payload))) for viewer in viewers.values(): stream_id = viewer['connection'].create_webtransport_stream( viewer['session_id'], is_unidirectional=True) viewer['connection']._quic.send_stream_data( stream_id, payload, end_stream=True) class VideoSubscriber: def __init__(self, session_id, http: H3ConnectionWithDatagram) -> None: # viewersに登録する print("add viewer.") self._connection_id = http._quic.host_cid viewers[self._connection_id] = {"connection": http, "session_id": session_id} def h3_event_received(self, event: H3Event) -> None: # 特に何も受け取らない return def session_closed(self) -> None: # viewersから削除する try: del viewers[self._connection_id] except KeyError: pass # 音声 # listeners = {"connection_id": {'connection': h3_connection, 'session_id': session_id}} listeners = defaultdict(Any) def broadcast_audio(payload): print("send audio " + str(len(payload))) for viewer in listeners.values(): stream_id = viewer['connection'].create_webtransport_stream( viewer['session_id'], is_unidirectional=True) viewer['connection']._quic.send_stream_data( stream_id, payload, end_stream=True) class AudioSubscriber: def __init__(self, session_id, http: H3ConnectionWithDatagram) -> None: # listenersに登録する print("add viewer.") self._connection_id = http._quic.host_cid listeners[self._connection_id] = {"connection": http, "session_id": session_id} def h3_event_received(self, event: H3Event) -> None: # 特に何も受け取らない return def session_closed(self) -> None: # listenersから削除する try: del listeners[self._connection_id] except KeyError: pass なお、PythonのサーバーはCPU負荷が高いものの、データが破損することもなくほぼ遅延なく処理を行なっているようです。 スムーズに動画が配信できない。何が原因か さて、ここまでの処理でエンコード・デコード・配信・受信などの処理ができました。 ただ、冒頭に書いた通り期待通りに動画が再生されることはなく、特にデータ受信が不安定でした。 確認できたこと 負荷が高くて処理が追いつかない $K動画をリアルタイムにエンコードするにはCPUが少し足りない PythonサーバーのCPU負荷が高すぎる (100KBのダミーデータを30フレーム送ると90%程度) 実装が枯れていない ストリームを使っているのにChromeがストリームを読み取るときに空のデータを返すことがある Chromeのストリームの受信バッファが溢れる そもそも考えることが多い 受信側で動画・音声のバッファリングしないといけない? データグラムの場合、RTP的なパケロス対策や誤り訂正などが必要 ストリームでもデータ結合やシリアライズが必要 非同期処理周りで何か間違っているかも どれくらい重たいか どこの処理が重たいのか、処理を切り出して比較してみました。 4K動画 使用したデータは焚き火を使ってみました。 key value サイズ 3840 x 2180 fps 29.97 bps 20.80Mbps フォーマット H264 そもそも4K動画を再生すること自体無理があるのでしょうか? 少しずつ処理を足して確認してみます。 4K動画をvideoタグで再生するだけ 流石にハードウェア支援が効くので全く負荷はありませんでした。 Chrome Helper (GPU) が7.8%使うくらいでした。 フレームを読み取るだけ デコードされたフレームを読み取る処理はどうでしょうか? エンコード・送信処理をコメントアウトして試してみます。 タスク CPU使用率 Chrome Helper(Renderer) 8.3% Chrome Helper(GPU) 7.8% ほぼ問題ないレベルです。 エンコードするだけ(送信はしない) VP8はCPUエンコードなので、かなり重たいもののギリギリ30フレーム遅延くらいで処理が間に合っていました。 下のスクリーンショットは4K動画をエンコードだけ行い、送信しなかった場合の30フレームごとのサイズと遅延フレーム数です。 フレームサイズは約60KB、30フレームなので、60KB x 30 = 14.4Mbpsくらいのデータ量です。 遅延フレーム数はおおよそ16 - 20くらいで安定しました。 Read 150 frames. last frame = 1 Read 150 frames. last frame = 22 Read 150 frames. last frame = 16 Read 150 frames. last frame = 16 Read 150 frames. last frame = 16 Read 150 frames. last frame = 21 タスクマネージャーを見ると案の定、CPU負荷が高くなっていました。 これにPythonサーバーへの通信処理を加えると、エンコード処理が追いつかずChromeが大量のエラーを出力してフリーズしました。 (last frame = 66, 76 が処理できずに溜まっているフレーム数) フルHDではどうか? 次に動画をフルHDにしてみました。 遅延フレーム数は1となり、エンコードと送信のCPU負荷は問題ないようです。 ただし、配信開始してから数秒から10秒程度反応せず、そのあと早送りで再生されるような挙動でした。 また、4K配信の時もそうですが1分ほど配信しているとitermの方に下記のエラーが延々と出力されて固まることがありました。 [85146:13571:1127/215410.468886:ERROR:quic_stream.cc(781)] Fin already buffered Chromeのソースコードを見るとこの辺なので、バッファ周りの処理なのかもしれません。 それではVGAでは? もっと軽いVGA動画ではどうでしょうか? 試してみたところ、負荷は全く問題ありませんでしたが10秒間隔でバッファリングされたような挙動になり、 期待していたようなスムーズな受信処理ができませんでした。 Chrome側の実装の問題でスケジューリング周りの処理がまだ不安定なだけなら今後の実装である程度スムーズな配信ができるかもしれません。 もし、ブラウザ側である程度バッファリングしてからまとめて処理が走るようであれば、独自にバッファリングしてタイムスタンプを見ながらMediaStreamに流し込む処理を実装する必要がありそうです。 今はローカルでの処理ですし、WebSocketでさえ10秒バッファリングされるようなこともないので、早いところ実装がこなれてくると良いなぁと思います。 WebWorkerだから? とか非同期処理周りなど色々変えてみたりしましたが、 そもそも前回のテキストチャットの時点でリアルタイムではなく数秒のラグがありました。 まとめ まだデータグラムを試していないので次はそれでやってみます。 ともあれ、エンコーダーが4Kに対応しているのと24Mbpsくらいのダミーデータであれば問題なく送信はできているので 十分に4K配信できるだけのポテンシャルはありそうです。 しかも! MozillaがRustで実装しているQUICサーバーのNeqoがWebTransport関連のプルリクをマージしたようです! もし使えるようであれば、いよいよWebTransportサーバーを本格的に実装していけるかもしれません! WebTransportは元々がクラウドゲーミングの需要から実装が始まっていますが、 自分のパソコンが完全にクラウドになってしまう日も近いのかもしれませんね。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Python3】0から作るPython初心者プログラミング【04】-あっち向いてほいプログラム-

【ご挨拶】こんにちは! ぬかさんエンジニアリングです。(4回目‼) 今回は、大人数であっち向いてほいトーナメントバトルが出来るプログラムをつくりたいと思います! 基本的には土台として前回の大人数じゃんけんプログラムのコードを転用して作ることになります。作り方だけ見たいという方は前回の記事も一緒に見てもらった方が理解度は高まるかと思います。己の力のみで0からお題に挑戦したいという方ものために「機能要件」を用意していますので、それだけ見て作ってみるのもいいかもしれません。 今回のプログラムからいよいよゲームらしくなっていきます。それに伴って初心者の中には少し難易度が高いと感じる方もいるかもしれません。どうしても行き詰った場合はまず寝てください。もしくは〔作り方〕をサラッと見てください。いずれにせよ最後まで作りきることを目標にして下さい。私もこの記事を作るためにクラスについて勉強し直して結構時間をが掛かりましたが、完成した時の達成感と脱力感はいい物でした。質問等も受け付けておりますので一緒に頑張りましょう! LGTMも是非よろしくお願いします‼ 本シリーズ初めての方へ 【趣旨】 Python初心者プログラマーが入門書で学んだ知識を実践的なコーディングを通じて身に着けていくためのお題を提供します。 お題を基に各自コーディングに挑戦していただいた後、この記事でコーディングの過程を答え合わせします。 【対象】 Pythonの入門書を読んで理解はしたけど何か目的をもって実践的にコーディングをしたい方。 ProgateでPythonコースをLv5まで勉強したけど応用力が身に着いていないと感じている方。 【初心者とは】 この記事シリーズでの初心者は下記の項目を理解済みであることが目安となっています。 [演算子, 標準ライブラリ, 条件分岐, 繰り返し処理, 例外処理, リスト, タプル, セット, 辞書, オブジェクト指向] 【利用ライブラリ & Python3 --version】 ・Google Colaboratory 以下のリンクからアクセスして使い方を確認した後、左上のファイルタブから「ノートブックを新規作成」を選択して自分のプログラムを作りましょう。ファイルはGoogleDriveに保存されるため、自分のGoogleアカウントと連携させるのを忘れないようにしましょう。 Colaboratory へようこそ ←ここからリンクへ飛ぶ ・Python 3.7.12 $$$$ それではさっそく本題に入っていきましょう 第【04】回 -あっち向いてほいプログラム- このシリーズ第四回目の今回は、前回の大人数じゃんけんプログラムから更にゲーム性をアップしてあっち向いてほいトーナメントバトルが出来るプログラムを実装します! ゲーム性がアップしただけありゲームの進み方も複雑になっていきます。どんな可能性でもプログラムがエラーを起こさず最後まで処理できるように何度もチェックしながら決められた「要件」を満たすプログラムを一緒に作っていきましょう! 【お題】あっち向てほいでトーナメントバトルが出来るプログラムをつくろう! hoi_battle_ground() #出力 ============================== あっち向いてほいBATTLEGROUNDS >>GAMESTART<< Press A ============================== A ============================================================ あっち向いてほいBATTLEGROUNDSへようこそ! このゲームでは最大64人でトーナメント戦を行います。 ルールは単純です。 あっち向いてほいで決められた点数を先に獲得したら勝利です。 トーナメントを勝ち抜いて優勝を目指して頑張りましょう! ============================================================ ============================================================ あっち向いてほいBATTLEGROUNDSの設定を決めてください 参加人数を4,8,16,32,64人から選択してください >8 プレイヤーの人数を選択してください(最大人数は8人です) >3 プレイヤー名を記入してください。>nu <nu>さんですね。エントリー完了しました。 プレイヤー名を記入してください。>ka <ka>さんですね。エントリー完了しました。 プレイヤー名を記入してください。>san <san>さんですね。エントリー完了しました。 試合のマッチポイントを入力してください。 >3 ============================================================ 3ポイントマッチ制 8人トーナメントスタート!! ----------------------------------- 第1回戦対戦表 コンピュータ5 vs ka コンピュータ3 vs コンピュータ4 コンピュータ1 vs nu コンピュータ2 vs san ----------------------------------- ----------------------------------- 第1回戦 第1試合 コンピュータ5 vs ka ----------------------------------- >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉1 コンピュータ5の手:パー kaの手:グー ◎◎コンピュータ5の勝ち!◎◎ >>>あっち向いてほい!<<< あなたの顔の向きを選択してください(整数:1,2,3,4) 〉1 コンピュータ5の手:← kaの手:↑ ××じゃんけんに戻る×× >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉1 コンピュータ5の手:グー kaの手:グー △△あいこでしょ!!△△ 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉1 コンピュータ5の手:チョキ kaの手:グー ◎◎kaの勝ち!◎◎ >>>あっち向いてほい!<<< {1: '↑', 2: '→', 3: '↓', 4: '←'} あなたの指の向きを選択してください(整数:1,2,3,4) 〉1 kaの手:↑ コンピュータ5の手:↑ ◎◎◎kaの勝利!1ポイント獲得!◎◎◎ 現在のポイント コンピュータ5:0ポイント ka:1ポイント >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉1 コンピュータ5の手:チョキ kaの手:グー ◎◎kaの勝ち!◎◎ >>>あっち向いてほい!<<< {1: '↑', 2: '→', 3: '↓', 4: '←'} あなたの指の向きを選択してください(整数:1,2,3,4) 〉1 kaの手:↑ コンピュータ5の手:↑ ◎◎◎kaの勝利!1ポイント獲得!◎◎◎ 現在のポイント コンピュータ5:0ポイント ka:2ポイント >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉1 コンピュータ5の手:グー kaの手:グー △△あいこでしょ!!△△ 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉1 コンピュータ5の手:チョキ kaの手:グー ◎◎kaの勝ち!◎◎ >>>あっち向いてほい!<<< {1: '↑', 2: '→', 3: '↓', 4: '←'} あなたの指の向きを選択してください(整数:1,2,3,4) 〉1 kaの手:↑ コンピュータ5の手:↑ ◎◎◎kaの勝利!1ポイント獲得!◎◎◎ ><><><>3ポイント先取によりkaの勝利!<><><>< ----------------------------------- 第1回戦 第2試合 コンピュータ3 vs コンピュータ4 ----------------------------------- >>>じゃんけんぽん!<<< コンピュータ3の手:パー コンピュータ4の手:パー △△あいこでしょ!!△△ コンピュータ3の手:グー コンピュータ4の手:チョキ ◎◎コンピュータ3の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ3の手:↓ コンピュータ4の手:↑ ××じゃんけんに戻る×× >>>じゃんけんぽん!<<< コンピュータ3の手:チョキ コンピュータ4の手:チョキ △△あいこでしょ!!△△ コンピュータ3の手:グー コンピュータ4の手:チョキ ◎◎コンピュータ3の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ3の手:→ コンピュータ4の手:→ ◎◎◎コンピュータ3の勝利!1ポイント獲得!◎◎◎ 現在のポイント コンピュータ3:1ポイント コンピュータ4:0ポイント >>>じゃんけんぽん!<<< コンピュータ3の手:パー コンピュータ4の手:パー △△あいこでしょ!!△△ コンピュータ3の手:チョキ コンピュータ4の手:グー ◎◎コンピュータ4の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ4の手:← コンピュータ3の手:← ◎◎◎コンピュータ4の勝利!1ポイント獲得!◎◎◎ 現在のポイント コンピュータ3:1ポイント コンピュータ4:1ポイント >>>じゃんけんぽん!<<< コンピュータ3の手:チョキ コンピュータ4の手:パー ◎◎コンピュータ3の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ3の手:← コンピュータ4の手:← ◎◎◎コンピュータ3の勝利!1ポイント獲得!◎◎◎ 現在のポイント コンピュータ3:2ポイント コンピュータ4:1ポイント >>>じゃんけんぽん!<<< コンピュータ3の手:パー コンピュータ4の手:チョキ ◎◎コンピュータ4の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ4の手:↓ コンピュータ3の手:↓ ◎◎◎コンピュータ4の勝利!1ポイント獲得!◎◎◎ 現在のポイント コンピュータ3:2ポイント コンピュータ4:2ポイント >>>じゃんけんぽん!<<< コンピュータ3の手:チョキ コンピュータ4の手:グー ◎◎コンピュータ4の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ4の手:↓ コンピュータ3の手:↓ ◎◎◎コンピュータ4の勝利!1ポイント獲得!◎◎◎ ><><><>3ポイント先取によりコンピュータ4の勝利!<><><>< ----------------------------------- 第1回戦 第3試合 コンピュータ1 vs nu ----------------------------------- >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} nuの出す手を入力してください(整数:1, 2, 3) 〉2 コンピュータ1の手:グー nuの手:チョキ ・ ・ ・ ・ ・ >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} nuの出す手を入力してください(整数:1, 2, 3) 〉2 コンピュータ1の手:グー nuの手:チョキ ◎◎コンピュータ1の勝ち!◎◎ >>>あっち向いてほい!<<< あなたの顔の向きを選択してください(整数:1,2,3,4) 〉2 コンピュータ1の手:→ nuの手:→ ◎◎◎コンピュータ1の勝利!1ポイント獲得!◎◎◎ ><><><>3ポイント先取によりコンピュータ1の勝利!<><><>< ----------------------------------- 第1回戦 第4試合 コンピュータ2 vs san ----------------------------------- >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} sanの出す手を入力してください(整数:1, 2, 3) 〉3 コンピュータ2の手:チョキ sanの手:パー ・ ・ ・ ・ ・ >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} sanの出す手を入力してください(整数:1, 2, 3) 〉3 コンピュータ2の手:パー sanの手:パー △△あいこでしょ!!△△ 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} sanの出す手を入力してください(整数:1, 2, 3) 〉3 コンピュータ2の手:チョキ sanの手:パー ◎◎コンピュータ2の勝ち!◎◎ >>>あっち向いてほい!<<< あなたの顔の向きを選択してください(整数:1,2,3,4) 〉3 コンピュータ2の手:↓ sanの手:↓ ◎◎◎コンピュータ2の勝利!1ポイント獲得!◎◎◎ ><><><>3ポイント先取によりコンピュータ2の勝利!<><><>< ----------------------------------- 準決勝対戦表 ka vs コンピュータ4 コンピュータ1 vs コンピュータ2 ----------------------------------- ----------------------------------- 準決勝戦 第1試合 ka vs コンピュータ4 ----------------------------------- >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉2 kaの手:チョキ コンピュータ4の手:パー ・ ・ ・ ・ ・ >>>じゃんけんぽん!<<< 選択肢: {1: 'グー', 2: 'チョキ', 3: 'パー'} kaの出す手を入力してください(整数:1, 2, 3) 〉2 kaの手:チョキ コンピュータ4の手:グー ◎◎コンピュータ4の勝ち!◎◎ >>>あっち向いてほい!<<< あなたの顔の向きを選択してください(整数:1,2,3,4) 〉2 コンピュータ4の手:→ kaの手:→ ◎◎◎コンピュータ4の勝利!1ポイント獲得!◎◎◎ ><><><>3ポイント先取によりコンピュータ4の勝利!<><><>< ----------------------------------- 準決勝戦 第2試合 コンピュータ1 vs コンピュータ2 ----------------------------------- >>>じゃんけんぽん!<<< コンピュータ1の手:パー コンピュータ2の手:グー ・ ・ ・ ・ ・ >>>じゃんけんぽん!<<< コンピュータ1の手:パー コンピュータ2の手:グー ◎◎コンピュータ1の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ1の手:← コンピュータ2の手:← ◎◎◎コンピュータ1の勝利!1ポイント獲得!◎◎◎ ><><><>3ポイント先取によりコンピュータ1の勝利!<><><>< ----------------------------------- 決勝対戦表 コンピュータ4 vs コンピュータ1 ----------------------------------- ----------------------------------- 決勝戦 コンピュータ4 vs コンピュータ1 ----------------------------------- >>>じゃんけんぽん!<<< コンピュータ4の手:チョキ コンピュータ1の手:グー ・ ・ ・ ・ ・ >>>じゃんけんぽん!<<< コンピュータ4の手:チョキ コンピュータ1の手:パー ◎◎コンピュータ4の勝ち!◎◎ >>>あっち向いてほい!<<< コンピュータ4の手:↑ コンピュータ1の手:↑ ◎◎◎コンピュータ4の勝利!1ポイント獲得!◎◎◎ ><><><>3ポイント先取によりコンピュータ4の勝利!<><><>< +*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+* *+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+ 栄えある優勝者はコンピュータ4!!おめでとうございます!!! *+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+ +*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+*+* もう一度プレイしますか?(Yes:y/No:n)n 【要件定義】 1.最大64人対戦が出来るようにしてください。 2.人間のプレイヤーの数は一人~プレイヤー全員まで自由に選択できるようにしてください。 3.プレイヤー全員のうち人間のプレイヤー以外は自動的にコンピュータを割り当ててください。 4.人間のプレイヤーに名前を付けて試合中もその名前で表示されるようにしてください。 5.試合のマッチポイントは自由に選択できるようにしてください。 6.トーナメントの各回戦の初めに全試合の対戦表を表示して分かりやすくしてください。 7.各試合の初めにその試合が「第○回戦第○試合」で「誰と誰」の対戦か表示して分かりやすくしてください。 8.試合で得点が入るたびに両者の現在のポイントを表示して進行具合を分かりやすくしてください。 9.各試合の勝者の名前を表示してください。 10.優勝者を表示してください。 11.ゲーム終了後に再度プレイするか選択できるようにしてください。 【機能要件】 (0.このゲームの概要とルール説明を表示する)←有無は問わない 1.4人、8人、16人、32人、64人からトーナメントバトルの参加人数を選択(入力)できる機構を作成してゲームに反映できるようにしてください。 2.人間が操作するプレイヤーの人数を入力できる機構を作成してゲームに反映できるようにしてください。 3.各人間プレイヤーの名前を入力できる機構を作成してゲームに反映できるようにしてください。 4.試合のマッチポイント数を入力できる機構を作成してゲームに反出来るようにしてください。 5.トーナメントにおけるプレイヤーの組み合わせがランダムになるように設計してください。 6.トーナメントの初めに「○ポイントマッチ制 ○人トーナメントスタート」表示してください。 7.トーナメントの各回戦の最初に「第○回戦対戦表」と全試合分の「プレイヤー名 vs プレイヤー名」を出力してください。 8.各試合の最初に「第○回戦 第○試合」とその試合の「プレイヤー名 vs プレイヤー名」を出力してください。 9.最後とそのひとつ前の回戦は「第○回戦」を「決勝」「準決勝」に変換して表示してください。 10.試合のルールは、じゃんけんに勝った人が指の向きを、じゃんけんに負けた人が顔の向きを選択し、同じ向きであった場合じゃんけんの勝者に勝利ポイントが入ります。 あっち向いてほいでお互いが選択した向きが異なる場合は再びじゃんけんからやり直すように設計してください。ポイントのマイナスはありません。 11.試合中どちらかがポイントを獲得したタイミングで両者の現在のポイントを表示するようにしてください。 12.決勝で勝ち残ったプレイヤーの名前を表示して表彰してください。 13.再度プレイするか選択できる機構を作成してください。 〔お助けヒント〕 ヒント1 プレイヤーのインスタンスは一つ一つ変数に代入せずに配列にまとめて格納し、取り出す場合はfor文を使うようにすると便利です。 ヒント2 新たにクラスを作成し初期化メソッドでプレイヤーのインスタンスの配列をインスタンス変数に代入します。そして、試合の一連の流れを実行できるインスタンスメソッドたちを作成してクラスにまとめるとあっち向いてほいプログラムが作りやすくなります。 ヒント3 このプログラムを要約すると「for文で1回戦~決勝まで回す中で更にfor文で第一試合~最終試合まで回し、その中で更にwhile文でどちらかがマッチポイントを獲るまでじゃんけんとあっち向いてほいを繰り返すプログラム」です。こんがらがったらまずは図示! ヒント4 インスタンスメソッド内でインスタンスメソッドを実行するネスト構造を使って処理を連鎖させて、返り値はネストを遡ってくる方法を使うと便利です。 【解答】 解答は以下の通りです。 ※この解答はあくまで私の回答です。各々の方法でお題が解けていればそれで全然かまいません。むしろ、もっと簡潔な方法があればご教示頂けると助かります import random #スコアシートクラス(プレイヤーの名前を保持、プレイヤーの得点を保持) class Scoresheet: def __init__(self, names, score=0): self.names = names self.score = score #ヒューマンクラス(じゃんけんの手の選択、あっち向いてほいの手の選択) class Human(Scoresheet): ##人間プレイヤーの手を選択 def pon(self): print('選択肢:', HANDS) while True: try: hand = input(f"{self.names}の出す手を入力してください(整数:1, 2, 3) 〉") if hand in ("1", "2", "3"): self.hand = HANDS[int(hand)] return except: print("整数:1, 2, 3を入力してください。") ##人間プレイヤーの指か顔の向きを選択 def hoi(self, j_result): if j_result == "w": print(FINGER) while True: try: finger = input("あなたの指の向きを選択してください(整数:1,2,3,4) 〉") if finger in ("1","2","3","4"): self.finger = FINGER[int(finger)] return except: print("整数1,2,3,4を入力してください") else: while True: try: face = input("あなたの顔の向きを選択してください(整数:1,2,3,4) 〉") if face in ("1","2","3","4"): self.face = FACE[int(face)] return except: print("整数1,2,3,4を入力して下さい") #コンピュータクラス(じゃんけんの手の選択、あっち向いてほいの手の選択) class Computer(Scoresheet): ##コンピュータの手を選択 def pon(self): self.hand = random.choice(list(HANDS.values())) ##コンピュータの指か顔の向きを選択 def hoi(self, j_result): if j_result == "w": self.finger = random.choice(list(FINGER.values())) else: self.face = random.choice(list(FACE.values())) #トーナメントクラス(試合のマッチング、各回戦の一連の試合を実行し敗者を選出、各回戦のスコアリセット) class Tournament: def __init__(self, players, tournament_num=None): self.players = players self.tournament_num = tournament_num ##プレイヤーの順番をランダムにシャッフル def shuffle(self): random_list = [player for player in self.players] random.shuffle(random_list) return random_list ##2人ずつのグループに分けて対戦表を作成&出力 def match(self): game_num = int(len(self.players) / 2) print("-"*35) if game_num == 1: print("決勝対戦表") elif game_num == 2: print("準決勝対戦表") else: print(f"第{self.tournament_num}回戦対戦表\n") for n in range(0, game_num): group_n = self.players[n*2:n*2+2] print(f"{group_n[0].names} vs {group_n[1].names}") print("-"*35) return game_num ##第〇回戦の全試合を実施して負けた参加者を配列(リスト型)に格納 def battle(self, game_num, match_point): losers_list = [] for n in range(0, game_num): group_n = self.players[n*2:n*2+2] print("-"*35) if game_num == 1: print("決勝戦") elif game_num == 2: print(f"準決勝戦 第{n+1}試合") else: print(f"第{self.tournament_num}回戦 第{n+1}試合") print(f"\n{group_n[0].names} vs {group_n[1].names}") print("-"*35) while True: loser = Janken(group_n).play(match_point) if loser: losers_list.append(loser) break return losers_list ##スコアリセット def score_reset(self): for player in self.players: player.score = 0 #じゃんけんクラス(じゃんけんをする際の手の選択と出力、判定、ポイントの加算と出力) class Janken: def __init__(self, group_n): self.group_n = group_n ##じゃんけんの一連の流れを実行 def play(self, match_point): print("\n>>>じゃんけんぽん!<<<") self.pon() while True: j_winner, j_loser = self.judge() if j_winner: print(f"\n\n◎◎{j_winner.names}の勝ち!◎◎") break else: print("\n△△あいこでしょ!!△△") self.pon() return Hoi(j_winner, j_loser).play(self.group_n, match_point) ##ヒューマンクラスとコンピュータクラスのponインスタンスメソッドを実行して各参加者のじゃんけんの手を出力 def pon(self): for player in self.group_n: player.pon() for player in self.group_n: print(f"{player.names}の手:{player.hand}") ##プレイヤーのじゃんけんの手があいこか否かを判定、あいこでなければ勝者を決定 def judge(self): hands = set(player.hand for player in self.group_n) if len(hands) != 2: return None, None hand1, hand2 = hands win = hand1 if hand1.stronger_than(hand2) else hand2 for player in self.group_n: if player.hand == win: winner = player else: loser = player return winner, loser #あっち向いてほいクラス(あっち向いてほいをする際の手と指の選択&出力、勝敗判定) class Hoi: def __init__(self, j_winner, j_loser): self.j_winner = j_winner self.j_loser = j_loser ##あっち向いてほいをプレイして勝者を出力し敗者を返り値で返す def play(self, group_n, match_point): print("\n\n>>>あっち向いてほい!<<<") self.hoi() if self.j_winner.finger == self.j_loser.face: print(f"\n\n◎◎◎{self.j_winner.names}の勝利!1ポイント獲得!◎◎◎") self.j_winner.score += 1 if self.j_winner.score < match_point: print(f"\n\n現在のポイント \ \n{group_n[0].names}:{group_n[0].score}ポイント \ \n{group_n[1].names}:{group_n[1].score}ポイント") return else: print(f"\n\n><><><>{match_point}ポイント先取により{self.j_winner.names}の勝利!<><><><\n\n") return self.j_loser else: print("××じゃんけんに戻る××") return ##プレイヤー同士の出した指と顔の向きを出力 def hoi(self): self.j_winner.hoi("w") self.j_loser.hoi("l") print(f"{self.j_winner.names}の手:{self.j_winner.finger}") print(f"{self.j_loser.names}の手:{self.j_loser.face}") #ハンドクラス(判定で2つの手の勝敗を決定) class Hand: def __init__(self, shape, stronger_than): self.shape = shape self._stronger_than = stronger_than ##デバッグ時にインスタンスを指定の形で出力する用 def __repr__(self): return repr(self.shape) ##インスタンスをstr型で出力する際にself.shapeを出力 def __str__(self): return self.shape ##2つの手のうち、一方の手が勝つ時の相手の手と実際の相手の手が同じだという返り値を返す def stronger_than(self, enemy): return self._stronger_than == enemy.shape #じゃんけんの手、あっち向いてほいの顔、あっち向いてほいの指の辞書 HANDS = {1: Hand("グー", "チョキ"), 2: Hand("チョキ", "パー"), 3: Hand("パー", "グー")} FACE = {1:"↑",2:"→",3:"↓",4:"←"} FINGER = {1:"↑",2:"→",3:"↓",4:"←"} #あっち向いてほいバトルグラウンドをスタート def hoi_battle_ground(): print("="*30 + "\n\nあっち向いてほいBATTLEGROUNDS\n\n\n >>GAMESTART<<\n\n Press A\n\n" + "="*30) while True: Button = input() if Button == "A": break print("\n\n\n\n\n" + "="*60 + "\n\n あっち向いてほいBATTLEGROUNDSへようこそ!" \ + "\n\n\n このゲームでは最大64人でトーナメント戦を行います。" \ + "\n ルールは単純です。" \ + "\n あっち向いてほいで決められた点数を先に獲得したら勝利です。" \ + "\n\n\n トーナメントを勝ち抜いて優勝を目指して頑張りましょう!\n\n " + "="*60 ) print("\n\n\n\n\n" + "="*60 + "\n\nあっち向いてほいBATTLEGROUNDSの設定を決めてください") ##トーナメントプレイ人数を選択 while True: try : all_player_num = int(input("\n\nプレイ人数を4,8,16,32,64人から選択してください >")) if all_player_num in [4, 8, 16, 32, 64]: break except: print("4,8,16,32,64を入力してください。") ##人間プレイヤーの人数を選択 while True: try: player_num = int(input(f"\n\n人間プレイヤーの人数を選択してください(最大人数は{all_player_num}人です) >")) if 1 <= player_num & player_num <= all_player_num: break except: print("数字以外を入力しないでください。") ##各人間プレイヤーの名前の入力 player_name_list =[] for _ in range(0, player_num): player_name = input("\n\n人間プレイヤーの名前を記入してください。>") player_name_list.append(player_name) print(f"<{player_name}>さんですね。エントリー完了しました。") ##試合のマッチポイントの入力 while True: try: match_point = int(input(f"\n\n試合のマッチポイントを入力してください。 >")) if match_point >= 1: break except: print("1以上の整数を入力してください。") print("\n" + "="*60 + "\n\n\n\n\n") ##プレイヤーのインスタンスを配列(タプル型)に格納 com_num = all_player_num - player_num players = *[Human(i) for i in player_name_list], *[Computer(f"コンピュータ{i}") for i in range(1, com_num + 1)] ##プレイヤーのインスタンスをシャッフルして配列(リスト型)に格納 random_list = Tournament(players).shuffle() ##トーナメントが何回戦まであるか算出 tournament_num = [i for i in range(1, all_player_num + 1) if all_player_num % i == 0] ##試合開始 print(f"\n\n\n{match_point}ポイントマッチ制 {all_player_num}人トーナメントスタート!!\n\n") for n in range(1, len(tournament_num)): ###各2人組に分けた際の試合数 game_num = Tournament(random_list, n).match() ###プレイヤーのリストから試合の敗者を除外 losers_list = Tournament(random_list, n).battle(game_num, match_point) for loser in losers_list: random_list.remove(loser) if len(random_list) == 1: print(" " + "+*"*33) print("*+"*34) print(f" 栄えある優勝者は{random_list[0].names}!!おめでとうございます!!!") print("*+"*34) print(" " + "+*"*33) ###勝ち残った参加者の勝利ポイントをリセット Tournament(random_list).score_reset() ##もう一度プレイするか選択 while True: try: restart = input("\n\nもう一度プレイしますか?(Yes:y/No:n)") if restart == "y": hoi_battle_ground() break elif restart == "n": break except: print("yかnを入力してください") #あっち向いてほいグラウンドメソッドを実行します。 hoi_battle_ground() 【解説】 〔作り方〕 ①各種ゲーム設定を入力できる機構を作成 ⑴プレイヤー人数入力機構 ##トーナメントプレイ人数を選択 while True: try : all_player_num = int(input("\n\nプレイ人数を4,8,16,32,64人から選択してください >")) if all_player_num in [4, 8, 16, 32, 64]: break except: print("4,8,16,32,64を入力してください。") ⑵人間プレイヤー人数入力機構 ##人間プレイヤーの人数を選択 while True: try: player_num = int(input(f"\n\n人間プレイヤーの人数を選択してください(最大人数は{all_player_num}人です) >")) if 1 <= player_num & player_num <= all_player_num: break except: print("数字以外を入力しないでください。") ⑶各人間プレイヤーの名前入力機構 ##各人間プレイヤーの名前の入力 player_name_list =[] for _ in range(0, player_num): player_name = input("\n\n人間プレイヤーの名前を記入してください。>") player_name_list.append(player_name) print(f"<{player_name}>さんですね。エントリー完了しました。") ⑷試合のマッチポイント入力機構 ##試合のマッチポイントの入力 while True: try: match_point = int(input(f"\n\n試合のマッチポイントを入力してください。 >")) if match_point >= 1: break except: print("1以上の整数を入力してください。") ②プレイヤーのインスタンスを配列(タプル)に格納 ⑴プレイヤーのインスタンスを配列に格納してまとめて扱いやすくする ##プレイヤーとコンピュータのインスタンスを配列(タプル型)に格納 com_num = all_player_num - player_num players = *[Human(i) for i in player_name_list], *[Computer(f"コンピュータ{i}") for i in range(1, com_num + 1)] """ playersの中身(player:3, computer:5) (<__main__.Human object at 0x7fd29607cad0>, <__main__.Human object at 0x7fd29607cc90>, <__main__.Human object at 0x7fd29607c2d0>, <__main__.Computer object at 0x7fd2960cd050>, <__main__.Computer object at 0x7fd29607cc50>, <__main__.Computer object at 0x7fd29607cb10>, <__main__.Computer object at 0x7fd29607c8d0>, <__main__.Computer object at 0x7fd29607ce90>) """ 詳しい解説は前回の「③参加者のインスタンス配列を受け取りじゃんけんを実行するJankenクラスを作成」をご覧ください。 ③各回戦の結果から優勝者を出力する機構を作成 ⑴各回戦の敗者をプレイヤーリストから除き、最後の一人を優勝者として出力する機構 ##トーナメントが何回戦まであるか算出(総プレイヤー数の約数-1) tournament_num = [i for i in range(1, all_player_num + 1) if all_player_num % i == 0] ##試合開始 print(f"\n\n\n{match_point}ポイントマッチ制 {all_player_num}人トーナメントスタート!!\n\n") for n in range(1, len(tournament_num)): ###各2人組に分けた際の試合数 game_num = Tournament(random_list, n).match() ###プレイヤーのリストから試合の敗者を除外 losers_list = Tournament(random_list, n).battle(game_num, match_point) for loser in losers_list: random_list.remove(loser) if len(random_list) == 1: print(" " + "+*"*33) print("*+"*34) print(f" 栄えある優勝者は{random_list[0].names}!!おめでとうございます!!!") print("*+"*34) print(" " + "+*"*33) ###勝ち残った参加者の勝利ポイントをリセット Tournament(random_list).score_reset() この機構では、総プレイヤー人数からトーナメントが何回戦まであるか算出しその回数分for文を回します。各回戦ごとにlosers_listに敗者のインスタンスを格納してrandom_listからremoveメソッドを使って除いていきます。 敗者を取り除いたrandom_listにプレイヤーのインスタンスが複数残っている場合はfor文で次回戦に移ります。その際、前回戦で獲得したスコアを0ポイントにリセットします。 インスタンスが1つしか残っていない場合はそのインスタンスを優勝者として名前を出力します。 Tournamentクラスのインスタンスメソッドが何をしているかについては③で詳しく解説します。 トーナメントが何回戦まであるか算出する方法 総プレイヤー数を1~総プレイヤー数で割って整数で割り切れる回数-1 == 約数-1 16人の場合 16/1,16/2,16/4,16/8,16/16 →4回戦まで 32人の場合 32/1,32/2,32/4,32/8,32/16,32/32 →5回戦まで ④プレイヤーのインスタンス配列を受け取りトーナメントの各試合を実行するTournamentクラスを作成 ⑴初期化メソッドを作成 #トーナメントクラス(試合のマッチング、各回戦の一連の試合を実行し敗者を選出、各回戦のスコアリセット) class Tournament: def __init__(self, players, tournament_num=None): self.players = players self.tournament_num = tournament_num このTournamentクラスは初期化メソッドでインスタンス変数self.playersに全プレイヤーのインスタンスをまとめて代入します。 前回はJankenクラスがこの役割を果たしていましたが、今回はこのTournamentクラスが担っています。全プレイヤーを扱うインスタンスメソッドを作成して有効的にインスタンス変数self.playersを使っていきます。 ⑵プレイヤーのインスタンス配列の順序をランダムに変更するshuffleインスタンスメソッドを作成 ##プレイヤーの順番をランダムにシャッフル def shuffle(self): random_list = [player for player in self.players] random.shuffle(random_list) return random_list ##プレイヤーのインスタンスをシャッフルして配列(リスト型)に格納 random_list = Tournament(players).shuffle() Tournamentクラスのインスタンス作成とshuffle インスタンスメソッドの実行を同時に行うと、playersの中身のインスタンスがランダムにシャッフルされrandom_listとして戻ってきます。 一回トーナメントバトルが終わるまではずっとここで決まったインスタンスの組み合わせで試合が行われます。 ⑶各回戦ごとにトーナメントの対戦表を出力するmatchインスタンスメソッドを作成 ##2人ずつのグループに分けて対戦表を作成&出力 def match(self): game_num = int(len(self.players) / 2) print("-"*35) if game_num == 1: print("決勝対戦表") elif game_num == 2: print("準決勝対戦表") else: print(f"第{self.tournament_num}回戦対戦表\n") for n in range(0, game_num): group_n = self.players[n*2:n*2+2] print(f"{group_n[0].names} vs {group_n[1].names}") print("-"*35) return game_num ##試合開始 print(f"\n\n\n{match_point}ポイントマッチ制 {all_player_num}人トーナメントスタート!!\n\n") for n in range(1, len(tournament_num)): ###各2人組に分けた際の試合数 game_num = Tournament(random_list, n).match() #<<<<<<<<<< 試合開始して各回戦が始まる前にTournamentクラスのmatchインスタンスメソッドを実行します。 このインスタンスメソッドは各回戦の試合数を計算しgame_numに代入して返り値として返します。また、各回戦のトーナメント対戦表を出力してくれます。 ⑷各試合の一連の流れを実行するbattleインスタンスメソッドを作成 ##第〇回戦の全試合を実施して負けた参加者を配列(リスト型)に格納 def battle(self, game_num, match_point): losers_list = [] for n in range(0, game_num): group_n = self.players[n*2:n*2+2] print("-"*35) if game_num == 1: print("決勝戦") elif game_num == 2: print(f"準決勝戦 第{n+1}試合") else: print(f"第{self.tournament_num}回戦 第{n+1}試合") print(f"\n{group_n[0].names} vs {group_n[1].names}") print("-"*35) while True: loser = Janken(group_n).play(match_point) if loser: losers_list.append(loser) break return losers_list ##試合開始 print(f"\n\n\n{match_point}ポイントマッチ制 {all_player_num}人トーナメントスタート!!\n\n") for n in range(1, len(tournament_num)): ###各2人組に分けた際の試合数 game_num = Tournament(random_list, n).match() ###プレイヤーのリストから試合の敗者を除外 losers_list = Tournament(random_list, n).battle(game_num, match_point) #<<<<<<<<<< 試合開始して各回戦が始まるとTournamentクラスのbattleインスタンスメソッドを実行します。このインスタンスメソッドを実行すると、各回戦ごとの全試合から敗者をリストにして返してくれます。 battleインスタンスメソッドの引数にgame_num(試合数)とmatch_pointを渡すことで、一試合match_point分先取制で受け取った試合数game_num分をfor文で繰り返します。 一試合の流れを説明すると、まずその試合が何回戦で第何試合で誰と誰が対戦しているのか出力します。 次に、Jankenクラスのplayインスタンスメソッドを実行して返り値として敗者のインスタンスが帰ってきたらwhile文を終了してlosers_listに敗者のインスタンスを追加します。 for文を全試合数分回したらloser_listをbattleインスタンスメソッドの返り値として返します。 ⑸各回戦ごとに勝ち残ったプレイヤーの勝利ポイントを0にリセットするscore_resetインスタンスメソッドを作成 ##スコアリセット def score_reset(self): for player in self.players: player.score = 0 ##試合開始 print(f"\n\n\n{match_point}ポイントマッチ制 {all_player_num}人トーナメントスタート!!\n\n") for n in range(1, len(tournament_num)): ###各2人組に分けた際の試合数 game_num = Tournament(random_list, n).match() ###プレイヤーのリストから試合の敗者を除外 losers_list = Tournament(random_list, n).battle(game_num, match_point) for loser in losers_list: random_list.remove(loser) if len(random_list) == 1: print(" " + "+*"*33) print("*+"*34) print(f" 栄えある優勝者は{random_list[0].names}!!おめでとうございます!!!") print("*+"*34) print(" " + "+*"*33) ###勝ち残った参加者の勝利ポイントをリセット Tournament(random_list).score_reset() #<<<<<<<<<< 各回戦が終了した時点で勝ち残っているプレイヤーの勝利ポイントは全員3ポイントあります。このまま次回戦を始めてしまうと最初から3ポイント獲得した状態になってしまいます。 この問題を解決するのがこのscore_resetインスタンスメソッドです。 このインスタンスメソッドを実行すると各プレイヤーの勝利ポイントに0を代入してくれます。これで問題なく次回戦に進ませることが出来ます。 ⑤Jankenクラスを一部変更 ⑴勝敗判定部分を一部変更 ##じゃんけんの一連の流れを実行 def play(self, match_point): print("\n>>>じゃんけんぽん!<<<") self.pon() while True: ###judgeインスタンスメソッドの返り値の第一項をj_winnerに、第二項をj_loserに代入 j_winner, j_loser = self.judge() if j_winner: ###j_winnerがTrue、つまり勝者がいる場合 print(f"\n\n◎◎{j_winner.names}の勝ち!◎◎") break else: ###いない場合はもう一度じゃんけん print("\n△△あいこでしょ!!△△") self.pon() ##勝者が決まったら返り値であっち向いてほいを実行する return Hoi(j_winner, j_loser).play(self.group_n, match_point) ##プレイヤーのじゃんけんの手があいこか否かを判定、あいこでなければ勝者を決定 def judge(self): hands = set(player.hand for player in self.group_n) if len(hands) != 2: return None, None hand1, hand2 = hands win = hand1 if hand1.stronger_than(hand2) else hand2 ###二人のうち勝者はインスタンスをwinnerに代入、敗者はloserに代入する形に変更 for player in self.group_n: if player.hand == win: winner = player else: loser = player ###複数返り値 return winner, loser 前回の大人数じゃんけんプログラムで使用したコードを基本的には転用していますが、今回は勝者に加え敗者も明確にする必要があるのでjudgeインスタンスメソッドを変更しました。 詳しくは上記のコードのメモをご覧ください。 ⑵返り値でHoiクラスのplayインスタンスメソッドを実行 ##じゃんけんの一連の流れを実行 def play(self, match_point): print("\n>>>じゃんけんぽん!<<<") self.pon() while True: ###judgeインスタンスメソッドの返り値の第一項をj_winnerに、第二項をj_loserに代入 j_winner, j_loser = self.judge() if j_winner: ###j_winnerがTrue、つまり勝者がいる場合 print(f"\n\n◎◎{j_winner.names}の勝ち!◎◎") break else: ###いない場合はもう一度じゃんけん print("\n△△あいこでしょ!!△△") self.pon() ##勝者が決まったら返り値であっち向いてほいを実行する return Hoi(j_winner, j_loser).play(self.group_n, match_point) #<<<<<<<<<< Hoiクラスのインスタンスにj_winner, j_loser(勝者と敗者)を渡します。 同時にHoiクラスのplayインスタンスメソッドに引数としてself.group_n(その試合の二人のプレイヤー)を第一項に、match_pointを第二項に渡しています。 これであっち向いてほいを実行することができます。 詳しい意味は⑥で説明します。 ⑥Hoiクラスを作成 ⑴初期化メソッドを作成 #あっち向いてほいクラス(あっち向いてほいをする際の手と指の選択&出力、勝敗判定) class Hoi: def __init__(self, j_winner, j_loser): self.j_winner = j_winner self.j_loser = j_loser Jankenクラスの返り値としてHoiクラスのインスタンスを作成しました。その際に受け取った勝者と敗者のインスタンスを初期化メソッドでself.j_winner、self.j_loserに保持します。 ⑵あっち向いてほいの一連の流れを実行するplayインスタンスメソッドを作成 ##あっち向いてほいをプレイして勝者を出力し敗者を返り値で返す def play(self, group_n, match_point): print("\n\n>>>あっち向いてほい!<<<") self.hoi() if self.j_winner.finger == self.j_loser.face: print(f"\n\n◎◎◎{self.j_winner.names}の勝利!1ポイント獲得!◎◎◎") self.j_winner.score += 1 if self.j_winner.score < match_point: print(f"\n\n現在のポイント \ \n{group_n[0].names}:{group_n[0].score}ポイント \ \n{group_n[1].names}:{group_n[1].score}ポイント") return else: print(f"\n\n><><><>{match_point}ポイント先取により{self.j_winner.names}の勝利!<><><><\n\n") return self.j_loser else: print("××じゃんけんに戻る××") return このインスタンスメソッドではあっち向いてほいの一連の流れを実行します。 まずはself.hoiを実行してプレイヤーに顔か指を選択させ出力します。 詳しい内容は⑥の⑶で解説します。 次に、じゃんけん勝者の指とじゃんけん敗者の顔の向きが同じか判定します。 向きが同じ場合はじゃんけん勝者の勝利となり、勝者の名前を出力した後にポイントを追加します。 勝利ポイントがマッチポイント未満である場合は両者の現在のポイントを出力した後に空の返り値を返します。ここで空の返り値を返すとJankenクラスのplayインスタンスメソッドの返り値も空になり、Tournamentクラスのbattleインスタンスメソッドのwhile文から抜け出せずもう一度じゃんけんからやり直しになります。 勝利ポイントがマッチポイントまで達した場合は勝者の名前を出力した上で敗者のインスタンスを返り値として返します。敗者を受け取ることでTournamentクラスのbattleインスタンスメソッドのwhile文から抜け出せて次の試合に移ることができます。 向きが異なる場合は空の返り値を返して、勝利ポイントがマッチポイント未満であった場合と同じくじゃんけんをやり直します。 ⑶プレイヤーに顔か指を選択させ出力するhoiインスタンスメソッドを作成 ##プレイヤー同士の出した指と顔の向きを出力 def hoi(self): self.j_winner.hoi("w") self.j_loser.hoi("l") print(f"{self.j_winner.names}の手:{self.j_winner.finger}") print(f"{self.j_loser.names}の手:{self.j_loser.face}") このインスタンスメソッドではプレイヤーにあっち向いてほいの顔か指を選択させます。 Humanクラスのインスタンスメソッドself.j_winner.hoi()とComputerクラスのインスタンスメソッドself.j_loser.hoi()を実行することで選択してもらいます。引数で"w"と"l"を渡しているのは、HumanクラスとComputerクラス側で顔の選択か指の選択か場合分けを行う為です。詳しくは⑦の⑴と⑵でコードを確認してみてください。 ⑦HumanクラスとComputerクラスを一部変更 ⑴ヒューマンクラスにhoiインスタンスメソッドを作成 #ヒューマンクラス(じゃんけんの手の選択、あっち向いてほいの手の選択) class Human(Scoresheet): ##人間プレイヤーの手を選択 def pon(self): print('選択肢:', HANDS) while True: try: hand = input(f"{self.names}の出す手を入力してください(整数:1, 2, 3) 〉") if hand in ("1", "2", "3"): self.hand = HANDS[int(hand)] return except: print("整数:1, 2, 3を入力してください。") ##人間プレイヤーの指か顔の向きを選択 def hoi(self, j_result): if j_result == "w": print(FINGER) while True: try: finger = input("あなたの指の向きを選択してください(整数:1,2,3,4) 〉") if finger in ("1","2","3","4"): self.finger = FINGER[int(finger)] return except: print("整数1,2,3,4を入力してください") else: while True: try: face = input("あなたの顔の向きを選択してください(整数:1,2,3,4) 〉") if face in ("1","2","3","4"): self.face = FACE[int(face)] return except: print("整数1,2,3,4を入力して下さい") FACE = {1:"↑",2:"→",3:"↓",4:"←"} FINGER = {1:"↑",2:"→",3:"↓",4:"←"} Humanクラスにhoiインスタンスメソッドを追加します。 Hoiクラスのhoiインスタンスメソッドで渡された引数"w"か"l"によって顔の選択か指の選択か場合分けを行います。指はself.fingerに、顔はself.faceに代入します。 ⑵コンピュータクラスにhoiインスタンスメソッドを作成 #コンピュータクラス(じゃんけんの手の選択、あっち向いてほいの手の選択) class Computer(Scoresheet): ##コンピュータの手を選択 def pon(self): self.hand = random.choice(list(HANDS.values())) ##コンピュータの指か顔の向きを選択 def hoi(self, j_result): if j_result == "w": self.finger = random.choice(list(FINGER.values())) else: self.face = random.choice(list(FACE.values())) FACE = {1:"↑",2:"→",3:"↓",4:"←"} FINGER = {1:"↑",2:"→",3:"↓",4:"←"} Computerクラスにhoiインスタンスメソッドを追加します。 Hoiクラスのhoiインスタンスメソッドで渡された引数"w"か"l"によって顔の選択か指の選択か場合分けを行います。指はself.fingerに、顔はself.faceに代入します。 ⑧スーパークラスScoresheetを作成 ⑴HumanクラスとComputerクラスに共通する「属性」である"名前"と"勝利ポイント"をスーパークラスのインスタンス変数に保持させる #スコアシートクラス(プレイヤーの名前を保持、プレイヤーの得点を保持) class Scoresheet: def __init__(self, names, score=0): self.names = names self.score = score HumanクラスとComputerクラスに共通してある「属性」は、"名前"と"勝利ポイント"です。前回の「②ヒューマンクラスとコンピュータクラスに名前と勝利ポイントを受け取る初期化メソッドを追加」ではHumanクラスとComputerクラスにそれぞれ初期化メソッドで"名前"と"勝利ポイント"を保持していたのですが、今回はScoresheetスーパークラスを使います。ScoresheetクラスはHumanクラスやComputerクラスを包含する関係にあるため、プレイヤーの”名前”と”勝利ポイント”をまとめて全員分一つのクラスの初期化メソッドに保持しておくと関係が分かりやすいです。また、後々全員の「属性」に対してまとめて処理を施したいときに役立ちます。 クラスの継承の基礎について 既存のクラスAのインスタンス変数やインスタンスメソッドを新たに作ったクラスBが引き継いで使用することができます。この仕組みを継承と言います。 継承されるクラスを「スーパークラス」「親クラス」「基底クラス」、 継承するクラスを「サブクラス」「子クラス」「派生クラス」などと言います。 ※今後は「スーパークラス/サブクラス」を使って説明していきます。 オーバーライドについて スーパークラスに既存のインスタンス変数やインスタンスメソッドをサブクラスで新たに書き換えたい場合に行うのがオーバーライドです。 だた、書き換えた後に元のインスタンスメソッドやインスタンス変数を使いたい場合があります。そんな時に使うのがsuper()です。 インスタンスメソッドの場合 super().スーパー<font color="81C4FF">**クラス**</font>のインスタンスメソッド名() インスタンス変数の場合 def __init__(self, スーパークラスの初期化メソッドの引数, サブクラスの初期化メソッドの引数): super().__init__(スーパークラスの初期化メソッドの引数) このようにしてサブクラスで書き換えたものと元のスーパークラスのもの両方使えるようになります。 ⑨再度プレイするか選択させる機構を作成 ⑴再度プレイ機構 ##もう一度プレイするか選択 while True: try: restart = input("\n\nもう一度プレイしますか?(Yes:y/No:n)") if restart == "y": hoi_battle_ground() break elif restart == "n": break except: print("yかnを入力してください") 【終わりに】 今回はあっち向いてほいプログラムを作っていきました。 難易度はいかがだったでしょうか。機能要件はいかがだったでしょうか。解説はいかがだったでしょうか。なるべく正確かつ分かりやすい解説に努めましたが、もしもコーディングの部分で間違いなどありましたら編集リクエストからご指摘いただけると幸いです。また、その他質問などございましたらコメントをお願い致します。 今回作ったプログラムで地味に大変だったのが、出力の見やすさを確保するためにprintでの改行や装飾の調整をすることでした。もっと見やすい形で出力できないかなと考えていた時に出会ったのがGUIでした。GUIはGraphical User Interfaceと言って、ユーザーが見やすいようにウィンドウ上で図やボタンなどを表示して選択や操作ができるインターフェイスのことです。私たちが利用しているユーザー向けアプリのほとんどはこのGUIを使って使いやすさを実現しているようです。 ということで次回は、今回作ったあっち向いてほいプログラムから「あっち向いてほいプログラム」のGUIアプリを作りたいと思います。下記リンクからどうぞ! →ぬかさんエンジニアリングの次回作をお待ちください この記事が良かったと感じたらLGTMを宜しくお願いします!それではまた次回!
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Python初心者がノンプロ研でPython学んでみた vol.3

はじめに こんにちは、な~です。ノンプロ研でPythonを学び始めました。その学びについての第3段です。 今回学ぶこと リスト タプル 辞書 リスト リストはインデックスで管理するデータの集合です。 リストは角括弧[]内に、リストの要素をカンマで区切って並べます。 また、要素を呼び出すときは、リスト名[インデックス]で呼び出すことができます。 また、リストの部分選択をスライスといいます。 リスト名[開始位置,終了位置、増分]でリストの部分選択ができます。 リスト要素の存在チェックは、値 in リストで確認できます。 for inを使って、リスト内の要素1つ1つについて繰り返し処理ができます。 2次元リストは、[]の中に[]を入れることによって表現できます。 number = [[10, 30, 20, 40],[11,31,21],[12]] number[0]では、外側の配列の1つ目の要素([10, 30, 20, 40])が取得できます。 number[1][2]は、外側の配列の2つ目の要素の中の3つ目の要素([21])が取得できます。 タプル タプルは、インデックスで管理するデータの集合ですが、リストとは違いタプルの要素は上書きが禁止されています。 タプルは、括弧()内に、タプルの要素をカンマで区切って並べます。また、要素を呼び出すときは、タプル名[インデックス]で呼び出すことができます。 タプルは、リストと同じように「値 in タプル」で存在確認をしたり、「for 変数 in タプル」で繰り返し処理ができます。 辞書 辞書とは、キーで管理するデータの集合です。 辞書は、{}の間にキー:バリューをセットにしてカンマで区切って定義します。 {キー1: バリュー1, キー2: バリュー2, …} 要素の参照は、辞書[キー]です。 また、存在しないキーを使って代入すると辞書に要素を追加できます。 リストやタプルと同じように「値 in 辞書」で存在確認をしたり、「for 変数 in 辞書」で繰り返し処理ができます。ただし、「for 変数 in 辞書」の変数はキーになるので注意が必要そうです。 まとめ 今回は、リスト・タプル・辞書について学びました。 まだまだ、しっかりイメージできている自信はありませんが、GASの配列やオブジェクトと比べたり、使っていくことでしっかりと身に付けたいです。 いつものように、ツイートもまとめました。 ノンプロ研 Python初心者講座第3回目 byな~
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む