- 投稿日:2019-08-11T23:48:48+09:00
【Python入門】クラスの使い方
きっかけ
classの概念は初心者である私にとって非常に難しい。
今まで学習から逃げ続けてきたが、現在取り組んでいるkaggleコンペではよく、sklearnの準拠モデルから、分類器を作成しているのをよく見かけますので、復習も兼ねて再整理をしてみます。
クラスの定義とインスタンス(オブジェクト)の生成
class Car: def display(self): print('model:', self.model) print('model:', self.color) car1 = Car() car1.model = 'boxcar' car1.color = 'white' car1.display()model: boxcar model: whitecar2 = Car() car2.model = 'sedan' car2.color = 'gray' car2.display()model: sedan model: grayメソッドとは、クラス内で定義した関数のこと。
構造は、class クラス名: def メソッド名(self, 引数) 処理
__init__
メソッドclass Car: def __init__(self, model, color): self.model = model self.color = color def display(self): print('model:', self.model) print('color:', self.color) car1 = Car('roadster', 'red') car1.display()model: roadster color: redインスタンスのデータ属性名に値を設定するには...
方法①
変数 = クラス名 変数.データ属性名 = 値方法② インスタンス生成と同時に、データ属性を設定
class クラス名: def __init__(self, 引数, ...): self.データ属性名 = 値 変数 = クラス名(引数, ...)メリット:
①データ属性の設定が防げる!
②プログラムを簡潔にできる!*コンストラクタとは、インスタンスが生成される時に実行されるメソッド(関数)の一種です。インスタンスの生成、扱う変数などの初期化を主に行います。
マングリング
クラス内部だけで使うメソッドを定義。
内部だけで使うメソッドを間違って外部から呼ぶのを防ぐ。class Car: def __init__(self, model, color): #__init__はマングリングではない self.model = model self.color = color self.speed = 0 def display(self): print('model:', self.model) print('color:', self.color) print('speed:', self.speed) def accelerate(self, pressure): self.speed += pressure self.__check_speed() #__check_speedメソッド呼び出し def __check_speed(self): #__check_speedはマングリング if self.speed > 100: self.speed = 100 car1 = Car('roadster', 'red') car1.display() #()が抜けるとメソッド呼び出しができないmodel: roadster color: red speed: 0car1.accelerate(50) car1.accelerate(60) car1.display()model: roadster color: red speed: 100クラス属性
class Car: SPEED_LIMIT = 90 def __init__(self, model, color): #__init__はマングリングではない self.model = model self.color = color self.speed = 0 def display(self): print('model:', self.model) print('color:', self.color) print('speed:', self.speed) def accelerate(self, pressure): self.speed += pressure self.__check_speed() def __check_speed(self): #__check_speedはマングリング if self.speed > Car.SPEED_LIMIT: #or self.SPEED_LIMIT でもOK self.speed = Car.SPEED_LIMIT car1 = Car('roadster', 'red') car1.accelerate(40) car1.accelerate(60) car1.display()継承とオーバーライド
''' class Car: SPEED_LIMIT = 90 def __init__(self, model, color): #__init__はマングリングではない self.model = model self.color = color self.speed = 0 def display(self): print('model:', self.model) print('color:', self.color) print('speed:', self.speed) def accelerate(self, pressure): self.speed += pressure self.__check_speed() def __check_speed(self): #__check_speedはマングリング if self.speed > Car.SPEED_LIMIT: #or self.SPEED_LIMIT でもOK self.speed = Car.SPEED_LIMIT ''' class Truck(Car): #Carクラスを基底として、Truckクラスを派生される def __init__(self, model, color, cargo): #cargo追加 Car.__init__(self, model, color) self.cargo = cargo def display(self): print('cargo:', self.cargo) truck1 = Truck('trailer', 'yellow', 'potato') truck1.accelerate(30) truck1.display()cargo: potato継承:既存のクラスが持つ機能を新しいクラスですべて引き継ぐ
継承におけるメソッドの追加
''' class Car: SPEED_LIMIT = 90 def __init__(self, model, color): #__init__はマングリングではない self.model = model self.color = color self.speed = 0 def display(self): print('model:', self.model) print('color:', self.color) print('speed:', self.speed) def accelerate(self, pressure): self.speed += pressure self.__check_speed() def __check_speed(self): #__check_speedはマングリング if self.speed > Car.SPEED_LIMIT: #or self.SPEED_LIMIT でもOK self.speed = Car.SPEED_LIMIT ''' class Truck(Car): #オーバーライド def __init__(self, model, color, cargo): Car.__init__(self, model, color) self.cargo = cargo def display(self): Car.display(self) print('cargo:', self.cargo) #新規メソッド def load(self, cargo): self.cargo = cargo truck1 = Truck('trailer', 'yellow', 'potato') truck1.load('lettuce') truck1.display()model: trailer color: yellow speed: 0 cargo: lettuce参照
・100問でわかるPython 単行本 松浦 健一郎 (著), 司 ゆき (著)
・sklearn準拠モデルの作り方
・https://techacademy.jp/magazine/18842
- 投稿日:2019-08-11T23:30:37+09:00
自作 Package の submodule の import したら __all__ に入れる?
21世紀も20年経とうとしている今、Python スクリプトを走らせてスペルミスなどの単純ミスて止まるのは文明的じゃない; flake8 などの静的解析を使おう。それは多くの記事があるのでいいとして、自作の package に flake8 を使うと flake8
F401 'mypackage.module1' imported but unusedみたいなことをいう。その import には意味があるんだけど... という話。
設定
package とは複数のモジュールがひとつのディレクトリに入ったもの:
mypackage/ + __init__.py + module1.py + module2.pyimport mypackageして使う。
module1.py
では関数function1()
が、module2 にはclass Class2
が定義してあるとしよう。問題
import mypackage mypackage.module1.function1() c = mypackage.Class2()のように使うために
__init__.py
にimport mypackage.module1 from mypackage.module2 import Class2と書きたい。すると、
$ flake8 mypackage/*.py mypackage/__init__.py:1:1: F401 'mypackage.module1' imported but unused mypackage/__init__.py:2:1: F401 'mypackage.module2.Class2' imported but unusedと言われる。確かに
__init__.py
では使ってないけど意味はあるんだ。これらの import 文が無いとimport mypackage.module1 from mypackage.module2 import Class2と毎回書かなければならないので、それはナイ。PEP8的な正解は?
解決法1
編集中...
解決法2
from . import module1 from .module2 import Class2 __all__ = ['module1', 'Class2']まず、
.
を使えば「このパッケージ」を意味するので、mypackage
と繰り返し書かなくて良い。パッケージ名を変えて修正しなくていいという利点もある。そして__all__
というリストに import したものを書く。すると、unused でなくなったので flake8 さんは満足。
__all__
とはflake8 を満足させるためだけのダミーの変数というわけではなく、一応意味はある。
__init__.py
に import を書かず、__all__ = ['module1']と書くと、奨励されないアイツ
from mypackage import *
した時にfrom mypackage import * module1.funtion()
module1
が読み込まれる。もし、__all__
もfrom . import module1
も無ければ、import *
でもmodule1
は読み込まれない。とはいえ、
from . import module1
があれば__all__ = ['module1']
は不要なので__all__
は flake8 向けの重複とは言える。もっと良い方法があればコメントください。参考
https://stackoverflow.com/questions/31079047/python-pep8-class-in-init-imported-but-not-used
- 投稿日:2019-08-11T23:30:37+09:00
自作 Package の submodule の import と flake8
21世紀も20年経とうとしている今、Python スクリプトを走らせてスペルミスなどの単純ミスて止まるのは文明的じゃない; flake8 などの静的解析を使おう。それは多くの記事があるのでいいとして、自作の package に flake8 を使うと
F401 'mypackage.module1' imported but unusedみたいなことをいう。その import には意味があるんだけど... という話。
設定
package とは複数のモジュールがひとつのディレクトリに入ったもの:
mypackage/ + __init__.py + module1.py + module2.pyimport mypackageして使う。
module1.py
では関数function1()
が、module2 にはclass Class2
が定義してあるとしよう。問題
import mypackage mypackage.module1.function1() c = mypackage.Class2()のように使うために
__init__.py
にimport mypackage.module1 from mypackage.module2 import Class2と書きたい。すると、
$ flake8 mypackage/*.py mypackage/__init__.py:1:1: F401 'mypackage.module1' imported but unused mypackage/__init__.py:2:1: F401 'mypackage.module2.Class2' imported but unusedと言われる。確かに
__init__.py
では使ってないけど意味はあるんだ。これらの import 文が無いとimport mypackage.module1 from mypackage.module2 import Class2と毎回書かなければならないので、それはナイ。PEP8的な正解は?
解決法1(手っ取り早い方法)
# noqa
とコメントすると flake8 などの linter が無視する。from . import module1 # noqa from .module2 import Class2 #noqa解決法2
やらなくても動くけど、package の公式の機能は
__all__
にリストアップべき。Any backwards compatibility guarantees apply only to public interfaces. Accordingly, it is important that users be able to clearly distinguish between public and internal interfaces.
To better support introspection, modules should explicitly declare the names in their public API using the
__all__
attribute.https://www.python.org/dev/peps/pep-0008/
すなわち、
from . import module1 from .module2 import Class2 __all__ = ['module1', 'Class2']まず、
.
を使えば「このパッケージ」を意味するので、mypackage
と繰り返し書かなくて良い。パッケージ名を変えても修正しなくていいという利点もある。そして__all__
というリストに import したものを書く。すると、unused でなくなったので flake8 さんは満足。
__all__
とはflake8 を満足させるためだけのダミーの変数というわけではなく、一応意味はある。
__init__.py
に import を書かず、__all__ = ['module1']と書くと、奨励されないアイツ
from mypackage import *
した時にfrom mypackage import * module1.funtion()
module1
が読み込まれる。もし、__all__
もfrom . import module1
も無ければ、import *
でもmodule1
は読み込まれない。
from . import module1
があれば__all__ = ['module1']
は不要なので__all__
は重複とは言える。逆に、__all__
だけではimport mypackage
の時には自動 import してくれない。とはいえ、PEP8 に書かれていることなので、他の人も使う package として公開するときはちゃんと__all__
を書くのが良さそう。参考
すべてはここに書いてあった。
https://stackoverflow.com/questions/31079047/python-pep8-class-in-init-imported-but-not-used
- 投稿日:2019-08-11T23:00:31+09:00
Pythonバージョン管理、仮想環境作成、IDLE起動(Mac)
pyenvでインストールできるバージョンを確認
$ pyenv install --listpyenvで Python 3.6.8をインストール
$ pyenv install 3.6.8インストールしたPythonのバージョンを確認
$ pyenv versionsバージョンの切替 グローバル
$ pyenv global 3.6.8バージョンの切替 ローカル
$ pyenv local 3.6.8バージョン確認
$ python --version仮想環境の作成
$ cd <作成したい場所> *作成したいディレクトリに移動 $ python3 -m venv <作成する環境名> *ディレクトリ内に仮想環境を作成作成したディレクトリに入る
$ cd <作成する環境名>仮想環境の有効化
$ source bin/activate仮想環境でIDLEを起動
$ python -m idlelib.idleIDLEを閉じる(ショートカット ⌘+W)
IDLE起動時に以下警告が表示される場合
WARNING: The version of Tcl/Tk (8.5.9) in use may be unstable. Visit http://www.python.org/download/mac/tcltk/ for current information. WARNING: The system preference "Prefer tabs when opening documents" is set to "Always". This will cause various problems with IDLE. For the best experience, change this setting when running IDLE (via System Preferences -> Dock).メッセージのとおり
- Macのシステム環境設定でDockを開き、「書類を開くときはタブで開く:常に」 になっていたら他の項目に変更
- the ActiveState web site から ActiveTcl 8.6 をインストール(インストール後にPython の再インストールが必要)Python の再インストール
$ pyenv uninstall 3.6.8 $ brew info tcl-tk #バージョンが8.6以上か確認 $ pyenv install 3.6.8仮想環境を無効化
$ deactivate参考:
How to launch python Idle from a virtual environment (virtualenv)
- 投稿日:2019-08-11T23:00:31+09:00
Pythonバージョン管理、仮想環境作成、仮想環境でIDLE起動(Mac)
pyenvでインストールできるバージョンを確認
$ pyenv install --listpyenvで Python 3.6.8をインストール
$ pyenv install 3.6.8インストールしたPythonのバージョンを確認
$ pyenv versionsバージョンの切替 グローバル
$ pyenv global 3.6.8バージョンの切替 ローカル
$ pyenv local 3.6.8バージョン確認
$ python --version仮想環境の作成
$ cd <作成したい場所> *作成したいディレクトリに移動 $ python3 -m venv <作成する環境名> *ディレクトリ内に仮想環境を作成作成したディレクトリに入る
$ cd <作成する環境名>仮想環境の有効化
$ source bin/activate仮想環境でIDLEを起動
$ python -m idlelib.idleIDLEを閉じる(ショートカット ⌘+W)
IDLE起動時に以下警告が表示される場合
WARNING: The version of Tcl/Tk (8.5.9) in use may be unstable. Visit http://www.python.org/download/mac/tcltk/ for current information. WARNING: The system preference "Prefer tabs when opening documents" is set to "Always". This will cause various problems with IDLE. For the best experience, change this setting when running IDLE (via System Preferences -> Dock).メッセージのとおり
- Macのシステム環境設定でDockを開き、「書類を開くときはタブで開く:常に」 になっていたら他の項目に変更
- the ActiveState web site から ActiveTcl 8.6 をインストール(インストール後にPython の再インストールが必要)Python の再インストール
$ pyenv uninstall 3.6.8 $ brew info tcl-tk #バージョンが8.6以上か確認 $ pyenv install 3.6.8仮想環境を無効化
$ deactivate参考:
How to launch python Idle from a virtual environment (virtualenv)
- 投稿日:2019-08-11T22:10:42+09:00
Automation Anywhere から PowerShell やPython などの外部プログラムを呼び出す
概要
Automation Anywhere でのボット作成にある程度慣れてくると、外部モジュールを実行してできることを拡張したくなります。たとえば、Automation Anywhere から PowerShell スクリプトの処理を呼び出したり、Python を呼び出して機械学習の処理をさせたり、といったことをしたくなります。これらをどう実行できるかについて整理したいと思います。
前提
- Automation Anywhere
- Enterprise 11.3.2
- Community Edition (2019年8月現在、UIは英語です)
- Windows 10 バージョン1903
Automation Anywhere で実行できる外部モジュール呼び出し方法
まず、Automation Anywhereでどのように外部モジュール呼び出し (=任意の操作の実行) ができるかについて一覧にしてみました。
方法 カテゴリ/コマンド名 概要と特徴 別タスクの呼び出し Task / Run Task Automation Anywhere で作成した別のボットを呼び出す。自分で作成したよく使うアクションリストをサブルーチン的に使うのに便利。引き継ぐローカル変数をダイアログボックスでビジュアルに任意数設定できるので、情報の受け渡しはとても楽。リピート有無や実行スピードも設定可能。 Meta Bot (およびBot Store) Run Logic Meta Bot Designerの権限を持つ人が汎用的につくったボットや、Bot Storeからインストールしたボットを部品として呼び出すのに使う。通常のボットでは使えない .DLL の呼び出しが可能なので、パッケージ内での複雑で汎用的なタスクが可能。Meta Botで定義されているInput / Output パラメータをダイアログボックスでビジュアルに設定可能。 Excelマクロの呼び出し Excel / Run Excel Macro 開いているExcelファイルの中のExcelマクロを引数リストを渡して呼び出せる。 Windowsアプリの呼び出し Open Program/File Windows 実行形式ファイル(.exe)や、これに関連付けられたファイルを指定して実行できます。ファイルの開始ディレクトリや引数も指定可能。さまざまな種類の外部モジュールを最も汎用的に呼び出せる。ただし、グラフィカルユーザーインターフェイス (GUI) を持っている必要がある(持っていなくても実行されるがエラーが出る)。 OS サービスの呼び出し Services / Start Service インストール済みのサービスを開始できる。サービスモジュールを自分でプログラミングすることは少ないと思われるので、この方法はほぼ使わないかも。 VBScript/JScriptの呼び出し Run Script ローカルにある .vbs / .js を指定して実行します。パラメータや戻り値を得るための変数を設定することもできます。コマンドラインユーザーインターフェイス (CUI) のみでも実行可能。 データベース内のスクリプトの呼び出し Database / Run Stored Procedure ストアドプロシージャの関数名/引数を指定して実行できます。引数は任意数追加でき、結果をCSVにエクスポートするオプションもあります。 ウェブページ内のスクリプトの呼び出し Web Recorder / Execute JavaScript Function ウェブページ内のJavaScript関数と、実行結果を返す変数を指定して呼び出せます。 XML内のスクリプトの呼び出し XML / Execute XPath Function XPath Function を実行できます。実行するXPath 式と、戻り値を得る変数を指定します。 SOAP/REST ウェブサービスの呼び出し SOAP Web Service SOAP/REST ウェブサービスのURIを呼び出してサービスを実行して戻り値を変数に返します。必要に応じてヘッダーパラメーターを任意数、出力結果をXMLとして保存、認証がある場合のアカウント情報やクライアント証明書を指定できます。 今回呼び出したいPowerShellスクリプトの場合、Open Program/File アクションを利用するのが良さそうです。
Open Program/File による外部モジュールの呼び出し
Open Program/Fileは以下のダイアログボックスでパラメーターの指定が可能です。
ここに呼び出したいPowerShellスクリプトを指定するのですが、今回は2つの方法を考えてみました。
ちなみに、PowerShellスクリプトは以下のものを用意します。
Hello.ps1Add-Type -Assembly System.Windows.Forms [System.Windows.Forms.MessageBox]::Show("Hello World!!", "こんにちは")方法1: PS1ファイルの関連付けを利用して実行する
PowerShell のスクリプトファイル .ps1 はアイコンを見ればわかる通り、関連付けがされています。
したがって、.ps1 を以下のダイアログボックスの「Program/File パス」に直接指定しても動作するはずです。
これで実行してみると...スクリプトの中身が実行される代わりにメモ帳でスクリプトファイルが開かれてしまいました...
これがなぜだったかを見てみると、実は.ps1のアクションの関連付けは、少なくともWindows 10の環境ではメモ帳にされていることがわかります。
したがって、この関連付けをpowershell.exeに変更すると動作するはずですので、変更してみましょう。
PowerShell.exe は
"%systemroot%\system32\windowspowershell\v1.0\powershell.exe"
にあるので、このファイルを指定します。
また、表の中でも記載した通り、このコマンドはGUIがあるプログラムを前提としており、CUIのプログラムを呼び出した場合は「WaitForInputIdleに失敗しました。プロセスがグラフィック インターフェイスを含んでいない可能性があります。」というエラーが出ます。これは無視できるので、Error Handlingで囲みます。
これで実行すると、以下の通りスクリプトが実行されることがわかります。
(注) デフォルトのセキュリティレベルではPowerShellはスクリプトとして実行が禁止されているので、スクリプトを実行する前にPowerShellコマンド
Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser
を実行しておくようにしましょう。終わったらSet-ExecutionPolicy -ExecutionPolicy Restricted -Scope CurrentUser
でセキュリティを元に戻します。方法2: 「PowerShell実行ファイル」+「引数=PS1ファイル」を指定して実行する
さて、関連付けがされていればファイルを直接「Program/File パス」に指定すれば実行されることはわかりましたが、この関連付けは通常はされていないため、実行形式ファイルと引数としてのスクリプトファイルの2つを指定する方法を試してみたいと思います。
「Program/File パス」にはシステム変数をうまく使って汎用的に
$System(SystemRoot)$\System32\WindowsPowerShell\v1.0\powershell.exe
と記載します。「パラメータ」にはHello.ps1のパスを指定します。
前述のエラー処理も取り入れると、アクションリストは以下のようになります。
これで実行すると、やはり以下の通りスクリプトが実行されることがわかります。
外部モジュールでの処理を待つようにする
ちなみに、今回のOpen Program/File アクションは、外部スクリプト呼出し後、処理を待たずに次のアクションに移ってしまうので、外部スクリプトの結果を待つには、たとえば共通のテキストファイルをコミュニケーション用に用意して、値が書き込まれたら次の処理に移る、などの工夫が必要です。たとえば以下のようにアクションリストを設定してみると、PowerShellでのダイアログボックスで「OK」ボタンをクリックした直後に、アクションリストの次のダイアログボックスが表示されることがわかります。
Hello.ps1Add-Type -Assembly System.Windows.Forms [System.Windows.Forms.MessageBox]::Show("Hello World!!", "こんにちは") "Done`n" > "C:\Automation-Anywhere-Temporary-Folder\result.txt"(おまけ) 同様の方法でPython スクリプトも実行
以上の方法は、PowerShellに限らず、ほかのスクリプトでも応用可能です。Python については、インストーラでセットアップを行うと、.pyファイルがpython.exeに関連付けられているので、そのまま.pyファイルを指定して実行可能です。以下のようなアクションリストとPythonスクリプトを組むことで、同様の処理を行うことができます。(以下はPython 3.6以上での例)
Hello.pyimport pathlib key = input('Hello World!') p_text = pathlib.Path('C:\Automation-Anywhere-Temporary-Folder\result.txt') p_text.write_text('Done')
- 投稿日:2019-08-11T21:29:58+09:00
Python3.8の新機能(3) - Pickleの改良
はじめに
2019年10月にリリースを予定しているPython3.8で新たに加わる変更をPython3.8の新機能 (まとめ)という記事でまとめ始めています。分量のある変更は別記事にしていて、これはその第3弾のPickleの改良についての解説です。
Pickleとは
PickleはPythonのデータシリアライゼーション(データ直列化)のライブラリです。Pythonで扱っているデータをファイルなどに書き出しておきたい場合に、データ構造や型の情報を含めて記録し、読み込み時にはそのファイルにある情報のみで再構成をできるようにする仕組みです。JSONなどとも用途が似通っていますが、Pickleは以下のような特徴を持ちます。
- バイナリフォーマットなのでJSONやXMLのようなテキストフォーマットよりコンパクト
- 一般的な型だけでなくユーザ定義のクラスのインスタンスの直列化も可能
- 標準ライブラリに含まれているので追加のパッケージインストールが不要
- Python専用のフォーマットなので他の言語へのデータ受け渡しには不向き
Python 3.8ではこのPickleにいくつかの変更が加えられています
3.8での変更点
大きく2つの変更点があります。
- デフォルトのプロトコルバージョンが4になった
- プロトコルバージョン5が追加され、out-of-bandデータのサポートが追加された
Pickleは1995年に最初に設計されて以来、これまでも何度か拡張されていてその度にプロトコルのバージョンが上がっています。これまでの最新プロトコルバージョンはPython3.4で導入された4でしたが、デフォルトのプロトコルバージョンはPython3.0で導入された3でした。これが、Python3.8からデフォルトのPickleプロトコルバージョンが4になり性能の改善が期待できます。バージョン4の詳細はPEP 3154にあります。
2つ目のOut-of-bandデータのサポートですが、これはPEP 574で提案された変更によるものです。ちょっと詳しく次の章で見てみたいと思います。
なぜOut-of-bandデータが必要か?
PEP-574によると、Pickleは最近ではファイルへの書き出しというよりも(それらにはより汎用なJSONなどのフォーマットが使われる)、Pythonのプロセス間でのデータのやり取りに使われているようです。そして、データ解析などの応用ではとても大きなデータをPythonで扱う事が増えていて、それらをpickleでシリアライズする時に別手段で(out-of-band)で引きた渡したいという要求があります。
Out-of-bandデータは元々はネットワーク用語で、通常経路(in-band)とは別の経路(わき道)でデータ転送することです。データ経路が詰まっている時でも緊急度の高いコマンドなどを送れるようにする為に使われてました。経路は物理的に別れている場合と単に論理的な分離である場合とがありますが、アプリからは2つの経路があるように見える点では一緒です。そしてPickleの場合、Out-of-bandはシリアライズデータの中に入れずに別の手段での受け渡しになるわけですが、なぜそれが必要なのか? それはできるだけコピーを避けるためです。
データを含めてin-bandでシリアライズ(dump)するとまずその全てを保持するためのバッファを用意してそこにデータをコピーすることになります。そしてそれを受け取った側でもそのシリアライズデータをデシリアライズ(load)する時にまたコピーが発生します。データが小さければそれほど大きな差にはなりませんが、数MBを超えるデータとなると話が変わってきます。それをOut-of-bandでシリアライズデータから切り離して引き渡すことでコピーの回数を減らせる可能性が出てきます。データを値ではなくポインター or 参照で引き渡すイメージですね。
Out-of-bandデータの引き渡し方
まずは比較のためにProtocol Version 4の場合を見ます。
'abc'
を100回繰り返すバイト列をシリアライズしてみます。pickle_test_4.pyimport pickle import pickletools data = bytearray('abc' * 100, 'ascii') pickled = pickletools.optimize(pickle.dumps(data, protocol=4)) print("Serialized:", pickled) print("Deserialized:", pickle.loads(pickled))結果はこうなります。
Serialized: b'\x80\x04\x95J\x01\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x8c\tbytearray\x93B,\x01\x00\x00abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabc\x85R.' Deserialized: bytearray(b'abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabc')
pickle.dumps
でバイト列にシリアライズされたデータがpickle.loads
で元に戻されている事がわかります。シリアライズされたデータの中身が良くわからないのでpickletools.dis(pickled)
で逆アセンブル(?)してみます。0: \x80 PROTO 4 2: \x95 FRAME 330 11: \x8c SHORT_BINUNICODE 'builtins' 21: \x8c SHORT_BINUNICODE 'bytearray' 32: \x93 STACK_GLOBAL 33: B BINBYTES b'abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabc' 338: \x85 TUPLE1 339: R REDUCE 340: . STOP highest protocol among opcodes = 4ほぼコード通りですが、組み込みのbytearrayを使って初期値
b'abcabc...'
からバイト列を生成するというという風になっている様です。Protocol Version 4までは、この初期値の部分がどんなに大きくてもここに格納することになり、シリアライズしたデータはその分大きくなってしまいます。そして生成の際にその分無駄なコピーが発生してしまいます。これをProtocol Version 5の機能を使ってout-fo-band転送してみます。変更点は以下の3つ。
- out-of-band転送したいデータを含むクラスに
__reduce_ex__
メソッドを追加する。pickle.dumps
でbuffer_callback
にコールバック関数を指定する。pickle.loads
でbuffers
に引き渡しのためのバッファを指定する。これだけだと良くわからないので、サンプルのコードで説明します。上記の例は Out-of-band転送する場合にはこの様になります。
pickle_test_5.pyimport pickle import pickletools class barray(bytearray): def __reduce_ex__(self, protocol): return type(self), (pickle.PickleBuffer(self),) buffers = [] def buf_cb(pickle_buffer): data = pickle_buffer.raw().obj buffers.append(data) data = barray('abc' * 100, 'ascii') pickled = pickletools.optimize(pickle.dumps( data, protocol=5, buffer_callback=buf_cb)) print("Serialized:", pickled) print("Deserialized:", pickle.loads(pickled, buffers=buffers)) pickletools.dis(pickled)まずひとつ目の
__reduce_ex__
メソッドですが、それぞれの型をどの様にシリアライズしたいかを定義できて、返り値として、2要素以上のタプルを返します。1要素目がそのデータを再構成する時に呼ばれるオブジェクト(通常はクラスオブジェクト)で、2要素目がその呼び出し引数(初期値)です。通常はデータそのものを2つ目の要素として指定しますが、Out-of-bandにしたい時はそれをpickle.PickleBuffer
型にして返します。なお、bytearrayは組み込みの型で直接メソッドを追加できないので、ここの例ではそれを基底クラスとする新しいクラス
barray
を作り、そこに__reduce_ex__
メソッドを追加しています。そして2つ目の変更点が
dumps()
のbuffer_callback
引数で、ここにPickleBuffer
を引数として取るコールバック関数を指定します。このコールバック関数はdumps()
がデータをシリアライズする過程でPickleBuffer
に遭遇する度に呼ばれますが、ここの例ではOut-of-band転送したいデータをbuffers
という配列に追加していっています。で、3つ目の変更点の
loads()
のbuffers
引数。ここにdumps()
のコールバック関数で設定されたbuffers
という配列を指定します。それを理解するには上記のコードでどのようなシリアライズデータが作成されるのかを見るのが早いと思います。実行結果はこんな風になります。
Serialized: b'\x80\x05\x95\x17\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x8c\x06barray\x93\x97\x85R.' Deserialized: barray(b'abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabc') 0: \x80 PROTO 5 2: \x95 FRAME 23 11: \x8c SHORT_BINUNICODE '__main__' 21: \x8c SHORT_BINUNICODE 'barray' 29: \x93 STACK_GLOBAL 30: \x97 NEXT_BUFFER 31: \x85 TUPLE1 32: R REDUCE 33: . STOP highest protocol among opcodes = 5まずシリアライズされたデータのサイズが全然違います(Version 4が341バイトだったのに対してVersion 5では34バイト)。上記の逆アセンブルされたシリアライズデータを見るとわかると思いますが、初期化データはここには入っていません。代わりに
NEXT_BUFFER
というopcodeが見えます。loads()
はこのopcodeに遭遇する度に引数で与えられた配列からデータを一つずつ取ってきて使います。上手くやればコピーなしでデータの引き渡しができるでしょう。まとめ
これまであまりPickleって使ったことがなかったのですが、今回の変更をきっかけに少し理解が深まりました。ファイルに書き出しておくだけならJSONとかの方が汎用性もあって便利だろうなと思っていましたが、Python間でのデータの受け渡しに、ユーザ定義の型情報も含んだ形で行えるのはちょっと便利かも知れません。今後、なにか作るときの選択肢の一つとして考えたいと思います。
- 投稿日:2019-08-11T20:56:55+09:00
LambdaからLambdaをさっくりデプロイするためのメモ
tl;dr
- 管理ソースが多くなるにつれていちいちコンソールで編集やアップロードが面倒になったのでCIからデプロイで自動化したくなった
- LambdaへのデプロイをLambdaにやらせるのが手っ取り早そう
- CI(zip化してS3デプロイ) -> S3 -> イベントトリガーでLambda起動(デプロイ) -> Lambda関数の更新
用意するもの
- CI(よしなに)
- zip放置用のS3
- デプロイ実行用のLambda関数
- デプロイされるLambda関数
CI(よしなに)
- 好きなものをどうぞ
- ビルド -> zip化 -> S3までデプロイする感じで
import boto3 import glob import os import shutil import mimetypes from datetime import datetime from datetime import timedelta import traceback import json BUILD_DIR = "YOUR_BUILD_DIR" LAMBDA_PROJECT_LIST_JSON = "./YOUR_PROJECT.json" AWS_ACCESS_KEY = "YOUR_AWS_ACCESS_KEY" AWS_ACCESS_SECRET = "YOUR_AWS_ACCESS_SECRET" AWS_S3_BUCKET = "YOUR_AWS_S3_BUCKET" AWS_REGION = "YOUR_AWS_REGION" def run(): # YOUR_BUILD_DIRディレクトリ削除 # YOUR_BUILD_DIRディレクトリ作成 # zipファイル作ってYOUR_BUILD_DIRに突っ込む # s3デプロイ with open(LAMBDA_PROJECT_LIST_JSON,"r") as l: lambda_project_list = json.load(l) try: if (AWS_ACCESS_KEY in os.environ)==False or (AWS_ACCESS_SECRET in os.environ)==False: raise "undefined access_key" except: traceback.print_exception() return session = boto3.session.Session( aws_access_key_id=os.environ[AWS_ACCESS_KEY], aws_secret_access_key=os.environ[AWS_ACCESS_SECRET], region_name=AWS_REGION, ) client = session.client('s3') shutil.rmtree(BUILD_DIR, ignore_errors=True) os.makedirs(BUILD_DIR, exist_ok=True) keys = lambda_project_list.keys() for keyname in keys: val = lambda_project_list[keyname] shutil.make_archive(BUILD_DIR+"/"+val["lambda"], 'zip', root_dir="./"+keyname) file_list = glob.glob(BUILD_DIR+"/*.zip") now = datetime.today() expires = now+timedelta(hours=1) print("DEPLOY:"+AWS_REGION) for val in file_list: mimetype = mimetypes.guess_type(val)[0] fileneme = os.path.basename(val) with open(val,"rb") as f: try: response = client.put_object( ACL='private', Body=f, Key=fileneme, Bucket=AWS_S3_BUCKET, ContentType=mimetype, Expires=expires, ) print("UPLOAD:"+fileneme) except: traceback.print_exception() breakビルドははしょり(懺悔)subprocess使えばいいんじゃなかろうか(適当)
デプロイ実行用のLambda関数
IAMはよしなに付与しましょう
import sys import boto3 import os AWS_REGION = "YOUR_AWS_REGION" def lambda_handler(event, context): # s3に上がったzipを取得 # lambdaへデプロイ if len(event["Records"])==0: return s3_data = event["Records"][0]["s3"] _,ext = os.path.splitext(s3_data["object"]["key"]) # zip以外にアップロードされたものは除外 if ext != ".zip": return function_name = s3_data["object"]["key"].split(".")[0] session = boto3.session.Session( region_name=AWS_REGION, ) client = session.client("lambda") response = client.update_function_code( FunctionName=function_name, S3Bucket=s3_data["bucket"]["name"], S3Key=s3_data["object"]["key"], DryRun=False, ) print(response,file=sys.stderr)
DryRun=False
は必須。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.update_function_codeDryRun (boolean) -- Set to true to validate the request parameters and access permissions without modifying the function code.
zip放置用のS3
- よしなに作成。
- リージョンは
デプロイ実行用のLambda関数
と合わせた方が楽。- バケットの
プロパティ > Events
から PUTでデプロイ実行用のLambda関数
が起動するように設定デプロイされるLambda関数
- よしなに指定。
- リージョンは合わせる
LAMBDA_PROJECT_LIST_JSON
に放り込んでいく課題
- 雑に作ったのでビルドやってないので後日作る(といいなあ)
参考
https://dev.classmethod.jp/cloud/aws/2018-solo-boto3-advent-calendar-day14/
https://qiita.com/asahi0301/items/a88760dd38fe0f5fcee1
- 投稿日:2019-08-11T19:44:12+09:00
Docker for MacでXilinx FPGA用の開発環境を作る
TL;DR
- Xilinx ISEがインストールされたdockerイメージを作成しました。
- Mac上から自動でビルドする環境を整備し、テンプレートとして公開しました。
はじめに
FPGAを販売するXilinx社では、ISE(最近のデバイス用にはVivado)と呼ばれる自社製の開発環境を用意しています。FPGAのコンフィグレーションに必要なBit stream file(
*.bit
)はこのISEを使って生成できますが、逆に言えばXilinx社謹製のISEを使わなければ生成できません。そして、ISEは対応OSがWindowsとLinuxのみなんですよね...見事にMacがハブれれてしまいました。メインで使っている端末がMacということもあり、MacからFPGAの開発ができると嬉しいな...と思いしばらくいじっていたので、備忘録を兼ねて記事にします。以前の開発環境
一年ほど前にFPGAボードの開発をしていた際には、次のような環境で開発していました。
- ターゲットデバイス
- 開発&シミュレーション
- Mac端末
- 言語: VHDL
- エディタ: Visual Studio Code
- テスト用コンパイラ: ghdl
- 実装
- Windows端末
- エディタ&コンパイラ: Xilinx ISE
状況としてはMacの方でVHDLコードを書き、ghdlでコンパイルして挙動を確認します。だいたいいい感じまでできたらソースコードをDropboxなりGithubなりにアップロードして、Windows端末にダウンロードし、Xilinx ISEで作成したプロジェクトにソースコードを加えて、実装用のBit stream fileを生成していました。
なぜこのような面倒な手順を踏んでいたかというと、Windows端末が研究室の端末で、自宅に持ち帰りが難しかったこと、加えてXilinx ISE上でのソースコードの編集がお世辞にもやりやすいとは言えなかった(補完やSyntax Highlightが貧弱だった)ため、ソースの編集はVS Code上でやりたかったという事情があります。Mac上でVMを使うことも考えたのですが、Mac上のストレージの容量が少なく、VMを立てたくもない状況でした。
そんなこんなで、このしち面倒臭い環境でしばらく開発をしており、そのときの目的が達成された後には、もう金輪際FPGAに関わることもないだろうと思っていたわけです。ただ、幸か不幸かまたFPGAに触る機会ができたので、この際Macで実装までできるようにしてやろうと思いまして、環境構築に取り組みました。Mac上でのFPGA開発環境の構築
あらすじは次の通りです。
- Docker for MacでXilinx ISEインストール済みのdockerイメージを作成
- Pythonのinvokeパッケージを使ってコンパイル、テスト環境の構築
以下の手順では次のソフトウェアがインストールされていることを前提にしています。
- Docker for Mac
- Python 3.x
- XQuartz
- invoke (Python package)
- ghdl
docker、XQuartz、Python3、ghdlはHomebrewから、invokeはAnaconda経由で入手しました。invokeはpipでも入手できるようです。
1. Docker for MacでXilinx ISEインストール済みのdockerイメージを作成
~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~
以下の手順でISE Webpack editionをインストールしたものはDocker Hub上でseekworser/ise_webpack
で公開しているため、Webpack版であればdocker pull seekworser/ise_webpackで事足ります。容量が30 GB以上あるので注意してください。
~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~/~dockerを使ってXilinx Vivadoをインストールしている方がいらっしゃったので、その記事を参考にしました。
Xilinx の開発ツールを Docker コンテナに閉じ込めるこの方はVivado HLx 2018.2: All OS installer Single-File Downloadをダウンロードしてきて、インストールまで含めてDockerfileで自動化しているのですが、ISEではSingle file Downloadが存在しないため、泣く泣くインストールは手動で行います。
そこで、まずfull installer for Linuxをダウンロードしてきて、次のようなDockerfileを作成しました。DockerfileFROM ubuntu RUN \ sed -i -e "s%http://[^ ]\+%http://ftp.jaist.ac.jp/pub/Linux/ubuntu/%g" /etc/apt/sources.list && \ apt update && \ apt upgrade -y && \ apt -y --no-install-recommends install \ ca-certificates curl sudo xorg dbus dbus-x11 ubuntu-gnome-default-settings gtk2-engines \ ttf-ubuntu-font-family fonts-ubuntu-font-family-console fonts-droid-fallback lxappearance && \ apt-get autoclean && \ apt-get autoremove && \ rm -rf /var/lib/apt/lists/* && \ echo "%sudo ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers COPY Xilinx_ISE_DS_Lin_14.7_1015_1.tar / RUN mkdir /ise-installer &&\ tar --strip-components 1 -C /ise-installer \ -xvf Xilinx_ISE_DS_Lin_14.7_1015_1.tar &&\ rm /Xilinx_ISE_DS_Lin_14.7_1015_1.tar前半は先のリンクの方から丸パクリで、X11環境のインストールその他です。後半でインストール用のフォルダを
/ise_installer
下に展開しておきます。(ファイル名はバージョンに合わせて変えてください)ここからGUIインストーラを立ち上げ、ISEをインストールします。Docker for MacでX11 GUIを立ち上げる際には次を参考にしました。
Mac+dockerでx11アプリケーションを起動する先ほどのDockerfile、ダウンロードしたインストーラを全て同じフォルダに入れ、さらにhost側でXQuartzを立ち上げた状態で以下のコマンドを実行します。
docker build --no-cache --rm -t ise-installer . xhost + 127.0.0.1 docker run -e DISPLAY=docker.for.mac.localhost:0 --name ise-installed ise-installer /ise-installer/xsetupその後XQuartzでインストーラが起動するため、指示にしたがってインストールを行います。インストール後に、
(host)$ docker commit ise-installed ise-installed (host)$ docker run -i --name ise ise-installed /bin/bash (container)$ rm -rf /ise-installer (container)$ exit (host)$ docker commit ise ise (host)$ docker container rm ise-installer ise-installed ise (host)$ docker image rm ise-installer ise-installedを順に実行し、ISEのインストールされたイメージ
ise
を作成します。2. Pythonのinvokeパッケージを使ってコンパイル、テスト環境の構築
先ほど作成したimageと、Pythonのinvokeパッケージを使用して、コンフィギュレーション用のBit stream fileを作成する環境を作成します。seekworser/ise_project_templateでPapilio pro用に作成したものは公開しています。
ディレクトリ構成は次のようになっています。ise_project_template. ├── Readme.md ├── log/ ├── out/ ├── test/ ├── src/ │ ├── project.yaml │ ├── sample.vhd │ ├── sample_ucf.ucf │ └── test_sample.vhd ├── sample.bit └── tasks.pylogファイルの出力、中間ファイルの出力、ソースコード保管、test用のファイルの出力用のディレクトリとinvoke用のPythonスクリプトが置いてあります。
sample.vhd
がメインのVHDLファイルです。sample.vhdentity sample is -- write ports end sample; architecture behavior of sample is -- write behavior end behavior;また、
sample_ucf.ucf
はFPGAの制約ファイルです。
プロジェクトが変わるごとにMakefile
もといtasks.py
をいじるのは嫌だったので、project.yaml
にコンパイル用のパラメータを格納しておくようにします。ついでに使うFPGAの情報もpart:
以下に書いておきます。project.yamlsrc_files: - file_name: sample.vhd language: vhdl library: work entity_names: - sample test_files: - file_name: test_sample.vhd entity_names: - test_sample - test_sample2 ucf_file: sample_ucf.ucf top_module: sample part: device: xc6slx9 package: tqg144 speed: -2
tasks.py
ではこの情報を読み取って、ビルドやテストなどを行うようコードを書いておきます。例えば次のようなイメージです。tasks.py(一部)import yaml import invoke def get_project_parameter(*keys): param = yaml.load(open(PROJECT_YAML_FILE_NAME, "r+")) for item in keys: param = param[item] return param @invoke.task def bitgen(c): c.run( "docker run --rm -i -v $(PWD):/project seekworser/ise_webpack:latest sh <<_EOT_\n" "cd /project\n" "{ise_base:s}bitgen -w {outdir:s}{top_module:s}.ncd {top_module:s}.bit {outdir:s}{top_module:s}.pcf\n" "mv *.drc {logdir:s}{top_module:s}_drc.log\n" "mv *.bgn {logdir:s}{top_module:s}_bitgen.log\n" "rm -rf *.xwbt _xmsgs xilinx_device_details.xml *.xrpt\n" "_EOT_".format( ise_base=ISE_BASE, top_module=get_project_parameter(TOP_MODULE_KEY), outdir=OUTDIR, logdir=LOGDIR, ) ) return # ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ # some other tasks # ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ISE_BASE = "/opt/Xilinx/14.7/ISE_DS/ISE/bin/lin64/" OUTDIR = "./out/" SRCDIR = "./src/" LOGDIR = "./log/" PROJECT_YAML_FILE_NAME = SRCDIR + "project.yaml" TOP_MODULE_KEY = "top_module"最終的にどうなったか
以上の結果として
invoke buildeのコマンド一発でMacからBit stream fileが生成できるようになりました。ISEの使いにくいエディタを使う必要もなくなったため、開発がよりスムーズに進むようになりました。
- 投稿日:2019-08-11T19:03:18+09:00
pythonで好きなユーザーのツイート情報をグラフに描画
前回までのあらすじ
前回の記事では、スクレイピングを使って取得したいユーザーの投稿情報を取得するところまでをまとめました。
今回は、その投稿情報をもとにして投稿の詳細情報を描画するところまでまとめます。
前回の記事はこちらhttps://qiita.com/Tokyo/items/a12ef262268c48be8ff1
描画したデータ
以下の25個の項目をグラフで表現しました。
- 各時間ごとの投稿数、いいねの平均/中央値、リツイートの平均/中央値
- 各曜日ごとの投稿数、いいねの平均/中央値、リツイートの平均/中央値
- 文字数ごとの投稿数、いいねの平均/中央値、リツイートの平均/中央値
- 改行数ごとの投稿数、 いいねの平均/中央値、リツイートの平均/中央値
- メディア数ごとの投稿数、 いいねの平均/中央値、リツイートの平均/中央値
環境は、Jupyter notebook上で行いました。
スクリプトについて
簡単な統計情報
まずは小手調べに簡単な統計情報を出してみました。
平均値と中央値のいずれもを出している理由は、私のツイート数が少なすぎて何度かバズった外れ値の影響を考慮したからです。
pandasで取得するデータは前回の記事で紹介した方法で取得済みの設定です。easy.pyimport pandas as pd import matplotlib.pyplot as plot df = pd.read_csv('otnk23.csv',encoding='utf-8') #いいねの平均 print(df['likes'].mean()) #いいねの中央値 print(df['likes'].median()) #リツイートの平均 print(df['retweets'].mean()) #リツイートの中央値 print(df['retweets'].median())時間帯ごとの分析
time.py#時間帯ごとの投稿数 df['hour'] = df['date'].dt.hour hour_size =df.groupby(pd.Grouper(key='hour' )).size().sort_index() hour_size.plot.bar() plot.title('hour_posts_size') plot #時間ごとのいいねの中央値 likes_hour_groupby_median = df.groupby('hour')[['likes']].median() likes_hour_groupby_median.plot.bar() plot.title('hour_likes_median') plot #時間ごとのいいねの平均 likes_hour_groupby_mean = df.groupby('hour')[['likes']].mean() likes_hour_groupby_mean.plot.bar() plot.title('hour_likes_mean') plot #時間ごとのリツイートの中央値 retweets_hour_groupby_median = df.groupby('hour')[['retweets']].median() retweets_hour_groupby_median.plot.bar() plot.title('hour_retweets_median') plot #時間ごとのリツイートの平均 retweets_hour_groupby_mean = df.groupby('hour')[['retweets']].mean() retweets_hour_groupby_mean.plot.bar() plot.title('hour_retweets_mean') plot曜日ごとの分析
day.py#曜日ごとの投稿数 df['day_of_week'] = df['date'].dt.day_name() cats = [ 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] day_size =df.groupby(pd.Grouper(key='day_of_week' )).size().reindex(cats) day_size.plot.bar() plot.title('day_posts_size') plot #曜日ごとのいいねの中央値 likes_day_groupby_median = df.groupby('day_of_week')[['likes']].median().reindex(cats) likes_day_groupby_median.plot.bar() plot.title('day_likes_median') plot #曜日ごとのいいねの平均 likes_day_groupby_mean = df.groupby('day_of_week')[['likes']].mean().reindex(cats) likes_day_groupby_mean.plot.bar() plot.title('day_lkes_mean') plot #曜日ごとのリツイートの中央値 retweets_day_groupby_median = df.groupby('day_of_week')[['retweets']].median().reindex(cats) retweets_day_groupby_median.plot.bar() plot.title('day_retweets_median') plot #曜日ごとのリツイートの平均 retweets_day_groupby_mean = df.groupby('day_of_week')[['retweets']].mean().reindex(cats) retweets_day_groupby_mean.plot.bar() plot.title('day_retweets_mean') plotメディア数ごとの分析
media.py#メディア数ごとのサイズ size_media_groupby_median = df.groupby('media').size() size_media_groupby_median.plot.bar() plot.title('media_num_size') plot #メディア数ごとのいいねの中央値 likes_media_groupby_median = df.groupby('media')[['likes']].median() likes_media_groupby_median.plot.bar() plot.title('media_likes_median') plot #メディア数ごとのいいねの平均値 likes_media_groupby_mean = df.groupby('media')[['likes']].mean() likes_media_groupby_mean.plot.bar() plot.title('media_likes_mean') plot #メディア数ごとのリツイートの中央値 retweets_media_groupby_median = df.groupby('media')[['retweets']].median() retweets_media_groupby_median.plot.bar() plot.title('media_retweets_median') plot #メディア数ごとのリツイートの平均値 retweets_media_groupby_mean = df.groupby('media')[['retweets']].mean() retweets_media_groupby_mean.plot.bar() plot.title('media_retweets_mean') plot改行数ごとの分析
block.pydf_text = df['text'] kaigyo = [] for text in df_text: count = text.count('\n\n') kaigyo += [count] df['kaigyo'] = kaigyo #改行(スペースの数)ごとのサイズ likes_kaigyo_groupby_size = df.groupby('kaigyo').size() likes_kaigyo_groupby_size.plot.bar() plot.title('block_size') plot #改行ごとのいいね数の中央値 likes_kaigyo_groupby_median = df.groupby('kaigyo')[['likes']].median() likes_kaigyo_groupby_median.plot.bar() plot.title('block_likes_median') plot #改行ごとのいいね数の平均 likes_kaigyo_groupby_mean = df.groupby('kaigyo')[['likes']].mean() likes_kaigyo_groupby_mean.plot.bar() plot.title('block_likes_mean') plot #改行ごとのリツイート数の中央値 retweets_kaigyo_groupby_median = df.groupby('kaigyo')[['retweets']].median() retweets_kaigyo_groupby_median.plot.bar() plot.title('block_retweets_median') plot #改行ごとのリツイート数の平均 retweets_kaigyo_groupby_mean = df.groupby('kaigyo')[['retweets']].mean() retweets_kaigyo_groupby_mean.plot.bar() plot.title('block_retweets_mean') plot文字数ごとの分析
text_num.pydf_text_num_about = [] df_text_num = df['text_num'] for text in df_text_num: df_text_num_about.append(text//10*10) df['about_num'] = df_text_num_about #文字数ごとのサイズ about_num_groupby_size = df.groupby('about_num').size() about_num_groupby_size.plot.bar() plot.title('text_size') plot #文字数ごとのいいね数の中央値 likes_about_num_groupby_median = df.groupby('about_num')[['likes']].median() likes_about_num_groupby_median.plot.bar() plot.title('text_size_likes_median') plot #文字数ごとのいいね数の平均値 likes_about_num_groupby_mean = df.groupby('about_num')[['likes']].mean() likes_about_num_groupby_mean.plot.bar() plot.title('text_size_likes_mean') plot #文字数ごとのリツイート数の中央値 retweets_about_num_groupby_median = df.groupby('about_num')[['retweets']].mean() retweets_about_num_groupby_median.plot.bar() plot.title('text_size_retweets_median') plot #文字数ごとのリツイート数の平均値 retweets_about_num_groupby_mean = df.groupby('about_num')[['retweets']].mean() retweets_about_num_groupby_mean.plot.bar() plot.title('text_size_retweets_mean') plot感想と結果
せっかくなので、私の投稿の分析結果も載せておきます。
これは、文字数ごとの分析、改行数ごとの分析、メディア数ごとの分析の結果です。
テキスト数に見るからに文字数が多いほうがバズりやそうですね。とはいえ、母数が少ないので何とも言えませんが。。。
それから、改行も0の場合はあまり伸びていませんね。これは中央値、平均値共にその傾向があるので、一定の有意性があるのかもしれません。
曜日に関しては、日曜日の平均が高いのでバズった投稿が集中しているのかもしれません。また時間に関しても、夜になるにつれエンゲージメントが伸びています。
私の投稿数とフォロワーが少ないので伸びにくい傾向が出てしまいましたが、時々気になるデータが出てきているようです。
皆さんも自分のアカウントや気になるアカウントで試してみてください。
- 投稿日:2019-08-11T18:31:50+09:00
Raspberry Pi Zero + Edge TPU搭載のタミヤ戦車の製作
製作の動機
- ブラッドピット主演の映画「Fury」の戦車戦闘シーンを再現したい。
- 趣味のディスプレイ用プラモデルをraspberry piで動かしたい。
- ラジコン、電子回路等は未経験だけど、加工は得意なのでなんとかなりそう。
- GoogleのEdge TPUで物体検出すれば砲塔回転方向を割り出せるはず。
- 車体はとりあえずライントレースでまずは作ってみよう。
貫いたこだわり
- スペースに余裕のある1/16スケールは高価でかつ車種も少ないので、安価で車種も豊富だけどスペースに全く余裕のない1/35スケールでなんとかした。
- 今後のリアルなスケールモデル製作(塗装、ディティールアップ等)に繋げるために、Edge TPUのUSBケーブルの一部とカメラレンズ以外は全て車体内部および車体底面に配置し、「いかにも手作りラズパイマシン」な外観を徹底的に回避。
- 外観を損ねないために、航続距離を犠牲にしてでも絶対に単4電池の分散配置で動かす。
- 安全面を考慮してリチウムイオンバッテリーは使わない。
動画では、とりあえずデモとしてマウスを認識するかを見ています。
(動画中での砲塔上面の電池はただの重りで、現在はとれています。)
物体検出には、バナナロボのコードを一部改変させていただいたものを利用しています。
project-banana-robo
ライントレースは快調ですが、砲塔の指向先の精度が...。
コンセント電源で固定した戦車で視野をディスプレイ表示(バナナロボのコードのデモ)
紆余曲折の末、とりあえず1ヶ月でそれなりにはなったかな、と。今後はさらにEdge TPUのモデルを再学習し、敵のタイガー戦車を認識させ、クロマキー合成で特撮するのが究極的な目標です。
先は長い...。
外観
砲塔内部には、サーボモーターの動力を噛み合わせるためにコの字型に合わせたプラ板と、釣具店に売っている重りを適宜配置。
天板を外した中身の配置は以下の通り。
- 車体前面に収まるように加工したギアボックス
- ミニ四駆用の馬力のあるタイプの走行モーター(トルクチューン2)
- その直上に、大電流を要する走行モーターを制御できるDCモータードライバー(黒色基盤)
- 更にその直上に、raspberry pi電源用に5Vに昇圧できるコンバータ(青色基盤)
- 走行モーター駆動用の単4電池を車体左右、raspberry pi直下、車体外底部に直列で計4つ
- 車体後部に見える白いブレッドボードは、左右のライントレース用赤外線センサーの配線をまとめているもので、可変抵抗器(青色)で黒のテープと白の画用紙を区別するセンサー感度を調節しています。
raspberry piの下には、
- オレンジ色のトルクチューンモーターの後部
- 走行モーター用の単4電池のうちの一つ
- Edge TPU本体から伸びるケーブル
- それをraspberry piに繋ぐ短い自作ケーブル
- 手前に埋もれている砲塔回転用サーボモーター
- サーボモーターの後ろに、ライントレース用の電源配線をまとめているブレッドボード
が格納されています。
車体底面に赤外線センサー、電池、Edge TPUを配置。車体の左右に見える小さなボックスのようなものがライントレース用の赤外線センサーです。車体前面の単4電池2つがraspberry pi本体への電源供給用で、これでEdge TPUと砲塔回転用サーボモーターへの電流もまかないます。
電子回路全体図
模式的にFritzingで作成した回路図です。
電子回路もFritzingも初めてなので、配線には間違いや無駄があるかもしれません。
実際にはブレッドボードは車体に格納するために切り詰めています。
可変抵抗は500kΩ前後に調整することで、ラインの白黒を判別できました。
なお、モータードライバーの該当アイコンがFritzingに見つからなかったので、この模式図では代用品を利用しています。
使用したパーツ(主要部品のみ)
メイン
ブラピ搭乗のM4A3E8戦車
raspberry pi zero wh スターターキット
PowerBoost 1000 Basic - 5V USB Boost(昇圧コンバータ)車体制御系
トルクチューン2モーター
ツインモーターギアボックス
Cytron 4V~16V DCモータードライバ(2チャンネル、連続3A)
赤外線センサー(フォトリフレクタ)
100Ω抵抗
可変抵抗器(1MΩ)
電圧・電流計測用のテスタ砲塔制御系
サーボモーター(SG90)
Raspberry Pi Zero用スパイカメラ
Google Coral Edge TPU USB Accelerator
20cmのUSB type A - type C 通信ケーブル(型番は失念)
USB(microB)コネクタ
USB2.0コネクタ
シールドスリムロボットケーブル
ダイソーのグルーガンとグルースティックその他
秋月電子通商、千石電商、Amazon、SWITCH SCIENCE、タミヤには大変お世話になりました。上記以外にも数多の電子パーツ、ホビー用品、工具等を用いています。
制御の流れ
- ライントレースの要領で走行用DCモーターを制御
- 走行中に砲塔上面のカメラとEdge TPUで対象物を物体検出
- 検出した対象物の方向を計算
- 2.の計算の間に車体が進んだ記録を元に、対象物の方向のズレを粗く予測
- 3.と4.を加味した方向に砲塔旋回用サーボモーターを制御
- 1.と並行して2.から5.の繰り返し
コード
物体検出には、バナナロボのコードを一部改変させていただいたものを利用しています。
project-banana-robo
冒頭のGIF動画の通り、砲塔の回転先の精度が甘く、まだまだ改良の余地があります。
物体検出には2-3s/frameほどかかるので、その間の車体の動きを"forward", "right", "left"の変数に記録しておき、その補正も加味して砲塔を動かそうとしていますがなかなか...。
何か良い案がありましたらお願いします。concurrent_motor.pyfrom edgetpu.detection.engine import DetectionEngine from PIL import Image from PIL import ImageDraw from PIL import ImageFont import time, io, picamera, pigpio, threading import numpy as np import wiringpi as pi forward = 0 right = 0 left = 0 # https://github.com/waveform80/picamera/issues/383 def _monkey_patch_picamera(): original_send_buffer = picamera.mmalobj.MMALPortPool.send_buffer def silent_send_buffer(zelf, *args, **kwargs): try: original_send_buffer(zelf, *args, **kwargs) except picamera.exc.PiCameraMMALError as error: if error.status != 14: raise error picamera.mmalobj.MMALPortPool.send_buffer = silent_send_buffer # Read labels.txt file provided by Coral website def _read_label_file(file_path): with open(file_path, 'r', encoding="utf-8") as f: lines = f.readlines() ret = {} for line in lines: pair = line.strip().split(maxsplit=1) ret[int(pair[0])] = pair[1].strip() return ret # Main loop def detect_motor(): global forward, right, left model_filename = "mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite" label_filename = "coco_labels.txt" engine = DetectionEngine(model_filename) labels = _read_label_file(label_filename) CAMERA_WIDTH = 640 CAMERA_HEIGHT = 480 fnt = ImageFont.load_default() # set up for servo motor servo_pin = 18 pigp = pigpio.pi() pulsewidth = 1450 pigp.set_servo_pulsewidth(servo_pin, pulsewidth) # To view preview on VNC, # https://raspberrypi.stackexchange.com/a/74390 with picamera.PiCamera() as camera: _monkey_patch_picamera() camera.resolution = (CAMERA_WIDTH, CAMERA_HEIGHT) camera.framerate = 15 camera.rotation = 180 _, width, height, channels = engine.get_input_tensor_shape() print("{}, {}".format(width, height)) overlay_renderer = None camera.vflip = True camera.hflip = True camera.start_preview() try: stream = io.BytesIO() for foo in camera.capture_continuous(stream, format='rgb', use_video_port=True): # Make Image object from camera stream stream.truncate() stream.seek(0) input = np.frombuffer(stream.getvalue(), dtype=np.uint8) input = input.reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3)) image = Image.fromarray(input) # image.save("out.jpg") # Make overlay image plane img = Image.new('RGBA', (CAMERA_WIDTH, CAMERA_HEIGHT), (255, 0, 0, 0)) draw = ImageDraw.Draw(img) draw.line((CAMERA_WIDTH//2, 0, CAMERA_WIDTH//2, CAMERA_HEIGHT), width=1) draw.line((3*CAMERA_WIDTH//8, 0, 3*CAMERA_WIDTH//8, CAMERA_HEIGHT), width=1) draw.line((5*CAMERA_WIDTH//8, 0, 5*CAMERA_WIDTH//8, CAMERA_HEIGHT), width=1) # Run detection forward = 0 right = 0 left = 0 start_ms = time.time() results = engine.DetectWithImage(image, threshold=0.2, top_k=5) elapsed_ms = (time.time() - start_ms)*1000.0 obj = None if results: obj = next((x for x in results if labels[x.label_id] == "mouse"), None) if obj: box = obj.bounding_box.flatten().tolist() box[0] *= CAMERA_WIDTH box[1] *= CAMERA_HEIGHT box[2] *= CAMERA_WIDTH box[3] *= CAMERA_HEIGHT draw.rectangle(box, outline='red') draw.text((box[0], box[1]-10), labels[obj.label_id], font=fnt, fill="red") obj_width = box[2] - box[0] obj_center = box[0] + obj_width // 2 draw.point((obj_center, box[1] + (box[3] - box[1])//2)) # if object on the right side of the sight if (obj_center - CAMERA_WIDTH // 2) > 0: print("TURN R") mv_deg = 54 * ((obj_center - CAMERA_WIDTH // 2) / CAMERA_WIDTH) # if gunport in the right direction of the vehicle head if pulsewidth <= 1450: mv_deg += int(54 * (forward / 105 + (right-left)/35) * np.sin(((1450-pulsewidth)/950) * (np.pi/2))) # if gunport in the left direction of the vehicle head elif pulsewidth > 1450: mv_deg += int(54 * (-forward / 105 + (right-left)/35) * np.sin(((pulsewidth-1450)/900) * (np.pi/2))) pulsewidth -= mv_deg * 10 print("mv_deg {}, pulsewidth {}".format(mv_deg, pulsewidth)) pigp.set_servo_pulsewidth(servo_pin, pulsewidth) # if object on the left side of the sight elif (obj_center - CAMERA_WIDTH // 2) < 0: print("TURN L") mv_deg = 54 * ((obj_center - CAMERA_WIDTH // 2) / CAMERA_WIDTH) # if gunport in the right direction of the vehicle head if pulsewidth <= 1450: mv_deg += int(54 * (forward / 105 + (right-left)/35) * np.sin(((1450-pulsewidth)/950) * (np.pi/2))) # if gunport in the left direction of the vehicle head elif pulsewidth > 1450: mv_deg += int(54 * (-forward / 105 + (right-left)/35) * np.sin(((pulsewidth-1450)/900) * (np.pi/2))) pulsewidth -= mv_deg * 10 print("mv_deg {}, pulsewidth {}".format(mv_deg, pulsewidth)) pigp.set_servo_pulsewidth(servo_pin, pulsewidth) camera.annotate_text = "{0:.2f}ms".format(elapsed_ms) if not overlay_renderer: overlay_renderer = camera.add_overlay( img.tobytes(), size=(CAMERA_WIDTH, CAMERA_HEIGHT), layer=4, alpha=255) else: overlay_renderer.update(img.tobytes()) finally: if overlay_renderer: camera.remove_overlay(overlay_renderer) camera.stop_preview() def sensor_motor(): # set for IR sensors PIR_PIN1 = 23 PIR_PIN2 = 24 pi.wiringPiSetupGpio() pi.pinMode(PIR_PIN1, pi.INPUT) pi.pinMode(PIR_PIN2, pi.INPUT) # set for driving motors motor_right_pin1 = 16 motor_right_pin2 = 20 motor_left_pin1 = 5 motor_left_pin2 = 6 pi.pinMode(motor_right_pin1, pi.OUTPUT) pi.pinMode(motor_right_pin2, pi.OUTPUT) pi.pinMode(motor_left_pin1, pi.OUTPUT) pi.pinMode(motor_left_pin2, pi.OUTPUT) # set the speed ranges for motors pi.softPwmCreate(motor_right_pin1, 0, 100) pi.softPwmCreate(motor_right_pin2, 0, 100) pi.softPwmCreate(motor_left_pin1, 0, 100) pi.softPwmCreate(motor_left_pin2, 0, 100) # get the motors speed 0 pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, 0) # raise up the speed speed = 0 while ( speed <= 35 ): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) time.sleep(0.1) speed += 5 # Black line trace global forward, right, left while True: if (pi.digitalRead(PIR_PIN1) == pi.HIGH) & (pi.digitalRead(PIR_PIN2) == pi.HIGH): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) forward += 1 elif (pi.digitalRead(PIR_PIN1) == pi.HIGH) & (pi.digitalRead(PIR_PIN2) == pi.LOW): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, 100) pi.softPwmWrite(motor_left_pin2, 100) right += 1 elif (pi.digitalRead(PIR_PIN1) == pi.LOW) & (pi.digitalRead(PIR_PIN2) == pi.HIGH): pi.softPwmWrite(motor_right_pin1, 100) pi.softPwmWrite(motor_right_pin2, 100) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) left += 1 else: while ( speed >= 5 ): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) time.sleep(0.1) speed -= 5 break time.sleep(0.1) # Get back time.sleep(3) while ( speed <= 35 ): pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, speed) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, speed) time.sleep(0.1) speed += 5 pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, speed) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, speed) time.sleep(2) # Stop while ( speed >= 5 ): pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, speed) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, speed) time.sleep(0.1) speed -= 5 pi.softPwmWrite(motor_right_pin1, 100) pi.softPwmWrite(motor_right_pin2, 100) pi.softPwmWrite(motor_left_pin1, 100) pi.softPwmWrite(motor_left_pin2, 100) time.sleep(3) if __name__ == "__main__": thread_1 = threading.Thread(target=detect_motor) thread_1.start() time.sleep(20) thread_2 = threading.Thread(target=sensor_motor) thread_2.start()直近の今後の課題
- 砲塔回転の精度向上
- 目的の対象物の学習データ生成(webスクレイピングまたは観賞用回転ディスプレイを利用)
- 上記データを用いたモデルの再学習
作ったのは米軍の戦車なので、色々な独軍の戦車を認識するモデルをできれば作りたいなと思っています。ただ、スクレイピングで十分にデータが集まるかどうか。
砲塔天蓋のカメラからは自車の砲身が写るので、そのカメラで回転ディスプレイ上の対象物の連続コマ撮りの方がいいのか。でもそれで強いモデルができるのか。背景はどうしよ。などなど課題が山積していますが地道に続けていきます。参考文献・リンク
電子部品ごとの制御を学べる! Raspberry Pi 電子工作 実践講座 改訂第2版
1/35 M1スーパーシャーマン RC化 ① : 朴念仁の艱苦
Raspberry Pi Zero用の小型電源とUSB WiFiモジュール用USBコネクタを自作する
project-banana-robo
- 投稿日:2019-08-11T18:31:50+09:00
Raspberry Pi Zero + Edge TPU搭載のタミヤ自走戦車の製作
製作の動機
- ブラッドピット主演の映画「Fury」の戦車戦闘シーンを再現したい。
- 趣味のディスプレイ用プラモデルをraspberry piで動かしたい。
- ラジコン、電子回路等は未経験だけど、加工は得意なのでなんとかなりそう。
- GoogleのEdge TPUで物体検出すれば砲塔回転方向を割り出せるはず。
- 車体はとりあえずライントレースでまずは作ってみよう。
貫いたこだわり
- スペースに余裕のある1/16スケールは高価でかつ車種も少ないので、安価で車種も豊富だけどスペースに全く余裕のない1/35スケールでなんとかした。
- 今後のリアルなスケールモデル製作(塗装、ディティールアップ等)に繋げるために、Edge TPUのUSBケーブルの一部とカメラレンズ以外は全て車体内部および車体底面に配置し、「いかにも手作りラズパイマシン」な外観を徹底的に回避。
- 外観を損ねないために、航続距離を犠牲にしてでも絶対に単4電池の分散配置で動かす。
- 安全面を考慮してリチウムイオンバッテリーは使わない。
動画では、とりあえずデモとしてマウスを認識するかを見ています。
(動画中での砲塔上面の電池はただの重りで、現在はとれています。)
物体検出には、バナナロボのコードを一部改変させていただいたものを利用しています。
project-banana-robo
ライントレースは快調ですが、砲塔の指向先の精度が...。
コンセント電源で固定した戦車で視野をディスプレイ表示(バナナロボのコードのデモ)
紆余曲折の末、とりあえず1ヶ月でそれなりにはなったかな、と。今後はさらにEdge TPUのモデルを再学習し、敵のタイガー戦車を認識させ、クロマキー合成で特撮するのが究極的な目標です。
先は長い...。
外観
砲塔内部には、サーボモーターの動力を噛み合わせるためにコの字型に合わせたプラ板と、釣具店に売っている重りを適宜配置。
天板を外した中身の配置は以下の通り。
- 車体前面に収まるように加工したギアボックス
- ミニ四駆用の馬力のあるタイプの走行モーター(トルクチューン2)
- その直上に、大電流を要する走行モーターを制御できるDCモータードライバー(黒色基盤)
- 更にその直上に、raspberry pi電源用に5Vに昇圧できるコンバータ(青色基盤)
- 走行モーター駆動用の単4電池を車体左右、raspberry pi直下、車体外底部に直列で計4つ
- 車体後部に見える白いブレッドボードは、左右のライントレース用赤外線センサーの配線をまとめているもので、可変抵抗器(青色)で黒のテープと白の画用紙を区別するセンサー感度を調節しています。
raspberry piの下には、
- オレンジ色のトルクチューンモーターの後部
- 走行モーター用の単4電池のうちの一つ
- Edge TPU本体から伸びるケーブル
- それをraspberry piに繋ぐ短い自作ケーブル
- 手前に埋もれている砲塔回転用サーボモーター
- サーボモーターの後ろに、ライントレース用の電源配線をまとめているブレッドボード
が格納されています。
車体底面に赤外線センサー、電池、Edge TPUを配置。車体の左右に見える小さなボックスのようなものがライントレース用の赤外線センサーです。車体前面の単4電池2つがraspberry pi本体への電源供給用で、これでEdge TPUと砲塔回転用サーボモーターへの電流もまかないます。
電子回路全体図
模式的にFritzingで作成した回路図です。
電子回路もFritzingも初めてなので、配線には間違いや無駄があるかもしれません。
実際にはブレッドボードは車体に格納するために切り詰めています。
可変抵抗は500kΩ前後に調整することで、ラインの白黒を判別できました。
なお、モータードライバーの該当アイコンがFritzingに見つからなかったので、この模式図では代用品を利用しています。
使用したパーツ(主要部品のみ)
メイン
ブラピ搭乗のM4A3E8戦車
raspberry pi zero wh スターターキット
PowerBoost 1000 Basic - 5V USB Boost(昇圧コンバータ)車体制御系
トルクチューン2モーター
ツインモーターギアボックス
Cytron 4V~16V DCモータードライバ(2チャンネル、連続3A)
赤外線センサー(フォトリフレクタ)
100Ω抵抗
可変抵抗器(1MΩ)
電圧・電流計測用のテスタ砲塔制御系
サーボモーター(SG90)
Raspberry Pi Zero用スパイカメラ
Google Coral Edge TPU USB Accelerator
20cmのUSB type A - type C 通信ケーブル(型番は失念)
USB(microB)コネクタ
USB2.0コネクタ
シールドスリムロボットケーブル
ダイソーのグルーガンとグルースティックその他
秋月電子通商、千石電商、Amazon、SWITCH SCIENCE、タミヤには大変お世話になりました。上記以外にも数多の電子パーツ、ホビー用品、工具等を用いています。
制御の流れ
- ライントレースの要領で走行用DCモーターを制御
- 走行中に砲塔上面のカメラとEdge TPUで対象物を物体検出
- 検出した対象物の方向を計算
- 2.の計算の間に車体が進んだ記録を元に、対象物の方向のズレを粗く予測
- 3.と4.を加味した方向に砲塔旋回用サーボモーターを制御
- 1.と並行して2.から5.の繰り返し
コード
物体検出には、バナナロボのコードを一部改変させていただいたものを利用しています。
project-banana-robo
冒頭のGIF動画の通り、砲塔の回転先の精度が甘く、まだまだ改良の余地があります。
物体検出には2-3s/frameほどかかるので、その間の車体の動きを"forward", "right", "left"の変数に記録しておき、その補正も加味して砲塔を動かそうとしていますがなかなか...。
何か良い案がありましたらお願いします。concurrent_motor.pyfrom edgetpu.detection.engine import DetectionEngine from PIL import Image from PIL import ImageDraw from PIL import ImageFont import time, io, picamera, pigpio, threading import numpy as np import wiringpi as pi forward = 0 right = 0 left = 0 # https://github.com/waveform80/picamera/issues/383 def _monkey_patch_picamera(): original_send_buffer = picamera.mmalobj.MMALPortPool.send_buffer def silent_send_buffer(zelf, *args, **kwargs): try: original_send_buffer(zelf, *args, **kwargs) except picamera.exc.PiCameraMMALError as error: if error.status != 14: raise error picamera.mmalobj.MMALPortPool.send_buffer = silent_send_buffer # Read labels.txt file provided by Coral website def _read_label_file(file_path): with open(file_path, 'r', encoding="utf-8") as f: lines = f.readlines() ret = {} for line in lines: pair = line.strip().split(maxsplit=1) ret[int(pair[0])] = pair[1].strip() return ret # Main loop def detect_motor(): global forward, right, left model_filename = "mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite" label_filename = "coco_labels.txt" engine = DetectionEngine(model_filename) labels = _read_label_file(label_filename) CAMERA_WIDTH = 640 CAMERA_HEIGHT = 480 fnt = ImageFont.load_default() # set up for servo motor servo_pin = 18 pigp = pigpio.pi() pulsewidth = 1450 pigp.set_servo_pulsewidth(servo_pin, pulsewidth) # To view preview on VNC, # https://raspberrypi.stackexchange.com/a/74390 with picamera.PiCamera() as camera: _monkey_patch_picamera() camera.resolution = (CAMERA_WIDTH, CAMERA_HEIGHT) camera.framerate = 15 camera.rotation = 180 _, width, height, channels = engine.get_input_tensor_shape() print("{}, {}".format(width, height)) overlay_renderer = None camera.vflip = True camera.hflip = True camera.start_preview() try: stream = io.BytesIO() for foo in camera.capture_continuous(stream, format='rgb', use_video_port=True): # Make Image object from camera stream stream.truncate() stream.seek(0) input = np.frombuffer(stream.getvalue(), dtype=np.uint8) input = input.reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3)) image = Image.fromarray(input) # image.save("out.jpg") # Make overlay image plane img = Image.new('RGBA', (CAMERA_WIDTH, CAMERA_HEIGHT), (255, 0, 0, 0)) draw = ImageDraw.Draw(img) draw.line((CAMERA_WIDTH//2, 0, CAMERA_WIDTH//2, CAMERA_HEIGHT), width=1) draw.line((3*CAMERA_WIDTH//8, 0, 3*CAMERA_WIDTH//8, CAMERA_HEIGHT), width=1) draw.line((5*CAMERA_WIDTH//8, 0, 5*CAMERA_WIDTH//8, CAMERA_HEIGHT), width=1) # Run detection forward = 0 right = 0 left = 0 start_ms = time.time() results = engine.DetectWithImage(image, threshold=0.2, top_k=5) elapsed_ms = (time.time() - start_ms)*1000.0 obj = None if results: obj = next((x for x in results if labels[x.label_id] == "mouse"), None) if obj: box = obj.bounding_box.flatten().tolist() box[0] *= CAMERA_WIDTH box[1] *= CAMERA_HEIGHT box[2] *= CAMERA_WIDTH box[3] *= CAMERA_HEIGHT draw.rectangle(box, outline='red') draw.text((box[0], box[1]-10), labels[obj.label_id], font=fnt, fill="red") obj_width = box[2] - box[0] obj_center = box[0] + obj_width // 2 draw.point((obj_center, box[1] + (box[3] - box[1])//2)) # if object on the right side of the sight if (obj_center - CAMERA_WIDTH // 2) > 0: print("TURN R") mv_deg = 54 * ((obj_center - CAMERA_WIDTH // 2) / CAMERA_WIDTH) # if gunport in the right direction of the vehicle head if pulsewidth <= 1450: mv_deg += int(54 * (forward / 105 + (right-left)/35) * np.sin(((1450-pulsewidth)/950) * (np.pi/2))) # if gunport in the left direction of the vehicle head elif pulsewidth > 1450: mv_deg += int(54 * (-forward / 105 + (right-left)/35) * np.sin(((pulsewidth-1450)/900) * (np.pi/2))) pulsewidth -= mv_deg * 10 print("mv_deg {}, pulsewidth {}".format(mv_deg, pulsewidth)) pigp.set_servo_pulsewidth(servo_pin, pulsewidth) # if object on the left side of the sight elif (obj_center - CAMERA_WIDTH // 2) < 0: print("TURN L") mv_deg = 54 * ((obj_center - CAMERA_WIDTH // 2) / CAMERA_WIDTH) # if gunport in the right direction of the vehicle head if pulsewidth <= 1450: mv_deg += int(54 * (forward / 105 + (right-left)/35) * np.sin(((1450-pulsewidth)/950) * (np.pi/2))) # if gunport in the left direction of the vehicle head elif pulsewidth > 1450: mv_deg += int(54 * (-forward / 105 + (right-left)/35) * np.sin(((pulsewidth-1450)/900) * (np.pi/2))) pulsewidth -= mv_deg * 10 print("mv_deg {}, pulsewidth {}".format(mv_deg, pulsewidth)) pigp.set_servo_pulsewidth(servo_pin, pulsewidth) camera.annotate_text = "{0:.2f}ms".format(elapsed_ms) if not overlay_renderer: overlay_renderer = camera.add_overlay( img.tobytes(), size=(CAMERA_WIDTH, CAMERA_HEIGHT), layer=4, alpha=255) else: overlay_renderer.update(img.tobytes()) finally: if overlay_renderer: camera.remove_overlay(overlay_renderer) camera.stop_preview() def sensor_motor(): # set for IR sensors PIR_PIN1 = 23 PIR_PIN2 = 24 pi.wiringPiSetupGpio() pi.pinMode(PIR_PIN1, pi.INPUT) pi.pinMode(PIR_PIN2, pi.INPUT) # set for driving motors motor_right_pin1 = 16 motor_right_pin2 = 20 motor_left_pin1 = 5 motor_left_pin2 = 6 pi.pinMode(motor_right_pin1, pi.OUTPUT) pi.pinMode(motor_right_pin2, pi.OUTPUT) pi.pinMode(motor_left_pin1, pi.OUTPUT) pi.pinMode(motor_left_pin2, pi.OUTPUT) # set the speed ranges for motors pi.softPwmCreate(motor_right_pin1, 0, 100) pi.softPwmCreate(motor_right_pin2, 0, 100) pi.softPwmCreate(motor_left_pin1, 0, 100) pi.softPwmCreate(motor_left_pin2, 0, 100) # get the motors speed 0 pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, 0) # raise up the speed speed = 0 while ( speed <= 35 ): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) time.sleep(0.1) speed += 5 # Black line trace global forward, right, left while True: if (pi.digitalRead(PIR_PIN1) == pi.HIGH) & (pi.digitalRead(PIR_PIN2) == pi.HIGH): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) forward += 1 elif (pi.digitalRead(PIR_PIN1) == pi.HIGH) & (pi.digitalRead(PIR_PIN2) == pi.LOW): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, 100) pi.softPwmWrite(motor_left_pin2, 100) right += 1 elif (pi.digitalRead(PIR_PIN1) == pi.LOW) & (pi.digitalRead(PIR_PIN2) == pi.HIGH): pi.softPwmWrite(motor_right_pin1, 100) pi.softPwmWrite(motor_right_pin2, 100) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) left += 1 else: while ( speed >= 5 ): pi.softPwmWrite(motor_right_pin1, speed) pi.softPwmWrite(motor_right_pin2, 0) pi.softPwmWrite(motor_left_pin1, speed) pi.softPwmWrite(motor_left_pin2, 0) time.sleep(0.1) speed -= 5 break time.sleep(0.1) # Get back time.sleep(3) while ( speed <= 35 ): pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, speed) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, speed) time.sleep(0.1) speed += 5 pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, speed) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, speed) time.sleep(2) # Stop while ( speed >= 5 ): pi.softPwmWrite(motor_right_pin1, 0) pi.softPwmWrite(motor_right_pin2, speed) pi.softPwmWrite(motor_left_pin1, 0) pi.softPwmWrite(motor_left_pin2, speed) time.sleep(0.1) speed -= 5 pi.softPwmWrite(motor_right_pin1, 100) pi.softPwmWrite(motor_right_pin2, 100) pi.softPwmWrite(motor_left_pin1, 100) pi.softPwmWrite(motor_left_pin2, 100) time.sleep(3) if __name__ == "__main__": thread_1 = threading.Thread(target=detect_motor) thread_1.start() time.sleep(20) thread_2 = threading.Thread(target=sensor_motor) thread_2.start()直近の今後の課題
- 砲塔回転の精度向上
- 目的の対象物の学習データ生成(webスクレイピングまたは観賞用回転ディスプレイを利用)
- 上記データを用いたモデルの再学習
作ったのは米軍の戦車なので、色々な独軍の戦車を認識するモデルをできれば作りたいなと思っています。ただ、スクレイピングで十分にデータが集まるかどうか。
砲塔天蓋のカメラからは自車の砲身が写るので、そのカメラで回転ディスプレイ上の対象物の連続コマ撮りの方がいいのか。でもそれで強いモデルができるのか。背景はどうしよ。などなど課題が山積していますが地道に続けていきます。参考文献・リンク
電子部品ごとの制御を学べる! Raspberry Pi 電子工作 実践講座 改訂第2版
1/35 M1スーパーシャーマン RC化 ① : 朴念仁の艱苦
Raspberry Pi Zero用の小型電源とUSB WiFiモジュール用USBコネクタを自作する
project-banana-robo
- 投稿日:2019-08-11T18:10:10+09:00
【読切】非同期処理の沼~Vue.js Django編~
あらすじ
2019年8月10日。私はVue.jsとDjango REST Frameworkを使い、ある業務システムの画面開発を担当していた。
フレームワークなんて代物はまだ触り始めて2,3か月。とは言えどこを触れば何が変わるか多少は理解してきている自負もあった。
そんな良くも悪くも弾みのついていた私を戒めるかのように、単体テスト中、そいつは突然現れた。出会い
『更新中の…ステータスが…… エラーコード:409』
「(動作確認では何度も正常に作動していた機能開発...そんなはずは...)」
NG判定とエラー内容をテスト記録として記述する。"""
HTTP 409 Conflict はリクエストが現在のサーバーの状態と競合したことを示すステータスコード。競合は PUT メソッドを使用したリクエストのレスポンスで最も発生しやすい。例えば、サーバーにすでに存在しているファイルよりも古いバージョンのファイルをアップロードした際に409の応答が返され、バージョン管理システムの競合が発生する可能性がある。
"""
409 Conflict困った困った。なけなしの知識と経験であれやこれやと思考した。
そして画面の再描画後、再度そいつは現れた。『更新中の…ステータスが…… エラーコード:409』
非同期処理の沼
「ここか。」
答えに辿り着くのに長くはかからなかった。heavyHoge.ts// post(url, data, loading) // url : 呼び出し先のURL // data : REST側に送りたいデータ // loading : 処理終了までローディング画面を表示 // 重ための処理 private async heavyProcess(): Promise<void> { const payload = '画面から送る何かのデータ群'; this.$http.post(url='hoge-url1', data={'payload': payload}, loading=false); this.$http.post(url='hoge-url2', data={'payload': payload}, loading=false); // 画面の更新処理 this.onShown(); }ボタンを押下して、この処理が走っている間に他の作業も進められるようにしたい。数十秒~数分なんて流石に待っていられない。
そのような声は当然にあり、この処理は上記の通り非同期処理で実装していた。
ここでの2つのhttp.post(以下、処理①と処理②)は類似処理を行っているが、具体的には下記の処理を行っている。1.画面内のデータを取得しREST側へPOST
2.取得したデータの条件が合えば、外部APIへ有料の関連データを取得
3.種々のCRUD処理を行う更にエラーの発生パターンについて調べたところ、大きく下記の2つであった。
1.処理①が上記1~3を終えて、戻り値を返して処理②を行おうとした直後
2.処理①のみor処理②のみが走るように画面側で予め条件設定し、1度目の処理を終えた直後に同じ処理のトリガーを引いた直後つまり、処理量の多さにCRUD処理が実際には完了しきいっていない状態の時に再び処理①②を行うことで
それを示すように awaitを付記するとloading画面は処理終了まで続くがエラーは消える。(当然っちゃ当然だが)heavyHoge.ts// post(url, data, loading) // url : // data : 何かのデータ群 // loading : 処理終了までローディング画面を表示 // 重ための処理 private async heavyProcess(): Promise<void> { const payload = '画面から送る何かのデータ群'; await this.$http.post(url='hoge-url1', data={'payload': payload}, loading=false); await this.$http.post(url='hoge-url2', data={'payload': payload}, loading=false); // 画面の更新処理 this.onShown(); }とにかく、今回のエラーの発生要因やその背景なんかを整理すると
・外部APIが有料ということもあり、連続or多数の本チャンデータ取得パターンの確認を怠った。
・1度のボタン押下で行う非同期処理を2つに分けてしまった。
(※片方は他画面で使用しているViewに飛んでいた為、このような実装になった)
・REST側で各処理を行うViewを呼び出す為の、共通クラスを用意しなかった。
(※やってることは2個目と同じ)しかし、上記の課題を全てクリアして当初の設計通り実装しようとしても、まだエラーの発生パターン2が解決しない。
そう、ユーザーが短時間に連続で同じ処理を行おうとした時だ。完全に非同期処理の沼に嵌まってしまったのである。2019年8月11日。27歳の誕生日を迎えた今日、未だ解決策は見つかっていない。
To Be Continued
- 投稿日:2019-08-11T17:48:22+09:00
[EV3DEV Python] Buttonクラス
Buttonクラス
インテリジェントブロックボタンに関するクラス
ev3devにおけるボタンの名前
wait_for_bump()
やwait_for_pressed()
で指定するボタンには以下の6つのいずれかが入る
1. enter
2. up
3. down
4. right
5. left
6. backspaceボタンが 離れた/押された/押されて離された まで待つ
・wait_for_bump(buttons,timeout_ms=None)
指定されたボタンが指定した時間内に押されて離される(バンプ)まで待つ。
時間内にボタンがバンプされたらTrue
,そうでなければFalse
を返す。
引数 説明 デフォルト値 buttons(stringもしくはlist) ボタンの名前(2つ以上の場合はリストで指定) timeout_ms(float) 処理を中断する待ち時間(ミリ秒) None button_wait_for_bump.py#!/usr/bin/env python3 # クラスのインポート from ev3dev2.button import Button from ev3dev2.sound import Sound # インスタンス化 my_button = Button() my_sound = Sound() # 左ボタンが押されて離される(バンプ)まで待つ my_button.wait_for_bump('left') my_sound.beep()・wait_for_pressed(buttons,timeout_ms=None)
指定されたボタンが指定した時間内に押し込まれるまで待つ。
時間内にボタンが押されたらTrue
,押されていなかったらFalse
を返す。button_wait_for_pressed.py#!/usr/bin/env python3 # インポート from ev3dev2.button import Button from ev3dev2.sound import Sound # インスタンス化 my_button = Button() my_sound = Sound() # 上ボタンと下ボタンが押されるまでまつ my_button.wait_for_pressed(['up','down']) my_sound.beep()・wait_for_pressed(buttons,timeout_ms=None)
指定されたボタンが指定した時間内に離されるまで待つ。
時間内にボタンが離されたらTrue
,離されなかったらFalse
を返す。button_wait_for_released.py#!/usr/bin/env python3 # インポート from ev3dev2.button import Button from ev3dev2.sound import Sound # インスタンス化 my_button = Button() my_sound = Sound() # 右ボタンが離されるまでまつ my_button.wait_for_released('right') my_sound.beep()いずれかのボタンが押されているかどうか
・any()
いずれかのボタンが押されていたら
True
何も押されていなけらばFalse
を返すbutton_any.py#!/usr/bin/env python3 # インポート from ev3dev2.button import Button from ev3dev2.sound import Sound import time # インスタンス化 my_button = Button() my_sound = Sound() # ずっと繰り返す while True: # もしボタンが押されていたら if my_button.any(): my_sound.speak("PUSHED",espeak_opts="-a 200 -p 99") break # whileループから抜ける else: time.sleep(0.01)○○ボタンが押されているかどうか
・up/down/left/right/enter/backspace
ボタンが押されているかどうかbool型で返される。指定したボタンが押されてたら
True
を返す。button_push.py#!/usr/bin/env python3 # インポート from ev3dev2.button import Button from ev3dev2.sound import Sound import time # インスタンス化 my_button = Button() my_sound = Sound() # ずっと繰り返す while True: # もし上ボタンが押されてたら if my_button.up: my_sound.speak("UP",espeak_opts="-a 200 -p 99") break # whileループから抜ける else: time.sleep(0.01)これらはプロパティとして実装されている。プロパティは読み込み書き込みを制限できる変数みたいなものという理解で良い
イベントハンドラー
イベントハンドラー・・・特定のイベントが発生した時にあらかじめ登録されている処理を実行する事(ボタンの場合、ボタンが押されたり離されたりした時に呼び出される関数)
・process(new_state=None)
最後にprocess関数を呼び出した時と現在の各ボタンの状態とを比較し、状態が変わったボタンのイベントハンドラーに登録された関数を実行する。
引数 説明 デフォルト値 new_state リファレンスに情報なし(基本的な使い方の時は省略して良い) None ButtonEventHandler.py#!/usr/bin/env python3 # ButtonEventHandler.py 押されたボタンの名前を喋らせる from time import sleep from ev3dev2.button import Button from ev3dev2.sound import Sound # インスタンス化 my_button = Button() my_sound = Sound() # 左ボタンが押された時の処理を関数で宣言 def left(state): if state: my_sound.speak("left") # 右ボタンが押された時の処理を関数で宣言 def right(state): if state: my_sound.speak("right") # 上ボタンが押された時の処理を関数で宣言 def up(state): if state: my_sound.speak("up") # 下ボタンが押された時の処理を関数で宣言 def down(state): if state: my_sound.speak("down") # 決定ボタンが押された時の処理を関数で宣言 def enter(state): if state: my_sound.speak("enter") # 各ボタンが押された時に呼びだす関数をそれぞれ紐づける my_button.on_left = left my_button.on_right = right my_button.on_up = up my_button.on_down = down my_button.on_enter = enter while True: # 前にprocess()を呼び出した状態と比較して、今の状態が違う場合 # 上で紐づけられた関数を実行する my_button.process() sleep(0.01)
- 投稿日:2019-08-11T17:41:29+09:00
共起関係を力学モデルで描画
Twitterの投稿文の共起関係を分析して力学モデルで表示してみました。
単なる数値の羅列ではなく、グラフ表示することで視覚的にイメージできるようにしてみました。
本稿では次の日付のTwitter投稿文を対象としました。
- 2011年3月11日
- 2019年7月18日
共起とは
自然言語処理における共起とは、ある単語Aとある単語Bが同時に出現する場合に、単語Aと単語Bは共起関係にあるといいます。共起関係にある単語は何らかの意味的な近さがあるとみなすことができます。
ただし、単語Aと単語Bが極端に離れているとその関係性は薄れてしまうので、たとえば、単語n件離れている場合などと定義することもあります。本稿では、Twitterの投稿(=高々140文字)を対象とするので、1投稿中に出現した単語同士に共起関係があることにします。(改行や句読点は無視して、同一投稿にある単語は共起関係だと判断します)
力学モデルとは
n件の単語の関係(例えば共起頻度を元に計算された共起強度)からノード間の距離を定義して、2次元上にグラフ表現しようとしても、正確に描画することはできません。どこかにゆがみが出てきてしまいます。
それを可能な限り元の関係性を保持して2次元上に表現する方法の一つとして力学モデルによる描画があります。
単語間の関係の強さに対して、ばね・電磁力・重力などによる計算を施して、安定する位置を決めています。
詳しくは次の説明などを参照してください対象とするデータ
ここで扱うのはTwitterのAPI「Sample realtime Tweets」を使って取得したデータです。(任意のユーザのタイムラインに出てきたデータではありません)
全Tweetの一部(1%以下)のデータを収集することができます。定量的な分析には向きませんが、定性的な分析をするには十分な量のデータを得ることができます。
収集には、次のコードを使っています。APIの公式ドキュメントは次になります
このAPIでは世界中のデータが流れてきます。日本語の情報を見たいので、日本語だけを抜き出します。日本語文を抜き出しす方法は次の記事を参照してください。
形態素解析
以下のモジュールを使用しました。辞書は最終的にはneologdを使用しました。
- mecab-0.996
- mecab-ipadic-2.7.0-20070801
- mecab-ipadic-neologd (2019-07-22版)
形態素解析システムMeCabのインストールに関しては次の記事が参考になります。
共起データの作成手順
- 「Sample realtime Tweets」APIでTwitterの投稿を収集しておきます。
※Streamデータなので後から特定日時のデータを収集することはできません。
※本稿では事前に取得していたデータを使っています。
- 得られたデータから本文のみを抜き出します。
さらに、本文に含まれる不要な情報(@で始まるユーザ名、URL、タグなど)を除去します。 これらが残ったままだと形態素解析結果にノイズが残ってしまい、正しい共起情報が抽出できません。
※アルファベット文字列を強引に形態素解析した結果として出現する単語(未知語/未登録語)は、Twitterの投稿文140文字に対してかなりの割合を占めることになるので無視できません。可能な限り除去します。
- 各行(各投稿)ごとに形態素解析を行い、名詞のみを取り出します。また、名詞の中でも不要なもの(数詞など)は除外します。そのほかに明らかに誤解析になる単語があれば、除外しておきます。
- 単語の頻度、共起の頻度を求めます。
- 共起強度を示す値を計算します。
例えば、出現頻度1000件の単語同士の共起頻度100件と、出現頻度1万件の単語同士の共起頻度100件を同列に扱うわけにはいきません。そこで、単純な共起頻度ではなく、共起強度を示す指標としてJaccard係数を採用しました。
また、すべての情報をグラフ表示しても雑然とするだけなので、閾値を設定してそれより小さい値は表示対象外とします。単語の頻度や共起強度で判定しています。
※この辺りは、対象とするコーパスや解決すべき問題によって、個別の調整が必要になります。
- JSONデータに変換し、Javascriptに埋め込みます。
力学モデルで描画
ここではd3.jsのForce-Directed Graphを使います。
d3.jsの具体的な使い方は、次を参考にしました※なお、d3.jsのバージョンの違いによって、力学モデルの仕様が違うので注意が必要です。
ソースコード
ソースコードは次に格納してあります。
描画例
20110311
2011年3月11日14時46分18秒(日本時間)に東日本大震災が発生しました。
Twitter上では、その直前までいつもと同じやり取りが行われていましたが、その時刻以降内容は一変します。
上記のグラフを見てもわかるようにほかの情報が入ってくる余地はありません。
いくつかのクラスタに分かれているのが見て取れます。これは、数値列眺めただけではわからない情報で、可視化することによって認識できたことです。もちろん、これらを機械的にクラスタ分析する手法は存在しますが、本稿では扱いません。20190718
2019年7月18日の午前10時半ごろ、京都アニメーション放火事件が発生しました。
こちらは、地震のように直接体感できる事象ではないので、多くの人たちはニュース情報として耳にしたのだろうと推測できます。
グラフを見ると、明らかなクラスタが存在しています。他方のクラスタは、連日発信されている情報のようで、前後の日付でも見ることができました。備考
現状、対象単語を出現頻度の上位300件に限定しています。もう少し単語数を増やすことで、他のクラスタも見えてくるかもしれません。
また、連日同じような投稿が頻出しているような場合には、何らかの操作で除外した方がよいのかもしれません。このあたりをまた考えてみたいと思います。
- 投稿日:2019-08-11T16:27:42+09:00
KerasでIrisを深層学習してAPIで公開する
これは何?
- 前の記事で学習したモデルを使って、アヤメの予測をAPIとして公開する手順
- APIはGoogle Cloud Platform(以下GCP)のAppEngine上にデプロイする
- ソースコード: GitHub
環境
- Python3系のスタンダード環境のAppEngine
- Keras
構成
- AppEngineにデプロイしたAPIサーバーへリクエストすると予測結果がJSONで返るだけ
準備
- GCPのアカウントを登録しておく
- GCPのプロジェクトもつくっておく
- 前の記事を参考にAppEngineにデプロイするフォルダを作成する
├── app.yaml # アプリケーション定義ファイル ├── gunicorn_conf.py # gunicorn設定ファイル ├── index.yaml ├── main.py # 今回のメイン処理 ├── requirements.txt # インストールするパッケージ定義 ├── static │ ├── index.html │ ├── model.pkl # 学習済みモデル │ └── scaler.pkl # 標準化オブジェクト └── templates └── index.html
- 前回のJupyter Notebookで標準化と学習モデルを保存するように修正する
# ニューラルネットワークで使いやすいようにデータを整形する from sklearn.preprocessing import StandardScaler # 特徴量の標準化 scaler = StandardScaler() scaler.fit(iris.data) X = scaler.transform(iris.data)import pickle # 標準化オブジェクトの保存 with open("./scaler.pkl", 'wb') as f: pickle.dump(scaler, f) # 学習済みモデルの保存 with open("./model.pkl", 'wb') as f: pickle.dump(model, f)
- 修正後にnotebookを実行し上記のファイルを作成する
- AppEngine用ファルダにある
static
フォルダにファイルをコピーする実装
- 以下は要所のみ
app.yamlinstance_class: F2
- デフォルトの設定だとTensorFlowのモデルの読込で
Exceeded soft memory limit of 256 MB with 310 MB after servicing 0 requests total. Consider setting a larger instance class in app.yaml.
のようなエラーが発生するので、インスタンスタイプを1つ上げるrequirements.txtnumpy scikit-learn setuptools ~= 41.0 tensorflow keras
- KerasのバックエンドはTensorFlowを使っているので関連パッケージをインストールするように定義する
- setuptoolsはバージョン指定しないと古いのが優先されてしまう
main.pysepal_length = request.params.get("sepal_length") sepal_width = request.params.get("sepal_width") petal_length = request.params.get("petal_length") petal_width = request.params.get("petal_width") # TODO: validation data = [sepal_length, sepal_width, petal_length, petal_width]
- GETでのリクエストパラメータを変数に代入する
- 各変数は学習時の特徴量となっている
sepal_length がく片の長さ sepal_width がく片の幅 petal_length 花びらの長さ petal_width 花びらの幅 main.py# 標準化 X = scaler.transform(np.array([data]))
- 学習時に標準化をしているので、予測時にもリクエストからの特徴量を標準化する
main.py# 学習済みモデルの読込 file_name = "./static/model.pkl" with open(file_name, 'rb') as f: model = pickle.load(f) # 予測 y = model.predict(X, batch_size=1)
- pickleでシリアライズした学習済みモデルをデシリアライズして予測する
デプロイ
local
- localでサーバーを起動する
$ dev_appserver.py app.yaml
- パッケージのインストールに時間がかかるので待つ
- 起動したらcURLでHTTPリクエストを投げてみる
$ curl -X GET 'http://localhost:8080?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2'{"label": 0, "accuracy": 0.9960159659385681}
- みたいなレスポンスが返ってきたらOK
AppEngine
- GCP SDKで認証する
$ gcloud auth login
- デプロイするプロジェクトを指定する
$ gcloud config set project <プロジェクトID>
- AppEngineへデプロイする
$ gcloud app deploy
- デプロイが完了したらcURLでHTTPリクエストを投げてみる
$ curl -X GET 'http://<AppEngineのエンドポイント>?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2'{"label": 0, "accuracy": 0.9960159659385681}
- 同様にレスポンスが返ってきたらOK
- 投稿日:2019-08-11T16:13:37+09:00
MorletWaveletの簡単な歴史と一般モールスについて
本記事について
フーリエ変換から、CWT前提にしたGabor/Morlet/GMWまでをまとめます。
ただし、当然僕の理解の範囲内かつ、解析屋さんの視点での話です。
初めの方はフーリエの基礎っぽい所ですが、言いたいのはMorlet→GMW辺りの話です。まず、オイラーの公式があった
18世紀のなかば、レオンハルト・オイラー先生がオイラーの公式を発見しました。
$$e^{ix}=cosx + isinx$$
これは数ある数学の公式の中でも最も美しい公式の一般化であり、人類の宝であります。
よく見て下さい。複素数、指数関数、三角関数が一つの式の中にシンプルな形で存在します。
研ぎ澄まされたかのような美しさの中に、豊かな応用が活きる素晴らしい式です。
wikipedia オイラーの公式
そして、いかにその公式が美しいかは下記に熱弁されています。
wikipedia オイラーの等式
この式はフーリエ変換関連の全ての前提となります。
故に、波形解析やる初心者には僕は必ず「オイラーの公式を崇めよ?」と伝えています。そしてフーリエ変換が生まれた
ジョセフ・フーリエ先生という人がフーリエ変換を19世紀はじめ頃発見したそうです。
波をsinとcosの足し合わせだけで表現するというやつですな。
よほどやばい波(例えば、延々と増大し続けるとか)じゃなければフーリエ変換で全部表せます。
これ、オイラーの公式から導出できるんです。超簡単に言うと、波(永遠に続くやつ限定)を
波(振幅X時間) → フーリエ級数(周波数X複素数)
という風にフーリエ級数という形にするものです。
始めっから複素級数だったかどうかは僕は知らないですが、複素数というのが重要です。
大前提の正規直交基底
何故フーリエ変換が成立するのでしょうか?
それは、各々の周波数ごとのsin、cosが、全部直交(掛け算すると0)するから成立するのです。
つまり、無限に続くaHzのcos波とbHzのcos波は畳み込み積分(大雑把に言うと掛け算)すると
0になるという性質があるのです。
何故この条件が必要なのかというと、理屈上は周波数ごとに「分離」させる必要があるわけで、
もしそれぞれの周波数が「互いに関係が残っている」のであれば、分離できたとは言えないからです。
「細けえこたぁいいんだよ!」という人も居るかもですが、
逆変換とかも成立しなくなるのはあまりうれしくないですよね?応用:窓フーリエ
フーリエ変換は元来永遠に続く波限定という超絶面倒くさい性質を持っています。
なら、波の一部を区切って、それが永遠に続くと仮定して計算すれば
有限の波のフーリエ変換が算出できますね?簡単だね?しかし、落とし穴があります。一部の波を切り取ったら切り取り面が
低周波から超高周波までの合算として解釈されてしまうのです。
切り取り面は常に0でなければおかしくなります。
そのため、窓関数という物を掛け算して両端を0にするのが定石となっています。さて、このように一部切り出してフーリエ変換したとして…短く切った場合に
解析結果(周波数解像度)が正確じゃなくなる事は直感的に分かるかと思います。
一方、長く切った場合はその「長い時間での解析」しか出来ず、時間が荒くなります。
このように、あっちを立てればこっちが立たないのを不確定性原理といいます。この、窓関数問題を解決したのが本記事のテーマであるwavelet変換ですが、後述します。
スペクトル
フーリエ変換すると、複素数が算出されます。
複素数というのは極座標で表せます。
$$acosx+iasinx=r(\theta)$$
つまり、rというのは絶対値と言うか、ノルムみたいなものです。
このスペクトルには色々使いみちがあります。
この赤い線の角度が$\theta$、長さが$a$応用:パワースペクトル
上記rの2乗は高校物理によるとエネルギーに比例するのは自明です。
(物理屋さんにブチ殺される発言)
つまり、rの二乗を求めさえすればその波のエネルギー的なやつを計算できます。
そうやって、波全体のパワーを周波数ごとに計算したものが
パワースペクトル密度(PSD)と言われるやつです。応用:クロススペクトル
他の波との関係性を計算したい場合は、
それぞれのスペクトルを掛け算してあげればいいです。
この時、位相を逆にして掛け算してあげると位相の差が出たりします。Wavelet変換とは
前置きが長かったですね。でも、まだ前置きが続きます。
窓フーリエは元の波を時間ごとに区切って有限の波の解析をしていました。
ですが、これでは「時間ごとの波の解析」の粒度が凄く低くなります。
波は移ろいゆくものですから、出来れば瞬間ごとの波の性質をみたいですよね?
何が悪いのか?そもそも、sinやcosが無限に続く波であるのが悪いのです。
フーリエ変換に使うsinやcosを有限の波にしてやればいいんです。大雑把に言うと
「短い波を掛け算する事で時間ごとの周波数を見ていけるんだぜ!」
的なものです。さて、フーリエ変換の前提として全ての周波数は互いに直交
(掛け算をすると0になる。要するに無関係。)というのがありますが、
wavelet変換にもそのような前提があります。
ここからWaveletの歴史が始まります。微分不可能なWavelet
20世紀初頭にAlfréd Haar先生という数学者がHaar Waveletというのを発明しました。
直交な関数…の中で一番単純なのはこれです。
以下をご覧ください。
全然sinやcosと似てないじゃないか!複素数ですら無いじゃないか!
とお思いでしょう。カクカクしています。
しかし、こいつは紛れもなく小さな波であり、無難に楽にWavelet変換をすることが出来るのであります。
自然界の波の解析には向かないことも多いけれど、もちろん、HaarにはHaarの良さ1があるのです。
ちなみに、Haarの場合は畳み込みとかより速いアルゴリズムがあります。GaborWaveletの誕生
HaarのWaveletはめっちゃカクカクしています。
でも、実際の所、自然界の波ってそんなにカクカクしていますか?そうでもないですよね。
そこでDenis Gaborという人がGaborWaveletという原始的なWaveletを考え出しました。まずは、以下のWaveletの式と図をご覧ください。
$$F(t) = c\sigma \pi^{\frac{-1}{4}}e{\frac{-1}{2}t^2}e^{i\sigma t}$$
青が実軸、橙が虚軸です。そう、ガウシアンな関数をオイラーの公式に畳み込んだやつです。
シンプルでわかりやすく、一見するととても美しい式ですね!✨…これをMorlet waveletと解説している記事はそれなりにあります。
そのように解説している論文もあります。実際、条件によっては大差ありません。
しかし、初心者のために敢えて言います。
初心者はMorlet Waveletではなく、MorletWaveletの先祖と思いましょうこのWaveletモドキは実は条件によっては実用面では遜色ないのですが、
一つの大きな問題をはらんでいます。この波は正規直交基底ではないのです。
周波数解析をした場合、逆変換が存在できないし、なんだか色々おかしくなります。MorletWaveletとは
1980年台に、これを憂いた人たちが手を加え、正規直交にしてくれました。
$$F(t) = c\sigma \pi^{\frac{-1}{4}}e{\frac{-1}{2}t^2}(e^{i\sigma t}-\kappa \sigma)$$
括弧の中に引き算が入りました。正規直交のMorlet Waveletです。2
この魔法の引き算によってMorletWaveletは正規直交基底になるのです。でも、これ、図は綺麗でも式の見た目的に凄く気持ち悪いですよね?
実際に$\sigma$を小さい値にしてみるとガウシアンな曲線がグチャッと潰れて
実軸小さすぎで虚軸が大きすぎな実に気持ち悪い状態になります。
その結果、パラメータによっては波の真ん中よりも周辺の方が過大評価されてしまいます。
青が絶対値、橙が実軸、緑が虚軸です。
フーリエ変換に時間軸にガウシアンな関数を畳み込んで時間軸を加え、
それを正規直交基底に書き直すという自然な発想なのに、
こうなってしまっては何のためにガウシアンにしたのかさっぱりわからないですね?気持ち悪さ対策
$\sigma$を大きく取ればMorletWaveletとGaborWaveletは近似され、ほぼ重なります。
下のプロットを見ても明らかでしょう。
時間解像度を犠牲にすれば全然問題ないね!
実際、僕も$\sigma=7$のMorletWaveletを愛用しており、実用上全く問題ないです。
良かったね!解散!
↓$\sigma$が7のGaborとMorletの比較。…でも、根本的な解決になっていませんね。
そもそも$\sigma$が小さい場合はどうすれば良いのでしょうか…。
この場合は我慢してMorletWaveletを使うか、そうでなければ別のガウシアンに依存しない
Waveletを用いるところだと思います。この世には他にも色々なWaveletがあります。
フーリエ逆変換を使って算出するWavelet
実は、フーリエ逆変換すると算出されるWaveletというのが数種類発見されています。
なんて変態的な導出の仕方なんでしょう!と思います。
これらの利点は挙げろと言われると難しいのですが、
Morletと比べて真ん中が凹まないやつが発見されていますから、そこが利点かも?例えば2000年に入ってから知られるようになった一般モールスWavelet3というWaveletがあります。
(Morseはモールスっていう読み方で正しいのかどうかは分からないので詳しい人教えて下さい)既にこれを脳波解析に応用している論文もあります。定義は
$$\hat{F}(w) = sign(\omega) \alpha_{\beta\gamma}\omega^\beta e^{-\omega^\gamma}$$
という式…ではなく、これのフーリエ逆変換が一般モールスWaveletです。
計算上はFFTやるならフーリエ変換の手間が省けてお得ですね!
MorletWaveletとそっくりです。(パラメータいじって似せました)ちなみに、このwaveletは$\beta$と$\gamma$で調整するんですが、
$\beta$が低かったらちびまる子ちゃんの永沢君みたいに
真ん中が尖った波になります。
永沢君みたいなwaveletが欲しかった人には朗報なんですかね?
(周波数解像度は超絶悪くなります)俺のWaveletパッケージ
いかがだったでしょうか?(←一度やってみたかった)
さて、上記のWaveletは検索してもpythonパッケージを見つけられませんでした。
(探し方が悪かった?)
matlabの世界にはあるんですが、ぼくそんなにお金持ちじゃない…
というわけで、MorseWaveletをpythonで実装してみました。(←こういう宣伝もやってみたかった)
https://github.com/uesseu/nin_wavelets
上記のHaar以外は全部このパッケージで出力しました。
下記はそのコードが吐いた図です。
いや、ほんとGaborとMorletとGMW、条件が合えばそっくりですね…まだ か な り 若いリポジトリなので、多分色んなバグはありますし、
今後破壊的な変更もありえますが、これで一旦は
4の倍数のサンプリング周波数の波ならCWTまでは出来ます。4mne-pythonのEpochsを食べさせる事も出来ます。えらいっ!
こんなパッケージ作りにマジになっちゃってどうするの? 完
計算コストとか、アルゴリズムのシンプルさとか。そもそも、用途が違うんですよね。 ↩
John Ashmead (2012). "Morlet Wavelets in Quantum Mechanics". Quanta. 1 (1): 58–70. arXiv:1001.0250. doi:10.12743/quanta.v1i1.5. ↩
Olhede, S. C., and A. T. Walden. “Generalized morse wavelets.” IEEE Transactions on Signal Processing, Vol. 50, No. 11, 2002, pp. 2661-2670. ↩
糞仕様なのでそのうち取り除きたい ↩
- 投稿日:2019-08-11T15:16:20+09:00
Pythonでmongodbを操作する~その5:delete編~
当記事の記載範囲
この記事ではPythonでmongodbに接続してから、delete(SQLで言ってもdelete)の使い方について記載します。
内容としては以下になります。
- delete_one
- delete_many
mongodbの起動やpymongoのインストール方法については以下の記事をご覧いただければ幸いです。
https://qiita.com/bc_yuuuuuki/items/2b92598434f6cc320112準備データ
mongodbの準備データは以下のとおりです。
> show dbs admin 0.000GB config 0.000GB local 0.000GB test 0.000GB > show collections employee log salary > db.salary.find() { "_id" : ObjectId("5d4acf84de925ae437e2c124"), "name" : "佐藤", "salary" : 400000 } { "_id" : ObjectId("5d4acf84de925ae437e2c125"), "name" : "田中", "salary" : 500000 } { "_id" : ObjectId("5d4f814c950945628d663d95"), "name" : "加藤", "salary" : 400000 } { "_id" : ObjectId("5d4f814c950945628d663d96"), "name" : "松井", "salary" : 500000 } { "_id" : ObjectId("5d4fa35cb9899c6bedb140a1"), "name" : "山田", "salary" : 300000 } { "_id" : ObjectId("5d4fa852ee2d4a2cead2784b"), "name" : "山田", "salary" : 400000 }Pythonでdeleteを使ってみる
まずはdelete_oneから使ってみます。
delete_oneの使い方
早速、サンプルのコードです。
内容はdelete前のデータ取得、delete_one、delete後のデータ取得を行っています。MongoDeleteSample.pyfrom pymongo import MongoClient class MongoDeleteSample(object): def __init__(self, dbName, collectionName): self.client = MongoClient() self.db = self.client[dbName] self.collection = self.db.get_collection(collectionName) def find(self, projection=None,filter=None, sort=None): return self.collection.find(projection=projection,filter=filter,sort=sort) def delete_one(self, filter): return self.collection.delete_one(filter) def delete_many(self, filter): return self.collection.delete_many(filter) def find_one_and_delete(self, filter): return self.collection.find_one_delete(filter) mongo = MongoDeleteSample('test', 'salary') print('--------------------削除前--------------------') find = mongo.find() for doc in find: print(doc) print('-------------------削除情報-------------------') result = mongo.delete_one({'name':'山田'}) print(type(result)) print(result) print(result.deleted_count) print('--------------------削除後--------------------') find = mongo.find() for doc in find: print(doc)実行結果
--------------------削除前-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4f814c950945628d663d95'), 'name': '加藤', 'salary': 400000} {'_id': ObjectId('5d4f814c950945628d663d96'), 'name': '松井', 'salary': 500000} {'_id': ObjectId('5d4fa35cb9899c6bedb140a1'), 'name': '山田', 'salary': 300000} {'_id': ObjectId('5d4fa852ee2d4a2cead2784b'), 'name': '山田', 'salary': 400000} -------------------削除情報------------------- <class 'pymongo.results.DeleteResult'> <pymongo.results.DeleteResult object at 0x00000216FB2E6788> 1 --------------------削除後-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4f814c950945628d663d95'), 'name': '加藤', 'salary': 400000} {'_id': ObjectId('5d4f814c950945628d663d96'), 'name': '松井', 'salary': 500000} {'_id': ObjectId('5d4fa852ee2d4a2cead2784b'), 'name': '山田', 'salary': 400000}'name'が'山田'のデータが1件しか削除されていないことが分かります。
delete_manyの使い方
MongoDeleteSample.pyを変更する
(抜粋)MongoDeleteSample.pymongo = MongoDeleteSample('test', 'salary') print('--------------------削除前--------------------') find = mongo.find() for doc in find: print(doc) print('-------------------削除情報-------------------') result = mongo.delete_many({'name':'山田'}) print(type(result)) print(result) print(result.deleted_count) print('--------------------削除後--------------------') find = mongo.find() for doc in find: print(doc)実行結果
--------------------削除前-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4f814c950945628d663d95'), 'name': '加藤', 'salary': 400000} {'_id': ObjectId('5d4f814c950945628d663d96'), 'name': '松井', 'salary': 500000} {'_id': ObjectId('5d4faefd0d162cddb4fa19d0'), 'name': '山田', 'salary': 300000} {'_id': ObjectId('5d4faf056d857bceb9bd0c06'), 'name': '山田', 'salary': 400000} -------------------削除情報------------------- <class 'pymongo.results.DeleteResult'> <pymongo.results.DeleteResult object at 0x000001722C7CBCC8> 2 --------------------削除後-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4f814c950945628d663d95'), 'name': '加藤', 'salary': 400000} {'_id': ObjectId('5d4f814c950945628d663d96'), 'name': '松井', 'salary': 500000}'name'が'山田'のデータが2件削除されていることが分かります。
感想
Pythonからmongodbを操作する方法がこれでCRUDの一通りのやり方が分かりました。
mongodbのコマンドとパラメーターの設定内容がほとんど一緒で使いやすいなと感じました。関連記事
- 投稿日:2019-08-11T14:06:06+09:00
Python 辞書型(dictionary) まとめ
はじめに
辞書型についてうまくまとめられた記事が見当たらなかったので自分用にまとめてみました。
AtcoderをPython使ってやってます。初期化について
空の辞書型>>> d = {} >>> d {}
{string: int, string: string, int: int}
など色々入れれる値を入れて初期化>>> d = {'x': 1, 'y': 'yyyy', 1: 1000, 'list': [10, 20]} >>> d {'x': 1, 'y': 'yyyy', 1: 1000}dictメソッドを使って>>> d = dict(a=10, b=20) >>> d {'a': 10, 'b': 20}値の追加について
代入>>> d={} >>> d['x'] = 10 >>> d {'x': 10}setdefaultメソッド>>> d = {} >>> d.setdefault('x', 1000) 1000 >>> d {'x': 1000} >>> d.setdefault('x', 2000) 1000 >>> d {'x': 1000}便利なメソッド
キー、バリューの取り出し
keys()>>> d = {'x': 10, 'y': 20} >>> d.keys() dict_keys(['x', 'y'])values()>>> d = {'x': 10, 'y': 20} >>> d.values() dict_values([10, 20])要素をオーバーライド
update()>>> d = {'x': 10, 'y': 20} >>> d2 = {'x': 30, 'i': 100} >>> d {'x': 10, 'y': 20} >>> d2 {'x': 30, 'i': 100} >>> d.update(d2) >>> d {'x': 30, 'y': 20, 'i': 100}要素の取得
get()>>> d = {'x': 10, 'y': 20} >>> d['x'] 10 >>> d.get('x') 10 # keyにないものを取り出そうとするとエラーになる >>> d['z'] Traceback (most recent call last): File "<stdin>", line 1, in <module> KeyError: 'z' # getメソッドを使うとNoneが返ってくる >>> r = d.get('z') >>> type(r) <class 'NoneType'>要素を取り出しつつ削除
pop()>>> d = {'x': 10, 'y': 20} >>> d.pop('x') 10 >>> d {'y': 20}del>>> d = {'x': 10, 'y': 20, 'z': 30} >>> del d['x'] >>> d {'y': 20, 'z': 30} >>> del d >>> d Traceback (most recent call last): File "<stdin>", line 1, in <module> NameError: name 'd' is not defined辞書を空にする>>> d = {'x': 10, 'y': 20} >>> d {'x': 10, 'y': 20} >>> d.clear() >>> d {}キーがあるか判定
in>>> d = {'x': 10, 'y': 20} >>> d {'x': 10, 'y': 20} >>> 'x' in d True >>> 'z' in d Falseコピー
変数はオブジェクトへの参照を保持するため、変数代入ではコピーができない>>> d = {'x': 10} >>> d2 = d >>> d2['x'] = 20 >>> d {'x': 20} >>> d2 {'x': 20}コピーメソッドを使う>>> d = {'x': 10} >>> d2 = d.copy() >>> d2['x'] = 20 >>> d {'x': 10} >>> d2 {'x': 20}ソートメソッド
sorted>>> d = {'z': 10, 'y': 20, 'x': 30} >>> sorted(d.items()) [('x', 30), ('y', 20), ('z', 10)] >>> d {'z': 10, 'y': 20, 'x': 30} >>> d = sorted(d.items()) >>> d [('x', 30), ('y', 20), ('z', 10)]
- 投稿日:2019-08-11T13:32:30+09:00
Docker+flaskで「URL Opener」的なものを作った
はじめに
知らない人向けに説明すると、「URL Opener」とは下記のようなサイトです
使い方は
- textareaに開きたいURLを1行づつ入力する
- Open allをクリックすると、入力された複数のURLが別タブで開かれる
この時、ポップアップブロックを使用していると全て開かれないことがあるので、許可してから使用してください
Keywooとは
textareaに「URL」でなく「検索キーワード」を入力します
検索サイトはjson(後述)にあらかじめ記載するか、フォームより追加しますsrc: https://github.com/ymmmtym/keywoo
Keywooの特徴
- urlでなく検索キーワードを入力するので、毎回URLを準備する必要がなくなる
- タブを開きっぱなしにする必要がなくなる
- flaskで実装したので、軽量に動作する(無料で使えるGCPでも快適に動作します)
flaskとは
- pythonのwebフレームワークであり、軽量であることが主な特徴
- 中小規模のサイトで利用され、Djangoに次ぐ2位の利用率
動作環境
$ docker --version Docker version 18.09.2, build 6247962 $ docker-compose --version docker-compose version 1.23.2, build 1110ad01ディレクトリ構成
. ├── Dockerfile ├── README.md ├── app │ ├── data │ │ └── sites.json │ ├── run.py │ ├── static │ │ ├── favicon.ico │ │ ├── layout.js │ │ ├── main.js │ │ └── stylesheet.css │ └── templates │ ├── index.html │ ├── layout.html │ └── result.html ├── docker-compose.yml └── requirements.in各ディレクトリ説明
各ファイル説明
flaskの動作に必要なライブラリをインストール
requirements.in
flake8
scipy
matplotlib
scikit-learn
requests
beautifulsoup4
Flask
Dockerfile
FROM python:3.6 MAINTAINER ymmmtym USER root WORKDIR /root ENV HOSTNAME="keywoo-container" \ PS1="[\u@\h \W]# " ADD ["requirements.in", "/root/requirements.in"] RUN apt-get -y update && \ pip install --upgrade pip && \ pip install --upgrade setuptools && \ pip install pip-tools && \ pip-compile /root/requirements.in && \ pip-sync ADD ["app", "/root/app"] WORKDIR /root/app EXPOSE 5000 CMD ["python", "/root/app/run.py"]主にpipのインストールやreqiremets.inに記載されたpythonのライブラリの追加などを行なっています
(現在は、dockerhubにbuildしたimageをアップしたので、Dockerfile自体は使用しない)docker-compose.yml
docker build
した後でもdocker restart
で設定が反映されるように、
appディレクトリ配下は永続的にマウントされるようvolumesに記載しています。
デフォルトでportは80を使用していますversion: '3' services: keywoo: image: yumemo/keywoo container_name: keywoo-container hostname: keywoo-container tty: true volumes: - ./app:/root/app ports: - "5000:5000"app/run.py
python/app/run.py#!/usr/bin/python from flask import Flask, render_template, request, jsonify import json app = Flask(__name__) def get_toppage(str): list = str.split('/') return list[0] + '//' + list[2] app.jinja_env.globals['get_toppage'] = get_toppage app.config['JSON_AS_ASCII'] = False with open("./data/sites.json", "r", encoding="utf-8") as sites_json: search_dic = json.load(sites_json) @app.route('/', methods=["GET","POST"]) def index(): if request.method == "POST": if request.form["radio"]: global search_dic if request.form["radio"] == "delete": del_sites = request.form.getlist("check") for site in del_sites: del search_dic[site] if request.form["radio"] == "default": with open("./data/sites.json", "r", encoding="utf-8") as sites_json: search_dic = json.load(sites_json) if request.form["radio"] == "reset": search_dic.clear() if request.form["radio"] == "add": if request.form["site_name"] and request.form["url"]: search_dic.update({str(request.form["site_name"]):str(request.form["url"])}) return render_template("index.html", search_dic = search_dic) @app.route('/result', methods=["GET", "POST"]) def result(): if request.form["search"]: search_text = str(request.form["search"]) search_list = search_text.splitlines() return render_template("result.html", search_list = search_list, search_dic = search_dic) else: return render_template("index.html", search_dic = search_dic) if __name__ == '__main__': app.run(host='0.0.0.0',port=5000,threaded=True)flaskを起動するためのpythonファイルです
app/data/json
data配下にはjsonファイルを格納しています。
検索サイトのデータを保持する目的で利用しています。
大量のデータを扱うわけではないので、DBではなくjsonファイルに記載することにしましたjson/sites.json{ "Google": "https://www.google.com/search?q=", "Weblio English": "https://ejje.weblio.jp/content/", "Amazon": "https://www.amazon.co.jp/s?k=", "Rakuten": "https://search.rakuten.co.jp/search/mall/", "Yahoo Auctions": "https://auctions.yahoo.co.jp/search/search?p=", "Yahoo Auctions(record)": "https://auctions.yahoo.co.jp/search/search?auccat=22260&p=", "Spotify": "https://open.spotify.com/search/results/", "Discogs": "https://www.discogs.com/ja/search/?q=" }app/static
css,jsや画像などの静的ファイルを格納しています。
(デザインを良くするために作成しました。無くても問題なく動作はします)app/templates
htmlを格納しています。
操作方法
server側
pwd # /root/keywoo docker-compose up -dその後、下記にアクセスすると使用できます
http://localhost:5000/web側
- searchの下のtextareaに検索したい複数キーワードを1行ずつ入力
- submitをクリックするとsearch sitesにあるそれぞれ検索結果へのリンクを表示するページを返す
search sitesの設定
対象の項目にチェックをつけてapplyをクリックすると下記のような処理が行われます
delete selected sites
search sitesのテーブル内でチェックの付いているサイトを検索対象から削除するload default sites from json
jsonファイルから検索サイトを読み込むreset all sites
全ての検索サイトを検索対象から削除するadd site
Name:検索サイトの名称(任意)
URL:検索サイト(クエリ文字列までを記載する)今後は
今後は以下のような機能をつけてみたいと思います
- レイアウトの改修(レスポンシブデザインとか)
- jsonをweb上で編集
- login機能をつけてuser毎の検索サイトを保持(現在はpythonの変数に、検索サイトを辞書型で保存しているだけ)
以上です。
- 投稿日:2019-08-11T13:07:51+09:00
Pythonのこのinitでやっているのは、実行コンテキストをbindしている感じであっているのかな。
class PrefectureLandPriceLayerMapping(LayerMapping): """ 公示地価のpostgisデータ登録時にpub_dateを追加する """ def __init__(self, *args, **kwargs): self.custom = kwargs.pop('custom', {}) super(PrefectureLandPriceLayerMapping, self).__init__(*args, **kwargs) def feature_kwargs(self, feature): kwargs = super(PrefectureLandPriceLayerMapping, self).feature_kwargs(feature) kwargs.update(self.custom) return kwargs
- initが実行される
- 追加したいプロパティであるcustomをインスタンス変数にする。(親に影響が内容にpop)
- 継承元のクラスに自分とselfを渡して、コンテキスト(self)の付け変え、引数もそのまま渡す。
- 後の処理のdecoraingしたいとこにも同じように処理を書いてdecoratingみたいなことをしてやる。
親の挙動は変えずに拡張できる。
みたいな話。
- 投稿日:2019-08-11T13:04:27+09:00
【Programming News】Qiitaまとめ記事 Weekly August 2nd week, 2019 Vol.4
筆者が2019/8/4(日)~8/10(土)に気になったQiitaの記事のまとめのまとめを作成しました。日々のまとめ記事のWeekly版です。
皆様が興味のある言語や思いもよらぬハック方法をこの中から見つけられたら幸いです。
Java
- Tips
- Spring Framework
- Spring Boot
Python
- Django
- Tips
- PyAutoGUIを使ったPythonアプリをexe化するまで
- Kali LinuxにWingIDE(Python統合開発環境)インストール
- Watsonで数独を解く! Decision Optimizerを使ってみた
- pythonのパッケージ階層の話
- Gray-Scottモデルで模様のアニメーションを描く
- Expatを使ってPythonでXMLを解析する話
- PipenvでDjango開発環境をつくる
- 【Python】 Pythonで自己位置推定
- WGANgpで無限にラーメン画像生成する話
- 30行で顔認識(OpenCV)してファイル出力!
- PythonでJSONの入出力(エンコーディングもしっかりと)
- Python + Jinja2 でデータを HTML に埋め込んで JavaScript 側から利用する
- Python スタイルガイドとリンター
- Kaggle
- Tools
Ruby
Rails
- Beginner
- Tips
- RubyのRailsをMySQLでHerokuにあげてみた
- RubyOnRails/自動更新機能の実装
- RubyOnRails/インクリメンタルサーチの実装
- Rails6 のちょい足しな新機能を試す63(db:system:change 編)
- Rails 3-5 複数のデータベースに接続する方法
- Railsのerbをslimに変更する方法
- Rails6新機能 ActionText使用方法
- Rails6 のちょい足しな新機能を試す64(db:seed 編)
- RailsのStrong Parametersを一括で処理するメソッド
- Railsで開発されているOSS
- 本番環境でRailsを起動しようとした時のMysql5.7系に関するエラー
- Rails6 のちょい足しな新機能を試す65(timestamps precision編)
- Railsにdeviseを導入する方法
- deviseの日本語対応
- RailsでS3から画像やPDFファイルをダウンロードする方法
- Railsのサービスクラスを作る前に考えるべきこと
- rails font-awesome-sass導入方法
- AWS利用上のセキュリティを確保する方法
- railsでYoutube data APIを使ってみた
- gem'devise'に頼りすぎていて、sessionとcookieの概念が疎かだったので、まとめる
C#
Android
- Tips
- Androidでグラフを作ってみる
- FragmentでLiveData#observeを使うときの第一引数(Owner)には
viewLifecycleOwner
を使う- Cordova 9.0にバージョンアップする際にハマったこと
- Androidスマホの位置情報をKibanaで可視化してみる
- Android Jetpackのリリース前のバージョンを使う方法
- Android JetpackのApp Crawlerを動かしてみる
- Retrofitの返り値は
Observable
とSingle
などとはどっちがいいか- Activityを再起動させたいけど余分な画面を一枚挟まないといけない…という場合に有効なライブラリ
- AndroidX 導入手順
Swift
- Tips
- Apps
Kotlin
- Tips
Flutter
- Beginner
Fulx
JavaScript
- Beginner
- Tips
React
- Beginner
- Tips
- Apps
Node.js
- Tips
- Express
- Tools
- Apps
Vue.js
- Beginner
- Tips
Vuex
Nuxt.js
- Tips
Nest.js
Angular
jQuery
TypeScript
C#
PowerApps
ReactNative
Laravel
- Tips
C
PHP
CakePHP
Rust
Go言語
- Tips
R言語
- Tips
Scala
Haskell
Unity
Spark
Line
- Tips
- Apps
A-Frame
- Beginner
- Tips
PowerApp
Line
HTML
CSS
Sass
SQL
MySQL
- Tips
PostgreSQL
Oracle
MongoDB
SQL Server
Apache
ビッグデータ
Visual Studio Code
IntelliJ IDEA
AI
IoC
Git
- Beginner
- Tips
AWS
- Beginner
- Tips
- EC2
- AWS Lambda
- LambdaでのCloudWatch LogsからS3へのエクスポート
- Lambda×node.jsでGB単位の動画をStreamAPIでアップロードする方法
- AWS Lambdaでのbundle install
- AWS LambdaとPythonでスクレイピング処理をマイクロサービス化する
- API Gateway + LambdaでAPI Keyの値を取得する
- AWS Lambda:API GatewayとApplication Load Balancerの違い
- CloudWatchの空になったロググループ削除
- LambdaでIAMユーザーのアクセスキーを監視
- AWS Lambdaを使ってAndroid JetpackのリリースをRSSで受け取れるようにした
- Amazon S3
- Amazon MQ
- AWS CDK
- AWS CLI
- AWS SAM
- AWS Clinet VPN
- AWS CloudHSM
- AWS Certificate Manager
- AWS CloudFormation
- AWS CodeCommit
- AWS Textract
- AWS Fargate
- AWS WAF
- AWS Inspector
- DynamoDB
Azure
- Tips
Oracle Cloud
IBM Cloud
Active Directory
Cloud SQL
Cloud9
インフラ
ブロックチェーン
Ethereum
Hyperledger
セキュリティ
機械学習
自然言語処理
Network
RPA
CI
Docker
- Tips
- Dockerイメージをレジストリに登録する(ECR含む)
- Dockerを使ってKali Linuxの環境を構築した時のメモ
- Dockerでrails開発環境構築(リバースプロキシで複数アプリを構築)
- 簡単な静的ページと簡単なAPIサーバーをGoogle Cloud Runで動かす
- Docker上でLaravelを動かしてみよう(laradockなしで)
- AWS (ECS + RDS)+ CircleciによるCI/CDの理解(初学者がインプットすべき情報)~用語と概念理解編~
- Autoware(1.9.1) Docker セットアップ
- Docker for WindowsでDockerイメージを一括削除するワンライナースクリプト
- Laradockでローカルサーバを立ち上げてブラウザ表示する方法
- Docker上にDjangoとReactの環境を構築する
- macにdocker-composeでwordpressをたてる
Heroku
VirtualBox
kubernetes
OpenStack
Swagger
- Tips
AMP
OpenID
OAuth2.0
Elasticsearch
Linux
Cent OS
Windows
Mac
Redis
- Beginner
Google API
Google Apps Script
Google Cloud Platform
Google Colaboratory
Google Cloud Data Catalog
Google Drive
VB.Net
Firebase
Server Side
CSS
BootStrap
WordPress
Develop
- Beginner
- Tips
- Tools
Keras
PowerShell
Vim
Atom
awk
LaTex
Redmine
UML
- Beginner
Raspberry
- Tips
- Apps
RPA
IoT
Alexa
Line
SharePoint
VBA
ShellScript
PowerShell
Slack
Nim
Emacs
WPF
UI
Ansible
Arduino
Julia
Coral
ionic
- Tips
QRCode
OCR
EC-CUBE
- Beginner
資格
転職
更新情報
Kotlin
- Kotlin入門
Android
Java
IDE
- 投稿日:2019-08-11T12:27:55+09:00
ラズパイとMAMORIOとLINEで出退勤を記録する仕組みを作る
はじめに
ラズパイでMAMORIOの近接監視をしてLINEに状況を通知するのをやってみた。
近づいたり離れたりしたらLINEに通知が来るので出退勤の記録に使えるかもしれない。環境
2019/8/11時点
- ラズパイ:Raspberry Pi 3 Model b+
- ラズパイのOS:raspbian stretch (2019-04-08)
- Python 2.7.13
- bluez 5.50
- code-oss
- MAMORIO×2個
- iPhone6s
補足
今回の試みは前の記事と同じ状態から始めたいと思います。以下の状態です。
- ラズパイのバージョンについて
- ラズパイはここまでセットアップした状態からはじめます
- oss-code(VSCODE)入れとくと便利(無くてもOK)概要図
- げぼ(人)はMAMORIO geboを持っている
- MAMORIO checkはラズパイに貼りつけておく
- げぼが出勤してきたら管理者のLINEに通知する
- げぼがどっかに行ったら管理者のLINEに通知する
準備:LINE Notify
まずはLINEで通知する部分だけやります。
LINEにはLINE Notifyという無料のサービスがあってHTTPのリクエストからLINEメッセージを送信することができます。1)トークンの発行
パソコンからやります。(ラズパイのブラウザでもいいけど)
1-1)LINE Notifyにアクセス
https://notify-bot.line.me/ja/
1-2)右上から[ログイン]する
1-3)マイページ→[トークンを発行する]
1-4)トークンが生成されるのでメモっとく
2)通知テスト
ラズパイから以下のpyを実行します。
access_token変数にはさっきメモったトークンを代入するようしてください。
LINEに通知がきたらテストOKnotify_test.py#!/usr/bin/python # -*- coding: utf-8 -*- import requests if __name__ == '__main__': access_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxx" url = "https://notify-api.line.me/api/notify" headers = {'Authorization': 'Bearer ' + access_token} message = "テスト" payload = {'message': message} requests.post(url, headers=headers, params=payload,)実行する$ python notify_test.py
準備:BlueZのセットアップ
- MAMORIOはiBeaconです。
- iBeaconはBLEのアドバタイジングという機能で自分がいることを周囲に発信することができます。
- MAMORIOは30秒に1回程度、アドバタイジングしています。
- ラズパイにはBLEが付いているのでアドバタイズパケットを受信することができるわけですが、Pythonで取り扱うために、BlueZというモジュールを使います。
- と、いうわけで、ラズパイでMAMORIOとお話するためにBlueZをセットアップします。
- どこかにワークフォルダを作ってその中でやっといたほうがいいかもしれません。
必要パッケージのインストール$ sudo apt install bluetooth libusb-dev libdbus-1-dev libglib2.0-dev libudev-dev libical-dev libreadline-dev libdbus-glib-1-dev libbluetooth-devBlueZは上記のようなパッケージが無いようなので、以下の手順でソースを落としてきてビルドします。
BlueZのビルド$ wget http://www.kernel.org/pub/linux/bluetooth/bluez-5.50.tar.xz $ xz -dv bluez-5.50.tar.xz $ tar -xf bluez-5.50.tar $ cd bluez-5.50/ $ ./configure --enable-experimental $ make // makeが長いのでしばらく待ちます・・・ $ sudo make install以上で準備完了
PythonでiBeaconを受信する
bluezを使ってiBeaconのアドバタイズパケットを受信するコードを書きます。
bluezのAPIは手順が難しくて説明サイトもないのでコードはおまじない状態です。
ここは思考停止してinterfaces_added()
、properties_changed()
の中を実装していくことにします。注意
このソースでimportしている
bluezutils
はさっきインストールしたbluezのtest
フォルダに置いてあるbluezutils.py
を使いましょう。iBeaconのアドバタイズパケットを受信する#!/usr/bin/python # -*- coding: utf-8 -*- import dbus import dbus.mainloop.glib try: from gi.repository import GObject except ImportError: import gobject as GObject import bluezutils devices = {} // アドバタイズパケット受信したときの処理 def interfaces_added(path, interfaces): properties = interfaces["org.bluez.Device1"] if not properties: return if path in devices: devices[path] = dict(devices[path].items() + properties.items()) else: devices[path] = properties // ここら辺で自分がやりたい処理を書く // devies[path]オブジェクトがアドバタイズパケット // アドバタイズパケット受信したときの処理 def properties_changed(interface, changed, invalidated, path): if interface != "org.bluez.Device1": return if path in devices: devices[path] = dict(devices[path].items() + changed.items()) else: devices[path] = changed // ここら辺で自分がやりたい処理を書く // devies[path]オブジェクトがアドバタイズパケット if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() adapter = bluezutils.find_adapter() bus.add_signal_receiver(interfaces_added, dbus_interface = "org.freedesktop.DBus.ObjectManager", signal_name = "InterfacesAdded") bus.add_signal_receiver(properties_changed, dbus_interface = "org.freedesktop.DBus.Properties", signal_name = "PropertiesChanged", arg0 = "org.bluez.Device1", path_keyword = "path") om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = om.GetManagedObjects() for path, interfaces in objects.iteritems(): if "org.bluez.Device1" in interfaces: devices[path] = interfaces["org.bluez.Device1"] scan_filter = dict() scan_filter.update({ "Transport": "le" }) adapter.SetDiscoveryFilter(scan_filter) adapter.StartDiscovery() mainloop = GObject.MainLoop() mainloop.run()アドバタイズパケットってなに?
さっきのサンプルで
interfaces_added(),properties_changed()
でのdevies[path]オブジェクトがアドバタイズパケットって事なんですが、これが何か、という話です。このdevies[path]オブジェクトはdict型でいわゆるディクショナリーです。
詳細資料がないのでよくわからず、わかるところだけ埋めています。
ManufacturerDataが今回重要なデータです。
key 説明 AddressType ? Paired ? ServicesResolved ? Adapter ? Alias ? ManufacturerData 製造者固有データ.iBeaconの詳細情報が詰まっている Connected ? UUIDs ? Address BLEアドレス RSSI RawSignalStrengthInDBm.シグナル強度、いわゆるRSSI。数値が大きいほどシグナルが強い、近くから発信されている、ということ Trusted ? Blocked ? iBeaconをスキャンしたときの処理def scan_ibeacon(properties): address = "<unknown>" manufac = 0 # AddressとManufacturerDataをGETする for key, value in properties.iteritems(): if type(value) is dbus.String: value = unicode(value).encode('ascii', 'replace') if (key == "Address"): address = value if (key == "ManufacturerData"): manufac = value # print("address = %" % (address)) if type(manufac) is dbus.Dictionary: // manufac 中身をチェックするManufacturerDataってなに?
iBeaconはManufacturerDataに詳細情報を入れることになっているらしく、バイト配列で、MAMORIOの個体識別IDなどが入っています。
以下のフォーマットです。
index size 説明 0-1 2 Bluetooth SIGが企業に発行した識別子 2-18 16 UUID 18-19 2 Major.unsingned short 20-21 2 Minor.unsingned short 22 1 Measured Power.signed 8-bit integer 23 1 ? UUID
- 16バイトの識別子
- MAMORIOの場合は
b9407f30-f5f8-466e-aff925556b57fe6e
Major,Minor
- MAMORIOの個体識別ID
- unsingned shortでラズビアンはリトルエンディアンなので、変換の際は注意しましょう。
というわけなのでUUID,Major,MinorでMAMORIOを特定することができます。
ManufacturerDataデータをパースするdef parse_manufacture(mdata): uuid = "" major = 0 minor = 0 if( type(mdata) is dbus.Array and len(mdata) > 21): # UUID for item in mdata[2:6]: uuid = uuid + format(item,'02x') uuid = uuid + "-" for item in mdata[6:8]: uuid = uuid + format(item,'02x') uuid = uuid + "-" for item in mdata[8:10]: uuid = uuid + format(item,'02x') uuid = uuid + "-" for item in mdata[10:18]: uuid = uuid + format(item,'02x') # Major # - unsingned int(2バイト) # - リトルエンディアン majortmp = bytearray([int(mdata[19]),int(mdata[18])]) major = struct.unpack('H',majortmp)[0] # Minor # - unsingned int(2バイト) # - リトルエンディアン minortmp = bytearray([int(mdata[21]),int(mdata[20])]) minor = struct.unpack('H',minortmp)[0] return uuid,major,minor特定のMAMORIOを検知したらLINE通知をする
ここまでで
- LINE通知
- 特定のMAMORIOの検知
ができるようになりました。
さっきのソースをちょっと改造するだけです。
特定のMAMORIを検知したらLINE通知をする
特定のMAMORIを検知したらLINE通知をするMAMORIO_UUID = "b9407f30-f5f8-466e-aff925556b57fe6e" # target [0]name,[1]major,[2]minor,[3]linetoken MAMORIO_TARGETS = { ("check",11111,22222,""), ("gebo", 33333,44444,"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"), ("test", 55555,66666,"yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy") } def scan_ibeacon(properties): address = "<unknown>" manufac = 0 # AddressとManufacturerDataをGETする for key, value in properties.iteritems(): if type(value) is dbus.String: value = unicode(value).encode('ascii', 'replace') if (key == "Address"): address = value if (key == "ManufacturerData"): manufac = value # print("address = %" % (address)) if type(manufac) is dbus.Dictionary: # 特定のMAMORIOかどうかをチェックする uuid = "" major = 0 minor = 0 for mdata in manufac.values(): uuid,major,minor = parse_manufacture(mdata) if( uuid.lower() == MAMORIO_UUID.lower()): #print(" MAMORIO! - uuid = %s" % uuid) #print(" - major = %d , minor = %d" % (major,minor)) # ターゲットを検索 username = "" token = "" founds = [i for i in MAMORIO_TARGETS if i[1]==major and i[2]==minor] if(len(founds) <= 0): return username = founds[0][0] token = founds[0][3] print("%s - major = %d , minor = %d -> user = %s" % (datetime.now(),major,minor,username)) # LINE通知する access_token = token url = "https://notify-api.line.me/api/notify" headers = {'Authorization': 'Bearer ' + access_token} message = username payload = {'message': message} requests.post(url, headers=headers, params=payload,)細かい考慮
ここまでだとのアドイズパケットを検知するたびにLINE通知が来て困りますね。
いったん通知したMAMORIOの個体識別IDをメモリに保存して再通知しないようにしましょう。
そうすると、LINE通知が1回来たら二度とこなくなる・・・困った。
要するに、MAMORIOが無くなったことを検知する必要があります。
定期タイマで保存されているMAMORIOの個体識別IDをチェックして1分間アドバタイズパケットが来ていなかったら削除します。
ここの実装ですが、もう一個MAMORIOがあるんで、これをラズパイに貼りつけておいて定期タイマの代わりにして消込処理をしました(もっと賢いやり方があるね普通は)。検知したMAMORIOが60秒更新されていなければ削除するdef delete_targets(): datenow = datetime.now() delete_username ="" for key, value in targets.iteritems(): span = datenow - value #print(span) if(span.seconds>60): delete_username=key break if( len(delete_username)>0): targets.pop(delete_username) print("delete %s - span = %s " % (delete_username,span)) return delete_username実行してみる
そんなこんなで以下のような実装になりました。
- ラズパイに げぼ が近寄ってきたら(gebo用のMAMORIOが近づいてきたら)LINE通知する
- ラズパイから げぼ が離れたら(gebo用のMAMORIOが離れて60秒たったら)LINE通知する
サンプルコードをまとめたものはコチラ
https://github.com/gebogebogebo/notify_ibeacon/blob/master/source/notify_ibeacon.pyおつかれさまでした
夏休みの課題でした、ラズパイは面白い。
以下のサイトは大変参考になりました。
- 投稿日:2019-08-11T12:14:31+09:00
ディープラーニング基礎知識まとめ
もともとは、日本ディープラーニング協会 E資格対策用に自学速習資料として作っていました。ただ、ディープラーニング全般を浅く、ほどよく広く学んだ人の復習用に使えそうな気もします。
*まだ作りかけです
TODO
- VAE
- seq2seq
- アテンション機構
- GRU
- メモリネットワーク
- 参考文献
行列
- 特異値分解
A = U \Sigma V^{\mathrm{T}}確率
分布 確率密度(質量)関数 期待値 分散 離散確率分布 $f_X(x)=P(X=x)$ $\sum_{i} x_i p_i $ $\operatorname{E}[X^2]-\operatorname{E}[X]^2 $ ベルヌーイ分布 $f(x;p)=p^x(1-p)^{1-x}$ $p$ $p(1-p)$ マルチヌーイ分布 * $np_i$ $np_i (1-p_i )$ ガウス分布 $f(x)=\frac{1}{\sqrt{2\pi \sigma^2}} \exp \left( -\frac{(x-\mu)^2}{2\sigma^2} \right) $ $\mu$ $\sigma^2$ *マルチヌーイ分布の確率密度関数
f(x_1 ,\cdots ,x_k ;n,p_1 ,\cdots ,p_k )=\begin{cases} \dfrac{n!}{x_1 !\cdots x_k !} p_1^{x_1} \cdots p_k^{x_k} &\text{when } \sum_{i=1}^k x_i =n \\[1ex] 0 &\mbox{上記以外} \end{cases}条件付き確率
$$
\operatorname{P}(A\mid B)=\frac{\operatorname{P}(A \cap B)}{\operatorname{P}(B)}
$$$$
\operatorname{P}(A \cap B)= \operatorname{P}(B)\operatorname{P}(A\mid B) = \operatorname{P}(A)\operatorname{P}(B\mid A)
$$ベイズの定理
$$
\operatorname{P}(A\mid B)=\frac{ \operatorname{P}(A)\operatorname{P}(B \mid A)}{\operatorname{P}(B)}
$$情報
自己情報量
事象$E$が起こる確率を$\operatorname{P}(E)$とするとき、 事象$E$が起こったことを知らされたとき受け取る(選択)情報量$\operatorname{I}(E)$は
$$
\operatorname{I}(E) = −\log \operatorname{P}(E)
$$平均情報量(エントロピー)
$ \Omega $ を、台が有限集合である確率空間とする。$ \Omega $ 上の確率分布Pが与えられたとき、各事象$ A\in \Omega $ の選択情報量$-\log \operatorname{P}(A)$の期待値は
$$
H(P) = - \sum_{A\in\Omega} P(A) \log \operatorname{P}(A)
$$$X$ の取りうる値は 0,1 の2値。それぞれの値が生じる確率は $1−p,p$ であるから、 エントロピーは
$$
\left. H(X)= - p \log{p} - (1-p)\log{(1-p)} \right.
$$KL情報量
\begin{align} D_{\mathrm{KL}}(P\|Q) &= \sum_i P(i) \log \frac{P(i)}{Q(i)} \\ &= -\sum_x p(x) \log q(x) + \sum_x p(x) \log p(x) \end{align}機械学習
検証方法
- k-分割交差検証法
- ホールドアウト法
- 回帰問題の評価指標、相互誤差、決定係数
損失関数
クロスエントロピー
\mathrm{H}(p, q) = -\sum_x p(x)\, \log q(x) \!def cross_entropy(y, t): loss = -1 * (t * np.log(y) + (1 – t) * np.log(1 –y)) return loss行列積
行列積class MatMul(object): def __init__(self, x1, x2): self.x1 = x1 self.x2 = x2 def forward(self): y = np.dot(self.x1, self.x2) self.y = y return y def backward(self, grad): grad_x1 = np.dot(grad, self.x2.T) grad_x2 = np.dot(self.x1.T, grad) return (grad_x1, grad_x2)全結合層
class Affine: def __init__(self, W, b): self.W =W self.b = b self.x = None self.original_x_shape = None # 重み・バイアスパラメータの微分 self.dW = None self.db = None def forward(self, x): # テンソル対応 self.original_x_shape = x.shape x = x.reshape(x.shape[0], -1) self.x = x out = np.dot(self.x, self.W) + self.b return out def backward(self, dout): dx = np.dot(dout, self.W.T) self.dW = np.dot(self.x.T, dout) self.db = np.sum(dout, axis=0) dx = dx.reshape(*self.original_x_shape) # 入力データの形状に戻す(テンソル対応) return dx活性化関数
シグモイド
f(x) = \frac{1}{1+e^{- x}}$x$に関する$f(x)$の微分は、
\frac{\partial f(x)}{\partial x} = f(x)(1-f(x))class Sigmoid(object): def __init__(self): self.params, self.grads = [], [] self.out = None def forward(self, x): out = 1 / (1 + np.exp(-x)) self.out = out return out def backward(self, dout): dx = dout * (1.0 - self.out) * self.out return dxReLU
f(x) = \max(0, x) \\ \mbox{または、} \\ f(x) = \begin{cases} x & (x>0) \\ 0 & (x\leqq0) \end{cases}$x$に関する$f(x)$の微分は、
\frac{\partial f(x)}{\partial x} = \begin{cases} 1 & (x>0) \\ 0 & (x\leqq0) \end{cases}class Relu(object): def __init__(self): self.mask = None def forward(self, x): self.mask = (x<=0) out = x.copy() out[self.mask] = 0 return out def backward(self, dout): dout[self.mask] = 0 dx = dout return dxLeakyReLU
f(x) = \begin{cases} x & (x>0) \\ ax & (x\leqq0) \end{cases}$x$に関する$f(x)$の微分は、
\frac{\partial f(x)}{\partial x} = \begin{cases} 1 & (x>0) \\ a & (x\leqq 0) \end{cases}class LeakyRelu(object): def __init__(self, a): self.mask = None self.a = a def forward(self, x): self.mask = (x<=0) out = x.copy() out[self.mask] = self.a*x[self.mask] return out def backward(self, dout): dout[self.mask] = self.a*dout[self.mask] dx = dout return dxTanh
f(x) = \frac{\sinh x}{\cosh x} = \frac {e^x - e^{-x}} {e^x + e^{-x}} = \frac{e^{2x} - 1} {e^{2x} + 1}$x$に関する$f(x)$の微分は、
\frac{\partial f(x)}{\partial x} = (1-f(x)^2)class Tanh: def __init__(self): self.params, self.grads = [], [] self.out = None def forward(self, x): y = np.tanh(x) self.y = y return y def backward(self, dout): dx = dout * (1.0 - self.out**2) return dxHardtanh
f(x) = \begin{cases} -1 & (x < -1) \\ x & (-1\leqq z \leqq 1) \\ 1 & (x > 1) \end{cases}$x$に関する$f(x)$の微分は、
\frac{\partial f(x)}{\partial x} = \begin{cases} 1 & (-1\leqq z \leqq 1) \\ 0 & 上記以外 \end{cases}最適化
1次 Momentum 物理法則に準じる動きをする。進む方向と勾配方向が同じであれば移動量が大きくなる。 Nesterov AG Momentumの改良版。次の位置を予測し、その位置の勾配も加味する。 AdaGrad 学習が進むにつれ、見かけの学習率が減衰していく。パラメータ毎に見かけの学習係数が異なる。 AdaDelta AdaGradの改良版。過去の勾配情報よりも直近の勾配情報をより重視する。 RMSProp AdaDeltaの簡易版。過去の勾配情報よりも直近の勾配情報をより重視する。 Adam RMSPropとMomentumを組み合わせたような方法。 2次 ニュートン法 ヘッセ行列を使用する。 共役勾配法 共役方向によって逆ヘッセ行列の計算を効率化する。 BFGS 各ステップでのヘッセ行列の近似を改良したもの。 L-BFGS 省メモリ化したBFGS。 SGD
表記1
$$
\theta_{t+1} = \theta_t - \eta \frac{\partial L}{\partial \theta_t}
$$表記2
$$
w := w - \eta \nabla Q(w) = w - \eta \sum_{i=1}^n \nabla Q_i(w)
$$class SGD(object): def __init__(self, lr): self.lr = lr def update(self, params, grads): for key in params.key(): params[key] -= self.lr * grads[key]MomentumSGD
v_{t+1} = \alpha v_t - \eta \frac{\partial L}{\partial \theta_t} \\ \theta_{t+1} = \theta_t + v_{t+1}class MomentumSGD: def __init__(self, lr, momentum): self.lr = lr self.momentum = momentum self.v = None def update(self, params, grads): if self.v is None: self.v = {} for key, param in params.items(): self.v[key] = np.zeros_like(param) for key in params.key(): self.v[key] = self.momentum * self.v[key] - self.lr * grads[key] params[key] += self.v[key]Nesterov Accelerated Gradient
v_{t+1} = \alpha v_t - \eta \frac{\partial L}{\partial ( \theta_t + \alpha v_t)} \\ \theta_{t+1} = \theta_{t}+v_{t+1}class NesterovAG: def __init__(self, lr, momentum): self.lr = lr self.momentum = momentum self.v = None def update(self, params, grads): if self.v is None: self.v = {} for key, param in params.items(): self.v[key] = np.zeros_like(param) for key in params.key(): v_pre = self.v[key].copy() self.v[key] = v_pre * self.momentum - self.lr * grads[key] params[key] += -self.momentum* v_pre + (self.momentum+1) * self.v[key] #self.v[key] *= self.momentum #self.v[key] -= self.lr * grads[key] #params[key] += self.momentum * self.momentum * self.v[key] #params[key] -= (1 + self.momentum) * self.lr * grads[key]AdaGrad
h_{t+1} = h_t + \frac{\partial L}{\partial \theta_t} \odot \frac{\partial L}{\partial }{\partial \theta_t} \\ \theta_{t+1} = \theta_t - \eta \frac{1}{\varepsilon + \sqrt{h_{t+1}}} \odot \frac{\partial L}{\partial \theta_t}class AdaGrad: def __init__(self, lr): self.lr = lr self.h = None def update(self, params, grads): if self.h is None: self.h = {} for key, param in params.items(): self.h[key] = np.zeros_like(param) for key in params.key(): self.h[key] += grads[key] * grads[key] params[key] -= self.lr * grads[key] / (np.sqrt(self.h[key]) + 1e-7)AdaDelta
h_{t+1} = \rho h_t + (1-\rho)\frac{\partial L}{\partial \theta_t} \odot \frac{\partial L}{\partial \theta_t} \\ \varDelta\theta_t = - \frac{\sqrt{\varepsilon + r_t}}{\sqrt{\varepsilon + h_{t+1}}} \odot \frac{\partial L}{\partial \theta_t} \\ \theta_{t+1} = \theta_t + \varDelta\theta_t \\ r_{t+1} = \rho r_t + (1- \rho) \varDelta \theta_t \odot \Delta \theta_tRMSProp
- AdaDeltaの簡易版。
- 勾配の2乗の移動平均を用いて学習率を変化させていく。
- 移動平均をとると、過去の情報が少しずつ薄れていき、新しい情報が反映されやすくなる。
h_{t+1} = \rho h_t + (1 - \rho) \frac{\partial L}{\partial \theta_t} \odot \frac{\partial L}{\partial \theta_t} \theta_{t+1} = \theta_t - \eta \frac{1}{\sqrt{\varepsilon + h_{t+1}}} \odot \frac{\partial L}{\partial \theta_t}class RMSprop: def __init__(self, lr=0.01, decay_rate = 0.99): self.lr = lr self.decay_rate = decay_rate self.h = None def update(self, params, grads): if self.h is None: self.h = {} for key, param in params.items(): self.h[key] = np.zeros_like(param) for key in params.key(): self.h[key] *= self.decay_rate self.h[key] += (1 - self.decay_rate) * grads[key] * grads[key] params[key] -= self.lr * grads[key] / (np.sqrt(self.h[key]) + 1e-7)Adam
- RMSPropとMomuntumSGDを組み合わせたような方法
m_{t+1} = \rho_1 m_t + (1-\rho_1) \frac{\partial L}{\partial \theta_t} \\ v_{t+1} = \rho_2 v_t + (1-\rho_2) \frac{\partial L}{\partial \theta_t} \\ \hat{m}_{t+1} = \frac{m_{t+1}}{1-\rho_1^t} \\ \hat{v}_{t+1} = \frac{v_{t+1}}{1-\rho_2^t} \\ \\ \theta_{t+1} = \theta_t - \eta \frac{1}{\sqrt{\hat{v}_{t+1}} + \varepsilon} \odot \hat{m}_{t+1}class Adam: def __init__(self, lr=0.001, beta1=0.9, beta2=0.999): self.lr = lr self.beta1 = beta1 self.beta2 = beta2 self.iter = 0 self.m = None self.v = None def update(self, params, grads): if self.m is None: self.m, self.v = {}, {} for key, param in params.items(): self.m[key] = np.zeros_like(param) self.v[key] = np.zeros_like(param) self.iter += 1 lr_t = self.lr * np.sqrt(1.0 - self.beta2**self.iter) / (1.0 - self.beta1**self.iter) for key in params.key(): self.m[key] += (1 - self.beta1) * (grads[key] - self.m[key]) self.v[key] += (1 - self.beta2) * (grads[key]**2 - self.v[key]) params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)CNN
畳み込み後のサイズ
$$
\mathrm{O}_h = \frac{H+2P-F}{S}+1
$$im2col
def im2col(input_data, filter_h, filter_w, stride=1, pad=0, constant_values=0): """ input_data : (データ数, チャンネル, 高さ, 幅)の4次元配列からなる入力データ filter_h : フィルターの高さ filter_w : フィルターの幅 stride : ストライド pad : パディング return : 2次元配列 """ N, C, H, W = input_data.shape # 出力データ(畳み込みまたはプーリングの演算後)の形状を計算する out_h = (H + 2*pad - filter_h)//stride + 1 out_w = (W + 2*pad - filter_w)//stride + 1 # パディング処理 img = np.pad(input_data, [(0,0), (0,0), (pad, pad), (pad, pad)], 'constant', constant_values=constant_values) col = np.zeros((N, C, filter_h, filter_w, out_h, out_w)) # 配列を並び替える(フィルター内のある1要素に対応する画像中の画素を取り出してcolに代入する) for y in range(filter_h): #フィルターの高さ方向のループ y_max = y + stride*out_h for x in range(filter_w): #フィルターの幅方向のループ x_max = x + stride*out_w col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride] col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1) return coldef col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0, is_backward=False): """ col : 2次元配列 input_shape : (データ数, チャンネル, 高さ, 幅)の4次元配列からなる入力データ filter_h : フィルターの高さ filter_w : フィルターの幅 stride : ストライド pad : パディング return : (データ数, チャンネル数, 高さ, 幅)の4次元配列,画像データの形式を想定 """ N, C, H, W = input_shape # 出力(畳み込みまたはプーリングの演算後)の形状を計算する out_h = (H + 2*pad - filter_h)//stride + 1 out_w = (W + 2*pad - filter_w)//stride + 1 col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2) img = np.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1)) for y in range(filter_h): #フィルターの高さ方向のループ y_max = y + stride*out_h for x in range(filter_w): #フィルターの幅方向のループ x_max = x + stride*out_w if is_backward: img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :] else: img[:, :, y:y_max:stride, x:x_max:stride] = col[:, :, y, x, :, :] return img[:, :, pad:H + pad, pad:W + pad]LeNet
- LeNet-5は、6層構造(CNN層が5つ)のネットワーク。
- 32x32pxのパッチ画像を入力とする。
- 1998年頃にYann LeCunらによって提案された。
- 畳み込み層の考え方は、1989年頃から提案された。
AlexNet
VGG
- 2014年のILSVRCで2位になったネットワーク。
- AlexNetよりも深いネットワーク構造(16/19)。
- パラメータ数は約1億4,000万個
GoogLeNet
- 2014年のILSVRCで1位になったネットワーク。
- Inception module
- 大きな畳込みフィルタを小さな畳込みフィルタのグループで近似することで、 モデルの表現力とパラメータ数のトレードオフを改善していると言える。
- 1*1の畳み込みフィルタが使われているが、このフィルターは次元削減と等価な 効果がある。
- 小さなネットワークを1つのモジュールとして定義し、モジュールの積み重ねで ネットワークを構築する。
- Auxiliary Loss
- ネットワークの途中から分岐させたサブネットワークにおいてもクラス分類を行う。
- AuxililaryLossを導入しない場合でもBatch Normalizationを加えることにより、同様に学習がうまく進むことがある。
- アンサンブル学習と同様の効果が得られるため、汎化性能の向上が期待できる。
ResNet
- 2015年のILSVRCで1位になったネットワーク。
- Residual Block
- ブロックへの入力似これ以上の変換が必要ない場合は重みが0となり、小さな変換が求められる場合は対応する小さな変動をより見つけやすくなることが期待される。
- Identity mapping
- 層をまたがる結合のこと。このショートカット結合により、逆伝播時に購買が減衰していくことを抑えることができる。
DenseNet
- ResNetに似たショートカット結合を持つネットワーク。
- 特徴量を再利用する結合になっている。
- DenseBlockによって構成される。
R-CNN
- 物体候補領域画像をアスペクト比を保たずリサイズして、CNNで特徴を取り出す。
- 特徴抽出のCNN、カテゴリ推定のSVMなど各学習の目的ごとに別々に学習させる必要がある。
手順
- 入力画像に対して、物体が写っている領域の候補を選択的 検索法(Selective Search) で約2,000個抽出する
- CNN のインプットの大きさに合うようにそれぞれの領域 中の画像をリサイズする
- それぞれの物体領域候補に対して、 CNN (原著論文では AlexNet) で特徴マップを計算する
- それぞれの領域に何が写っているか SVM で分類する
- 物体の詳細位置を決めるために、特徴マップとバウンディ ングボックス座標を回帰させる問題を解く
Fast R-CNN
- 画像全体を入力して計算した特徴マップをすべ ての物体領域候補で再利用することで、R-CNN を高速化させたもの。
手順
- 選択的検索法を利用して物体領域候補を算出する
- 画像全体をCNNに入力して特徴マップを計算する
- 物体領域候補ごとに以下の計算を行う
- 物体領域候補に対応する特徴マップを抽出する
- 特徴マップをRoI(Region of Interest)プーリング層に渡し、次元数の異なる特徴マップを一定の大きさのベクトルに変換する
- 物体の詳細位置を決めるために、特徴マップとバウンディングボックス座標を回帰させる問題を解く
Faster R-CNN
- end-to-end学習が可能なネットワークアーキテクチャである。
- 特徴マップから物体領域候補を推定する領域提案ネットワーク(RPN)とFast R-CNNの2つのネットワークで構成される。
- 2つのネットワークで特徴マップを共有しているため、 計算が効率的。
- 矩形領域回帰の誤差関数はFast R-CNNと同じ。
- 提案領域に対してCNNで特徴マップを生成し、RoI Poolingという手法で固定サイズに変形する。
YOLO
- 画像全体をグリッドに分割(セル)し、セル毎にクラスとバウンディングボックスを求める方法。
- 検出速度は高速だが、画像内のオブジェクト同士が複数重なっている場合に上手く検出できないという欠点がある。
- セル内で出力できるクラスは1つのみ。
SSD
- YOLOと似たアルゴリズム。
- 様々な階層の出力層からマルチスケールな 検出枠を出力できるように工夫されている。
- NNでは、出力層に近いほど特徴マップのサイズが小さくなるので、それぞれの層の 特徴マップには様々なサイズの物体を検出できる情報があるはずである。つまり、出力層に近い特徴マップほど大きな物体を検知しやすくなり、入力層に近い特徴マップほど小さな物体を検知しやすくなると考えられる。
FCN
- プーリング層では、特徴マップはダウンサンプリングされる。
- プーリング層の後、逆畳込みを用いてアップサンプリングを施すことで、出力される画像サイズを大きくする。
- 損失は、ピクセルごとのクロスエントロピー誤差の合計。
- ダウンサンプリングされた特徴マップに対して単純に逆畳み込みを適用するだけでは、出力結果が粗くなってしまうため、スキップ接続を用いて情報ロスが発生する前の情報をアップサンプリング処理に入力する。
SegNet
- エンコーダ(encoder)部分とデコーダ(decoder)部分で構成されるセマンティックセグメンテーションのためのネットワーク。
- FCNのようにプーリング前の特徴をアップサンプリング層にコピーするのではなく、 エンコーダ部分のマックスプーリング層で採用した値の場所を記録しておき、デコーダ部分のアップサンプリング時にその場所を使って特徴マップを拡大する。
![]()
U-Net
- 横向きのパスは畳み込み
- 下向きのパスはmax-pooling
- 上向きのパスはup-conv (逆畳み込み)
- 最終的な特徴マップを1x1 convで セグメンテーションマップに変換
- FCNでは入力側の中間表現を出力側に反映する際、チャンネルごとの足し算を行っていたが、U-Netでは足し算ではなく”連結”することで入力を位置情報を保持している。
GAN
\min_G \max_D V(G,D) = \mathbb{E}_{x \sim p_{data}(x)} [\log D(x)] + \mathbb{E}_{z \sim p_{z}(z)} [\log (1 - D(G(z)))]オートエンコーダ
- 特徴抽出器(次元圧縮)、ノイズ除去、階層型NNの初期パラメータの取得、復元誤差を利用した異常検知などに利用される。
- 入力層(エンコーダ)と復元層(デコーダ)に同じデータを入れて学習を行う。
- スパース自己符号化器は、自己符号化器に正則化項を加えて学習させる方法。
- 雑音除去自己符号化器は、ランダムなノイズを加えたデータを入力層に入れる方法。復元層から出てくるデータがノイズを加える前のデータに近くなるように学習させる。
- 変分自己符号化器(VAE)は、データが生成するメカニズムを仮定する(生成モデル)。
VAE
- 自己符号化器と同じように、入力を再現する学習を行う。
- データが生成するメカニズムを仮定する点が自己符号化器と異なる。
洗剤変数に確率分布(ガウス分布など)を仮定する。
Reparameterization trick
- サンプリングで誤差逆伝播法を使えないことを解決するための方法
VAEを最尤推定問題として定式化すると以下になる。
\mathcal{L}(q) = \mathbb{E}_{z\sim q(z|x)}\log p_{model}(x|z) - D_{KL}(q(z|x)||p_{model}(z)) \\ \mathbb{E}_{z\sim q(z|x)}\log p_{model}(x|z) = \sum^D_{i=1} x_i \log y_i + (1-x_i)\cdot \log(1-y_i) \\ D_{KL}(q(z|x)||p_{model}(z)) = D_{KL}(N(\mu, \Sigma)||N(O,I)) = - \frac{1}{2} \sum^J_j(1+\log(\sigma_j^2)-\mu_j^2-\sigma_j^2)損失関数は以下になる。
L=-\sum^D_{i=1}x_i \log y_i + (1-x_i) \cdot \log(1-y_i) - \frac{1}{2}\sum_j^J(1+\log(\sigma_j^2)-\mu_j^2-\sigma_j^2)制約ボルツマンマシン
RNN
LSTM
- 出力ゲート(Output gate)
- 取り出した記憶について、各要素を弱める度合いを決めるゲート
- 忘却ゲート(Forget gate)
- 記憶セルの各要素を弱める(忘れる)度合いを決めるゲート
- 入力ゲート(Input gate)
- 記憶セルに追加されるGの各要素を弱める割合を決めるゲート
- 記憶セル(Memory cell)
- 過去の情報を記録する部分
\begin{align} &F = \sigma \left( X^tW_x^{(f)} + H^{t-1}W_h^{(f)}+B^{(f)}\right) \\ &G = \operatorname{tanh} \left( X^tW_x^{(g)} + H^{t-1}W_h^{(g)}+B^{(g)}\right) \\ &I = \sigma \left( X^tW_x^{(i)} + H^{t-1}W_h^{(i)}+B^{(i)}\right) \\ &O = \sigma \left( X^tW_x^{(o)} + H^{t-1}W_h^{(o)}+B^{(o)}\right) \\ &H^t = O \odot \operatorname{tanh} C^t \\ &C^t = G \odot I + F \odot C^{t-1} \end{align}GRU
seq2seq
テンション機構
メモリネットワーク
強化学習
- 動的計画法 ・・・環境のモデルが既知の場合にベルマン方程式を解いて最適な方策を得る
- 方策反復法
- 価値反復法
- モンテカルロ法 ・・・ 実際に思考を繰り返して得られた報酬から価値を推定する
- TD学習 ・・・ 実際の報酬から方策関数や価値関数を改善しながらベルマン方程式を解く
- Sarsa
- Q学習
- DQN
- 方策勾配法
- マルチステップ学習法
- Actor-Critic
A (Long) Peek into Reinforcement Learningベルマン方程式
価値観数を再帰的に定義し期待値を展開したものをベルマン方程式という。ベルマン方程式が成り立つためには、マルコフ決定過程である必要がある。
状態価値に関するベルマン方程式
\begin{align} V^\pi(s) &= \mathbb{E}^\pi[G_t|s_t=s] \\ &= \mathbb{E}^\pi \left[\gamma \sum^{\infty}_{k=0} \gamma^kr_{t+1+k}|S_t=S\right] \\ &= \mathbb{E}^\pi \left[ r_{t+1} + \gamma \sum^{\infty}_{k=0} \gamma^kr_{t+2+k}|S_t=S\right] \\ &= \sum_a\pi(a|s)\sum_{s'}P(s'|s,a)[r(s,a,s')+\gamma V^{\pi}(s')] \end{align} \boxed{ \begin{align} \pi(s,a) & \mbox{:方策関数} \\ P(s'|s,a) & \mbox{:状態遷移関数} \\ r(s,a,s') & \mbox{:報酬関数} \end{align} }行動価値関数に関するベルマン方程式
\begin{align} Q^{\pi}(s,a) &= \mathbb{E}^\pi[G_t|s_t=s, a_t=a] \\ &= \mathbb{E}^\pi \left[ \sum^\infty_k \gamma^k r_{t+1+k}| s_t=s,a_t=a \right] \\ &= \mathbb{E}^\pi \left[ r_{t+1} + \sum^\infty_k \gamma^k r_{t+2+k}|s_t=s, a_t=a \right] \\ &= \sum_{s'} P(s'|s,a)[r(s,a,s')+\gamma V^\pi (s')] \end{align} \\ V^\pi (s) = \sum_a \pi(a|s)Q^\pi(s,a) \mbox{より、}\\ Q^\pi(s,a)=\sum_{s'}P(s'|s,a)[r(s,a,s')+\gamma \sum_a \pi(a'|s')Q^\pi(s',a')]DQN
- 強化学習に深層学習を適用し成功した例
- ゲームの画面自体を入力とするCNNモデルを行動価値関数(Q関数)として利用している。
- CNNでは、入力として状態情報(ある時点におけるゲーム画面の画像)を受け取り、そのとき取りうる各行動の価値を出力する。
- NNで出力される行動価値が真の行動価値に近づくように学習する。
- Experience Replay
- 時系列データは、近い時間のデータが似通ってしまうため、相関性の強い系列となる。
- 強い相関性を持つ入力系列に対して学習を行うと、直近の入力に引きずられてパラメータが修正されるため、過去の入力に対する推定が悪化し、収束性が悪くなる。
- エージェントが経験した状態・行動・報酬・遷移先は一旦メモリに保存される。 損失の計算を行う際には、メモリに保存した値からランダムサンプリングを行う。
L(\theta) = \mathbb{E}_{s,a,r,s'\sim D} \left[ \left( r + \gamma \max_{a'}Q(s',a';\theta) -Q(s,a;\theta) \right)^2 \right]\\ \boxed{ \begin{align} \mathbb{E}_{s,a,r,s'\sim D} & \mbox{:ランダムサンプリングされたデータを用いて学習する。}\\ r + \gamma \max_{a'}Q(s',a';\theta) & \mbox{:目標値(教師信号)}\\ Q(s,a;\theta) & \mbox{:NNの出力} \end{align} }
- Target Q-Networkの固定
- 学習の目標値(教師信号)算出に用いるネットワークと、行動価値Qの推定に用いるネットワークが同じ場合、行動価値関数を更新すると目標値(教師信号)も変化してしまい、学習が不安定になる。
- 学習の目標値の計算に用いる価値関数を固定する。一定期間の学習の間、目標値の計算に用いる価値関数ネットワークのパラメータを固定し、一定周期でこれを 更新することで学習を安定させる。
$r + \gamma \max_{a'}Q(s',a';\theta^-)$ ・・・過去のある時点のパラメータ$\theta^-$を使って行動価値関数$Q$が最大になる行動$a$を求める。
報酬のクリッピング
- ゲームの種類によって得点の範囲が異なるため、報酬のスケールがゲームによって異なる。このままだと学習が不安定になる。
- 報酬の値を[-1, 0, 1] の3通りに制限することで学習を安定させる。
Huber損失
- 誤差が大きい場合に二乗誤差を使用すると誤差関数の出力が大きくなりすぎて学習が安定しづらい。
\begin{equation} L_\delta (a) = \begin{cases} \frac{1}{2}a^2 & |a|\leqq 1 \\ \delta (|a| - \frac{1}{2}\delta) & \text{上記以外} \end{cases} \end{equation}参考
https://ja.wikipedia.org/w/index.php?title=%E7%A2%BA%E7%8E%87%E7%9A%84%E5%8B%BE%E9%85%8D%E9%99%8D%E4%B8%8B%E6%B3%95
https://en.wikipedia.org/wiki/Autoencoder
- 投稿日:2019-08-11T11:53:29+09:00
Pythonでmongodbを操作する~その4:insert編~
当記事の記載範囲
この記事ではPythonでmongodbに接続してから、insert(SQLで言ってもinsert)の使い方について記載します。
内容としては以下になります。
1. insert_one
2. insert_manymongodbの起動やpymongoのインストール方法については以下の記事をご覧いただければ幸いです。
https://qiita.com/bc_yuuuuuki/items/2b92598434f6cc320112準備データ
mongodbの準備データは以下のとおりです。
> show dbs admin 0.000GB config 0.000GB local 0.000GB test 0.000GB > show collections employee log salary > db.salary.find() { "_id" : ObjectId("5d4acf84de925ae437e2c124"), "name" : "佐藤", "salary" : 400000 } { "_id" : ObjectId("5d4acf84de925ae437e2c125"), "name" : "田中", "salary" : 500000 } { "_id" : ObjectId("5d4b81a4de925ae437e2c126"), "name" : "山田", "salary" : 500000 }Pythonでinsertを使ってみる
まずはinsert_oneから使ってみます。
insert_oneの使い方
早速、サンプルのコードです。
内容はinsert前のデータ取得、insert_one、insert後のデータ取得を行っています。MongoInsertSample.pyfrom pymongo import MongoClient class MongoInsertSample(object): def __init__(self, dbName, collectionName): self.client = MongoClient() self.db = self.client[dbName] self.collection = self.db.get_collection(collectionName) def find(self, projection=None,filter=None, sort=None): return self.collection.find(projection=projection,filter=filter,sort=sort) def insert_one(self, document): return self.collection.insert_one(document) def insert_many(self, documents): return self.collection.insert_many(documents) mongo = MongoInsertSample('test', 'salary') find = mongo.find() print('--------------------登録前--------------------') for doc in find: print(doc) print('-------------------登録情報-------------------') result = mongo.insert_one({'name':'加藤','salary':400000}) print(type(result)) print(result) print(result.inserted_id) print('--------------------登録後--------------------') find = mongo.find() for doc in find: print(doc)実行結果
--------------------登録前-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4b81a4de925ae437e2c126'), 'name': '山田', 'salary': 500000} -------------------登録情報------------------- <class 'pymongo.results.InsertOneResult'> <pymongo.results.InsertOneResult object at 0x000001E7054CD748> 5d4f74cb42f88d7822517a76 --------------------登録後-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4b81a4de925ae437e2c126'), 'name': '山田', 'salary': 500000} {'_id': ObjectId('5d4f74cb42f88d7822517a76'), 'name': '加藤', 'salary': 400000}insert_oneの戻り値として’pymongo.results.InsertOneResult'のクラスが戻ってくるようです。
このクラスには登録したdocumentsの主キー(_id)を取得できるようです。insert_manyの使い方
MongoInsertSample.pyを一部変更します。
(抜粋)MongoInsertSample.pymongo = MongoInsertSample('test', 'salary') find = mongo.find() print('--------------------登録前--------------------') for doc in find: print(doc) print('-------------------登録情報-------------------') result = mongo.insert_many([{'name':'加藤','salary':400000},{'name':'松井','salary':500000}]) print(type(result)) print(result) print(result.inserted_ids) find = mongo.find() print('--------------------登録後--------------------') for doc in find: print(doc)実行結果
--------------------登録前-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4b81a4de925ae437e2c126'), 'name': '山田', 'salary': 500000} -------------------登録情報------------------- <class 'pymongo.results.InsertManyResult'> <pymongo.results.InsertManyResult object at 0x00000249F8ABCD08> [ObjectId('5d4f814c950945628d663d95'), ObjectId('5d4f814c950945628d663d96')] --------------------登録後-------------------- {'_id': ObjectId('5d4acf84de925ae437e2c124'), 'name': '佐藤', 'salary': 400000.0} {'_id': ObjectId('5d4acf84de925ae437e2c125'), 'name': '田中', 'salary': 500000.0} {'_id': ObjectId('5d4b81a4de925ae437e2c126'), 'name': '山田', 'salary': 500000} {'_id': ObjectId('5d4f814c950945628d663d95'), 'name': '加藤', 'salary': 400000} {'_id': ObjectId('5d4f814c950945628d663d96'), 'name': '松井', 'salary': 500000}注意すべき点は特にないかなと思います。
強いて言えば、insert_oneとinsert_manyの戻り値のクラスが異なります。
登録した情報を取得する時の取得の仕方が少し異なります。
- inserted_id
- inserted_ids感想
updateと同様にinsertもmongodbのコマンドとパラメーターの設定方法が同じで使いやすいです。
関連記事
- 投稿日:2019-08-11T11:49:06+09:00
配列の範囲指定をうっかり間違えたために本当にあった怖い話
要点
- Rubyの配列の要素取り出しは
ary[p..q]
とary[p...q]
という表記があって、Python のary[p:q]
に相当するのは後者。- オーディオサンプリング周波数 44100 を 4 倍して 1 足すと素数になる (←知ってました?)。
- FFT の一番広く使われている Cooley-Tukey のアルゴリズムはデータを因数分解した数に分けて細切れにしていく。
以上で何がいいたいかわかった方はそれで十分です。気をつけましょう…
本当にあった怖くて恥ずかしい話
Python の numpy で 4 s のオーディオ信号を FFT するプログラムがありました。
ぼくは Python が嫌いなので、それを mrkn 氏の素晴らしい Pycall (https://github.com/mrkn/pycall.rb) を使って Ruby に移植しました。その際、Python で 4 s 切り出す
data[ptr:ptr + 44100 * 4]
という感じの部分を、うっかりdata[ptr..(ptr + 44100 * 4)]
と書いてしまった のです。もちろん正解は..
ではなく...
です。その結果 謎のフリーズ。いや実際は単に計算に時間がかかっていただけなんですが、10 s 以上も固まれば、そりゃ
^C
押しますって。そして何は原因なんだろうpycallかな、などとあらぬ疑いを頓珍漢な方向に向けたりして リアルに3-4時間くらい時間を無駄にしました。(恥ずかしい。)衝撃の再現コード
巷では Python の方が人気と聞くので Python のみで動くように書きました。実行すれば驚きを体験できます。
#!/usr/bin/python3 import numpy as np fs = 44100 duration = 4 data = np.random.randn(fs * 10) # dummy data print("test 1") y = np.fft.fft(data[0:fs * duration]) print("test 2") y = np.fft.fft(data[0:fs * duration + 1])本当はいろいろ Ruby で書きたいんだけど numpy とか keras とかがないから必要悪と割り切って Python を使っている人は pycall.rb (やそれを使った numpy.rb) を使いましょう!上記のコードならこんな↓感じにほぼベタ移植できます。すごい > pycall
#!/usr/bin/ruby require "numpy" # gem install numpy fs = 44100 duration = 4 data = Numpy.random.randn(fs * 10) # dummy data puts "test 1" y = Numpy.fft.fft(data[0...fs * duration]) puts "test 2" y = Numpy.fft.fft(data[0..fs * duration])おわりに
- Ruby は文法がきれいで書きやすくて好きなんですが、まさかこういうハマり方をするとは思ってもいませんでした。自分が悪いんですけど。
- $44100\times4+1$ という大きめの覚えやすい素数が手に入ったのは収穫。ちなみに $44100\times4-1 = 419\times 421$ も ふたつの素数の積なので注意。
- 投稿日:2019-08-11T11:25:32+09:00
サーバレスでRSSを取得しDynamoDBに登録する30行
とあるサイトの情報をRSSで取得し、DynamoDBに突っ込むだけの処理。最初、ライブラリとかをアップロードしないとできないかなと思ったが、標準ライブラリだけで大丈夫だった。CloudWatch Eventsから起動して使う。
import urllib.request import xml.etree.ElementTree as ET import boto3 import json from boto3.dynamodb.conditions import Key, Attr dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('xxxxxxxx') def lambda_handler(event, context): cnt = 0 url = 'https://note.mu/usop/m/m79fbb598dd82/rss' with urllib.request.urlopen(url) as response: html = response.read() root = ET.fromstring(html) for item in root.iter('item'): link = item.find('link').text res = table.scan( FilterExpression=Attr('url').eq(link) ) if res["Count"] == 0: resall = table.scan() cnt = resall["Count"] table.put_item(Item={"id":cnt+1,"url":link}) return { 'count': cnt }
- 投稿日:2019-08-11T10:53:03+09:00
CloudWatch Logs Insights の分析結果をLambdaで監視する
はじめに
CloudWatch Insightsの分析結果を監視するLambdaを作成しました。
CloudWatch Logs Insightsとは
CloudWatch Logs Insightsとは、2018年のre:Inventで発表されたサービスです。
詳細に関しては、以下のサイトをご覧いただければと思いますが、一言で説明すればCloudWatch Logs内のログデータを簡単に集計・分析できるサービスです。
ロググループに対する専用のクエリ言語がサポートされており、そのクエリを投げることで簡単にログの集計・分析が行え、さらにグラフを作成して結果を可視化することもできます。これらの分析結果をCloudWatchダッシュボードに追加することもできます。
新機能 – Amazon CloudWatch Logs Insights – 高速でインタラクティブなログ分析
CloudWatch Logs Insights を使用したログデータの分析やったこと
今回、CloudWatch Insightsで得られた分析結果を監視するLambdaを作成しました。
具体的には、APIGatwayのアクセスログのステータスコードに400以上のものがある場合にSlackに通知する
ということを行いました。
注:「LambdaでInsightsの分析結果を使う」ことをしたかったため、そのままCloudWatchのメトリクスを使えばもっと簡単にできます。以下、構成図です。
APIGatewayのアクセスログをCloudWatch Logsに吐き出し、
LambdaでCloudWatch Logs Insightsのクエリを実行し分析結果を取得、
その分析結果の内容にしたがってSlackに通知するということを行います。
なお、2019年8月現在、Insightsの分析結果をCloudWatch Alermのメトリクスに指定することはできなかったため、LambdaはCloudwatch Eventsによって5分ごとに動かすようにしました。
事前準備
Lambdaを作成する前に、いくつかの事前準備を行います。
SlackWebhookのURLを取得する。
Slack通知を行う場合、SlackWebhookのURLを取得しておいてください。
IAMロールの作成
今回使用するIAMロールは以下の2つあるので、それらを作成します。
1.APIGateway用IAMロール
後述しますが、APIGatewayはデフォルト状態ではアクセスログをCloudWatch Logsに書き出す仕様ではないためその設定を行う必要があります。その際、APIGatewayがCloudWatch Logsにログを書き込む権限が必要となるためそのIAMロールを作成します。
デフォルトで存在するポリシー「AmazonAPIGatewayPushToCloudWatchLogs」をアタッチしたロールを作成し、ARNをコピーしておいてください。2.Lambda用IAMロール
Lambdaに割り当てるロールを作成します。今回、CloudWatchLogsへのアクセス権限が必要なため、以下のIAMポリシーをアタッチしたロールを作成しました。IAMポリシー{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:*" ], "Effect": "Allow", "Resource": "*" } ] }ロググループの作成
APIGatewayのアクセスログを書き出すためのCloudWatch Logsのロググループを作成しておきます。
APIGatewayの設定
APIGatewayの対象APIのアクセスログの設定を行います。デフォルト状態では、アクセスログは取得しない設定になっているためアクセスログをCloudWatch Logsに吐き出す設定を行います。
サービス > APIGateway > 設定
でARNにAPIGatway用に作成したIAMロールのARNを設定します。アクセスログを取得するAPIを選択 > ステージを選択 > ログのトレース
CloudWatch ログを有効化し、アクセスログを有効化、ログの形式を設定 (今回は、JSONにしました)。これでAPIGatwayの設定は完了です。
Lambdaの設定
環境変数
今回、以下の環境変数を設定しました。
CHANNEL_LIST :通知するSlackのチャンネル名をカンマ区切りで設定。
LOGGROUP_NAMES : 監視するCloudWatchLogsのロググループをカンマ区切りで設定。Insightsの結果をLambdaで監視するimport os import datetime import time import ast import slackweb import boto3 at_color = "danger" client = boto3.client('logs') channel_list = os.environ.get('CHANNEL_LIST').split(',') logGroupNames = os.environ.get('LOGGROUP_NAMES').split(',') slack_api = "Slack WebhookのURLを記載" def lambda_handler(event, context): five_minutes_ago = datetime.datetime.now() - datetime.timedelta(minutes = 5) startTime = five_minutes_ago.replace(second = 0, microsecond = 0) endTime = startTime + datetime.timedelta(minutes = 5) - datetime.timedelta(milliseconds = 1) queryString = 'fields @timestamp, @message | sort @timestamp desc | filter (status >= 400)' for logGroupName in logGroupNames: error_logs = results(logGroupName, startTime, endTime, queryString) if len(error_logs) > 0: for error_log in error_logs: dict_message = ast.literal_eval(error_log[1]['value']) text = str(logGroupName) + '\n' + str(dict_message) for channel in channel_list: try: post_slack(channel,text) except Exception as e: post_slack(channel,text) def results(logGroupName, startTime, endTime, queryString): start_query_res = client.start_query( logGroupName=logGroupName, startTime=int(startTime.timestamp()), endTime=int(endTime.timestamp()), queryString=queryString ) queryId = start_query_res['queryId'] get_results_res = client.get_query_results( queryId=queryId ) while get_results_res['status'] == 'Running': #or 'Scheduled': time.sleep(5) get_results_res = client.get_query_results( queryId=queryId ) return get_results_res['results'] def post_slack(channel,text): slack = slackweb.Slack(url=slack_api) slack.notify( username="API-accesslog", channel=channel, attachments = [{ "color" : at_color, "text" : text }] )参考サイト
- 投稿日:2019-08-11T10:35:57+09:00
Pythonでzipファイル解凍後の文字化け解消
「ファイル名に日本語つかわないでよー
」
音声ファイルを取引先が送ってきました。zipで圧縮されていたので解凍しようとしたらめちゃんこ困ったことになりました。ファイル名がことごとく文字化けしてるんです。ファイル名がわからないと何の音源がわからない。
ファイル名に日本語を使わなければこんなことにはならないと思うんです。Qiita読者諸兄なら、取引先とのやり取りには英数字のファイル名を使ってると思います。しかし世間にはそういう認識ってまだまだ足りてないのかなと。
「文字化けしてるんで圧縮しなおしてください」って頼むとまた時間がかかるし、私達には頼れる存在、Pythonがついています!
Pythonをつかって正しいファイル名で解凍してみます(Python3)
import zipfile SRC = '解凍したいzipファイル' DST = '解凍したファイルを置きたい場所' with zipfile.ZipFile(SRC) as z: for info in z.infolist(): info.filename = info.filename.encode('cp437').decode('utf-8') z.extract(info, path=DST)私の今回のケースの場合、文字コード
cp437
を使えば正しいファイル名に変換できました。ここは先方の使っている環境によって変わってくるかもしれません以上、ちょっとしたコツですが、お役に立てば幸いです。くれぐれも、やり取りは英数字のファイル名で
- 投稿日:2019-08-11T10:28:06+09:00
相関行列の確率分布「LKJ分布」の可視化
ベイジアンモデリングをしていると、多変量正規分布の分散共分散行列の事前分布を設定したいというシチュエーションがある。
普通の(単変量)正規分布の標準偏差の事前分布を設定する場合は、パラメータの条件としては正の値を取ることだけなので逆ガンマ分布、半コーシー分布、半正規分布などの正の値しか取らない確率分布を適当に設定すれば良い。
一方、共分散行列の事前分布を設定する場合、半正定値行列を返すような確率分布ではならない。半正定値行列とは固有値が全て0以上の行列のことである。共分散行列がなぜ必ず半正定値行列なのかについての証明はこちらのページが参考になる。
分散共分散行列の事前分布としては逆ウィシャート分布が挙げられるが、PyMC3のドキュメントいわく、moden Bayesian computational methodには適していないらしい。その代わり、相関係数の事前分布である「LKJ分布」を用いることが推奨されている。
LKJ分布の確率密度関数は下記の通り。LKJというのは論文の著者3名のイニシャルらしい。
$$f(\Sigma;\eta)=Z(\eta)\dot \rm{det} (\Sigma)^{\eta-1}$$
パラメータ$\eta$は正値であり、PyMC3, Stanではshape parameter, TensoFlow Probability ではconcentration parameterと呼ばれている。また、$Z(\eta)$は正規化定数($f$の全範囲に渡る積分が1になるよう調整するための定数)である。
$\eta$=1の場合はUniform distributionとなる。また、$\eta$が大きいほど変数間の相関が小さくなり、$\eta$が小さいほど相関が大きくなる。
...と言われてもいまいちイメージが湧かない。そこで相関行列を可視化することを考える。
例えば、
$$\mu=(0,0),\quad \Sigma = \begin{pmatrix}1.0 & 0.8 \\ 0.8 & 1.0 \end{pmatrix}$$
とした二変量正規分布からサンプルを取得し、グレーの点で表示すると下図のようになる。こちらは相関係数0.8と、2変数の相関の高い例である。このグレーの点の分布の形状を表すために、95%信頼区間を赤線で表している。
このように、LKJ分布から得られたサンプル$\Sigma$を用いた二変量正規分布について、赤い楕円をプロットしていけば、相関行列を可視化することができる。
以下に、平均(0, 0)、共分散行列をLKJ分布からのサンプルとした二変量正規分布の95%信頼区間を表示する。$\eta=0.1, 1.0, 10$のLKJ分布から相関行列をそれぞれ50個サンプリングし、重ね書きしている。
$\eta=0.1$の場合は斜めを向いた楕円が多く、相関係数の絶対値が大きい行列が得られやすいことが分かる。$\eta=1$の場合はUniform distributionなので、相関が高いケースから低いケースまで均等にサンプルが得られている。また、$\eta=10$の場合は真円に近く、相関係数が低い相関行列が得られやすいことが分かる。
なお、通常はLKJ分布から相関行列を直接サンプリングすることはなく、コレスキー分解した行列Lをサンプリングする。コレスキー分解では下三角行列$L$とその転置行列の積で共分散行列を表す。
多変量正規分布の計算では共分散行列$\Sigma$の逆行列が必要になるが、その際、コレスキー分解した行列$L$を用いた方が計算が早く数値的にも安定するらしい。
Stanのドキュメントにも相関行列のサンプリングは「実行速度が遅く、メモリ消費量が増え、数値上のエラーが発生する危険が高いコードを書きたいときに使う」との記載がある。笑PyMC3, Stan, TFPでLKJ分布からコレスキー分解された相関行列(共分散行列)を得る方法をまとめてみた。
フレームワーク 名称 備考 PyMC3 pymc3.distributions.multivariate.LKJCholeskyCov sd_dist引数に標準偏差の確率分布を与えることで、共分散行列のサンプリングが可能。 Stan lkj_corr_cholesky TFP tfp.distributions.LKJ コレスキー分解した行列を得るためには、input_output_cholesky=Trueとする必要がある。 また、共分散行列の話がいつの間にか相関行列に変わっているが、例えば二変量の場合、両者は下記のように変換できる。ここで、$\sigma_1, \sigma_2$は各変数の標準偏差、$\sigma_{12}$は共分散、$\rho_{12}$は相関係数である。
$$\begin{pmatrix} \sigma_1^2 & \sigma_{12} \\ \sigma_{12} & \sigma_2^2 \end{pmatrix}=\begin{pmatrix}\sigma_1 & 0 \\ 0 & \sigma_2 \end{pmatrix} \begin{pmatrix}1.0 & \rho_{12} \\ \rho_{12} & 1.0 \end{pmatrix} \begin{pmatrix}\sigma_1 & 0 \\ 0 & \sigma_2 \end{pmatrix}$$
つまり、相関行列に加えて、標準偏差もサンプリングしておくことで、任意の共分散行列が表せることになる。PyMC3のLKJCholeskyCovはこの操作をまとめて行ってくれるので便利そう。
可視化に用いたコードはこちら。TensorFlow Probabilityを使用している。
visualize_LKJ.pyfrom matplotlib.patches import Ellipse from matplotlib import pyplot as plt import seaborn as sns import numpy as np from scipy import stats import tensorflow as tf import tensorflow_probability as tfp sns.set_context('talk') # draw 95% confidence interval of multivariate normal distribution def draw_ellipse(cov, ax): var, U = np.linalg.eig(cov) angle = 180. / np.pi * np.arctan(U[0, 0]/ U[1, 0]) e = Ellipse(np.zeros(2), 2 * np.sqrt(5.991 * var[0]), 2 * np.sqrt(5.991 * var[1]), angle=angle, facecolor='none', edgecolor='r', linewidth=0.5, alpha=0.4) ax.add_artist(e) ax.set_xlim(-4, 4) ax.set_ylim(-4, 4) # visualize samples from LKJ prior def visualize_LKJ(eta, n_sample, ax): dist = tfp.distributions.LKJ(dimension=2, concentration=eta) sample = dist.sample(sample_shape=n_sample, seed=42) with tf.Session() as sess: cov_samples = sess.run(sample) for cov in cov_samples: draw_ellipse(cov, ax) ax.set_title(f'eta={eta}') eta_list = [0.1, 1.0, 10] fig, axes = plt.subplots(1, 3, figsize=(12, 4)) for i, eta in enumerate(eta_list): visualize_LKJ(eta, 50, axes[i]) plt.tight_layout() # plt.savefig('LKJ_sample.png')